analysis pkg v 1.0.0.12

analysis.py v 1.2.0.004
This commit is contained in:
ltcptgeneral 2020-04-30 16:03:37 -05:00
parent d76eb5acbb
commit 91cbcae3f0
11 changed files with 1322 additions and 79 deletions

View File

@ -1,6 +1,6 @@
Metadata-Version: 2.1 Metadata-Version: 2.1
Name: analysis Name: analysis
Version: 1.0.0.11 Version: 1.0.0.12
Summary: analysis package developed by Titan Scouting for The Red Alliance Summary: analysis package developed by Titan Scouting for The Red Alliance
Home-page: https://github.com/titanscout2022/tr2022-strategy Home-page: https://github.com/titanscout2022/tr2022-strategy
Author: The Titan Scouting Team Author: The Titan Scouting Team

View File

@ -1,13 +1,15 @@
setup.py setup.py
analysis/__init__.py analysis/__init__.py
analysis/analysis.py analysis/analysis.py
analysis/glicko2.py
analysis/regression.py analysis/regression.py
analysis/titanlearn.py analysis/titanlearn.py
analysis/trueskill.py
analysis/visualization.py analysis/visualization.py
analysis.egg-info/PKG-INFO analysis.egg-info/PKG-INFO
analysis.egg-info/SOURCES.txt analysis.egg-info/SOURCES.txt
analysis.egg-info/dependency_links.txt analysis.egg-info/dependency_links.txt
analysis.egg-info/requires.txt analysis.egg-info/requires.txt
analysis.egg-info/top_level.txt analysis.egg-info/top_level.txt
analysis/metrics/__init__.py
analysis/metrics/elo.py
analysis/metrics/glicko2.py
analysis/metrics/trueskill.py

View File

@ -7,10 +7,17 @@
# current benchmark of optimization: 1.33 times faster # current benchmark of optimization: 1.33 times faster
# setup: # setup:
__version__ = "1.2.0.003" __version__ = "1.2.0.004"
# changelog should be viewed using print(analysis.__changelog__) # changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog: __changelog__ = """changelog:
1.2.0.004:
- fixed __all__ to reflected the correct functions and classes
- fixed CorrelationTests and StatisticalTests class functions to require self invocation
- added missing math import
- fixed KNN class functions to require self invocation
- fixed Metrics class functions to require self invocation
- various spelling fixes in CorrelationTests and StatisticalTests
1.2.0.003: 1.2.0.003:
- bug fixes with CorrelationTests and StatisticalTests - bug fixes with CorrelationTests and StatisticalTests
- moved glicko2 and trueskill to the metrics subpackage - moved glicko2 and trueskill to the metrics subpackage
@ -275,22 +282,19 @@ __all__ = [
'z_normalize', 'z_normalize',
'histo_analysis', 'histo_analysis',
'regression', 'regression',
'elo', 'Metrics',
'glicko2',
'trueskill',
'RegressionMetrics', 'RegressionMetrics',
'ClassificationMetrics', 'ClassificationMetrics',
'kmeans', 'kmeans',
'pca', 'pca',
'decisiontree', 'decisiontree',
'knn_classifier', 'KNN',
'knn_regressor',
'NaiveBayes', 'NaiveBayes',
'SVM', 'SVM',
'random_forest_classifier', 'random_forest_classifier',
'random_forest_regressor', 'random_forest_regressor',
'CorrelationTests', 'CorrelationTests',
'RegressionTests', 'StatisticalTests',
# all statistics functions left out due to integration in other functions # all statistics functions left out due to integration in other functions
] ]
@ -301,6 +305,7 @@ __all__ = [
import csv import csv
from analysis.metrics import elo as Elo from analysis.metrics import elo as Elo
from analysis.metrics import glicko2 as Glicko2 from analysis.metrics import glicko2 as Glicko2
import math
import numba import numba
from numba import jit from numba import jit
import numpy as np import numpy as np
@ -467,11 +472,11 @@ def regression(inputs, outputs, args): # inputs, outputs expects N-D array
class Metrics: class Metrics:
def elo(starting_score, opposing_score, observed, N, K): def elo(self, starting_score, opposing_score, observed, N, K):
return Elo.calculate(starting_score, opposing_score, observed, N, K) return Elo.calculate(starting_score, opposing_score, observed, N, K)
def glicko2(starting_score, starting_rd, starting_vol, opposing_score, opposing_rd, observations): def glicko2(self, starting_score, starting_rd, starting_vol, opposing_score, opposing_rd, observations):
player = Glicko2.Glicko2(rating = starting_score, rd = starting_rd, vol = starting_vol) player = Glicko2.Glicko2(rating = starting_score, rd = starting_rd, vol = starting_vol)
@ -479,7 +484,7 @@ class Metrics:
return (player.rating, player.rd, player.vol) return (player.rating, player.rd, player.vol)
def trueskill(teams_data, observations): # teams_data is array of array of tuples ie. [[(mu, sigma), (mu, sigma), (mu, sigma)], [(mu, sigma), (mu, sigma), (mu, sigma)]] def trueskill(self, teams_data, observations): # teams_data is array of array of tuples ie. [[(mu, sigma), (mu, sigma), (mu, sigma)], [(mu, sigma), (mu, sigma), (mu, sigma)]]
team_ratings = [] team_ratings = []
@ -584,7 +589,7 @@ def decisiontree(data, labels, test_size = 0.3, criterion = "gini", splitter = "
class KNN: class KNN:
def knn_classifier(data, labels, test_size = 0.3, algorithm='auto', leaf_size=30, metric='minkowski', metric_params=None, n_jobs=None, n_neighbors=5, p=2, weights='uniform'): #expects *2d data and 1d labels post-scaling def knn_classifier(self, data, labels, test_size = 0.3, algorithm='auto', leaf_size=30, metric='minkowski', metric_params=None, n_jobs=None, n_neighbors=5, p=2, weights='uniform'): #expects *2d data and 1d labels post-scaling
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.neighbors.KNeighborsClassifier() model = sklearn.neighbors.KNeighborsClassifier()
@ -593,7 +598,7 @@ class KNN:
return model, ClassificationMetrics(predictions, labels_test) return model, ClassificationMetrics(predictions, labels_test)
def knn_regressor(data, outputs, test_size, n_neighbors = 5, weights = "uniform", algorithm = "auto", leaf_size = 30, p = 2, metric = "minkowski", metric_params = None, n_jobs = None): def knn_regressor(self, data, outputs, test_size, n_neighbors = 5, weights = "uniform", algorithm = "auto", leaf_size = 30, p = 2, metric = "minkowski", metric_params = None, n_jobs = None):
data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1) data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1)
model = sklearn.neighbors.KNeighborsRegressor(n_neighbors = n_neighbors, weights = weights, algorithm = algorithm, leaf_size = leaf_size, p = p, metric = metric, metric_params = metric_params, n_jobs = n_jobs) model = sklearn.neighbors.KNeighborsRegressor(n_neighbors = n_neighbors, weights = weights, algorithm = algorithm, leaf_size = leaf_size, p = p, metric = metric, metric_params = metric_params, n_jobs = n_jobs)
@ -716,203 +721,203 @@ def random_forest_regressor(data, outputs, test_size, n_estimators="warn", crite
class CorrelationTests: class CorrelationTests:
def anova_oneway(*args): #expects arrays of samples def anova_oneway(self, *args): #expects arrays of samples
results = scipy.stats.f_oneway(*args) results = scipy.stats.f_oneway(*args)
return {"F-value": results[0], "p-value": results[1]} return {"F-value": results[0], "p-value": results[1]}
def pearson(x, y): def pearson(self, x, y):
results = scipy.stats.pearsonr(x, y) results = scipy.stats.pearsonr(x, y)
return {"r-value": results[0], "p-value": results[1]} return {"r-value": results[0], "p-value": results[1]}
def spearman(a, b = None, axis = 0, nan_policy = 'propagate'): def spearman(self, a, b = None, axis = 0, nan_policy = 'propagate'):
results = scipy.stats.spearmanr(a, b = b, axis = axis, nan_policy = nan_policy) results = scipy.stats.spearmanr(a, b = b, axis = axis, nan_policy = nan_policy)
return {"r-value": results[0], "p-value": results[1]} return {"r-value": results[0], "p-value": results[1]}
def point_biserial(x,y): def point_biserial(self, x,y):
results = scipy.stats.pointbiserialr(x, y) results = scipy.stats.pointbiserialr(x, y)
return {"r-value": results[0], "p-value": results[1]} return {"r-value": results[0], "p-value": results[1]}
def kendall(x, y, initial_lexsort = None, nan_policy = 'propagate', method = 'auto'): def kendall(self, x, y, initial_lexsort = None, nan_policy = 'propagate', method = 'auto'):
results = scipy.stats.kendalltau(x, y, initial_lexsort = initial_lexsort, nan_policy = nan_policy, method = method) results = scipy.stats.kendalltau(x, y, initial_lexsort = initial_lexsort, nan_policy = nan_policy, method = method)
return {"tau": results[0], "p-value": results[1]} return {"tau": results[0], "p-value": results[1]}
def kendall_weighted(x, y, rank = True, weigher = None, additive = True): def kendall_weighted(self, x, y, rank = True, weigher = None, additive = True):
results = scipy.stats.weightedtau(x, y, rank = rank, weigher = weigher, additive = additive) results = scipy.stats.weightedtau(x, y, rank = rank, weigher = weigher, additive = additive)
return {"tau": results[0], "p-value": results[1]} return {"tau": results[0], "p-value": results[1]}
def mgc(x, y, compute_distance = None, reps = 1000, workers = 1, is_twosamp = False, random_state = None): def mgc(self, x, y, compute_distance = None, reps = 1000, workers = 1, is_twosamp = False, random_state = None):
results = scipy.stats.multiscale_graphcorr(x, y, compute_distance = compute_distance, reps = reps, workers = workers, is_twosamp = is_twosamp, random_state = random_state) results = scipy.stats.multiscale_graphcorr(x, y, compute_distance = compute_distance, reps = reps, workers = workers, is_twosamp = is_twosamp, random_state = random_state)
return {"k-value": results[0], "p-value": results[1], "data": results[2]} # unsure if MGC test returns a k value return {"k-value": results[0], "p-value": results[1], "data": results[2]} # unsure if MGC test returns a k value
class StatisticalTests: class StatisticalTests:
def ttest_onesample(a, popmean, axis = 0, nan_policy = 'propagate'): def ttest_onesample(self, a, popmean, axis = 0, nan_policy = 'propagate'):
results = scipy.stats.ttest_1samp(a, popmean, axis = axis, nan_policy = nan_policy) results = scipy.stats.ttest_1samp(a, popmean, axis = axis, nan_policy = nan_policy)
return {"t-value": results[0], "p-value": results[1]} return {"t-value": results[0], "p-value": results[1]}
def ttest_independent(a, b, equal = True, nan_policy = 'propagate'): def ttest_independent(self, a, b, equal = True, nan_policy = 'propagate'):
results = scipt.stats.ttest_ind(a, b, equal_var = equal, nan_policy = nan_policy) results = scipy.stats.ttest_ind(a, b, equal_var = equal, nan_policy = nan_policy)
return {"t-value": results[0], "p-value": results[1]} return {"t-value": results[0], "p-value": results[1]}
def ttest_statistic(o1, o2, equal = True): def ttest_statistic(self, o1, o2, equal = True):
results = scipy.stats.ttest_ind_from_stats(o1["mean"], o1["std"], o1["nobs"], o2["mean"], o2["std"], o2["nobs"], equal_var = equal) results = scipy.stats.ttest_ind_from_stats(o1["mean"], o1["std"], o1["nobs"], o2["mean"], o2["std"], o2["nobs"], equal_var = equal)
return {"t-value": results[0], "p-value": results[1]} return {"t-value": results[0], "p-value": results[1]}
def ttest_related(a, b, axis = 0, nan_policy='propagate'): def ttest_related(self, a, b, axis = 0, nan_policy='propagate'):
results = scipy.stats.ttest_rel(a, b, axis = axis, nan_policy = nan_policy) results = scipy.stats.ttest_rel(a, b, axis = axis, nan_policy = nan_policy)
return {"t-value": results[0], "p-value": results[1]} return {"t-value": results[0], "p-value": results[1]}
def ks_fitness(rvs, cdf, args = (), N = 20, alternative = 'two-sided', mode = 'approx'): def ks_fitness(self, rvs, cdf, args = (), N = 20, alternative = 'two-sided', mode = 'approx'):
results = scipy.stats.kstest(rvs, cdf, args = args, N = N, alternative = alternative, mode = mode) results = scipy.stats.kstest(rvs, cdf, args = args, N = N, alternative = alternative, mode = mode)
return {"ks-value": results[0], "p-value": results[1]} return {"ks-value": results[0], "p-value": results[1]}
def chisquare(f_obs, f_exp = None, ddof = None, axis = 0): def chisquare(self, f_obs, f_exp = None, ddof = None, axis = 0):
results = scipy.stats.chisquare(f_obs, f_exp = f_exp, ddof = ddof, axis = axis) results = scipy.stats.chisquare(f_obs, f_exp = f_exp, ddof = ddof, axis = axis)
return {"chisquared-value": results[0], "p-value": results[1]} return {"chisquared-value": results[0], "p-value": results[1]}
def powerdivergence(f_obs, f_exp = None, ddof = None, axis = 0, lambda_ = None): def powerdivergence(self, f_obs, f_exp = None, ddof = None, axis = 0, lambda_ = None):
results = scipy.stats.power_divergence(f_obs, f_exp = f_exp, ddof = ddof, axis = axis, lambda_ = lambda_) results = scipy.stats.power_divergence(f_obs, f_exp = f_exp, ddof = ddof, axis = axis, lambda_ = lambda_)
return {"powerdivergence-value": results[0], "p-value": results[1]} return {"powerdivergence-value": results[0], "p-value": results[1]}
def ks_twosample(x, y, alternative = 'two_sided', mode = 'auto'): def ks_twosample(self, x, y, alternative = 'two_sided', mode = 'auto'):
results = scipy.stats.ks_2samp(x, y, alternative = alternative, mode = mode) results = scipy.stats.ks_2samp(x, y, alternative = alternative, mode = mode)
return {"ks-value": results[0], "p-value": results[1]} return {"ks-value": results[0], "p-value": results[1]}
def es_twosample(x, y, t = (0.4, 0.8)): def es_twosample(self, x, y, t = (0.4, 0.8)):
results = scipy.stats.epps_singleton_2samp(x, y, t = t) results = scipy.stats.epps_singleton_2samp(x, y, t = t)
return {"es-value": results[0], "p-value": results[1]} return {"es-value": results[0], "p-value": results[1]}
def mw_rank(x, y, use_continuity = True, alternative = None): def mw_rank(self, x, y, use_continuity = True, alternative = None):
results = scipy.stats.mannwhitneyu(x, y, use_continuity = use_continuity, alternative = alternative) results = scipy.stats.mannwhitneyu(x, y, use_continuity = use_continuity, alternative = alternative)
return {"u-value": results[0], "p-value": results[1]} return {"u-value": results[0], "p-value": results[1]}
def mw_tiecorrection(rank_values): def mw_tiecorrection(self, rank_values):
results = scipy.stats.tiecorrect(rank_values) results = scipy.stats.tiecorrect(rank_values)
return {"correction-factor": results} return {"correction-factor": results}
def rankdata(a, method = 'average'): def rankdata(self, a, method = 'average'):
results = scipy.stats.rankdata(a, method = method) results = scipy.stats.rankdata(a, method = method)
return results return results
def wilcoxon_ranksum(a, b): # this seems to be superceded by Mann Whitney Wilcoxon U Test def wilcoxon_ranksum(self, a, b): # this seems to be superceded by Mann Whitney Wilcoxon U Test
results = scipy.stats.ranksums(a, b) results = scipy.stats.ranksums(a, b)
return {"u-value": results[0], "p-value": results[1]} return {"u-value": results[0], "p-value": results[1]}
def wilcoxon_signedrank(x, y = None, method = 'wilcox', correction = False, alternative = 'two-sided'): def wilcoxon_signedrank(self, x, y = None, zero_method = 'wilcox', correction = False, alternative = 'two-sided'):
results = scipy.stats.wilcoxon(x, y = y, method = method, correction = correction, alternative = alternative) results = scipy.stats.wilcoxon(x, y = y, zero_method = zero_method, correction = correction, alternative = alternative)
return {"t-value": results[0], "p-value": results[1]} return {"t-value": results[0], "p-value": results[1]}
def kw_htest(*args, nan_policy = 'propagate'): def kw_htest(self, *args, nan_policy = 'propagate'):
results = scipy.stats.kruskal(*args, nan_policy = nan_policy) results = scipy.stats.kruskal(*args, nan_policy = nan_policy)
return {"h-value": results[0], "p-value": results[1]} return {"h-value": results[0], "p-value": results[1]}
def friedman_chisquare(*args): def friedman_chisquare(self, *args):
results = scipy.stats.friedmanchisquare(*args) results = scipy.stats.friedmanchisquare(*args)
return {"chisquared-value": results[0], "p-value": results[1]} return {"chisquared-value": results[0], "p-value": results[1]}
def bm_wtest(x, y, alternative = 'two-sided', distribution = 't', nan_policy = 'propagate'): def bm_wtest(self, x, y, alternative = 'two-sided', distribution = 't', nan_policy = 'propagate'):
results = scipy.stats.brunnermunzel(x, y, alternative = alternative, distribution = distribution, nan_policy = nan_policy) results = scipy.stats.brunnermunzel(x, y, alternative = alternative, distribution = distribution, nan_policy = nan_policy)
return {"w-value": results[0], "p-value": results[1]} return {"w-value": results[0], "p-value": results[1]}
def combine_pvalues(pvalues, method = 'fisher', weights = None): def combine_pvalues(self, pvalues, method = 'fisher', weights = None):
results = scipy.stats.combine_pvalues(pvalues, method = method, weights = weights) results = scipy.stats.combine_pvalues(pvalues, method = method, weights = weights)
return {"combined-statistic": results[0], "p-value": results[1]} return {"combined-statistic": results[0], "p-value": results[1]}
def jb_fitness(x): def jb_fitness(self, x):
results = scipy.stats.jarque_bera(x) results = scipy.stats.jarque_bera(x)
return {"jb-value": results[0], "p-value": results[1]} return {"jb-value": results[0], "p-value": results[1]}
def ab_equality(x, y): def ab_equality(self, x, y):
results = scipy.stats.ansari(x, y) results = scipy.stats.ansari(x, y)
return {"ab-value": results[0], "p-value": results[1]} return {"ab-value": results[0], "p-value": results[1]}
def bartlett_variance(*args): def bartlett_variance(self, *args):
results = scipy.stats.bartlett(*args) results = scipy.stats.bartlett(*args)
return {"t-value": results[0], "p-value": results[1]} return {"t-value": results[0], "p-value": results[1]}
def levene_variance(*args, center = 'median', proportiontocut = 0.05): def levene_variance(self, *args, center = 'median', proportiontocut = 0.05):
results = scipy.stats.levene(*args, center = center, proportiontocut = proportiontocut) results = scipy.stats.levene(*args, center = center, proportiontocut = proportiontocut)
return {"w-value": results[0], "p-value": results[1]} return {"w-value": results[0], "p-value": results[1]}
def sw_normality(x): def sw_normality(self, x):
results = scipy.stats.shapiro(x) results = scipy.stats.shapiro(x)
return {"w-value": results[0], "p-value": results[1]} return {"w-value": results[0], "p-value": results[1]}
def shapiro(x): def shapiro(self, x):
return "destroyed by facts and logic" return "destroyed by facts and logic"
def ad_onesample(x, dist = 'norm'): def ad_onesample(self, x, dist = 'norm'):
results = scipy.stats.anderson(x, dist = dist) results = scipy.stats.anderson(x, dist = dist)
return {"d-value": results[0], "critical-values": results[1], "significance-value": results[2]} return {"d-value": results[0], "critical-values": results[1], "significance-value": results[2]}
def ad_ksample(samples, midrank = True): def ad_ksample(self, samples, midrank = True):
results = scipy.stats.anderson_ksamp(samples, midrank = midrank) results = scipy.stats.anderson_ksamp(samples, midrank = midrank)
return {"d-value": results[0], "critical-values": results[1], "significance-value": results[2]} return {"d-value": results[0], "critical-values": results[1], "significance-value": results[2]}
def binomial(x, n = None, p = 0.5, alternative = 'two-sided'): def binomial(self, x, n = None, p = 0.5, alternative = 'two-sided'):
results = scipy.stats.binom_test(x, n = n, p = p, alternative = alternative) results = scipy.stats.binom_test(x, n = n, p = p, alternative = alternative)
return {"p-value": results} return {"p-value": results}
def fk_variance(*args, center = 'median', proportiontocut = 0.05): def fk_variance(self, *args, center = 'median', proportiontocut = 0.05):
results = scipy.stats.fligner(*args, center = center, proportiontocut = proportiontocut) results = scipy.stats.fligner(*args, center = center, proportiontocut = proportiontocut)
return {"h-value": results[0], "p-value": results[1]} # unknown if the statistic is an h value return {"h-value": results[0], "p-value": results[1]} # unknown if the statistic is an h value
def mood_mediantest(*args, ties = 'below', correction = True, lambda_ = 1, nan_policy = 'propagate'): def mood_mediantest(self, *args, ties = 'below', correction = True, lambda_ = 1, nan_policy = 'propagate'):
results = scipy.stats.median_test(*args, ties = ties, correction = correction, lambda_ = lambda_, nan_policy = nan_policy) results = scipy.stats.median_test(*args, ties = ties, correction = correction, lambda_ = lambda_, nan_policy = nan_policy)
return {"chisquared-value": results[0], "p-value": results[1], "m-value": results[2], "table": results[3]} return {"chisquared-value": results[0], "p-value": results[1], "m-value": results[2], "table": results[3]}
def mood_equalscale(x, y, axis = 0): def mood_equalscale(self, x, y, axis = 0):
results = scipy.stats.mood(x, y, axis = axis) results = scipy.stats.mood(x, y, axis = axis)
return {"z-score": results[0], "p-value": results[1]} return {"z-score": results[0], "p-value": results[1]}
def skewtest(a, axis = 0, nan_policy = 'propogate'): def skewtest(self, a, axis = 0, nan_policy = 'propogate'):
results = scipy.stats.skewtest(a, axis = axis, nan_policy = nan_policy) results = scipy.stats.skewtest(a, axis = axis, nan_policy = nan_policy)
return {"z-score": results[0], "p-value": results[1]} return {"z-score": results[0], "p-value": results[1]}
def kurtosistest(a, axis = 0, nan_policy = 'propogate'): def kurtosistest(self, a, axis = 0, nan_policy = 'propogate'):
results = scipy.stats.kurtosistest(a, axis = axis, nan_policy = nan_policy) results = scipy.stats.kurtosistest(a, axis = axis, nan_policy = nan_policy)
return {"z-score": results[0], "p-value": results[1]} return {"z-score": results[0], "p-value": results[1]}
def normaltest(a, axis = 0, nan_policy = 'propogate'): def normaltest(self, a, axis = 0, nan_policy = 'propogate'):
results = scipy.stats.normaltest(a, axis = axis, nan_policy = nan_policy) results = scipy.stats.normaltest(a, axis = axis, nan_policy = nan_policy)
return {"z-score": results[0], "p-value": results[1]} return {"z-score": results[0], "p-value": results[1]}

View File

@ -1,16 +1,37 @@
# Titan Robotics Team 2022: Data Analysis Module # Titan Robotics Team 2022: Data Analysis Module
# Written by Arthur Lu & Jacob Levine # Written by Arthur Lu & Jacob Levine
# Notes: # Notes:
# this should be imported as a python module using 'import analysis' # this should be imported as a python module using 'from analysis import analysis'
# this should be included in the local directory or environment variable # this should be included in the local directory or environment variable
# this module has been optimized for multhreaded computing # this module has been optimized for multhreaded computing
# current benchmark of optimization: 1.33 times faster # current benchmark of optimization: 1.33 times faster
# setup: # setup:
__version__ = "1.1.13.009" __version__ = "1.2.0.004"
# changelog should be viewed using print(analysis.__changelog__) # changelog should be viewed using print(analysis.__changelog__)
__changelog__ = """changelog: __changelog__ = """changelog:
1.2.0.004:
- fixed __all__ to reflected the correct functions and classes
- fixed CorrelationTests and StatisticalTests class functions to require self invocation
- added missing math import
- fixed KNN class functions to require self invocation
- fixed Metrics class functions to require self invocation
- various spelling fixes in CorrelationTests and StatisticalTests
1.2.0.003:
- bug fixes with CorrelationTests and StatisticalTests
- moved glicko2 and trueskill to the metrics subpackage
- moved elo to a new metrics subpackage
1.2.0.002:
- fixed docs
1.2.0.001:
- fixed docs
1.2.0.000:
- cleaned up wild card imports with scipy and sklearn
- added CorrelationTests class
- added StatisticalTests class
- added several correlation tests to CorrelationTests
- added several statistical tests to StatisticalTests
1.1.13.009: 1.1.13.009:
- moved elo, glicko2, trueskill functions under class Metrics - moved elo, glicko2, trueskill functions under class Metrics
1.1.13.008: 1.1.13.008:
@ -261,20 +282,19 @@ __all__ = [
'z_normalize', 'z_normalize',
'histo_analysis', 'histo_analysis',
'regression', 'regression',
'elo', 'Metrics',
'glicko2',
'trueskill',
'RegressionMetrics', 'RegressionMetrics',
'ClassificationMetrics', 'ClassificationMetrics',
'kmeans', 'kmeans',
'pca', 'pca',
'decisiontree', 'decisiontree',
'knn_classifier', 'KNN',
'knn_regressor',
'NaiveBayes', 'NaiveBayes',
'SVM', 'SVM',
'random_forest_classifier', 'random_forest_classifier',
'random_forest_regressor', 'random_forest_regressor',
'CorrelationTests',
'StatisticalTests',
# all statistics functions left out due to integration in other functions # all statistics functions left out due to integration in other functions
] ]
@ -283,15 +303,17 @@ __all__ = [
# imports (now in alphabetical order! v 1.0.3.006): # imports (now in alphabetical order! v 1.0.3.006):
import csv import csv
from analysis import glicko2 as Glicko2 from analysis.metrics import elo as Elo
from analysis.metrics import glicko2 as Glicko2
import math
import numba import numba
from numba import jit from numba import jit
import numpy as np import numpy as np
import scipy import scipy
from scipy import * from scipy import optimize, stats
import sklearn import sklearn
from sklearn import * from sklearn import preprocessing, pipeline, linear_model, metrics, cluster, decomposition, tree, neighbors, naive_bayes, svm, model_selection, ensemble
from analysis import trueskill as Trueskill from analysis.metrics import trueskill as Trueskill
class error(ValueError): class error(ValueError):
pass pass
@ -450,13 +472,11 @@ def regression(inputs, outputs, args): # inputs, outputs expects N-D array
class Metrics: class Metrics:
def elo(starting_score, opposing_score, observed, N, K): def elo(self, starting_score, opposing_score, observed, N, K):
expected = 1/(1+10**((np.array(opposing_score) - starting_score)/N)) return Elo.calculate(starting_score, opposing_score, observed, N, K)
return starting_score + K*(np.sum(observed) - np.sum(expected)) def glicko2(self, starting_score, starting_rd, starting_vol, opposing_score, opposing_rd, observations):
def glicko2(starting_score, starting_rd, starting_vol, opposing_score, opposing_rd, observations):
player = Glicko2.Glicko2(rating = starting_score, rd = starting_rd, vol = starting_vol) player = Glicko2.Glicko2(rating = starting_score, rd = starting_rd, vol = starting_vol)
@ -464,7 +484,7 @@ class Metrics:
return (player.rating, player.rd, player.vol) return (player.rating, player.rd, player.vol)
def trueskill(teams_data, observations): # teams_data is array of array of tuples ie. [[(mu, sigma), (mu, sigma), (mu, sigma)], [(mu, sigma), (mu, sigma), (mu, sigma)]] def trueskill(self, teams_data, observations): # teams_data is array of array of tuples ie. [[(mu, sigma), (mu, sigma), (mu, sigma)], [(mu, sigma), (mu, sigma), (mu, sigma)]]
team_ratings = [] team_ratings = []
@ -569,7 +589,7 @@ def decisiontree(data, labels, test_size = 0.3, criterion = "gini", splitter = "
class KNN: class KNN:
def knn_classifier(data, labels, test_size = 0.3, algorithm='auto', leaf_size=30, metric='minkowski', metric_params=None, n_jobs=None, n_neighbors=5, p=2, weights='uniform'): #expects *2d data and 1d labels post-scaling def knn_classifier(self, data, labels, test_size = 0.3, algorithm='auto', leaf_size=30, metric='minkowski', metric_params=None, n_jobs=None, n_neighbors=5, p=2, weights='uniform'): #expects *2d data and 1d labels post-scaling
data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1)
model = sklearn.neighbors.KNeighborsClassifier() model = sklearn.neighbors.KNeighborsClassifier()
@ -578,7 +598,7 @@ class KNN:
return model, ClassificationMetrics(predictions, labels_test) return model, ClassificationMetrics(predictions, labels_test)
def knn_regressor(data, outputs, test_size, n_neighbors = 5, weights = "uniform", algorithm = "auto", leaf_size = 30, p = 2, metric = "minkowski", metric_params = None, n_jobs = None): def knn_regressor(self, data, outputs, test_size, n_neighbors = 5, weights = "uniform", algorithm = "auto", leaf_size = 30, p = 2, metric = "minkowski", metric_params = None, n_jobs = None):
data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1) data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1)
model = sklearn.neighbors.KNeighborsRegressor(n_neighbors = n_neighbors, weights = weights, algorithm = algorithm, leaf_size = leaf_size, p = p, metric = metric, metric_params = metric_params, n_jobs = n_jobs) model = sklearn.neighbors.KNeighborsRegressor(n_neighbors = n_neighbors, weights = weights, algorithm = algorithm, leaf_size = leaf_size, p = p, metric = metric, metric_params = metric_params, n_jobs = n_jobs)
@ -698,3 +718,206 @@ def random_forest_regressor(data, outputs, test_size, n_estimators="warn", crite
predictions = kernel.predict(data_test) predictions = kernel.predict(data_test)
return kernel, RegressionMetrics(predictions, outputs_test) return kernel, RegressionMetrics(predictions, outputs_test)
class CorrelationTests:
def anova_oneway(self, *args): #expects arrays of samples
results = scipy.stats.f_oneway(*args)
return {"F-value": results[0], "p-value": results[1]}
def pearson(self, x, y):
results = scipy.stats.pearsonr(x, y)
return {"r-value": results[0], "p-value": results[1]}
def spearman(self, a, b = None, axis = 0, nan_policy = 'propagate'):
results = scipy.stats.spearmanr(a, b = b, axis = axis, nan_policy = nan_policy)
return {"r-value": results[0], "p-value": results[1]}
def point_biserial(self, x,y):
results = scipy.stats.pointbiserialr(x, y)
return {"r-value": results[0], "p-value": results[1]}
def kendall(self, x, y, initial_lexsort = None, nan_policy = 'propagate', method = 'auto'):
results = scipy.stats.kendalltau(x, y, initial_lexsort = initial_lexsort, nan_policy = nan_policy, method = method)
return {"tau": results[0], "p-value": results[1]}
def kendall_weighted(self, x, y, rank = True, weigher = None, additive = True):
results = scipy.stats.weightedtau(x, y, rank = rank, weigher = weigher, additive = additive)
return {"tau": results[0], "p-value": results[1]}
def mgc(self, x, y, compute_distance = None, reps = 1000, workers = 1, is_twosamp = False, random_state = None):
results = scipy.stats.multiscale_graphcorr(x, y, compute_distance = compute_distance, reps = reps, workers = workers, is_twosamp = is_twosamp, random_state = random_state)
return {"k-value": results[0], "p-value": results[1], "data": results[2]} # unsure if MGC test returns a k value
class StatisticalTests:
def ttest_onesample(self, a, popmean, axis = 0, nan_policy = 'propagate'):
results = scipy.stats.ttest_1samp(a, popmean, axis = axis, nan_policy = nan_policy)
return {"t-value": results[0], "p-value": results[1]}
def ttest_independent(self, a, b, equal = True, nan_policy = 'propagate'):
results = scipy.stats.ttest_ind(a, b, equal_var = equal, nan_policy = nan_policy)
return {"t-value": results[0], "p-value": results[1]}
def ttest_statistic(self, o1, o2, equal = True):
results = scipy.stats.ttest_ind_from_stats(o1["mean"], o1["std"], o1["nobs"], o2["mean"], o2["std"], o2["nobs"], equal_var = equal)
return {"t-value": results[0], "p-value": results[1]}
def ttest_related(self, a, b, axis = 0, nan_policy='propagate'):
results = scipy.stats.ttest_rel(a, b, axis = axis, nan_policy = nan_policy)
return {"t-value": results[0], "p-value": results[1]}
def ks_fitness(self, rvs, cdf, args = (), N = 20, alternative = 'two-sided', mode = 'approx'):
results = scipy.stats.kstest(rvs, cdf, args = args, N = N, alternative = alternative, mode = mode)
return {"ks-value": results[0], "p-value": results[1]}
def chisquare(self, f_obs, f_exp = None, ddof = None, axis = 0):
results = scipy.stats.chisquare(f_obs, f_exp = f_exp, ddof = ddof, axis = axis)
return {"chisquared-value": results[0], "p-value": results[1]}
def powerdivergence(self, f_obs, f_exp = None, ddof = None, axis = 0, lambda_ = None):
results = scipy.stats.power_divergence(f_obs, f_exp = f_exp, ddof = ddof, axis = axis, lambda_ = lambda_)
return {"powerdivergence-value": results[0], "p-value": results[1]}
def ks_twosample(self, x, y, alternative = 'two_sided', mode = 'auto'):
results = scipy.stats.ks_2samp(x, y, alternative = alternative, mode = mode)
return {"ks-value": results[0], "p-value": results[1]}
def es_twosample(self, x, y, t = (0.4, 0.8)):
results = scipy.stats.epps_singleton_2samp(x, y, t = t)
return {"es-value": results[0], "p-value": results[1]}
def mw_rank(self, x, y, use_continuity = True, alternative = None):
results = scipy.stats.mannwhitneyu(x, y, use_continuity = use_continuity, alternative = alternative)
return {"u-value": results[0], "p-value": results[1]}
def mw_tiecorrection(self, rank_values):
results = scipy.stats.tiecorrect(rank_values)
return {"correction-factor": results}
def rankdata(self, a, method = 'average'):
results = scipy.stats.rankdata(a, method = method)
return results
def wilcoxon_ranksum(self, a, b): # this seems to be superceded by Mann Whitney Wilcoxon U Test
results = scipy.stats.ranksums(a, b)
return {"u-value": results[0], "p-value": results[1]}
def wilcoxon_signedrank(self, x, y = None, zero_method = 'wilcox', correction = False, alternative = 'two-sided'):
results = scipy.stats.wilcoxon(x, y = y, zero_method = zero_method, correction = correction, alternative = alternative)
return {"t-value": results[0], "p-value": results[1]}
def kw_htest(self, *args, nan_policy = 'propagate'):
results = scipy.stats.kruskal(*args, nan_policy = nan_policy)
return {"h-value": results[0], "p-value": results[1]}
def friedman_chisquare(self, *args):
results = scipy.stats.friedmanchisquare(*args)
return {"chisquared-value": results[0], "p-value": results[1]}
def bm_wtest(self, x, y, alternative = 'two-sided', distribution = 't', nan_policy = 'propagate'):
results = scipy.stats.brunnermunzel(x, y, alternative = alternative, distribution = distribution, nan_policy = nan_policy)
return {"w-value": results[0], "p-value": results[1]}
def combine_pvalues(self, pvalues, method = 'fisher', weights = None):
results = scipy.stats.combine_pvalues(pvalues, method = method, weights = weights)
return {"combined-statistic": results[0], "p-value": results[1]}
def jb_fitness(self, x):
results = scipy.stats.jarque_bera(x)
return {"jb-value": results[0], "p-value": results[1]}
def ab_equality(self, x, y):
results = scipy.stats.ansari(x, y)
return {"ab-value": results[0], "p-value": results[1]}
def bartlett_variance(self, *args):
results = scipy.stats.bartlett(*args)
return {"t-value": results[0], "p-value": results[1]}
def levene_variance(self, *args, center = 'median', proportiontocut = 0.05):
results = scipy.stats.levene(*args, center = center, proportiontocut = proportiontocut)
return {"w-value": results[0], "p-value": results[1]}
def sw_normality(self, x):
results = scipy.stats.shapiro(x)
return {"w-value": results[0], "p-value": results[1]}
def shapiro(self, x):
return "destroyed by facts and logic"
def ad_onesample(self, x, dist = 'norm'):
results = scipy.stats.anderson(x, dist = dist)
return {"d-value": results[0], "critical-values": results[1], "significance-value": results[2]}
def ad_ksample(self, samples, midrank = True):
results = scipy.stats.anderson_ksamp(samples, midrank = midrank)
return {"d-value": results[0], "critical-values": results[1], "significance-value": results[2]}
def binomial(self, x, n = None, p = 0.5, alternative = 'two-sided'):
results = scipy.stats.binom_test(x, n = n, p = p, alternative = alternative)
return {"p-value": results}
def fk_variance(self, *args, center = 'median', proportiontocut = 0.05):
results = scipy.stats.fligner(*args, center = center, proportiontocut = proportiontocut)
return {"h-value": results[0], "p-value": results[1]} # unknown if the statistic is an h value
def mood_mediantest(self, *args, ties = 'below', correction = True, lambda_ = 1, nan_policy = 'propagate'):
results = scipy.stats.median_test(*args, ties = ties, correction = correction, lambda_ = lambda_, nan_policy = nan_policy)
return {"chisquared-value": results[0], "p-value": results[1], "m-value": results[2], "table": results[3]}
def mood_equalscale(self, x, y, axis = 0):
results = scipy.stats.mood(x, y, axis = axis)
return {"z-score": results[0], "p-value": results[1]}
def skewtest(self, a, axis = 0, nan_policy = 'propogate'):
results = scipy.stats.skewtest(a, axis = axis, nan_policy = nan_policy)
return {"z-score": results[0], "p-value": results[1]}
def kurtosistest(self, a, axis = 0, nan_policy = 'propogate'):
results = scipy.stats.kurtosistest(a, axis = axis, nan_policy = nan_policy)
return {"z-score": results[0], "p-value": results[1]}
def normaltest(self, a, axis = 0, nan_policy = 'propogate'):
results = scipy.stats.normaltest(a, axis = axis, nan_policy = nan_policy)
return {"z-score": results[0], "p-value": results[1]}

View File

@ -0,0 +1,7 @@
import numpy as np
def calculate(starting_score, opposing_score, observed, N, K):
expected = 1/(1+10**((np.array(opposing_score) - starting_score)/N))
return starting_score + K*(np.sum(observed) - np.sum(expected))

View File

@ -0,0 +1,99 @@
import math
class Glicko2:
_tau = 0.5
def getRating(self):
return (self.__rating * 173.7178) + 1500
def setRating(self, rating):
self.__rating = (rating - 1500) / 173.7178
rating = property(getRating, setRating)
def getRd(self):
return self.__rd * 173.7178
def setRd(self, rd):
self.__rd = rd / 173.7178
rd = property(getRd, setRd)
def __init__(self, rating = 1500, rd = 350, vol = 0.06):
self.setRating(rating)
self.setRd(rd)
self.vol = vol
def _preRatingRD(self):
self.__rd = math.sqrt(math.pow(self.__rd, 2) + math.pow(self.vol, 2))
def update_player(self, rating_list, RD_list, outcome_list):
rating_list = [(x - 1500) / 173.7178 for x in rating_list]
RD_list = [x / 173.7178 for x in RD_list]
v = self._v(rating_list, RD_list)
self.vol = self._newVol(rating_list, RD_list, outcome_list, v)
self._preRatingRD()
self.__rd = 1 / math.sqrt((1 / math.pow(self.__rd, 2)) + (1 / v))
tempSum = 0
for i in range(len(rating_list)):
tempSum += self._g(RD_list[i]) * \
(outcome_list[i] - self._E(rating_list[i], RD_list[i]))
self.__rating += math.pow(self.__rd, 2) * tempSum
def _newVol(self, rating_list, RD_list, outcome_list, v):
i = 0
delta = self._delta(rating_list, RD_list, outcome_list, v)
a = math.log(math.pow(self.vol, 2))
tau = self._tau
x0 = a
x1 = 0
while x0 != x1:
# New iteration, so x(i) becomes x(i-1)
x0 = x1
d = math.pow(self.__rating, 2) + v + math.exp(x0)
h1 = -(x0 - a) / math.pow(tau, 2) - 0.5 * math.exp(x0) \
/ d + 0.5 * math.exp(x0) * math.pow(delta / d, 2)
h2 = -1 / math.pow(tau, 2) - 0.5 * math.exp(x0) * \
(math.pow(self.__rating, 2) + v) \
/ math.pow(d, 2) + 0.5 * math.pow(delta, 2) * math.exp(x0) \
* (math.pow(self.__rating, 2) + v - math.exp(x0)) / math.pow(d, 3)
x1 = x0 - (h1 / h2)
return math.exp(x1 / 2)
def _delta(self, rating_list, RD_list, outcome_list, v):
tempSum = 0
for i in range(len(rating_list)):
tempSum += self._g(RD_list[i]) * (outcome_list[i] - self._E(rating_list[i], RD_list[i]))
return v * tempSum
def _v(self, rating_list, RD_list):
tempSum = 0
for i in range(len(rating_list)):
tempE = self._E(rating_list[i], RD_list[i])
tempSum += math.pow(self._g(RD_list[i]), 2) * tempE * (1 - tempE)
return 1 / tempSum
def _E(self, p2rating, p2RD):
return 1 / (1 + math.exp(-1 * self._g(p2RD) * \
(self.__rating - p2rating)))
def _g(self, RD):
return 1 / math.sqrt(1 + 3 * math.pow(RD, 2) / math.pow(math.pi, 2))
def did_not_compete(self):
self._preRatingRD()

View File

@ -0,0 +1,907 @@
from __future__ import absolute_import
from itertools import chain
import math
from six import iteritems
from six.moves import map, range, zip
from six import iterkeys
import copy
try:
from numbers import Number
except ImportError:
Number = (int, long, float, complex)
inf = float('inf')
class Gaussian(object):
#: Precision, the inverse of the variance.
pi = 0
#: Precision adjusted mean, the precision multiplied by the mean.
tau = 0
def __init__(self, mu=None, sigma=None, pi=0, tau=0):
if mu is not None:
if sigma is None:
raise TypeError('sigma argument is needed')
elif sigma == 0:
raise ValueError('sigma**2 should be greater than 0')
pi = sigma ** -2
tau = pi * mu
self.pi = pi
self.tau = tau
@property
def mu(self):
return self.pi and self.tau / self.pi
@property
def sigma(self):
return math.sqrt(1 / self.pi) if self.pi else inf
def __mul__(self, other):
pi, tau = self.pi + other.pi, self.tau + other.tau
return Gaussian(pi=pi, tau=tau)
def __truediv__(self, other):
pi, tau = self.pi - other.pi, self.tau - other.tau
return Gaussian(pi=pi, tau=tau)
__div__ = __truediv__ # for Python 2
def __eq__(self, other):
return self.pi == other.pi and self.tau == other.tau
def __lt__(self, other):
return self.mu < other.mu
def __le__(self, other):
return self.mu <= other.mu
def __gt__(self, other):
return self.mu > other.mu
def __ge__(self, other):
return self.mu >= other.mu
def __repr__(self):
return 'N(mu={:.3f}, sigma={:.3f})'.format(self.mu, self.sigma)
def _repr_latex_(self):
latex = r'\mathcal{{ N }}( {:.3f}, {:.3f}^2 )'.format(self.mu, self.sigma)
return '$%s$' % latex
class Matrix(list):
def __init__(self, src, height=None, width=None):
if callable(src):
f, src = src, {}
size = [height, width]
if not height:
def set_height(height):
size[0] = height
size[0] = set_height
if not width:
def set_width(width):
size[1] = width
size[1] = set_width
try:
for (r, c), val in f(*size):
src[r, c] = val
except TypeError:
raise TypeError('A callable src must return an interable '
'which generates a tuple containing '
'coordinate and value')
height, width = tuple(size)
if height is None or width is None:
raise TypeError('A callable src must call set_height and '
'set_width if the size is non-deterministic')
if isinstance(src, list):
is_number = lambda x: isinstance(x, Number)
unique_col_sizes = set(map(len, src))
everything_are_number = filter(is_number, sum(src, []))
if len(unique_col_sizes) != 1 or not everything_are_number:
raise ValueError('src must be a rectangular array of numbers')
two_dimensional_array = src
elif isinstance(src, dict):
if not height or not width:
w = h = 0
for r, c in iterkeys(src):
if not height:
h = max(h, r + 1)
if not width:
w = max(w, c + 1)
if not height:
height = h
if not width:
width = w
two_dimensional_array = []
for r in range(height):
row = []
two_dimensional_array.append(row)
for c in range(width):
row.append(src.get((r, c), 0))
else:
raise TypeError('src must be a list or dict or callable')
super(Matrix, self).__init__(two_dimensional_array)
@property
def height(self):
return len(self)
@property
def width(self):
return len(self[0])
def transpose(self):
height, width = self.height, self.width
src = {}
for c in range(width):
for r in range(height):
src[c, r] = self[r][c]
return type(self)(src, height=width, width=height)
def minor(self, row_n, col_n):
height, width = self.height, self.width
if not (0 <= row_n < height):
raise ValueError('row_n should be between 0 and %d' % height)
elif not (0 <= col_n < width):
raise ValueError('col_n should be between 0 and %d' % width)
two_dimensional_array = []
for r in range(height):
if r == row_n:
continue
row = []
two_dimensional_array.append(row)
for c in range(width):
if c == col_n:
continue
row.append(self[r][c])
return type(self)(two_dimensional_array)
def determinant(self):
height, width = self.height, self.width
if height != width:
raise ValueError('Only square matrix can calculate a determinant')
tmp, rv = copy.deepcopy(self), 1.
for c in range(width - 1, 0, -1):
pivot, r = max((abs(tmp[r][c]), r) for r in range(c + 1))
pivot = tmp[r][c]
if not pivot:
return 0.
tmp[r], tmp[c] = tmp[c], tmp[r]
if r != c:
rv = -rv
rv *= pivot
fact = -1. / pivot
for r in range(c):
f = fact * tmp[r][c]
for x in range(c):
tmp[r][x] += f * tmp[c][x]
return rv * tmp[0][0]
def adjugate(self):
height, width = self.height, self.width
if height != width:
raise ValueError('Only square matrix can be adjugated')
if height == 2:
a, b = self[0][0], self[0][1]
c, d = self[1][0], self[1][1]
return type(self)([[d, -b], [-c, a]])
src = {}
for r in range(height):
for c in range(width):
sign = -1 if (r + c) % 2 else 1
src[r, c] = self.minor(r, c).determinant() * sign
return type(self)(src, height, width)
def inverse(self):
if self.height == self.width == 1:
return type(self)([[1. / self[0][0]]])
return (1. / self.determinant()) * self.adjugate()
def __add__(self, other):
height, width = self.height, self.width
if (height, width) != (other.height, other.width):
raise ValueError('Must be same size')
src = {}
for r in range(height):
for c in range(width):
src[r, c] = self[r][c] + other[r][c]
return type(self)(src, height, width)
def __mul__(self, other):
if self.width != other.height:
raise ValueError('Bad size')
height, width = self.height, other.width
src = {}
for r in range(height):
for c in range(width):
src[r, c] = sum(self[r][x] * other[x][c]
for x in range(self.width))
return type(self)(src, height, width)
def __rmul__(self, other):
if not isinstance(other, Number):
raise TypeError('The operand should be a number')
height, width = self.height, self.width
src = {}
for r in range(height):
for c in range(width):
src[r, c] = other * self[r][c]
return type(self)(src, height, width)
def __repr__(self):
return '{}({})'.format(type(self).__name__, super(Matrix, self).__repr__())
def _repr_latex_(self):
rows = [' && '.join(['%.3f' % cell for cell in row]) for row in self]
latex = r'\begin{matrix} %s \end{matrix}' % r'\\'.join(rows)
return '$%s$' % latex
def _gen_erfcinv(erfc, math=math):
def erfcinv(y):
"""The inverse function of erfc."""
if y >= 2:
return -100.
elif y <= 0:
return 100.
zero_point = y < 1
if not zero_point:
y = 2 - y
t = math.sqrt(-2 * math.log(y / 2.))
x = -0.70711 * \
((2.30753 + t * 0.27061) / (1. + t * (0.99229 + t * 0.04481)) - t)
for i in range(2):
err = erfc(x) - y
x += err / (1.12837916709551257 * math.exp(-(x ** 2)) - x * err)
return x if zero_point else -x
return erfcinv
def _gen_ppf(erfc, math=math):
erfcinv = _gen_erfcinv(erfc, math)
def ppf(x, mu=0, sigma=1):
return mu - sigma * math.sqrt(2) * erfcinv(2 * x)
return ppf
def erfc(x):
z = abs(x)
t = 1. / (1. + z / 2.)
r = t * math.exp(-z * z - 1.26551223 + t * (1.00002368 + t * (
0.37409196 + t * (0.09678418 + t * (-0.18628806 + t * (
0.27886807 + t * (-1.13520398 + t * (1.48851587 + t * (
-0.82215223 + t * 0.17087277
)))
)))
)))
return 2. - r if x < 0 else r
def cdf(x, mu=0, sigma=1):
return 0.5 * erfc(-(x - mu) / (sigma * math.sqrt(2)))
def pdf(x, mu=0, sigma=1):
return (1 / math.sqrt(2 * math.pi) * abs(sigma) *
math.exp(-(((x - mu) / abs(sigma)) ** 2 / 2)))
ppf = _gen_ppf(erfc)
def choose_backend(backend):
if backend is None: # fallback
return cdf, pdf, ppf
elif backend == 'mpmath':
try:
import mpmath
except ImportError:
raise ImportError('Install "mpmath" to use this backend')
return mpmath.ncdf, mpmath.npdf, _gen_ppf(mpmath.erfc, math=mpmath)
elif backend == 'scipy':
try:
from scipy.stats import norm
except ImportError:
raise ImportError('Install "scipy" to use this backend')
return norm.cdf, norm.pdf, norm.ppf
raise ValueError('%r backend is not defined' % backend)
def available_backends():
backends = [None]
for backend in ['mpmath', 'scipy']:
try:
__import__(backend)
except ImportError:
continue
backends.append(backend)
return backends
class Node(object):
pass
class Variable(Node, Gaussian):
def __init__(self):
self.messages = {}
super(Variable, self).__init__()
def set(self, val):
delta = self.delta(val)
self.pi, self.tau = val.pi, val.tau
return delta
def delta(self, other):
pi_delta = abs(self.pi - other.pi)
if pi_delta == inf:
return 0.
return max(abs(self.tau - other.tau), math.sqrt(pi_delta))
def update_message(self, factor, pi=0, tau=0, message=None):
message = message or Gaussian(pi=pi, tau=tau)
old_message, self[factor] = self[factor], message
return self.set(self / old_message * message)
def update_value(self, factor, pi=0, tau=0, value=None):
value = value or Gaussian(pi=pi, tau=tau)
old_message = self[factor]
self[factor] = value * old_message / self
return self.set(value)
def __getitem__(self, factor):
return self.messages[factor]
def __setitem__(self, factor, message):
self.messages[factor] = message
def __repr__(self):
args = (type(self).__name__, super(Variable, self).__repr__(),
len(self.messages), '' if len(self.messages) == 1 else 's')
return '<%s %s with %d connection%s>' % args
class Factor(Node):
def __init__(self, variables):
self.vars = variables
for var in variables:
var[self] = Gaussian()
def down(self):
return 0
def up(self):
return 0
@property
def var(self):
assert len(self.vars) == 1
return self.vars[0]
def __repr__(self):
args = (type(self).__name__, len(self.vars),
'' if len(self.vars) == 1 else 's')
return '<%s with %d connection%s>' % args
class PriorFactor(Factor):
def __init__(self, var, val, dynamic=0):
super(PriorFactor, self).__init__([var])
self.val = val
self.dynamic = dynamic
def down(self):
sigma = math.sqrt(self.val.sigma ** 2 + self.dynamic ** 2)
value = Gaussian(self.val.mu, sigma)
return self.var.update_value(self, value=value)
class LikelihoodFactor(Factor):
def __init__(self, mean_var, value_var, variance):
super(LikelihoodFactor, self).__init__([mean_var, value_var])
self.mean = mean_var
self.value = value_var
self.variance = variance
def calc_a(self, var):
return 1. / (1. + self.variance * var.pi)
def down(self):
# update value.
msg = self.mean / self.mean[self]
a = self.calc_a(msg)
return self.value.update_message(self, a * msg.pi, a * msg.tau)
def up(self):
# update mean.
msg = self.value / self.value[self]
a = self.calc_a(msg)
return self.mean.update_message(self, a * msg.pi, a * msg.tau)
class SumFactor(Factor):
def __init__(self, sum_var, term_vars, coeffs):
super(SumFactor, self).__init__([sum_var] + term_vars)
self.sum = sum_var
self.terms = term_vars
self.coeffs = coeffs
def down(self):
vals = self.terms
msgs = [var[self] for var in vals]
return self.update(self.sum, vals, msgs, self.coeffs)
def up(self, index=0):
coeff = self.coeffs[index]
coeffs = []
for x, c in enumerate(self.coeffs):
try:
if x == index:
coeffs.append(1. / coeff)
else:
coeffs.append(-c / coeff)
except ZeroDivisionError:
coeffs.append(0.)
vals = self.terms[:]
vals[index] = self.sum
msgs = [var[self] for var in vals]
return self.update(self.terms[index], vals, msgs, coeffs)
def update(self, var, vals, msgs, coeffs):
pi_inv = 0
mu = 0
for val, msg, coeff in zip(vals, msgs, coeffs):
div = val / msg
mu += coeff * div.mu
if pi_inv == inf:
continue
try:
# numpy.float64 handles floating-point error by different way.
# For example, it can just warn RuntimeWarning on n/0 problem
# instead of throwing ZeroDivisionError. So div.pi, the
# denominator has to be a built-in float.
pi_inv += coeff ** 2 / float(div.pi)
except ZeroDivisionError:
pi_inv = inf
pi = 1. / pi_inv
tau = pi * mu
return var.update_message(self, pi, tau)
class TruncateFactor(Factor):
def __init__(self, var, v_func, w_func, draw_margin):
super(TruncateFactor, self).__init__([var])
self.v_func = v_func
self.w_func = w_func
self.draw_margin = draw_margin
def up(self):
val = self.var
msg = self.var[self]
div = val / msg
sqrt_pi = math.sqrt(div.pi)
args = (div.tau / sqrt_pi, self.draw_margin * sqrt_pi)
v = self.v_func(*args)
w = self.w_func(*args)
denom = (1. - w)
pi, tau = div.pi / denom, (div.tau + sqrt_pi * v) / denom
return val.update_value(self, pi, tau)
#: Default initial mean of ratings.
MU = 25.
#: Default initial standard deviation of ratings.
SIGMA = MU / 3
#: Default distance that guarantees about 76% chance of winning.
BETA = SIGMA / 2
#: Default dynamic factor.
TAU = SIGMA / 100
#: Default draw probability of the game.
DRAW_PROBABILITY = .10
#: A basis to check reliability of the result.
DELTA = 0.0001
def calc_draw_probability(draw_margin, size, env=None):
if env is None:
env = global_env()
return 2 * env.cdf(draw_margin / (math.sqrt(size) * env.beta)) - 1
def calc_draw_margin(draw_probability, size, env=None):
if env is None:
env = global_env()
return env.ppf((draw_probability + 1) / 2.) * math.sqrt(size) * env.beta
def _team_sizes(rating_groups):
team_sizes = [0]
for group in rating_groups:
team_sizes.append(len(group) + team_sizes[-1])
del team_sizes[0]
return team_sizes
def _floating_point_error(env):
if env.backend == 'mpmath':
msg = 'Set "mpmath.mp.dps" to higher'
else:
msg = 'Cannot calculate correctly, set backend to "mpmath"'
return FloatingPointError(msg)
class Rating(Gaussian):
def __init__(self, mu=None, sigma=None):
if isinstance(mu, tuple):
mu, sigma = mu
elif isinstance(mu, Gaussian):
mu, sigma = mu.mu, mu.sigma
if mu is None:
mu = global_env().mu
if sigma is None:
sigma = global_env().sigma
super(Rating, self).__init__(mu, sigma)
def __int__(self):
return int(self.mu)
def __long__(self):
return long(self.mu)
def __float__(self):
return float(self.mu)
def __iter__(self):
return iter((self.mu, self.sigma))
def __repr__(self):
c = type(self)
args = ('.'.join([c.__module__, c.__name__]), self.mu, self.sigma)
return '%s(mu=%.3f, sigma=%.3f)' % args
class TrueSkill(object):
def __init__(self, mu=MU, sigma=SIGMA, beta=BETA, tau=TAU,
draw_probability=DRAW_PROBABILITY, backend=None):
self.mu = mu
self.sigma = sigma
self.beta = beta
self.tau = tau
self.draw_probability = draw_probability
self.backend = backend
if isinstance(backend, tuple):
self.cdf, self.pdf, self.ppf = backend
else:
self.cdf, self.pdf, self.ppf = choose_backend(backend)
def create_rating(self, mu=None, sigma=None):
if mu is None:
mu = self.mu
if sigma is None:
sigma = self.sigma
return Rating(mu, sigma)
def v_win(self, diff, draw_margin):
x = diff - draw_margin
denom = self.cdf(x)
return (self.pdf(x) / denom) if denom else -x
def v_draw(self, diff, draw_margin):
abs_diff = abs(diff)
a, b = draw_margin - abs_diff, -draw_margin - abs_diff
denom = self.cdf(a) - self.cdf(b)
numer = self.pdf(b) - self.pdf(a)
return ((numer / denom) if denom else a) * (-1 if diff < 0 else +1)
def w_win(self, diff, draw_margin):
x = diff - draw_margin
v = self.v_win(diff, draw_margin)
w = v * (v + x)
if 0 < w < 1:
return w
raise _floating_point_error(self)
def w_draw(self, diff, draw_margin):
abs_diff = abs(diff)
a, b = draw_margin - abs_diff, -draw_margin - abs_diff
denom = self.cdf(a) - self.cdf(b)
if not denom:
raise _floating_point_error(self)
v = self.v_draw(abs_diff, draw_margin)
return (v ** 2) + (a * self.pdf(a) - b * self.pdf(b)) / denom
def validate_rating_groups(self, rating_groups):
# check group sizes
if len(rating_groups) < 2:
raise ValueError('Need multiple rating groups')
elif not all(rating_groups):
raise ValueError('Each group must contain multiple ratings')
# check group types
group_types = set(map(type, rating_groups))
if len(group_types) != 1:
raise TypeError('All groups should be same type')
elif group_types.pop() is Rating:
raise TypeError('Rating cannot be a rating group')
# normalize rating_groups
if isinstance(rating_groups[0], dict):
dict_rating_groups = rating_groups
rating_groups = []
keys = []
for dict_rating_group in dict_rating_groups:
rating_group, key_group = [], []
for key, rating in iteritems(dict_rating_group):
rating_group.append(rating)
key_group.append(key)
rating_groups.append(tuple(rating_group))
keys.append(tuple(key_group))
else:
rating_groups = list(rating_groups)
keys = None
return rating_groups, keys
def validate_weights(self, weights, rating_groups, keys=None):
if weights is None:
weights = [(1,) * len(g) for g in rating_groups]
elif isinstance(weights, dict):
weights_dict, weights = weights, []
for x, group in enumerate(rating_groups):
w = []
weights.append(w)
for y, rating in enumerate(group):
if keys is not None:
y = keys[x][y]
w.append(weights_dict.get((x, y), 1))
return weights
def factor_graph_builders(self, rating_groups, ranks, weights):
flatten_ratings = sum(map(tuple, rating_groups), ())
flatten_weights = sum(map(tuple, weights), ())
size = len(flatten_ratings)
group_size = len(rating_groups)
# create variables
rating_vars = [Variable() for x in range(size)]
perf_vars = [Variable() for x in range(size)]
team_perf_vars = [Variable() for x in range(group_size)]
team_diff_vars = [Variable() for x in range(group_size - 1)]
team_sizes = _team_sizes(rating_groups)
# layer builders
def build_rating_layer():
for rating_var, rating in zip(rating_vars, flatten_ratings):
yield PriorFactor(rating_var, rating, self.tau)
def build_perf_layer():
for rating_var, perf_var in zip(rating_vars, perf_vars):
yield LikelihoodFactor(rating_var, perf_var, self.beta ** 2)
def build_team_perf_layer():
for team, team_perf_var in enumerate(team_perf_vars):
if team > 0:
start = team_sizes[team - 1]
else:
start = 0
end = team_sizes[team]
child_perf_vars = perf_vars[start:end]
coeffs = flatten_weights[start:end]
yield SumFactor(team_perf_var, child_perf_vars, coeffs)
def build_team_diff_layer():
for team, team_diff_var in enumerate(team_diff_vars):
yield SumFactor(team_diff_var,
team_perf_vars[team:team + 2], [+1, -1])
def build_trunc_layer():
for x, team_diff_var in enumerate(team_diff_vars):
if callable(self.draw_probability):
# dynamic draw probability
team_perf1, team_perf2 = team_perf_vars[x:x + 2]
args = (Rating(team_perf1), Rating(team_perf2), self)
draw_probability = self.draw_probability(*args)
else:
# static draw probability
draw_probability = self.draw_probability
size = sum(map(len, rating_groups[x:x + 2]))
draw_margin = calc_draw_margin(draw_probability, size, self)
if ranks[x] == ranks[x + 1]: # is a tie?
v_func, w_func = self.v_draw, self.w_draw
else:
v_func, w_func = self.v_win, self.w_win
yield TruncateFactor(team_diff_var,
v_func, w_func, draw_margin)
# build layers
return (build_rating_layer, build_perf_layer, build_team_perf_layer,
build_team_diff_layer, build_trunc_layer)
def run_schedule(self, build_rating_layer, build_perf_layer,
build_team_perf_layer, build_team_diff_layer,
build_trunc_layer, min_delta=DELTA):
if min_delta <= 0:
raise ValueError('min_delta must be greater than 0')
layers = []
def build(builders):
layers_built = [list(build()) for build in builders]
layers.extend(layers_built)
return layers_built
# gray arrows
layers_built = build([build_rating_layer,
build_perf_layer,
build_team_perf_layer])
rating_layer, perf_layer, team_perf_layer = layers_built
for f in chain(*layers_built):
f.down()
# arrow #1, #2, #3
team_diff_layer, trunc_layer = build([build_team_diff_layer,
build_trunc_layer])
team_diff_len = len(team_diff_layer)
for x in range(10):
if team_diff_len == 1:
# only two teams
team_diff_layer[0].down()
delta = trunc_layer[0].up()
else:
# multiple teams
delta = 0
for x in range(team_diff_len - 1):
team_diff_layer[x].down()
delta = max(delta, trunc_layer[x].up())
team_diff_layer[x].up(1) # up to right variable
for x in range(team_diff_len - 1, 0, -1):
team_diff_layer[x].down()
delta = max(delta, trunc_layer[x].up())
team_diff_layer[x].up(0) # up to left variable
# repeat until to small update
if delta <= min_delta:
break
# up both ends
team_diff_layer[0].up(0)
team_diff_layer[team_diff_len - 1].up(1)
# up the remainder of the black arrows
for f in team_perf_layer:
for x in range(len(f.vars) - 1):
f.up(x)
for f in perf_layer:
f.up()
return layers
def rate(self, rating_groups, ranks=None, weights=None, min_delta=DELTA):
rating_groups, keys = self.validate_rating_groups(rating_groups)
weights = self.validate_weights(weights, rating_groups, keys)
group_size = len(rating_groups)
if ranks is None:
ranks = range(group_size)
elif len(ranks) != group_size:
raise ValueError('Wrong ranks')
# sort rating groups by rank
by_rank = lambda x: x[1][1]
sorting = sorted(enumerate(zip(rating_groups, ranks, weights)),
key=by_rank)
sorted_rating_groups, sorted_ranks, sorted_weights = [], [], []
for x, (g, r, w) in sorting:
sorted_rating_groups.append(g)
sorted_ranks.append(r)
# make weights to be greater than 0
sorted_weights.append(max(min_delta, w_) for w_ in w)
# build factor graph
args = (sorted_rating_groups, sorted_ranks, sorted_weights)
builders = self.factor_graph_builders(*args)
args = builders + (min_delta,)
layers = self.run_schedule(*args)
# make result
rating_layer, team_sizes = layers[0], _team_sizes(sorted_rating_groups)
transformed_groups = []
for start, end in zip([0] + team_sizes[:-1], team_sizes):
group = []
for f in rating_layer[start:end]:
group.append(Rating(float(f.var.mu), float(f.var.sigma)))
transformed_groups.append(tuple(group))
by_hint = lambda x: x[0]
unsorting = sorted(zip((x for x, __ in sorting), transformed_groups),
key=by_hint)
if keys is None:
return [g for x, g in unsorting]
# restore the structure with input dictionary keys
return [dict(zip(keys[x], g)) for x, g in unsorting]
def quality(self, rating_groups, weights=None):
rating_groups, keys = self.validate_rating_groups(rating_groups)
weights = self.validate_weights(weights, rating_groups, keys)
flatten_ratings = sum(map(tuple, rating_groups), ())
flatten_weights = sum(map(tuple, weights), ())
length = len(flatten_ratings)
# a vector of all of the skill means
mean_matrix = Matrix([[r.mu] for r in flatten_ratings])
# a matrix whose diagonal values are the variances (sigma ** 2) of each
# of the players.
def variance_matrix(height, width):
variances = (r.sigma ** 2 for r in flatten_ratings)
for x, variance in enumerate(variances):
yield (x, x), variance
variance_matrix = Matrix(variance_matrix, length, length)
# the player-team assignment and comparison matrix
def rotated_a_matrix(set_height, set_width):
t = 0
for r, (cur, _next) in enumerate(zip(rating_groups[:-1],
rating_groups[1:])):
for x in range(t, t + len(cur)):
yield (r, x), flatten_weights[x]
t += 1
x += 1
for x in range(x, x + len(_next)):
yield (r, x), -flatten_weights[x]
set_height(r + 1)
set_width(x + 1)
rotated_a_matrix = Matrix(rotated_a_matrix)
a_matrix = rotated_a_matrix.transpose()
# match quality further derivation
_ata = (self.beta ** 2) * rotated_a_matrix * a_matrix
_atsa = rotated_a_matrix * variance_matrix * a_matrix
start = mean_matrix.transpose() * a_matrix
middle = _ata + _atsa
end = rotated_a_matrix * mean_matrix
# make result
e_arg = (-0.5 * start * middle.inverse() * end).determinant()
s_arg = _ata.determinant() / middle.determinant()
return math.exp(e_arg) * math.sqrt(s_arg)
def expose(self, rating):
k = self.mu / self.sigma
return rating.mu - k * rating.sigma
def make_as_global(self):
return setup(env=self)
def __repr__(self):
c = type(self)
if callable(self.draw_probability):
f = self.draw_probability
draw_probability = '.'.join([f.__module__, f.__name__])
else:
draw_probability = '%.1f%%' % (self.draw_probability * 100)
if self.backend is None:
backend = ''
elif isinstance(self.backend, tuple):
backend = ', backend=...'
else:
backend = ', backend=%r' % self.backend
args = ('.'.join([c.__module__, c.__name__]), self.mu, self.sigma,
self.beta, self.tau, draw_probability, backend)
return ('%s(mu=%.3f, sigma=%.3f, beta=%.3f, tau=%.3f, '
'draw_probability=%s%s)' % args)
def rate_1vs1(rating1, rating2, drawn=False, min_delta=DELTA, env=None):
if env is None:
env = global_env()
ranks = [0, 0 if drawn else 1]
teams = env.rate([(rating1,), (rating2,)], ranks, min_delta=min_delta)
return teams[0][0], teams[1][0]
def quality_1vs1(rating1, rating2, env=None):
if env is None:
env = global_env()
return env.quality([(rating1,), (rating2,)])
def global_env():
try:
global_env.__trueskill__
except AttributeError:
# setup the default environment
setup()
return global_env.__trueskill__
def setup(mu=MU, sigma=SIGMA, beta=BETA, tau=TAU,
draw_probability=DRAW_PROBABILITY, backend=None, env=None):
if env is None:
env = TrueSkill(mu, sigma, beta, tau, draw_probability, backend)
global_env.__trueskill__ = env
return env
def rate(rating_groups, ranks=None, weights=None, min_delta=DELTA):
return global_env().rate(rating_groups, ranks, weights, min_delta)
def quality(rating_groups, weights=None):
return global_env().quality(rating_groups, weights)
def expose(rating):
return global_env().expose(rating)

Binary file not shown.

View File

@ -8,7 +8,7 @@ with open("requirements.txt", 'r') as file:
setuptools.setup( setuptools.setup(
name="analysis", name="analysis",
version="1.0.0.011", version="1.0.0.012",
author="The Titan Scouting Team", author="The Titan Scouting Team",
author_email="titanscout2022@gmail.com", author_email="titanscout2022@gmail.com",
description="analysis package developed by Titan Scouting for The Red Alliance", description="analysis package developed by Titan Scouting for The Red Alliance",