diff --git a/reportIPBlock.py b/reportIPBlock.py index d80a036..090b8f3 100644 --- a/reportIPBlock.py +++ b/reportIPBlock.py @@ -33,56 +33,74 @@ def main(): if os.path.exists(socketFile): os.remove(socketFile) - stop_event = threading.Event() + stopEvent = threading.Event() + failureEvent = threading.Event() + dataBuffer = [] - def publishData(stop_event): + def publishData(stopEvent): - if config.loggingMode == 'rabbitmq': + try: - import rabbitmq - rabbitmq = rabbitmq.rabbitMQClient(config.rabbitmqca,config.rabbitmqcacert,config.rabbitmqcakey,config.rabbitmqHost,config.rabbitmqPort,config.rabbitmqRoutingKey) + if config.loggingMode == 'rabbitmq': + + import rabbitmq + rabbitmq = rabbitmq.rabbitMQClient(config.rabbitmqca,config.rabbitmqcacert,config.rabbitmqcakey,config.rabbitmqHost,config.rabbitmqPort,config.rabbitmqRoutingKey) - while not (stop_event.is_set()): - if dataBuffer: - data = dataBuffer.pop(0) + while not (stopEvent.is_set() and not failureEvent.is_set()): + + if dataBuffer: + + data = dataBuffer.pop(0) - if config.loggingMode == 'rabbitmq': - rabbitmq.publish(f"{data}") - else: - print("Not yet implemented") + if config.loggingMode == 'rabbitmq': + + rabbitmq.publish(f"{data}") + + else: + + print("Not yet implemented") + + except Exception: + failureEvent.set() def cleanup(signum, frame): print("Signal received, shutting down...") - stop_event.set() + stopEvent.set() signal.signal(signal.SIGTERM, cleanup) signal.signal(signal.SIGINT, cleanup) - def server(stop_event): + def server(stopEvent): - serverSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) - serverSocket.bind(socketFile) - serverSocket.listen(50) - - while not (stop_event.is_set()): - - readable, _, _ = select.select([serverSocket], [], [], 1.0) + try: - if readable: - clientSocket, _ = serverSocket.accept() - data = clientSocket.recv(1024).decode('utf-8') + serverSocket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM) + serverSocket.bind(socketFile) + serverSocket.listen(50) + + while not (stopEvent.is_set() and not failureEvent.is_set()): + + readable, _, _ = select.select([serverSocket], [], [], 1.0) + + if readable: + clientSocket, _ = serverSocket.accept() + data = clientSocket.recv(1024).decode('utf-8') - if data: + if data: - dataBuffer.append(data) - print(data) + dataBuffer.append(data) + print(data) - clientSocket.close() + clientSocket.close() - publishThread = threading.Thread(target=publishData, args=(stop_event,)) - serverThread = threading.Thread(target=server, args=(stop_event,)) + except Exception: + + failureEvent.set() + + publishThread = threading.Thread(target=publishData, args=(stopEvent,)) + serverThread = threading.Thread(target=server, args=(stopEvent,)) publishThread.start() serverThread.start() @@ -90,5 +108,9 @@ def main(): publishThread.join() serverThread.join() + if failure_event.is_set(): + print("One of the threads failed. Terminating") + sys.exit(1) + if __name__ == "__main__": main()