43 Commits

Author SHA1 Message Date
ltcptgeneral
375befd0c4 analysis pkg v 1.0.0.9 2020-03-17 20:03:49 -05:00
ltcptgeneral
893d1fb1d0 analysis.py v 1.1.13.007 2020-03-16 22:05:52 -05:00
ltcptgeneral
6a426ae4cd a 2020-03-10 00:45:42 -05:00
ltcptgeneral
50c064ffa4 worked 2020-03-09 22:58:51 -05:00
ltcptgeneral
1b0a9967c8 test1 2020-03-09 22:58:11 -05:00
ltcptgeneral
2605f7c29f Merge pull request #6 from titanscout2022/testing
Testing
2020-03-09 20:42:30 -05:00
ltcptgeneral
6f5a3edd88 superscript.py v 0.0.5.000 2020-03-09 20:35:11 -05:00
ltcptgeneral
457146b0e4 working 2020-03-09 20:29:44 -05:00
ltcptgeneral
f7fd8ffcf9 working 2020-03-09 20:18:30 -05:00
art
77bc792426 removed unessasary stuff 2020-03-09 10:29:59 -05:00
ltcptgeneral
39146cc555 Merge pull request #5 from titanscout2022/comp-edits
Comp edits
2020-03-09 10:28:48 -05:00
ltcptgeneral
04141bbec8 analysis.py v 1.1.13.006
regression.py v 1.0.0.003
analysis pkg v 1.0.0.8
2020-03-08 16:48:19 -05:00
ltcptgeneral
40e5899972 added get_team_rakings.py 2020-03-08 14:26:21 -05:00
ltcptgeneral
025c7f9b3c a 2020-03-06 21:39:46 -06:00
Dev Singh
2daa09c040 hi 2020-03-06 21:21:37 -06:00
ltcptgeneral
9776136649 superscript.py v 0.0.4.002 2020-03-06 21:09:16 -06:00
Dev Singh
68d27a6302 add reqs 2020-03-06 20:44:40 -06:00
Dev Singh
7fc18b7c35 add Procfile 2020-03-06 20:41:53 -06:00
ltcptgeneral
9b412b51a8 analysis pkg v 1.0.0.7 2020-03-06 20:32:41 -06:00
ltcptgeneral
b6ac05a66e Merge pull request #4 from titanscout2022/comp-edits
Comp edits merge
2020-03-06 20:29:50 -06:00
Dev Singh
435c8a7bc6 tiny brain fix 2020-03-06 14:52:41 -06:00
Dev Singh
a69b18354b ultimate carl the fat kid brain working 2020-03-06 14:50:54 -06:00
Dev Singh
7b9e6921d0 ultra galaxybrain working 2020-03-06 14:44:13 -06:00
Dev Singh
fb2800cf9e fix 2020-03-06 13:12:01 -06:00
Dev Singh
12cbb21077 super ultra working 2020-03-06 12:43:01 -06:00
Dev Singh
46d1a48999 even more working 2020-03-06 12:21:17 -06:00
Dev Singh
ad0a761d53 more working 2020-03-06 12:18:42 -06:00
Dev Singh
43f503a38d working 2020-03-06 12:15:35 -06:00
Dev Singh
d38744438b working 2020-03-06 11:50:07 -06:00
Dev Singh
eb8914aa26 maybe working 2020-03-06 11:27:32 -06:00
Dev Singh
283140094f a 2020-03-06 11:18:02 -06:00
Dev Singh
66ac1c304e testing part 2 better electric boogaloo 2020-03-06 11:16:24 -06:00
Dev Singh
0eb9e07711 testing 2020-03-06 11:14:10 -06:00
Dev Singh
f56c85b298 10:57 2020-03-06 10:57:39 -06:00
Dev Singh
6a9a17c5b4 10:43 2020-03-06 10:43:45 -06:00
Dev Singh
e24c49bedb 10:25 2020-03-06 10:25:20 -06:00
Dev Singh
2daed73aaa 10:21 unverified 2020-03-06 10:21:23 -06:00
art
8ebdb3b89b superscript.py v 0.0.3.000 2020-03-05 22:52:02 -06:00
art
a0e1293361 analysis.py v 1.1.13.001
analysis pkg v 1.0.0.006
2020-03-05 13:18:33 -06:00
art
b669e55283 analysis pkg v 1.0.0.005 2020-03-05 12:44:09 -06:00
art
3e38446eae analysis pkg v 1.0.0.004 2020-03-05 12:29:58 -06:00
art
dac0a4a0cd analysis.py v 1.1.13.000 2020-03-05 12:28:16 -06:00
art
897ba03078 removed unessasaary folders and files 2020-03-05 11:17:49 -06:00
37 changed files with 530 additions and 2714 deletions

7
.gitignore vendored
View File

@@ -16,4 +16,9 @@ data analysis/.ipynb_checkpoints/test-checkpoint.ipynb
.vscode
data analysis/arthur_pull.ipynb
data analysis/keys.txt
data analysis/check_for_new_matches.ipynb
data analysis/check_for_new_matches.ipynb
data analysis/test.ipynb
data analysis/visualize_pit.ipynb
data analysis/config/keys.config
analysis-master/analysis/__pycache__/
data analysis/__pycache__/

View File

@@ -1,6 +1,6 @@
Metadata-Version: 2.1
Name: analysis
Version: 1.0.0.3
Version: 1.0.0.9
Summary: analysis package developed by Titan Scouting for The Red Alliance
Home-page: https://github.com/titanscout2022/tr2022-strategy
Author: The Titan Scouting Team

View File

@@ -8,4 +8,5 @@ analysis/visualization.py
analysis.egg-info/PKG-INFO
analysis.egg-info/SOURCES.txt
analysis.egg-info/dependency_links.txt
analysis.egg-info/requires.txt
analysis.egg-info/top_level.txt

View File

@@ -0,0 +1,6 @@
numba
numpy
scipy
scikit-learn
six
matplotlib

View File

@@ -1,952 +0,0 @@
# Titan Robotics Team 2022: Data Analysis Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this should be imported as a python module using 'import analysis'
# this should be included in the local directory or environment variable
# this module has been optimized for multhreaded computing
# current benchmark of optimization: 1.33 times faster
# setup:
__version__ = "1.1.12.003"
# changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog:
1.1.12.003:
- removed depreciated code
1.1.12.002:
- removed team first time trueskill instantiation in favor of integration in superscript.py
1.1.12.001:
- improved readibility of regression outputs by stripping tensor data
- used map with lambda to acheive the improved readibility
- lost numba jit support with regression, and generated_jit hangs at execution
- TODO: reimplement correct numba integration in regression
1.1.12.000:
- temporarily fixed polynomial regressions by using sklearn's PolynomialFeatures
1.1.11.010:
- alphabeticaly ordered import lists
1.1.11.009:
- bug fixes
1.1.11.008:
- bug fixes
1.1.11.007:
- bug fixes
1.1.11.006:
- tested min and max
- bug fixes
1.1.11.005:
- added min and max in basic_stats
1.1.11.004:
- bug fixes
1.1.11.003:
- bug fixes
1.1.11.002:
- consolidated metrics
- fixed __all__
1.1.11.001:
- added test/train split to RandomForestClassifier and RandomForestRegressor
1.1.11.000:
- added RandomForestClassifier and RandomForestRegressor
- note: untested
1.1.10.000:
- added numba.jit to remaining functions
1.1.9.002:
- kernelized PCA and KNN
1.1.9.001:
- fixed bugs with SVM and NaiveBayes
1.1.9.000:
- added SVM class, subclasses, and functions
- note: untested
1.1.8.000:
- added NaiveBayes classification engine
- note: untested
1.1.7.000:
- added knn()
- added confusion matrix to decisiontree()
1.1.6.002:
- changed layout of __changelog to be vscode friendly
1.1.6.001:
- added additional hyperparameters to decisiontree()
1.1.6.000:
- fixed __version__
- fixed __all__ order
- added decisiontree()
1.1.5.003:
- added pca
1.1.5.002:
- reduced import list
- added kmeans clustering engine
1.1.5.001:
- simplified regression by using .to(device)
1.1.5.000:
- added polynomial regression to regression(); untested
1.1.4.000:
- added trueskill()
1.1.3.002:
- renamed regression class to Regression, regression_engine() to regression gliko2_engine class to Gliko2
1.1.3.001:
- changed glicko2() to return tuple instead of array
1.1.3.000:
- added glicko2_engine class and glicko()
- verified glicko2() accuracy
1.1.2.003:
- fixed elo()
1.1.2.002:
- added elo()
- elo() has bugs to be fixed
1.1.2.001:
- readded regrression import
1.1.2.000:
- integrated regression.py as regression class
- removed regression import
- fixed metadata for regression class
- fixed metadata for analysis class
1.1.1.001:
- regression_engine() bug fixes, now actaully regresses
1.1.1.000:
- added regression_engine()
- added all regressions except polynomial
1.1.0.007:
- updated _init_device()
1.1.0.006:
- removed useless try statements
1.1.0.005:
- removed impossible outcomes
1.1.0.004:
- added performance metrics (r^2, mse, rms)
1.1.0.003:
- resolved nopython mode for mean, median, stdev, variance
1.1.0.002:
- snapped (removed) majority of uneeded imports
- forced object mode (bad) on all jit
- TODO: stop numba complaining about not being able to compile in nopython mode
1.1.0.001:
- removed from sklearn import * to resolve uneeded wildcard imports
1.1.0.000:
- removed c_entities,nc_entities,obstacles,objectives from __all__
- applied numba.jit to all functions
- depreciated and removed stdev_z_split
- cleaned up histo_analysis to include numpy and numba.jit optimizations
- depreciated and removed all regression functions in favor of future pytorch optimizer
- depreciated and removed all nonessential functions (basic_analysis, benchmark, strip_data)
- optimized z_normalize using sklearn.preprocessing.normalize
- TODO: implement kernel/function based pytorch regression optimizer
1.0.9.000:
- refactored
- numpyed everything
- removed stats in favor of numpy functions
1.0.8.005:
- minor fixes
1.0.8.004:
- removed a few unused dependencies
1.0.8.003:
- added p_value function
1.0.8.002:
- updated __all__ correctly to contain changes made in v 1.0.8.000 and v 1.0.8.001
1.0.8.001:
- refactors
- bugfixes
1.0.8.000:
- depreciated histo_analysis_old
- depreciated debug
- altered basic_analysis to take array data instead of filepath
- refactor
- optimization
1.0.7.002:
- bug fixes
1.0.7.001:
- bug fixes
1.0.7.000:
- added tanh_regression (logistical regression)
- bug fixes
1.0.6.005:
- added z_normalize function to normalize dataset
- bug fixes
1.0.6.004:
- bug fixes
1.0.6.003:
- bug fixes
1.0.6.002:
- bug fixes
1.0.6.001:
- corrected __all__ to contain all of the functions
1.0.6.000:
- added calc_overfit, which calculates two measures of overfit, error and performance
- added calculating overfit to optimize_regression
1.0.5.000:
- added optimize_regression function, which is a sample function to find the optimal regressions
- optimize_regression function filters out some overfit funtions (functions with r^2 = 1)
- planned addition: overfit detection in the optimize_regression function
1.0.4.002:
- added __changelog__
- updated debug function with log and exponential regressions
1.0.4.001:
- added log regressions
- added exponential regressions
- added log_regression and exp_regression to __all__
1.0.3.008:
- added debug function to further consolidate functions
1.0.3.007:
- added builtin benchmark function
- added builtin random (linear) data generation function
- added device initialization (_init_device)
1.0.3.006:
- reorganized the imports list to be in alphabetical order
- added search and regurgitate functions to c_entities, nc_entities, obstacles, objectives
1.0.3.005:
- major bug fixes
- updated historical analysis
- depreciated old historical analysis
1.0.3.004:
- added __version__, __author__, __all__
- added polynomial regression
- added root mean squared function
- added r squared function
1.0.3.003:
- bug fixes
- added c_entities
1.0.3.002:
- bug fixes
- added nc_entities, obstacles, objectives
- consolidated statistics.py to analysis.py
1.0.3.001:
- compiled 1d, column, and row basic stats into basic stats function
1.0.3.000:
- added historical analysis function
1.0.2.xxx:
- added z score test
1.0.1.xxx:
- major bug fixes
1.0.0.xxx:
- added loading csv
- added 1d, column, row basic stats
"""
__author__ = (
"Arthur Lu <learthurgo@gmail.com>",
"Jacob Levine <jlevine@imsa.edu>",
)
__all__ = [
'_init_device',
'load_csv',
'basic_stats',
'z_score',
'z_normalize',
'histo_analysis',
'regression',
'elo',
'gliko2',
'trueskill',
'RegressionMetrics',
'ClassificationMetrics',
'kmeans',
'pca',
'decisiontree',
'knn_classifier',
'knn_regressor',
'NaiveBayes',
'SVM',
'random_forest_classifier',
'random_forest_regressor',
'Regression',
'Gliko2',
# all statistics functions left out due to integration in other functions
]
# now back to your regularly scheduled programming:
# imports (now in alphabetical order! v 1.0.3.006):
import csv
import numba
from numba import jit
import numpy as np
import math
import sklearn
from sklearn import *
import torch
try:
from analysis import trueskill as Trueskill
except:
import trueskill as Trueskill
class error(ValueError):
pass
def _init_device(): # initiates computation device for ANNs
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
return device
def load_csv(filepath):
with open(filepath, newline='') as csvfile:
file_array = np.array(list(csv.reader(csvfile)))
csvfile.close()
return file_array
# expects 1d array
@jit(forceobj=True)
def basic_stats(data):
data_t = np.array(data).astype(float)
_mean = mean(data_t)
_median = median(data_t)
_stdev = stdev(data_t)
_variance = variance(data_t)
_min = npmin(data_t)
_max = npmax(data_t)
return _mean, _median, _stdev, _variance, _min, _max
# returns z score with inputs of point, mean and standard deviation of spread
@jit(forceobj=True)
def z_score(point, mean, stdev):
score = (point - mean) / stdev
return score
# expects 2d array, normalizes across all axes
@jit(forceobj=True)
def z_normalize(array, *args):
array = np.array(array)
for arg in args:
array = sklearn.preprocessing.normalize(array, axis = arg)
return array
@jit(forceobj=True)
# expects 2d array of [x,y]
def histo_analysis(hist_data):
hist_data = np.array(hist_data)
derivative = np.array(len(hist_data) - 1, dtype = float)
t = np.diff(hist_data)
derivative = t[1] / t[0]
np.sort(derivative)
return basic_stats(derivative)[0], basic_stats(derivative)[3]
def regression(ndevice, inputs, outputs, args, loss = torch.nn.MSELoss(), _iterations = 10000, lr = 0.01, _iterations_ply = 10000, lr_ply = 0.01): # inputs, outputs expects N-D array
regressions = []
Regression().set_device(ndevice)
if 'lin' in args: # formula: ax + b
model = Regression().SGDTrain(Regression.LinearRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor([outputs]).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
if 'log' in args: # formula: a log (b(x + c)) + d
model = Regression().SGDTrain(Regression.LogRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
if 'exp' in args: # formula: a e ^ (b(x + c)) + d
model = Regression().SGDTrain(Regression.ExpRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
if 'ply' in args: # formula: a + bx^1 + cx^2 + dx^3 + ...
plys = []
limit = len(outputs[0])
for i in range(2, limit):
model = sklearn.preprocessing.PolynomialFeatures(degree = i)
model = sklearn.pipeline.make_pipeline(model, sklearn.linear_model.LinearRegression())
model = model.fit(np.rot90(inputs), np.rot90(outputs))
params = model.steps[1][1].intercept_.tolist()
params = np.append(params, model.steps[1][1].coef_[0].tolist()[1::])
params.flatten()
params = params.tolist()
plys.append(params)
regressions.append(plys)
if 'sig' in args: # formula: a sig (b(x + c)) + d | sig() = 1/(1 + e ^ -x)
model = Regression().SGDTrain(Regression.SigmoidalRegKernelArthur(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
return regressions
@jit(nopython=True)
def elo(starting_score, opposing_score, observed, N, K):
expected = 1/(1+10**((np.array(opposing_score) - starting_score)/N))
return starting_score + K*(np.sum(observed) - np.sum(expected))
@jit(forceobj=True)
def gliko2(starting_score, starting_rd, starting_vol, opposing_score, opposing_rd, observations):
player = Gliko2(rating = starting_score, rd = starting_rd, vol = starting_vol)
player.update_player([x for x in opposing_score], [x for x in opposing_rd], observations)
return (player.rating, player.rd, player.vol)
@jit(forceobj=True)
def trueskill(teams_data, observations): # teams_data is array of array of tuples ie. [[(mu, sigma), (mu, sigma), (mu, sigma)], [(mu, sigma), (mu, sigma), (mu, sigma)]]
team_ratings = []
for team in teams_data:
team_temp = []
for player in team:
player = Trueskill.Rating(player[0], player[1])
team_temp.append(player)
team_ratings.append(team_temp)
return Trueskill.rate(teams_data, observations)
class RegressionMetrics():
def __new__(cls, predictions, targets):
return cls.r_squared(cls, predictions, targets), cls.mse(cls, predictions, targets), cls.rms(cls, predictions, targets)
def r_squared(self, predictions, targets): # assumes equal size inputs
return sklearn.metrics.r2_score(targets, predictions)
def mse(self, predictions, targets):
return sklearn.metrics.mean_squared_error(targets, predictions)
def rms(self, predictions, targets):
return math.sqrt(sklearn.metrics.mean_squared_error(targets, predictions))
class ClassificationMetrics():
def __new__(cls, predictions, targets):
return cls.cm(cls, predictions, targets), cls.cr(cls, predictions, targets)
def cm(self, predictions, targets):
return sklearn.metrics.confusion_matrix(targets, predictions)
def cr(self, predictions, targets):
return sklearn.metrics.classification_report(targets, predictions)
@jit(nopython=True)
def mean(data):
return np.mean(data)
@jit(nopython=True)
def median(data):
return np.median(data)
@jit(nopython=True)
def stdev(data):
return np.std(data)
@jit(nopython=True)
def variance(data):
return np.var(data)
@jit(nopython=True)
def npmin(data):
return np.amin(data)
@jit(nopython=True)
def npmax(data):
return np.amax(data)
@jit(forceobj=True)
def kmeans(data, n_clusters=8, init="k-means++", n_init=10, max_iter=300, tol=0.0001, precompute_distances="auto", verbose=0, random_state=None, copy_x=True, n_jobs=None, algorithm="auto"):
kernel = sklearn.cluster.KMeans(n_clusters = n_clusters, init = init, n_init = n_init, max_iter = max_iter, tol = tol, precompute_distances = precompute_distances, verbose = verbose, random_state = random_state, copy_x = copy_x, n_jobs = n_jobs, algorithm = algorithm)
kernel.fit(data)
predictions = kernel.predict(data)
centers = kernel.cluster_centers_
return centers, predictions
@jit(forceobj=True)
def pca(data, n_components = None, copy = True, whiten = False, svd_solver = "auto", tol = 0.0, iterated_power = "auto", random_state = None):
kernel = sklearn.decomposition.PCA(n_components = n_components, copy = copy, whiten = whiten, svd_solver = svd_solver, tol = tol, iterated_power = iterated_power, random_state = random_state)
return kernel.fit_transform(data)
@jit(forceobj=True)
def decisiontree(data, labels, test_size = 0.3, criterion = "gini", splitter = "default", max_depth = None): #expects *2d data and 1d labels
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.tree.DecisionTreeClassifier(criterion = criterion, splitter = splitter, max_depth = max_depth)
model = model.fit(data_train,labels_train)
predictions = model.predict(data_test)
metrics = ClassificationMetrics(predictions, labels_test)
return model, metrics
@jit(forceobj=True)
def knn_classifier(data, labels, test_size = 0.3, algorithm='auto', leaf_size=30, metric='minkowski', metric_params=None, n_jobs=None, n_neighbors=5, p=2, weights='uniform'): #expects *2d data and 1d labels post-scaling
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.neighbors.KNeighborsClassifier()
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
def knn_regressor(data, outputs, test_size, n_neighbors = 5, weights = "uniform", algorithm = "auto", leaf_size = 30, p = 2, metric = "minkowski", metric_params = None, n_jobs = None):
data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1)
model = sklearn.neighbors.KNeighborsRegressor(n_neighbors = n_neighbors, weights = weights, algorithm = algorithm, leaf_size = leaf_size, p = p, metric = metric, metric_params = metric_params, n_jobs = n_jobs)
model.fit(data_train, outputs_train)
predictions = model.predict(data_test)
return model, RegressionMetrics(predictions, outputs_test)
class NaiveBayes:
def guassian(self, data, labels, test_size = 0.3, priors = None, var_smoothing = 1e-09):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.naive_bayes.GaussianNB(priors = priors, var_smoothing = var_smoothing)
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
def multinomial(self, data, labels, test_size = 0.3, alpha=1.0, fit_prior=True, class_prior=None):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.naive_bayes.MultinomialNB(alpha = alpha, fit_prior = fit_prior, class_prior = class_prior)
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
def bernoulli(self, data, labels, test_size = 0.3, alpha=1.0, binarize=0.0, fit_prior=True, class_prior=None):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.naive_bayes.BernoulliNB(alpha = alpha, binarize = binarize, fit_prior = fit_prior, class_prior = class_prior)
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
def complement(self, data, labels, test_size = 0.3, alpha=1.0, fit_prior=True, class_prior=None, norm=False):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.naive_bayes.ComplementNB(alpha = alpha, fit_prior = fit_prior, class_prior = class_prior, norm = norm)
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
class SVM:
class CustomKernel:
def __new__(cls, C, kernel, degre, gamma, coef0, shrinking, probability, tol, cache_size, class_weight, verbose, max_iter, decision_function_shape, random_state):
return sklearn.svm.SVC(C = C, kernel = kernel, gamma = gamma, coef0 = coef0, shrinking = shrinking, probability = probability, tol = tol, cache_size = cache_size, class_weight = class_weight, verbose = verbose, max_iter = max_iter, decision_function_shape = decision_function_shape, random_state = random_state)
class StandardKernel:
def __new__(cls, kernel, C=1.0, degree=3, gamma='auto_deprecated', coef0=0.0, shrinking=True, probability=False, tol=0.001, cache_size=200, class_weight=None, verbose=False, max_iter=-1, decision_function_shape='ovr', random_state=None):
return sklearn.svm.SVC(C = C, kernel = kernel, gamma = gamma, coef0 = coef0, shrinking = shrinking, probability = probability, tol = tol, cache_size = cache_size, class_weight = class_weight, verbose = verbose, max_iter = max_iter, decision_function_shape = decision_function_shape, random_state = random_state)
class PrebuiltKernel:
class Linear:
def __new__(cls):
return sklearn.svm.SVC(kernel = 'linear')
class Polynomial:
def __new__(cls, power, r_bias):
return sklearn.svm.SVC(kernel = 'polynomial', degree = power, coef0 = r_bias)
class RBF:
def __new__(cls, gamma):
return sklearn.svm.SVC(kernel = 'rbf', gamma = gamma)
class Sigmoid:
def __new__(cls, r_bias):
return sklearn.svm.SVC(kernel = 'sigmoid', coef0 = r_bias)
def fit(self, kernel, train_data, train_outputs): # expects *2d data, 1d labels or outputs
return kernel.fit(train_data, train_outputs)
def eval_classification(self, kernel, test_data, test_outputs):
predictions = kernel.predict(test_data)
return ClassificationMetrics(predictions, test_outputs)
def eval_regression(self, kernel, test_data, test_outputs):
predictions = kernel.predict(test_data)
return RegressionMetrics(predictions, test_outputs)
def random_forest_classifier(data, labels, test_size, n_estimators="warn", criterion="gini", max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0.0, max_features="auto", max_leaf_nodes=None, min_impurity_decrease=0.0, min_impurity_split=None, bootstrap=True, oob_score=False, n_jobs=None, random_state=None, verbose=0, warm_start=False, class_weight=None):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
kernel = sklearn.ensemble.RandomForestClassifier(n_estimators = n_estimators, criterion = criterion, max_depth = max_depth, min_samples_split = min_samples_split, min_samples_leaf = min_samples_leaf, min_weight_fraction_leaf = min_weight_fraction_leaf, max_leaf_nodes = max_leaf_nodes, min_impurity_decrease = min_impurity_decrease, bootstrap = bootstrap, oob_score = oob_score, n_jobs = n_jobs, random_state = random_state, verbose = verbose, warm_start = warm_start, class_weight = class_weight)
kernel.fit(data_train, labels_train)
predictions = kernel.predict(data_test)
return kernel, ClassificationMetrics(predictions, labels_test)
def random_forest_regressor(data, outputs, test_size, n_estimators="warn", criterion="mse", max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0.0, max_features="auto", max_leaf_nodes=None, min_impurity_decrease=0.0, min_impurity_split=None, bootstrap=True, oob_score=False, n_jobs=None, random_state=None, verbose=0, warm_start=False):
data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1)
kernel = sklearn.ensemble.RandomForestRegressor(n_estimators = n_estimators, criterion = criterion, max_depth = max_depth, min_samples_split = min_samples_split, min_weight_fraction_leaf = min_weight_fraction_leaf, max_features = max_features, max_leaf_nodes = max_leaf_nodes, min_impurity_decrease = min_impurity_decrease, min_impurity_split = min_impurity_split, bootstrap = bootstrap, oob_score = oob_score, n_jobs = n_jobs, random_state = random_state, verbose = verbose, warm_start = warm_start)
kernel.fit(data_train, outputs_train)
predictions = kernel.predict(data_test)
return kernel, RegressionMetrics(predictions, outputs_test)
class Regression:
# Titan Robotics Team 2022: CUDA-based Regressions Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this module has been automatically inegrated into analysis.py, and should be callable as a class from the package
# this module is cuda-optimized and vectorized (except for one small part)
# setup:
__version__ = "1.0.0.003"
# changelog should be viewed using print(analysis.regression.__changelog__)
__changelog__ = """
1.0.0.003:
- bug fixes
1.0.0.002:
-Added more parameters to log, exponential, polynomial
-Added SigmoidalRegKernelArthur, because Arthur apparently needs
to train the scaling and shifting of sigmoids
1.0.0.001:
-initial release, with linear, log, exponential, polynomial, and sigmoid kernels
-already vectorized (except for polynomial generation) and CUDA-optimized
"""
__author__ = (
"Jacob Levine <jlevine@imsa.edu>",
"Arthur Lu <learthurgo@gmail.com>"
)
__all__ = [
'factorial',
'take_all_pwrs',
'num_poly_terms',
'set_device',
'LinearRegKernel',
'SigmoidalRegKernel',
'LogRegKernel',
'PolyRegKernel',
'ExpRegKernel',
'SigmoidalRegKernelArthur',
'SGDTrain',
'CustomTrain'
]
global device
device = "cuda:0" if torch.torch.cuda.is_available() else "cpu"
#todo: document completely
def set_device(self, new_device):
device=new_device
class LinearRegKernel():
parameters= []
weights=None
bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def forward(self,mtx):
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,mtx)+long_bias
class SigmoidalRegKernel():
parameters= []
weights=None
bias=None
sigmoid=torch.nn.Sigmoid()
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def forward(self,mtx):
long_bias=self.bias.repeat([1,mtx.size()[1]])
return self.sigmoid(torch.matmul(self.weights,mtx)+long_bias)
class SigmoidalRegKernelArthur():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
sigmoid=torch.nn.Sigmoid()
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*self.sigmoid(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class LogRegKernel():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*torch.log(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class ExpRegKernel():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*torch.exp(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class PolyRegKernel():
parameters= []
weights=None
bias=None
power=None
def __init__(self, num_vars, power):
self.power=power
num_terms=self.num_poly_terms(num_vars, power)
self.weights=torch.rand(num_terms, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def num_poly_terms(self,num_vars, power):
if power == 0:
return 0
return int(self.factorial(num_vars+power-1) / self.factorial(power) / self.factorial(num_vars-1)) + self.num_poly_terms(num_vars, power-1)
def factorial(self,n):
if n==0:
return 1
else:
return n*self.factorial(n-1)
def take_all_pwrs(self, vec, pwr):
#todo: vectorize (kinda)
combins=torch.combinations(vec, r=pwr, with_replacement=True)
out=torch.ones(combins.size()[0]).to(device).to(torch.float)
for i in torch.t(combins).to(device).to(torch.float):
out *= i
if pwr == 1:
return out
else:
return torch.cat((out,self.take_all_pwrs(vec, pwr-1)))
def forward(self,mtx):
#TODO: Vectorize the last part
cols=[]
for i in torch.t(mtx):
cols.append(self.take_all_pwrs(i,self.power))
new_mtx=torch.t(torch.stack(cols))
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,new_mtx)+long_bias
def SGDTrain(self, kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False):
optim=torch.optim.SGD(kernel.parameters, lr=learning_rate)
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
losses=[]
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
losses.append(ls.item())
ls.backward()
optim.step()
return [kernel,losses]
else:
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel
def CustomTrain(self, kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False):
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
losses=[]
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data)
ls=loss(pred,ground)
losses.append(ls.item())
ls.backward()
optim.step()
return [kernel,losses]
else:
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel
class Gliko2:
_tau = 0.5
def getRating(self):
return (self.__rating * 173.7178) + 1500
def setRating(self, rating):
self.__rating = (rating - 1500) / 173.7178
rating = property(getRating, setRating)
def getRd(self):
return self.__rd * 173.7178
def setRd(self, rd):
self.__rd = rd / 173.7178
rd = property(getRd, setRd)
def __init__(self, rating = 1500, rd = 350, vol = 0.06):
self.setRating(rating)
self.setRd(rd)
self.vol = vol
def _preRatingRD(self):
self.__rd = math.sqrt(math.pow(self.__rd, 2) + math.pow(self.vol, 2))
def update_player(self, rating_list, RD_list, outcome_list):
rating_list = [(x - 1500) / 173.7178 for x in rating_list]
RD_list = [x / 173.7178 for x in RD_list]
v = self._v(rating_list, RD_list)
self.vol = self._newVol(rating_list, RD_list, outcome_list, v)
self._preRatingRD()
self.__rd = 1 / math.sqrt((1 / math.pow(self.__rd, 2)) + (1 / v))
tempSum = 0
for i in range(len(rating_list)):
tempSum += self._g(RD_list[i]) * \
(outcome_list[i] - self._E(rating_list[i], RD_list[i]))
self.__rating += math.pow(self.__rd, 2) * tempSum
def _newVol(self, rating_list, RD_list, outcome_list, v):
i = 0
delta = self._delta(rating_list, RD_list, outcome_list, v)
a = math.log(math.pow(self.vol, 2))
tau = self._tau
x0 = a
x1 = 0
while x0 != x1:
# New iteration, so x(i) becomes x(i-1)
x0 = x1
d = math.pow(self.__rating, 2) + v + math.exp(x0)
h1 = -(x0 - a) / math.pow(tau, 2) - 0.5 * math.exp(x0) \
/ d + 0.5 * math.exp(x0) * math.pow(delta / d, 2)
h2 = -1 / math.pow(tau, 2) - 0.5 * math.exp(x0) * \
(math.pow(self.__rating, 2) + v) \
/ math.pow(d, 2) + 0.5 * math.pow(delta, 2) * math.exp(x0) \
* (math.pow(self.__rating, 2) + v - math.exp(x0)) / math.pow(d, 3)
x1 = x0 - (h1 / h2)
return math.exp(x1 / 2)
def _delta(self, rating_list, RD_list, outcome_list, v):
tempSum = 0
for i in range(len(rating_list)):
tempSum += self._g(RD_list[i]) * (outcome_list[i] - self._E(rating_list[i], RD_list[i]))
return v * tempSum
def _v(self, rating_list, RD_list):
tempSum = 0
for i in range(len(rating_list)):
tempE = self._E(rating_list[i], RD_list[i])
tempSum += math.pow(self._g(RD_list[i]), 2) * tempE * (1 - tempE)
return 1 / tempSum
def _E(self, p2rating, p2RD):
return 1 / (1 + math.exp(-1 * self._g(p2RD) * \
(self.__rating - p2rating)))
def _g(self, RD):
return 1 / math.sqrt(1 + 3 * math.pow(RD, 2) / math.pow(math.pi, 2))
def did_not_compete(self):
self._preRatingRD()

View File

@@ -7,10 +7,28 @@
# current benchmark of optimization: 1.33 times faster
# setup:
__version__ = "1.1.12.006"
__version__ = "1.1.13.007"
# changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog:
1.1.13.007:
- fixed bug with trueskill
1.1.13.006:
- cleaned up imports
1.1.13.005:
- cleaned up package
1.1.13.004:
- small fixes to regression to improve performance
1.1.13.003:
- filtered nans from regression
1.1.13.002:
- removed torch requirement, and moved Regression back to regression.py
1.1.13.001:
- bug fix with linear regression not returning a proper value
- cleaned up regression
- fixed bug with polynomial regressions
1.1.13.000:
- fixed all regressions to now properly work
1.1.12.006:
- fixed bg with a division by zero in histo_analysis
1.1.12.005:
@@ -233,7 +251,6 @@ __author__ = (
)
__all__ = [
'_init_device',
'load_csv',
'basic_stats',
'z_score',
@@ -254,7 +271,6 @@ __all__ = [
'SVM',
'random_forest_classifier',
'random_forest_regressor',
'Regression',
'Glicko2',
# all statistics functions left out due to integration in other functions
]
@@ -267,22 +283,15 @@ import csv
import numba
from numba import jit
import numpy as np
import math
import scipy
from scipy import *
import sklearn
from sklearn import *
import torch
try:
from analysis import trueskill as Trueskill
except:
import trueskill as Trueskill
from analysis import trueskill as Trueskill
class error(ValueError):
pass
def _init_device(): # initiates computation device for ANNs
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
return device
def load_csv(filepath):
with open(filepath, newline='') as csvfile:
file_array = np.array(list(csv.reader(csvfile)))
@@ -339,33 +348,65 @@ def histo_analysis(hist_data):
return None
def regression(ndevice, inputs, outputs, args, loss = torch.nn.MSELoss(), _iterations = 10000, lr = 0.01, _iterations_ply = 10000, lr_ply = 0.01): # inputs, outputs expects N-D array
def regression(inputs, outputs, args): # inputs, outputs expects N-D array
X = np.array(inputs)
y = np.array(outputs)
regressions = []
Regression().set_device(ndevice)
if 'lin' in args: # formula: ax + b
model = Regression().SGDTrain(Regression.LinearRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor([outputs]).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
try:
def func(x, a, b):
return a * x + b
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
if 'log' in args: # formula: a log (b(x + c)) + d
model = Regression().SGDTrain(Regression.LogRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
try:
def func(x, a, b, c, d):
return a * np.log(b*(x + c)) + d
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
if 'exp' in args: # formula: a e ^ (b(x + c)) + d
model = Regression().SGDTrain(Regression.ExpRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
try:
def func(x, a, b, c, d):
return a * np.exp(b*(x + c)) + d
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
if 'ply' in args: # formula: a + bx^1 + cx^2 + dx^3 + ...
inputs = np.array([inputs])
outputs = np.array([outputs])
plys = []
limit = len(outputs[0])
@@ -385,12 +426,21 @@ def regression(ndevice, inputs, outputs, args, loss = torch.nn.MSELoss(), _itera
regressions.append(plys)
if 'sig' in args: # formula: a sig (b(x + c)) + d | sig() = 1/(1 + e ^ -x)
if 'sig' in args: # formula: a tanh (b(x + c)) + d
model = Regression().SGDTrain(Regression.SigmoidalRegKernelArthur(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
try:
def func(x, a, b, c, d):
return a * np.tanh(b*(x + c)) + d
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
return regressions
@@ -413,13 +463,13 @@ def trueskill(teams_data, observations): # teams_data is array of array of tuple
team_ratings = []
for team in teams_data:
team_temp = []
team_temp = ()
for player in team:
player = Trueskill.Rating(player[0], player[1])
team_temp.append(player)
team_temp = team_temp + (player,)
team_ratings.append(team_temp)
return Trueskill.rate(teams_data, observations)
return Trueskill.rate(team_ratings, ranks=observations)
class RegressionMetrics():
@@ -642,225 +692,6 @@ def random_forest_regressor(data, outputs, test_size, n_estimators="warn", crite
return kernel, RegressionMetrics(predictions, outputs_test)
class Regression:
# Titan Robotics Team 2022: CUDA-based Regressions Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this module has been automatically inegrated into analysis.py, and should be callable as a class from the package
# this module is cuda-optimized and vectorized (except for one small part)
# setup:
__version__ = "1.0.0.003"
# changelog should be viewed using print(analysis.regression.__changelog__)
__changelog__ = """
1.0.0.003:
- bug fixes
1.0.0.002:
-Added more parameters to log, exponential, polynomial
-Added SigmoidalRegKernelArthur, because Arthur apparently needs
to train the scaling and shifting of sigmoids
1.0.0.001:
-initial release, with linear, log, exponential, polynomial, and sigmoid kernels
-already vectorized (except for polynomial generation) and CUDA-optimized
"""
__author__ = (
"Jacob Levine <jlevine@imsa.edu>",
"Arthur Lu <learthurgo@gmail.com>"
)
__all__ = [
'factorial',
'take_all_pwrs',
'num_poly_terms',
'set_device',
'LinearRegKernel',
'SigmoidalRegKernel',
'LogRegKernel',
'PolyRegKernel',
'ExpRegKernel',
'SigmoidalRegKernelArthur',
'SGDTrain',
'CustomTrain'
]
global device
device = "cuda:0" if torch.torch.cuda.is_available() else "cpu"
#todo: document completely
def set_device(self, new_device):
device=new_device
class LinearRegKernel():
parameters= []
weights=None
bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def forward(self,mtx):
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,mtx)+long_bias
class SigmoidalRegKernel():
parameters= []
weights=None
bias=None
sigmoid=torch.nn.Sigmoid()
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def forward(self,mtx):
long_bias=self.bias.repeat([1,mtx.size()[1]])
return self.sigmoid(torch.matmul(self.weights,mtx)+long_bias)
class SigmoidalRegKernelArthur():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
sigmoid=torch.nn.Sigmoid()
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*self.sigmoid(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class LogRegKernel():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*torch.log(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class ExpRegKernel():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*torch.exp(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class PolyRegKernel():
parameters= []
weights=None
bias=None
power=None
def __init__(self, num_vars, power):
self.power=power
num_terms=self.num_poly_terms(num_vars, power)
self.weights=torch.rand(num_terms, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def num_poly_terms(self,num_vars, power):
if power == 0:
return 0
return int(self.factorial(num_vars+power-1) / self.factorial(power) / self.factorial(num_vars-1)) + self.num_poly_terms(num_vars, power-1)
def factorial(self,n):
if n==0:
return 1
else:
return n*self.factorial(n-1)
def take_all_pwrs(self, vec, pwr):
#todo: vectorize (kinda)
combins=torch.combinations(vec, r=pwr, with_replacement=True)
out=torch.ones(combins.size()[0]).to(device).to(torch.float)
for i in torch.t(combins).to(device).to(torch.float):
out *= i
if pwr == 1:
return out
else:
return torch.cat((out,self.take_all_pwrs(vec, pwr-1)))
def forward(self,mtx):
#TODO: Vectorize the last part
cols=[]
for i in torch.t(mtx):
cols.append(self.take_all_pwrs(i,self.power))
new_mtx=torch.t(torch.stack(cols))
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,new_mtx)+long_bias
def SGDTrain(self, kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False):
optim=torch.optim.SGD(kernel.parameters, lr=learning_rate)
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
losses=[]
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
losses.append(ls.item())
ls.backward()
optim.step()
return [kernel,losses]
else:
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel
def CustomTrain(self, kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False):
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
losses=[]
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data)
ls=loss(pred,ground)
losses.append(ls.item())
ls.backward()
optim.step()
return [kernel,losses]
else:
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel
class Glicko2:
_tau = 0.5
@@ -958,4 +789,4 @@ class Glicko2:
def did_not_compete(self):
self._preRatingRD()
self._preRatingRD()

View File

@@ -1,20 +1,23 @@
# Titan Robotics Team 2022: CUDA-based Regressions Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this should be imported as a python module using 'import regression'
# this should be included in the local directory or environment variable
# this module is cuda-optimized and vectorized (except for one small part)
# this module has been automatically inegrated into analysis.py, and should be callable as a class from the package
# this module is cuda-optimized and vectorized (except for one small part)
# setup:
__version__ = "1.0.0.002"
__version__ = "1.0.0.004"
# changelog should be viewed using print(regression.__changelog__)
# changelog should be viewed using print(analysis.regression.__changelog__)
__changelog__ = """
1.0.0.004:
- bug fixes
- fixed changelog
1.0.0.003:
- bug fixes
1.0.0.002:
-Added more parameters to log, exponential, polynomial
-Added SigmoidalRegKernelArthur, because Arthur apparently needs
to train the scaling and shifting of sigmoids
1.0.0.001:
-initial release, with linear, log, exponential, polynomial, and sigmoid kernels
-already vectorized (except for polynomial generation) and CUDA-optimized
@@ -22,6 +25,7 @@ __changelog__ = """
__author__ = (
"Jacob Levine <jlevine@imsa.edu>",
"Arthur Lu <learthurgo@gmail.com>"
)
__all__ = [
@@ -39,35 +43,15 @@ __all__ = [
'CustomTrain'
]
# imports (just one for now):
import torch
global device
device = "cuda:0" if torch.torch.cuda.is_available() else "cpu"
#todo: document completely
def factorial(n):
if n==0:
return 1
else:
return n*factorial(n-1)
def num_poly_terms(num_vars, power):
if power == 0:
return 0
return int(factorial(num_vars+power-1) / factorial(power) / factorial(num_vars-1)) + num_poly_terms(num_vars, power-1)
def take_all_pwrs(vec,pwr):
#todo: vectorize (kinda)
combins=torch.combinations(vec, r=pwr, with_replacement=True)
out=torch.ones(combins.size()[0])
for i in torch.t(combins):
out *= i
return torch.cat(out,take_all_pwrs(vec, pwr-1))
def set_device(new_device):
global device
def set_device(self, new_device):
device=new_device
class LinearRegKernel():
@@ -154,20 +138,39 @@ class PolyRegKernel():
power=None
def __init__(self, num_vars, power):
self.power=power
num_terms=num_poly_terms(num_vars, power)
num_terms=self.num_poly_terms(num_vars, power)
self.weights=torch.rand(num_terms, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def num_poly_terms(self,num_vars, power):
if power == 0:
return 0
return int(self.factorial(num_vars+power-1) / self.factorial(power) / self.factorial(num_vars-1)) + self.num_poly_terms(num_vars, power-1)
def factorial(self,n):
if n==0:
return 1
else:
return n*self.factorial(n-1)
def take_all_pwrs(self, vec, pwr):
#todo: vectorize (kinda)
combins=torch.combinations(vec, r=pwr, with_replacement=True)
out=torch.ones(combins.size()[0]).to(device).to(torch.float)
for i in torch.t(combins).to(device).to(torch.float):
out *= i
if pwr == 1:
return out
else:
return torch.cat((out,self.take_all_pwrs(vec, pwr-1)))
def forward(self,mtx):
#TODO: Vectorize the last part
cols=[]
for i in torch.t(mtx):
cols.append(take_all_pwrs(i,self.power))
cols.append(self.take_all_pwrs(i,self.power))
new_mtx=torch.t(torch.stack(cols))
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,new_mtx)+long_bias
def SGDTrain(kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False):
def SGDTrain(self, kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False):
optim=torch.optim.SGD(kernel.parameters, lr=learning_rate)
data_cuda=data.to(device)
ground_cuda=ground.to(device)
@@ -192,7 +195,7 @@ def SGDTrain(kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, lea
optim.step()
return kernel
def CustomTrain(kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False):
def CustomTrain(self, kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False):
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
@@ -214,4 +217,4 @@ def CustomTrain(kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel
return kernel

View File

@@ -7,10 +7,28 @@
# current benchmark of optimization: 1.33 times faster
# setup:
__version__ = "1.1.12.006"
__version__ = "1.1.13.007"
# changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog:
1.1.13.007:
- fixed bug with trueskill
1.1.13.006:
- cleaned up imports
1.1.13.005:
- cleaned up package
1.1.13.004:
- small fixes to regression to improve performance
1.1.13.003:
- filtered nans from regression
1.1.13.002:
- removed torch requirement, and moved Regression back to regression.py
1.1.13.001:
- bug fix with linear regression not returning a proper value
- cleaned up regression
- fixed bug with polynomial regressions
1.1.13.000:
- fixed all regressions to now properly work
1.1.12.006:
- fixed bg with a division by zero in histo_analysis
1.1.12.005:
@@ -233,7 +251,6 @@ __author__ = (
)
__all__ = [
'_init_device',
'load_csv',
'basic_stats',
'z_score',
@@ -254,7 +271,6 @@ __all__ = [
'SVM',
'random_forest_classifier',
'random_forest_regressor',
'Regression',
'Glicko2',
# all statistics functions left out due to integration in other functions
]
@@ -267,22 +283,15 @@ import csv
import numba
from numba import jit
import numpy as np
import math
import scipy
from scipy import *
import sklearn
from sklearn import *
import torch
try:
from analysis import trueskill as Trueskill
except:
import trueskill as Trueskill
from analysis import trueskill as Trueskill
class error(ValueError):
pass
def _init_device(): # initiates computation device for ANNs
device = 'cuda:0' if torch.cuda.is_available() else 'cpu'
return device
def load_csv(filepath):
with open(filepath, newline='') as csvfile:
file_array = np.array(list(csv.reader(csvfile)))
@@ -339,33 +348,65 @@ def histo_analysis(hist_data):
return None
def regression(ndevice, inputs, outputs, args, loss = torch.nn.MSELoss(), _iterations = 10000, lr = 0.01, _iterations_ply = 10000, lr_ply = 0.01): # inputs, outputs expects N-D array
def regression(inputs, outputs, args): # inputs, outputs expects N-D array
X = np.array(inputs)
y = np.array(outputs)
regressions = []
Regression().set_device(ndevice)
if 'lin' in args: # formula: ax + b
model = Regression().SGDTrain(Regression.LinearRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor([outputs]).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
try:
def func(x, a, b):
return a * x + b
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
if 'log' in args: # formula: a log (b(x + c)) + d
model = Regression().SGDTrain(Regression.LogRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
try:
def func(x, a, b, c, d):
return a * np.log(b*(x + c)) + d
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
if 'exp' in args: # formula: a e ^ (b(x + c)) + d
model = Regression().SGDTrain(Regression.ExpRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
try:
def func(x, a, b, c, d):
return a * np.exp(b*(x + c)) + d
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
if 'ply' in args: # formula: a + bx^1 + cx^2 + dx^3 + ...
inputs = np.array([inputs])
outputs = np.array([outputs])
plys = []
limit = len(outputs[0])
@@ -385,12 +426,21 @@ def regression(ndevice, inputs, outputs, args, loss = torch.nn.MSELoss(), _itera
regressions.append(plys)
if 'sig' in args: # formula: a sig (b(x + c)) + d | sig() = 1/(1 + e ^ -x)
if 'sig' in args: # formula: a tanh (b(x + c)) + d
model = Regression().SGDTrain(Regression.SigmoidalRegKernelArthur(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True)
params = model[0].parameters
params[:] = map(lambda x: x.item(), params)
regressions.append((params, model[1][::-1][0]))
try:
def func(x, a, b, c, d):
return a * np.tanh(b*(x + c)) + d
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
return regressions
@@ -413,13 +463,13 @@ def trueskill(teams_data, observations): # teams_data is array of array of tuple
team_ratings = []
for team in teams_data:
team_temp = []
team_temp = ()
for player in team:
player = Trueskill.Rating(player[0], player[1])
team_temp.append(player)
team_temp = team_temp + (player,)
team_ratings.append(team_temp)
return Trueskill.rate(teams_data, observations)
return Trueskill.rate(team_ratings, ranks=observations)
class RegressionMetrics():
@@ -642,225 +692,6 @@ def random_forest_regressor(data, outputs, test_size, n_estimators="warn", crite
return kernel, RegressionMetrics(predictions, outputs_test)
class Regression:
# Titan Robotics Team 2022: CUDA-based Regressions Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this module has been automatically inegrated into analysis.py, and should be callable as a class from the package
# this module is cuda-optimized and vectorized (except for one small part)
# setup:
__version__ = "1.0.0.003"
# changelog should be viewed using print(analysis.regression.__changelog__)
__changelog__ = """
1.0.0.003:
- bug fixes
1.0.0.002:
-Added more parameters to log, exponential, polynomial
-Added SigmoidalRegKernelArthur, because Arthur apparently needs
to train the scaling and shifting of sigmoids
1.0.0.001:
-initial release, with linear, log, exponential, polynomial, and sigmoid kernels
-already vectorized (except for polynomial generation) and CUDA-optimized
"""
__author__ = (
"Jacob Levine <jlevine@imsa.edu>",
"Arthur Lu <learthurgo@gmail.com>"
)
__all__ = [
'factorial',
'take_all_pwrs',
'num_poly_terms',
'set_device',
'LinearRegKernel',
'SigmoidalRegKernel',
'LogRegKernel',
'PolyRegKernel',
'ExpRegKernel',
'SigmoidalRegKernelArthur',
'SGDTrain',
'CustomTrain'
]
global device
device = "cuda:0" if torch.torch.cuda.is_available() else "cpu"
#todo: document completely
def set_device(self, new_device):
device=new_device
class LinearRegKernel():
parameters= []
weights=None
bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def forward(self,mtx):
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,mtx)+long_bias
class SigmoidalRegKernel():
parameters= []
weights=None
bias=None
sigmoid=torch.nn.Sigmoid()
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def forward(self,mtx):
long_bias=self.bias.repeat([1,mtx.size()[1]])
return self.sigmoid(torch.matmul(self.weights,mtx)+long_bias)
class SigmoidalRegKernelArthur():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
sigmoid=torch.nn.Sigmoid()
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*self.sigmoid(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class LogRegKernel():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*torch.log(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class ExpRegKernel():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*torch.exp(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class PolyRegKernel():
parameters= []
weights=None
bias=None
power=None
def __init__(self, num_vars, power):
self.power=power
num_terms=self.num_poly_terms(num_vars, power)
self.weights=torch.rand(num_terms, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def num_poly_terms(self,num_vars, power):
if power == 0:
return 0
return int(self.factorial(num_vars+power-1) / self.factorial(power) / self.factorial(num_vars-1)) + self.num_poly_terms(num_vars, power-1)
def factorial(self,n):
if n==0:
return 1
else:
return n*self.factorial(n-1)
def take_all_pwrs(self, vec, pwr):
#todo: vectorize (kinda)
combins=torch.combinations(vec, r=pwr, with_replacement=True)
out=torch.ones(combins.size()[0]).to(device).to(torch.float)
for i in torch.t(combins).to(device).to(torch.float):
out *= i
if pwr == 1:
return out
else:
return torch.cat((out,self.take_all_pwrs(vec, pwr-1)))
def forward(self,mtx):
#TODO: Vectorize the last part
cols=[]
for i in torch.t(mtx):
cols.append(self.take_all_pwrs(i,self.power))
new_mtx=torch.t(torch.stack(cols))
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,new_mtx)+long_bias
def SGDTrain(self, kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False):
optim=torch.optim.SGD(kernel.parameters, lr=learning_rate)
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
losses=[]
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
losses.append(ls.item())
ls.backward()
optim.step()
return [kernel,losses]
else:
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel
def CustomTrain(self, kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False):
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
losses=[]
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data)
ls=loss(pred,ground)
losses.append(ls.item())
ls.backward()
optim.step()
return [kernel,losses]
else:
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel
class Glicko2:
_tau = 0.5
@@ -958,4 +789,4 @@ class Glicko2:
def did_not_compete(self):
self._preRatingRD()
self._preRatingRD()

View File

@@ -1,20 +1,23 @@
# Titan Robotics Team 2022: CUDA-based Regressions Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this should be imported as a python module using 'import regression'
# this should be included in the local directory or environment variable
# this module is cuda-optimized and vectorized (except for one small part)
# this module has been automatically inegrated into analysis.py, and should be callable as a class from the package
# this module is cuda-optimized and vectorized (except for one small part)
# setup:
__version__ = "1.0.0.002"
__version__ = "1.0.0.004"
# changelog should be viewed using print(regression.__changelog__)
# changelog should be viewed using print(analysis.regression.__changelog__)
__changelog__ = """
1.0.0.004:
- bug fixes
- fixed changelog
1.0.0.003:
- bug fixes
1.0.0.002:
-Added more parameters to log, exponential, polynomial
-Added SigmoidalRegKernelArthur, because Arthur apparently needs
to train the scaling and shifting of sigmoids
1.0.0.001:
-initial release, with linear, log, exponential, polynomial, and sigmoid kernels
-already vectorized (except for polynomial generation) and CUDA-optimized
@@ -22,6 +25,7 @@ __changelog__ = """
__author__ = (
"Jacob Levine <jlevine@imsa.edu>",
"Arthur Lu <learthurgo@gmail.com>"
)
__all__ = [
@@ -39,35 +43,15 @@ __all__ = [
'CustomTrain'
]
# imports (just one for now):
import torch
global device
device = "cuda:0" if torch.torch.cuda.is_available() else "cpu"
#todo: document completely
def factorial(n):
if n==0:
return 1
else:
return n*factorial(n-1)
def num_poly_terms(num_vars, power):
if power == 0:
return 0
return int(factorial(num_vars+power-1) / factorial(power) / factorial(num_vars-1)) + num_poly_terms(num_vars, power-1)
def take_all_pwrs(vec,pwr):
#todo: vectorize (kinda)
combins=torch.combinations(vec, r=pwr, with_replacement=True)
out=torch.ones(combins.size()[0])
for i in torch.t(combins):
out *= i
return torch.cat(out,take_all_pwrs(vec, pwr-1))
def set_device(new_device):
global device
def set_device(self, new_device):
device=new_device
class LinearRegKernel():
@@ -154,20 +138,39 @@ class PolyRegKernel():
power=None
def __init__(self, num_vars, power):
self.power=power
num_terms=num_poly_terms(num_vars, power)
num_terms=self.num_poly_terms(num_vars, power)
self.weights=torch.rand(num_terms, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def num_poly_terms(self,num_vars, power):
if power == 0:
return 0
return int(self.factorial(num_vars+power-1) / self.factorial(power) / self.factorial(num_vars-1)) + self.num_poly_terms(num_vars, power-1)
def factorial(self,n):
if n==0:
return 1
else:
return n*self.factorial(n-1)
def take_all_pwrs(self, vec, pwr):
#todo: vectorize (kinda)
combins=torch.combinations(vec, r=pwr, with_replacement=True)
out=torch.ones(combins.size()[0]).to(device).to(torch.float)
for i in torch.t(combins).to(device).to(torch.float):
out *= i
if pwr == 1:
return out
else:
return torch.cat((out,self.take_all_pwrs(vec, pwr-1)))
def forward(self,mtx):
#TODO: Vectorize the last part
cols=[]
for i in torch.t(mtx):
cols.append(take_all_pwrs(i,self.power))
cols.append(self.take_all_pwrs(i,self.power))
new_mtx=torch.t(torch.stack(cols))
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,new_mtx)+long_bias
def SGDTrain(kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False):
def SGDTrain(self, kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False):
optim=torch.optim.SGD(kernel.parameters, lr=learning_rate)
data_cuda=data.to(device)
ground_cuda=ground.to(device)
@@ -192,7 +195,7 @@ def SGDTrain(kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, lea
optim.step()
return kernel
def CustomTrain(kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False):
def CustomTrain(self, kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False):
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
@@ -214,4 +217,4 @@ def CustomTrain(kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel
return kernel

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@@ -2,7 +2,7 @@ import setuptools
setuptools.setup(
name="analysis", # Replace with your own username
version="1.0.0.003",
version="1.0.0.009",
author="The Titan Scouting Team",
author_email="titanscout2022@gmail.com",
description="analysis package developed by Titan Scouting for The Red Alliance",
@@ -10,6 +10,14 @@ setuptools.setup(
long_description_content_type="text/markdown",
url="https://github.com/titanscout2022/tr2022-strategy",
packages=setuptools.find_packages(),
install_requires=[
"numba",
"numpy",
"scipy",
"scikit-learn",
"six",
"matplotlib"
],
license = "GNU General Public License v3.0",
classifiers=[
"Programming Language :: Python :: 3",

File diff suppressed because it is too large Load Diff

View File

@@ -1,6 +0,0 @@
2020ilch
balls-blocked,basic_stats,historical_analysis
balls-collected,basic_stats,historical_analysis
balls-lower,basic_stats,historical_analysis
balls-started,basic_stats,historical_analysis
balls-upper,basic_stats,historical_analysis
1 2020ilch
2 balls-blocked,basic_stats,historical_analysis
3 balls-collected,basic_stats,historical_analysis
4 balls-lower,basic_stats,historical_analysis
5 balls-started,basic_stats,historical_analysis
6 balls-upper,basic_stats,historical_analysis

View File

@@ -0,0 +1 @@
2020ilch

View File

View File

@@ -0,0 +1,14 @@
balls-blocked,basic_stats,historical_analysis,regression_linear,regression_logarithmic,regression_exponential,regression_polynomial,regression_sigmoidal
balls-collected,basic_stats,historical_analysis,regression_linear,regression_logarithmic,regression_exponential,regression_polynomial,regression_sigmoidal
balls-lower-teleop,basic_stats,historical_analysis,regression_linear,regression_logarithmic,regression_exponential,regression_polynomial,regression_sigmoidal
balls-lower-auto,basic_stats,historical_analysis,regression_linear,regression_logarithmic,regression_exponential,regression_polynomial,regression_sigmoidal
balls-started,basic_stats,historical_analyss,regression_linear,regression_logarithmic,regression_exponential,regression_polynomial,regression_sigmoidal
balls-upper-teleop,basic_stats,historical_analysis,regression_linear,regression_logarithmic,regression_exponential,regression_polynomial,regression_sigmoidal
balls-upper-auto,basic_stats,historical_analysis,regression_linear,regression_logarithmic,regression_exponential,regression_polynomial,regression_sigmoidal
wheel-mechanism
low-balls
high-balls
wheel-success
strategic-focus
climb-mechanism
attitude

View File

@@ -8,7 +8,7 @@ def pull_new_tba_matches(apikey, competition, cutoff):
x=requests.get("https://www.thebluealliance.com/api/v3/event/"+competition+"/matches/simple", headers={"X-TBA-Auth_Key":api_key})
out = []
for i in x.json():
if (i["actual_time"]-cutoff >= 0 and i["comp_level"] == "qm"):
if (i["actual_time"] != None and i["actual_time"]-cutoff >= 0 and i["comp_level"] == "qm"):
out.append({"match" : i['match_number'], "blue" : list(map(lambda x: int(x[3:]), i['alliances']['blue']['team_keys'])), "red" : list(map(lambda x: int(x[3:]), i['alliances']['red']['team_keys'])), "winner": i["winning_alliance"]})
return out
@@ -21,6 +21,13 @@ def get_team_match_data(apikey, competition, team_num):
out[i['match']] = i['data']
return pd.DataFrame(out)
def get_team_pit_data(apikey, competition, team_num):
client = pymongo.MongoClient(apikey)
db = client.data_scouting
mdata = db.pitdata
out = {}
return mdata.find_one({"competition" : competition, "team_scouted": team_num})["data"]
def get_team_metrics_data(apikey, competition, team_num):
client = pymongo.MongoClient(apikey)
db = client.data_processing
@@ -38,7 +45,7 @@ def unkeyify_2l(layered_dict):
out[i] = list(map(lambda x: x[1], add))
return out
def get_data_formatted(apikey, competition):
def get_match_data_formatted(apikey, competition):
client = pymongo.MongoClient(apikey)
db = client.data_scouting
mdata = db.teamlist
@@ -51,6 +58,19 @@ def get_data_formatted(apikey, competition):
pass
return out
def get_pit_data_formatted(apikey, competition):
client = pymongo.MongoClient(apikey)
db = client.data_scouting
mdata = db.teamlist
x=mdata.find_one({"competition":competition})
out = {}
for i in x:
try:
out[int(i)] = get_team_pit_data(apikey, competition, int(i))
except:
pass
return out
def push_team_tests_data(apikey, competition, team_num, data, dbname = "data_processing", colname = "team_tests"):
client = pymongo.MongoClient(apikey)
db = client[dbname]
@@ -63,6 +83,12 @@ def push_team_metrics_data(apikey, competition, team_num, data, dbname = "data_p
mdata = db[colname]
mdata.replace_one({"competition" : competition, "team": team_num}, {"_id": competition+str(team_num)+"am", "competition" : competition, "team" : team_num, "metrics" : data}, True)
def push_team_pit_data(apikey, competition, variable, data, dbname = "data_processing", colname = "team_pit"):
client = pymongo.MongoClient(apikey)
db = client[dbname]
mdata = db[colname]
mdata.replace_one({"competition" : competition, "variable": variable}, {"competition" : competition, "variable" : variable, "data" : data}, True)
def get_analysis_flags(apikey, flag):
client = pymongo.MongoClient(apikey)
db = client.data_processing

View File

@@ -0,0 +1,59 @@
import data as d
from analysis import analysis as an
import pymongo
import operator
def load_config(file):
config_vector = {}
file = an.load_csv(file)
for line in file[1:]:
config_vector[line[0]] = line[1:]
return (file[0][0], config_vector)
def get_metrics_processed_formatted(apikey, competition):
client = pymongo.MongoClient(apikey)
db = client.data_scouting
mdata = db.teamlist
x=mdata.find_one({"competition":competition})
out = {}
for i in x:
try:
out[int(i)] = d.get_team_metrics_data(apikey, competition, int(i))
except:
pass
return out
def main():
apikey = an.load_csv("keys.txt")[0][0]
tbakey = an.load_csv("keys.txt")[1][0]
competition, config = load_config("config.csv")
metrics = get_metrics_processed_formatted(apikey, competition)
elo = {}
gl2 = {}
for team in metrics:
elo[team] = metrics[team]["metrics"]["elo"]["score"]
gl2[team] = metrics[team]["metrics"]["gl2"]["score"]
elo = {k: v for k, v in sorted(elo.items(), key=lambda item: item[1])}
gl2 = {k: v for k, v in sorted(gl2.items(), key=lambda item: item[1])}
for team in elo:
print("teams sorted by elo:")
print("" + str(team) + " | " + str(elo[team]))
print("*"*25)
for team in gl2:
print("teams sorted by glicko2:")
print("" + str(team) + " | " + str(gl2[team]))
main()

View File

@@ -3,10 +3,24 @@
# Notes:
# setup:
__version__ = "0.0.2.001"
__version__ = "0.0.5.000"
# changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog:
0.0.5.000:
improved user interface
0.0.4.002:
- removed unessasary code
0.0.4.001:
- fixed bug where X range for regression was determined before sanitization
- better sanitized data
0.0.4.000:
- fixed spelling issue in __changelog__
- addressed nan bug in regression
- fixed errors on line 335 with metrics calling incorrect key "glicko2"
- fixed errors in metrics computing
0.0.3.000:
- added analysis to pit data
0.0.2.001:
- minor stability patches
- implemented db syncing for timestamps
@@ -69,22 +83,28 @@ __all__ = [
from analysis import analysis as an
import data as d
import numpy as np
import matplotlib.pyplot as plt
from os import system, name
from pathlib import Path
import time
import warnings
def main():
warnings.filterwarnings("ignore")
while(True):
current_time = time.time()
print("time: " + str(current_time))
print("[OK] time: " + str(current_time))
print(" loading config")
competition, config = load_config("config.csv")
print(" config loaded")
start = time.time()
config = load_config(Path("config/stats.config"))
competition = an.load_csv(Path("config/competition.config"))[0][0]
print("[OK] configs loaded")
print(" loading database keys")
apikey = an.load_csv("keys.txt")[0][0]
tbakey = an.load_csv("keys.txt")[1][0]
print(" loaded keys")
apikey = an.load_csv(Path("config/keys.config"))[0][0]
tbakey = an.load_csv(Path("config/keys.config"))[1][0]
print("[OK] loaded keys")
previous_time = d.get_analysis_flags(apikey, "latest_update")
@@ -97,33 +117,55 @@ def main():
previous_time = previous_time["latest_update"]
print(" analysis backtimed to: " + str(previous_time))
print("[OK] analysis backtimed to: " + str(previous_time))
print(" loading data")
data = d.get_data_formatted(apikey, competition)
print(" loaded data")
print("[OK] loading data")
start = time.time()
data = d.get_match_data_formatted(apikey, competition)
pit_data = d.pit = d.get_pit_data_formatted(apikey, competition)
print("[OK] loaded data in " + str(time.time() - start) + " seconds")
print(" running tests")
print("[OK] running tests")
start = time.time()
results = simpleloop(data, config)
print(" finished tests")
print("[OK] finished tests in " + str(time.time() - start) + " seconds")
print(" running metrics")
metrics = metricsloop(tbakey, apikey, competition, previous_time)
print(" finished metrics")
print("[OK] running metrics")
start = time.time()
metricsloop(tbakey, apikey, competition, previous_time)
print("[OK] finished metrics in " + str(time.time() - start) + " seconds")
print("[OK] running pit analysis")
start = time.time()
pit = pitloop(pit_data, config)
print("[OK] finished pit analysis in " + str(time.time() - start) + " seconds")
d.set_analysis_flags(apikey, "latest_update", {"latest_update":current_time})
print(" pushing to database")
push_to_database(apikey, competition, results, metrics)
print(" pushed to database")
print("[OK] pushing to database")
start = time.time()
push_to_database(apikey, competition, results, pit)
print("[OK] pushed to database in " + str(time.time() - start) + " seconds")
clear()
def clear():
# for windows
if name == 'nt':
_ = system('cls')
# for mac and linux(here, os.name is 'posix')
else:
_ = system('clear')
def load_config(file):
config_vector = {}
file = an.load_csv(file)
for line in file[1:]:
for line in file:
config_vector[line[0]] = line[1:]
return (file[0][0], config_vector)
return config_vector
def simpleloop(data, tests): # expects 3D array with [Team][Variable][Match]
@@ -145,36 +187,40 @@ def simpleloop(data, tests): # expects 3D array with [Team][Variable][Match]
def simplestats(data, test):
data = np.array(data)
data = data[np.isfinite(data)]
ranges = list(range(len(data)))
if(test == "basic_stats"):
return an.basic_stats(data)
if(test == "historical_analysis"):
return an.histo_analysis([list(range(len(data))), data])
return an.histo_analysis([ranges, data])
if(test == "regression_linear"):
return an.regression('cpu', [list(range(len(data)))], [data], ['lin'], _iterations = 5000)
return an.regression(ranges, data, ['lin'])
if(test == "regression_logarithmic"):
return an.regression('cpu', [list(range(len(data)))], [data], ['log'], _iterations = 5000)
return an.regression(ranges, data, ['log'])
if(test == "regression_exponential"):
return an.regression('cpu', [list(range(len(data)))], [data], ['exp'], _iterations = 5000)
return an.regression(ranges, data, ['exp'])
if(test == "regression_polynomial"):
return an.regression('cpu', [list(range(len(data)))], [data], ['ply'], _iterations = 5000)
return an.regression(ranges, data, ['ply'])
if(test == "regression_sigmoidal"):
return an.regression('cpu', [list(range(len(data)))], [data], ['sig'], _iterations = 5000)
return an.regression(ranges, data, ['sig'])
def push_to_database(apikey, competition, results, metrics):
def push_to_database(apikey, competition, results, pit):
for team in results:
d.push_team_tests_data(apikey, competition, team, results[team])
for team in metrics:
for variable in pit:
d.push_team_metrics_data(apikey, competition, team, metrics[team])
d.push_team_pit_data(apikey, competition, variable, pit[variable])
def metricsloop(tbakey, apikey, competition, timestamp): # listener based metrics update
@@ -183,8 +229,6 @@ def metricsloop(tbakey, apikey, competition, timestamp): # listener based metric
matches = d.pull_new_tba_matches(tbakey, competition, timestamp)
return_vector = {}
red = {}
blu = {}
@@ -192,7 +236,7 @@ def metricsloop(tbakey, apikey, competition, timestamp): # listener based metric
red = load_metrics(apikey, competition, match, "red")
blu = load_metrics(apikey, competition, match, "blue")
elo_red_total = 0
elo_blu_total = 0
@@ -265,36 +309,13 @@ def metricsloop(tbakey, apikey, competition, timestamp): # listener based metric
blu[team]["gl2"]["rd"] = blu[team]["gl2"]["rd"] + blu_gl2_delta["rd"]
blu[team]["gl2"]["vol"] = blu[team]["gl2"]["vol"] + blu_gl2_delta["vol"]
""" not functional for now
red_trueskill = []
blu_trueskill = []
temp_vector = {}
temp_vector.update(red)
temp_vector.update(blu)
red_ts_team_lookup = []
blu_ts_team_lookup = []
for team in temp_vector:
for team in red:
red_trueskill.append((red[team]["ts"]["mu"], red[team]["ts"]["sigma"]))
red_ts_team_lookup.append(team)
for team in blu:
blu_trueskill.append((blu[team]["ts"]["mu"], blu[team]["ts"]["sigma"]))
blu_ts_team_lookup.append(team)
print(red_trueskill)
print(blu_trueskill)
results = an.trueskill([red_trueskill, blu_trueskill], [observations["red"], observations["blu"]])
print(results)
"""
return_vector.update(red)
return_vector.update(blu)
return return_vector
d.push_team_metrics_data(apikey, competition, team, temp_vector[team])
def load_metrics(apikey, competition, match, group_name):
@@ -310,21 +331,34 @@ def load_metrics(apikey, competition, match, group_name):
gl2 = {"score": 1500, "rd": 250, "vol": 0.06}
ts = {"mu": 25, "sigma": 25/3}
d.push_team_metrics_data(apikey, competition, team, {"elo":elo, "gliko2":gl2,"trueskill":ts})
#d.push_team_metrics_data(apikey, competition, team, {"elo":elo, "gl2":gl2,"trueskill":ts})
group[team] = {"elo": elo, "gl2": gl2, "ts": ts}
else:
metrics = db_data["metrics"]
elo = metrics["elo"]
gl2 = metrics["gliko2"]
ts = metrics["trueskill"]
gl2 = metrics["gl2"]
ts = metrics["ts"]
group[team] = {"elo": elo, "gl2": gl2, "ts": ts}
return group
def pitloop(pit, tests):
return_vector = {}
for team in pit:
for variable in pit[team]:
if(variable in tests):
if(not variable in return_vector):
return_vector[variable] = []
return_vector[variable].append(pit[team][variable])
return return_vector
main()
"""

View File

@@ -0,0 +1,59 @@
# To add a new cell, type '# %%'
# To add a new markdown cell, type '# %% [markdown]'
# %%
import matplotlib.pyplot as plt
import data as d
import pymongo
# %%
def get_pit_variable_data(apikey, competition):
client = pymongo.MongoClient(apikey)
db = client.data_processing
mdata = db.team_pit
out = {}
return mdata.find()
# %%
def get_pit_variable_formatted(apikey, competition):
temp = get_pit_variable_data(apikey, competition)
out = {}
for i in temp:
out[i["variable"]] = i["data"]
return out
# %%
pit = get_pit_variable_formatted("mongodb+srv://api-user-new:titanscout2022@2022-scouting-4vfuu.mongodb.net/test?authSource=admin&replicaSet=2022-scouting-shard-0&readPreference=primary&appname=MongoDB%20Compass&ssl=true", "2020ilch")
# %%
import matplotlib.pyplot as plt
import numpy as np
# %%
fig, ax = plt.subplots(1, len(pit), sharey=True, figsize=(80,15))
i = 0
for variable in pit:
ax[i].hist(pit[variable])
ax[i].invert_xaxis()
ax[i].set_xlabel('')
ax[i].set_ylabel('Frequency')
ax[i].set_title(variable)
plt.yticks(np.arange(len(pit[variable])))
i+=1
plt.show()
# %%