readded zmq messaging

Former-commit-id: e38653c4dc4ffe6aada19cac2251c3ec28bfbc2b
This commit is contained in:
Arthur Lu 2022-02-19 22:58:58 +00:00
parent 56447603e1
commit 80b63269df
3 changed files with 75 additions and 23 deletions

View File

@ -5,11 +5,10 @@ import json
class Logger(L): class Logger(L):
send = None
file = None file = None
debug = False
levels = { levels = {
0: "",
10:"[DEBUG] ", 10:"[DEBUG] ",
20:"[INFO] ", 20:"[INFO] ",
30:"[WARNING] ", 30:"[WARNING] ",
@ -17,20 +16,24 @@ class Logger(L):
50:"[CRITICAL]", 50:"[CRITICAL]",
} }
targets = []
def __init__(self, verbose, profile, debug, file = None): def __init__(self, verbose, profile, debug, file = None):
super().__init__("tra_logger") super().__init__("tra_logger")
self.debug = debug
self.file = file self.file = file
if file != None:
self.targets.append(self._send_file)
if profile: if profile:
self.send = self._send_null self.targets.append(self._send_null)
elif verbose: elif verbose:
self.send = self._send_scli self.targets.append(self._send_scli)
elif debug: elif debug:
self.send = self._send_scli self.targets.append(self._send_scli)
elif file != None:
self.send = self._send_file
else: else:
self.send = self._send_null self.targets.append(self._send_null)
def _send_null(self, msg): def _send_null(self, msg):
pass pass
@ -47,7 +50,8 @@ class Logger(L):
return datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S %Z") return datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S %Z")
def log(self, level, msg): def log(self, level, msg):
self.send(self.get_time_formatted() + "| " + self.levels[level] + ": " + msg) for t in self.targets:
t(self.get_time_formatted() + "| " + self.levels[level] + ": " + msg)
def debug(self, msg): def debug(self, msg):
self.log(10, msg) self.log(10, msg)
@ -67,13 +71,13 @@ class Logger(L):
def splash(self, version): def splash(self, version):
def hrule(): def hrule():
self.send("#"+38*"-"+"#") self.log(0, "#"+38*"-"+"#")
def box(s): def box(s):
temp = "|" temp = "|"
temp += s temp += s
temp += (40-len(s)-2)*" " temp += (40-len(s)-2)*" "
temp += "|" temp += "|"
self.send(temp) self.log(0, temp)
hrule() hrule()
box(" superscript version: " + version) box(" superscript version: " + version)

View File

@ -157,6 +157,7 @@ from config import Configuration, ConfigurationError
from data import get_previous_time, set_current_time, check_new_database_matches from data import get_previous_time, set_current_time, check_new_database_matches
from interface import Logger from interface import Logger
from module import Match, Metric, Pit from module import Match, Metric, Pit
import zmq
config_path = "config.json" config_path = "config.json"
@ -180,22 +181,27 @@ def main(logger, verbose, profile, debug, socket_send = None):
loop_start = time.time() loop_start = time.time()
logger.info("current time: " + str(loop_start)) logger.info("current time: " + str(loop_start))
socket_send("current time: " + str(loop_start))
config = Configuration(config_path) config = Configuration(config_path)
logger.info("found and loaded config at <" + config_path + ">") logger.info("found and loaded config at <" + config_path + ">")
socket_send("found and loaded config at <" + config_path + ">")
apikey, tbakey = config.database, config.tba apikey, tbakey = config.database, config.tba
logger.info("found and loaded database and tba keys") logger.info("found and loaded database and tba keys")
socket_send("found and loaded database and tba keys")
client = pymongo.MongoClient(apikey) client = pymongo.MongoClient(apikey)
logger.info("established connection to database") logger.info("established connection to database")
socket_send("established connection to database")
previous_time = get_previous_time(client) previous_time = get_previous_time(client)
logger.info("analysis backtimed to: " + str(previous_time)) logger.info("analysis backtimed to: " + str(previous_time))
socket_send("analysis backtimed to: " + str(previous_time))
config.resolve_config_conflicts(logger, client) config.resolve_config_conflicts(logger, client)
@ -210,6 +216,7 @@ def main(logger, verbose, profile, debug, socket_send = None):
continue continue
current_module.run() current_module.run()
logger.info(m + " module finished in " + str(time.time() - start) + " seconds") logger.info(m + " module finished in " + str(time.time() - start) + " seconds")
socket_send(m + " module finished in " + str(time.time() - start) + " seconds")
if debug: if debug:
logger.save_module_to_file(m, current_module.data, current_module.results) # logging flag check done in logger logger.save_module_to_file(m, current_module.data, current_module.results) # logging flag check done in logger
@ -218,6 +225,8 @@ def main(logger, verbose, profile, debug, socket_send = None):
logger.info("closed threads and database client") logger.info("closed threads and database client")
logger.info("finished all tasks in " + str(time.time() - loop_start) + " seconds, looping") logger.info("finished all tasks in " + str(time.time() - loop_start) + " seconds, looping")
socket_send("closed threads and database client")
socket_send("finished all tasks in " + str(time.time() - loop_start) + " seconds, looping")
if profile: if profile:
exit_code = 0 exit_code = 0
@ -230,36 +239,43 @@ def main(logger, verbose, profile, debug, socket_send = None):
event_delay = config["variable"]["event-delay"] event_delay = config["variable"]["event-delay"]
if event_delay: if event_delay:
logger.info("loop delayed until database returns new matches") logger.info("loop delayed until database returns new matches")
socket_send("loop delayed until database returns new matches")
new_match = False new_match = False
while not new_match: while not new_match:
time.sleep(1) time.sleep(1)
new_match = check_new_database_matches(client, competition) new_match = check_new_database_matches(client, competition)
logger.info("database returned new matches") logger.info("database returned new matches")
socket_send("database returned new matches")
else: else:
loop_delay = float(config["variable"]["loop-delay"]) loop_delay = float(config["variable"]["loop-delay"])
remaining_time = loop_delay - (time.time() - loop_start) remaining_time = loop_delay - (time.time() - loop_start)
if remaining_time > 0: if remaining_time > 0:
logger.info("loop delayed by " + str(remaining_time) + " seconds") logger.info("loop delayed by " + str(remaining_time) + " seconds")
socket_send("loop delayed by " + str(remaining_time) + " seconds")
time.sleep(remaining_time) time.sleep(remaining_time)
except KeyboardInterrupt: except KeyboardInterrupt:
logger.info("detected KeyboardInterrupt, killing threads")
close_all() close_all()
logger.info("terminated threads, exiting") logger.info("detected KeyboardInterrupt, exiting")
socket_send("detected KeyboardInterrupt, exiting")
break break
except ConfigurationError as e: except ConfigurationError as e:
str_e = "".join(traceback.format_exception(e))
logger.error("encountered a configuration error: " + str(e)) logger.error("encountered a configuration error: " + str(e))
logger.error("".join(traceback.format_exception(e))) logger.error(str_e)
#traceback.print_exc(file = stderr) socket_send("encountered a configuration error: " + str(e))
socket_send(str_e)
exit_code = 1 exit_code = 1
close_all() close_all()
break break
except Exception as e: except Exception as e:
str_e = "".join(traceback.format_exception(e))
logger.error("encountered an exception while running") logger.error("encountered an exception while running")
logger.error("".join(traceback.format_exception(e))) logger.error(str_e)
#traceback.print_exc(file = stderr) socket_send("encountered an exception while running")
socket_send(str_e)
exit_code = 1 exit_code = 1
close_all() close_all()
break break
@ -270,12 +286,15 @@ def start(pid_path, verbose, profile, debug):
if profile: if profile:
def send(msg):
pass
logger = Logger(verbose, profile, debug) logger = Logger(verbose, profile, debug)
import cProfile, pstats, io import cProfile, pstats, io
profile = cProfile.Profile() profile = cProfile.Profile()
profile.enable() profile.enable()
exit_code = main(logger, verbose, profile, debug) exit_code = main(logger, verbose, profile, debug, socket_send = send)
profile.disable() profile.disable()
f = open("profile.txt", 'w+') f = open("profile.txt", 'w+')
ps = pstats.Stats(profile, stream = f).sort_stats('cumtime') ps = pstats.Stats(profile, stream = f).sort_stats('cumtime')
@ -284,16 +303,22 @@ def start(pid_path, verbose, profile, debug):
elif verbose: elif verbose:
def send(msg):
pass
logger = Logger(verbose, profile, debug) logger = Logger(verbose, profile, debug)
exit_code = main(logger, verbose, profile, debug) exit_code = main(logger, verbose, profile, debug, socket_send = send)
sys.exit(exit_code) sys.exit(exit_code)
elif debug: elif debug:
def send(msg):
pass
logger = Logger(verbose, profile, debug) logger = Logger(verbose, profile, debug)
exit_code = main(logger, verbose, profile, debug) exit_code = main(logger, verbose, profile, debug, socket_send = send)
sys.exit(exit_code) sys.exit(exit_code)
else: else:
@ -310,12 +335,21 @@ def start(pid_path, verbose, profile, debug):
stderr = e stderr = e
): ):
context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5678")
socket.send(b'status')
def send(msg):
socket.send(bytes("status: " + msg, "utf-8"))
logger = Logger(verbose, profile, debug, file = logfile) logger = Logger(verbose, profile, debug, file = logfile)
exit_code = main(logger, verbose, profile, debug) exit_code = main(logger, verbose, profile, debug, socket_send = send)
socket.close()
f.close() f.close()
sys.exit(exit_code) sys.exit(exit_code)
def stop(pid_path): def stop(pid_path):

14
test/test_zmq.py Normal file
View File

@ -0,0 +1,14 @@
import signal
import zmq
signal.signal(signal.SIGINT, signal.SIG_DFL)
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5678')
socket.setsockopt(zmq.SUBSCRIBE, b'status')
while True:
message = socket.recv_multipart()
print(f'Received: {message}')