This commit is contained in:
ltcptgeneral 2020-05-01 16:07:57 -05:00
parent b345bfb95b
commit 88e7c52c8b
31 changed files with 3396 additions and 0 deletions

View File

@ -0,0 +1,14 @@
Metadata-Version: 2.1
Name: analysis
Version: 1.0.0.12
Summary: analysis package developed by Titan Scouting for The Red Alliance
Home-page: https://github.com/titanscout2022/tr2022-strategy
Author: The Titan Scouting Team
Author-email: titanscout2022@gmail.com
License: GNU General Public License v3.0
Description: UNKNOWN
Platform: UNKNOWN
Classifier: Programming Language :: Python :: 3
Classifier: Operating System :: OS Independent
Requires-Python: >=3.6
Description-Content-Type: text/markdown

View File

@ -0,0 +1,15 @@
setup.py
analysis/__init__.py
analysis/analysis.py
analysis/regression.py
analysis/titanlearn.py
analysis/visualization.py
analysis.egg-info/PKG-INFO
analysis.egg-info/SOURCES.txt
analysis.egg-info/dependency_links.txt
analysis.egg-info/requires.txt
analysis.egg-info/top_level.txt
analysis/metrics/__init__.py
analysis/metrics/elo.py
analysis/metrics/glicko2.py
analysis/metrics/trueskill.py

View File

@ -0,0 +1 @@

View File

@ -0,0 +1,6 @@
numba
numpy
scipy
scikit-learn
six
matplotlib

View File

@ -0,0 +1 @@
analysis

1
analysis-master/build.sh Normal file
View File

@ -0,0 +1 @@
python setup.py sdist bdist_wheel || python3 setup.py sdist bdist_wheel

View File

@ -0,0 +1,923 @@
# Titan Robotics Team 2022: Data Analysis Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this should be imported as a python module using 'from analysis import analysis'
# this should be included in the local directory or environment variable
# this module has been optimized for multhreaded computing
# current benchmark of optimization: 1.33 times faster
# setup:
__version__ = "1.2.0.004"
# changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog:
1.2.0.004:
- fixed __all__ to reflected the correct functions and classes
- fixed CorrelationTests and StatisticalTests class functions to require self invocation
- added missing math import
- fixed KNN class functions to require self invocation
- fixed Metrics class functions to require self invocation
- various spelling fixes in CorrelationTests and StatisticalTests
1.2.0.003:
- bug fixes with CorrelationTests and StatisticalTests
- moved glicko2 and trueskill to the metrics subpackage
- moved elo to a new metrics subpackage
1.2.0.002:
- fixed docs
1.2.0.001:
- fixed docs
1.2.0.000:
- cleaned up wild card imports with scipy and sklearn
- added CorrelationTests class
- added StatisticalTests class
- added several correlation tests to CorrelationTests
- added several statistical tests to StatisticalTests
1.1.13.009:
- moved elo, glicko2, trueskill functions under class Metrics
1.1.13.008:
- moved Glicko2 to a seperate package
1.1.13.007:
- fixed bug with trueskill
1.1.13.006:
- cleaned up imports
1.1.13.005:
- cleaned up package
1.1.13.004:
- small fixes to regression to improve performance
1.1.13.003:
- filtered nans from regression
1.1.13.002:
- removed torch requirement, and moved Regression back to regression.py
1.1.13.001:
- bug fix with linear regression not returning a proper value
- cleaned up regression
- fixed bug with polynomial regressions
1.1.13.000:
- fixed all regressions to now properly work
1.1.12.006:
- fixed bg with a division by zero in histo_analysis
1.1.12.005:
- fixed numba issues by removing numba from elo, glicko2 and trueskill
1.1.12.004:
- renamed gliko to glicko
1.1.12.003:
- removed depreciated code
1.1.12.002:
- removed team first time trueskill instantiation in favor of integration in superscript.py
1.1.12.001:
- improved readibility of regression outputs by stripping tensor data
- used map with lambda to acheive the improved readibility
- lost numba jit support with regression, and generated_jit hangs at execution
- TODO: reimplement correct numba integration in regression
1.1.12.000:
- temporarily fixed polynomial regressions by using sklearn's PolynomialFeatures
1.1.11.010:
- alphabeticaly ordered import lists
1.1.11.009:
- bug fixes
1.1.11.008:
- bug fixes
1.1.11.007:
- bug fixes
1.1.11.006:
- tested min and max
- bug fixes
1.1.11.005:
- added min and max in basic_stats
1.1.11.004:
- bug fixes
1.1.11.003:
- bug fixes
1.1.11.002:
- consolidated metrics
- fixed __all__
1.1.11.001:
- added test/train split to RandomForestClassifier and RandomForestRegressor
1.1.11.000:
- added RandomForestClassifier and RandomForestRegressor
- note: untested
1.1.10.000:
- added numba.jit to remaining functions
1.1.9.002:
- kernelized PCA and KNN
1.1.9.001:
- fixed bugs with SVM and NaiveBayes
1.1.9.000:
- added SVM class, subclasses, and functions
- note: untested
1.1.8.000:
- added NaiveBayes classification engine
- note: untested
1.1.7.000:
- added knn()
- added confusion matrix to decisiontree()
1.1.6.002:
- changed layout of __changelog to be vscode friendly
1.1.6.001:
- added additional hyperparameters to decisiontree()
1.1.6.000:
- fixed __version__
- fixed __all__ order
- added decisiontree()
1.1.5.003:
- added pca
1.1.5.002:
- reduced import list
- added kmeans clustering engine
1.1.5.001:
- simplified regression by using .to(device)
1.1.5.000:
- added polynomial regression to regression(); untested
1.1.4.000:
- added trueskill()
1.1.3.002:
- renamed regression class to Regression, regression_engine() to regression gliko2_engine class to Gliko2
1.1.3.001:
- changed glicko2() to return tuple instead of array
1.1.3.000:
- added glicko2_engine class and glicko()
- verified glicko2() accuracy
1.1.2.003:
- fixed elo()
1.1.2.002:
- added elo()
- elo() has bugs to be fixed
1.1.2.001:
- readded regrression import
1.1.2.000:
- integrated regression.py as regression class
- removed regression import
- fixed metadata for regression class
- fixed metadata for analysis class
1.1.1.001:
- regression_engine() bug fixes, now actaully regresses
1.1.1.000:
- added regression_engine()
- added all regressions except polynomial
1.1.0.007:
- updated _init_device()
1.1.0.006:
- removed useless try statements
1.1.0.005:
- removed impossible outcomes
1.1.0.004:
- added performance metrics (r^2, mse, rms)
1.1.0.003:
- resolved nopython mode for mean, median, stdev, variance
1.1.0.002:
- snapped (removed) majority of uneeded imports
- forced object mode (bad) on all jit
- TODO: stop numba complaining about not being able to compile in nopython mode
1.1.0.001:
- removed from sklearn import * to resolve uneeded wildcard imports
1.1.0.000:
- removed c_entities,nc_entities,obstacles,objectives from __all__
- applied numba.jit to all functions
- depreciated and removed stdev_z_split
- cleaned up histo_analysis to include numpy and numba.jit optimizations
- depreciated and removed all regression functions in favor of future pytorch optimizer
- depreciated and removed all nonessential functions (basic_analysis, benchmark, strip_data)
- optimized z_normalize using sklearn.preprocessing.normalize
- TODO: implement kernel/function based pytorch regression optimizer
1.0.9.000:
- refactored
- numpyed everything
- removed stats in favor of numpy functions
1.0.8.005:
- minor fixes
1.0.8.004:
- removed a few unused dependencies
1.0.8.003:
- added p_value function
1.0.8.002:
- updated __all__ correctly to contain changes made in v 1.0.8.000 and v 1.0.8.001
1.0.8.001:
- refactors
- bugfixes
1.0.8.000:
- depreciated histo_analysis_old
- depreciated debug
- altered basic_analysis to take array data instead of filepath
- refactor
- optimization
1.0.7.002:
- bug fixes
1.0.7.001:
- bug fixes
1.0.7.000:
- added tanh_regression (logistical regression)
- bug fixes
1.0.6.005:
- added z_normalize function to normalize dataset
- bug fixes
1.0.6.004:
- bug fixes
1.0.6.003:
- bug fixes
1.0.6.002:
- bug fixes
1.0.6.001:
- corrected __all__ to contain all of the functions
1.0.6.000:
- added calc_overfit, which calculates two measures of overfit, error and performance
- added calculating overfit to optimize_regression
1.0.5.000:
- added optimize_regression function, which is a sample function to find the optimal regressions
- optimize_regression function filters out some overfit funtions (functions with r^2 = 1)
- planned addition: overfit detection in the optimize_regression function
1.0.4.002:
- added __changelog__
- updated debug function with log and exponential regressions
1.0.4.001:
- added log regressions
- added exponential regressions
- added log_regression and exp_regression to __all__
1.0.3.008:
- added debug function to further consolidate functions
1.0.3.007:
- added builtin benchmark function
- added builtin random (linear) data generation function
- added device initialization (_init_device)
1.0.3.006:
- reorganized the imports list to be in alphabetical order
- added search and regurgitate functions to c_entities, nc_entities, obstacles, objectives
1.0.3.005:
- major bug fixes
- updated historical analysis
- depreciated old historical analysis
1.0.3.004:
- added __version__, __author__, __all__
- added polynomial regression
- added root mean squared function
- added r squared function
1.0.3.003:
- bug fixes
- added c_entities
1.0.3.002:
- bug fixes
- added nc_entities, obstacles, objectives
- consolidated statistics.py to analysis.py
1.0.3.001:
- compiled 1d, column, and row basic stats into basic stats function
1.0.3.000:
- added historical analysis function
1.0.2.xxx:
- added z score test
1.0.1.xxx:
- major bug fixes
1.0.0.xxx:
- added loading csv
- added 1d, column, row basic stats
"""
__author__ = (
"Arthur Lu <learthurgo@gmail.com>",
"Jacob Levine <jlevine@imsa.edu>",
)
__all__ = [
'load_csv',
'basic_stats',
'z_score',
'z_normalize',
'histo_analysis',
'regression',
'Metrics',
'RegressionMetrics',
'ClassificationMetrics',
'kmeans',
'pca',
'decisiontree',
'KNN',
'NaiveBayes',
'SVM',
'random_forest_classifier',
'random_forest_regressor',
'CorrelationTests',
'StatisticalTests',
# all statistics functions left out due to integration in other functions
]
# now back to your regularly scheduled programming:
# imports (now in alphabetical order! v 1.0.3.006):
import csv
from analysis.metrics import elo as Elo
from analysis.metrics import glicko2 as Glicko2
import math
import numba
from numba import jit
import numpy as np
import scipy
from scipy import optimize, stats
import sklearn
from sklearn import preprocessing, pipeline, linear_model, metrics, cluster, decomposition, tree, neighbors, naive_bayes, svm, model_selection, ensemble
from analysis.metrics import trueskill as Trueskill
class error(ValueError):
pass
def load_csv(filepath):
with open(filepath, newline='') as csvfile:
file_array = np.array(list(csv.reader(csvfile)))
csvfile.close()
return file_array
# expects 1d array
@jit(forceobj=True)
def basic_stats(data):
data_t = np.array(data).astype(float)
_mean = mean(data_t)
_median = median(data_t)
_stdev = stdev(data_t)
_variance = variance(data_t)
_min = npmin(data_t)
_max = npmax(data_t)
return _mean, _median, _stdev, _variance, _min, _max
# returns z score with inputs of point, mean and standard deviation of spread
@jit(forceobj=True)
def z_score(point, mean, stdev):
score = (point - mean) / stdev
return score
# expects 2d array, normalizes across all axes
@jit(forceobj=True)
def z_normalize(array, *args):
array = np.array(array)
for arg in args:
array = sklearn.preprocessing.normalize(array, axis = arg)
return array
@jit(forceobj=True)
# expects 2d array of [x,y]
def histo_analysis(hist_data):
if(len(hist_data[0]) > 2):
hist_data = np.array(hist_data)
derivative = np.array(len(hist_data) - 1, dtype = float)
t = np.diff(hist_data)
derivative = t[1] / t[0]
np.sort(derivative)
return basic_stats(derivative)[0], basic_stats(derivative)[3]
else:
return None
def regression(inputs, outputs, args): # inputs, outputs expects N-D array
X = np.array(inputs)
y = np.array(outputs)
regressions = []
if 'lin' in args: # formula: ax + b
try:
def func(x, a, b):
return a * x + b
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
if 'log' in args: # formula: a log (b(x + c)) + d
try:
def func(x, a, b, c, d):
return a * np.log(b*(x + c)) + d
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
if 'exp' in args: # formula: a e ^ (b(x + c)) + d
try:
def func(x, a, b, c, d):
return a * np.exp(b*(x + c)) + d
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
if 'ply' in args: # formula: a + bx^1 + cx^2 + dx^3 + ...
inputs = np.array([inputs])
outputs = np.array([outputs])
plys = []
limit = len(outputs[0])
for i in range(2, limit):
model = sklearn.preprocessing.PolynomialFeatures(degree = i)
model = sklearn.pipeline.make_pipeline(model, sklearn.linear_model.LinearRegression())
model = model.fit(np.rot90(inputs), np.rot90(outputs))
params = model.steps[1][1].intercept_.tolist()
params = np.append(params, model.steps[1][1].coef_[0].tolist()[1::])
params.flatten()
params = params.tolist()
plys.append(params)
regressions.append(plys)
if 'sig' in args: # formula: a tanh (b(x + c)) + d
try:
def func(x, a, b, c, d):
return a * np.tanh(b*(x + c)) + d
popt, pcov = scipy.optimize.curve_fit(func, X, y)
regressions.append((popt.flatten().tolist(), None))
except Exception as e:
pass
return regressions
class Metrics:
def elo(self, starting_score, opposing_score, observed, N, K):
return Elo.calculate(starting_score, opposing_score, observed, N, K)
def glicko2(self, starting_score, starting_rd, starting_vol, opposing_score, opposing_rd, observations):
player = Glicko2.Glicko2(rating = starting_score, rd = starting_rd, vol = starting_vol)
player.update_player([x for x in opposing_score], [x for x in opposing_rd], observations)
return (player.rating, player.rd, player.vol)
def trueskill(self, teams_data, observations): # teams_data is array of array of tuples ie. [[(mu, sigma), (mu, sigma), (mu, sigma)], [(mu, sigma), (mu, sigma), (mu, sigma)]]
team_ratings = []
for team in teams_data:
team_temp = ()
for player in team:
player = Trueskill.Rating(player[0], player[1])
team_temp = team_temp + (player,)
team_ratings.append(team_temp)
return Trueskill.rate(team_ratings, ranks=observations)
class RegressionMetrics():
def __new__(cls, predictions, targets):
return cls.r_squared(cls, predictions, targets), cls.mse(cls, predictions, targets), cls.rms(cls, predictions, targets)
def r_squared(self, predictions, targets): # assumes equal size inputs
return sklearn.metrics.r2_score(targets, predictions)
def mse(self, predictions, targets):
return sklearn.metrics.mean_squared_error(targets, predictions)
def rms(self, predictions, targets):
return math.sqrt(sklearn.metrics.mean_squared_error(targets, predictions))
class ClassificationMetrics():
def __new__(cls, predictions, targets):
return cls.cm(cls, predictions, targets), cls.cr(cls, predictions, targets)
def cm(self, predictions, targets):
return sklearn.metrics.confusion_matrix(targets, predictions)
def cr(self, predictions, targets):
return sklearn.metrics.classification_report(targets, predictions)
@jit(nopython=True)
def mean(data):
return np.mean(data)
@jit(nopython=True)
def median(data):
return np.median(data)
@jit(nopython=True)
def stdev(data):
return np.std(data)
@jit(nopython=True)
def variance(data):
return np.var(data)
@jit(nopython=True)
def npmin(data):
return np.amin(data)
@jit(nopython=True)
def npmax(data):
return np.amax(data)
@jit(forceobj=True)
def kmeans(data, n_clusters=8, init="k-means++", n_init=10, max_iter=300, tol=0.0001, precompute_distances="auto", verbose=0, random_state=None, copy_x=True, n_jobs=None, algorithm="auto"):
kernel = sklearn.cluster.KMeans(n_clusters = n_clusters, init = init, n_init = n_init, max_iter = max_iter, tol = tol, precompute_distances = precompute_distances, verbose = verbose, random_state = random_state, copy_x = copy_x, n_jobs = n_jobs, algorithm = algorithm)
kernel.fit(data)
predictions = kernel.predict(data)
centers = kernel.cluster_centers_
return centers, predictions
@jit(forceobj=True)
def pca(data, n_components = None, copy = True, whiten = False, svd_solver = "auto", tol = 0.0, iterated_power = "auto", random_state = None):
kernel = sklearn.decomposition.PCA(n_components = n_components, copy = copy, whiten = whiten, svd_solver = svd_solver, tol = tol, iterated_power = iterated_power, random_state = random_state)
return kernel.fit_transform(data)
@jit(forceobj=True)
def decisiontree(data, labels, test_size = 0.3, criterion = "gini", splitter = "default", max_depth = None): #expects *2d data and 1d labels
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.tree.DecisionTreeClassifier(criterion = criterion, splitter = splitter, max_depth = max_depth)
model = model.fit(data_train,labels_train)
predictions = model.predict(data_test)
metrics = ClassificationMetrics(predictions, labels_test)
return model, metrics
class KNN:
def knn_classifier(self, data, labels, test_size = 0.3, algorithm='auto', leaf_size=30, metric='minkowski', metric_params=None, n_jobs=None, n_neighbors=5, p=2, weights='uniform'): #expects *2d data and 1d labels post-scaling
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.neighbors.KNeighborsClassifier()
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
def knn_regressor(self, data, outputs, test_size, n_neighbors = 5, weights = "uniform", algorithm = "auto", leaf_size = 30, p = 2, metric = "minkowski", metric_params = None, n_jobs = None):
data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1)
model = sklearn.neighbors.KNeighborsRegressor(n_neighbors = n_neighbors, weights = weights, algorithm = algorithm, leaf_size = leaf_size, p = p, metric = metric, metric_params = metric_params, n_jobs = n_jobs)
model.fit(data_train, outputs_train)
predictions = model.predict(data_test)
return model, RegressionMetrics(predictions, outputs_test)
class NaiveBayes:
def guassian(self, data, labels, test_size = 0.3, priors = None, var_smoothing = 1e-09):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.naive_bayes.GaussianNB(priors = priors, var_smoothing = var_smoothing)
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
def multinomial(self, data, labels, test_size = 0.3, alpha=1.0, fit_prior=True, class_prior=None):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.naive_bayes.MultinomialNB(alpha = alpha, fit_prior = fit_prior, class_prior = class_prior)
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
def bernoulli(self, data, labels, test_size = 0.3, alpha=1.0, binarize=0.0, fit_prior=True, class_prior=None):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.naive_bayes.BernoulliNB(alpha = alpha, binarize = binarize, fit_prior = fit_prior, class_prior = class_prior)
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
def complement(self, data, labels, test_size = 0.3, alpha=1.0, fit_prior=True, class_prior=None, norm=False):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.naive_bayes.ComplementNB(alpha = alpha, fit_prior = fit_prior, class_prior = class_prior, norm = norm)
model.fit(data_train, labels_train)
predictions = model.predict(data_test)
return model, ClassificationMetrics(predictions, labels_test)
class SVM:
class CustomKernel:
def __new__(cls, C, kernel, degre, gamma, coef0, shrinking, probability, tol, cache_size, class_weight, verbose, max_iter, decision_function_shape, random_state):
return sklearn.svm.SVC(C = C, kernel = kernel, gamma = gamma, coef0 = coef0, shrinking = shrinking, probability = probability, tol = tol, cache_size = cache_size, class_weight = class_weight, verbose = verbose, max_iter = max_iter, decision_function_shape = decision_function_shape, random_state = random_state)
class StandardKernel:
def __new__(cls, kernel, C=1.0, degree=3, gamma='auto_deprecated', coef0=0.0, shrinking=True, probability=False, tol=0.001, cache_size=200, class_weight=None, verbose=False, max_iter=-1, decision_function_shape='ovr', random_state=None):
return sklearn.svm.SVC(C = C, kernel = kernel, gamma = gamma, coef0 = coef0, shrinking = shrinking, probability = probability, tol = tol, cache_size = cache_size, class_weight = class_weight, verbose = verbose, max_iter = max_iter, decision_function_shape = decision_function_shape, random_state = random_state)
class PrebuiltKernel:
class Linear:
def __new__(cls):
return sklearn.svm.SVC(kernel = 'linear')
class Polynomial:
def __new__(cls, power, r_bias):
return sklearn.svm.SVC(kernel = 'polynomial', degree = power, coef0 = r_bias)
class RBF:
def __new__(cls, gamma):
return sklearn.svm.SVC(kernel = 'rbf', gamma = gamma)
class Sigmoid:
def __new__(cls, r_bias):
return sklearn.svm.SVC(kernel = 'sigmoid', coef0 = r_bias)
def fit(self, kernel, train_data, train_outputs): # expects *2d data, 1d labels or outputs
return kernel.fit(train_data, train_outputs)
def eval_classification(self, kernel, test_data, test_outputs):
predictions = kernel.predict(test_data)
return ClassificationMetrics(predictions, test_outputs)
def eval_regression(self, kernel, test_data, test_outputs):
predictions = kernel.predict(test_data)
return RegressionMetrics(predictions, test_outputs)
def random_forest_classifier(data, labels, test_size, n_estimators="warn", criterion="gini", max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0.0, max_features="auto", max_leaf_nodes=None, min_impurity_decrease=0.0, min_impurity_split=None, bootstrap=True, oob_score=False, n_jobs=None, random_state=None, verbose=0, warm_start=False, class_weight=None):
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
kernel = sklearn.ensemble.RandomForestClassifier(n_estimators = n_estimators, criterion = criterion, max_depth = max_depth, min_samples_split = min_samples_split, min_samples_leaf = min_samples_leaf, min_weight_fraction_leaf = min_weight_fraction_leaf, max_leaf_nodes = max_leaf_nodes, min_impurity_decrease = min_impurity_decrease, bootstrap = bootstrap, oob_score = oob_score, n_jobs = n_jobs, random_state = random_state, verbose = verbose, warm_start = warm_start, class_weight = class_weight)
kernel.fit(data_train, labels_train)
predictions = kernel.predict(data_test)
return kernel, ClassificationMetrics(predictions, labels_test)
def random_forest_regressor(data, outputs, test_size, n_estimators="warn", criterion="mse", max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0.0, max_features="auto", max_leaf_nodes=None, min_impurity_decrease=0.0, min_impurity_split=None, bootstrap=True, oob_score=False, n_jobs=None, random_state=None, verbose=0, warm_start=False):
data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1)
kernel = sklearn.ensemble.RandomForestRegressor(n_estimators = n_estimators, criterion = criterion, max_depth = max_depth, min_samples_split = min_samples_split, min_weight_fraction_leaf = min_weight_fraction_leaf, max_features = max_features, max_leaf_nodes = max_leaf_nodes, min_impurity_decrease = min_impurity_decrease, min_impurity_split = min_impurity_split, bootstrap = bootstrap, oob_score = oob_score, n_jobs = n_jobs, random_state = random_state, verbose = verbose, warm_start = warm_start)
kernel.fit(data_train, outputs_train)
predictions = kernel.predict(data_test)
return kernel, RegressionMetrics(predictions, outputs_test)
class CorrelationTests:
def anova_oneway(self, *args): #expects arrays of samples
results = scipy.stats.f_oneway(*args)
return {"F-value": results[0], "p-value": results[1]}
def pearson(self, x, y):
results = scipy.stats.pearsonr(x, y)
return {"r-value": results[0], "p-value": results[1]}
def spearman(self, a, b = None, axis = 0, nan_policy = 'propagate'):
results = scipy.stats.spearmanr(a, b = b, axis = axis, nan_policy = nan_policy)
return {"r-value": results[0], "p-value": results[1]}
def point_biserial(self, x,y):
results = scipy.stats.pointbiserialr(x, y)
return {"r-value": results[0], "p-value": results[1]}
def kendall(self, x, y, initial_lexsort = None, nan_policy = 'propagate', method = 'auto'):
results = scipy.stats.kendalltau(x, y, initial_lexsort = initial_lexsort, nan_policy = nan_policy, method = method)
return {"tau": results[0], "p-value": results[1]}
def kendall_weighted(self, x, y, rank = True, weigher = None, additive = True):
results = scipy.stats.weightedtau(x, y, rank = rank, weigher = weigher, additive = additive)
return {"tau": results[0], "p-value": results[1]}
def mgc(self, x, y, compute_distance = None, reps = 1000, workers = 1, is_twosamp = False, random_state = None):
results = scipy.stats.multiscale_graphcorr(x, y, compute_distance = compute_distance, reps = reps, workers = workers, is_twosamp = is_twosamp, random_state = random_state)
return {"k-value": results[0], "p-value": results[1], "data": results[2]} # unsure if MGC test returns a k value
class StatisticalTests:
def ttest_onesample(self, a, popmean, axis = 0, nan_policy = 'propagate'):
results = scipy.stats.ttest_1samp(a, popmean, axis = axis, nan_policy = nan_policy)
return {"t-value": results[0], "p-value": results[1]}
def ttest_independent(self, a, b, equal = True, nan_policy = 'propagate'):
results = scipy.stats.ttest_ind(a, b, equal_var = equal, nan_policy = nan_policy)
return {"t-value": results[0], "p-value": results[1]}
def ttest_statistic(self, o1, o2, equal = True):
results = scipy.stats.ttest_ind_from_stats(o1["mean"], o1["std"], o1["nobs"], o2["mean"], o2["std"], o2["nobs"], equal_var = equal)
return {"t-value": results[0], "p-value": results[1]}
def ttest_related(self, a, b, axis = 0, nan_policy='propagate'):
results = scipy.stats.ttest_rel(a, b, axis = axis, nan_policy = nan_policy)
return {"t-value": results[0], "p-value": results[1]}
def ks_fitness(self, rvs, cdf, args = (), N = 20, alternative = 'two-sided', mode = 'approx'):
results = scipy.stats.kstest(rvs, cdf, args = args, N = N, alternative = alternative, mode = mode)
return {"ks-value": results[0], "p-value": results[1]}
def chisquare(self, f_obs, f_exp = None, ddof = None, axis = 0):
results = scipy.stats.chisquare(f_obs, f_exp = f_exp, ddof = ddof, axis = axis)
return {"chisquared-value": results[0], "p-value": results[1]}
def powerdivergence(self, f_obs, f_exp = None, ddof = None, axis = 0, lambda_ = None):
results = scipy.stats.power_divergence(f_obs, f_exp = f_exp, ddof = ddof, axis = axis, lambda_ = lambda_)
return {"powerdivergence-value": results[0], "p-value": results[1]}
def ks_twosample(self, x, y, alternative = 'two_sided', mode = 'auto'):
results = scipy.stats.ks_2samp(x, y, alternative = alternative, mode = mode)
return {"ks-value": results[0], "p-value": results[1]}
def es_twosample(self, x, y, t = (0.4, 0.8)):
results = scipy.stats.epps_singleton_2samp(x, y, t = t)
return {"es-value": results[0], "p-value": results[1]}
def mw_rank(self, x, y, use_continuity = True, alternative = None):
results = scipy.stats.mannwhitneyu(x, y, use_continuity = use_continuity, alternative = alternative)
return {"u-value": results[0], "p-value": results[1]}
def mw_tiecorrection(self, rank_values):
results = scipy.stats.tiecorrect(rank_values)
return {"correction-factor": results}
def rankdata(self, a, method = 'average'):
results = scipy.stats.rankdata(a, method = method)
return results
def wilcoxon_ranksum(self, a, b): # this seems to be superceded by Mann Whitney Wilcoxon U Test
results = scipy.stats.ranksums(a, b)
return {"u-value": results[0], "p-value": results[1]}
def wilcoxon_signedrank(self, x, y = None, zero_method = 'wilcox', correction = False, alternative = 'two-sided'):
results = scipy.stats.wilcoxon(x, y = y, zero_method = zero_method, correction = correction, alternative = alternative)
return {"t-value": results[0], "p-value": results[1]}
def kw_htest(self, *args, nan_policy = 'propagate'):
results = scipy.stats.kruskal(*args, nan_policy = nan_policy)
return {"h-value": results[0], "p-value": results[1]}
def friedman_chisquare(self, *args):
results = scipy.stats.friedmanchisquare(*args)
return {"chisquared-value": results[0], "p-value": results[1]}
def bm_wtest(self, x, y, alternative = 'two-sided', distribution = 't', nan_policy = 'propagate'):
results = scipy.stats.brunnermunzel(x, y, alternative = alternative, distribution = distribution, nan_policy = nan_policy)
return {"w-value": results[0], "p-value": results[1]}
def combine_pvalues(self, pvalues, method = 'fisher', weights = None):
results = scipy.stats.combine_pvalues(pvalues, method = method, weights = weights)
return {"combined-statistic": results[0], "p-value": results[1]}
def jb_fitness(self, x):
results = scipy.stats.jarque_bera(x)
return {"jb-value": results[0], "p-value": results[1]}
def ab_equality(self, x, y):
results = scipy.stats.ansari(x, y)
return {"ab-value": results[0], "p-value": results[1]}
def bartlett_variance(self, *args):
results = scipy.stats.bartlett(*args)
return {"t-value": results[0], "p-value": results[1]}
def levene_variance(self, *args, center = 'median', proportiontocut = 0.05):
results = scipy.stats.levene(*args, center = center, proportiontocut = proportiontocut)
return {"w-value": results[0], "p-value": results[1]}
def sw_normality(self, x):
results = scipy.stats.shapiro(x)
return {"w-value": results[0], "p-value": results[1]}
def shapiro(self, x):
return "destroyed by facts and logic"
def ad_onesample(self, x, dist = 'norm'):
results = scipy.stats.anderson(x, dist = dist)
return {"d-value": results[0], "critical-values": results[1], "significance-value": results[2]}
def ad_ksample(self, samples, midrank = True):
results = scipy.stats.anderson_ksamp(samples, midrank = midrank)
return {"d-value": results[0], "critical-values": results[1], "significance-value": results[2]}
def binomial(self, x, n = None, p = 0.5, alternative = 'two-sided'):
results = scipy.stats.binom_test(x, n = n, p = p, alternative = alternative)
return {"p-value": results}
def fk_variance(self, *args, center = 'median', proportiontocut = 0.05):
results = scipy.stats.fligner(*args, center = center, proportiontocut = proportiontocut)
return {"h-value": results[0], "p-value": results[1]} # unknown if the statistic is an h value
def mood_mediantest(self, *args, ties = 'below', correction = True, lambda_ = 1, nan_policy = 'propagate'):
results = scipy.stats.median_test(*args, ties = ties, correction = correction, lambda_ = lambda_, nan_policy = nan_policy)
return {"chisquared-value": results[0], "p-value": results[1], "m-value": results[2], "table": results[3]}
def mood_equalscale(self, x, y, axis = 0):
results = scipy.stats.mood(x, y, axis = axis)
return {"z-score": results[0], "p-value": results[1]}
def skewtest(self, a, axis = 0, nan_policy = 'propogate'):
results = scipy.stats.skewtest(a, axis = axis, nan_policy = nan_policy)
return {"z-score": results[0], "p-value": results[1]}
def kurtosistest(self, a, axis = 0, nan_policy = 'propogate'):
results = scipy.stats.kurtosistest(a, axis = axis, nan_policy = nan_policy)
return {"z-score": results[0], "p-value": results[1]}
def normaltest(self, a, axis = 0, nan_policy = 'propogate'):
results = scipy.stats.normaltest(a, axis = axis, nan_policy = nan_policy)
return {"z-score": results[0], "p-value": results[1]}

View File

@ -0,0 +1,99 @@
import math
class Glicko2:
_tau = 0.5
def getRating(self):
return (self.__rating * 173.7178) + 1500
def setRating(self, rating):
self.__rating = (rating - 1500) / 173.7178
rating = property(getRating, setRating)
def getRd(self):
return self.__rd * 173.7178
def setRd(self, rd):
self.__rd = rd / 173.7178
rd = property(getRd, setRd)
def __init__(self, rating = 1500, rd = 350, vol = 0.06):
self.setRating(rating)
self.setRd(rd)
self.vol = vol
def _preRatingRD(self):
self.__rd = math.sqrt(math.pow(self.__rd, 2) + math.pow(self.vol, 2))
def update_player(self, rating_list, RD_list, outcome_list):
rating_list = [(x - 1500) / 173.7178 for x in rating_list]
RD_list = [x / 173.7178 for x in RD_list]
v = self._v(rating_list, RD_list)
self.vol = self._newVol(rating_list, RD_list, outcome_list, v)
self._preRatingRD()
self.__rd = 1 / math.sqrt((1 / math.pow(self.__rd, 2)) + (1 / v))
tempSum = 0
for i in range(len(rating_list)):
tempSum += self._g(RD_list[i]) * \
(outcome_list[i] - self._E(rating_list[i], RD_list[i]))
self.__rating += math.pow(self.__rd, 2) * tempSum
def _newVol(self, rating_list, RD_list, outcome_list, v):
i = 0
delta = self._delta(rating_list, RD_list, outcome_list, v)
a = math.log(math.pow(self.vol, 2))
tau = self._tau
x0 = a
x1 = 0
while x0 != x1:
# New iteration, so x(i) becomes x(i-1)
x0 = x1
d = math.pow(self.__rating, 2) + v + math.exp(x0)
h1 = -(x0 - a) / math.pow(tau, 2) - 0.5 * math.exp(x0) \
/ d + 0.5 * math.exp(x0) * math.pow(delta / d, 2)
h2 = -1 / math.pow(tau, 2) - 0.5 * math.exp(x0) * \
(math.pow(self.__rating, 2) + v) \
/ math.pow(d, 2) + 0.5 * math.pow(delta, 2) * math.exp(x0) \
* (math.pow(self.__rating, 2) + v - math.exp(x0)) / math.pow(d, 3)
x1 = x0 - (h1 / h2)
return math.exp(x1 / 2)
def _delta(self, rating_list, RD_list, outcome_list, v):
tempSum = 0
for i in range(len(rating_list)):
tempSum += self._g(RD_list[i]) * (outcome_list[i] - self._E(rating_list[i], RD_list[i]))
return v * tempSum
def _v(self, rating_list, RD_list):
tempSum = 0
for i in range(len(rating_list)):
tempE = self._E(rating_list[i], RD_list[i])
tempSum += math.pow(self._g(RD_list[i]), 2) * tempE * (1 - tempE)
return 1 / tempSum
def _E(self, p2rating, p2RD):
return 1 / (1 + math.exp(-1 * self._g(p2RD) * \
(self.__rating - p2rating)))
def _g(self, RD):
return 1 / math.sqrt(1 + 3 * math.pow(RD, 2) / math.pow(math.pi, 2))
def did_not_compete(self):
self._preRatingRD()

View File

@ -0,0 +1,7 @@
import numpy as np
def calculate(starting_score, opposing_score, observed, N, K):
expected = 1/(1+10**((np.array(opposing_score) - starting_score)/N))
return starting_score + K*(np.sum(observed) - np.sum(expected))

View File

@ -0,0 +1,99 @@
import math
class Glicko2:
_tau = 0.5
def getRating(self):
return (self.__rating * 173.7178) + 1500
def setRating(self, rating):
self.__rating = (rating - 1500) / 173.7178
rating = property(getRating, setRating)
def getRd(self):
return self.__rd * 173.7178
def setRd(self, rd):
self.__rd = rd / 173.7178
rd = property(getRd, setRd)
def __init__(self, rating = 1500, rd = 350, vol = 0.06):
self.setRating(rating)
self.setRd(rd)
self.vol = vol
def _preRatingRD(self):
self.__rd = math.sqrt(math.pow(self.__rd, 2) + math.pow(self.vol, 2))
def update_player(self, rating_list, RD_list, outcome_list):
rating_list = [(x - 1500) / 173.7178 for x in rating_list]
RD_list = [x / 173.7178 for x in RD_list]
v = self._v(rating_list, RD_list)
self.vol = self._newVol(rating_list, RD_list, outcome_list, v)
self._preRatingRD()
self.__rd = 1 / math.sqrt((1 / math.pow(self.__rd, 2)) + (1 / v))
tempSum = 0
for i in range(len(rating_list)):
tempSum += self._g(RD_list[i]) * \
(outcome_list[i] - self._E(rating_list[i], RD_list[i]))
self.__rating += math.pow(self.__rd, 2) * tempSum
def _newVol(self, rating_list, RD_list, outcome_list, v):
i = 0
delta = self._delta(rating_list, RD_list, outcome_list, v)
a = math.log(math.pow(self.vol, 2))
tau = self._tau
x0 = a
x1 = 0
while x0 != x1:
# New iteration, so x(i) becomes x(i-1)
x0 = x1
d = math.pow(self.__rating, 2) + v + math.exp(x0)
h1 = -(x0 - a) / math.pow(tau, 2) - 0.5 * math.exp(x0) \
/ d + 0.5 * math.exp(x0) * math.pow(delta / d, 2)
h2 = -1 / math.pow(tau, 2) - 0.5 * math.exp(x0) * \
(math.pow(self.__rating, 2) + v) \
/ math.pow(d, 2) + 0.5 * math.pow(delta, 2) * math.exp(x0) \
* (math.pow(self.__rating, 2) + v - math.exp(x0)) / math.pow(d, 3)
x1 = x0 - (h1 / h2)
return math.exp(x1 / 2)
def _delta(self, rating_list, RD_list, outcome_list, v):
tempSum = 0
for i in range(len(rating_list)):
tempSum += self._g(RD_list[i]) * (outcome_list[i] - self._E(rating_list[i], RD_list[i]))
return v * tempSum
def _v(self, rating_list, RD_list):
tempSum = 0
for i in range(len(rating_list)):
tempE = self._E(rating_list[i], RD_list[i])
tempSum += math.pow(self._g(RD_list[i]), 2) * tempE * (1 - tempE)
return 1 / tempSum
def _E(self, p2rating, p2RD):
return 1 / (1 + math.exp(-1 * self._g(p2RD) * \
(self.__rating - p2rating)))
def _g(self, RD):
return 1 / math.sqrt(1 + 3 * math.pow(RD, 2) / math.pow(math.pi, 2))
def did_not_compete(self):
self._preRatingRD()

View File

@ -0,0 +1,907 @@
from __future__ import absolute_import
from itertools import chain
import math
from six import iteritems
from six.moves import map, range, zip
from six import iterkeys
import copy
try:
from numbers import Number
except ImportError:
Number = (int, long, float, complex)
inf = float('inf')
class Gaussian(object):
#: Precision, the inverse of the variance.
pi = 0
#: Precision adjusted mean, the precision multiplied by the mean.
tau = 0
def __init__(self, mu=None, sigma=None, pi=0, tau=0):
if mu is not None:
if sigma is None:
raise TypeError('sigma argument is needed')
elif sigma == 0:
raise ValueError('sigma**2 should be greater than 0')
pi = sigma ** -2
tau = pi * mu
self.pi = pi
self.tau = tau
@property
def mu(self):
return self.pi and self.tau / self.pi
@property
def sigma(self):
return math.sqrt(1 / self.pi) if self.pi else inf
def __mul__(self, other):
pi, tau = self.pi + other.pi, self.tau + other.tau
return Gaussian(pi=pi, tau=tau)
def __truediv__(self, other):
pi, tau = self.pi - other.pi, self.tau - other.tau
return Gaussian(pi=pi, tau=tau)
__div__ = __truediv__ # for Python 2
def __eq__(self, other):
return self.pi == other.pi and self.tau == other.tau
def __lt__(self, other):
return self.mu < other.mu
def __le__(self, other):
return self.mu <= other.mu
def __gt__(self, other):
return self.mu > other.mu
def __ge__(self, other):
return self.mu >= other.mu
def __repr__(self):
return 'N(mu={:.3f}, sigma={:.3f})'.format(self.mu, self.sigma)
def _repr_latex_(self):
latex = r'\mathcal{{ N }}( {:.3f}, {:.3f}^2 )'.format(self.mu, self.sigma)
return '$%s$' % latex
class Matrix(list):
def __init__(self, src, height=None, width=None):
if callable(src):
f, src = src, {}
size = [height, width]
if not height:
def set_height(height):
size[0] = height
size[0] = set_height
if not width:
def set_width(width):
size[1] = width
size[1] = set_width
try:
for (r, c), val in f(*size):
src[r, c] = val
except TypeError:
raise TypeError('A callable src must return an interable '
'which generates a tuple containing '
'coordinate and value')
height, width = tuple(size)
if height is None or width is None:
raise TypeError('A callable src must call set_height and '
'set_width if the size is non-deterministic')
if isinstance(src, list):
is_number = lambda x: isinstance(x, Number)
unique_col_sizes = set(map(len, src))
everything_are_number = filter(is_number, sum(src, []))
if len(unique_col_sizes) != 1 or not everything_are_number:
raise ValueError('src must be a rectangular array of numbers')
two_dimensional_array = src
elif isinstance(src, dict):
if not height or not width:
w = h = 0
for r, c in iterkeys(src):
if not height:
h = max(h, r + 1)
if not width:
w = max(w, c + 1)
if not height:
height = h
if not width:
width = w
two_dimensional_array = []
for r in range(height):
row = []
two_dimensional_array.append(row)
for c in range(width):
row.append(src.get((r, c), 0))
else:
raise TypeError('src must be a list or dict or callable')
super(Matrix, self).__init__(two_dimensional_array)
@property
def height(self):
return len(self)
@property
def width(self):
return len(self[0])
def transpose(self):
height, width = self.height, self.width
src = {}
for c in range(width):
for r in range(height):
src[c, r] = self[r][c]
return type(self)(src, height=width, width=height)
def minor(self, row_n, col_n):
height, width = self.height, self.width
if not (0 <= row_n < height):
raise ValueError('row_n should be between 0 and %d' % height)
elif not (0 <= col_n < width):
raise ValueError('col_n should be between 0 and %d' % width)
two_dimensional_array = []
for r in range(height):
if r == row_n:
continue
row = []
two_dimensional_array.append(row)
for c in range(width):
if c == col_n:
continue
row.append(self[r][c])
return type(self)(two_dimensional_array)
def determinant(self):
height, width = self.height, self.width
if height != width:
raise ValueError('Only square matrix can calculate a determinant')
tmp, rv = copy.deepcopy(self), 1.
for c in range(width - 1, 0, -1):
pivot, r = max((abs(tmp[r][c]), r) for r in range(c + 1))
pivot = tmp[r][c]
if not pivot:
return 0.
tmp[r], tmp[c] = tmp[c], tmp[r]
if r != c:
rv = -rv
rv *= pivot
fact = -1. / pivot
for r in range(c):
f = fact * tmp[r][c]
for x in range(c):
tmp[r][x] += f * tmp[c][x]
return rv * tmp[0][0]
def adjugate(self):
height, width = self.height, self.width
if height != width:
raise ValueError('Only square matrix can be adjugated')
if height == 2:
a, b = self[0][0], self[0][1]
c, d = self[1][0], self[1][1]
return type(self)([[d, -b], [-c, a]])
src = {}
for r in range(height):
for c in range(width):
sign = -1 if (r + c) % 2 else 1
src[r, c] = self.minor(r, c).determinant() * sign
return type(self)(src, height, width)
def inverse(self):
if self.height == self.width == 1:
return type(self)([[1. / self[0][0]]])
return (1. / self.determinant()) * self.adjugate()
def __add__(self, other):
height, width = self.height, self.width
if (height, width) != (other.height, other.width):
raise ValueError('Must be same size')
src = {}
for r in range(height):
for c in range(width):
src[r, c] = self[r][c] + other[r][c]
return type(self)(src, height, width)
def __mul__(self, other):
if self.width != other.height:
raise ValueError('Bad size')
height, width = self.height, other.width
src = {}
for r in range(height):
for c in range(width):
src[r, c] = sum(self[r][x] * other[x][c]
for x in range(self.width))
return type(self)(src, height, width)
def __rmul__(self, other):
if not isinstance(other, Number):
raise TypeError('The operand should be a number')
height, width = self.height, self.width
src = {}
for r in range(height):
for c in range(width):
src[r, c] = other * self[r][c]
return type(self)(src, height, width)
def __repr__(self):
return '{}({})'.format(type(self).__name__, super(Matrix, self).__repr__())
def _repr_latex_(self):
rows = [' && '.join(['%.3f' % cell for cell in row]) for row in self]
latex = r'\begin{matrix} %s \end{matrix}' % r'\\'.join(rows)
return '$%s$' % latex
def _gen_erfcinv(erfc, math=math):
def erfcinv(y):
"""The inverse function of erfc."""
if y >= 2:
return -100.
elif y <= 0:
return 100.
zero_point = y < 1
if not zero_point:
y = 2 - y
t = math.sqrt(-2 * math.log(y / 2.))
x = -0.70711 * \
((2.30753 + t * 0.27061) / (1. + t * (0.99229 + t * 0.04481)) - t)
for i in range(2):
err = erfc(x) - y
x += err / (1.12837916709551257 * math.exp(-(x ** 2)) - x * err)
return x if zero_point else -x
return erfcinv
def _gen_ppf(erfc, math=math):
erfcinv = _gen_erfcinv(erfc, math)
def ppf(x, mu=0, sigma=1):
return mu - sigma * math.sqrt(2) * erfcinv(2 * x)
return ppf
def erfc(x):
z = abs(x)
t = 1. / (1. + z / 2.)
r = t * math.exp(-z * z - 1.26551223 + t * (1.00002368 + t * (
0.37409196 + t * (0.09678418 + t * (-0.18628806 + t * (
0.27886807 + t * (-1.13520398 + t * (1.48851587 + t * (
-0.82215223 + t * 0.17087277
)))
)))
)))
return 2. - r if x < 0 else r
def cdf(x, mu=0, sigma=1):
return 0.5 * erfc(-(x - mu) / (sigma * math.sqrt(2)))
def pdf(x, mu=0, sigma=1):
return (1 / math.sqrt(2 * math.pi) * abs(sigma) *
math.exp(-(((x - mu) / abs(sigma)) ** 2 / 2)))
ppf = _gen_ppf(erfc)
def choose_backend(backend):
if backend is None: # fallback
return cdf, pdf, ppf
elif backend == 'mpmath':
try:
import mpmath
except ImportError:
raise ImportError('Install "mpmath" to use this backend')
return mpmath.ncdf, mpmath.npdf, _gen_ppf(mpmath.erfc, math=mpmath)
elif backend == 'scipy':
try:
from scipy.stats import norm
except ImportError:
raise ImportError('Install "scipy" to use this backend')
return norm.cdf, norm.pdf, norm.ppf
raise ValueError('%r backend is not defined' % backend)
def available_backends():
backends = [None]
for backend in ['mpmath', 'scipy']:
try:
__import__(backend)
except ImportError:
continue
backends.append(backend)
return backends
class Node(object):
pass
class Variable(Node, Gaussian):
def __init__(self):
self.messages = {}
super(Variable, self).__init__()
def set(self, val):
delta = self.delta(val)
self.pi, self.tau = val.pi, val.tau
return delta
def delta(self, other):
pi_delta = abs(self.pi - other.pi)
if pi_delta == inf:
return 0.
return max(abs(self.tau - other.tau), math.sqrt(pi_delta))
def update_message(self, factor, pi=0, tau=0, message=None):
message = message or Gaussian(pi=pi, tau=tau)
old_message, self[factor] = self[factor], message
return self.set(self / old_message * message)
def update_value(self, factor, pi=0, tau=0, value=None):
value = value or Gaussian(pi=pi, tau=tau)
old_message = self[factor]
self[factor] = value * old_message / self
return self.set(value)
def __getitem__(self, factor):
return self.messages[factor]
def __setitem__(self, factor, message):
self.messages[factor] = message
def __repr__(self):
args = (type(self).__name__, super(Variable, self).__repr__(),
len(self.messages), '' if len(self.messages) == 1 else 's')
return '<%s %s with %d connection%s>' % args
class Factor(Node):
def __init__(self, variables):
self.vars = variables
for var in variables:
var[self] = Gaussian()
def down(self):
return 0
def up(self):
return 0
@property
def var(self):
assert len(self.vars) == 1
return self.vars[0]
def __repr__(self):
args = (type(self).__name__, len(self.vars),
'' if len(self.vars) == 1 else 's')
return '<%s with %d connection%s>' % args
class PriorFactor(Factor):
def __init__(self, var, val, dynamic=0):
super(PriorFactor, self).__init__([var])
self.val = val
self.dynamic = dynamic
def down(self):
sigma = math.sqrt(self.val.sigma ** 2 + self.dynamic ** 2)
value = Gaussian(self.val.mu, sigma)
return self.var.update_value(self, value=value)
class LikelihoodFactor(Factor):
def __init__(self, mean_var, value_var, variance):
super(LikelihoodFactor, self).__init__([mean_var, value_var])
self.mean = mean_var
self.value = value_var
self.variance = variance
def calc_a(self, var):
return 1. / (1. + self.variance * var.pi)
def down(self):
# update value.
msg = self.mean / self.mean[self]
a = self.calc_a(msg)
return self.value.update_message(self, a * msg.pi, a * msg.tau)
def up(self):
# update mean.
msg = self.value / self.value[self]
a = self.calc_a(msg)
return self.mean.update_message(self, a * msg.pi, a * msg.tau)
class SumFactor(Factor):
def __init__(self, sum_var, term_vars, coeffs):
super(SumFactor, self).__init__([sum_var] + term_vars)
self.sum = sum_var
self.terms = term_vars
self.coeffs = coeffs
def down(self):
vals = self.terms
msgs = [var[self] for var in vals]
return self.update(self.sum, vals, msgs, self.coeffs)
def up(self, index=0):
coeff = self.coeffs[index]
coeffs = []
for x, c in enumerate(self.coeffs):
try:
if x == index:
coeffs.append(1. / coeff)
else:
coeffs.append(-c / coeff)
except ZeroDivisionError:
coeffs.append(0.)
vals = self.terms[:]
vals[index] = self.sum
msgs = [var[self] for var in vals]
return self.update(self.terms[index], vals, msgs, coeffs)
def update(self, var, vals, msgs, coeffs):
pi_inv = 0
mu = 0
for val, msg, coeff in zip(vals, msgs, coeffs):
div = val / msg
mu += coeff * div.mu
if pi_inv == inf:
continue
try:
# numpy.float64 handles floating-point error by different way.
# For example, it can just warn RuntimeWarning on n/0 problem
# instead of throwing ZeroDivisionError. So div.pi, the
# denominator has to be a built-in float.
pi_inv += coeff ** 2 / float(div.pi)
except ZeroDivisionError:
pi_inv = inf
pi = 1. / pi_inv
tau = pi * mu
return var.update_message(self, pi, tau)
class TruncateFactor(Factor):
def __init__(self, var, v_func, w_func, draw_margin):
super(TruncateFactor, self).__init__([var])
self.v_func = v_func
self.w_func = w_func
self.draw_margin = draw_margin
def up(self):
val = self.var
msg = self.var[self]
div = val / msg
sqrt_pi = math.sqrt(div.pi)
args = (div.tau / sqrt_pi, self.draw_margin * sqrt_pi)
v = self.v_func(*args)
w = self.w_func(*args)
denom = (1. - w)
pi, tau = div.pi / denom, (div.tau + sqrt_pi * v) / denom
return val.update_value(self, pi, tau)
#: Default initial mean of ratings.
MU = 25.
#: Default initial standard deviation of ratings.
SIGMA = MU / 3
#: Default distance that guarantees about 76% chance of winning.
BETA = SIGMA / 2
#: Default dynamic factor.
TAU = SIGMA / 100
#: Default draw probability of the game.
DRAW_PROBABILITY = .10
#: A basis to check reliability of the result.
DELTA = 0.0001
def calc_draw_probability(draw_margin, size, env=None):
if env is None:
env = global_env()
return 2 * env.cdf(draw_margin / (math.sqrt(size) * env.beta)) - 1
def calc_draw_margin(draw_probability, size, env=None):
if env is None:
env = global_env()
return env.ppf((draw_probability + 1) / 2.) * math.sqrt(size) * env.beta
def _team_sizes(rating_groups):
team_sizes = [0]
for group in rating_groups:
team_sizes.append(len(group) + team_sizes[-1])
del team_sizes[0]
return team_sizes
def _floating_point_error(env):
if env.backend == 'mpmath':
msg = 'Set "mpmath.mp.dps" to higher'
else:
msg = 'Cannot calculate correctly, set backend to "mpmath"'
return FloatingPointError(msg)
class Rating(Gaussian):
def __init__(self, mu=None, sigma=None):
if isinstance(mu, tuple):
mu, sigma = mu
elif isinstance(mu, Gaussian):
mu, sigma = mu.mu, mu.sigma
if mu is None:
mu = global_env().mu
if sigma is None:
sigma = global_env().sigma
super(Rating, self).__init__(mu, sigma)
def __int__(self):
return int(self.mu)
def __long__(self):
return long(self.mu)
def __float__(self):
return float(self.mu)
def __iter__(self):
return iter((self.mu, self.sigma))
def __repr__(self):
c = type(self)
args = ('.'.join([c.__module__, c.__name__]), self.mu, self.sigma)
return '%s(mu=%.3f, sigma=%.3f)' % args
class TrueSkill(object):
def __init__(self, mu=MU, sigma=SIGMA, beta=BETA, tau=TAU,
draw_probability=DRAW_PROBABILITY, backend=None):
self.mu = mu
self.sigma = sigma
self.beta = beta
self.tau = tau
self.draw_probability = draw_probability
self.backend = backend
if isinstance(backend, tuple):
self.cdf, self.pdf, self.ppf = backend
else:
self.cdf, self.pdf, self.ppf = choose_backend(backend)
def create_rating(self, mu=None, sigma=None):
if mu is None:
mu = self.mu
if sigma is None:
sigma = self.sigma
return Rating(mu, sigma)
def v_win(self, diff, draw_margin):
x = diff - draw_margin
denom = self.cdf(x)
return (self.pdf(x) / denom) if denom else -x
def v_draw(self, diff, draw_margin):
abs_diff = abs(diff)
a, b = draw_margin - abs_diff, -draw_margin - abs_diff
denom = self.cdf(a) - self.cdf(b)
numer = self.pdf(b) - self.pdf(a)
return ((numer / denom) if denom else a) * (-1 if diff < 0 else +1)
def w_win(self, diff, draw_margin):
x = diff - draw_margin
v = self.v_win(diff, draw_margin)
w = v * (v + x)
if 0 < w < 1:
return w
raise _floating_point_error(self)
def w_draw(self, diff, draw_margin):
abs_diff = abs(diff)
a, b = draw_margin - abs_diff, -draw_margin - abs_diff
denom = self.cdf(a) - self.cdf(b)
if not denom:
raise _floating_point_error(self)
v = self.v_draw(abs_diff, draw_margin)
return (v ** 2) + (a * self.pdf(a) - b * self.pdf(b)) / denom
def validate_rating_groups(self, rating_groups):
# check group sizes
if len(rating_groups) < 2:
raise ValueError('Need multiple rating groups')
elif not all(rating_groups):
raise ValueError('Each group must contain multiple ratings')
# check group types
group_types = set(map(type, rating_groups))
if len(group_types) != 1:
raise TypeError('All groups should be same type')
elif group_types.pop() is Rating:
raise TypeError('Rating cannot be a rating group')
# normalize rating_groups
if isinstance(rating_groups[0], dict):
dict_rating_groups = rating_groups
rating_groups = []
keys = []
for dict_rating_group in dict_rating_groups:
rating_group, key_group = [], []
for key, rating in iteritems(dict_rating_group):
rating_group.append(rating)
key_group.append(key)
rating_groups.append(tuple(rating_group))
keys.append(tuple(key_group))
else:
rating_groups = list(rating_groups)
keys = None
return rating_groups, keys
def validate_weights(self, weights, rating_groups, keys=None):
if weights is None:
weights = [(1,) * len(g) for g in rating_groups]
elif isinstance(weights, dict):
weights_dict, weights = weights, []
for x, group in enumerate(rating_groups):
w = []
weights.append(w)
for y, rating in enumerate(group):
if keys is not None:
y = keys[x][y]
w.append(weights_dict.get((x, y), 1))
return weights
def factor_graph_builders(self, rating_groups, ranks, weights):
flatten_ratings = sum(map(tuple, rating_groups), ())
flatten_weights = sum(map(tuple, weights), ())
size = len(flatten_ratings)
group_size = len(rating_groups)
# create variables
rating_vars = [Variable() for x in range(size)]
perf_vars = [Variable() for x in range(size)]
team_perf_vars = [Variable() for x in range(group_size)]
team_diff_vars = [Variable() for x in range(group_size - 1)]
team_sizes = _team_sizes(rating_groups)
# layer builders
def build_rating_layer():
for rating_var, rating in zip(rating_vars, flatten_ratings):
yield PriorFactor(rating_var, rating, self.tau)
def build_perf_layer():
for rating_var, perf_var in zip(rating_vars, perf_vars):
yield LikelihoodFactor(rating_var, perf_var, self.beta ** 2)
def build_team_perf_layer():
for team, team_perf_var in enumerate(team_perf_vars):
if team > 0:
start = team_sizes[team - 1]
else:
start = 0
end = team_sizes[team]
child_perf_vars = perf_vars[start:end]
coeffs = flatten_weights[start:end]
yield SumFactor(team_perf_var, child_perf_vars, coeffs)
def build_team_diff_layer():
for team, team_diff_var in enumerate(team_diff_vars):
yield SumFactor(team_diff_var,
team_perf_vars[team:team + 2], [+1, -1])
def build_trunc_layer():
for x, team_diff_var in enumerate(team_diff_vars):
if callable(self.draw_probability):
# dynamic draw probability
team_perf1, team_perf2 = team_perf_vars[x:x + 2]
args = (Rating(team_perf1), Rating(team_perf2), self)
draw_probability = self.draw_probability(*args)
else:
# static draw probability
draw_probability = self.draw_probability
size = sum(map(len, rating_groups[x:x + 2]))
draw_margin = calc_draw_margin(draw_probability, size, self)
if ranks[x] == ranks[x + 1]: # is a tie?
v_func, w_func = self.v_draw, self.w_draw
else:
v_func, w_func = self.v_win, self.w_win
yield TruncateFactor(team_diff_var,
v_func, w_func, draw_margin)
# build layers
return (build_rating_layer, build_perf_layer, build_team_perf_layer,
build_team_diff_layer, build_trunc_layer)
def run_schedule(self, build_rating_layer, build_perf_layer,
build_team_perf_layer, build_team_diff_layer,
build_trunc_layer, min_delta=DELTA):
if min_delta <= 0:
raise ValueError('min_delta must be greater than 0')
layers = []
def build(builders):
layers_built = [list(build()) for build in builders]
layers.extend(layers_built)
return layers_built
# gray arrows
layers_built = build([build_rating_layer,
build_perf_layer,
build_team_perf_layer])
rating_layer, perf_layer, team_perf_layer = layers_built
for f in chain(*layers_built):
f.down()
# arrow #1, #2, #3
team_diff_layer, trunc_layer = build([build_team_diff_layer,
build_trunc_layer])
team_diff_len = len(team_diff_layer)
for x in range(10):
if team_diff_len == 1:
# only two teams
team_diff_layer[0].down()
delta = trunc_layer[0].up()
else:
# multiple teams
delta = 0
for x in range(team_diff_len - 1):
team_diff_layer[x].down()
delta = max(delta, trunc_layer[x].up())
team_diff_layer[x].up(1) # up to right variable
for x in range(team_diff_len - 1, 0, -1):
team_diff_layer[x].down()
delta = max(delta, trunc_layer[x].up())
team_diff_layer[x].up(0) # up to left variable
# repeat until to small update
if delta <= min_delta:
break
# up both ends
team_diff_layer[0].up(0)
team_diff_layer[team_diff_len - 1].up(1)
# up the remainder of the black arrows
for f in team_perf_layer:
for x in range(len(f.vars) - 1):
f.up(x)
for f in perf_layer:
f.up()
return layers
def rate(self, rating_groups, ranks=None, weights=None, min_delta=DELTA):
rating_groups, keys = self.validate_rating_groups(rating_groups)
weights = self.validate_weights(weights, rating_groups, keys)
group_size = len(rating_groups)
if ranks is None:
ranks = range(group_size)
elif len(ranks) != group_size:
raise ValueError('Wrong ranks')
# sort rating groups by rank
by_rank = lambda x: x[1][1]
sorting = sorted(enumerate(zip(rating_groups, ranks, weights)),
key=by_rank)
sorted_rating_groups, sorted_ranks, sorted_weights = [], [], []
for x, (g, r, w) in sorting:
sorted_rating_groups.append(g)
sorted_ranks.append(r)
# make weights to be greater than 0
sorted_weights.append(max(min_delta, w_) for w_ in w)
# build factor graph
args = (sorted_rating_groups, sorted_ranks, sorted_weights)
builders = self.factor_graph_builders(*args)
args = builders + (min_delta,)
layers = self.run_schedule(*args)
# make result
rating_layer, team_sizes = layers[0], _team_sizes(sorted_rating_groups)
transformed_groups = []
for start, end in zip([0] + team_sizes[:-1], team_sizes):
group = []
for f in rating_layer[start:end]:
group.append(Rating(float(f.var.mu), float(f.var.sigma)))
transformed_groups.append(tuple(group))
by_hint = lambda x: x[0]
unsorting = sorted(zip((x for x, __ in sorting), transformed_groups),
key=by_hint)
if keys is None:
return [g for x, g in unsorting]
# restore the structure with input dictionary keys
return [dict(zip(keys[x], g)) for x, g in unsorting]
def quality(self, rating_groups, weights=None):
rating_groups, keys = self.validate_rating_groups(rating_groups)
weights = self.validate_weights(weights, rating_groups, keys)
flatten_ratings = sum(map(tuple, rating_groups), ())
flatten_weights = sum(map(tuple, weights), ())
length = len(flatten_ratings)
# a vector of all of the skill means
mean_matrix = Matrix([[r.mu] for r in flatten_ratings])
# a matrix whose diagonal values are the variances (sigma ** 2) of each
# of the players.
def variance_matrix(height, width):
variances = (r.sigma ** 2 for r in flatten_ratings)
for x, variance in enumerate(variances):
yield (x, x), variance
variance_matrix = Matrix(variance_matrix, length, length)
# the player-team assignment and comparison matrix
def rotated_a_matrix(set_height, set_width):
t = 0
for r, (cur, _next) in enumerate(zip(rating_groups[:-1],
rating_groups[1:])):
for x in range(t, t + len(cur)):
yield (r, x), flatten_weights[x]
t += 1
x += 1
for x in range(x, x + len(_next)):
yield (r, x), -flatten_weights[x]
set_height(r + 1)
set_width(x + 1)
rotated_a_matrix = Matrix(rotated_a_matrix)
a_matrix = rotated_a_matrix.transpose()
# match quality further derivation
_ata = (self.beta ** 2) * rotated_a_matrix * a_matrix
_atsa = rotated_a_matrix * variance_matrix * a_matrix
start = mean_matrix.transpose() * a_matrix
middle = _ata + _atsa
end = rotated_a_matrix * mean_matrix
# make result
e_arg = (-0.5 * start * middle.inverse() * end).determinant()
s_arg = _ata.determinant() / middle.determinant()
return math.exp(e_arg) * math.sqrt(s_arg)
def expose(self, rating):
k = self.mu / self.sigma
return rating.mu - k * rating.sigma
def make_as_global(self):
return setup(env=self)
def __repr__(self):
c = type(self)
if callable(self.draw_probability):
f = self.draw_probability
draw_probability = '.'.join([f.__module__, f.__name__])
else:
draw_probability = '%.1f%%' % (self.draw_probability * 100)
if self.backend is None:
backend = ''
elif isinstance(self.backend, tuple):
backend = ', backend=...'
else:
backend = ', backend=%r' % self.backend
args = ('.'.join([c.__module__, c.__name__]), self.mu, self.sigma,
self.beta, self.tau, draw_probability, backend)
return ('%s(mu=%.3f, sigma=%.3f, beta=%.3f, tau=%.3f, '
'draw_probability=%s%s)' % args)
def rate_1vs1(rating1, rating2, drawn=False, min_delta=DELTA, env=None):
if env is None:
env = global_env()
ranks = [0, 0 if drawn else 1]
teams = env.rate([(rating1,), (rating2,)], ranks, min_delta=min_delta)
return teams[0][0], teams[1][0]
def quality_1vs1(rating1, rating2, env=None):
if env is None:
env = global_env()
return env.quality([(rating1,), (rating2,)])
def global_env():
try:
global_env.__trueskill__
except AttributeError:
# setup the default environment
setup()
return global_env.__trueskill__
def setup(mu=MU, sigma=SIGMA, beta=BETA, tau=TAU,
draw_probability=DRAW_PROBABILITY, backend=None, env=None):
if env is None:
env = TrueSkill(mu, sigma, beta, tau, draw_probability, backend)
global_env.__trueskill__ = env
return env
def rate(rating_groups, ranks=None, weights=None, min_delta=DELTA):
return global_env().rate(rating_groups, ranks, weights, min_delta)
def quality(rating_groups, weights=None):
return global_env().quality(rating_groups, weights)
def expose(rating):
return global_env().expose(rating)

View File

@ -0,0 +1,220 @@
# Titan Robotics Team 2022: CUDA-based Regressions Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this module has been automatically inegrated into analysis.py, and should be callable as a class from the package
# this module is cuda-optimized and vectorized (except for one small part)
# setup:
__version__ = "1.0.0.004"
# changelog should be viewed using print(analysis.regression.__changelog__)
__changelog__ = """
1.0.0.004:
- bug fixes
- fixed changelog
1.0.0.003:
- bug fixes
1.0.0.002:
-Added more parameters to log, exponential, polynomial
-Added SigmoidalRegKernelArthur, because Arthur apparently needs
to train the scaling and shifting of sigmoids
1.0.0.001:
-initial release, with linear, log, exponential, polynomial, and sigmoid kernels
-already vectorized (except for polynomial generation) and CUDA-optimized
"""
__author__ = (
"Jacob Levine <jlevine@imsa.edu>",
"Arthur Lu <learthurgo@gmail.com>"
)
__all__ = [
'factorial',
'take_all_pwrs',
'num_poly_terms',
'set_device',
'LinearRegKernel',
'SigmoidalRegKernel',
'LogRegKernel',
'PolyRegKernel',
'ExpRegKernel',
'SigmoidalRegKernelArthur',
'SGDTrain',
'CustomTrain'
]
import torch
global device
device = "cuda:0" if torch.torch.cuda.is_available() else "cpu"
#todo: document completely
def set_device(self, new_device):
device=new_device
class LinearRegKernel():
parameters= []
weights=None
bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def forward(self,mtx):
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,mtx)+long_bias
class SigmoidalRegKernel():
parameters= []
weights=None
bias=None
sigmoid=torch.nn.Sigmoid()
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def forward(self,mtx):
long_bias=self.bias.repeat([1,mtx.size()[1]])
return self.sigmoid(torch.matmul(self.weights,mtx)+long_bias)
class SigmoidalRegKernelArthur():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
sigmoid=torch.nn.Sigmoid()
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*self.sigmoid(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class LogRegKernel():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*torch.log(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class ExpRegKernel():
parameters= []
weights=None
in_bias=None
scal_mult=None
out_bias=None
def __init__(self, num_vars):
self.weights=torch.rand(num_vars, requires_grad=True, device=device)
self.in_bias=torch.rand(1, requires_grad=True, device=device)
self.scal_mult=torch.rand(1, requires_grad=True, device=device)
self.out_bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias]
def forward(self,mtx):
long_in_bias=self.in_bias.repeat([1,mtx.size()[1]])
long_out_bias=self.out_bias.repeat([1,mtx.size()[1]])
return (self.scal_mult*torch.exp(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias
class PolyRegKernel():
parameters= []
weights=None
bias=None
power=None
def __init__(self, num_vars, power):
self.power=power
num_terms=self.num_poly_terms(num_vars, power)
self.weights=torch.rand(num_terms, requires_grad=True, device=device)
self.bias=torch.rand(1, requires_grad=True, device=device)
self.parameters=[self.weights,self.bias]
def num_poly_terms(self,num_vars, power):
if power == 0:
return 0
return int(self.factorial(num_vars+power-1) / self.factorial(power) / self.factorial(num_vars-1)) + self.num_poly_terms(num_vars, power-1)
def factorial(self,n):
if n==0:
return 1
else:
return n*self.factorial(n-1)
def take_all_pwrs(self, vec, pwr):
#todo: vectorize (kinda)
combins=torch.combinations(vec, r=pwr, with_replacement=True)
out=torch.ones(combins.size()[0]).to(device).to(torch.float)
for i in torch.t(combins).to(device).to(torch.float):
out *= i
if pwr == 1:
return out
else:
return torch.cat((out,self.take_all_pwrs(vec, pwr-1)))
def forward(self,mtx):
#TODO: Vectorize the last part
cols=[]
for i in torch.t(mtx):
cols.append(self.take_all_pwrs(i,self.power))
new_mtx=torch.t(torch.stack(cols))
long_bias=self.bias.repeat([1,mtx.size()[1]])
return torch.matmul(self.weights,new_mtx)+long_bias
def SGDTrain(self, kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False):
optim=torch.optim.SGD(kernel.parameters, lr=learning_rate)
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
losses=[]
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
losses.append(ls.item())
ls.backward()
optim.step()
return [kernel,losses]
else:
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel
def CustomTrain(self, kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False):
data_cuda=data.to(device)
ground_cuda=ground.to(device)
if (return_losses):
losses=[]
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data)
ls=loss(pred,ground)
losses.append(ls.item())
ls.backward()
optim.step()
return [kernel,losses]
else:
for i in range(iterations):
with torch.set_grad_enabled(True):
optim.zero_grad()
pred=kernel.forward(data_cuda)
ls=loss(pred,ground_cuda)
ls.backward()
optim.step()
return kernel

View File

@ -0,0 +1,122 @@
# Titan Robotics Team 2022: ML Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this should be imported as a python module using 'import titanlearn'
# this should be included in the local directory or environment variable
# this module is optimized for multhreaded computing
# this module learns from its mistakes far faster than 2022's captains
# setup:
__version__ = "2.0.1.001"
#changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog:
2.0.1.001:
- removed matplotlib import
- removed graphloss()
2.0.1.000:
- added net, dataset, dataloader, and stdtrain template definitions
- added graphloss function
2.0.0.001:
- added clear functions
2.0.0.000:
- complete rewrite planned
- depreciated 1.0.0.xxx versions
- added simple training loop
1.0.0.xxx:
-added generation of ANNS, basic SGD training
"""
__author__ = (
"Arthur Lu <arthurlu@ttic.edu>,"
"Jacob Levine <jlevine@ttic.edu>,"
)
__all__ = [
'clear',
'net',
'dataset',
'dataloader',
'train',
'stdtrainer',
]
import torch
from os import system, name
import numpy as np
def clear():
if name == 'nt':
_ = system('cls')
else:
_ = system('clear')
class net(torch.nn.Module): #template for standard neural net
def __init__(self):
super(Net, self).__init__()
def forward(self, input):
pass
class dataset(torch.utils.data.Dataset): #template for standard dataset
def __init__(self):
super(torch.utils.data.Dataset).__init__()
def __getitem__(self, index):
pass
def __len__(self):
pass
def dataloader(dataset, batch_size, num_workers, shuffle = True):
return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers)
def train(device, net, epochs, trainloader, optimizer, criterion): #expects standard dataloader, whch returns (inputs, labels)
dataset_len = trainloader.dataset.__len__()
iter_count = 0
running_loss = 0
running_loss_list = []
for epoch in range(epochs): # loop over the dataset multiple times
for i, data in enumerate(trainloader, 0):
inputs = data[0].to(device)
labels = data[1].to(device)
optimizer.zero_grad()
outputs = net(inputs)
loss = criterion(outputs, labels.to(torch.float))
loss.backward()
optimizer.step()
# monitoring steps below
iter_count += 1
running_loss += loss.item()
running_loss_list.append(running_loss)
clear()
print("training on: " + device)
print("iteration: " + str(i) + "/" + str(int(dataset_len / trainloader.batch_size)) + " | " + "epoch: " + str(epoch) + "/" + str(epochs))
print("current batch loss: " + str(loss.item))
print("running loss: " + str(running_loss / iter_count))
return net, running_loss_list
print("finished training")
def stdtrainer(net, criterion, optimizer, dataloader, epochs, batch_size):
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net = net.to(device)
criterion = criterion.to(device)
optimizer = optimizer.to(device)
trainloader = dataloader
return train(device, net, epochs, trainloader, optimizer, criterion)

View File

@ -0,0 +1,907 @@
from __future__ import absolute_import
from itertools import chain
import math
from six import iteritems
from six.moves import map, range, zip
from six import iterkeys
import copy
try:
from numbers import Number
except ImportError:
Number = (int, long, float, complex)
inf = float('inf')
class Gaussian(object):
#: Precision, the inverse of the variance.
pi = 0
#: Precision adjusted mean, the precision multiplied by the mean.
tau = 0
def __init__(self, mu=None, sigma=None, pi=0, tau=0):
if mu is not None:
if sigma is None:
raise TypeError('sigma argument is needed')
elif sigma == 0:
raise ValueError('sigma**2 should be greater than 0')
pi = sigma ** -2
tau = pi * mu
self.pi = pi
self.tau = tau
@property
def mu(self):
return self.pi and self.tau / self.pi
@property
def sigma(self):
return math.sqrt(1 / self.pi) if self.pi else inf
def __mul__(self, other):
pi, tau = self.pi + other.pi, self.tau + other.tau
return Gaussian(pi=pi, tau=tau)
def __truediv__(self, other):
pi, tau = self.pi - other.pi, self.tau - other.tau
return Gaussian(pi=pi, tau=tau)
__div__ = __truediv__ # for Python 2
def __eq__(self, other):
return self.pi == other.pi and self.tau == other.tau
def __lt__(self, other):
return self.mu < other.mu
def __le__(self, other):
return self.mu <= other.mu
def __gt__(self, other):
return self.mu > other.mu
def __ge__(self, other):
return self.mu >= other.mu
def __repr__(self):
return 'N(mu={:.3f}, sigma={:.3f})'.format(self.mu, self.sigma)
def _repr_latex_(self):
latex = r'\mathcal{{ N }}( {:.3f}, {:.3f}^2 )'.format(self.mu, self.sigma)
return '$%s$' % latex
class Matrix(list):
def __init__(self, src, height=None, width=None):
if callable(src):
f, src = src, {}
size = [height, width]
if not height:
def set_height(height):
size[0] = height
size[0] = set_height
if not width:
def set_width(width):
size[1] = width
size[1] = set_width
try:
for (r, c), val in f(*size):
src[r, c] = val
except TypeError:
raise TypeError('A callable src must return an interable '
'which generates a tuple containing '
'coordinate and value')
height, width = tuple(size)
if height is None or width is None:
raise TypeError('A callable src must call set_height and '
'set_width if the size is non-deterministic')
if isinstance(src, list):
is_number = lambda x: isinstance(x, Number)
unique_col_sizes = set(map(len, src))
everything_are_number = filter(is_number, sum(src, []))
if len(unique_col_sizes) != 1 or not everything_are_number:
raise ValueError('src must be a rectangular array of numbers')
two_dimensional_array = src
elif isinstance(src, dict):
if not height or not width:
w = h = 0
for r, c in iterkeys(src):
if not height:
h = max(h, r + 1)
if not width:
w = max(w, c + 1)
if not height:
height = h
if not width:
width = w
two_dimensional_array = []
for r in range(height):
row = []
two_dimensional_array.append(row)
for c in range(width):
row.append(src.get((r, c), 0))
else:
raise TypeError('src must be a list or dict or callable')
super(Matrix, self).__init__(two_dimensional_array)
@property
def height(self):
return len(self)
@property
def width(self):
return len(self[0])
def transpose(self):
height, width = self.height, self.width
src = {}
for c in range(width):
for r in range(height):
src[c, r] = self[r][c]
return type(self)(src, height=width, width=height)
def minor(self, row_n, col_n):
height, width = self.height, self.width
if not (0 <= row_n < height):
raise ValueError('row_n should be between 0 and %d' % height)
elif not (0 <= col_n < width):
raise ValueError('col_n should be between 0 and %d' % width)
two_dimensional_array = []
for r in range(height):
if r == row_n:
continue
row = []
two_dimensional_array.append(row)
for c in range(width):
if c == col_n:
continue
row.append(self[r][c])
return type(self)(two_dimensional_array)
def determinant(self):
height, width = self.height, self.width
if height != width:
raise ValueError('Only square matrix can calculate a determinant')
tmp, rv = copy.deepcopy(self), 1.
for c in range(width - 1, 0, -1):
pivot, r = max((abs(tmp[r][c]), r) for r in range(c + 1))
pivot = tmp[r][c]
if not pivot:
return 0.
tmp[r], tmp[c] = tmp[c], tmp[r]
if r != c:
rv = -rv
rv *= pivot
fact = -1. / pivot
for r in range(c):
f = fact * tmp[r][c]
for x in range(c):
tmp[r][x] += f * tmp[c][x]
return rv * tmp[0][0]
def adjugate(self):
height, width = self.height, self.width
if height != width:
raise ValueError('Only square matrix can be adjugated')
if height == 2:
a, b = self[0][0], self[0][1]
c, d = self[1][0], self[1][1]
return type(self)([[d, -b], [-c, a]])
src = {}
for r in range(height):
for c in range(width):
sign = -1 if (r + c) % 2 else 1
src[r, c] = self.minor(r, c).determinant() * sign
return type(self)(src, height, width)
def inverse(self):
if self.height == self.width == 1:
return type(self)([[1. / self[0][0]]])
return (1. / self.determinant()) * self.adjugate()
def __add__(self, other):
height, width = self.height, self.width
if (height, width) != (other.height, other.width):
raise ValueError('Must be same size')
src = {}
for r in range(height):
for c in range(width):
src[r, c] = self[r][c] + other[r][c]
return type(self)(src, height, width)
def __mul__(self, other):
if self.width != other.height:
raise ValueError('Bad size')
height, width = self.height, other.width
src = {}
for r in range(height):
for c in range(width):
src[r, c] = sum(self[r][x] * other[x][c]
for x in range(self.width))
return type(self)(src, height, width)
def __rmul__(self, other):
if not isinstance(other, Number):
raise TypeError('The operand should be a number')
height, width = self.height, self.width
src = {}
for r in range(height):
for c in range(width):
src[r, c] = other * self[r][c]
return type(self)(src, height, width)
def __repr__(self):
return '{}({})'.format(type(self).__name__, super(Matrix, self).__repr__())
def _repr_latex_(self):
rows = [' && '.join(['%.3f' % cell for cell in row]) for row in self]
latex = r'\begin{matrix} %s \end{matrix}' % r'\\'.join(rows)
return '$%s$' % latex
def _gen_erfcinv(erfc, math=math):
def erfcinv(y):
"""The inverse function of erfc."""
if y >= 2:
return -100.
elif y <= 0:
return 100.
zero_point = y < 1
if not zero_point:
y = 2 - y
t = math.sqrt(-2 * math.log(y / 2.))
x = -0.70711 * \
((2.30753 + t * 0.27061) / (1. + t * (0.99229 + t * 0.04481)) - t)
for i in range(2):
err = erfc(x) - y
x += err / (1.12837916709551257 * math.exp(-(x ** 2)) - x * err)
return x if zero_point else -x
return erfcinv
def _gen_ppf(erfc, math=math):
erfcinv = _gen_erfcinv(erfc, math)
def ppf(x, mu=0, sigma=1):
return mu - sigma * math.sqrt(2) * erfcinv(2 * x)
return ppf
def erfc(x):
z = abs(x)
t = 1. / (1. + z / 2.)
r = t * math.exp(-z * z - 1.26551223 + t * (1.00002368 + t * (
0.37409196 + t * (0.09678418 + t * (-0.18628806 + t * (
0.27886807 + t * (-1.13520398 + t * (1.48851587 + t * (
-0.82215223 + t * 0.17087277
)))
)))
)))
return 2. - r if x < 0 else r
def cdf(x, mu=0, sigma=1):
return 0.5 * erfc(-(x - mu) / (sigma * math.sqrt(2)))
def pdf(x, mu=0, sigma=1):
return (1 / math.sqrt(2 * math.pi) * abs(sigma) *
math.exp(-(((x - mu) / abs(sigma)) ** 2 / 2)))
ppf = _gen_ppf(erfc)
def choose_backend(backend):
if backend is None: # fallback
return cdf, pdf, ppf
elif backend == 'mpmath':
try:
import mpmath
except ImportError:
raise ImportError('Install "mpmath" to use this backend')
return mpmath.ncdf, mpmath.npdf, _gen_ppf(mpmath.erfc, math=mpmath)
elif backend == 'scipy':
try:
from scipy.stats import norm
except ImportError:
raise ImportError('Install "scipy" to use this backend')
return norm.cdf, norm.pdf, norm.ppf
raise ValueError('%r backend is not defined' % backend)
def available_backends():
backends = [None]
for backend in ['mpmath', 'scipy']:
try:
__import__(backend)
except ImportError:
continue
backends.append(backend)
return backends
class Node(object):
pass
class Variable(Node, Gaussian):
def __init__(self):
self.messages = {}
super(Variable, self).__init__()
def set(self, val):
delta = self.delta(val)
self.pi, self.tau = val.pi, val.tau
return delta
def delta(self, other):
pi_delta = abs(self.pi - other.pi)
if pi_delta == inf:
return 0.
return max(abs(self.tau - other.tau), math.sqrt(pi_delta))
def update_message(self, factor, pi=0, tau=0, message=None):
message = message or Gaussian(pi=pi, tau=tau)
old_message, self[factor] = self[factor], message
return self.set(self / old_message * message)
def update_value(self, factor, pi=0, tau=0, value=None):
value = value or Gaussian(pi=pi, tau=tau)
old_message = self[factor]
self[factor] = value * old_message / self
return self.set(value)
def __getitem__(self, factor):
return self.messages[factor]
def __setitem__(self, factor, message):
self.messages[factor] = message
def __repr__(self):
args = (type(self).__name__, super(Variable, self).__repr__(),
len(self.messages), '' if len(self.messages) == 1 else 's')
return '<%s %s with %d connection%s>' % args
class Factor(Node):
def __init__(self, variables):
self.vars = variables
for var in variables:
var[self] = Gaussian()
def down(self):
return 0
def up(self):
return 0
@property
def var(self):
assert len(self.vars) == 1
return self.vars[0]
def __repr__(self):
args = (type(self).__name__, len(self.vars),
'' if len(self.vars) == 1 else 's')
return '<%s with %d connection%s>' % args
class PriorFactor(Factor):
def __init__(self, var, val, dynamic=0):
super(PriorFactor, self).__init__([var])
self.val = val
self.dynamic = dynamic
def down(self):
sigma = math.sqrt(self.val.sigma ** 2 + self.dynamic ** 2)
value = Gaussian(self.val.mu, sigma)
return self.var.update_value(self, value=value)
class LikelihoodFactor(Factor):
def __init__(self, mean_var, value_var, variance):
super(LikelihoodFactor, self).__init__([mean_var, value_var])
self.mean = mean_var
self.value = value_var
self.variance = variance
def calc_a(self, var):
return 1. / (1. + self.variance * var.pi)
def down(self):
# update value.
msg = self.mean / self.mean[self]
a = self.calc_a(msg)
return self.value.update_message(self, a * msg.pi, a * msg.tau)
def up(self):
# update mean.
msg = self.value / self.value[self]
a = self.calc_a(msg)
return self.mean.update_message(self, a * msg.pi, a * msg.tau)
class SumFactor(Factor):
def __init__(self, sum_var, term_vars, coeffs):
super(SumFactor, self).__init__([sum_var] + term_vars)
self.sum = sum_var
self.terms = term_vars
self.coeffs = coeffs
def down(self):
vals = self.terms
msgs = [var[self] for var in vals]
return self.update(self.sum, vals, msgs, self.coeffs)
def up(self, index=0):
coeff = self.coeffs[index]
coeffs = []
for x, c in enumerate(self.coeffs):
try:
if x == index:
coeffs.append(1. / coeff)
else:
coeffs.append(-c / coeff)
except ZeroDivisionError:
coeffs.append(0.)
vals = self.terms[:]
vals[index] = self.sum
msgs = [var[self] for var in vals]
return self.update(self.terms[index], vals, msgs, coeffs)
def update(self, var, vals, msgs, coeffs):
pi_inv = 0
mu = 0
for val, msg, coeff in zip(vals, msgs, coeffs):
div = val / msg
mu += coeff * div.mu
if pi_inv == inf:
continue
try:
# numpy.float64 handles floating-point error by different way.
# For example, it can just warn RuntimeWarning on n/0 problem
# instead of throwing ZeroDivisionError. So div.pi, the
# denominator has to be a built-in float.
pi_inv += coeff ** 2 / float(div.pi)
except ZeroDivisionError:
pi_inv = inf
pi = 1. / pi_inv
tau = pi * mu
return var.update_message(self, pi, tau)
class TruncateFactor(Factor):
def __init__(self, var, v_func, w_func, draw_margin):
super(TruncateFactor, self).__init__([var])
self.v_func = v_func
self.w_func = w_func
self.draw_margin = draw_margin
def up(self):
val = self.var
msg = self.var[self]
div = val / msg
sqrt_pi = math.sqrt(div.pi)
args = (div.tau / sqrt_pi, self.draw_margin * sqrt_pi)
v = self.v_func(*args)
w = self.w_func(*args)
denom = (1. - w)
pi, tau = div.pi / denom, (div.tau + sqrt_pi * v) / denom
return val.update_value(self, pi, tau)
#: Default initial mean of ratings.
MU = 25.
#: Default initial standard deviation of ratings.
SIGMA = MU / 3
#: Default distance that guarantees about 76% chance of winning.
BETA = SIGMA / 2
#: Default dynamic factor.
TAU = SIGMA / 100
#: Default draw probability of the game.
DRAW_PROBABILITY = .10
#: A basis to check reliability of the result.
DELTA = 0.0001
def calc_draw_probability(draw_margin, size, env=None):
if env is None:
env = global_env()
return 2 * env.cdf(draw_margin / (math.sqrt(size) * env.beta)) - 1
def calc_draw_margin(draw_probability, size, env=None):
if env is None:
env = global_env()
return env.ppf((draw_probability + 1) / 2.) * math.sqrt(size) * env.beta
def _team_sizes(rating_groups):
team_sizes = [0]
for group in rating_groups:
team_sizes.append(len(group) + team_sizes[-1])
del team_sizes[0]
return team_sizes
def _floating_point_error(env):
if env.backend == 'mpmath':
msg = 'Set "mpmath.mp.dps" to higher'
else:
msg = 'Cannot calculate correctly, set backend to "mpmath"'
return FloatingPointError(msg)
class Rating(Gaussian):
def __init__(self, mu=None, sigma=None):
if isinstance(mu, tuple):
mu, sigma = mu
elif isinstance(mu, Gaussian):
mu, sigma = mu.mu, mu.sigma
if mu is None:
mu = global_env().mu
if sigma is None:
sigma = global_env().sigma
super(Rating, self).__init__(mu, sigma)
def __int__(self):
return int(self.mu)
def __long__(self):
return long(self.mu)
def __float__(self):
return float(self.mu)
def __iter__(self):
return iter((self.mu, self.sigma))
def __repr__(self):
c = type(self)
args = ('.'.join([c.__module__, c.__name__]), self.mu, self.sigma)
return '%s(mu=%.3f, sigma=%.3f)' % args
class TrueSkill(object):
def __init__(self, mu=MU, sigma=SIGMA, beta=BETA, tau=TAU,
draw_probability=DRAW_PROBABILITY, backend=None):
self.mu = mu
self.sigma = sigma
self.beta = beta
self.tau = tau
self.draw_probability = draw_probability
self.backend = backend
if isinstance(backend, tuple):
self.cdf, self.pdf, self.ppf = backend
else:
self.cdf, self.pdf, self.ppf = choose_backend(backend)
def create_rating(self, mu=None, sigma=None):
if mu is None:
mu = self.mu
if sigma is None:
sigma = self.sigma
return Rating(mu, sigma)
def v_win(self, diff, draw_margin):
x = diff - draw_margin
denom = self.cdf(x)
return (self.pdf(x) / denom) if denom else -x
def v_draw(self, diff, draw_margin):
abs_diff = abs(diff)
a, b = draw_margin - abs_diff, -draw_margin - abs_diff
denom = self.cdf(a) - self.cdf(b)
numer = self.pdf(b) - self.pdf(a)
return ((numer / denom) if denom else a) * (-1 if diff < 0 else +1)
def w_win(self, diff, draw_margin):
x = diff - draw_margin
v = self.v_win(diff, draw_margin)
w = v * (v + x)
if 0 < w < 1:
return w
raise _floating_point_error(self)
def w_draw(self, diff, draw_margin):
abs_diff = abs(diff)
a, b = draw_margin - abs_diff, -draw_margin - abs_diff
denom = self.cdf(a) - self.cdf(b)
if not denom:
raise _floating_point_error(self)
v = self.v_draw(abs_diff, draw_margin)
return (v ** 2) + (a * self.pdf(a) - b * self.pdf(b)) / denom
def validate_rating_groups(self, rating_groups):
# check group sizes
if len(rating_groups) < 2:
raise ValueError('Need multiple rating groups')
elif not all(rating_groups):
raise ValueError('Each group must contain multiple ratings')
# check group types
group_types = set(map(type, rating_groups))
if len(group_types) != 1:
raise TypeError('All groups should be same type')
elif group_types.pop() is Rating:
raise TypeError('Rating cannot be a rating group')
# normalize rating_groups
if isinstance(rating_groups[0], dict):
dict_rating_groups = rating_groups
rating_groups = []
keys = []
for dict_rating_group in dict_rating_groups:
rating_group, key_group = [], []
for key, rating in iteritems(dict_rating_group):
rating_group.append(rating)
key_group.append(key)
rating_groups.append(tuple(rating_group))
keys.append(tuple(key_group))
else:
rating_groups = list(rating_groups)
keys = None
return rating_groups, keys
def validate_weights(self, weights, rating_groups, keys=None):
if weights is None:
weights = [(1,) * len(g) for g in rating_groups]
elif isinstance(weights, dict):
weights_dict, weights = weights, []
for x, group in enumerate(rating_groups):
w = []
weights.append(w)
for y, rating in enumerate(group):
if keys is not None:
y = keys[x][y]
w.append(weights_dict.get((x, y), 1))
return weights
def factor_graph_builders(self, rating_groups, ranks, weights):
flatten_ratings = sum(map(tuple, rating_groups), ())
flatten_weights = sum(map(tuple, weights), ())
size = len(flatten_ratings)
group_size = len(rating_groups)
# create variables
rating_vars = [Variable() for x in range(size)]
perf_vars = [Variable() for x in range(size)]
team_perf_vars = [Variable() for x in range(group_size)]
team_diff_vars = [Variable() for x in range(group_size - 1)]
team_sizes = _team_sizes(rating_groups)
# layer builders
def build_rating_layer():
for rating_var, rating in zip(rating_vars, flatten_ratings):
yield PriorFactor(rating_var, rating, self.tau)
def build_perf_layer():
for rating_var, perf_var in zip(rating_vars, perf_vars):
yield LikelihoodFactor(rating_var, perf_var, self.beta ** 2)
def build_team_perf_layer():
for team, team_perf_var in enumerate(team_perf_vars):
if team > 0:
start = team_sizes[team - 1]
else:
start = 0
end = team_sizes[team]
child_perf_vars = perf_vars[start:end]
coeffs = flatten_weights[start:end]
yield SumFactor(team_perf_var, child_perf_vars, coeffs)
def build_team_diff_layer():
for team, team_diff_var in enumerate(team_diff_vars):
yield SumFactor(team_diff_var,
team_perf_vars[team:team + 2], [+1, -1])
def build_trunc_layer():
for x, team_diff_var in enumerate(team_diff_vars):
if callable(self.draw_probability):
# dynamic draw probability
team_perf1, team_perf2 = team_perf_vars[x:x + 2]
args = (Rating(team_perf1), Rating(team_perf2), self)
draw_probability = self.draw_probability(*args)
else:
# static draw probability
draw_probability = self.draw_probability
size = sum(map(len, rating_groups[x:x + 2]))
draw_margin = calc_draw_margin(draw_probability, size, self)
if ranks[x] == ranks[x + 1]: # is a tie?
v_func, w_func = self.v_draw, self.w_draw
else:
v_func, w_func = self.v_win, self.w_win
yield TruncateFactor(team_diff_var,
v_func, w_func, draw_margin)
# build layers
return (build_rating_layer, build_perf_layer, build_team_perf_layer,
build_team_diff_layer, build_trunc_layer)
def run_schedule(self, build_rating_layer, build_perf_layer,
build_team_perf_layer, build_team_diff_layer,
build_trunc_layer, min_delta=DELTA):
if min_delta <= 0:
raise ValueError('min_delta must be greater than 0')
layers = []
def build(builders):
layers_built = [list(build()) for build in builders]
layers.extend(layers_built)
return layers_built
# gray arrows
layers_built = build([build_rating_layer,
build_perf_layer,
build_team_perf_layer])
rating_layer, perf_layer, team_perf_layer = layers_built
for f in chain(*layers_built):
f.down()
# arrow #1, #2, #3
team_diff_layer, trunc_layer = build([build_team_diff_layer,
build_trunc_layer])
team_diff_len = len(team_diff_layer)
for x in range(10):
if team_diff_len == 1:
# only two teams
team_diff_layer[0].down()
delta = trunc_layer[0].up()
else:
# multiple teams
delta = 0
for x in range(team_diff_len - 1):
team_diff_layer[x].down()
delta = max(delta, trunc_layer[x].up())
team_diff_layer[x].up(1) # up to right variable
for x in range(team_diff_len - 1, 0, -1):
team_diff_layer[x].down()
delta = max(delta, trunc_layer[x].up())
team_diff_layer[x].up(0) # up to left variable
# repeat until to small update
if delta <= min_delta:
break
# up both ends
team_diff_layer[0].up(0)
team_diff_layer[team_diff_len - 1].up(1)
# up the remainder of the black arrows
for f in team_perf_layer:
for x in range(len(f.vars) - 1):
f.up(x)
for f in perf_layer:
f.up()
return layers
def rate(self, rating_groups, ranks=None, weights=None, min_delta=DELTA):
rating_groups, keys = self.validate_rating_groups(rating_groups)
weights = self.validate_weights(weights, rating_groups, keys)
group_size = len(rating_groups)
if ranks is None:
ranks = range(group_size)
elif len(ranks) != group_size:
raise ValueError('Wrong ranks')
# sort rating groups by rank
by_rank = lambda x: x[1][1]
sorting = sorted(enumerate(zip(rating_groups, ranks, weights)),
key=by_rank)
sorted_rating_groups, sorted_ranks, sorted_weights = [], [], []
for x, (g, r, w) in sorting:
sorted_rating_groups.append(g)
sorted_ranks.append(r)
# make weights to be greater than 0
sorted_weights.append(max(min_delta, w_) for w_ in w)
# build factor graph
args = (sorted_rating_groups, sorted_ranks, sorted_weights)
builders = self.factor_graph_builders(*args)
args = builders + (min_delta,)
layers = self.run_schedule(*args)
# make result
rating_layer, team_sizes = layers[0], _team_sizes(sorted_rating_groups)
transformed_groups = []
for start, end in zip([0] + team_sizes[:-1], team_sizes):
group = []
for f in rating_layer[start:end]:
group.append(Rating(float(f.var.mu), float(f.var.sigma)))
transformed_groups.append(tuple(group))
by_hint = lambda x: x[0]
unsorting = sorted(zip((x for x, __ in sorting), transformed_groups),
key=by_hint)
if keys is None:
return [g for x, g in unsorting]
# restore the structure with input dictionary keys
return [dict(zip(keys[x], g)) for x, g in unsorting]
def quality(self, rating_groups, weights=None):
rating_groups, keys = self.validate_rating_groups(rating_groups)
weights = self.validate_weights(weights, rating_groups, keys)
flatten_ratings = sum(map(tuple, rating_groups), ())
flatten_weights = sum(map(tuple, weights), ())
length = len(flatten_ratings)
# a vector of all of the skill means
mean_matrix = Matrix([[r.mu] for r in flatten_ratings])
# a matrix whose diagonal values are the variances (sigma ** 2) of each
# of the players.
def variance_matrix(height, width):
variances = (r.sigma ** 2 for r in flatten_ratings)
for x, variance in enumerate(variances):
yield (x, x), variance
variance_matrix = Matrix(variance_matrix, length, length)
# the player-team assignment and comparison matrix
def rotated_a_matrix(set_height, set_width):
t = 0
for r, (cur, _next) in enumerate(zip(rating_groups[:-1],
rating_groups[1:])):
for x in range(t, t + len(cur)):
yield (r, x), flatten_weights[x]
t += 1
x += 1
for x in range(x, x + len(_next)):
yield (r, x), -flatten_weights[x]
set_height(r + 1)
set_width(x + 1)
rotated_a_matrix = Matrix(rotated_a_matrix)
a_matrix = rotated_a_matrix.transpose()
# match quality further derivation
_ata = (self.beta ** 2) * rotated_a_matrix * a_matrix
_atsa = rotated_a_matrix * variance_matrix * a_matrix
start = mean_matrix.transpose() * a_matrix
middle = _ata + _atsa
end = rotated_a_matrix * mean_matrix
# make result
e_arg = (-0.5 * start * middle.inverse() * end).determinant()
s_arg = _ata.determinant() / middle.determinant()
return math.exp(e_arg) * math.sqrt(s_arg)
def expose(self, rating):
k = self.mu / self.sigma
return rating.mu - k * rating.sigma
def make_as_global(self):
return setup(env=self)
def __repr__(self):
c = type(self)
if callable(self.draw_probability):
f = self.draw_probability
draw_probability = '.'.join([f.__module__, f.__name__])
else:
draw_probability = '%.1f%%' % (self.draw_probability * 100)
if self.backend is None:
backend = ''
elif isinstance(self.backend, tuple):
backend = ', backend=...'
else:
backend = ', backend=%r' % self.backend
args = ('.'.join([c.__module__, c.__name__]), self.mu, self.sigma,
self.beta, self.tau, draw_probability, backend)
return ('%s(mu=%.3f, sigma=%.3f, beta=%.3f, tau=%.3f, '
'draw_probability=%s%s)' % args)
def rate_1vs1(rating1, rating2, drawn=False, min_delta=DELTA, env=None):
if env is None:
env = global_env()
ranks = [0, 0 if drawn else 1]
teams = env.rate([(rating1,), (rating2,)], ranks, min_delta=min_delta)
return teams[0][0], teams[1][0]
def quality_1vs1(rating1, rating2, env=None):
if env is None:
env = global_env()
return env.quality([(rating1,), (rating2,)])
def global_env():
try:
global_env.__trueskill__
except AttributeError:
# setup the default environment
setup()
return global_env.__trueskill__
def setup(mu=MU, sigma=SIGMA, beta=BETA, tau=TAU,
draw_probability=DRAW_PROBABILITY, backend=None, env=None):
if env is None:
env = TrueSkill(mu, sigma, beta, tau, draw_probability, backend)
global_env.__trueskill__ = env
return env
def rate(rating_groups, ranks=None, weights=None, min_delta=DELTA):
return global_env().rate(rating_groups, ranks, weights, min_delta)
def quality(rating_groups, weights=None):
return global_env().quality(rating_groups, weights)
def expose(rating):
return global_env().expose(rating)

View File

@ -0,0 +1,34 @@
# Titan Robotics Team 2022: Visualization Module
# Written by Arthur Lu & Jacob Levine
# Notes:
# this should be imported as a python module using 'import visualization'
# this should be included in the local directory or environment variable
# fancy
# setup:
__version__ = "1.0.0.000"
#changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog:
1.0.0.000:
- created visualization.py
- added graphloss()
- added imports
"""
__author__ = (
"Arthur Lu <arthurlu@ttic.edu>,"
"Jacob Levine <jlevine@ttic.edu>,"
)
__all__ = [
'graphloss',
]
import matplotlib.pyplot as plt
def graphloss(losses):
x = range(0, len(losses))
plt.plot(x, losses)
plt.show()

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

View File

@ -0,0 +1,5 @@
FROM python
WORKDIR ~/
COPY ./ ./
RUN pip install -r requirements.txt
CMD ["bash"]

View File

@ -0,0 +1,3 @@
cd ..
docker build -t tra-analysis-amd64-dev -f docker/Dockerfile .
docker run -it tra-analysis-amd64-dev

View File

@ -0,0 +1,6 @@
numba
numpy
scipy
scikit-learn
six
matplotlib

26
analysis-master/setup.py Normal file
View File

@ -0,0 +1,26 @@
import setuptools
requirements = []
with open("requirements.txt", 'r') as file:
for line in file:
requirements.append(line)
setuptools.setup(
name="analysis",
version="1.0.0.012",
author="The Titan Scouting Team",
author_email="titanscout2022@gmail.com",
description="analysis package developed by Titan Scouting for The Red Alliance",
long_description="",
long_description_content_type="text/markdown",
url="https://github.com/titanscout2022/tr2022-strategy",
packages=setuptools.find_packages(),
install_requires=requirements,
license = "GNU General Public License v3.0",
classifiers=[
"Programming Language :: Python :: 3",
"Operating System :: OS Independent",
],
python_requires='>=3.6',
)