From efc353dafb9026d2d1fab1fa1be000e8997326d2 Mon Sep 17 00:00:00 2001 From: Arthur Lu Date: Wed, 18 Aug 2021 05:13:25 +0000 Subject: [PATCH] fixed daemon no start issue --- src/cli/superscript.py | 369 ++++++++++++++++++++++------------------- 1 file changed, 198 insertions(+), 171 deletions(-) diff --git a/src/cli/superscript.py b/src/cli/superscript.py index 34d82ee..d06dbea 100644 --- a/src/cli/superscript.py +++ b/src/cli/superscript.py @@ -151,6 +151,7 @@ import sys import asyncio import websockets import pymongo +import threading from interface import splash, log, ERR, INF, stdout, stderr from data import get_previous_time, set_current_time, load_match, push_match, load_metric, push_metric, load_pit, push_pit @@ -204,179 +205,213 @@ sample_json = """{ } }""" -async def main_lin(socket, path): +def main_lin(pid_path): + f = open('errorlog.txt', 'w+') + with daemon.DaemonContext( + working_directory=os.getcwd(), + pidfile=pidfile.TimeoutPIDLockFile(pid_path), + stderr=f + ): - while True: + async def handler(client, path): + clients.append(client) + while True: + try: + pong_waiter = await client.ping() + await pong_waiter + time.sleep(3) + except Exception as e: + clients.remove(client) + break - try: + clients = [] + start_server = websockets.serve(handler, "127.0.0.1", 5678) - loop_start = time.time() + asyncio.get_event_loop().run_until_complete(start_server) + threading.Thread(target = asyncio.get_event_loop().run_forever).start() - current_time = time.time() - await socket.send("current time: " + str(current_time)) + while True: - config = {} - if load_config(config_path, config) == 1: - sys.exit(1) - - error_flag = False - - try: - competition = config["competition"] - except: - await socket.send("could not find competition field in config") - error_flag = True - try: - match_tests = config["statistics"]["match"] - except: - await socket.send("could not find match_tests field in config") - error_flag = True - try: - metrics_tests = config["statistics"]["metric"] - except: - await socket.send("could not find metrics_tests field in config") - error_flag = True - try: - pit_tests = config["statistics"]["pit"] - except: - await socket.send("could not find pit_tests field in config") - error_flag = True + async def send_one(client, data): + await client.send(data) - if error_flag: - sys.exit(1) - error_flag = False + def send(data): + message_clients = clients.copy() + for client in message_clients: + try: + asyncio.run(send_one(client, data)) + except: + pass - if competition == None or competition == "": - await socket.send("competition field in config must not be empty") - error_flag = True - if match_tests == None: - await socket.send("match_tests field in config must not be empty") - error_flag = True - if metrics_tests == None: - await socket.send("metrics_tests field in config must not be empty") - error_flag = True - if pit_tests == None: - await socket.send("pit_tests field in config must not be empty") - error_flag = True - - if error_flag: - sys.exit(1) - - await socket.send("found and loaded competition, match_tests, metrics_tests, pit_tests from config") - - sys_max_threads = os.cpu_count() try: - cfg_max_threads = config["max-threads"] - except: - await socket.send("max-threads field in config must not be empty, refer to documentation for configuration options", code = 109) - sys.exit(1) - if cfg_max_threads > -sys_max_threads and cfg_max_threads < 0 : - alloc_processes = sys_max_threads + cfg_max_threads - elif cfg_max_threads > 0 and cfg_max_threads < 1: - alloc_processes = math.floor(cfg_max_threads * sys_max_threads) - elif cfg_max_threads > 1 and cfg_max_threads <= sys_max_threads: - alloc_processes = cfg_max_threads - elif cfg_max_threads == 0: - alloc_processes = sys_max_threads - else: - await socket.send("max-threads must be between -" + str(sys_max_threads) + " and " + str(sys_max_threads) + ", but got " + cfg_max_threads) - sys.exit(1) + loop_start = time.time() - await socket.send("found and loaded max-threads from config") - await socket.send("attempting to start " + str(alloc_processes) + " threads") - try: - exec_threads = Pool(processes = alloc_processes) + current_time = time.time() + send("current time: " + str(current_time)) + + config = {} + if load_config(config_path, config) == 1: + sys.exit(1) + + error_flag = False + + try: + competition = config["competition"] + except: + send("could not find competition field in config") + error_flag = True + try: + match_tests = config["statistics"]["match"] + except: + send("could not find match_tests field in config") + error_flag = True + try: + metrics_tests = config["statistics"]["metric"] + except: + send("could not find metrics_tests field in config") + error_flag = True + try: + pit_tests = config["statistics"]["pit"] + except: + send("could not find pit_tests field in config") + error_flag = True + + if error_flag: + sys.exit(1) + error_flag = False + + if competition == None or competition == "": + send("competition field in config must not be empty") + error_flag = True + if match_tests == None: + send("match_tests field in config must not be empty") + error_flag = True + if metrics_tests == None: + send("metrics_tests field in config must not be empty") + error_flag = True + if pit_tests == None: + send("pit_tests field in config must not be empty") + error_flag = True + + if error_flag: + sys.exit(1) + + send("found and loaded competition, match_tests, metrics_tests, pit_tests from config") + + sys_max_threads = os.cpu_count() + try: + cfg_max_threads = config["max-threads"] + except: + send("max-threads field in config must not be empty, refer to documentation for configuration options", code = 109) + sys.exit(1) + + if cfg_max_threads > -sys_max_threads and cfg_max_threads < 0 : + alloc_processes = sys_max_threads + cfg_max_threads + elif cfg_max_threads > 0 and cfg_max_threads < 1: + alloc_processes = math.floor(cfg_max_threads * sys_max_threads) + elif cfg_max_threads > 1 and cfg_max_threads <= sys_max_threads: + alloc_processes = cfg_max_threads + elif cfg_max_threads == 0: + alloc_processes = sys_max_threads + else: + send("max-threads must be between -" + str(sys_max_threads) + " and " + str(sys_max_threads) + ", but got " + cfg_max_threads) + sys.exit(1) + + send("found and loaded max-threads from config") + send("attempting to start " + str(alloc_processes) + " threads") + try: + exec_threads = Pool(processes = alloc_processes) + except Exception as e: + send("unable to start threads") + sys.exit(1) + send("successfully initialized " + str(alloc_processes) + " threads") + + exit_flag = False + + try: + apikey = config["key"]["database"] + except: + send("database key field in config must be present") + exit_flag = True + try: + tbakey = config["key"]["tba"] + except: + send("tba key field in config must be present") + exit_flag = True + + if apikey == None or apikey == "": + send("database key field in config must not be empty, please populate the database key") + exit_flag = True + if tbakey == None or tbakey == "": + send("tba key field in config must not be empty, please populate the tba key") + exit_flag = True + + if exit_flag: + sys.exit(1) + + send("found and loaded database and tba keys") + + client = pymongo.MongoClient(apikey) + + previous_time = get_previous_time(client) + send("analysis backtimed to: " + str(previous_time)) + + start = time.time() + send("loading match data") + match_data = load_match(client, competition) + send("finished loading match data in " + str(time.time() - start) + " seconds") + + start = time.time() + send("performing analysis on match data") + results = matchloop(client, competition, match_data, match_tests, exec_threads) + send("finished match analysis in " + str(time.time() - start) + " seconds") + + start = time.time() + send("uploading match results to database") + push_match(client, competition, results) + send("finished uploading match results in " + str(time.time() - start) + " seconds") + + start = time.time() + send("performing analysis on team metrics") + results = metricloop(tbakey, client, competition, current_time, metrics_tests) + send("finished metric analysis and pushed to database in " + str(time.time() - start) + " seconds") + + start = time.time() + send("loading pit data") + pit_data = load_pit(client, competition) + send("finished loading pit data in " + str(time.time() - start) + " seconds") + + start = time.time() + send("performing analysis on pit data") + results = pitloop(client, competition, pit_data, pit_tests) + send("finished pit analysis in " + str(time.time() - start) + " seconds") + + start = time.time() + send("uploading pit results to database") + push_pit(client, competition, results) + send("finished uploading pit results in " + str(time.time() - start) + " seconds") + + set_current_time(client, current_time) + send("finished all tests in " + str(time.time() - loop_start) + " seconds, looping") + + except KeyboardInterrupt: + send("detected KeyboardInterrupt, killing threads") + if "exec_threads" in locals(): + exec_threads.terminate() + exec_threads.join() + exec_threads.close() + send("terminated threads, exiting") + loop_stored_exception = sys.exc_info() + loop_exit_code = 0 + break except Exception as e: - await socket.send("unable to start threads") - sys.exit(1) - await socket.send("successfully initialized " + str(alloc_processes) + " threads") + send("encountered an exception while running") + send(str(e)) + loop_exit_code = 1 + break - exit_flag = False - - try: - apikey = config["key"]["database"] - except: - await socket.send("database key field in config must be present") - exit_flag = True - try: - tbakey = config["key"]["tba"] - except: - await socket.send("tba key field in config must be present") - exit_flag = True - - if apikey == None or apikey == "": - await socket.send("database key field in config must not be empty, please populate the database key") - exit_flag = True - if tbakey == None or tbakey == "": - await socket.send("tba key field in config must not be empty, please populate the tba key") - exit_flag = True - - if exit_flag: - sys.exit(1) - - await socket.send("found and loaded database and tba keys") - - client = pymongo.MongoClient(apikey) - - previous_time = get_previous_time(client) - await socket.send("analysis backtimed to: " + str(previous_time)) - - start = time.time() - await socket.send("loading match data") - match_data = load_match(client, competition) - await socket.send("finished loading match data in " + str(time.time() - start) + " seconds") - - start = time.time() - await socket.send("performing analysis on match data") - results = matchloop(client, competition, match_data, match_tests, exec_threads) - await socket.send("finished match analysis in " + str(time.time() - start) + " seconds") - - start = time.time() - await socket.send("uploading match results to database") - push_match(client, competition, results) - await socket.send("finished uploading match results in " + str(time.time() - start) + " seconds") - - start = time.time() - await socket.send("performing analysis on team metrics") - results = metricloop(tbakey, client, competition, current_time, metrics_tests) - await socket.send("finished metric analysis and pushed to database in " + str(time.time() - start) + " seconds") - - start = time.time() - await socket.send("loading pit data") - pit_data = load_pit(client, competition) - await socket.send("finished loading pit data in " + str(time.time() - start) + " seconds") - - start = time.time() - await socket.send("performing analysis on pit data") - results = pitloop(client, competition, pit_data, pit_tests) - await socket.send("finished pit analysis in " + str(time.time() - start) + " seconds") - - start = time.time() - await socket.send("uploading pit results to database") - push_pit(client, competition, results) - await socket.send("finished uploading pit results in " + str(time.time() - start) + " seconds") - - set_current_time(client, current_time) - await socket.send("finished all tests in " + str(time.time() - loop_start) + " seconds, looping") - - except KeyboardInterrupt: - await socket.send("detected KeyboardInterrupt, killing threads") - if "exec_threads" in locals(): - exec_threads.terminate() - exec_threads.join() - exec_threads.close() - await socket.send("terminated threads, exiting") - loop_stored_exception = sys.exc_info() - loop_exit_code = 0 - break - except Exception as e: - await socket.send("encountered an exception while running") - await socket.send(str(e)) - loop_exit_code = 1 - break - - sys.exit(loop_exit_code) + sys.exit(loop_exit_code) def main_win(): @@ -584,16 +619,8 @@ def save_config(path, config_vector): except: return 1 -def start(pid_path, profile = False): - f = open('errorlog.txt', 'w+') - with daemon.DaemonContext( - working_directory=os.getcwd(), - pidfile=pidfile.TimeoutPIDLockFile(pid_path), - stderr=f - ): - start_server = websockets.serve(main_lin, "127.0.0.1", 5678) - asyncio.get_event_loop().run_until_complete(start_server) - asyncio.get_event_loop().run_forever() +def start(pid_path): + main_lin(pid_path) def stop(pid_path): try: