Merge pull request #11 from titanscouting/daemonize-superscript

Merge changes in daemonize-superscript to refactor-superscript

Former-commit-id: 3e8d876f80
This commit is contained in:
Arthur Lu 2021-08-18 17:36:55 -07:00 committed by GitHub
commit e4eb824f51
6 changed files with 231 additions and 99 deletions

4
.gitignore vendored
View File

@ -9,6 +9,10 @@
**/tra_analysis/
**/temp/*
**/*.pid
**/profile.*
**/errorlog.txt
/dist/superscript.*
/dist/superscript

View File

@ -1,18 +1,16 @@
import requests
import pymongo
import pandas as pd
def pull_new_tba_matches(apikey, competition, cutoff):
api_key= apikey
x=requests.get("https://www.thebluealliance.com/api/v3/event/"+competition+"/matches/simple", headers={"X-TBA-Auth_Key":api_key})
x=requests.get("https://www.thebluealliance.com/api/v3/event/"+competition+"/matches/simple", headers={"X-TBA-Auth_Key":api_key}, verify=False)
out = []
for i in x.json():
if i["actual_time"] != None and i["actual_time"]-cutoff >= 0 and i["comp_level"] == "qm":
out.append({"match" : i['match_number'], "blue" : list(map(lambda x: int(x[3:]), i['alliances']['blue']['team_keys'])), "red" : list(map(lambda x: int(x[3:]), i['alliances']['red']['team_keys'])), "winner": i["winning_alliance"]})
return out
def get_team_match_data(apikey, competition, team_num):
client = pymongo.MongoClient(apikey)
def get_team_match_data(client, competition, team_num):
db = client.data_scouting
mdata = db.matchdata
out = {}
@ -20,98 +18,87 @@ def get_team_match_data(apikey, competition, team_num):
out[i['match']] = i['data']
return pd.DataFrame(out)
def get_team_pit_data(apikey, competition, team_num):
client = pymongo.MongoClient(apikey)
def get_team_pit_data(client, competition, team_num):
db = client.data_scouting
mdata = db.pitdata
out = {}
return mdata.find_one({"competition" : competition, "team_scouted": team_num})["data"]
def get_team_metrics_data(apikey, competition, team_num):
client = pymongo.MongoClient(apikey)
def get_team_metrics_data(client, competition, team_num):
db = client.data_processing
mdata = db.team_metrics
return mdata.find_one({"competition" : competition, "team": team_num})
def get_match_data_formatted(apikey, competition):
client = pymongo.MongoClient(apikey)
def get_match_data_formatted(client, competition):
db = client.data_scouting
mdata = db.teamlist
x=mdata.find_one({"competition":competition})
out = {}
for i in x:
try:
out[int(i)] = unkeyify_2l(get_team_match_data(apikey, competition, int(i)).transpose().to_dict())
out[int(i)] = unkeyify_2l(get_team_match_data(client, competition, int(i)).transpose().to_dict())
except:
pass
return out
def get_metrics_data_formatted(apikey, competition):
client = pymongo.MongoClient(apikey)
def get_metrics_data_formatted(client, competition):
db = client.data_scouting
mdata = db.teamlist
x=mdata.find_one({"competition":competition})
out = {}
for i in x:
try:
out[int(i)] = get_team_metrics_data(apikey, competition, int(i))
out[int(i)] = get_team_metrics_data(client, competition, int(i))
except:
pass
return out
def get_pit_data_formatted(apikey, competition):
client = pymongo.MongoClient(apikey)
def get_pit_data_formatted(client, competition):
db = client.data_scouting
mdata = db.teamlist
x=mdata.find_one({"competition":competition})
out = {}
for i in x:
try:
out[int(i)] = get_team_pit_data(apikey, competition, int(i))
out[int(i)] = get_team_pit_data(client, competition, int(i))
except:
pass
return out
def get_pit_variable_data(apikey, competition):
client = pymongo.MongoClient(apikey)
def get_pit_variable_data(client, competition):
db = client.data_processing
mdata = db.team_pit
out = {}
return mdata.find()
def get_pit_variable_formatted(apikey, competition):
temp = get_pit_variable_data(apikey, competition)
def get_pit_variable_formatted(client, competition):
temp = get_pit_variable_data(client, competition)
out = {}
for i in temp:
out[i["variable"]] = i["data"]
return out
def push_team_tests_data(apikey, competition, team_num, data, dbname = "data_processing", colname = "team_tests"):
client = pymongo.MongoClient(apikey)
def push_team_tests_data(client, competition, team_num, data, dbname = "data_processing", colname = "team_tests"):
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "team": team_num}, {"_id": competition+str(team_num)+"am", "competition" : competition, "team" : team_num, "data" : data}, True)
def push_team_metrics_data(apikey, competition, team_num, data, dbname = "data_processing", colname = "team_metrics"):
client = pymongo.MongoClient(apikey)
def push_team_metrics_data(client, competition, team_num, data, dbname = "data_processing", colname = "team_metrics"):
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "team": team_num}, {"_id": competition+str(team_num)+"am", "competition" : competition, "team" : team_num, "metrics" : data}, True)
def push_team_pit_data(apikey, competition, variable, data, dbname = "data_processing", colname = "team_pit"):
client = pymongo.MongoClient(apikey)
def push_team_pit_data(client, competition, variable, data, dbname = "data_processing", colname = "team_pit"):
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "variable": variable}, {"competition" : competition, "variable" : variable, "data" : data}, True)
def get_analysis_flags(apikey, flag):
client = pymongo.MongoClient(apikey)
def get_analysis_flags(client, flag):
db = client.data_processing
mdata = db.flags
return mdata.find_one({flag:{"$exists":True}})
def set_analysis_flags(apikey, flag, data):
client = pymongo.MongoClient(apikey)
def set_analysis_flags(client, flag, data):
db = client.data_processing
mdata = db.flags
return mdata.replace_one({flag:{"$exists":True}}, data, True)
@ -158,7 +145,7 @@ def load_metric(apikey, competition, match, group_name, metrics):
db_data = get_team_metrics_data(apikey, competition, team)
if get_team_metrics_data(apikey, competition, team) == None:
if db_data == None:
elo = {"score": metrics["elo"]["score"]}
gl2 = {"score": metrics["gl2"]["score"], "rd": metrics["gl2"]["rd"], "vol": metrics["gl2"]["vol"]}

View File

@ -17,7 +17,7 @@ stderr = sys.stderr
def log(target, level, message, code = 0):
message = time.ctime() + empty_delim + str(level) + l_brack + f"{code:04}" + r_brack + empty_delim + soft_divided_delim + empty_delim + message
message = time.ctime() + empty_delim + str(level) + l_brack + f"{code:+05}" + r_brack + empty_delim + soft_divided_delim + empty_delim + message
print(message, file = target)
def clear():

View File

@ -36,7 +36,7 @@ def simplestats(data_test):
if test == "regression_sigmoidal":
return an.regression(ranges, data, ['sig'])
def matchloop(apikey, competition, data, tests, exec_threads):
def matchloop(client, competition, data, tests, exec_threads):
short_mapping = {"regression_linear": "lin", "regression_logarithmic": "log", "regression_exponential": "exp", "regression_polynomial": "ply", "regression_sigmoidal": "sig"}
@ -88,7 +88,7 @@ def matchloop(apikey, competition, data, tests, exec_threads):
return return_vector
def metricloop(tbakey, apikey, competition, timestamp, metrics): # listener based metrics update
def metricloop(tbakey, client, competition, timestamp, metrics): # listener based metrics update
elo_N = metrics["elo"]["N"]
elo_K = metrics["elo"]["K"]
@ -100,8 +100,8 @@ def metricloop(tbakey, apikey, competition, timestamp, metrics): # listener base
for match in matches:
red = load_metric(apikey, competition, match, "red", metrics)
blu = load_metric(apikey, competition, match, "blue", metrics)
red = load_metric(client, competition, match, "red", metrics)
blu = load_metric(client, competition, match, "blue", metrics)
elo_red_total = 0
elo_blu_total = 0
@ -179,9 +179,9 @@ def metricloop(tbakey, apikey, competition, timestamp, metrics): # listener base
temp_vector.update(red)
temp_vector.update(blu)
push_metric(apikey, competition, temp_vector)
push_metric(client, competition, temp_vector)
def pitloop(apikey, competition, pit, tests):
def pitloop(client, competition, pit, tests):
return_vector = {}
for team in pit:

View File

@ -3,10 +3,23 @@
# Notes:
# setup:
__version__ = "0.9.1"
__version__ = "1.0.0"
# changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog:
1.0.0:
- superscript now runs in PEP 3143 compliant well behaved daemon on Linux systems
- linux superscript daemon has integrated websocket output to monitor progress/status remotely
- linux daemon now sends stderr to errorlog.txt
- added verbose option to linux superscript to allow for interactive output
- moved pymongo import to superscript.py
- added profile option to linux superscript to profile runtime of script
0.9.3:
- improved data loading performance by removing redundant PyMongo client creation (120s to 14s)
- passed singular instance of PyMongo client as standin for apikey parameter in all data.py functions
0.9.2:
- removed unessasary imports from data
- minor changes to interface
0.9.1:
- fixed bugs in configuration item loading exception handling
0.9.0:
@ -136,6 +149,10 @@ from multiprocessing import Pool, freeze_support
import time
import warnings
import sys
import asyncio
import websockets
import pymongo
import threading
from interface import splash, log, ERR, INF, stdout, stderr
from data import get_previous_time, set_current_time, load_match, push_match, load_metric, push_metric, load_pit, push_pit
@ -189,7 +206,9 @@ sample_json = """{
}
}"""
def main():
def main(send, verbose = False, profile = False):
if verbose or profile:
warnings.filterwarnings("ignore")
sys.stderr = open("errorlog.txt", "w")
@ -206,33 +225,38 @@ def main():
loop_start = time.time()
current_time = time.time()
log(stdout, INF, "current time: " + str(current_time))
send(stdout, INF, "current time: " + str(current_time))
send(stdout, INF, "loading config at <" + config_path + ">", code = 0)
config = {}
if load_config(config_path, config) == 1:
send(stderr, ERR, "could not find config at <" + config_path + ">, generating blank config and exiting", code = 100)
sys.exit(1)
send(stdout, INF, "found and opened config at <" + config_path + ">", code = 0)
error_flag = False
try:
competition = config["competition"]
except:
log(stderr, ERR, "could not find competition field in config", code = 101)
send(stderr, ERR, "could not find competition field in config", code = 101)
error_flag = True
try:
match_tests = config["statistics"]["match"]
except:
log(stderr, ERR, "could not find match_tests field in config", code = 102)
send(stderr, ERR, "could not find match_tests field in config", code = 102)
error_flag = True
try:
metrics_tests = config["statistics"]["metric"]
except:
log(stderr, ERR, "could not find metrics_tests field in config", code = 103)
send(stderr, ERR, "could not find metrics_tests field in config", code = 103)
error_flag = True
try:
pit_tests = config["statistics"]["pit"]
except:
log(stderr, ERR, "could not find pit_tests field in config", code = 104)
send(stderr, ERR, "could not find pit_tests field in config", code = 104)
error_flag = True
if error_flag:
@ -240,28 +264,28 @@ def main():
error_flag = False
if competition == None or competition == "":
log(stderr, ERR, "competition field in config must not be empty", code = 105)
send(stderr, ERR, "competition field in config must not be empty", code = 105)
error_flag = True
if match_tests == None:
log(stderr, ERR, "match_tests field in config must not be empty", code = 106)
send(stderr, ERR, "match_tests field in config must not be empty", code = 106)
error_flag = True
if metrics_tests == None:
log(stderr, ERR, "metrics_tests field in config must not be empty", code = 107)
send(stderr, ERR, "metrics_tests field in config must not be empty", code = 107)
error_flag = True
if pit_tests == None:
log(stderr, ERR, "pit_tests field in config must not be empty", code = 108)
send(stderr, ERR, "pit_tests field in config must not be empty", code = 108)
error_flag = True
if error_flag:
sys.exit(1)
log(stdout, INF, "found and loaded competition, match_tests, metrics_tests, pit_tests from config")
send(stdout, INF, "found and loaded competition, match_tests, metrics_tests, pit_tests from config")
sys_max_threads = os.cpu_count()
try:
cfg_max_threads = config["max-threads"]
except:
log(stderr, ERR, "max-threads field in config must not be empty, refer to documentation for configuration options", code = 109)
send(stderr, ERR, "max-threads field in config must not be empty, refer to documentation for configuration options", code = 109)
sys.exit(1)
if cfg_max_threads > -sys_max_threads and cfg_max_threads < 0 :
@ -273,101 +297,108 @@ def main():
elif cfg_max_threads == 0:
alloc_processes = sys_max_threads
else:
log(stderr, ERR, "max-threads must be between -" + str(sys_max_threads) + " and " + str(sys_max_threads) + ", but got " + cfg_max_threads, code = 110)
send(stderr, ERR, "max-threads must be between -" + str(sys_max_threads) + " and " + str(sys_max_threads) + ", but got " + cfg_max_threads, code = 110)
sys.exit(1)
log(stdout, INF, "found and loaded max-threads from config")
log(stdout, INF, "attempting to start " + str(alloc_processes) + " threads")
send(stdout, INF, "found and loaded max-threads from config")
send(stdout, INF, "attempting to start " + str(alloc_processes) + " threads")
try:
exec_threads = Pool(processes = alloc_processes)
except Exception as e:
log(stderr, ERR, "unable to start threads", code = 200)
log(stderr, INF, e)
send(stderr, ERR, "unable to start threads", code = 200)
send(stderr, INF, e)
sys.exit(1)
log(stdout, INF, "successfully initialized " + str(alloc_processes) + " threads")
send(stdout, INF, "successfully initialized " + str(alloc_processes) + " threads")
exit_flag = False
try:
apikey = config["key"]["database"]
except:
log(stderr, ERR, "database key field in config must be present", code = 111)
send(stderr, ERR, "database key field in config must be present", code = 111)
exit_flag = True
try:
tbakey = config["key"]["tba"]
except:
log(stderr, ERR, "tba key field in config must be present", code = 112)
send(stderr, ERR, "tba key field in config must be present", code = 112)
exit_flag = True
if apikey == None or apikey == "":
log(stderr, ERR, "database key field in config must not be empty, please populate the database key")
send(stderr, ERR, "database key field in config must not be empty, please populate the database key")
exit_flag = True
if tbakey == None or tbakey == "":
log(stderr, ERR, "tba key field in config must not be empty, please populate the tba key")
send(stderr, ERR, "tba key field in config must not be empty, please populate the tba key")
exit_flag = True
if exit_flag:
sys.exit(1)
log(stdout, INF, "found and loaded database and tba keys")
send(stdout, INF, "found and loaded database and tba keys")
previous_time = get_previous_time(apikey)
log(stdout, INF, "analysis backtimed to: " + str(previous_time))
client = pymongo.MongoClient(apikey)
previous_time = get_previous_time(client)
send(stdout, INF, "analysis backtimed to: " + str(previous_time))
start = time.time()
log(stdout, INF, "loading match data")
match_data = load_match(apikey, competition)
log(stdout, INF, "finished loading match data in " + str(time.time() - start) + " seconds")
send(stdout, INF, "loading match data")
match_data = load_match(client, competition)
send(stdout, INF, "finished loading match data in " + str(time.time() - start) + " seconds")
start = time.time()
log(stdout, INF, "performing analysis on match data")
results = matchloop(apikey, competition, match_data, match_tests, exec_threads)
log(stdout, INF, "finished match analysis in " + str(time.time() - start) + " seconds")
send(stdout, INF, "performing analysis on match data")
results = matchloop(client, competition, match_data, match_tests, exec_threads)
send(stdout, INF, "finished match analysis in " + str(time.time() - start) + " seconds")
start = time.time()
log(stdout, INF, "uploading match results to database")
push_match(apikey, competition, results)
log(stdout, INF, "finished uploading match results in " + str(time.time() - start) + " seconds")
send(stdout, INF, "uploading match results to database")
push_match(client, competition, results)
send(stdout, INF, "finished uploading match results in " + str(time.time() - start) + " seconds")
start = time.time()
log(stdout, INF, "performing analysis on team metrics")
results = metricloop(tbakey, apikey, competition, current_time, metrics_tests)
log(stdout, INF, "finished metric analysis and pushed to database in " + str(time.time() - start) + " seconds")
send(stdout, INF, "performing analysis on team metrics")
results = metricloop(tbakey, client, competition, current_time, metrics_tests)
send(stdout, INF, "finished metric analysis and pushed to database in " + str(time.time() - start) + " seconds")
start = time.time()
log(stdout, INF, "loading pit data")
pit_data = load_pit(apikey, competition)
log(stdout, INF, "finished loading pit data in " + str(time.time() - start) + " seconds")
send(stdout, INF, "loading pit data")
pit_data = load_pit(client, competition)
send(stdout, INF, "finished loading pit data in " + str(time.time() - start) + " seconds")
start = time.time()
log(stdout, INF, "performing analysis on pit data")
results = pitloop(apikey, competition, pit_data, pit_tests)
log(stdout, INF, "finished pit analysis in " + str(time.time() - start) + " seconds")
send(stdout, INF, "performing analysis on pit data")
results = pitloop(client, competition, pit_data, pit_tests)
send(stdout, INF, "finished pit analysis in " + str(time.time() - start) + " seconds")
start = time.time()
log(stdout, INF, "uploading pit results to database")
push_pit(apikey, competition, results)
log(stdout, INF, "finished uploading pit results in " + str(time.time() - start) + " seconds")
send(stdout, INF, "uploading pit results to database")
push_pit(client, competition, results)
send(stdout, INF, "finished uploading pit results in " + str(time.time() - start) + " seconds")
set_current_time(apikey, current_time)
log(stdout, INF, "finished all tests in " + str(time.time() - loop_start) + " seconds, looping")
client.close()
set_current_time(client, current_time)
send(stdout, INF, "finished all tests in " + str(time.time() - loop_start) + " seconds, looping")
except KeyboardInterrupt:
log(stdout, INF, "detected KeyboardInterrupt, killing threads")
send(stdout, INF, "detected KeyboardInterrupt, killing threads")
if "exec_threads" in locals():
exec_threads.terminate()
exec_threads.join()
exec_threads.close()
log(stdout, INF, "terminated threads, exiting")
send(stdout, INF, "terminated threads, exiting")
loop_stored_exception = sys.exc_info()
loop_exit_code = 0
break
except Exception as e:
log(stderr, ERR, "encountered an exception while running")
send(stderr, ERR, "encountered an exception while running")
print(e, file = stderr)
loop_exit_code = 1
break
if profile:
return
sys.exit(loop_exit_code)
def load_config(path, config_vector):
@ -375,10 +406,8 @@ def load_config(path, config_vector):
f = open(path, "r")
config_vector.update(json.load(f))
f.close()
log(stdout, INF, "found and opened config at <" + path + ">")
return 0
except:
log(stderr, ERR, "could not find config at <" + path + ">, generating blank config and exiting", code = 100)
f = open(path, "w")
f.write(sample_json)
f.close()
@ -393,7 +422,117 @@ def save_config(path, config_vector):
except:
return 1
def start(pid_path, verbose = False, profile = False):
if profile:
def send(target, level, message, code = 0):
pass
import cProfile, pstats, io
profile = cProfile.Profile()
profile.enable()
main(send, profile = True)
profile.disable()
f = open("profile.txt", 'w+')
ps = pstats.Stats(profile, stream = f).sort_stats('cumtime')
ps.print_stats()
elif verbose:
main(log, verbose = verbose)
else:
f = open('errorlog.txt', 'w+')
with daemon.DaemonContext(
working_directory=os.getcwd(),
pidfile=pidfile.TimeoutPIDLockFile(pid_path),
stderr=f
):
async def handler(client, path):
clients.append(client)
while True:
try:
pong_waiter = await client.ping()
await pong_waiter
time.sleep(3)
except Exception as e:
clients.remove(client)
break
async def send_one(client, data):
await client.send(data)
def send(target, level, message, code = 0):
message_clients = clients.copy()
for client in message_clients:
try:
asyncio.run(send_one(client, message))
except:
pass
clients = []
start_server = websockets.serve(handler, "0.0.0.0", 5678)
asyncio.get_event_loop().run_until_complete(start_server)
threading.Thread(target = asyncio.get_event_loop().run_forever).start()
main(send)
def stop(pid_path):
try:
pf = open(pid_path, 'r')
pid = int(pf.read().strip())
pf.close()
except IOError:
sys.stderr.write("pidfile at <" + pid_path + "> does not exist. Daemon not running?\n")
return
try:
while True:
os.kill(pid, SIGTERM)
time.sleep(0.01)
except OSError as err:
err = str(err)
if err.find("No such process") > 0:
if os.path.exists(pid_path):
os.remove(pid_path)
else:
print(str(err))
sys.exit(1)
def restart(pid_path):
stop(pid_path)
start(pid_path)
if __name__ == "__main__":
if sys.platform.startswith("win"):
freeze_support()
main()
start(None, verbose = True)
else:
import daemon
from daemon import pidfile
from signal import SIGTERM
pid_path = "tra-daemon.pid"
if len(sys.argv) == 2:
if 'start' == sys.argv[1]:
start(pid_path)
elif 'stop' == sys.argv[1]:
stop(pid_path)
elif 'restart' == sys.argv[1]:
restart(pid_path)
elif 'verbose' == sys.argv[1]:
start(None, verbose = True)
elif 'profile' == sys.argv[1]:
start(None, profile=True)
else:
print("usage: %s start|stop|restart|verbose|profile" % sys.argv[0])
sys.exit(2)
sys.exit(0)
else:
print("usage: %s start|stop|restart|verbose|profile" % sys.argv[0])
sys.exit(2)

View File

@ -10,6 +10,8 @@ a = Analysis(['superscript.py'],
"dnspython",
"sklearn.utils._weight_vector",
"requests",
"websockets.legacy",
"websockets.legacy.server",
],
hookspath=[],
runtime_hooks=[],