mirror of
				https://github.com/titanscouting/tra-analysis.git
				synced 2025-10-25 02:19:20 +00:00 
			
		
		
		
	created two new analysis variants
the existing amd64 new unpopulated arm64
This commit is contained in:
		| @@ -1,792 +0,0 @@ | ||||
| # Titan Robotics Team 2022: Data Analysis Module | ||||
| # Written by Arthur Lu & Jacob Levine | ||||
| # Notes: | ||||
| #    this should be imported as a python module using 'import analysis' | ||||
| #    this should be included in the local directory or environment variable | ||||
| #    this module has been optimized for multhreaded computing | ||||
| #    current benchmark of optimization: 1.33 times faster | ||||
| # setup: | ||||
|  | ||||
| __version__ = "1.1.13.007" | ||||
|  | ||||
| # changelog should be viewed using print(analysis.__changelog__) | ||||
| __changelog__ = """changelog: | ||||
|     1.1.13.007: | ||||
|         - fixed bug with trueskill | ||||
|     1.1.13.006: | ||||
|         - cleaned up imports | ||||
|     1.1.13.005: | ||||
|         - cleaned up package | ||||
|     1.1.13.004: | ||||
|         - small fixes to regression to improve performance | ||||
|     1.1.13.003: | ||||
|         - filtered nans from regression | ||||
|     1.1.13.002: | ||||
|         - removed torch requirement, and moved Regression back to regression.py | ||||
|     1.1.13.001: | ||||
|         - bug fix with linear regression not returning a proper value | ||||
|         - cleaned up regression | ||||
|         - fixed bug with polynomial regressions | ||||
|     1.1.13.000: | ||||
|         - fixed all regressions to now properly work | ||||
|     1.1.12.006: | ||||
|         - fixed bg with a division by zero in histo_analysis | ||||
|     1.1.12.005: | ||||
|         - fixed numba issues by removing numba  from elo, glicko2 and trueskill | ||||
|     1.1.12.004: | ||||
|         - renamed gliko to glicko | ||||
|     1.1.12.003: | ||||
|         - removed depreciated code | ||||
|     1.1.12.002: | ||||
|         - removed team first time trueskill instantiation in favor of integration in superscript.py | ||||
|     1.1.12.001: | ||||
|         - improved readibility of regression outputs by stripping tensor data | ||||
|         - used map with lambda to acheive the improved readibility | ||||
|         - lost numba jit support with regression, and generated_jit hangs at execution | ||||
|         - TODO: reimplement correct numba integration in regression | ||||
|     1.1.12.000: | ||||
|         - temporarily fixed polynomial regressions by using sklearn's PolynomialFeatures | ||||
|     1.1.11.010: | ||||
|         - alphabeticaly ordered import lists | ||||
|     1.1.11.009: | ||||
|         - bug fixes | ||||
|     1.1.11.008: | ||||
|         - bug fixes | ||||
|     1.1.11.007: | ||||
|         - bug fixes | ||||
|     1.1.11.006: | ||||
|         - tested min and max | ||||
|         - bug fixes  | ||||
|     1.1.11.005: | ||||
|         - added min and max in basic_stats | ||||
|     1.1.11.004: | ||||
|         - bug fixes | ||||
|     1.1.11.003: | ||||
|         - bug fixes | ||||
|     1.1.11.002: | ||||
|         - consolidated metrics | ||||
|         - fixed __all__ | ||||
|     1.1.11.001: | ||||
|         - added test/train split to RandomForestClassifier and RandomForestRegressor | ||||
|     1.1.11.000: | ||||
|         - added RandomForestClassifier and RandomForestRegressor | ||||
|         - note: untested | ||||
|     1.1.10.000: | ||||
|         - added numba.jit to remaining functions | ||||
|     1.1.9.002: | ||||
|         - kernelized PCA and KNN | ||||
|     1.1.9.001: | ||||
|         - fixed bugs with SVM and NaiveBayes | ||||
|     1.1.9.000: | ||||
|         - added SVM class, subclasses, and functions | ||||
|         - note: untested | ||||
|     1.1.8.000: | ||||
|         - added NaiveBayes classification engine | ||||
|         - note: untested | ||||
|     1.1.7.000: | ||||
|         - added knn() | ||||
|         - added confusion matrix to decisiontree() | ||||
|     1.1.6.002: | ||||
|         - changed layout of __changelog to be vscode friendly | ||||
|     1.1.6.001: | ||||
|         - added additional hyperparameters to decisiontree() | ||||
|     1.1.6.000: | ||||
|         - fixed __version__ | ||||
|         - fixed __all__ order | ||||
|         - added decisiontree() | ||||
|     1.1.5.003: | ||||
|         - added pca | ||||
|     1.1.5.002: | ||||
|         - reduced import list | ||||
|         - added kmeans clustering engine | ||||
|     1.1.5.001: | ||||
|         - simplified regression by using .to(device) | ||||
|     1.1.5.000: | ||||
|         - added polynomial regression to regression(); untested | ||||
|     1.1.4.000: | ||||
|         - added trueskill() | ||||
|     1.1.3.002: | ||||
|         - renamed regression class to Regression, regression_engine() to regression gliko2_engine class to Gliko2 | ||||
|     1.1.3.001: | ||||
|         - changed glicko2() to return tuple instead of array | ||||
|     1.1.3.000: | ||||
|         - added glicko2_engine class and glicko() | ||||
|         - verified glicko2() accuracy | ||||
|     1.1.2.003: | ||||
|         - fixed elo() | ||||
|     1.1.2.002: | ||||
|         - added elo() | ||||
|         - elo() has bugs to be fixed | ||||
|     1.1.2.001: | ||||
|         - readded regrression import | ||||
|     1.1.2.000: | ||||
|         - integrated regression.py as regression class | ||||
|         - removed regression import | ||||
|         - fixed metadata for regression class | ||||
|         - fixed metadata for analysis class | ||||
|     1.1.1.001: | ||||
|         - regression_engine() bug fixes, now actaully regresses | ||||
|     1.1.1.000: | ||||
|         - added regression_engine() | ||||
|         - added all regressions except polynomial | ||||
|     1.1.0.007: | ||||
|         - updated _init_device() | ||||
|     1.1.0.006: | ||||
|         - removed useless try statements | ||||
|     1.1.0.005: | ||||
|         - removed impossible outcomes | ||||
|     1.1.0.004: | ||||
|         - added performance metrics (r^2, mse, rms) | ||||
|     1.1.0.003: | ||||
|         - resolved nopython mode for mean, median, stdev, variance | ||||
|     1.1.0.002: | ||||
|         - snapped (removed) majority of uneeded imports | ||||
|         - forced object mode (bad) on all jit | ||||
|         - TODO: stop numba complaining about not being able to compile in nopython mode | ||||
|     1.1.0.001: | ||||
|         - removed from sklearn import * to resolve uneeded wildcard imports | ||||
|     1.1.0.000: | ||||
|         - removed c_entities,nc_entities,obstacles,objectives from __all__ | ||||
|         - applied numba.jit to all functions | ||||
|         - depreciated and removed stdev_z_split | ||||
|         - cleaned up histo_analysis to include numpy and numba.jit optimizations | ||||
|         - depreciated and removed all regression functions in favor of future pytorch optimizer | ||||
|         - depreciated and removed all nonessential functions (basic_analysis, benchmark, strip_data) | ||||
|         - optimized z_normalize using sklearn.preprocessing.normalize | ||||
|         - TODO: implement kernel/function based pytorch regression optimizer | ||||
|     1.0.9.000: | ||||
|         - refactored | ||||
|         - numpyed everything | ||||
|         - removed stats in favor of numpy functions | ||||
|     1.0.8.005: | ||||
|         - minor fixes | ||||
|     1.0.8.004: | ||||
|         - removed a few unused dependencies | ||||
|     1.0.8.003: | ||||
|         - added p_value function | ||||
|     1.0.8.002: | ||||
|         - updated __all__ correctly to contain changes made in v 1.0.8.000 and v 1.0.8.001 | ||||
|     1.0.8.001: | ||||
|         - refactors | ||||
|         - bugfixes | ||||
|     1.0.8.000: | ||||
|         - depreciated histo_analysis_old | ||||
|         - depreciated debug | ||||
|         - altered basic_analysis to take array data instead of filepath | ||||
|         - refactor | ||||
|         - optimization | ||||
|     1.0.7.002: | ||||
|         - bug fixes | ||||
|     1.0.7.001: | ||||
|         - bug fixes | ||||
|     1.0.7.000: | ||||
|         - added tanh_regression (logistical regression) | ||||
|         - bug fixes | ||||
|     1.0.6.005: | ||||
|         - added z_normalize function to normalize dataset | ||||
|         - bug fixes | ||||
|     1.0.6.004: | ||||
|         - bug fixes | ||||
|     1.0.6.003: | ||||
|         - bug fixes | ||||
|     1.0.6.002: | ||||
|         - bug fixes | ||||
|     1.0.6.001: | ||||
|         - corrected __all__ to contain all of the functions | ||||
|     1.0.6.000: | ||||
|         - added calc_overfit, which calculates two measures of overfit, error and performance | ||||
|         - added calculating overfit to optimize_regression | ||||
|     1.0.5.000: | ||||
|         - added optimize_regression function, which is a sample function to find the optimal regressions | ||||
|         - optimize_regression function filters out some overfit funtions (functions with r^2 = 1) | ||||
|         - planned addition: overfit detection in the optimize_regression function | ||||
|     1.0.4.002: | ||||
|         - added __changelog__ | ||||
|         - updated debug function with log and exponential regressions | ||||
|     1.0.4.001: | ||||
|         - added log regressions | ||||
|         - added exponential regressions | ||||
|         - added log_regression and exp_regression to __all__ | ||||
|     1.0.3.008: | ||||
|         - added debug function to further consolidate functions | ||||
|     1.0.3.007: | ||||
|         - added builtin benchmark function | ||||
|         - added builtin random (linear) data generation function | ||||
|         - added device initialization (_init_device) | ||||
|     1.0.3.006: | ||||
|         - reorganized the imports list to be in alphabetical order | ||||
|         - added search and regurgitate functions to c_entities, nc_entities, obstacles, objectives | ||||
|     1.0.3.005: | ||||
|         - major bug fixes | ||||
|         - updated historical analysis | ||||
|         - depreciated old historical analysis | ||||
|     1.0.3.004: | ||||
|         - added __version__, __author__, __all__ | ||||
|         - added polynomial regression | ||||
|         - added root mean squared function | ||||
|         - added r squared function | ||||
|     1.0.3.003: | ||||
|         - bug fixes | ||||
|         - added c_entities | ||||
|     1.0.3.002: | ||||
|         - bug fixes | ||||
|         - added nc_entities, obstacles, objectives | ||||
|         - consolidated statistics.py to analysis.py | ||||
|     1.0.3.001: | ||||
|         - compiled 1d, column, and row basic stats into basic stats function | ||||
|     1.0.3.000: | ||||
|         - added historical analysis function | ||||
|     1.0.2.xxx: | ||||
|         - added z score test | ||||
|     1.0.1.xxx: | ||||
|         - major bug fixes | ||||
|     1.0.0.xxx: | ||||
|         - added loading csv | ||||
|         - added 1d, column, row basic stats | ||||
| """ | ||||
|  | ||||
| __author__ = ( | ||||
|     "Arthur Lu <learthurgo@gmail.com>", | ||||
|     "Jacob Levine <jlevine@imsa.edu>", | ||||
| ) | ||||
|  | ||||
| __all__ = [ | ||||
|     'load_csv', | ||||
|     'basic_stats', | ||||
|     'z_score', | ||||
|     'z_normalize', | ||||
|     'histo_analysis', | ||||
|     'regression', | ||||
|     'elo', | ||||
|     'glicko2', | ||||
|     'trueskill', | ||||
|     'RegressionMetrics', | ||||
|     'ClassificationMetrics', | ||||
|     'kmeans', | ||||
|     'pca', | ||||
|     'decisiontree', | ||||
|     'knn_classifier', | ||||
|     'knn_regressor', | ||||
|     'NaiveBayes', | ||||
|     'SVM', | ||||
|     'random_forest_classifier', | ||||
|     'random_forest_regressor', | ||||
|     'Glicko2', | ||||
|     # all statistics functions left out due to integration in other functions | ||||
| ] | ||||
|  | ||||
| # now back to your regularly scheduled programming: | ||||
|  | ||||
| # imports (now in alphabetical order! v 1.0.3.006): | ||||
|  | ||||
| import csv | ||||
| import numba | ||||
| from numba import jit | ||||
| import numpy as np | ||||
| import scipy | ||||
| from scipy import * | ||||
| import sklearn | ||||
| from sklearn import * | ||||
| from analysis import trueskill as Trueskill | ||||
|  | ||||
| class error(ValueError): | ||||
|     pass | ||||
|  | ||||
| def load_csv(filepath): | ||||
|     with open(filepath, newline='') as csvfile: | ||||
|         file_array = np.array(list(csv.reader(csvfile))) | ||||
|         csvfile.close() | ||||
|     return file_array | ||||
|  | ||||
| # expects 1d array | ||||
| @jit(forceobj=True) | ||||
| def basic_stats(data): | ||||
|  | ||||
|     data_t = np.array(data).astype(float) | ||||
|  | ||||
|     _mean = mean(data_t) | ||||
|     _median = median(data_t) | ||||
|     _stdev = stdev(data_t) | ||||
|     _variance = variance(data_t) | ||||
|     _min = npmin(data_t) | ||||
|     _max = npmax(data_t) | ||||
|  | ||||
|     return _mean, _median, _stdev, _variance, _min, _max | ||||
|  | ||||
| # returns z score with inputs of point, mean and standard deviation of spread | ||||
| @jit(forceobj=True) | ||||
| def z_score(point, mean, stdev): | ||||
|     score = (point - mean) / stdev | ||||
|      | ||||
|     return score | ||||
|  | ||||
| # expects 2d array, normalizes across all axes | ||||
| @jit(forceobj=True) | ||||
| def z_normalize(array, *args): | ||||
|  | ||||
|    array = np.array(array) | ||||
|    for arg in args: | ||||
|        array = sklearn.preprocessing.normalize(array, axis = arg) | ||||
|  | ||||
|    return array | ||||
|  | ||||
| @jit(forceobj=True) | ||||
| # expects 2d array of [x,y] | ||||
| def histo_analysis(hist_data): | ||||
|  | ||||
|     if(len(hist_data[0]) > 2): | ||||
|  | ||||
|         hist_data = np.array(hist_data) | ||||
|         derivative = np.array(len(hist_data) - 1, dtype = float) | ||||
|         t = np.diff(hist_data) | ||||
|         derivative = t[1] / t[0] | ||||
|         np.sort(derivative) | ||||
|  | ||||
|         return basic_stats(derivative)[0], basic_stats(derivative)[3] | ||||
|  | ||||
|     else: | ||||
|  | ||||
|         return None | ||||
|  | ||||
| def regression(inputs, outputs, args): # inputs, outputs expects N-D array  | ||||
|  | ||||
|     X = np.array(inputs) | ||||
|     y = np.array(outputs) | ||||
|  | ||||
|     regressions = [] | ||||
|  | ||||
|     if 'lin' in args: # formula: ax + b | ||||
|  | ||||
|         try: | ||||
|  | ||||
|             def func(x, a, b): | ||||
|  | ||||
|                 return a * x + b | ||||
|  | ||||
|             popt, pcov = scipy.optimize.curve_fit(func, X, y) | ||||
|  | ||||
|             regressions.append((popt.flatten().tolist(), None)) | ||||
|  | ||||
|         except Exception as e: | ||||
|  | ||||
|             pass | ||||
|  | ||||
|     if 'log' in args: # formula: a log (b(x + c)) + d | ||||
|  | ||||
|         try: | ||||
|  | ||||
|             def func(x, a, b, c, d): | ||||
|  | ||||
|                 return a * np.log(b*(x + c)) + d | ||||
|  | ||||
|             popt, pcov = scipy.optimize.curve_fit(func, X, y) | ||||
|  | ||||
|             regressions.append((popt.flatten().tolist(), None)) | ||||
|  | ||||
|         except Exception as e: | ||||
|              | ||||
|             pass | ||||
|  | ||||
|     if 'exp' in args: # formula: a e ^ (b(x + c)) + d | ||||
|  | ||||
|         try:         | ||||
|  | ||||
|             def func(x, a, b, c, d): | ||||
|  | ||||
|                 return a * np.exp(b*(x + c)) + d | ||||
|  | ||||
|             popt, pcov = scipy.optimize.curve_fit(func, X, y) | ||||
|  | ||||
|             regressions.append((popt.flatten().tolist(), None)) | ||||
|  | ||||
|         except Exception as e: | ||||
|  | ||||
|             pass | ||||
|  | ||||
|     if 'ply' in args: # formula: a + bx^1 + cx^2 + dx^3 + ... | ||||
|          | ||||
|         inputs = np.array([inputs]) | ||||
|         outputs = np.array([outputs]) | ||||
|  | ||||
|         plys = [] | ||||
|         limit = len(outputs[0]) | ||||
|  | ||||
|         for i in range(2, limit): | ||||
|  | ||||
|             model = sklearn.preprocessing.PolynomialFeatures(degree = i) | ||||
|             model = sklearn.pipeline.make_pipeline(model, sklearn.linear_model.LinearRegression()) | ||||
|             model = model.fit(np.rot90(inputs), np.rot90(outputs)) | ||||
|  | ||||
|             params = model.steps[1][1].intercept_.tolist() | ||||
|             params = np.append(params, model.steps[1][1].coef_[0].tolist()[1::]) | ||||
|             params.flatten() | ||||
|             params = params.tolist() | ||||
|              | ||||
|             plys.append(params) | ||||
|  | ||||
|         regressions.append(plys) | ||||
|  | ||||
|     if 'sig' in args: # formula: a tanh (b(x + c)) + d | ||||
|  | ||||
|         try:         | ||||
|  | ||||
|             def func(x, a, b, c, d): | ||||
|  | ||||
|                 return a * np.tanh(b*(x + c)) + d | ||||
|  | ||||
|             popt, pcov = scipy.optimize.curve_fit(func, X, y) | ||||
|  | ||||
|             regressions.append((popt.flatten().tolist(), None)) | ||||
|  | ||||
|         except Exception as e: | ||||
|             | ||||
|             pass | ||||
|  | ||||
|     return regressions | ||||
|  | ||||
| def elo(starting_score, opposing_score, observed, N, K): | ||||
|  | ||||
|     expected = 1/(1+10**((np.array(opposing_score) - starting_score)/N)) | ||||
|  | ||||
|     return starting_score + K*(np.sum(observed) - np.sum(expected)) | ||||
|  | ||||
| def glicko2(starting_score, starting_rd, starting_vol, opposing_score, opposing_rd, observations): | ||||
|  | ||||
|     player = Glicko2(rating = starting_score, rd = starting_rd, vol = starting_vol) | ||||
|  | ||||
|     player.update_player([x for x in opposing_score], [x for x in opposing_rd], observations) | ||||
|  | ||||
|     return (player.rating, player.rd, player.vol) | ||||
|  | ||||
| def trueskill(teams_data, observations): # teams_data is array of array of tuples ie. [[(mu, sigma), (mu, sigma), (mu, sigma)], [(mu, sigma), (mu, sigma), (mu, sigma)]] | ||||
|  | ||||
|     team_ratings = [] | ||||
|  | ||||
|     for team in teams_data: | ||||
|         team_temp = () | ||||
|         for player in team: | ||||
|             player = Trueskill.Rating(player[0], player[1]) | ||||
|             team_temp = team_temp + (player,) | ||||
|         team_ratings.append(team_temp) | ||||
|  | ||||
|     return Trueskill.rate(team_ratings, ranks=observations) | ||||
|  | ||||
| class RegressionMetrics(): | ||||
|  | ||||
|     def __new__(cls, predictions, targets): | ||||
|  | ||||
|         return cls.r_squared(cls, predictions, targets), cls.mse(cls, predictions, targets), cls.rms(cls, predictions, targets) | ||||
|  | ||||
|     def r_squared(self, predictions, targets):  # assumes equal size inputs | ||||
|  | ||||
|         return sklearn.metrics.r2_score(targets, predictions) | ||||
|  | ||||
|     def mse(self, predictions, targets): | ||||
|  | ||||
|         return sklearn.metrics.mean_squared_error(targets, predictions) | ||||
|  | ||||
|     def rms(self, predictions, targets): | ||||
|  | ||||
|         return math.sqrt(sklearn.metrics.mean_squared_error(targets, predictions)) | ||||
|  | ||||
| class ClassificationMetrics(): | ||||
|  | ||||
|     def __new__(cls, predictions, targets): | ||||
|  | ||||
|         return cls.cm(cls, predictions, targets), cls.cr(cls, predictions, targets) | ||||
|  | ||||
|     def cm(self, predictions, targets): | ||||
|  | ||||
|         return sklearn.metrics.confusion_matrix(targets, predictions) | ||||
|  | ||||
|     def cr(self, predictions, targets): | ||||
|  | ||||
|         return sklearn.metrics.classification_report(targets, predictions) | ||||
|  | ||||
| @jit(nopython=True) | ||||
| def mean(data): | ||||
|  | ||||
|     return np.mean(data) | ||||
|  | ||||
| @jit(nopython=True) | ||||
| def median(data): | ||||
|  | ||||
|     return np.median(data) | ||||
|  | ||||
| @jit(nopython=True) | ||||
| def stdev(data): | ||||
|  | ||||
|     return np.std(data) | ||||
|  | ||||
| @jit(nopython=True) | ||||
| def variance(data): | ||||
|  | ||||
|     return np.var(data) | ||||
|  | ||||
| @jit(nopython=True) | ||||
| def npmin(data): | ||||
|  | ||||
|     return np.amin(data) | ||||
|  | ||||
| @jit(nopython=True) | ||||
| def npmax(data): | ||||
|  | ||||
|     return np.amax(data) | ||||
|  | ||||
| @jit(forceobj=True) | ||||
| def kmeans(data, n_clusters=8, init="k-means++", n_init=10, max_iter=300, tol=0.0001, precompute_distances="auto", verbose=0, random_state=None, copy_x=True, n_jobs=None, algorithm="auto"): | ||||
|  | ||||
|     kernel = sklearn.cluster.KMeans(n_clusters = n_clusters, init = init, n_init = n_init, max_iter = max_iter, tol = tol, precompute_distances = precompute_distances, verbose = verbose, random_state = random_state, copy_x = copy_x, n_jobs = n_jobs, algorithm = algorithm) | ||||
|     kernel.fit(data) | ||||
|     predictions = kernel.predict(data) | ||||
|     centers = kernel.cluster_centers_ | ||||
|  | ||||
|     return centers, predictions | ||||
|  | ||||
| @jit(forceobj=True) | ||||
| def pca(data, n_components = None, copy = True, whiten = False, svd_solver = "auto", tol = 0.0, iterated_power = "auto", random_state = None): | ||||
|  | ||||
|     kernel = sklearn.decomposition.PCA(n_components = n_components, copy = copy, whiten = whiten, svd_solver = svd_solver, tol = tol, iterated_power = iterated_power, random_state = random_state) | ||||
|  | ||||
|     return kernel.fit_transform(data) | ||||
|  | ||||
| @jit(forceobj=True) | ||||
| def decisiontree(data, labels, test_size = 0.3, criterion = "gini", splitter = "default", max_depth = None): #expects *2d data and 1d labels | ||||
|  | ||||
|     data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) | ||||
|     model = sklearn.tree.DecisionTreeClassifier(criterion = criterion, splitter = splitter, max_depth = max_depth) | ||||
|     model = model.fit(data_train,labels_train) | ||||
|     predictions = model.predict(data_test) | ||||
|     metrics = ClassificationMetrics(predictions, labels_test) | ||||
|  | ||||
|     return model, metrics | ||||
|  | ||||
| @jit(forceobj=True) | ||||
| def knn_classifier(data, labels, test_size = 0.3, algorithm='auto', leaf_size=30, metric='minkowski', metric_params=None, n_jobs=None, n_neighbors=5, p=2, weights='uniform'): #expects *2d data and 1d labels post-scaling | ||||
|  | ||||
|     data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) | ||||
|     model = sklearn.neighbors.KNeighborsClassifier() | ||||
|     model.fit(data_train, labels_train) | ||||
|     predictions = model.predict(data_test) | ||||
|  | ||||
|     return model, ClassificationMetrics(predictions, labels_test) | ||||
|  | ||||
| def knn_regressor(data, outputs, test_size, n_neighbors = 5, weights = "uniform", algorithm = "auto", leaf_size = 30, p = 2, metric = "minkowski", metric_params = None, n_jobs = None): | ||||
|  | ||||
|     data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1) | ||||
|     model = sklearn.neighbors.KNeighborsRegressor(n_neighbors = n_neighbors, weights = weights, algorithm = algorithm, leaf_size = leaf_size, p = p, metric = metric, metric_params = metric_params, n_jobs = n_jobs) | ||||
|     model.fit(data_train, outputs_train) | ||||
|     predictions = model.predict(data_test) | ||||
|  | ||||
|     return model, RegressionMetrics(predictions, outputs_test) | ||||
|  | ||||
| class NaiveBayes: | ||||
|  | ||||
|     def guassian(self, data, labels, test_size = 0.3, priors = None, var_smoothing = 1e-09): | ||||
|  | ||||
|         data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) | ||||
|         model = sklearn.naive_bayes.GaussianNB(priors = priors, var_smoothing = var_smoothing) | ||||
|         model.fit(data_train, labels_train) | ||||
|         predictions = model.predict(data_test) | ||||
|  | ||||
|         return model, ClassificationMetrics(predictions, labels_test) | ||||
|  | ||||
|     def multinomial(self, data, labels, test_size = 0.3, alpha=1.0, fit_prior=True, class_prior=None): | ||||
|  | ||||
|         data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) | ||||
|         model = sklearn.naive_bayes.MultinomialNB(alpha = alpha, fit_prior = fit_prior, class_prior = class_prior) | ||||
|         model.fit(data_train, labels_train) | ||||
|         predictions = model.predict(data_test) | ||||
|  | ||||
|         return model, ClassificationMetrics(predictions, labels_test) | ||||
|  | ||||
|     def bernoulli(self, data, labels, test_size = 0.3, alpha=1.0, binarize=0.0, fit_prior=True, class_prior=None): | ||||
|  | ||||
|         data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) | ||||
|         model = sklearn.naive_bayes.BernoulliNB(alpha = alpha, binarize = binarize, fit_prior = fit_prior, class_prior = class_prior) | ||||
|         model.fit(data_train, labels_train) | ||||
|         predictions = model.predict(data_test) | ||||
|  | ||||
|         return model, ClassificationMetrics(predictions, labels_test) | ||||
|  | ||||
|     def complement(self, data, labels, test_size = 0.3, alpha=1.0, fit_prior=True, class_prior=None, norm=False): | ||||
|  | ||||
|         data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) | ||||
|         model = sklearn.naive_bayes.ComplementNB(alpha = alpha, fit_prior = fit_prior, class_prior = class_prior, norm = norm) | ||||
|         model.fit(data_train, labels_train) | ||||
|         predictions = model.predict(data_test) | ||||
|  | ||||
|         return model, ClassificationMetrics(predictions, labels_test) | ||||
|  | ||||
| class SVM: | ||||
|  | ||||
|     class CustomKernel: | ||||
|  | ||||
|         def __new__(cls, C, kernel, degre, gamma, coef0, shrinking, probability, tol, cache_size, class_weight, verbose, max_iter, decision_function_shape, random_state): | ||||
|  | ||||
|             return sklearn.svm.SVC(C = C, kernel = kernel, gamma = gamma, coef0 = coef0, shrinking = shrinking, probability = probability, tol = tol, cache_size = cache_size, class_weight = class_weight, verbose = verbose, max_iter = max_iter, decision_function_shape = decision_function_shape, random_state = random_state) | ||||
|  | ||||
|     class StandardKernel: | ||||
|  | ||||
|         def __new__(cls, kernel, C=1.0, degree=3, gamma='auto_deprecated', coef0=0.0, shrinking=True, probability=False, tol=0.001, cache_size=200, class_weight=None, verbose=False, max_iter=-1, decision_function_shape='ovr', random_state=None): | ||||
|  | ||||
|             return sklearn.svm.SVC(C = C, kernel = kernel, gamma = gamma, coef0 = coef0, shrinking = shrinking, probability = probability, tol = tol, cache_size = cache_size, class_weight = class_weight, verbose = verbose, max_iter = max_iter, decision_function_shape = decision_function_shape, random_state = random_state) | ||||
|  | ||||
|     class PrebuiltKernel: | ||||
|  | ||||
|         class Linear: | ||||
|  | ||||
|             def __new__(cls): | ||||
|  | ||||
|                 return sklearn.svm.SVC(kernel = 'linear') | ||||
|  | ||||
|         class Polynomial: | ||||
|  | ||||
|             def __new__(cls, power, r_bias): | ||||
|  | ||||
|                 return sklearn.svm.SVC(kernel = 'polynomial', degree = power, coef0 = r_bias) | ||||
|  | ||||
|         class RBF: | ||||
|  | ||||
|             def __new__(cls, gamma): | ||||
|  | ||||
|                 return sklearn.svm.SVC(kernel = 'rbf', gamma = gamma) | ||||
|  | ||||
|         class Sigmoid: | ||||
|  | ||||
|             def __new__(cls, r_bias): | ||||
|  | ||||
|                 return sklearn.svm.SVC(kernel = 'sigmoid', coef0 = r_bias) | ||||
|  | ||||
|     def fit(self, kernel, train_data, train_outputs): # expects *2d data, 1d labels or outputs | ||||
|  | ||||
|         return kernel.fit(train_data, train_outputs) | ||||
|  | ||||
|     def eval_classification(self, kernel, test_data, test_outputs): | ||||
|  | ||||
|         predictions = kernel.predict(test_data) | ||||
|  | ||||
|         return ClassificationMetrics(predictions, test_outputs) | ||||
|  | ||||
|     def eval_regression(self, kernel, test_data, test_outputs): | ||||
|  | ||||
|         predictions = kernel.predict(test_data) | ||||
|  | ||||
|         return RegressionMetrics(predictions, test_outputs) | ||||
|  | ||||
| def random_forest_classifier(data, labels, test_size, n_estimators="warn", criterion="gini", max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0.0, max_features="auto", max_leaf_nodes=None, min_impurity_decrease=0.0, min_impurity_split=None, bootstrap=True, oob_score=False, n_jobs=None, random_state=None, verbose=0, warm_start=False, class_weight=None): | ||||
|  | ||||
|     data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) | ||||
|     kernel = sklearn.ensemble.RandomForestClassifier(n_estimators = n_estimators, criterion = criterion, max_depth = max_depth, min_samples_split = min_samples_split, min_samples_leaf = min_samples_leaf, min_weight_fraction_leaf = min_weight_fraction_leaf, max_leaf_nodes = max_leaf_nodes, min_impurity_decrease = min_impurity_decrease, bootstrap = bootstrap, oob_score = oob_score, n_jobs = n_jobs, random_state = random_state, verbose = verbose, warm_start = warm_start, class_weight = class_weight) | ||||
|     kernel.fit(data_train, labels_train) | ||||
|     predictions = kernel.predict(data_test) | ||||
|  | ||||
|     return kernel, ClassificationMetrics(predictions, labels_test) | ||||
|  | ||||
| def random_forest_regressor(data, outputs, test_size, n_estimators="warn", criterion="mse", max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0.0, max_features="auto", max_leaf_nodes=None, min_impurity_decrease=0.0, min_impurity_split=None, bootstrap=True, oob_score=False, n_jobs=None, random_state=None, verbose=0, warm_start=False): | ||||
|  | ||||
|     data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1) | ||||
|     kernel = sklearn.ensemble.RandomForestRegressor(n_estimators = n_estimators, criterion = criterion, max_depth = max_depth, min_samples_split = min_samples_split, min_weight_fraction_leaf = min_weight_fraction_leaf, max_features = max_features, max_leaf_nodes = max_leaf_nodes, min_impurity_decrease = min_impurity_decrease, min_impurity_split = min_impurity_split, bootstrap = bootstrap, oob_score = oob_score, n_jobs = n_jobs, random_state = random_state, verbose = verbose, warm_start = warm_start) | ||||
|     kernel.fit(data_train, outputs_train) | ||||
|     predictions = kernel.predict(data_test) | ||||
|  | ||||
|     return kernel, RegressionMetrics(predictions, outputs_test) | ||||
|  | ||||
| class Glicko2: | ||||
|  | ||||
|     _tau = 0.5 | ||||
|  | ||||
|     def getRating(self): | ||||
|         return (self.__rating * 173.7178) + 1500  | ||||
|  | ||||
|     def setRating(self, rating): | ||||
|         self.__rating = (rating - 1500) / 173.7178 | ||||
|  | ||||
|     rating = property(getRating, setRating) | ||||
|  | ||||
|     def getRd(self): | ||||
|         return self.__rd * 173.7178 | ||||
|  | ||||
|     def setRd(self, rd): | ||||
|         self.__rd = rd / 173.7178 | ||||
|  | ||||
|     rd = property(getRd, setRd) | ||||
|       | ||||
|     def __init__(self, rating = 1500, rd = 350, vol = 0.06): | ||||
|  | ||||
|         self.setRating(rating) | ||||
|         self.setRd(rd) | ||||
|         self.vol = vol | ||||
|              | ||||
|     def _preRatingRD(self): | ||||
|  | ||||
|         self.__rd = math.sqrt(math.pow(self.__rd, 2) + math.pow(self.vol, 2)) | ||||
|          | ||||
|     def update_player(self, rating_list, RD_list, outcome_list): | ||||
|  | ||||
|         rating_list = [(x - 1500) / 173.7178 for x in rating_list] | ||||
|         RD_list = [x / 173.7178 for x in RD_list] | ||||
|  | ||||
|         v = self._v(rating_list, RD_list) | ||||
|         self.vol = self._newVol(rating_list, RD_list, outcome_list, v) | ||||
|         self._preRatingRD() | ||||
|          | ||||
|         self.__rd = 1 / math.sqrt((1 / math.pow(self.__rd, 2)) + (1 / v)) | ||||
|          | ||||
|         tempSum = 0 | ||||
|         for i in range(len(rating_list)): | ||||
|             tempSum += self._g(RD_list[i]) * \ | ||||
|                        (outcome_list[i] - self._E(rating_list[i], RD_list[i])) | ||||
|         self.__rating += math.pow(self.__rd, 2) * tempSum | ||||
|          | ||||
|          | ||||
|     def _newVol(self, rating_list, RD_list, outcome_list, v): | ||||
|  | ||||
|         i = 0 | ||||
|         delta = self._delta(rating_list, RD_list, outcome_list, v) | ||||
|         a = math.log(math.pow(self.vol, 2)) | ||||
|         tau = self._tau | ||||
|         x0 = a | ||||
|         x1 = 0 | ||||
|          | ||||
|         while x0 != x1: | ||||
|             # New iteration, so x(i) becomes x(i-1) | ||||
|             x0 = x1 | ||||
|             d = math.pow(self.__rating, 2) + v + math.exp(x0) | ||||
|             h1 = -(x0 - a) / math.pow(tau, 2) - 0.5 * math.exp(x0) \ | ||||
|             / d + 0.5 * math.exp(x0) * math.pow(delta / d, 2) | ||||
|             h2 = -1 / math.pow(tau, 2) - 0.5 * math.exp(x0) * \ | ||||
|             (math.pow(self.__rating, 2) + v) \ | ||||
|             / math.pow(d, 2) + 0.5 * math.pow(delta, 2) * math.exp(x0) \ | ||||
|             * (math.pow(self.__rating, 2) + v - math.exp(x0)) / math.pow(d, 3) | ||||
|             x1 = x0 - (h1 / h2) | ||||
|  | ||||
|         return math.exp(x1 / 2) | ||||
|          | ||||
|     def _delta(self, rating_list, RD_list, outcome_list, v): | ||||
|  | ||||
|         tempSum = 0 | ||||
|         for i in range(len(rating_list)): | ||||
|             tempSum += self._g(RD_list[i]) * (outcome_list[i] - self._E(rating_list[i], RD_list[i])) | ||||
|         return v * tempSum | ||||
|          | ||||
|     def _v(self, rating_list, RD_list): | ||||
|  | ||||
|         tempSum = 0 | ||||
|         for i in range(len(rating_list)): | ||||
|             tempE = self._E(rating_list[i], RD_list[i]) | ||||
|             tempSum += math.pow(self._g(RD_list[i]), 2) * tempE * (1 - tempE) | ||||
|         return 1 / tempSum | ||||
|          | ||||
|     def _E(self, p2rating, p2RD): | ||||
|  | ||||
|         return 1 / (1 + math.exp(-1 * self._g(p2RD) * \ | ||||
|                                  (self.__rating - p2rating))) | ||||
|          | ||||
|     def _g(self, RD): | ||||
|  | ||||
|         return 1 / math.sqrt(1 + 3 * math.pow(RD, 2) / math.pow(math.pi, 2)) | ||||
|          | ||||
|     def did_not_compete(self): | ||||
|  | ||||
|         self._preRatingRD() | ||||
| @@ -1,220 +0,0 @@ | ||||
| # Titan Robotics Team 2022: CUDA-based Regressions Module | ||||
| # Written by Arthur Lu & Jacob Levine | ||||
| # Notes: | ||||
| #   this module has been automatically inegrated into analysis.py, and should be callable as a class from the package | ||||
| #   this module is cuda-optimized and vectorized (except for one small part) | ||||
| # setup: | ||||
|  | ||||
| __version__ = "1.0.0.004" | ||||
|  | ||||
| # changelog should be viewed using print(analysis.regression.__changelog__) | ||||
| __changelog__ = """ | ||||
|     1.0.0.004: | ||||
|         - bug fixes | ||||
|         - fixed changelog | ||||
|     1.0.0.003: | ||||
|         - bug fixes | ||||
|     1.0.0.002: | ||||
|         -Added more parameters to log, exponential, polynomial | ||||
|         -Added SigmoidalRegKernelArthur, because Arthur apparently needs | ||||
|         to train the scaling and shifting of sigmoids | ||||
|     1.0.0.001: | ||||
|         -initial release, with linear, log, exponential, polynomial, and sigmoid kernels | ||||
|         -already vectorized (except for polynomial generation) and CUDA-optimized | ||||
| """ | ||||
|  | ||||
| __author__ = ( | ||||
|     "Jacob Levine <jlevine@imsa.edu>", | ||||
|     "Arthur Lu <learthurgo@gmail.com>" | ||||
| ) | ||||
|  | ||||
| __all__ = [ | ||||
|     'factorial', | ||||
|     'take_all_pwrs', | ||||
|     'num_poly_terms', | ||||
|     'set_device', | ||||
|     'LinearRegKernel', | ||||
|     'SigmoidalRegKernel', | ||||
|     'LogRegKernel', | ||||
|     'PolyRegKernel', | ||||
|     'ExpRegKernel', | ||||
|     'SigmoidalRegKernelArthur', | ||||
|     'SGDTrain', | ||||
|     'CustomTrain' | ||||
| ] | ||||
|  | ||||
| import torch | ||||
|  | ||||
| global device | ||||
|  | ||||
| device = "cuda:0" if torch.torch.cuda.is_available() else "cpu" | ||||
|  | ||||
| #todo: document completely | ||||
|  | ||||
| def set_device(self, new_device): | ||||
|     device=new_device | ||||
|  | ||||
| class LinearRegKernel(): | ||||
|     parameters= [] | ||||
|     weights=None | ||||
|     bias=None | ||||
|     def __init__(self, num_vars): | ||||
|         self.weights=torch.rand(num_vars, requires_grad=True, device=device) | ||||
|         self.bias=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.parameters=[self.weights,self.bias] | ||||
|     def forward(self,mtx): | ||||
|         long_bias=self.bias.repeat([1,mtx.size()[1]]) | ||||
|         return torch.matmul(self.weights,mtx)+long_bias | ||||
|  | ||||
| class SigmoidalRegKernel(): | ||||
|     parameters= [] | ||||
|     weights=None | ||||
|     bias=None | ||||
|     sigmoid=torch.nn.Sigmoid() | ||||
|     def __init__(self, num_vars): | ||||
|         self.weights=torch.rand(num_vars, requires_grad=True, device=device) | ||||
|         self.bias=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.parameters=[self.weights,self.bias] | ||||
|     def forward(self,mtx): | ||||
|         long_bias=self.bias.repeat([1,mtx.size()[1]]) | ||||
|         return self.sigmoid(torch.matmul(self.weights,mtx)+long_bias) | ||||
|  | ||||
| class SigmoidalRegKernelArthur(): | ||||
|     parameters= [] | ||||
|     weights=None | ||||
|     in_bias=None | ||||
|     scal_mult=None | ||||
|     out_bias=None | ||||
|     sigmoid=torch.nn.Sigmoid() | ||||
|     def __init__(self, num_vars): | ||||
|         self.weights=torch.rand(num_vars, requires_grad=True, device=device) | ||||
|         self.in_bias=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.scal_mult=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.out_bias=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias] | ||||
|     def forward(self,mtx): | ||||
|         long_in_bias=self.in_bias.repeat([1,mtx.size()[1]]) | ||||
|         long_out_bias=self.out_bias.repeat([1,mtx.size()[1]]) | ||||
|         return (self.scal_mult*self.sigmoid(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias | ||||
|  | ||||
| class LogRegKernel(): | ||||
|     parameters= [] | ||||
|     weights=None | ||||
|     in_bias=None | ||||
|     scal_mult=None | ||||
|     out_bias=None | ||||
|     def __init__(self, num_vars): | ||||
|         self.weights=torch.rand(num_vars, requires_grad=True, device=device) | ||||
|         self.in_bias=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.scal_mult=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.out_bias=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias] | ||||
|     def forward(self,mtx): | ||||
|         long_in_bias=self.in_bias.repeat([1,mtx.size()[1]]) | ||||
|         long_out_bias=self.out_bias.repeat([1,mtx.size()[1]]) | ||||
|         return (self.scal_mult*torch.log(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias | ||||
|  | ||||
| class ExpRegKernel(): | ||||
|     parameters= [] | ||||
|     weights=None | ||||
|     in_bias=None | ||||
|     scal_mult=None | ||||
|     out_bias=None | ||||
|     def __init__(self, num_vars): | ||||
|         self.weights=torch.rand(num_vars, requires_grad=True, device=device) | ||||
|         self.in_bias=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.scal_mult=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.out_bias=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias] | ||||
|     def forward(self,mtx): | ||||
|         long_in_bias=self.in_bias.repeat([1,mtx.size()[1]]) | ||||
|         long_out_bias=self.out_bias.repeat([1,mtx.size()[1]]) | ||||
|         return (self.scal_mult*torch.exp(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias | ||||
|  | ||||
| class PolyRegKernel(): | ||||
|     parameters= [] | ||||
|     weights=None | ||||
|     bias=None | ||||
|     power=None | ||||
|     def __init__(self, num_vars, power): | ||||
|         self.power=power | ||||
|         num_terms=self.num_poly_terms(num_vars, power) | ||||
|         self.weights=torch.rand(num_terms, requires_grad=True, device=device) | ||||
|         self.bias=torch.rand(1, requires_grad=True, device=device) | ||||
|         self.parameters=[self.weights,self.bias] | ||||
|     def num_poly_terms(self,num_vars, power): | ||||
|         if power == 0: | ||||
|             return 0 | ||||
|         return int(self.factorial(num_vars+power-1) / self.factorial(power) / self.factorial(num_vars-1)) + self.num_poly_terms(num_vars, power-1) | ||||
|     def factorial(self,n): | ||||
|         if n==0: | ||||
|             return 1 | ||||
|         else: | ||||
|             return n*self.factorial(n-1) | ||||
|     def take_all_pwrs(self, vec, pwr): | ||||
|         #todo: vectorize (kinda) | ||||
|         combins=torch.combinations(vec, r=pwr, with_replacement=True) | ||||
|         out=torch.ones(combins.size()[0]).to(device).to(torch.float) | ||||
|         for i in torch.t(combins).to(device).to(torch.float): | ||||
|             out *= i | ||||
|         if pwr == 1: | ||||
|             return out | ||||
|         else: | ||||
|             return torch.cat((out,self.take_all_pwrs(vec, pwr-1))) | ||||
|     def forward(self,mtx): | ||||
|         #TODO: Vectorize the last part | ||||
|         cols=[] | ||||
|         for i in torch.t(mtx): | ||||
|             cols.append(self.take_all_pwrs(i,self.power)) | ||||
|         new_mtx=torch.t(torch.stack(cols)) | ||||
|         long_bias=self.bias.repeat([1,mtx.size()[1]]) | ||||
|         return torch.matmul(self.weights,new_mtx)+long_bias | ||||
|  | ||||
| def SGDTrain(self, kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False): | ||||
|     optim=torch.optim.SGD(kernel.parameters, lr=learning_rate) | ||||
|     data_cuda=data.to(device) | ||||
|     ground_cuda=ground.to(device) | ||||
|     if (return_losses): | ||||
|         losses=[] | ||||
|         for i in range(iterations): | ||||
|             with torch.set_grad_enabled(True): | ||||
|                 optim.zero_grad() | ||||
|                 pred=kernel.forward(data_cuda) | ||||
|                 ls=loss(pred,ground_cuda) | ||||
|                 losses.append(ls.item()) | ||||
|                 ls.backward() | ||||
|                 optim.step() | ||||
|         return [kernel,losses] | ||||
|     else: | ||||
|         for i in range(iterations): | ||||
|             with torch.set_grad_enabled(True): | ||||
|                 optim.zero_grad() | ||||
|                 pred=kernel.forward(data_cuda) | ||||
|                 ls=loss(pred,ground_cuda) | ||||
|                 ls.backward() | ||||
|                 optim.step() | ||||
|         return kernel | ||||
|  | ||||
| def CustomTrain(self, kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False): | ||||
|     data_cuda=data.to(device) | ||||
|     ground_cuda=ground.to(device) | ||||
|     if (return_losses): | ||||
|         losses=[] | ||||
|         for i in range(iterations): | ||||
|             with torch.set_grad_enabled(True): | ||||
|                 optim.zero_grad() | ||||
|                 pred=kernel.forward(data) | ||||
|                 ls=loss(pred,ground) | ||||
|                 losses.append(ls.item()) | ||||
|                 ls.backward() | ||||
|                 optim.step() | ||||
|         return [kernel,losses] | ||||
|     else: | ||||
|         for i in range(iterations): | ||||
|             with torch.set_grad_enabled(True): | ||||
|                 optim.zero_grad() | ||||
|                 pred=kernel.forward(data_cuda) | ||||
|                 ls=loss(pred,ground_cuda) | ||||
|                 ls.backward() | ||||
|                 optim.step() | ||||
|         return kernel | ||||
| @@ -1,122 +0,0 @@ | ||||
| # Titan Robotics Team 2022: ML Module | ||||
| # Written by Arthur Lu & Jacob Levine | ||||
| # Notes: | ||||
| #    this should be imported as a python module using 'import titanlearn' | ||||
| #    this should be included in the local directory or environment variable | ||||
| #    this module is optimized for multhreaded computing | ||||
| #    this module learns from its mistakes far faster than 2022's captains | ||||
| # setup: | ||||
|  | ||||
| __version__ = "2.0.1.001" | ||||
|  | ||||
| #changelog should be viewed using print(analysis.__changelog__) | ||||
| __changelog__ = """changelog: | ||||
|     2.0.1.001: | ||||
|         - removed matplotlib import | ||||
|         - removed graphloss() | ||||
|     2.0.1.000: | ||||
|         - added net, dataset, dataloader, and stdtrain template definitions | ||||
|         - added graphloss function | ||||
|     2.0.0.001: | ||||
|         - added clear functions | ||||
|     2.0.0.000: | ||||
|         - complete rewrite planned | ||||
|         - depreciated 1.0.0.xxx versions | ||||
|         - added simple training loop | ||||
|     1.0.0.xxx: | ||||
|         -added generation of ANNS, basic SGD training | ||||
| """ | ||||
|  | ||||
| __author__ = ( | ||||
|     "Arthur Lu <arthurlu@ttic.edu>," | ||||
|     "Jacob Levine <jlevine@ttic.edu>," | ||||
|     ) | ||||
|  | ||||
| __all__ = [ | ||||
|     'clear', | ||||
|     'net', | ||||
|     'dataset', | ||||
|     'dataloader', | ||||
|     'train', | ||||
|     'stdtrainer', | ||||
|     ] | ||||
|  | ||||
| import torch | ||||
| from os import system, name | ||||
| import numpy as np | ||||
|  | ||||
| def clear():  | ||||
|     if name == 'nt':  | ||||
|         _ = system('cls')  | ||||
|     else:  | ||||
|         _ = system('clear')  | ||||
|  | ||||
| class net(torch.nn.Module): #template for standard neural net | ||||
|     def __init__(self): | ||||
|         super(Net, self).__init__() | ||||
|          | ||||
|     def forward(self, input): | ||||
|         pass | ||||
|  | ||||
| class dataset(torch.utils.data.Dataset): #template for standard dataset | ||||
|      | ||||
|     def __init__(self): | ||||
|         super(torch.utils.data.Dataset).__init__() | ||||
|          | ||||
|     def __getitem__(self, index): | ||||
|         pass | ||||
|      | ||||
|     def __len__(self): | ||||
|         pass | ||||
|  | ||||
| def dataloader(dataset, batch_size, num_workers, shuffle = True): | ||||
|  | ||||
|     return torch.utils.data.DataLoader(dataset, batch_size=batch_size, shuffle=shuffle, num_workers=num_workers) | ||||
|  | ||||
| def train(device, net, epochs, trainloader, optimizer, criterion): #expects standard dataloader, whch returns (inputs, labels) | ||||
|  | ||||
|     dataset_len = trainloader.dataset.__len__() | ||||
|     iter_count = 0 | ||||
|     running_loss = 0 | ||||
|     running_loss_list = [] | ||||
|  | ||||
|     for epoch in range(epochs):  # loop over the dataset multiple times | ||||
|  | ||||
|         for i, data in enumerate(trainloader, 0): | ||||
|  | ||||
|             inputs = data[0].to(device) | ||||
|             labels = data[1].to(device) | ||||
|  | ||||
|             optimizer.zero_grad() | ||||
|  | ||||
|             outputs = net(inputs) | ||||
|             loss = criterion(outputs, labels.to(torch.float)) | ||||
|              | ||||
|             loss.backward() | ||||
|             optimizer.step() | ||||
|  | ||||
|             # monitoring steps below | ||||
|  | ||||
|             iter_count += 1 | ||||
|             running_loss += loss.item() | ||||
|             running_loss_list.append(running_loss) | ||||
|             clear() | ||||
|  | ||||
|             print("training on: " + device) | ||||
|             print("iteration: " + str(i) + "/" + str(int(dataset_len / trainloader.batch_size)) + " | " + "epoch: " + str(epoch) + "/" + str(epochs)) | ||||
|             print("current batch loss: " + str(loss.item)) | ||||
|             print("running loss: " + str(running_loss / iter_count)) | ||||
|          | ||||
|     return net, running_loss_list | ||||
|     print("finished training") | ||||
|  | ||||
| def stdtrainer(net, criterion, optimizer, dataloader, epochs, batch_size): | ||||
|  | ||||
|     device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") | ||||
|  | ||||
|     net = net.to(device) | ||||
|     criterion = criterion.to(device) | ||||
|     optimizer = optimizer.to(device) | ||||
|     trainloader = dataloader | ||||
|      | ||||
|     return train(device, net, epochs, trainloader, optimizer, criterion) | ||||
| @@ -1,907 +0,0 @@ | ||||
| from __future__ import absolute_import | ||||
|  | ||||
| from itertools import chain | ||||
| import math | ||||
|  | ||||
| from six import iteritems | ||||
| from six.moves import map, range, zip | ||||
| from six import iterkeys | ||||
|  | ||||
| import copy | ||||
| try: | ||||
|     from numbers import Number | ||||
| except ImportError: | ||||
|     Number = (int, long, float, complex) | ||||
|  | ||||
| inf = float('inf') | ||||
|  | ||||
| class Gaussian(object): | ||||
|     #: Precision, the inverse of the variance. | ||||
|     pi = 0 | ||||
|     #: Precision adjusted mean, the precision multiplied by the mean. | ||||
|     tau = 0 | ||||
|  | ||||
|     def __init__(self, mu=None, sigma=None, pi=0, tau=0): | ||||
|         if mu is not None: | ||||
|             if sigma is None: | ||||
|                 raise TypeError('sigma argument is needed') | ||||
|             elif sigma == 0: | ||||
|                 raise ValueError('sigma**2 should be greater than 0') | ||||
|             pi = sigma ** -2 | ||||
|             tau = pi * mu | ||||
|         self.pi = pi | ||||
|         self.tau = tau | ||||
|  | ||||
|     @property | ||||
|     def mu(self): | ||||
|         return self.pi and self.tau / self.pi | ||||
|  | ||||
|     @property | ||||
|     def sigma(self): | ||||
|         return math.sqrt(1 / self.pi) if self.pi else inf | ||||
|  | ||||
|     def __mul__(self, other): | ||||
|         pi, tau = self.pi + other.pi, self.tau + other.tau | ||||
|         return Gaussian(pi=pi, tau=tau) | ||||
|  | ||||
|     def __truediv__(self, other): | ||||
|         pi, tau = self.pi - other.pi, self.tau - other.tau | ||||
|         return Gaussian(pi=pi, tau=tau) | ||||
|  | ||||
|     __div__ = __truediv__  # for Python 2 | ||||
|  | ||||
|     def __eq__(self, other): | ||||
|         return self.pi == other.pi and self.tau == other.tau | ||||
|  | ||||
|     def __lt__(self, other): | ||||
|         return self.mu < other.mu | ||||
|  | ||||
|     def __le__(self, other): | ||||
|         return self.mu <= other.mu | ||||
|  | ||||
|     def __gt__(self, other): | ||||
|         return self.mu > other.mu | ||||
|  | ||||
|     def __ge__(self, other): | ||||
|         return self.mu >= other.mu | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return 'N(mu={:.3f}, sigma={:.3f})'.format(self.mu, self.sigma) | ||||
|  | ||||
|     def _repr_latex_(self): | ||||
|         latex = r'\mathcal{{ N }}( {:.3f}, {:.3f}^2 )'.format(self.mu, self.sigma) | ||||
|         return '$%s$' % latex | ||||
|  | ||||
| class Matrix(list): | ||||
|     def __init__(self, src, height=None, width=None): | ||||
|         if callable(src): | ||||
|             f, src = src, {} | ||||
|             size = [height, width] | ||||
|             if not height: | ||||
|                 def set_height(height): | ||||
|                     size[0] = height | ||||
|                 size[0] = set_height | ||||
|             if not width: | ||||
|                 def set_width(width): | ||||
|                     size[1] = width | ||||
|                 size[1] = set_width | ||||
|             try: | ||||
|                 for (r, c), val in f(*size): | ||||
|                     src[r, c] = val | ||||
|             except TypeError: | ||||
|                 raise TypeError('A callable src must return an interable ' | ||||
|                                 'which generates a tuple containing ' | ||||
|                                 'coordinate and value') | ||||
|             height, width = tuple(size) | ||||
|             if height is None or width is None: | ||||
|                 raise TypeError('A callable src must call set_height and ' | ||||
|                                 'set_width if the size is non-deterministic') | ||||
|         if isinstance(src, list): | ||||
|             is_number = lambda x: isinstance(x, Number) | ||||
|             unique_col_sizes = set(map(len, src)) | ||||
|             everything_are_number = filter(is_number, sum(src, [])) | ||||
|             if len(unique_col_sizes) != 1 or not everything_are_number: | ||||
|                 raise ValueError('src must be a rectangular array of numbers') | ||||
|             two_dimensional_array = src | ||||
|         elif isinstance(src, dict): | ||||
|             if not height or not width: | ||||
|                 w = h = 0 | ||||
|                 for r, c in iterkeys(src): | ||||
|                     if not height: | ||||
|                         h = max(h, r + 1) | ||||
|                     if not width: | ||||
|                         w = max(w, c + 1) | ||||
|                 if not height: | ||||
|                     height = h | ||||
|                 if not width: | ||||
|                     width = w | ||||
|             two_dimensional_array = [] | ||||
|             for r in range(height): | ||||
|                 row = [] | ||||
|                 two_dimensional_array.append(row) | ||||
|                 for c in range(width): | ||||
|                     row.append(src.get((r, c), 0)) | ||||
|         else: | ||||
|             raise TypeError('src must be a list or dict or callable') | ||||
|         super(Matrix, self).__init__(two_dimensional_array) | ||||
|  | ||||
|     @property | ||||
|     def height(self): | ||||
|         return len(self) | ||||
|  | ||||
|     @property | ||||
|     def width(self): | ||||
|         return len(self[0]) | ||||
|  | ||||
|     def transpose(self): | ||||
|         height, width = self.height, self.width | ||||
|         src = {} | ||||
|         for c in range(width): | ||||
|             for r in range(height): | ||||
|                 src[c, r] = self[r][c] | ||||
|         return type(self)(src, height=width, width=height) | ||||
|  | ||||
|     def minor(self, row_n, col_n): | ||||
|         height, width = self.height, self.width | ||||
|         if not (0 <= row_n < height): | ||||
|             raise ValueError('row_n should be between 0 and %d' % height) | ||||
|         elif not (0 <= col_n < width): | ||||
|             raise ValueError('col_n should be between 0 and %d' % width) | ||||
|         two_dimensional_array = [] | ||||
|         for r in range(height): | ||||
|             if r == row_n: | ||||
|                 continue | ||||
|             row = [] | ||||
|             two_dimensional_array.append(row) | ||||
|             for c in range(width): | ||||
|                 if c == col_n: | ||||
|                     continue | ||||
|                 row.append(self[r][c]) | ||||
|         return type(self)(two_dimensional_array) | ||||
|  | ||||
|     def determinant(self): | ||||
|         height, width = self.height, self.width | ||||
|         if height != width: | ||||
|             raise ValueError('Only square matrix can calculate a determinant') | ||||
|         tmp, rv = copy.deepcopy(self), 1. | ||||
|         for c in range(width - 1, 0, -1): | ||||
|             pivot, r = max((abs(tmp[r][c]), r) for r in range(c + 1)) | ||||
|             pivot = tmp[r][c] | ||||
|             if not pivot: | ||||
|                 return 0. | ||||
|             tmp[r], tmp[c] = tmp[c], tmp[r] | ||||
|             if r != c: | ||||
|                 rv = -rv | ||||
|             rv *= pivot | ||||
|             fact = -1. / pivot | ||||
|             for r in range(c): | ||||
|                 f = fact * tmp[r][c] | ||||
|                 for x in range(c): | ||||
|                     tmp[r][x] += f * tmp[c][x] | ||||
|         return rv * tmp[0][0] | ||||
|  | ||||
|     def adjugate(self): | ||||
|         height, width = self.height, self.width | ||||
|         if height != width: | ||||
|             raise ValueError('Only square matrix can be adjugated') | ||||
|         if height == 2: | ||||
|             a, b = self[0][0], self[0][1] | ||||
|             c, d = self[1][0], self[1][1] | ||||
|             return type(self)([[d, -b], [-c, a]]) | ||||
|         src = {} | ||||
|         for r in range(height): | ||||
|             for c in range(width): | ||||
|                 sign = -1 if (r + c) % 2 else 1 | ||||
|                 src[r, c] = self.minor(r, c).determinant() * sign | ||||
|         return type(self)(src, height, width) | ||||
|  | ||||
|     def inverse(self): | ||||
|         if self.height == self.width == 1: | ||||
|             return type(self)([[1. / self[0][0]]]) | ||||
|         return (1. / self.determinant()) * self.adjugate() | ||||
|  | ||||
|     def __add__(self, other): | ||||
|         height, width = self.height, self.width | ||||
|         if (height, width) != (other.height, other.width): | ||||
|             raise ValueError('Must be same size') | ||||
|         src = {} | ||||
|         for r in range(height): | ||||
|             for c in range(width): | ||||
|                 src[r, c] = self[r][c] + other[r][c] | ||||
|         return type(self)(src, height, width) | ||||
|  | ||||
|     def __mul__(self, other): | ||||
|         if self.width != other.height: | ||||
|             raise ValueError('Bad size') | ||||
|         height, width = self.height, other.width | ||||
|         src = {} | ||||
|         for r in range(height): | ||||
|             for c in range(width): | ||||
|                 src[r, c] = sum(self[r][x] * other[x][c] | ||||
|                                 for x in range(self.width)) | ||||
|         return type(self)(src, height, width) | ||||
|  | ||||
|     def __rmul__(self, other): | ||||
|         if not isinstance(other, Number): | ||||
|             raise TypeError('The operand should be a number') | ||||
|         height, width = self.height, self.width | ||||
|         src = {} | ||||
|         for r in range(height): | ||||
|             for c in range(width): | ||||
|                 src[r, c] = other * self[r][c] | ||||
|         return type(self)(src, height, width) | ||||
|  | ||||
|     def __repr__(self): | ||||
|         return '{}({})'.format(type(self).__name__, super(Matrix, self).__repr__()) | ||||
|  | ||||
|     def _repr_latex_(self): | ||||
|         rows = [' && '.join(['%.3f' % cell for cell in row]) for row in self] | ||||
|         latex = r'\begin{matrix} %s \end{matrix}' % r'\\'.join(rows) | ||||
|         return '$%s$' % latex | ||||
|  | ||||
| def _gen_erfcinv(erfc, math=math): | ||||
|     def erfcinv(y): | ||||
|         """The inverse function of erfc.""" | ||||
|         if y >= 2: | ||||
|             return -100. | ||||
|         elif y <= 0: | ||||
|             return 100. | ||||
|         zero_point = y < 1 | ||||
|         if not zero_point: | ||||
|             y = 2 - y | ||||
|         t = math.sqrt(-2 * math.log(y / 2.)) | ||||
|         x = -0.70711 * \ | ||||
|             ((2.30753 + t * 0.27061) / (1. + t * (0.99229 + t * 0.04481)) - t) | ||||
|         for i in range(2): | ||||
|             err = erfc(x) - y | ||||
|             x += err / (1.12837916709551257 * math.exp(-(x ** 2)) - x * err) | ||||
|         return x if zero_point else -x | ||||
|     return erfcinv | ||||
|  | ||||
| def _gen_ppf(erfc, math=math): | ||||
|     erfcinv = _gen_erfcinv(erfc, math) | ||||
|     def ppf(x, mu=0, sigma=1): | ||||
|         return mu - sigma * math.sqrt(2) * erfcinv(2 * x) | ||||
|     return ppf | ||||
|  | ||||
| def erfc(x): | ||||
|     z = abs(x) | ||||
|     t = 1. / (1. + z / 2.) | ||||
|     r = t * math.exp(-z * z - 1.26551223 + t * (1.00002368 + t * ( | ||||
|         0.37409196 + t * (0.09678418 + t * (-0.18628806 + t * ( | ||||
|             0.27886807 + t * (-1.13520398 + t * (1.48851587 + t * ( | ||||
|                 -0.82215223 + t * 0.17087277 | ||||
|             ))) | ||||
|         ))) | ||||
|     ))) | ||||
|     return 2. - r if x < 0 else r | ||||
|  | ||||
| def cdf(x, mu=0, sigma=1): | ||||
|     return 0.5 * erfc(-(x - mu) / (sigma * math.sqrt(2))) | ||||
|  | ||||
|  | ||||
| def pdf(x, mu=0, sigma=1): | ||||
|     return (1 / math.sqrt(2 * math.pi) * abs(sigma) * | ||||
|             math.exp(-(((x - mu) / abs(sigma)) ** 2 / 2))) | ||||
|  | ||||
| ppf = _gen_ppf(erfc) | ||||
|  | ||||
| def choose_backend(backend): | ||||
|     if backend is None:  # fallback | ||||
|         return cdf, pdf, ppf | ||||
|     elif backend == 'mpmath': | ||||
|         try: | ||||
|             import mpmath | ||||
|         except ImportError: | ||||
|             raise ImportError('Install "mpmath" to use this backend') | ||||
|         return mpmath.ncdf, mpmath.npdf, _gen_ppf(mpmath.erfc, math=mpmath) | ||||
|     elif backend == 'scipy': | ||||
|         try: | ||||
|             from scipy.stats import norm | ||||
|         except ImportError: | ||||
|             raise ImportError('Install "scipy" to use this backend') | ||||
|         return norm.cdf, norm.pdf, norm.ppf | ||||
|     raise ValueError('%r backend is not defined' % backend) | ||||
|  | ||||
| def available_backends(): | ||||
|     backends = [None] | ||||
|     for backend in ['mpmath', 'scipy']: | ||||
|         try: | ||||
|             __import__(backend) | ||||
|         except ImportError: | ||||
|             continue | ||||
|         backends.append(backend) | ||||
|     return backends | ||||
|  | ||||
| class Node(object): | ||||
|  | ||||
|     pass | ||||
|  | ||||
| class Variable(Node, Gaussian): | ||||
|  | ||||
|     def __init__(self): | ||||
|         self.messages = {} | ||||
|         super(Variable, self).__init__() | ||||
|  | ||||
|     def set(self, val): | ||||
|         delta = self.delta(val) | ||||
|         self.pi, self.tau = val.pi, val.tau | ||||
|         return delta | ||||
|  | ||||
|     def delta(self, other): | ||||
|         pi_delta = abs(self.pi - other.pi) | ||||
|         if pi_delta == inf: | ||||
|             return 0. | ||||
|         return max(abs(self.tau - other.tau), math.sqrt(pi_delta)) | ||||
|  | ||||
|     def update_message(self, factor, pi=0, tau=0, message=None): | ||||
|         message = message or Gaussian(pi=pi, tau=tau) | ||||
|         old_message, self[factor] = self[factor], message | ||||
|         return self.set(self / old_message * message) | ||||
|  | ||||
|     def update_value(self, factor, pi=0, tau=0, value=None): | ||||
|         value = value or Gaussian(pi=pi, tau=tau) | ||||
|         old_message = self[factor] | ||||
|         self[factor] = value * old_message / self | ||||
|         return self.set(value) | ||||
|  | ||||
|     def __getitem__(self, factor): | ||||
|         return self.messages[factor] | ||||
|  | ||||
|     def __setitem__(self, factor, message): | ||||
|         self.messages[factor] = message | ||||
|  | ||||
|     def __repr__(self): | ||||
|         args = (type(self).__name__, super(Variable, self).__repr__(), | ||||
|                 len(self.messages), '' if len(self.messages) == 1 else 's') | ||||
|         return '<%s %s with %d connection%s>' % args | ||||
|  | ||||
|  | ||||
| class Factor(Node): | ||||
|  | ||||
|     def __init__(self, variables): | ||||
|         self.vars = variables | ||||
|         for var in variables: | ||||
|             var[self] = Gaussian() | ||||
|  | ||||
|     def down(self): | ||||
|         return 0 | ||||
|  | ||||
|     def up(self): | ||||
|         return 0 | ||||
|  | ||||
|     @property | ||||
|     def var(self): | ||||
|         assert len(self.vars) == 1 | ||||
|         return self.vars[0] | ||||
|  | ||||
|     def __repr__(self): | ||||
|         args = (type(self).__name__, len(self.vars), | ||||
|                 '' if len(self.vars) == 1 else 's') | ||||
|         return '<%s with %d connection%s>' % args | ||||
|  | ||||
|  | ||||
| class PriorFactor(Factor): | ||||
|  | ||||
|     def __init__(self, var, val, dynamic=0): | ||||
|         super(PriorFactor, self).__init__([var]) | ||||
|         self.val = val | ||||
|         self.dynamic = dynamic | ||||
|  | ||||
|     def down(self): | ||||
|         sigma = math.sqrt(self.val.sigma ** 2 + self.dynamic ** 2) | ||||
|         value = Gaussian(self.val.mu, sigma) | ||||
|         return self.var.update_value(self, value=value) | ||||
|  | ||||
|  | ||||
| class LikelihoodFactor(Factor): | ||||
|  | ||||
|     def __init__(self, mean_var, value_var, variance): | ||||
|         super(LikelihoodFactor, self).__init__([mean_var, value_var]) | ||||
|         self.mean = mean_var | ||||
|         self.value = value_var | ||||
|         self.variance = variance | ||||
|  | ||||
|     def calc_a(self, var): | ||||
|         return 1. / (1. + self.variance * var.pi) | ||||
|  | ||||
|     def down(self): | ||||
|         # update value. | ||||
|         msg = self.mean / self.mean[self] | ||||
|         a = self.calc_a(msg) | ||||
|         return self.value.update_message(self, a * msg.pi, a * msg.tau) | ||||
|  | ||||
|     def up(self): | ||||
|         # update mean. | ||||
|         msg = self.value / self.value[self] | ||||
|         a = self.calc_a(msg) | ||||
|         return self.mean.update_message(self, a * msg.pi, a * msg.tau) | ||||
|  | ||||
|  | ||||
| class SumFactor(Factor): | ||||
|  | ||||
|     def __init__(self, sum_var, term_vars, coeffs): | ||||
|         super(SumFactor, self).__init__([sum_var] + term_vars) | ||||
|         self.sum = sum_var | ||||
|         self.terms = term_vars | ||||
|         self.coeffs = coeffs | ||||
|  | ||||
|     def down(self): | ||||
|         vals = self.terms | ||||
|         msgs = [var[self] for var in vals] | ||||
|         return self.update(self.sum, vals, msgs, self.coeffs) | ||||
|  | ||||
|     def up(self, index=0): | ||||
|         coeff = self.coeffs[index] | ||||
|         coeffs = [] | ||||
|         for x, c in enumerate(self.coeffs): | ||||
|             try: | ||||
|                 if x == index: | ||||
|                     coeffs.append(1. / coeff) | ||||
|                 else: | ||||
|                     coeffs.append(-c / coeff) | ||||
|             except ZeroDivisionError: | ||||
|                 coeffs.append(0.) | ||||
|         vals = self.terms[:] | ||||
|         vals[index] = self.sum | ||||
|         msgs = [var[self] for var in vals] | ||||
|         return self.update(self.terms[index], vals, msgs, coeffs) | ||||
|  | ||||
|     def update(self, var, vals, msgs, coeffs): | ||||
|         pi_inv = 0 | ||||
|         mu = 0 | ||||
|         for val, msg, coeff in zip(vals, msgs, coeffs): | ||||
|             div = val / msg | ||||
|             mu += coeff * div.mu | ||||
|             if pi_inv == inf: | ||||
|                 continue | ||||
|             try: | ||||
|                 # numpy.float64 handles floating-point error by different way. | ||||
|                 # For example, it can just warn RuntimeWarning on n/0 problem | ||||
|                 # instead of throwing ZeroDivisionError.  So div.pi, the | ||||
|                 # denominator has to be a built-in float. | ||||
|                 pi_inv += coeff ** 2 / float(div.pi) | ||||
|             except ZeroDivisionError: | ||||
|                 pi_inv = inf | ||||
|         pi = 1. / pi_inv | ||||
|         tau = pi * mu | ||||
|         return var.update_message(self, pi, tau) | ||||
|  | ||||
|  | ||||
| class TruncateFactor(Factor): | ||||
|  | ||||
|     def __init__(self, var, v_func, w_func, draw_margin): | ||||
|         super(TruncateFactor, self).__init__([var]) | ||||
|         self.v_func = v_func | ||||
|         self.w_func = w_func | ||||
|         self.draw_margin = draw_margin | ||||
|  | ||||
|     def up(self): | ||||
|         val = self.var | ||||
|         msg = self.var[self] | ||||
|         div = val / msg | ||||
|         sqrt_pi = math.sqrt(div.pi) | ||||
|         args = (div.tau / sqrt_pi, self.draw_margin * sqrt_pi) | ||||
|         v = self.v_func(*args) | ||||
|         w = self.w_func(*args) | ||||
|         denom = (1. - w) | ||||
|         pi, tau = div.pi / denom, (div.tau + sqrt_pi * v) / denom | ||||
|         return val.update_value(self, pi, tau) | ||||
|  | ||||
| #: Default initial mean of ratings. | ||||
| MU = 25. | ||||
| #: Default initial standard deviation of ratings. | ||||
| SIGMA = MU / 3 | ||||
| #: Default distance that guarantees about 76% chance of winning. | ||||
| BETA = SIGMA / 2 | ||||
| #: Default dynamic factor. | ||||
| TAU = SIGMA / 100 | ||||
| #: Default draw probability of the game. | ||||
| DRAW_PROBABILITY = .10 | ||||
| #: A basis to check reliability of the result. | ||||
| DELTA = 0.0001 | ||||
|  | ||||
|  | ||||
| def calc_draw_probability(draw_margin, size, env=None): | ||||
|     if env is None: | ||||
|         env = global_env() | ||||
|     return 2 * env.cdf(draw_margin / (math.sqrt(size) * env.beta)) - 1 | ||||
|  | ||||
|  | ||||
| def calc_draw_margin(draw_probability, size, env=None): | ||||
|     if env is None: | ||||
|         env = global_env() | ||||
|     return env.ppf((draw_probability + 1) / 2.) * math.sqrt(size) * env.beta | ||||
|  | ||||
|  | ||||
| def _team_sizes(rating_groups): | ||||
|     team_sizes = [0] | ||||
|     for group in rating_groups: | ||||
|         team_sizes.append(len(group) + team_sizes[-1]) | ||||
|     del team_sizes[0] | ||||
|     return team_sizes | ||||
|  | ||||
|  | ||||
| def _floating_point_error(env): | ||||
|     if env.backend == 'mpmath': | ||||
|         msg = 'Set "mpmath.mp.dps" to higher' | ||||
|     else: | ||||
|         msg = 'Cannot calculate correctly, set backend to "mpmath"' | ||||
|     return FloatingPointError(msg) | ||||
|  | ||||
|  | ||||
| class Rating(Gaussian): | ||||
|     def __init__(self, mu=None, sigma=None): | ||||
|         if isinstance(mu, tuple): | ||||
|             mu, sigma = mu | ||||
|         elif isinstance(mu, Gaussian): | ||||
|             mu, sigma = mu.mu, mu.sigma | ||||
|         if mu is None: | ||||
|             mu = global_env().mu | ||||
|         if sigma is None: | ||||
|             sigma = global_env().sigma | ||||
|         super(Rating, self).__init__(mu, sigma) | ||||
|  | ||||
|     def __int__(self): | ||||
|         return int(self.mu) | ||||
|  | ||||
|     def __long__(self): | ||||
|         return long(self.mu) | ||||
|  | ||||
|     def __float__(self): | ||||
|         return float(self.mu) | ||||
|  | ||||
|     def __iter__(self): | ||||
|         return iter((self.mu, self.sigma)) | ||||
|  | ||||
|     def __repr__(self): | ||||
|         c = type(self) | ||||
|         args = ('.'.join([c.__module__, c.__name__]), self.mu, self.sigma) | ||||
|         return '%s(mu=%.3f, sigma=%.3f)' % args | ||||
|  | ||||
|  | ||||
| class TrueSkill(object): | ||||
|     def __init__(self, mu=MU, sigma=SIGMA, beta=BETA, tau=TAU, | ||||
|                  draw_probability=DRAW_PROBABILITY, backend=None): | ||||
|         self.mu = mu | ||||
|         self.sigma = sigma | ||||
|         self.beta = beta | ||||
|         self.tau = tau | ||||
|         self.draw_probability = draw_probability | ||||
|         self.backend = backend | ||||
|         if isinstance(backend, tuple): | ||||
|             self.cdf, self.pdf, self.ppf = backend | ||||
|         else: | ||||
|             self.cdf, self.pdf, self.ppf = choose_backend(backend) | ||||
|  | ||||
|     def create_rating(self, mu=None, sigma=None): | ||||
|         if mu is None: | ||||
|             mu = self.mu | ||||
|         if sigma is None: | ||||
|             sigma = self.sigma | ||||
|         return Rating(mu, sigma) | ||||
|  | ||||
|     def v_win(self, diff, draw_margin): | ||||
|         x = diff - draw_margin | ||||
|         denom = self.cdf(x) | ||||
|         return (self.pdf(x) / denom) if denom else -x | ||||
|  | ||||
|     def v_draw(self, diff, draw_margin): | ||||
|         abs_diff = abs(diff) | ||||
|         a, b = draw_margin - abs_diff, -draw_margin - abs_diff | ||||
|         denom = self.cdf(a) - self.cdf(b) | ||||
|         numer = self.pdf(b) - self.pdf(a) | ||||
|         return ((numer / denom) if denom else a) * (-1 if diff < 0 else +1) | ||||
|  | ||||
|     def w_win(self, diff, draw_margin): | ||||
|         x = diff - draw_margin | ||||
|         v = self.v_win(diff, draw_margin) | ||||
|         w = v * (v + x) | ||||
|         if 0 < w < 1: | ||||
|             return w | ||||
|         raise _floating_point_error(self) | ||||
|  | ||||
|     def w_draw(self, diff, draw_margin): | ||||
|         abs_diff = abs(diff) | ||||
|         a, b = draw_margin - abs_diff, -draw_margin - abs_diff | ||||
|         denom = self.cdf(a) - self.cdf(b) | ||||
|         if not denom: | ||||
|             raise _floating_point_error(self) | ||||
|         v = self.v_draw(abs_diff, draw_margin) | ||||
|         return (v ** 2) + (a * self.pdf(a) - b * self.pdf(b)) / denom | ||||
|  | ||||
|     def validate_rating_groups(self, rating_groups): | ||||
|         # check group sizes | ||||
|         if len(rating_groups) < 2: | ||||
|             raise ValueError('Need multiple rating groups') | ||||
|         elif not all(rating_groups): | ||||
|             raise ValueError('Each group must contain multiple ratings') | ||||
|         # check group types | ||||
|         group_types = set(map(type, rating_groups)) | ||||
|         if len(group_types) != 1: | ||||
|             raise TypeError('All groups should be same type') | ||||
|         elif group_types.pop() is Rating: | ||||
|             raise TypeError('Rating cannot be a rating group') | ||||
|         # normalize rating_groups | ||||
|         if isinstance(rating_groups[0], dict): | ||||
|             dict_rating_groups = rating_groups | ||||
|             rating_groups = [] | ||||
|             keys = [] | ||||
|             for dict_rating_group in dict_rating_groups: | ||||
|                 rating_group, key_group = [], [] | ||||
|                 for key, rating in iteritems(dict_rating_group): | ||||
|                     rating_group.append(rating) | ||||
|                     key_group.append(key) | ||||
|                 rating_groups.append(tuple(rating_group)) | ||||
|                 keys.append(tuple(key_group)) | ||||
|         else: | ||||
|             rating_groups = list(rating_groups) | ||||
|             keys = None | ||||
|         return rating_groups, keys | ||||
|  | ||||
|     def validate_weights(self, weights, rating_groups, keys=None): | ||||
|         if weights is None: | ||||
|             weights = [(1,) * len(g) for g in rating_groups] | ||||
|         elif isinstance(weights, dict): | ||||
|             weights_dict, weights = weights, [] | ||||
|             for x, group in enumerate(rating_groups): | ||||
|                 w = [] | ||||
|                 weights.append(w) | ||||
|                 for y, rating in enumerate(group): | ||||
|                     if keys is not None: | ||||
|                         y = keys[x][y] | ||||
|                     w.append(weights_dict.get((x, y), 1)) | ||||
|         return weights | ||||
|  | ||||
|     def factor_graph_builders(self, rating_groups, ranks, weights): | ||||
|         flatten_ratings = sum(map(tuple, rating_groups), ()) | ||||
|         flatten_weights = sum(map(tuple, weights), ()) | ||||
|         size = len(flatten_ratings) | ||||
|         group_size = len(rating_groups) | ||||
|         # create variables | ||||
|         rating_vars = [Variable() for x in range(size)] | ||||
|         perf_vars = [Variable() for x in range(size)] | ||||
|         team_perf_vars = [Variable() for x in range(group_size)] | ||||
|         team_diff_vars = [Variable() for x in range(group_size - 1)] | ||||
|         team_sizes = _team_sizes(rating_groups) | ||||
|         # layer builders | ||||
|         def build_rating_layer(): | ||||
|             for rating_var, rating in zip(rating_vars, flatten_ratings): | ||||
|                 yield PriorFactor(rating_var, rating, self.tau) | ||||
|         def build_perf_layer(): | ||||
|             for rating_var, perf_var in zip(rating_vars, perf_vars): | ||||
|                 yield LikelihoodFactor(rating_var, perf_var, self.beta ** 2) | ||||
|         def build_team_perf_layer(): | ||||
|             for team, team_perf_var in enumerate(team_perf_vars): | ||||
|                 if team > 0: | ||||
|                     start = team_sizes[team - 1] | ||||
|                 else: | ||||
|                     start = 0 | ||||
|                 end = team_sizes[team] | ||||
|                 child_perf_vars = perf_vars[start:end] | ||||
|                 coeffs = flatten_weights[start:end] | ||||
|                 yield SumFactor(team_perf_var, child_perf_vars, coeffs) | ||||
|         def build_team_diff_layer(): | ||||
|             for team, team_diff_var in enumerate(team_diff_vars): | ||||
|                 yield SumFactor(team_diff_var, | ||||
|                                 team_perf_vars[team:team + 2], [+1, -1]) | ||||
|         def build_trunc_layer(): | ||||
|             for x, team_diff_var in enumerate(team_diff_vars): | ||||
|                 if callable(self.draw_probability): | ||||
|                     # dynamic draw probability | ||||
|                     team_perf1, team_perf2 = team_perf_vars[x:x + 2] | ||||
|                     args = (Rating(team_perf1), Rating(team_perf2), self) | ||||
|                     draw_probability = self.draw_probability(*args) | ||||
|                 else: | ||||
|                     # static draw probability | ||||
|                     draw_probability = self.draw_probability | ||||
|                 size = sum(map(len, rating_groups[x:x + 2])) | ||||
|                 draw_margin = calc_draw_margin(draw_probability, size, self) | ||||
|                 if ranks[x] == ranks[x + 1]:  # is a tie? | ||||
|                     v_func, w_func = self.v_draw, self.w_draw | ||||
|                 else: | ||||
|                     v_func, w_func = self.v_win, self.w_win | ||||
|                 yield TruncateFactor(team_diff_var, | ||||
|                                      v_func, w_func, draw_margin) | ||||
|         # build layers | ||||
|         return (build_rating_layer, build_perf_layer, build_team_perf_layer, | ||||
|                 build_team_diff_layer, build_trunc_layer) | ||||
|  | ||||
|     def run_schedule(self, build_rating_layer, build_perf_layer, | ||||
|                      build_team_perf_layer, build_team_diff_layer, | ||||
|                      build_trunc_layer, min_delta=DELTA): | ||||
|         if min_delta <= 0: | ||||
|             raise ValueError('min_delta must be greater than 0') | ||||
|         layers = [] | ||||
|         def build(builders): | ||||
|             layers_built = [list(build()) for build in builders] | ||||
|             layers.extend(layers_built) | ||||
|             return layers_built | ||||
|         # gray arrows | ||||
|         layers_built = build([build_rating_layer, | ||||
|                               build_perf_layer, | ||||
|                               build_team_perf_layer]) | ||||
|         rating_layer, perf_layer, team_perf_layer = layers_built | ||||
|         for f in chain(*layers_built): | ||||
|             f.down() | ||||
|         # arrow #1, #2, #3 | ||||
|         team_diff_layer, trunc_layer = build([build_team_diff_layer, | ||||
|                                               build_trunc_layer]) | ||||
|         team_diff_len = len(team_diff_layer) | ||||
|         for x in range(10): | ||||
|             if team_diff_len == 1: | ||||
|                 # only two teams | ||||
|                 team_diff_layer[0].down() | ||||
|                 delta = trunc_layer[0].up() | ||||
|             else: | ||||
|                 # multiple teams | ||||
|                 delta = 0 | ||||
|                 for x in range(team_diff_len - 1): | ||||
|                     team_diff_layer[x].down() | ||||
|                     delta = max(delta, trunc_layer[x].up()) | ||||
|                     team_diff_layer[x].up(1)  # up to right variable | ||||
|                 for x in range(team_diff_len - 1, 0, -1): | ||||
|                     team_diff_layer[x].down() | ||||
|                     delta = max(delta, trunc_layer[x].up()) | ||||
|                     team_diff_layer[x].up(0)  # up to left variable | ||||
|             # repeat until to small update | ||||
|             if delta <= min_delta: | ||||
|                 break | ||||
|         # up both ends | ||||
|         team_diff_layer[0].up(0) | ||||
|         team_diff_layer[team_diff_len - 1].up(1) | ||||
|         # up the remainder of the black arrows | ||||
|         for f in team_perf_layer: | ||||
|             for x in range(len(f.vars) - 1): | ||||
|                 f.up(x) | ||||
|         for f in perf_layer: | ||||
|             f.up() | ||||
|         return layers | ||||
|  | ||||
|     def rate(self, rating_groups, ranks=None, weights=None, min_delta=DELTA): | ||||
|         rating_groups, keys = self.validate_rating_groups(rating_groups) | ||||
|         weights = self.validate_weights(weights, rating_groups, keys) | ||||
|         group_size = len(rating_groups) | ||||
|         if ranks is None: | ||||
|             ranks = range(group_size) | ||||
|         elif len(ranks) != group_size: | ||||
|             raise ValueError('Wrong ranks') | ||||
|         # sort rating groups by rank | ||||
|         by_rank = lambda x: x[1][1] | ||||
|         sorting = sorted(enumerate(zip(rating_groups, ranks, weights)), | ||||
|                          key=by_rank) | ||||
|         sorted_rating_groups, sorted_ranks, sorted_weights = [], [], [] | ||||
|         for x, (g, r, w) in sorting: | ||||
|             sorted_rating_groups.append(g) | ||||
|             sorted_ranks.append(r) | ||||
|             # make weights to be greater than 0 | ||||
|             sorted_weights.append(max(min_delta, w_) for w_ in w) | ||||
|         # build factor graph | ||||
|         args = (sorted_rating_groups, sorted_ranks, sorted_weights) | ||||
|         builders = self.factor_graph_builders(*args) | ||||
|         args = builders + (min_delta,) | ||||
|         layers = self.run_schedule(*args) | ||||
|         # make result | ||||
|         rating_layer, team_sizes = layers[0], _team_sizes(sorted_rating_groups) | ||||
|         transformed_groups = [] | ||||
|         for start, end in zip([0] + team_sizes[:-1], team_sizes): | ||||
|             group = [] | ||||
|             for f in rating_layer[start:end]: | ||||
|                 group.append(Rating(float(f.var.mu), float(f.var.sigma))) | ||||
|             transformed_groups.append(tuple(group)) | ||||
|         by_hint = lambda x: x[0] | ||||
|         unsorting = sorted(zip((x for x, __ in sorting), transformed_groups), | ||||
|                            key=by_hint) | ||||
|         if keys is None: | ||||
|             return [g for x, g in unsorting] | ||||
|         # restore the structure with input dictionary keys | ||||
|         return [dict(zip(keys[x], g)) for x, g in unsorting] | ||||
|  | ||||
|     def quality(self, rating_groups, weights=None): | ||||
|         rating_groups, keys = self.validate_rating_groups(rating_groups) | ||||
|         weights = self.validate_weights(weights, rating_groups, keys) | ||||
|         flatten_ratings = sum(map(tuple, rating_groups), ()) | ||||
|         flatten_weights = sum(map(tuple, weights), ()) | ||||
|         length = len(flatten_ratings) | ||||
|         # a vector of all of the skill means | ||||
|         mean_matrix = Matrix([[r.mu] for r in flatten_ratings]) | ||||
|         # a matrix whose diagonal values are the variances (sigma ** 2) of each | ||||
|         # of the players. | ||||
|         def variance_matrix(height, width): | ||||
|             variances = (r.sigma ** 2 for r in flatten_ratings) | ||||
|             for x, variance in enumerate(variances): | ||||
|                 yield (x, x), variance | ||||
|         variance_matrix = Matrix(variance_matrix, length, length) | ||||
|         # the player-team assignment and comparison matrix | ||||
|         def rotated_a_matrix(set_height, set_width): | ||||
|             t = 0 | ||||
|             for r, (cur, _next) in enumerate(zip(rating_groups[:-1], | ||||
|                                                 rating_groups[1:])): | ||||
|                 for x in range(t, t + len(cur)): | ||||
|                     yield (r, x), flatten_weights[x] | ||||
|                     t += 1 | ||||
|                 x += 1 | ||||
|                 for x in range(x, x + len(_next)): | ||||
|                     yield (r, x), -flatten_weights[x] | ||||
|             set_height(r + 1) | ||||
|             set_width(x + 1) | ||||
|         rotated_a_matrix = Matrix(rotated_a_matrix) | ||||
|         a_matrix = rotated_a_matrix.transpose() | ||||
|         # match quality further derivation | ||||
|         _ata = (self.beta ** 2) * rotated_a_matrix * a_matrix | ||||
|         _atsa = rotated_a_matrix * variance_matrix * a_matrix | ||||
|         start = mean_matrix.transpose() * a_matrix | ||||
|         middle = _ata + _atsa | ||||
|         end = rotated_a_matrix * mean_matrix | ||||
|         # make result | ||||
|         e_arg = (-0.5 * start * middle.inverse() * end).determinant() | ||||
|         s_arg = _ata.determinant() / middle.determinant() | ||||
|         return math.exp(e_arg) * math.sqrt(s_arg) | ||||
|  | ||||
|     def expose(self, rating): | ||||
|         k = self.mu / self.sigma | ||||
|         return rating.mu - k * rating.sigma | ||||
|  | ||||
|     def make_as_global(self): | ||||
|         return setup(env=self) | ||||
|  | ||||
|     def __repr__(self): | ||||
|         c = type(self) | ||||
|         if callable(self.draw_probability): | ||||
|             f = self.draw_probability | ||||
|             draw_probability = '.'.join([f.__module__, f.__name__]) | ||||
|         else: | ||||
|             draw_probability = '%.1f%%' % (self.draw_probability * 100) | ||||
|         if self.backend is None: | ||||
|             backend = '' | ||||
|         elif isinstance(self.backend, tuple): | ||||
|             backend = ', backend=...' | ||||
|         else: | ||||
|             backend = ', backend=%r' % self.backend | ||||
|         args = ('.'.join([c.__module__, c.__name__]), self.mu, self.sigma, | ||||
|                 self.beta, self.tau, draw_probability, backend) | ||||
|         return ('%s(mu=%.3f, sigma=%.3f, beta=%.3f, tau=%.3f, ' | ||||
|                 'draw_probability=%s%s)' % args) | ||||
|  | ||||
|  | ||||
| def rate_1vs1(rating1, rating2, drawn=False, min_delta=DELTA, env=None): | ||||
|     if env is None: | ||||
|         env = global_env() | ||||
|     ranks = [0, 0 if drawn else 1] | ||||
|     teams = env.rate([(rating1,), (rating2,)], ranks, min_delta=min_delta) | ||||
|     return teams[0][0], teams[1][0] | ||||
|  | ||||
|  | ||||
| def quality_1vs1(rating1, rating2, env=None): | ||||
|     if env is None: | ||||
|         env = global_env() | ||||
|     return env.quality([(rating1,), (rating2,)]) | ||||
|  | ||||
|  | ||||
| def global_env(): | ||||
|     try: | ||||
|         global_env.__trueskill__ | ||||
|     except AttributeError: | ||||
|         # setup the default environment | ||||
|         setup() | ||||
|     return global_env.__trueskill__ | ||||
|  | ||||
|  | ||||
| def setup(mu=MU, sigma=SIGMA, beta=BETA, tau=TAU, | ||||
|           draw_probability=DRAW_PROBABILITY, backend=None, env=None): | ||||
|     if env is None: | ||||
|         env = TrueSkill(mu, sigma, beta, tau, draw_probability, backend) | ||||
|     global_env.__trueskill__ = env | ||||
|     return env | ||||
|  | ||||
|  | ||||
| def rate(rating_groups, ranks=None, weights=None, min_delta=DELTA): | ||||
|     return global_env().rate(rating_groups, ranks, weights, min_delta) | ||||
|  | ||||
|  | ||||
| def quality(rating_groups, weights=None): | ||||
|     return global_env().quality(rating_groups, weights) | ||||
|  | ||||
|  | ||||
| def expose(rating): | ||||
|     return global_env().expose(rating) | ||||
| @@ -1,34 +0,0 @@ | ||||
| # Titan Robotics Team 2022: Visualization Module | ||||
| # Written by Arthur Lu & Jacob Levine | ||||
| # Notes: | ||||
| #    this should be imported as a python module using 'import visualization' | ||||
| #    this should be included in the local directory or environment variable | ||||
| #    fancy | ||||
| # setup: | ||||
|  | ||||
| __version__ = "1.0.0.000" | ||||
|  | ||||
| #changelog should be viewed using print(analysis.__changelog__) | ||||
| __changelog__ = """changelog: | ||||
|     1.0.0.000: | ||||
|         - created visualization.py | ||||
|         - added graphloss() | ||||
|         - added imports | ||||
| """ | ||||
|  | ||||
| __author__ = ( | ||||
|     "Arthur Lu <arthurlu@ttic.edu>," | ||||
|     "Jacob Levine <jlevine@ttic.edu>," | ||||
|     ) | ||||
|  | ||||
| __all__ = [ | ||||
|     'graphloss', | ||||
|     ] | ||||
|  | ||||
| import matplotlib.pyplot as plt | ||||
|  | ||||
| def graphloss(losses): | ||||
|  | ||||
|     x = range(0, len(losses)) | ||||
|     plt.plot(x, losses) | ||||
|     plt.show() | ||||
		Reference in New Issue
	
	Block a user