1 Commits

Author SHA1 Message Date
snyk-bot
36b0a80384 fix: src/requirements.txt to reduce vulnerabilities
The following vulnerabilities are fixed by pinning transitive dependencies:
- https://snyk.io/vuln/SNYK-PYTHON-SETUPTOOLS-3113904
2022-11-15 12:03:37 +00:00
21 changed files with 888 additions and 1194 deletions

View File

@@ -1,6 +1,7 @@
FROM python:slim FROM ubuntu:20.04
WORKDIR / WORKDIR /
RUN apt-get -y update; apt-get -y upgrade RUN apt-get -y update
RUN apt-get -y install git binutils RUN DEBIAN_FRONTEND=noninteractive apt-get install -y --no-install-recommends tzdata
COPY requirements.txt . RUN apt-get install -y python3 python3-dev git python3-pip python3-kivy python-is-python3 libgl1-mesa-dev build-essential
RUN pip install -r requirements.txt RUN ln -s $(which pip3) /usr/bin/pip
RUN pip install pymongo pandas numpy scipy scikit-learn matplotlib pylint kivy

View File

@@ -0,0 +1,2 @@
FROM titanscout2022/tra-analysis-base:latest
WORKDIR /

View File

@@ -1,23 +1,28 @@
{ {
"name": "TRA Analysis Development Environment", "name": "TRA Analysis Development Environment",
"build": { "build": {
"dockerfile": "Dockerfile", "dockerfile": "dev-dockerfile",
}, },
"settings": { "settings": {
"terminal.integrated.shell.linux": "/bin/bash", "terminal.integrated.shell.linux": "/bin/bash",
"python.pythonPath": "/usr/local/bin/python", "python.pythonPath": "/usr/local/bin/python",
"python.linting.enabled": true, "python.linting.enabled": true,
"python.linting.pylintEnabled": true, "python.linting.pylintEnabled": true,
"python.linting.pylintPath": "/usr/local/bin/pylint", "python.formatting.autopep8Path": "/usr/local/py-utils/bin/autopep8",
"python.linting.pylintArgs": ["--indent-string", "\t"], "python.formatting.blackPath": "/usr/local/py-utils/bin/black",
"python.testing.pytestPath": "/usr/local/bin/pytest", "python.formatting.yapfPath": "/usr/local/py-utils/bin/yapf",
"editor.tabSize": 4, "python.linting.banditPath": "/usr/local/py-utils/bin/bandit",
"editor.insertSpaces": false "python.linting.flake8Path": "/usr/local/py-utils/bin/flake8",
"python.linting.mypyPath": "/usr/local/py-utils/bin/mypy",
"python.linting.pycodestylePath": "/usr/local/py-utils/bin/pycodestyle",
"python.linting.pydocstylePath": "/usr/local/py-utils/bin/pydocstyle",
"python.linting.pylintPath": "/usr/local/py-utils/bin/pylint",
"python.testing.pytestPath": "/usr/local/py-utils/bin/pytest"
}, },
"extensions": [ "extensions": [
"mhutchie.git-graph", "mhutchie.git-graph",
"ms-python.python", "ms-python.python",
"waderyan.gitblame" "waderyan.gitblame"
], ],
"postCreateCommand": "" "postCreateCommand": "/usr/bin/pip3 install -r ${containerWorkspaceFolder}/src/requirements.txt && /usr/bin/pip3 install --no-cache-dir pylint && /usr/bin/pip3 install pytest"
} }

View File

@@ -1,13 +0,0 @@
cerberus
dnspython
numpy
pyinstaller
pylint
pymongo
pyparsing
pytest
requests
scikit-learn
scipy
six
tra-analysis

View File

@@ -1,7 +1,7 @@
# This workflow will install Python dependencies, run tests and lint with a variety of Python versions # This workflow will install Python dependencies, run tests and lint with a variety of Python versions
# For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions # For more information see: https://help.github.com/actions/language-and-framework-guides/using-python-with-github-actions
name: Build Superscript Linux name: Superscript Unit Tests
on: on:
release: release:
@@ -11,25 +11,7 @@ jobs:
generate: generate:
name: Build Linux name: Build Linux
runs-on: ubuntu-latest runs-on: ubuntu-latest
steps: steps:
- name: Checkout master - name: Checkout master
uses: actions/checkout@master uses: actions/checkout@master
- name: Install Dependencies
run: pip install -r requirements.txt
working-directory: src/
- name: Give Execute Permission
run: chmod +x build-CLI.sh
working-directory: build/
- name: Build Binary
run: ./build-CLI.sh
working-directory: build/
- name: Copy Binary to Root Dir
run: cp superscript ..
working-directory: dist/
- name: Upload Release Asset
uses: svenstaro/upload-release-action@v2
with:
repo_token: ${{ secrets.GITHUB_TOKEN }}
file: superscript
asset_name: superscript
tag: ${{ github.ref }}

8
.gitignore vendored
View File

@@ -9,10 +9,6 @@
**/tra_analysis/ **/tra_analysis/
**/temp/* **/temp/*
**/*.pid
**/profile.*
**/*.log
**/errorlog.txt **/errorlog.txt
/dist/* /dist/superscript.*
/dist/superscript

View File

@@ -1,4 +1,4 @@
# Red Alliance Analysis · ![GitHub release (latest by date)](https://img.shields.io/github/v/release/titanscout2022/red-alliance-analysis) # Red Alliance Analysis · ![GitHub release (latest by date)](https://img.shields.io/github/v/release/titanscouting/tra-superscript)
Titan Robotics 2022 Strategy Team Repository for Data Analysis Tools. Included with these tools are the backend data analysis engine formatted as a python package, associated binaries for the analysis package, and premade scripts that can be pulled directly from this repository and will integrate with other Red Alliance applications to quickly deploy FRC scouting tools. Titan Robotics 2022 Strategy Team Repository for Data Analysis Tools. Included with these tools are the backend data analysis engine formatted as a python package, associated binaries for the analysis package, and premade scripts that can be pulled directly from this repository and will integrate with other Red Alliance applications to quickly deploy FRC scouting tools.

View File

@@ -1,5 +1,5 @@
set pathtospec="superscript.spec" set pathtospec="../src/superscript.spec"
set pathtodist="../dist/" set pathtodist="../dist/"
set pathtowork="temp/" set pathtowork="temp/"
pyinstaller --clean --distpath %pathtodist% --workpath %pathtowork% %pathtospec% pyinstaller --onefile --clean --distpath %pathtodist% --workpath %pathtowork% %pathtospec%

View File

@@ -1,5 +1,5 @@
pathtospec="superscript.spec" pathtospec="../src/superscript.spec"
pathtodist="../dist/" pathtodist="../dist/"
pathtowork="temp/" pathtowork="temp/"
pyinstaller --clean --distpath ${pathtodist} --workpath ${pathtowork} ${pathtospec} pyinstaller --onefile --clean --distpath ${pathtodist} --workpath ${pathtowork} ${pathtospec}

View File

@@ -1,35 +0,0 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(
['../src/superscript.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=['dnspython', 'sklearn.utils._weight_vector', 'sklearn.utils._typedefs', 'sklearn.neighbors._partition_nodes', 'requests'],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False
)
pyz = PYZ(a.pure, a.zipped_data,
cipher=block_cipher)
exe = EXE(pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[('W ignore', None, 'OPTION')],
name='superscript',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True )

View File

@@ -1,240 +0,0 @@
import json
from exceptions import ConfigurationError
from cerberus import Validator
class Configuration:
path = None
config = {}
_sample_config = {
"persistent":{
"key":{
"database":"",
"tba":"",
"tra":{
"CLIENT_ID":"",
"CLIENT_SECRET":"",
"url": ""
}
},
"config-preference":"local",
"synchronize-config":False
},
"variable":{
"event-delay":False,
"loop-delay":0,
"competition": "2020ilch",
"modules":{
"match":{
"tests":{
"balls-blocked":[
"basic_stats",
"historical_analysis",
"regression_linear",
"regression_logarithmic",
"regression_exponential",
"regression_polynomial",
"regression_sigmoidal"
],
"balls-collected":[
"basic_stats",
"historical_analysis",
"regression_linear",
"regression_logarithmic",
"regression_exponential",
"regression_polynomial",
"regression_sigmoidal"
],
"balls-lower-teleop":[
"basic_stats",
"historical_analysis",
"regression_linear",
"regression_logarithmic",
"regression_exponential",
"regression_polynomial",
"regression_sigmoidal"
],
"balls-lower-auto":[
"basic_stats",
"historical_analysis",
"regression_linear",
"regression_logarithmic",
"regression_exponential",
"regression_polynomial",
"regression_sigmoidal"
],
"balls-started":[
"basic_stats",
"historical_analyss",
"regression_linear",
"regression_logarithmic",
"regression_exponential",
"regression_polynomial",
"regression_sigmoidal"
],
"balls-upper-teleop":[
"basic_stats",
"historical_analysis",
"regression_linear",
"regression_logarithmic",
"regression_exponential",
"regression_polynomial",
"regression_sigmoidal"
],
"balls-upper-auto":[
"basic_stats",
"historical_analysis",
"regression_linear",
"regression_logarithmic",
"regression_exponential",
"regression_polynomial",
"regression_sigmoidal"
]
}
},
"metric":{
"tests":{
"elo":{
"score":1500,
"N":400,
"K":24
},
"gl2":{
"score":1500,
"rd":250,
"vol":0.06
},
"ts":{
"mu":25,
"sigma":8.33
}
}
},
"pit":{
"tests":{
"wheel-mechanism":True,
"low-balls":True,
"high-balls":True,
"wheel-success":True,
"strategic-focus":True,
"climb-mechanism":True,
"attitude":True
}
}
}
}
}
_validation_schema = {
"persistent": {
"type": "dict",
"required": True,
"require_all": True,
"schema": {
"key": {
"type": "dict",
"require_all":True,
"schema": {
"database": {"type":"string"},
"tba": {"type": "string"},
"tra": {
"type": "dict",
"require_all": True,
"schema": {
"CLIENT_ID": {"type": "string"},
"CLIENT_SECRET": {"type": "string"},
"url": {"type": "string"}
}
}
}
},
"config-preference": {"type": "string", "required": True},
"synchronize-config": {"type": "boolean", "required": True}
}
}
}
def __init__(self, path):
self.path = path
self.load_config()
self.validate_config()
def load_config(self):
try:
f = open(self.path, "r")
self.config.update(json.load(f))
f.close()
except:
self.config = self._sample_config
self.save_config()
f.close()
raise ConfigurationError("could not find config file at <" + self.path + ">, created new sample config file at that path")
def save_config(self):
f = open(self.path, "w+")
json.dump(self.config, f, ensure_ascii=False, indent=4)
f.close()
def validate_config(self):
v = Validator(self._validation_schema, allow_unknown = True)
isValidated = v.validate(self.config)
if not isValidated:
raise ConfigurationError("config validation error: " + v.errors)
def __getattr__(self, name): # better hashed lookup method for common multikey-value paths, TYPE UNSAFE
attr_lookup = {
"persistent": self.config["persistent"],
"key": self.config["persistent"]["key"],
"database": self.config["persistent"]["key"]["database"],
"tba": self.config["persistent"]["key"]["tba"],
"tra": self.config["persistent"]["key"]["tra"],
"priority": self.config["persistent"]["config-preference"],
"sync": self.config["persistent"]["synchronize-config"],
"variable": self.config["variable"],
"event_delay": self.config["variable"]["event-delay"],
"loop_delay": self.config["variable"]["loop-delay"],
"competition": self.config["variable"]["competition"],
"modules": self.config["variable"]["modules"]
}
try:
return attr_lookup[name]
except KeyError:
return None
def __getitem__(self, key):
return self.config[key]
def resolve_config_conflicts(self, logger, client): # needs improvement with new localization scheme
sync = self.sync
priority = self.priority
if sync:
if priority == "local" or priority == "client":
logger.info("config-preference set to local/client, loading local config information")
remote_config = client.get_database_config()
if remote_config != self.config["variable"]:
client.set_database_config(self.config["variable"])
logger.info("database config was different and was updated")
# no change to config
elif priority == "remote" or priority == "database":
logger.info("config-preference set to remote/database, loading remote config information")
remote_config = client.get_database_config()
if remote_config != self.config["variable"]:
self.config["variable"] = remote_config
self.save_config()
# change variable to match remote
logger.info("local config was different and was updated")
else:
raise ConfigurationError("persistent/config-preference field must be \"local\"/\"client\" or \"remote\"/\"database\"")
else:
if priority == "local" or priority == "client":
logger.info("config-preference set to local/client, loading local config information")
# no change to config
elif priority == "remote" or priority == "database":
logger.info("config-preference set to remote/database, loading database config information")
self.config["variable"] = client.get_database_config()
# change variable to match remote without updating local version
else:
raise ConfigurationError("persistent/config-preference field must be \"local\"/\"client\" or \"remote\"/\"database\"")

View File

@@ -1,56 +1,123 @@
import requests import requests
import pandas as pd
import pymongo import pymongo
from exceptions import APIError import pandas as pd
import time
class Client: def pull_new_tba_matches(apikey, competition, cutoff):
api_key= apikey
def __init__(self, config): x=requests.get("https://www.thebluealliance.com/api/v3/event/"+competition+"/matches/simple", headers={"X-TBA-Auth_Key":api_key})
self.competition = config.competition
self.tbakey = config.tba
self.mongoclient = pymongo.MongoClient(config.database)
self.trakey = config.tra
def close(self):
self.mongoclient.close()
def pull_new_tba_matches(self, cutoff):
competition = self.competition
api_key= self.tbakey
x=requests.get("https://www.thebluealliance.com/api/v3/event/"+competition+"/matches/simple", headers={"X-TBA-Auth-Key":api_key})
json = x.json()
out = [] out = []
for i in json: for i in x.json():
if i["actual_time"] != None and i["comp_level"] == "qm": if i["actual_time"] != None and i["actual_time"]-cutoff >= 0 and i["comp_level"] == "qm":
out.append({"match" : i['match_number'], "blue" : list(map(lambda x: int(x[3:]), i['alliances']['blue']['team_keys'])), "red" : list(map(lambda x: int(x[3:]), i['alliances']['red']['team_keys'])), "winner": i["winning_alliance"]}) out.append({"match" : i['match_number'], "blue" : list(map(lambda x: int(x[3:]), i['alliances']['blue']['team_keys'])), "red" : list(map(lambda x: int(x[3:]), i['alliances']['red']['team_keys'])), "winner": i["winning_alliance"]})
out.sort(key=lambda x: x['match'])
return out return out
def get_team_match_data(self, team_num): def get_team_match_data(apikey, competition, team_num):
client = self.mongoclient client = pymongo.MongoClient(apikey)
competition = self.competition
db = client.data_scouting db = client.data_scouting
mdata = db.matchdata mdata = db.matchdata
out = {} out = {}
for i in mdata.find({"competition" : competition, "team_scouted": str(team_num)}): for i in mdata.find({"competition" : competition, "team_scouted": team_num}):
out[i['match']] = i['data'] out[i['match']] = i['data']
return pd.DataFrame(out) return pd.DataFrame(out)
def get_team_metrics_data(self, team_num): def get_team_pit_data(apikey, competition, team_num):
client = self.mongoclient client = pymongo.MongoClient(apikey)
competition = self.competition db = client.data_scouting
mdata = db.pitdata
out = {}
return mdata.find_one({"competition" : competition, "team_scouted": team_num})["data"]
def get_team_metrics_data(apikey, competition, team_num):
client = pymongo.MongoClient(apikey)
db = client.data_processing db = client.data_processing
mdata = db.team_metrics mdata = db.team_metrics
return mdata.find_one({"competition" : competition, "team": team_num}) return mdata.find_one({"competition" : competition, "team": team_num})
def get_team_pit_data(self, team_num): def get_match_data_formatted(apikey, competition):
client = self.mongoclient client = pymongo.MongoClient(apikey)
competition = self.competition
db = client.data_scouting db = client.data_scouting
mdata = db.pitdata mdata = db.teamlist
return mdata.find_one({"competition" : competition, "team_scouted": str(team_num)})["data"] x=mdata.find_one({"competition":competition})
out = {}
for i in x:
try:
out[int(i)] = unkeyify_2l(get_team_match_data(apikey, competition, int(i)).transpose().to_dict())
except:
pass
return out
def unkeyify_2l(self, layered_dict): def get_metrics_data_formatted(apikey, competition):
client = pymongo.MongoClient(apikey)
db = client.data_scouting
mdata = db.teamlist
x=mdata.find_one({"competition":competition})
out = {}
for i in x:
try:
out[int(i)] = d.get_team_metrics_data(apikey, competition, int(i))
except:
pass
return out
def get_pit_data_formatted(apikey, competition):
client = pymongo.MongoClient(apikey)
db = client.data_scouting
mdata = db.teamlist
x=mdata.find_one({"competition":competition})
out = {}
for i in x:
try:
out[int(i)] = get_team_pit_data(apikey, competition, int(i))
except:
pass
return out
def get_pit_variable_data(apikey, competition):
client = pymongo.MongoClient(apikey)
db = client.data_processing
mdata = db.team_pit
out = {}
return mdata.find()
def get_pit_variable_formatted(apikey, competition):
temp = get_pit_variable_data(apikey, competition)
out = {}
for i in temp:
out[i["variable"]] = i["data"]
return out
def push_team_tests_data(apikey, competition, team_num, data, dbname = "data_processing", colname = "team_tests"):
client = pymongo.MongoClient(apikey)
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "team": team_num}, {"_id": competition+str(team_num)+"am", "competition" : competition, "team" : team_num, "data" : data}, True)
def push_team_metrics_data(apikey, competition, team_num, data, dbname = "data_processing", colname = "team_metrics"):
client = pymongo.MongoClient(apikey)
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "team": team_num}, {"_id": competition+str(team_num)+"am", "competition" : competition, "team" : team_num, "metrics" : data}, True)
def push_team_pit_data(apikey, competition, variable, data, dbname = "data_processing", colname = "team_pit"):
client = pymongo.MongoClient(apikey)
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "variable": variable}, {"competition" : competition, "variable" : variable, "data" : data}, True)
def get_analysis_flags(apikey, flag):
client = pymongo.MongoClient(apikey)
db = client.data_processing
mdata = db.flags
return mdata.find_one({flag:{"$exists":True}})
def set_analysis_flags(apikey, flag, data):
client = pymongo.MongoClient(apikey)
db = client.data_processing
mdata = db.flags
return mdata.replace_one({flag:{"$exists":True}}, data, True)
def unkeyify_2l(layered_dict):
out = {} out = {}
for i in layered_dict.keys(): for i in layered_dict.keys():
add = [] add = []
@@ -60,239 +127,3 @@ class Client:
add.sort(key = lambda x: x[0]) add.sort(key = lambda x: x[0])
out[i] = list(map(lambda x: x[1], add)) out[i] = list(map(lambda x: x[1], add))
return out return out
def get_match_data_formatted(self):
teams_at_comp = self.get_teams_at_competition()
out = {}
for team in teams_at_comp:
try:
out[int(team)] = self.unkeyify_2l(self.get_team_match_data(team).transpose().to_dict())
except:
pass
return out
def get_metrics_data_formatted(self):
competition = self.competition
teams_at_comp = self.get_teams_at_competition()
out = {}
for team in teams_at_comp:
try:
out[int(team)] = self.get_team_metrics_data(int(team))
except:
pass
return out
def get_pit_data_formatted(self):
client = self.mongoclient
competition = self.competition
x=requests.get("https://titanscouting.epochml.org/api/fetchAllTeamNicknamesAtCompetition?competition="+competition)
x = x.json()
x = x['data']
x = x.keys()
out = {}
for i in x:
try:
out[int(i)] = self.get_team_pit_data(int(i))
except:
pass
return out
def get_pit_variable_data(self):
client = self.mongoclient
db = client.data_processing
mdata = db.team_pit
return mdata.find()
def get_pit_variable_formatted(self):
temp = self.get_pit_variable_data()
out = {}
for i in temp:
out[i["variable"]] = i["data"]
return out
def push_team_tests_data(self, team_num, data, dbname = "data_processing", colname = "team_tests"):
client = self.mongoclient
competition = self.competition
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "team": team_num}, {"_id": competition+str(team_num)+"am", "competition" : competition, "team" : team_num, "data" : data}, True)
def push_team_metrics_data(self, team_num, data, dbname = "data_processing", colname = "team_metrics"):
client = self.mongoclient
competition = self.competition
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "team": team_num}, {"_id": competition+str(team_num)+"am", "competition" : competition, "team" : team_num, "metrics" : data}, True)
def push_team_pit_data(self, variable, data, dbname = "data_processing", colname = "team_pit"):
client = self.mongoclient
competition = self.competition
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "variable": variable}, {"competition" : competition, "variable" : variable, "data" : data}, True)
def get_analysis_flags(self, flag):
client = self.mongoclient
db = client.data_processing
mdata = db.flags
return mdata.find_one({flag:{"$exists":True}})
def set_analysis_flags(self, flag, data):
client = self.mongoclient
db = client.data_processing
mdata = db.flags
return mdata.replace_one({flag:{"$exists":True}}, data, True)
def get_previous_time(self):
previous_time = self.get_analysis_flags("latest_update")
if previous_time == None:
self.set_analysis_flags("latest_update", 0)
previous_time = 0
else:
previous_time = previous_time["latest_update"]
return previous_time
def set_current_time(self, current_time):
self.set_analysis_flags("latest_update", {"latest_update":current_time})
def get_database_config(self):
remote_config = self.get_analysis_flags("config")
return remote_config["config"] if remote_config != None else None
def set_database_config(self, config):
self.set_analysis_flags("config", {"config": config})
def load_match(self):
return self.get_match_data_formatted()
def load_metric(self, match, group_name, metrics):
group = {}
for team in match[group_name]:
db_data = self.get_team_metrics_data(team)
if db_data == None:
elo = {"score": metrics["elo"]["score"]}
gl2 = {"score": metrics["gl2"]["score"], "rd": metrics["gl2"]["rd"], "vol": metrics["gl2"]["vol"]}
ts = {"mu": metrics["ts"]["mu"], "sigma": metrics["ts"]["sigma"]}
group[team] = {"elo": elo, "gl2": gl2, "ts": ts}
else:
metrics = db_data["metrics"]
elo = metrics["elo"]
gl2 = metrics["gl2"]
ts = metrics["ts"]
group[team] = {"elo": elo, "gl2": gl2, "ts": ts}
return group
def load_pit(self):
return self.get_pit_data_formatted()
def push_match(self, results):
for team in results:
self.push_team_tests_data(team, results[team])
def push_metric(self, metric):
for team in metric:
self.push_team_metrics_data(team, metric[team])
def push_pit(self, pit):
for variable in pit:
self.push_team_pit_data(variable, pit[variable])
def check_new_database_matches(self):
return True
#----- API implementations below -----#
def get_team_competition(self):
trakey = self.trakey
url = self.trakey['url']
endpoint = '/api/fetchTeamCompetition'
params = {
"CLIENT_ID": trakey['CLIENT_ID'],
"CLIENT_SECRET": trakey['CLIENT_SECRET']
}
response = requests.request("GET", url + endpoint, params=params)
json = response.json()
if json['success']:
return json['competition']
else:
raise APIError(json)
def get_team(self):
trakey = self.trakey
url = self.trakey['url']
endpoint = '/api/fetchTeamCompetition'
params = {
"CLIENT_ID": trakey['CLIENT_ID'],
"CLIENT_SECRET": trakey['CLIENT_SECRET']
}
response = requests.request("GET", url + endpoint, params=params)
json = response.json()
if json['success']:
return json['team']
else:
raise APIError(json)
""" doesn't seem to be functional:
def get_team_match_data(self, team_num):
trakey = self.trakey
url = self.trakey['url']
competition = self.competition
endpoint = '/api/fetchAllTeamMatchData'
params = {
"competition": competition,
"teamScouted": team_num,
"CLIENT_ID": trakey['CLIENT_ID'],
"CLIENT_SECRET": trakey['CLIENT_SECRET']
}
response = requests.request("GET", url + endpoint, params=params)
json = response.json()
if json['success']:
return json['data'][team_num]
else:
raise APIError(json)"""
def get_teams_at_competition(self):
trakey = self.trakey
url = self.trakey['url']
competition = self.competition
endpoint = '/api/fetchAllTeamNicknamesAtCompetition'
params = {
"competition": competition,
"CLIENT_ID": trakey['CLIENT_ID'],
"CLIENT_SECRET": trakey['CLIENT_SECRET']
}
response = requests.request("GET", url + endpoint, params=params)
json = response.json()
if json['success']:
return list(json['data'].keys())
else:
raise APIError(json)

151
src/design.kv Normal file
View File

@@ -0,0 +1,151 @@
<Launch>:
orientation: "vertical"
NavigationLayout:
ScreenManager:
id: screen_manager
HomeScreen:
name: "Home"
BoxLayout:
orientation: "vertical"
MDToolbar:
title: screen_manager.current
elevation: 10
left_action_items: [['menu', lambda x: nav_drawer.toggle_nav_drawer()]]
GridLayout:
cols: 1
padding: 15, 15
spacing: 20, 20
MDTextFieldRect:
hint_text: "Console Log"
# size_hint: .8, None
# align: 'center'
# Widget:
SettingsScreen:
name: "Settings"
BoxLayout:
orientation: 'vertical'
MDToolbar:
title: screen_manager.current
elevation: 10
left_action_items: [['menu', lambda x: nav_drawer.toggle_nav_drawer()]]
Widget:
InfoScreen:
name: "Info"
BoxLayout:
orientation: 'vertical'
MDToolbar:
title: screen_manager.current
elevation: 10
left_action_items: [['menu', lambda x: nav_drawer.toggle_nav_drawer()]]
# GridLayout:
# cols: 2
# padding: 15, 15
# spacing: 20, 20
BoxLayout:
orientation: "horizontal"
MDLabel:
text: "DB Key:"
halign: 'center'
MDTextField:
hint_text: "placeholder"
pos_hint: {"center_y": .5}
BoxLayout:
orientation: "horizontal"
MDLabel:
text: "TBA Key:"
halign: 'center'
MDTextField:
hint_text: "placeholder"
pos_hint: {"center_y": .5}
BoxLayout:
orientation: "horizontal"
MDLabel:
text: "CPU Use:"
halign: 'center'
MDLabel:
text: "placeholder"
halign: 'center'
BoxLayout:
orientation: "horizontal"
MDLabel:
text: "Network:"
halign: 'center'
MDLabel:
text: "placeholder"
halign: 'center'
Widget:
BoxLayout:
orientation: "horizontal"
MDLabel:
text: "Progress"
halign: 'center'
MDProgressBar:
id: progress
value: 50
StatsScreen:
name: "Stats"
MDCheckbox:
size_hint: None, None
size: "48dp", "48dp"
pos_hint: {'center_x': .5, 'center_y': .5}
on_active: Screen.test()
#Navigation Drawer -------------------------
MDNavigationDrawer:
id: nav_drawer
BoxLayout:
orientation: "vertical"
padding: "8dp"
spacing: "8dp"
MDLabel:
text: "Titan Scouting"
font_style: "Button"
size_hint_y: None
height: self.texture_size[1]
MDLabel:
text: "Data Analysis"
font_style: "Caption"
size_hint_y: None
height: self.texture_size[1]
ScrollView:
MDList:
OneLineAvatarListItem:
text: "Home"
on_press:
# nav_drawer.set_state("close")
# screen_manager.transition.direction = "left"
screen_manager.current = "Home"
IconLeftWidget:
icon: "home"
OneLineAvatarListItem:
text: "Settings"
on_press:
# nav_drawer.set_state("close")
# screen_manager.transition.direction = "right"
# screen_manager.fade
screen_manager.current = "Settings"
IconLeftWidget:
icon: "cog"
OneLineAvatarListItem:
text: "Info"
on_press:
# nav_drawer.set_state("close")
# screen_manager.transition.direction = "right"
# screen_manager.fade
screen_manager.current = "Info"
IconLeftWidget:
icon: "cog"
OneLineAvatarListItem:
text: "Stats"
on_press:
# nav_drawer.set_state("close")
# screen_manager.transition.direction = "right"
# screen_manager.fade
screen_manager.current = "Stats"
IconLeftWidget:
icon: "cog"

View File

@@ -1,7 +0,0 @@
class APIError(Exception):
def __init__(self, str):
super().__init__(str)
class ConfigurationError (Exception):
def __init__(self, str):
super().__init__(str)

View File

@@ -1,91 +0,0 @@
from logging import Logger as L
import datetime
import platform
import json
class Logger(L):
file = None
levels = {
0: "",
10:"[DEBUG] ",
20:"[INFO] ",
30:"[WARNING] ",
40:"[ERROR] ",
50:"[CRITICAL]",
}
targets = []
def __init__(self, verbose, profile, debug, file = None):
super().__init__("tra_logger")
self.file = file
if file is not None:
self.targets.append(self._send_file)
if profile:
self.targets.append(self._send_null)
elif verbose:
self.targets.append(self._send_scli)
elif debug:
self.targets.append(self._send_scli)
else:
self.targets.append(self._send_null)
def _send_null(self, msg):
pass
def _send_scli(self, msg):
print(msg)
def _send_file(self, msg):
f = open(self.file, 'a')
f.write(msg + "\n")
f.close()
def get_time_formatted(self):
return datetime.datetime.now().strftime("%Y/%m/%d %H:%M:%S %Z")
def log(self, level, msg):
for t in self.targets:
t(self.get_time_formatted() + "| " + self.levels[level] + ": " + msg)
def debug(self, msg):
self.log(10, msg)
def info(self, msg):
self.log(20, msg)
def warning(self, msg):
self.log(30, msg)
def error(self, msg):
self.log(40, msg)
def critical(self, msg):
self.log(50, msg)
def splash(self, version):
def hrule():
self.log(0, "#"+38*"-"+"#")
def box(s):
temp = "|"
temp += s
temp += (40-len(s)-2)*" "
temp += "|"
self.log(0, temp)
hrule()
box(" superscript version: " + version)
box(" os: " + platform.system())
box(" python: " + platform.python_version())
hrule()
def save_module_to_file(self, module, data, results):
f = open(module + ".log", "w")
json.dump({"data": data, "results":results}, f, ensure_ascii=False, indent=4)
f.close()

58
src/main.py Normal file
View File

@@ -0,0 +1,58 @@
from kivy.lang import Builder
from kivymd.uix.screen import Screen
from kivymd.uix.list import OneLineListItem, MDList, TwoLineListItem, ThreeLineListItem
from kivymd.uix.list import OneLineIconListItem, IconLeftWidget
from kivy.uix.scrollview import ScrollView
from kivy.uix.boxlayout import BoxLayout
from kivy.uix.screenmanager import ScreenManager, Screen
from kivy.uix.dropdown import DropDown
from kivy.uix.button import Button
from kivy.base import runTouchApp
from kivymd.uix.menu import MDDropdownMenu, MDMenuItem
from kivymd.app import MDApp
# import superscript as ss
# from tra_analysis import analysis as an
import data as d
from collections import defaultdict
import json
import math
import numpy as np
import os
from os import system, name
from pathlib import Path
from multiprocessing import Pool
import matplotlib.pyplot as plt
from concurrent.futures import ThreadPoolExecutor
import time
import warnings
# global exec_threads
# Screens
class HomeScreen(Screen):
pass
class SettingsScreen(Screen):
pass
class InfoScreen(Screen):
pass
class StatsScreen(Screen):
pass
class MyApp(MDApp):
def build(self):
self.theme_cls.primary_palette = "Red"
return Builder.load_file("design.kv")
def test():
print("test")
if __name__ == "__main__":
MyApp().run()

View File

@@ -1,313 +0,0 @@
import abc
import signal
import numpy as np
from tra_analysis import Analysis as an
from tqdm import tqdm
class Module(metaclass = abc.ABCMeta):
@classmethod
def __subclasshook__(cls, subclass):
return (hasattr(subclass, '__init__') and
callable(subclass.__init__) and
hasattr(subclass, 'validate_config') and
callable(subclass.validate_config) and
hasattr(subclass, 'run') and
callable(subclass.run)
)
@abc.abstractmethod
def __init__(self, *args, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def validate_config(self, *args, **kwargs):
raise NotImplementedError
@abc.abstractmethod
def run(self, *args, **kwargs):
raise NotImplementedError
class Match (Module):
config = None
timestamp = None
client = None
data = None
results = None
def __init__(self, config, timestamp, client):
self.config = config
self.timestamp = timestamp
self.client = client
def validate_config(self):
return True, ""
def run(self):
self._load_data()
self._process_data()
self._push_results()
def _load_data(self):
self.data = self.client.load_match()
def _simplestats(self, data_test):
signal.signal(signal.SIGINT, signal.SIG_IGN)
data = np.array(data_test[3])
data = data[np.isfinite(data)]
ranges = list(range(len(data)))
test = data_test[2]
if test == "basic_stats":
return an.basic_stats(data)
if test == "historical_analysis":
return an.histo_analysis([ranges, data])
if test == "regression_linear":
return an.regression(ranges, data, ['lin'])
if test == "regression_logarithmic":
return an.regression(ranges, data, ['log'])
if test == "regression_exponential":
return an.regression(ranges, data, ['exp'])
if test == "regression_polynomial":
return an.regression(ranges, data, ['ply'])
if test == "regression_sigmoidal":
return an.regression(ranges, data, ['sig'])
def _process_data(self):
tests = self.config["tests"]
data = self.data
input_vector = []
for team in tqdm(data, desc = "Match Module ", unit = " team"):
for variable in data[team]:
if variable in tests:
for test in tests[variable]:
input_vector.append((team, variable, test, data[team][variable]))
self.data = input_vector
self.results = []
for test_var_data in self.data:
self.results.append(self._simplestats(test_var_data))
def _push_results(self):
short_mapping = {"regression_linear": "lin", "regression_logarithmic": "log", "regression_exponential": "exp", "regression_polynomial": "ply", "regression_sigmoidal": "sig"}
class AutoVivification(dict):
def __getitem__(self, item):
try:
return dict.__getitem__(self, item)
except KeyError:
value = self[item] = type(self)()
return value
result_filtered = self.results
input_vector = self.data
return_vector = AutoVivification()
i = 0
for result in result_filtered:
filtered = input_vector[i][2]
try:
short = short_mapping[filtered]
return_vector[input_vector[i][0]][input_vector[i][1]][input_vector[i][2]] = result[short]
except KeyError: # not in mapping
return_vector[input_vector[i][0]][input_vector[i][1]][input_vector[i][2]] = result
i += 1
self.results = return_vector
self.client.push_match(self.results)
class Metric (Module):
config = None
timestamp = None
client = None
data = None
results = None
def __init__(self, config, timestamp, client):
self.config = config
self.timestamp = timestamp
self.client = client
def validate_config(self):
return True, ""
def run(self):
self._load_data()
self._process_data()
self._push_results()
def _load_data(self):
self.data = self.client.pull_new_tba_matches(self.timestamp)
def _process_data(self):
self.results = {}
elo_N = self.config["tests"]["elo"]["N"]
elo_K = self.config["tests"]["elo"]["K"]
matches = self.data
red = {}
blu = {}
for match in tqdm(matches, desc = "Metric Module ", unit = " match"):
red = self.client.load_metric(match, "red", self.config["tests"])
blu = self.client.load_metric(match, "blue", self.config["tests"])
elo_red_total = 0
elo_blu_total = 0
gl2_red_score_total = 0
gl2_blu_score_total = 0
gl2_red_rd_total = 0
gl2_blu_rd_total = 0
gl2_red_vol_total = 0
gl2_blu_vol_total = 0
for team in red:
elo_red_total += red[team]["elo"]["score"]
gl2_red_score_total += red[team]["gl2"]["score"]
gl2_red_rd_total += red[team]["gl2"]["rd"]
gl2_red_vol_total += red[team]["gl2"]["vol"]
for team in blu:
elo_blu_total += blu[team]["elo"]["score"]
gl2_blu_score_total += blu[team]["gl2"]["score"]
gl2_blu_rd_total += blu[team]["gl2"]["rd"]
gl2_blu_vol_total += blu[team]["gl2"]["vol"]
red_elo = {"score": elo_red_total / len(red)}
blu_elo = {"score": elo_blu_total / len(blu)}
red_gl2 = {"score": gl2_red_score_total / len(red), "rd": gl2_red_rd_total / len(red), "vol": gl2_red_vol_total / len(red)}
blu_gl2 = {"score": gl2_blu_score_total / len(blu), "rd": gl2_blu_rd_total / len(blu), "vol": gl2_blu_vol_total / len(blu)}
if match["winner"] == "red":
observations = {"red": 1, "blu": 0}
elif match["winner"] == "blue":
observations = {"red": 0, "blu": 1}
else:
observations = {"red": 0.5, "blu": 0.5}
red_elo_delta = an.Metric().elo(red_elo["score"], blu_elo["score"], observations["red"], elo_N, elo_K) - red_elo["score"]
blu_elo_delta = an.Metric().elo(blu_elo["score"], red_elo["score"], observations["blu"], elo_N, elo_K) - blu_elo["score"]
new_red_gl2_score, new_red_gl2_rd, new_red_gl2_vol = an.Metric().glicko2(red_gl2["score"], red_gl2["rd"], red_gl2["vol"], [blu_gl2["score"]], [blu_gl2["rd"]], [observations["red"], observations["blu"]])
new_blu_gl2_score, new_blu_gl2_rd, new_blu_gl2_vol = an.Metric().glicko2(blu_gl2["score"], blu_gl2["rd"], blu_gl2["vol"], [red_gl2["score"]], [red_gl2["rd"]], [observations["blu"], observations["red"]])
red_gl2_delta = {"score": new_red_gl2_score - red_gl2["score"], "rd": new_red_gl2_rd - red_gl2["rd"], "vol": new_red_gl2_vol - red_gl2["vol"]}
blu_gl2_delta = {"score": new_blu_gl2_score - blu_gl2["score"], "rd": new_blu_gl2_rd - blu_gl2["rd"], "vol": new_blu_gl2_vol - blu_gl2["vol"]}
for team in red:
red[team]["elo"]["score"] = red[team]["elo"]["score"] + red_elo_delta
red[team]["gl2"]["score"] = red[team]["gl2"]["score"] + red_gl2_delta["score"]
red[team]["gl2"]["rd"] = red[team]["gl2"]["rd"] + red_gl2_delta["rd"]
red[team]["gl2"]["vol"] = red[team]["gl2"]["vol"] + red_gl2_delta["vol"]
for team in blu:
blu[team]["elo"]["score"] = blu[team]["elo"]["score"] + blu_elo_delta
blu[team]["gl2"]["score"] = blu[team]["gl2"]["score"] + blu_gl2_delta["score"]
blu[team]["gl2"]["rd"] = blu[team]["gl2"]["rd"] + blu_gl2_delta["rd"]
blu[team]["gl2"]["vol"] = blu[team]["gl2"]["vol"] + blu_gl2_delta["vol"]
temp_vector = {}
temp_vector.update(red)
temp_vector.update(blu)
self.results[match['match']] = temp_vector
self.client.push_metric(temp_vector)
def _push_results(self):
pass
class Pit (Module):
config = None
timestamp = None
client = None
data = None
results = None
def __init__(self, config, timestamp, client):
self.config = config
self.timestamp = timestamp
self.client = client
def validate_config(self):
return True, ""
def run(self):
self._load_data()
self._process_data()
self._push_results()
def _load_data(self):
self.data = self.client.load_pit()
def _process_data(self):
tests = self.config["tests"]
return_vector = {}
for team in tqdm(self.data, desc = "Pit Module ", unit = " team"):
for variable in self.data[team]:
if variable in tests:
if not variable in return_vector:
return_vector[variable] = []
return_vector[variable].append(self.data[team][variable])
self.results = return_vector
def _push_results(self):
self.client.push_pit(self.results)
class Rating (Module):
pass
class Heatmap (Module):
pass
class Sentiment (Module):
pass

19
src/requirements.txt Normal file
View File

@@ -0,0 +1,19 @@
requests
pymongo
pandas
tra-analysis
dnspython
pyinstaller
requests
pymongo
numpy
scipy
scikit-learn
six
pyparsing
pandas
kivy==2.0.0rc2
setuptools>=65.5.1 # not directly required, pinned by Snyk to avoid a vulnerability

View File

@@ -3,42 +3,10 @@
# Notes: # Notes:
# setup: # setup:
__version__ = "1.0.0" __version__ = "0.8.6"
# changelog should be viewed using print(analysis.__changelog__) # changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog: __changelog__ = """changelog:
1.0.0:
- superscript now runs in PEP 3143 compliant well behaved daemon on Linux systems
- removed daemon and socket functionality, user can implement using external software
- added verbose option to linux superscript to allow for interactive output
- moved pymongo import to superscript.py
- added profile option to linux superscript to profile runtime of script
- reduced memory usage slightly by consolidating the unwrapped input data
- added debug option, which performs one loop of analysis and dumps results to local files
- added event and time delay options to config
- event delay pauses loop until even listener recieves an update
- time delay pauses loop until the time specified has elapsed since the BEGINNING of previous loop
- added options to pull config information from database (reatins option to use local config file)
- config-preference option selects between prioritizing local config and prioritizing database config
- synchronize-config option selects whether to update the non prioritized config with the prioritized one
- divided config options between persistent ones (keys), and variable ones (everything else)
- generalized behavior of various core components by collecting loose functions in several dependencies into classes
- module.py contains classes, each one represents a single data analysis routine
- config.py contains the Configuration class, which stores the configuration information and abstracts the getter methods
0.9.3:
- improved data loading performance by removing redundant PyMongo client creation (120s to 14s)
- passed singular instance of PyMongo client as standin for apikey parameter in all data.py functions
0.9.2:
- removed unessasary imports from data
- minor changes to interface
0.9.1:
- fixed bugs in configuration item loading exception handling
0.9.0:
- moved printing and logging related functions to interface.py (changelog will stay in this file)
- changed function return files for load_config and save_config to standard C values (0 for success, 1 for error)
- added local variables for config location
- moved dataset getting and setting functions to dataset.py (changelog will stay in this file)
- moved matchloop, metricloop, pitloop and helper functions (simplestats) to processing.py
0.8.6: 0.8.6:
- added proper main function - added proper main function
0.8.5: 0.8.5:
@@ -146,157 +114,514 @@ __author__ = (
"Jacob Levine <jlevine@imsa.edu>", "Jacob Levine <jlevine@imsa.edu>",
) )
__all__ = [
"load_config",
"save_config",
"get_previous_time",
"load_match",
"matchloop",
"load_metric",
"metricloop",
"load_pit",
"pitloop",
"push_match",
"push_metric",
"push_pit",
]
# imports: # imports:
import argparse, sys, time, traceback, warnings from tra_analysis import analysis as an
from config import Configuration, ConfigurationError import data as d
from data import Client from collections import defaultdict
from interface import Logger import json
from module import Match, Metric, Pit import math
import numpy as np
import os
from os import system, name
from pathlib import Path
from multiprocessing import Pool
import platform
import sys
import time
import warnings
def main(logger, verbose, profile, debug, config_path): global exec_threads
def close_all(): def main():
if "client" in locals():
client.close() global exec_threads
sys.stderr = open("errorlog.txt", "w")
warnings.filterwarnings("ignore") warnings.filterwarnings("ignore")
logger.splash(__version__) splash()
modules = {"match": Match, "metric": Metric, "pit": Pit} while (True):
while True:
try: try:
loop_start = time.time() current_time = time.time()
print("[OK] time: " + str(current_time))
logger.info("current time: " + str(loop_start)) config = load_config("config.json")
competition = config["competition"]
match_tests = config["statistics"]["match"]
pit_tests = config["statistics"]["pit"]
metrics_tests = config["statistics"]["metric"]
print("[OK] configs loaded")
config = Configuration(config_path) print("[OK] starting threads")
cfg_max_threads = config["max-threads"]
logger.info("found and loaded config at <" + config_path + ">") sys_max_threads = os.cpu_count()
if cfg_max_threads > -sys_max_threads and cfg_max_threads < 0 :
client = Client(config) alloc_processes = sys_max_threads + cfg_max_threads
elif cfg_max_threads > 0 and cfg_max_threads < 1:
logger.info("established connection to database") alloc_processes = math.floor(cfg_max_threads * sys_max_threads)
elif cfg_max_threads > 1 and cfg_max_threads <= sys_max_threads:
previous_time = client.get_previous_time() alloc_processes = cfg_max_threads
elif cfg_max_threads == 0:
logger.info("analysis backtimed to: " + str(previous_time)) alloc_processes = sys_max_threads
config.resolve_config_conflicts(logger, client)
config_modules, competition = config.modules, config.competition
client.competition = competition
for m in config_modules:
if m in modules:
start = time.time()
current_module = modules[m](config_modules[m], previous_time, client)
valid = current_module.validate_config()
if not valid:
continue
current_module.run()
logger.info(m + " module finished in " + str(time.time() - start) + " seconds")
if debug:
logger.save_module_to_file(m, current_module.data, current_module.results) # logging flag check done in logger
client.set_current_time(loop_start)
close_all()
logger.info("closed threads and database client")
logger.info("finished all tasks in " + str(time.time() - loop_start) + " seconds, looping")
if profile:
return 0
if debug:
return 0
event_delay = config["variable"]["event-delay"]
if event_delay:
logger.info("loop delayed until database returns new matches")
new_match = False
while not new_match:
time.sleep(1)
new_match = client.check_new_database_matches()
logger.info("database returned new matches")
else: else:
loop_delay = float(config["variable"]["loop-delay"]) print("[ERROR] Invalid number of processes, must be between -" + str(sys_max_threads) + " and " + str(sys_max_threads))
remaining_time = loop_delay - (time.time() - loop_start) exit()
if remaining_time > 0: exec_threads = Pool(processes = alloc_processes)
logger.info("loop delayed by " + str(remaining_time) + " seconds") print("[OK] " + str(alloc_processes) + " threads started")
time.sleep(remaining_time)
apikey = config["key"]["database"]
tbakey = config["key"]["tba"]
print("[OK] loaded keys")
previous_time = get_previous_time(apikey)
print("[OK] analysis backtimed to: " + str(previous_time))
print("[OK] loading data")
start = time.time()
match_data = load_match(apikey, competition)
pit_data = load_pit(apikey, competition)
print("[OK] loaded data in " + str(time.time() - start) + " seconds")
print("[OK] running match stats")
start = time.time()
matchloop(apikey, competition, match_data, match_tests)
print("[OK] finished match stats in " + str(time.time() - start) + " seconds")
print("[OK] running team metrics")
start = time.time()
metricloop(tbakey, apikey, competition, previous_time, metrics_tests)
print("[OK] finished team metrics in " + str(time.time() - start) + " seconds")
print("[OK] running pit analysis")
start = time.time()
pitloop(apikey, competition, pit_data, pit_tests)
print("[OK] finished pit analysis in " + str(time.time() - start) + " seconds")
set_current_time(apikey, current_time)
print("[OK] finished all tests, looping")
print_hrule()
except KeyboardInterrupt: except KeyboardInterrupt:
close_all() print("\n[OK] caught KeyboardInterrupt, killing processes")
logger.info("detected KeyboardInterrupt, exiting") exec_threads.terminate()
return 0 print("[OK] processes killed, exiting")
exit()
except ConfigurationError as e: else:
str_e = "".join(traceback.format_exception(e)) pass
logger.error("encountered a configuration error: " + str(e))
logger.error(str_e)
close_all()
return 1
except Exception as e: #clear()
str_e = "".join(traceback.format_exception(e))
logger.error("encountered an exception while running")
logger.error(str_e)
close_all()
return 1
def start(verbose, profile, debug, config_path, log_path): def clear():
logger = Logger(verbose, profile, debug, file = log_path) # for windows
if name == 'nt':
_ = system('cls')
if profile: # for mac and linux(here, os.name is 'posix')
else:
_ = system('clear')
import cProfile, pstats, io def print_hrule():
profile = cProfile.Profile()
profile.enable()
exit_code = main(logger, verbose, profile, debug, config_path)
profile.disable()
f = open("profile.txt", "w+")
ps = pstats.Stats(profile, stream = f).sort_stats("cumtime")
ps.print_stats()
sys.exit(exit_code)
elif verbose: print("#"+38*"-"+"#")
exit_code = main(logger, verbose, profile, debug, config_path) def print_box(s):
sys.exit(exit_code)
elif debug: temp = "|"
temp += s
temp += (40-len(s)-2)*" "
temp += "|"
print(temp)
exit_code = main(logger, verbose, profile, debug, config_path) def splash():
sys.exit(exit_code)
print_hrule()
print_box(" superscript version: " + __version__)
print_box(" os: " + platform.system())
print_box(" python: " + platform.python_version())
print_hrule()
def load_config(file):
config_vector = {}
try:
f = open(file)
except:
print("[ERROR] could not locate config.json, generating blank config.json and exiting")
f = open(file, "w")
f.write(sample_json)
exit()
config_vector = json.load(f)
return config_vector
def save_config(file, config_vector):
with open(file) as f:
json.dump(config_vector, f)
def get_previous_time(apikey):
previous_time = d.get_analysis_flags(apikey, "latest_update")
if previous_time == None:
d.set_analysis_flags(apikey, "latest_update", 0)
previous_time = 0
else: else:
pass # must be vebose, debug or profile previous_time = previous_time["latest_update"]
return previous_time
def set_current_time(apikey, current_time):
d.set_analysis_flags(apikey, "latest_update", {"latest_update":current_time})
def load_match(apikey, competition):
return d.get_match_data_formatted(apikey, competition)
def simplestats(data_test):
data = np.array(data_test[0])
data = data[np.isfinite(data)]
ranges = list(range(len(data)))
test = data_test[1]
if test == "basic_stats":
return an.basic_stats(data)
if test == "historical_analysis":
return an.histo_analysis([ranges, data])
if test == "regression_linear":
return an.regression(ranges, data, ['lin'])
if test == "regression_logarithmic":
return an.regression(ranges, data, ['log'])
if test == "regression_exponential":
return an.regression(ranges, data, ['exp'])
if test == "regression_polynomial":
return an.regression(ranges, data, ['ply'])
if test == "regression_sigmoidal":
return an.regression(ranges, data, ['sig'])
def matchloop(apikey, competition, data, tests): # expects 3D array with [Team][Variable][Match]
global exec_threads
short_mapping = {"regression_linear": "lin", "regression_logarithmic": "log", "regression_exponential": "exp", "regression_polynomial": "ply", "regression_sigmoidal": "sig"}
class AutoVivification(dict):
def __getitem__(self, item):
try:
return dict.__getitem__(self, item)
except KeyError:
value = self[item] = type(self)()
return value
return_vector = {}
team_filtered = []
variable_filtered = []
variable_data = []
test_filtered = []
result_filtered = []
return_vector = AutoVivification()
for team in data:
for variable in data[team]:
if variable in tests:
for test in tests[variable]:
team_filtered.append(team)
variable_filtered.append(variable)
variable_data.append((data[team][variable], test))
test_filtered.append(test)
result_filtered = exec_threads.map(simplestats, variable_data)
i = 0
result_filtered = list(result_filtered)
for result in result_filtered:
filtered = test_filtered[i]
try:
short = short_mapping[filtered]
return_vector[team_filtered[i]][variable_filtered[i]][test_filtered[i]] = result[short]
except KeyError: # not in mapping
return_vector[team_filtered[i]][variable_filtered[i]][test_filtered[i]] = result
i += 1
push_match(apikey, competition, return_vector)
def load_metric(apikey, competition, match, group_name, metrics):
group = {}
for team in match[group_name]:
db_data = d.get_team_metrics_data(apikey, competition, team)
if d.get_team_metrics_data(apikey, competition, team) == None:
elo = {"score": metrics["elo"]["score"]}
gl2 = {"score": metrics["gl2"]["score"], "rd": metrics["gl2"]["rd"], "vol": metrics["gl2"]["vol"]}
ts = {"mu": metrics["ts"]["mu"], "sigma": metrics["ts"]["sigma"]}
group[team] = {"elo": elo, "gl2": gl2, "ts": ts}
else:
metrics = db_data["metrics"]
elo = metrics["elo"]
gl2 = metrics["gl2"]
ts = metrics["ts"]
group[team] = {"elo": elo, "gl2": gl2, "ts": ts}
return group
def metricloop(tbakey, apikey, competition, timestamp, metrics): # listener based metrics update
elo_N = metrics["elo"]["N"]
elo_K = metrics["elo"]["K"]
matches = d.pull_new_tba_matches(tbakey, competition, timestamp)
red = {}
blu = {}
for match in matches:
red = load_metric(apikey, competition, match, "red", metrics)
blu = load_metric(apikey, competition, match, "blue", metrics)
elo_red_total = 0
elo_blu_total = 0
gl2_red_score_total = 0
gl2_blu_score_total = 0
gl2_red_rd_total = 0
gl2_blu_rd_total = 0
gl2_red_vol_total = 0
gl2_blu_vol_total = 0
for team in red:
elo_red_total += red[team]["elo"]["score"]
gl2_red_score_total += red[team]["gl2"]["score"]
gl2_red_rd_total += red[team]["gl2"]["rd"]
gl2_red_vol_total += red[team]["gl2"]["vol"]
for team in blu:
elo_blu_total += blu[team]["elo"]["score"]
gl2_blu_score_total += blu[team]["gl2"]["score"]
gl2_blu_rd_total += blu[team]["gl2"]["rd"]
gl2_blu_vol_total += blu[team]["gl2"]["vol"]
red_elo = {"score": elo_red_total / len(red)}
blu_elo = {"score": elo_blu_total / len(blu)}
red_gl2 = {"score": gl2_red_score_total / len(red), "rd": gl2_red_rd_total / len(red), "vol": gl2_red_vol_total / len(red)}
blu_gl2 = {"score": gl2_blu_score_total / len(blu), "rd": gl2_blu_rd_total / len(blu), "vol": gl2_blu_vol_total / len(blu)}
if match["winner"] == "red":
observations = {"red": 1, "blu": 0}
elif match["winner"] == "blue":
observations = {"red": 0, "blu": 1}
else:
observations = {"red": 0.5, "blu": 0.5}
red_elo_delta = an.Metric().elo(red_elo["score"], blu_elo["score"], observations["red"], elo_N, elo_K) - red_elo["score"]
blu_elo_delta = an.Metric().elo(blu_elo["score"], red_elo["score"], observations["blu"], elo_N, elo_K) - blu_elo["score"]
new_red_gl2_score, new_red_gl2_rd, new_red_gl2_vol = an.Metric().glicko2(red_gl2["score"], red_gl2["rd"], red_gl2["vol"], [blu_gl2["score"]], [blu_gl2["rd"]], [observations["red"], observations["blu"]])
new_blu_gl2_score, new_blu_gl2_rd, new_blu_gl2_vol = an.Metric().glicko2(blu_gl2["score"], blu_gl2["rd"], blu_gl2["vol"], [red_gl2["score"]], [red_gl2["rd"]], [observations["blu"], observations["red"]])
red_gl2_delta = {"score": new_red_gl2_score - red_gl2["score"], "rd": new_red_gl2_rd - red_gl2["rd"], "vol": new_red_gl2_vol - red_gl2["vol"]}
blu_gl2_delta = {"score": new_blu_gl2_score - blu_gl2["score"], "rd": new_blu_gl2_rd - blu_gl2["rd"], "vol": new_blu_gl2_vol - blu_gl2["vol"]}
for team in red:
red[team]["elo"]["score"] = red[team]["elo"]["score"] + red_elo_delta
red[team]["gl2"]["score"] = red[team]["gl2"]["score"] + red_gl2_delta["score"]
red[team]["gl2"]["rd"] = red[team]["gl2"]["rd"] + red_gl2_delta["rd"]
red[team]["gl2"]["vol"] = red[team]["gl2"]["vol"] + red_gl2_delta["vol"]
for team in blu:
blu[team]["elo"]["score"] = blu[team]["elo"]["score"] + blu_elo_delta
blu[team]["gl2"]["score"] = blu[team]["gl2"]["score"] + blu_gl2_delta["score"]
blu[team]["gl2"]["rd"] = blu[team]["gl2"]["rd"] + blu_gl2_delta["rd"]
blu[team]["gl2"]["vol"] = blu[team]["gl2"]["vol"] + blu_gl2_delta["vol"]
temp_vector = {}
temp_vector.update(red)
temp_vector.update(blu)
push_metric(apikey, competition, temp_vector)
def load_pit(apikey, competition):
return d.get_pit_data_formatted(apikey, competition)
def pitloop(apikey, competition, pit, tests):
return_vector = {}
for team in pit:
for variable in pit[team]:
if variable in tests:
if not variable in return_vector:
return_vector[variable] = []
return_vector[variable].append(pit[team][variable])
push_pit(apikey, competition, return_vector)
def push_match(apikey, competition, results):
for team in results:
d.push_team_tests_data(apikey, competition, team, results[team])
def push_metric(apikey, competition, metric):
for team in metric:
d.push_team_metrics_data(apikey, competition, team, metric[team])
def push_pit(apikey, competition, pit):
for variable in pit:
d.push_team_pit_data(apikey, competition, variable, pit[variable])
def get_team_metrics(apikey, tbakey, competition):
metrics = d.get_metrics_data_formatted(apikey, competition)
elo = {}
gl2 = {}
for team in metrics:
elo[team] = metrics[team]["metrics"]["elo"]["score"]
gl2[team] = metrics[team]["metrics"]["gl2"]["score"]
elo = {k: v for k, v in sorted(elo.items(), key=lambda item: item[1])}
gl2 = {k: v for k, v in sorted(gl2.items(), key=lambda item: item[1])}
elo_ranked = []
for team in elo:
elo_ranked.append({"team": str(team), "elo": str(elo[team])})
gl2_ranked = []
for team in gl2:
gl2_ranked.append({"team": str(team), "gl2": str(gl2[team])})
return {"elo-ranks": elo_ranked, "glicko2-ranks": gl2_ranked}
sample_json = """{
"max-threads": 0.5,
"team": "",
"competition": "2020ilch",
"key":{
"database":"",
"tba":""
},
"statistics":{
"match":{
"balls-blocked":["basic_stats","historical_analysis","regression_linear","regression_logarithmic","regression_exponential","regression_polynomial","regression_sigmoidal"],
"balls-collected":["basic_stats","historical_analysis","regression_linear","regression_logarithmic","regression_exponential","regression_polynomial","regression_sigmoidal"],
"balls-lower-teleop":["basic_stats","historical_analysis","regression_linear","regression_logarithmic","regression_exponential","regression_polynomial","regression_sigmoidal"],
"balls-lower-auto":["basic_stats","historical_analysis","regression_linear","regression_logarithmic","regression_exponential","regression_polynomial","regression_sigmoidal"],
"balls-started":["basic_stats","historical_analyss","regression_linear","regression_logarithmic","regression_exponential","regression_polynomial","regression_sigmoidal"],
"balls-upper-teleop":["basic_stats","historical_analysis","regression_linear","regression_logarithmic","regression_exponential","regression_polynomial","regression_sigmoidal"],
"balls-upper-auto":["basic_stats","historical_analysis","regression_linear","regression_logarithmic","regression_exponential","regression_polynomial","regression_sigmoidal"]
},
"metric":{
"elo":{
"score":1500,
"N":400,
"K":24
},
"gl2":{
"score":1500,
"rd":250,
"vol":0.06
},
"ts":{
"mu":25,
"sigma":8.33
}
},
"pit":{
"wheel-mechanism":true,
"low-balls":true,
"high-balls":true,
"wheel-success":true,
"strategic-focus":true,
"climb-mechanism":true,
"attitude":true
}
}
}"""
if __name__ == "__main__": if __name__ == "__main__":
if sys.platform.startswith('win'):
parser = argparse.ArgumentParser(description = "TRA data processing application.") multiprocessing.freeze_support()
parser.add_argument("mode", metavar = "MODE", type = str, nargs = 1, choices = ["verbose", "profile", "debug"], help = "verbose, debug, profile") main()
parser.add_argument("--config", dest = "config", default = "config.json", type = str, help = "path to config file")
parser.add_argument("--logfile", dest = "logfile", default = "logfile.log", type = str, help = "path to log file")
args = parser.parse_args()
mode = args.mode[0]
config_path = args.config
log_path = args.logfile
if mode == "verbose":
start(True, False, False, config_path = config_path, log_path = log_path)
elif mode == "profile":
start(False, True, False, config_path = config_path, log_path = log_path)
elif mode == "debug":
start(False, False, True, config_path = config_path, log_path = log_path)
exit(0)

37
src/superscript.spec Normal file
View File

@@ -0,0 +1,37 @@
# -*- mode: python ; coding: utf-8 -*-
block_cipher = None
a = Analysis(['superscript.py'],
pathex=['/workspaces/tra-data-analysis/src'],
binaries=[],
datas=[],
hiddenimports=[
"dnspython",
"sklearn.utils._weight_vector",
"requests",
],
hookspath=[],
runtime_hooks=[],
excludes=[],
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False)
pyz = PYZ(a.pure, a.zipped_data,
cipher=block_cipher)
exe = EXE(pyz,
a.scripts,
a.binaries,
a.zipfiles,
a.datas,
[('W ignore', None, 'OPTION')],
name='superscript',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True )

View File

@@ -1,14 +0,0 @@
import signal
import zmq
signal.signal(signal.SIGINT, signal.SIG_DFL)
context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect('tcp://localhost:5678')
socket.setsockopt(zmq.SUBSCRIBE, b'status')
while True:
message = socket.recv_multipart()
print(f'Received: {message}')