From 80b63269df168229e91e4f0b4520af141f75b226 Mon Sep 17 00:00:00 2001 From: Arthur Lu Date: Sat, 19 Feb 2022 22:58:58 +0000 Subject: [PATCH] readded zmq messaging Former-commit-id: e38653c4dc4ffe6aada19cac2251c3ec28bfbc2b --- src/interface.py | 28 +++++++++++++---------- src/superscript.py | 56 +++++++++++++++++++++++++++++++++++++--------- test/test_zmq.py | 14 ++++++++++++ 3 files changed, 75 insertions(+), 23 deletions(-) create mode 100644 test/test_zmq.py diff --git a/src/interface.py b/src/interface.py index 4bd7f5a..5eef26b 100644 --- a/src/interface.py +++ b/src/interface.py @@ -5,11 +5,10 @@ import json class Logger(L): - send = None file = None - debug = False levels = { + 0: "", 10:"[DEBUG] ", 20:"[INFO] ", 30:"[WARNING] ", @@ -17,20 +16,24 @@ class Logger(L): 50:"[CRITICAL]", } + targets = [] + def __init__(self, verbose, profile, debug, file = None): super().__init__("tra_logger") - self.debug = debug + self.file = file + + if file != None: + self.targets.append(self._send_file) + if profile: - self.send = self._send_null + self.targets.append(self._send_null) elif verbose: - self.send = self._send_scli + self.targets.append(self._send_scli) elif debug: - self.send = self._send_scli - elif file != None: - self.send = self._send_file + self.targets.append(self._send_scli) else: - self.send = self._send_null + self.targets.append(self._send_null) def _send_null(self, msg): pass @@ -47,7 +50,8 @@ class Logger(L): return datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S %Z") def log(self, level, msg): - self.send(self.get_time_formatted() + "| " + self.levels[level] + ": " + msg) + for t in self.targets: + t(self.get_time_formatted() + "| " + self.levels[level] + ": " + msg) def debug(self, msg): self.log(10, msg) @@ -67,13 +71,13 @@ class Logger(L): def splash(self, version): def hrule(): - self.send("#"+38*"-"+"#") + self.log(0, "#"+38*"-"+"#") def box(s): temp = "|" temp += s temp += (40-len(s)-2)*" " temp += "|" - self.send(temp) + self.log(0, temp) hrule() box(" superscript version: " + version) diff --git a/src/superscript.py b/src/superscript.py index e67692d..59e01d0 100644 --- a/src/superscript.py +++ b/src/superscript.py @@ -157,6 +157,7 @@ from config import Configuration, ConfigurationError from data import get_previous_time, set_current_time, check_new_database_matches from interface import Logger from module import Match, Metric, Pit +import zmq config_path = "config.json" @@ -180,22 +181,27 @@ def main(logger, verbose, profile, debug, socket_send = None): loop_start = time.time() logger.info("current time: " + str(loop_start)) + socket_send("current time: " + str(loop_start)) config = Configuration(config_path) logger.info("found and loaded config at <" + config_path + ">") + socket_send("found and loaded config at <" + config_path + ">") apikey, tbakey = config.database, config.tba logger.info("found and loaded database and tba keys") + socket_send("found and loaded database and tba keys") client = pymongo.MongoClient(apikey) logger.info("established connection to database") + socket_send("established connection to database") previous_time = get_previous_time(client) logger.info("analysis backtimed to: " + str(previous_time)) + socket_send("analysis backtimed to: " + str(previous_time)) config.resolve_config_conflicts(logger, client) @@ -210,6 +216,7 @@ def main(logger, verbose, profile, debug, socket_send = None): continue current_module.run() logger.info(m + " module finished in " + str(time.time() - start) + " seconds") + socket_send(m + " module finished in " + str(time.time() - start) + " seconds") if debug: logger.save_module_to_file(m, current_module.data, current_module.results) # logging flag check done in logger @@ -218,6 +225,8 @@ def main(logger, verbose, profile, debug, socket_send = None): logger.info("closed threads and database client") logger.info("finished all tasks in " + str(time.time() - loop_start) + " seconds, looping") + socket_send("closed threads and database client") + socket_send("finished all tasks in " + str(time.time() - loop_start) + " seconds, looping") if profile: exit_code = 0 @@ -230,36 +239,43 @@ def main(logger, verbose, profile, debug, socket_send = None): event_delay = config["variable"]["event-delay"] if event_delay: logger.info("loop delayed until database returns new matches") + socket_send("loop delayed until database returns new matches") new_match = False while not new_match: time.sleep(1) new_match = check_new_database_matches(client, competition) logger.info("database returned new matches") + socket_send("database returned new matches") else: loop_delay = float(config["variable"]["loop-delay"]) remaining_time = loop_delay - (time.time() - loop_start) if remaining_time > 0: logger.info("loop delayed by " + str(remaining_time) + " seconds") + socket_send("loop delayed by " + str(remaining_time) + " seconds") time.sleep(remaining_time) except KeyboardInterrupt: - logger.info("detected KeyboardInterrupt, killing threads") close_all() - logger.info("terminated threads, exiting") + logger.info("detected KeyboardInterrupt, exiting") + socket_send("detected KeyboardInterrupt, exiting") break except ConfigurationError as e: + str_e = "".join(traceback.format_exception(e)) logger.error("encountered a configuration error: " + str(e)) - logger.error("".join(traceback.format_exception(e))) - #traceback.print_exc(file = stderr) + logger.error(str_e) + socket_send("encountered a configuration error: " + str(e)) + socket_send(str_e) exit_code = 1 close_all() break except Exception as e: + str_e = "".join(traceback.format_exception(e)) logger.error("encountered an exception while running") - logger.error("".join(traceback.format_exception(e))) - #traceback.print_exc(file = stderr) + logger.error(str_e) + socket_send("encountered an exception while running") + socket_send(str_e) exit_code = 1 close_all() break @@ -270,12 +286,15 @@ def start(pid_path, verbose, profile, debug): if profile: + def send(msg): + pass + logger = Logger(verbose, profile, debug) import cProfile, pstats, io profile = cProfile.Profile() profile.enable() - exit_code = main(logger, verbose, profile, debug) + exit_code = main(logger, verbose, profile, debug, socket_send = send) profile.disable() f = open("profile.txt", 'w+') ps = pstats.Stats(profile, stream = f).sort_stats('cumtime') @@ -284,16 +303,22 @@ def start(pid_path, verbose, profile, debug): elif verbose: + def send(msg): + pass + logger = Logger(verbose, profile, debug) - exit_code = main(logger, verbose, profile, debug) + exit_code = main(logger, verbose, profile, debug, socket_send = send) sys.exit(exit_code) elif debug: + def send(msg): + pass + logger = Logger(verbose, profile, debug) - exit_code = main(logger, verbose, profile, debug) + exit_code = main(logger, verbose, profile, debug, socket_send = send) sys.exit(exit_code) else: @@ -310,12 +335,21 @@ def start(pid_path, verbose, profile, debug): stderr = e ): + context = zmq.Context() + socket = context.socket(zmq.PUB) + socket.bind("tcp://*:5678") + socket.send(b'status') + + def send(msg): + socket.send(bytes("status: " + msg, "utf-8")) + logger = Logger(verbose, profile, debug, file = logfile) - exit_code = main(logger, verbose, profile, debug) + exit_code = main(logger, verbose, profile, debug, socket_send = send) + socket.close() f.close() - + sys.exit(exit_code) def stop(pid_path): diff --git a/test/test_zmq.py b/test/test_zmq.py new file mode 100644 index 0000000..df4c0e4 --- /dev/null +++ b/test/test_zmq.py @@ -0,0 +1,14 @@ +import signal +import zmq + +signal.signal(signal.SIGINT, signal.SIG_DFL) + +context = zmq.Context() + +socket = context.socket(zmq.SUB) +socket.connect('tcp://localhost:5678') +socket.setsockopt(zmq.SUBSCRIBE, b'status') + +while True: + message = socket.recv_multipart() + print(f'Received: {message}') \ No newline at end of file