diff --git a/analysis-master/analysis/.ipynb_checkpoints/analysis-checkpoint.py b/analysis-master/analysis/.ipynb_checkpoints/analysis-checkpoint.py deleted file mode 100644 index 40c12eac..00000000 --- a/analysis-master/analysis/.ipynb_checkpoints/analysis-checkpoint.py +++ /dev/null @@ -1,952 +0,0 @@ -# Titan Robotics Team 2022: Data Analysis Module -# Written by Arthur Lu & Jacob Levine -# Notes: -# this should be imported as a python module using 'import analysis' -# this should be included in the local directory or environment variable -# this module has been optimized for multhreaded computing -# current benchmark of optimization: 1.33 times faster -# setup: - -__version__ = "1.1.12.003" - -# changelog should be viewed using print(analysis.__changelog__) -__changelog__ = """changelog: - 1.1.12.003: - - removed depreciated code - 1.1.12.002: - - removed team first time trueskill instantiation in favor of integration in superscript.py - 1.1.12.001: - - improved readibility of regression outputs by stripping tensor data - - used map with lambda to acheive the improved readibility - - lost numba jit support with regression, and generated_jit hangs at execution - - TODO: reimplement correct numba integration in regression - 1.1.12.000: - - temporarily fixed polynomial regressions by using sklearn's PolynomialFeatures - 1.1.11.010: - - alphabeticaly ordered import lists - 1.1.11.009: - - bug fixes - 1.1.11.008: - - bug fixes - 1.1.11.007: - - bug fixes - 1.1.11.006: - - tested min and max - - bug fixes - 1.1.11.005: - - added min and max in basic_stats - 1.1.11.004: - - bug fixes - 1.1.11.003: - - bug fixes - 1.1.11.002: - - consolidated metrics - - fixed __all__ - 1.1.11.001: - - added test/train split to RandomForestClassifier and RandomForestRegressor - 1.1.11.000: - - added RandomForestClassifier and RandomForestRegressor - - note: untested - 1.1.10.000: - - added numba.jit to remaining functions - 1.1.9.002: - - kernelized PCA and KNN - 1.1.9.001: - - fixed bugs with SVM and NaiveBayes - 1.1.9.000: - - added SVM class, subclasses, and functions - - note: untested - 1.1.8.000: - - added NaiveBayes classification engine - - note: untested - 1.1.7.000: - - added knn() - - added confusion matrix to decisiontree() - 1.1.6.002: - - changed layout of __changelog to be vscode friendly - 1.1.6.001: - - added additional hyperparameters to decisiontree() - 1.1.6.000: - - fixed __version__ - - fixed __all__ order - - added decisiontree() - 1.1.5.003: - - added pca - 1.1.5.002: - - reduced import list - - added kmeans clustering engine - 1.1.5.001: - - simplified regression by using .to(device) - 1.1.5.000: - - added polynomial regression to regression(); untested - 1.1.4.000: - - added trueskill() - 1.1.3.002: - - renamed regression class to Regression, regression_engine() to regression gliko2_engine class to Gliko2 - 1.1.3.001: - - changed glicko2() to return tuple instead of array - 1.1.3.000: - - added glicko2_engine class and glicko() - - verified glicko2() accuracy - 1.1.2.003: - - fixed elo() - 1.1.2.002: - - added elo() - - elo() has bugs to be fixed - 1.1.2.001: - - readded regrression import - 1.1.2.000: - - integrated regression.py as regression class - - removed regression import - - fixed metadata for regression class - - fixed metadata for analysis class - 1.1.1.001: - - regression_engine() bug fixes, now actaully regresses - 1.1.1.000: - - added regression_engine() - - added all regressions except polynomial - 1.1.0.007: - - updated _init_device() - 1.1.0.006: - - removed useless try statements - 1.1.0.005: - - removed impossible outcomes - 1.1.0.004: - - added performance metrics (r^2, mse, rms) - 1.1.0.003: - - resolved nopython mode for mean, median, stdev, variance - 1.1.0.002: - - snapped (removed) majority of uneeded imports - - forced object mode (bad) on all jit - - TODO: stop numba complaining about not being able to compile in nopython mode - 1.1.0.001: - - removed from sklearn import * to resolve uneeded wildcard imports - 1.1.0.000: - - removed c_entities,nc_entities,obstacles,objectives from __all__ - - applied numba.jit to all functions - - depreciated and removed stdev_z_split - - cleaned up histo_analysis to include numpy and numba.jit optimizations - - depreciated and removed all regression functions in favor of future pytorch optimizer - - depreciated and removed all nonessential functions (basic_analysis, benchmark, strip_data) - - optimized z_normalize using sklearn.preprocessing.normalize - - TODO: implement kernel/function based pytorch regression optimizer - 1.0.9.000: - - refactored - - numpyed everything - - removed stats in favor of numpy functions - 1.0.8.005: - - minor fixes - 1.0.8.004: - - removed a few unused dependencies - 1.0.8.003: - - added p_value function - 1.0.8.002: - - updated __all__ correctly to contain changes made in v 1.0.8.000 and v 1.0.8.001 - 1.0.8.001: - - refactors - - bugfixes - 1.0.8.000: - - depreciated histo_analysis_old - - depreciated debug - - altered basic_analysis to take array data instead of filepath - - refactor - - optimization - 1.0.7.002: - - bug fixes - 1.0.7.001: - - bug fixes - 1.0.7.000: - - added tanh_regression (logistical regression) - - bug fixes - 1.0.6.005: - - added z_normalize function to normalize dataset - - bug fixes - 1.0.6.004: - - bug fixes - 1.0.6.003: - - bug fixes - 1.0.6.002: - - bug fixes - 1.0.6.001: - - corrected __all__ to contain all of the functions - 1.0.6.000: - - added calc_overfit, which calculates two measures of overfit, error and performance - - added calculating overfit to optimize_regression - 1.0.5.000: - - added optimize_regression function, which is a sample function to find the optimal regressions - - optimize_regression function filters out some overfit funtions (functions with r^2 = 1) - - planned addition: overfit detection in the optimize_regression function - 1.0.4.002: - - added __changelog__ - - updated debug function with log and exponential regressions - 1.0.4.001: - - added log regressions - - added exponential regressions - - added log_regression and exp_regression to __all__ - 1.0.3.008: - - added debug function to further consolidate functions - 1.0.3.007: - - added builtin benchmark function - - added builtin random (linear) data generation function - - added device initialization (_init_device) - 1.0.3.006: - - reorganized the imports list to be in alphabetical order - - added search and regurgitate functions to c_entities, nc_entities, obstacles, objectives - 1.0.3.005: - - major bug fixes - - updated historical analysis - - depreciated old historical analysis - 1.0.3.004: - - added __version__, __author__, __all__ - - added polynomial regression - - added root mean squared function - - added r squared function - 1.0.3.003: - - bug fixes - - added c_entities - 1.0.3.002: - - bug fixes - - added nc_entities, obstacles, objectives - - consolidated statistics.py to analysis.py - 1.0.3.001: - - compiled 1d, column, and row basic stats into basic stats function - 1.0.3.000: - - added historical analysis function - 1.0.2.xxx: - - added z score test - 1.0.1.xxx: - - major bug fixes - 1.0.0.xxx: - - added loading csv - - added 1d, column, row basic stats -""" - -__author__ = ( - "Arthur Lu ", - "Jacob Levine ", -) - -__all__ = [ - '_init_device', - 'load_csv', - 'basic_stats', - 'z_score', - 'z_normalize', - 'histo_analysis', - 'regression', - 'elo', - 'gliko2', - 'trueskill', - 'RegressionMetrics', - 'ClassificationMetrics', - 'kmeans', - 'pca', - 'decisiontree', - 'knn_classifier', - 'knn_regressor', - 'NaiveBayes', - 'SVM', - 'random_forest_classifier', - 'random_forest_regressor', - 'Regression', - 'Gliko2', - # all statistics functions left out due to integration in other functions -] - -# now back to your regularly scheduled programming: - -# imports (now in alphabetical order! v 1.0.3.006): - -import csv -import numba -from numba import jit -import numpy as np -import math -import sklearn -from sklearn import * -import torch -try: - from analysis import trueskill as Trueskill -except: - import trueskill as Trueskill - -class error(ValueError): - pass - -def _init_device(): # initiates computation device for ANNs - device = 'cuda:0' if torch.cuda.is_available() else 'cpu' - return device - -def load_csv(filepath): - with open(filepath, newline='') as csvfile: - file_array = np.array(list(csv.reader(csvfile))) - csvfile.close() - return file_array - -# expects 1d array -@jit(forceobj=True) -def basic_stats(data): - - data_t = np.array(data).astype(float) - - _mean = mean(data_t) - _median = median(data_t) - _stdev = stdev(data_t) - _variance = variance(data_t) - _min = npmin(data_t) - _max = npmax(data_t) - - return _mean, _median, _stdev, _variance, _min, _max - -# returns z score with inputs of point, mean and standard deviation of spread -@jit(forceobj=True) -def z_score(point, mean, stdev): - score = (point - mean) / stdev - - return score - -# expects 2d array, normalizes across all axes -@jit(forceobj=True) -def z_normalize(array, *args): - - array = np.array(array) - for arg in args: - array = sklearn.preprocessing.normalize(array, axis = arg) - - return array - -@jit(forceobj=True) -# expects 2d array of [x,y] -def histo_analysis(hist_data): - - hist_data = np.array(hist_data) - derivative = np.array(len(hist_data) - 1, dtype = float) - t = np.diff(hist_data) - derivative = t[1] / t[0] - np.sort(derivative) - - return basic_stats(derivative)[0], basic_stats(derivative)[3] - -def regression(ndevice, inputs, outputs, args, loss = torch.nn.MSELoss(), _iterations = 10000, lr = 0.01, _iterations_ply = 10000, lr_ply = 0.01): # inputs, outputs expects N-D array - - regressions = [] - Regression().set_device(ndevice) - - if 'lin' in args: # formula: ax + b - - model = Regression().SGDTrain(Regression.LinearRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor([outputs]).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True) - params = model[0].parameters - params[:] = map(lambda x: x.item(), params) - regressions.append((params, model[1][::-1][0])) - - if 'log' in args: # formula: a log (b(x + c)) + d - - model = Regression().SGDTrain(Regression.LogRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True) - params = model[0].parameters - params[:] = map(lambda x: x.item(), params) - regressions.append((params, model[1][::-1][0])) - - if 'exp' in args: # formula: a e ^ (b(x + c)) + d - - model = Regression().SGDTrain(Regression.ExpRegKernel(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True) - params = model[0].parameters - params[:] = map(lambda x: x.item(), params) - regressions.append((params, model[1][::-1][0])) - - if 'ply' in args: # formula: a + bx^1 + cx^2 + dx^3 + ... - - plys = [] - limit = len(outputs[0]) - - for i in range(2, limit): - - model = sklearn.preprocessing.PolynomialFeatures(degree = i) - model = sklearn.pipeline.make_pipeline(model, sklearn.linear_model.LinearRegression()) - model = model.fit(np.rot90(inputs), np.rot90(outputs)) - - params = model.steps[1][1].intercept_.tolist() - params = np.append(params, model.steps[1][1].coef_[0].tolist()[1::]) - params.flatten() - params = params.tolist() - - plys.append(params) - - regressions.append(plys) - - if 'sig' in args: # formula: a sig (b(x + c)) + d | sig() = 1/(1 + e ^ -x) - - model = Regression().SGDTrain(Regression.SigmoidalRegKernelArthur(len(inputs)), torch.tensor(inputs).to(torch.float).to(device), torch.tensor(outputs).to(torch.float).to(device), iterations=_iterations, learning_rate=lr, return_losses=True) - params = model[0].parameters - params[:] = map(lambda x: x.item(), params) - regressions.append((params, model[1][::-1][0])) - - return regressions - -@jit(nopython=True) -def elo(starting_score, opposing_score, observed, N, K): - - expected = 1/(1+10**((np.array(opposing_score) - starting_score)/N)) - - return starting_score + K*(np.sum(observed) - np.sum(expected)) - -@jit(forceobj=True) -def gliko2(starting_score, starting_rd, starting_vol, opposing_score, opposing_rd, observations): - - player = Gliko2(rating = starting_score, rd = starting_rd, vol = starting_vol) - - player.update_player([x for x in opposing_score], [x for x in opposing_rd], observations) - - return (player.rating, player.rd, player.vol) - -@jit(forceobj=True) -def trueskill(teams_data, observations): # teams_data is array of array of tuples ie. [[(mu, sigma), (mu, sigma), (mu, sigma)], [(mu, sigma), (mu, sigma), (mu, sigma)]] - - team_ratings = [] - - for team in teams_data: - team_temp = [] - for player in team: - player = Trueskill.Rating(player[0], player[1]) - team_temp.append(player) - team_ratings.append(team_temp) - - return Trueskill.rate(teams_data, observations) - -class RegressionMetrics(): - - def __new__(cls, predictions, targets): - - return cls.r_squared(cls, predictions, targets), cls.mse(cls, predictions, targets), cls.rms(cls, predictions, targets) - - def r_squared(self, predictions, targets): # assumes equal size inputs - - return sklearn.metrics.r2_score(targets, predictions) - - def mse(self, predictions, targets): - - return sklearn.metrics.mean_squared_error(targets, predictions) - - def rms(self, predictions, targets): - - return math.sqrt(sklearn.metrics.mean_squared_error(targets, predictions)) - -class ClassificationMetrics(): - - def __new__(cls, predictions, targets): - - return cls.cm(cls, predictions, targets), cls.cr(cls, predictions, targets) - - def cm(self, predictions, targets): - - return sklearn.metrics.confusion_matrix(targets, predictions) - - def cr(self, predictions, targets): - - return sklearn.metrics.classification_report(targets, predictions) - -@jit(nopython=True) -def mean(data): - - return np.mean(data) - -@jit(nopython=True) -def median(data): - - return np.median(data) - -@jit(nopython=True) -def stdev(data): - - return np.std(data) - -@jit(nopython=True) -def variance(data): - - return np.var(data) - -@jit(nopython=True) -def npmin(data): - - return np.amin(data) - -@jit(nopython=True) -def npmax(data): - - return np.amax(data) - -@jit(forceobj=True) -def kmeans(data, n_clusters=8, init="k-means++", n_init=10, max_iter=300, tol=0.0001, precompute_distances="auto", verbose=0, random_state=None, copy_x=True, n_jobs=None, algorithm="auto"): - - kernel = sklearn.cluster.KMeans(n_clusters = n_clusters, init = init, n_init = n_init, max_iter = max_iter, tol = tol, precompute_distances = precompute_distances, verbose = verbose, random_state = random_state, copy_x = copy_x, n_jobs = n_jobs, algorithm = algorithm) - kernel.fit(data) - predictions = kernel.predict(data) - centers = kernel.cluster_centers_ - - return centers, predictions - -@jit(forceobj=True) -def pca(data, n_components = None, copy = True, whiten = False, svd_solver = "auto", tol = 0.0, iterated_power = "auto", random_state = None): - - kernel = sklearn.decomposition.PCA(n_components = n_components, copy = copy, whiten = whiten, svd_solver = svd_solver, tol = tol, iterated_power = iterated_power, random_state = random_state) - - return kernel.fit_transform(data) - -@jit(forceobj=True) -def decisiontree(data, labels, test_size = 0.3, criterion = "gini", splitter = "default", max_depth = None): #expects *2d data and 1d labels - - data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) - model = sklearn.tree.DecisionTreeClassifier(criterion = criterion, splitter = splitter, max_depth = max_depth) - model = model.fit(data_train,labels_train) - predictions = model.predict(data_test) - metrics = ClassificationMetrics(predictions, labels_test) - - return model, metrics - -@jit(forceobj=True) -def knn_classifier(data, labels, test_size = 0.3, algorithm='auto', leaf_size=30, metric='minkowski', metric_params=None, n_jobs=None, n_neighbors=5, p=2, weights='uniform'): #expects *2d data and 1d labels post-scaling - - data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) - model = sklearn.neighbors.KNeighborsClassifier() - model.fit(data_train, labels_train) - predictions = model.predict(data_test) - - return model, ClassificationMetrics(predictions, labels_test) - -def knn_regressor(data, outputs, test_size, n_neighbors = 5, weights = "uniform", algorithm = "auto", leaf_size = 30, p = 2, metric = "minkowski", metric_params = None, n_jobs = None): - - data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1) - model = sklearn.neighbors.KNeighborsRegressor(n_neighbors = n_neighbors, weights = weights, algorithm = algorithm, leaf_size = leaf_size, p = p, metric = metric, metric_params = metric_params, n_jobs = n_jobs) - model.fit(data_train, outputs_train) - predictions = model.predict(data_test) - - return model, RegressionMetrics(predictions, outputs_test) - -class NaiveBayes: - - def guassian(self, data, labels, test_size = 0.3, priors = None, var_smoothing = 1e-09): - - data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) - model = sklearn.naive_bayes.GaussianNB(priors = priors, var_smoothing = var_smoothing) - model.fit(data_train, labels_train) - predictions = model.predict(data_test) - - return model, ClassificationMetrics(predictions, labels_test) - - def multinomial(self, data, labels, test_size = 0.3, alpha=1.0, fit_prior=True, class_prior=None): - - data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) - model = sklearn.naive_bayes.MultinomialNB(alpha = alpha, fit_prior = fit_prior, class_prior = class_prior) - model.fit(data_train, labels_train) - predictions = model.predict(data_test) - - return model, ClassificationMetrics(predictions, labels_test) - - def bernoulli(self, data, labels, test_size = 0.3, alpha=1.0, binarize=0.0, fit_prior=True, class_prior=None): - - data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) - model = sklearn.naive_bayes.BernoulliNB(alpha = alpha, binarize = binarize, fit_prior = fit_prior, class_prior = class_prior) - model.fit(data_train, labels_train) - predictions = model.predict(data_test) - - return model, ClassificationMetrics(predictions, labels_test) - - def complement(self, data, labels, test_size = 0.3, alpha=1.0, fit_prior=True, class_prior=None, norm=False): - - data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) - model = sklearn.naive_bayes.ComplementNB(alpha = alpha, fit_prior = fit_prior, class_prior = class_prior, norm = norm) - model.fit(data_train, labels_train) - predictions = model.predict(data_test) - - return model, ClassificationMetrics(predictions, labels_test) - -class SVM: - - class CustomKernel: - - def __new__(cls, C, kernel, degre, gamma, coef0, shrinking, probability, tol, cache_size, class_weight, verbose, max_iter, decision_function_shape, random_state): - - return sklearn.svm.SVC(C = C, kernel = kernel, gamma = gamma, coef0 = coef0, shrinking = shrinking, probability = probability, tol = tol, cache_size = cache_size, class_weight = class_weight, verbose = verbose, max_iter = max_iter, decision_function_shape = decision_function_shape, random_state = random_state) - - class StandardKernel: - - def __new__(cls, kernel, C=1.0, degree=3, gamma='auto_deprecated', coef0=0.0, shrinking=True, probability=False, tol=0.001, cache_size=200, class_weight=None, verbose=False, max_iter=-1, decision_function_shape='ovr', random_state=None): - - return sklearn.svm.SVC(C = C, kernel = kernel, gamma = gamma, coef0 = coef0, shrinking = shrinking, probability = probability, tol = tol, cache_size = cache_size, class_weight = class_weight, verbose = verbose, max_iter = max_iter, decision_function_shape = decision_function_shape, random_state = random_state) - - class PrebuiltKernel: - - class Linear: - - def __new__(cls): - - return sklearn.svm.SVC(kernel = 'linear') - - class Polynomial: - - def __new__(cls, power, r_bias): - - return sklearn.svm.SVC(kernel = 'polynomial', degree = power, coef0 = r_bias) - - class RBF: - - def __new__(cls, gamma): - - return sklearn.svm.SVC(kernel = 'rbf', gamma = gamma) - - class Sigmoid: - - def __new__(cls, r_bias): - - return sklearn.svm.SVC(kernel = 'sigmoid', coef0 = r_bias) - - def fit(self, kernel, train_data, train_outputs): # expects *2d data, 1d labels or outputs - - return kernel.fit(train_data, train_outputs) - - def eval_classification(self, kernel, test_data, test_outputs): - - predictions = kernel.predict(test_data) - - return ClassificationMetrics(predictions, test_outputs) - - def eval_regression(self, kernel, test_data, test_outputs): - - predictions = kernel.predict(test_data) - - return RegressionMetrics(predictions, test_outputs) - -def random_forest_classifier(data, labels, test_size, n_estimators="warn", criterion="gini", max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0.0, max_features="auto", max_leaf_nodes=None, min_impurity_decrease=0.0, min_impurity_split=None, bootstrap=True, oob_score=False, n_jobs=None, random_state=None, verbose=0, warm_start=False, class_weight=None): - - data_train, data_test, labels_train, labels_test = sklearn.model_selection.train_test_split(data, labels, test_size=test_size, random_state=1) - kernel = sklearn.ensemble.RandomForestClassifier(n_estimators = n_estimators, criterion = criterion, max_depth = max_depth, min_samples_split = min_samples_split, min_samples_leaf = min_samples_leaf, min_weight_fraction_leaf = min_weight_fraction_leaf, max_leaf_nodes = max_leaf_nodes, min_impurity_decrease = min_impurity_decrease, bootstrap = bootstrap, oob_score = oob_score, n_jobs = n_jobs, random_state = random_state, verbose = verbose, warm_start = warm_start, class_weight = class_weight) - kernel.fit(data_train, labels_train) - predictions = kernel.predict(data_test) - - return kernel, ClassificationMetrics(predictions, labels_test) - -def random_forest_regressor(data, outputs, test_size, n_estimators="warn", criterion="mse", max_depth=None, min_samples_split=2, min_samples_leaf=1, min_weight_fraction_leaf=0.0, max_features="auto", max_leaf_nodes=None, min_impurity_decrease=0.0, min_impurity_split=None, bootstrap=True, oob_score=False, n_jobs=None, random_state=None, verbose=0, warm_start=False): - - data_train, data_test, outputs_train, outputs_test = sklearn.model_selection.train_test_split(data, outputs, test_size=test_size, random_state=1) - kernel = sklearn.ensemble.RandomForestRegressor(n_estimators = n_estimators, criterion = criterion, max_depth = max_depth, min_samples_split = min_samples_split, min_weight_fraction_leaf = min_weight_fraction_leaf, max_features = max_features, max_leaf_nodes = max_leaf_nodes, min_impurity_decrease = min_impurity_decrease, min_impurity_split = min_impurity_split, bootstrap = bootstrap, oob_score = oob_score, n_jobs = n_jobs, random_state = random_state, verbose = verbose, warm_start = warm_start) - kernel.fit(data_train, outputs_train) - predictions = kernel.predict(data_test) - - return kernel, RegressionMetrics(predictions, outputs_test) - -class Regression: - - # Titan Robotics Team 2022: CUDA-based Regressions Module - # Written by Arthur Lu & Jacob Levine - # Notes: - # this module has been automatically inegrated into analysis.py, and should be callable as a class from the package - # this module is cuda-optimized and vectorized (except for one small part) - # setup: - - __version__ = "1.0.0.003" - - # changelog should be viewed using print(analysis.regression.__changelog__) - __changelog__ = """ - 1.0.0.003: - - bug fixes - 1.0.0.002: - -Added more parameters to log, exponential, polynomial - -Added SigmoidalRegKernelArthur, because Arthur apparently needs - to train the scaling and shifting of sigmoids - - 1.0.0.001: - -initial release, with linear, log, exponential, polynomial, and sigmoid kernels - -already vectorized (except for polynomial generation) and CUDA-optimized - """ - - __author__ = ( - "Jacob Levine ", - "Arthur Lu " - ) - - __all__ = [ - 'factorial', - 'take_all_pwrs', - 'num_poly_terms', - 'set_device', - 'LinearRegKernel', - 'SigmoidalRegKernel', - 'LogRegKernel', - 'PolyRegKernel', - 'ExpRegKernel', - 'SigmoidalRegKernelArthur', - 'SGDTrain', - 'CustomTrain' - ] - - global device - - device = "cuda:0" if torch.torch.cuda.is_available() else "cpu" - - #todo: document completely - - def set_device(self, new_device): - device=new_device - - class LinearRegKernel(): - parameters= [] - weights=None - bias=None - def __init__(self, num_vars): - self.weights=torch.rand(num_vars, requires_grad=True, device=device) - self.bias=torch.rand(1, requires_grad=True, device=device) - self.parameters=[self.weights,self.bias] - def forward(self,mtx): - long_bias=self.bias.repeat([1,mtx.size()[1]]) - return torch.matmul(self.weights,mtx)+long_bias - - class SigmoidalRegKernel(): - parameters= [] - weights=None - bias=None - sigmoid=torch.nn.Sigmoid() - def __init__(self, num_vars): - self.weights=torch.rand(num_vars, requires_grad=True, device=device) - self.bias=torch.rand(1, requires_grad=True, device=device) - self.parameters=[self.weights,self.bias] - def forward(self,mtx): - long_bias=self.bias.repeat([1,mtx.size()[1]]) - return self.sigmoid(torch.matmul(self.weights,mtx)+long_bias) - - class SigmoidalRegKernelArthur(): - parameters= [] - weights=None - in_bias=None - scal_mult=None - out_bias=None - sigmoid=torch.nn.Sigmoid() - def __init__(self, num_vars): - self.weights=torch.rand(num_vars, requires_grad=True, device=device) - self.in_bias=torch.rand(1, requires_grad=True, device=device) - self.scal_mult=torch.rand(1, requires_grad=True, device=device) - self.out_bias=torch.rand(1, requires_grad=True, device=device) - self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias] - def forward(self,mtx): - long_in_bias=self.in_bias.repeat([1,mtx.size()[1]]) - long_out_bias=self.out_bias.repeat([1,mtx.size()[1]]) - return (self.scal_mult*self.sigmoid(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias - - class LogRegKernel(): - parameters= [] - weights=None - in_bias=None - scal_mult=None - out_bias=None - def __init__(self, num_vars): - self.weights=torch.rand(num_vars, requires_grad=True, device=device) - self.in_bias=torch.rand(1, requires_grad=True, device=device) - self.scal_mult=torch.rand(1, requires_grad=True, device=device) - self.out_bias=torch.rand(1, requires_grad=True, device=device) - self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias] - def forward(self,mtx): - long_in_bias=self.in_bias.repeat([1,mtx.size()[1]]) - long_out_bias=self.out_bias.repeat([1,mtx.size()[1]]) - return (self.scal_mult*torch.log(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias - - class ExpRegKernel(): - parameters= [] - weights=None - in_bias=None - scal_mult=None - out_bias=None - def __init__(self, num_vars): - self.weights=torch.rand(num_vars, requires_grad=True, device=device) - self.in_bias=torch.rand(1, requires_grad=True, device=device) - self.scal_mult=torch.rand(1, requires_grad=True, device=device) - self.out_bias=torch.rand(1, requires_grad=True, device=device) - self.parameters=[self.weights,self.in_bias, self.scal_mult, self.out_bias] - def forward(self,mtx): - long_in_bias=self.in_bias.repeat([1,mtx.size()[1]]) - long_out_bias=self.out_bias.repeat([1,mtx.size()[1]]) - return (self.scal_mult*torch.exp(torch.matmul(self.weights,mtx)+long_in_bias))+long_out_bias - - class PolyRegKernel(): - parameters= [] - weights=None - bias=None - power=None - def __init__(self, num_vars, power): - self.power=power - num_terms=self.num_poly_terms(num_vars, power) - self.weights=torch.rand(num_terms, requires_grad=True, device=device) - self.bias=torch.rand(1, requires_grad=True, device=device) - self.parameters=[self.weights,self.bias] - def num_poly_terms(self,num_vars, power): - if power == 0: - return 0 - return int(self.factorial(num_vars+power-1) / self.factorial(power) / self.factorial(num_vars-1)) + self.num_poly_terms(num_vars, power-1) - def factorial(self,n): - if n==0: - return 1 - else: - return n*self.factorial(n-1) - def take_all_pwrs(self, vec, pwr): - #todo: vectorize (kinda) - combins=torch.combinations(vec, r=pwr, with_replacement=True) - out=torch.ones(combins.size()[0]).to(device).to(torch.float) - for i in torch.t(combins).to(device).to(torch.float): - out *= i - if pwr == 1: - return out - else: - return torch.cat((out,self.take_all_pwrs(vec, pwr-1))) - def forward(self,mtx): - #TODO: Vectorize the last part - cols=[] - for i in torch.t(mtx): - cols.append(self.take_all_pwrs(i,self.power)) - new_mtx=torch.t(torch.stack(cols)) - long_bias=self.bias.repeat([1,mtx.size()[1]]) - return torch.matmul(self.weights,new_mtx)+long_bias - - def SGDTrain(self, kernel, data, ground, loss=torch.nn.MSELoss(), iterations=1000, learning_rate=.1, return_losses=False): - optim=torch.optim.SGD(kernel.parameters, lr=learning_rate) - data_cuda=data.to(device) - ground_cuda=ground.to(device) - if (return_losses): - losses=[] - for i in range(iterations): - with torch.set_grad_enabled(True): - optim.zero_grad() - pred=kernel.forward(data_cuda) - ls=loss(pred,ground_cuda) - losses.append(ls.item()) - ls.backward() - optim.step() - return [kernel,losses] - else: - for i in range(iterations): - with torch.set_grad_enabled(True): - optim.zero_grad() - pred=kernel.forward(data_cuda) - ls=loss(pred,ground_cuda) - ls.backward() - optim.step() - return kernel - - def CustomTrain(self, kernel, optim, data, ground, loss=torch.nn.MSELoss(), iterations=1000, return_losses=False): - data_cuda=data.to(device) - ground_cuda=ground.to(device) - if (return_losses): - losses=[] - for i in range(iterations): - with torch.set_grad_enabled(True): - optim.zero_grad() - pred=kernel.forward(data) - ls=loss(pred,ground) - losses.append(ls.item()) - ls.backward() - optim.step() - return [kernel,losses] - else: - for i in range(iterations): - with torch.set_grad_enabled(True): - optim.zero_grad() - pred=kernel.forward(data_cuda) - ls=loss(pred,ground_cuda) - ls.backward() - optim.step() - return kernel - -class Gliko2: - - _tau = 0.5 - - def getRating(self): - return (self.__rating * 173.7178) + 1500 - - def setRating(self, rating): - self.__rating = (rating - 1500) / 173.7178 - - rating = property(getRating, setRating) - - def getRd(self): - return self.__rd * 173.7178 - - def setRd(self, rd): - self.__rd = rd / 173.7178 - - rd = property(getRd, setRd) - - def __init__(self, rating = 1500, rd = 350, vol = 0.06): - - self.setRating(rating) - self.setRd(rd) - self.vol = vol - - def _preRatingRD(self): - - self.__rd = math.sqrt(math.pow(self.__rd, 2) + math.pow(self.vol, 2)) - - def update_player(self, rating_list, RD_list, outcome_list): - - rating_list = [(x - 1500) / 173.7178 for x in rating_list] - RD_list = [x / 173.7178 for x in RD_list] - - v = self._v(rating_list, RD_list) - self.vol = self._newVol(rating_list, RD_list, outcome_list, v) - self._preRatingRD() - - self.__rd = 1 / math.sqrt((1 / math.pow(self.__rd, 2)) + (1 / v)) - - tempSum = 0 - for i in range(len(rating_list)): - tempSum += self._g(RD_list[i]) * \ - (outcome_list[i] - self._E(rating_list[i], RD_list[i])) - self.__rating += math.pow(self.__rd, 2) * tempSum - - - def _newVol(self, rating_list, RD_list, outcome_list, v): - - i = 0 - delta = self._delta(rating_list, RD_list, outcome_list, v) - a = math.log(math.pow(self.vol, 2)) - tau = self._tau - x0 = a - x1 = 0 - - while x0 != x1: - # New iteration, so x(i) becomes x(i-1) - x0 = x1 - d = math.pow(self.__rating, 2) + v + math.exp(x0) - h1 = -(x0 - a) / math.pow(tau, 2) - 0.5 * math.exp(x0) \ - / d + 0.5 * math.exp(x0) * math.pow(delta / d, 2) - h2 = -1 / math.pow(tau, 2) - 0.5 * math.exp(x0) * \ - (math.pow(self.__rating, 2) + v) \ - / math.pow(d, 2) + 0.5 * math.pow(delta, 2) * math.exp(x0) \ - * (math.pow(self.__rating, 2) + v - math.exp(x0)) / math.pow(d, 3) - x1 = x0 - (h1 / h2) - - return math.exp(x1 / 2) - - def _delta(self, rating_list, RD_list, outcome_list, v): - - tempSum = 0 - for i in range(len(rating_list)): - tempSum += self._g(RD_list[i]) * (outcome_list[i] - self._E(rating_list[i], RD_list[i])) - return v * tempSum - - def _v(self, rating_list, RD_list): - - tempSum = 0 - for i in range(len(rating_list)): - tempE = self._E(rating_list[i], RD_list[i]) - tempSum += math.pow(self._g(RD_list[i]), 2) * tempE * (1 - tempE) - return 1 / tempSum - - def _E(self, p2rating, p2RD): - - return 1 / (1 + math.exp(-1 * self._g(p2RD) * \ - (self.__rating - p2rating))) - - def _g(self, RD): - - return 1 / math.sqrt(1 + 3 * math.pow(RD, 2) / math.pow(math.pi, 2)) - - def did_not_compete(self): - - self._preRatingRD() \ No newline at end of file diff --git a/data analysis/.ipynb_checkpoints/analysis-checkpoint.py b/data analysis/.ipynb_checkpoints/analysis-checkpoint.py deleted file mode 100644 index 479f79de..00000000 --- a/data analysis/.ipynb_checkpoints/analysis-checkpoint.py +++ /dev/null @@ -1,1107 +0,0 @@ -#Titan Robotics Team 2022: Data Analysis Module -#Written by Arthur Lu & Jacob Levine -#Notes: -# this should be imported as a python module using 'import analysis' -# this should be included in the local directory or environment variable -# this module has not been optimized for multhreaded computing -#number of easter eggs: 2 -#setup: - -__version__ = "1.0.8.005" - -#changelog should be viewed using print(analysis.__changelog__) -__changelog__ = """changelog: -1.0.8.005: - - minor fixes -1.0.8.004: - - removed a few unused dependencies -1.0.8.003: - - added p_value function -1.0.8.002: - - updated __all__ correctly to contain changes made in v 1.0.8.000 and v 1.0.8.001 -1.0.8.001: - - refactors - - bugfixes -1.0.8.000: - - depreciated histo_analysis_old - - depreciated debug - - altered basic_analysis to take array data instead of filepath - - refactor - - optimization -1.0.7.002: - - bug fixes -1.0.7.001: - - bug fixes -1.0.7.000: - - added tanh_regression (logistical regression) - - bug fixes -1.0.6.005: - - added z_normalize function to normalize dataset - - bug fixes -1.0.6.004: - - bug fixes -1.0.6.003: - - bug fixes -1.0.6.002: - - bug fixes -1.0.6.001: - - corrected __all__ to contain all of the functions -1.0.6.000: - - added calc_overfit, which calculates two measures of overfit, error and performance - - added calculating overfit to optimize_regression -1.0.5.000: - - added optimize_regression function, which is a sample function to find the optimal regressions - - optimize_regression function filters out some overfit funtions (functions with r^2 = 1) - - planned addition: overfit detection in the optimize_regression function -1.0.4.002: - - added __changelog__ - - updated debug function with log and exponential regressions -1.0.4.001: - - added log regressions - - added exponential regressions - - added log_regression and exp_regression to __all__ -1.0.3.008: - - added debug function to further consolidate functions -1.0.3.007: - - added builtin benchmark function - - added builtin random (linear) data generation function - - added device initialization (_init_device) -1.0.3.006: - - reorganized the imports list to be in alphabetical order - - added search and regurgitate functions to c_entities, nc_entities, obstacles, objectives -1.0.3.005: - - major bug fixes - - updated historical analysis - - depreciated old historical analysis -1.0.3.004: - - added __version__, __author__, __all__ - - added polynomial regression - - added root mean squared function - - added r squared function -1.0.3.003: - - bug fixes - - added c_entities -1.0.3.002: - - bug fixes - - added nc_entities, obstacles, objectives - - consolidated statistics.py to analysis.py -1.0.3.001: - - compiled 1d, column, and row basic stats into basic stats function -1.0.3.000: - - added historical analysis function -1.0.2.xxx: - - added z score test -1.0.1.xxx: - - major bug fixes -1.0.0.xxx: - - added loading csv - - added 1d, column, row basic stats -""" - -__author__ = ( - "Arthur Lu , " - "Jacob Levine ," - ) - -__all__ = [ - '_init_device', - 'c_entities', - 'nc_entities', - 'obstacles', - 'objectives', - 'load_csv', - 'basic_stats', - 'z_score', - 'z_normalize', - 'stdev_z_split', - 'histo_analysis', - 'poly_regression', - 'log_regression', - 'exp_regression', - 'r_squared', - 'rms', - 'calc_overfit', - 'strip_data', - 'optimize_regression', - 'select_best_regression', - 'basic_analysis', - #all statistics functions left out due to integration in other functions - ] - -#now back to your regularly scheduled programming: - -#imports (now in alphabetical order! v 1.0.3.006): - -from bisect import bisect_left, bisect_right -import collections -import csv -from decimal import Decimal -import functools -from fractions import Fraction -from itertools import groupby -import math -import matplotlib -import numbers -import numpy as np -import pandas -import random -import scipy -from scipy.optimize import curve_fit -from scipy import stats -from sklearn import * -#import statistics <-- statistics.py functions have been integrated into analysis.py as of v 1.0.3.002 -import time -import torch - -class error(ValueError): - pass - -def _init_device (setting, arg): #initiates computation device for ANNs - if setting == "cuda": - try: - return torch.device(setting + ":" + str(arg) if torch.cuda.is_available() else "cpu") - except: - raise error("could not assign cuda or cpu") - elif setting == "cpu": - try: - return torch.device("cpu") - except: - raise error("could not assign cpu") - else: - raise error("specified device does not exist") - -class c_entities: - - c_names = [] - c_ids = [] - c_pos = [] - c_properties = [] - c_logic = [] - - def debug(self): - print("c_entities has attributes names, ids, positions, properties, and logic. __init__ takes self, 1d array of names, 1d array of ids, 2d array of positions, nd array of properties, and nd array of logic") - return[self.c_names, self.c_ids, self.c_pos, self.c_properties, self.c_logic] - - def __init__(self, names, ids, pos, properties, logic): - self.c_names = names - self.c_ids = ids - self.c_pos = pos - self.c_properties = properties - self.c_logic = logic - return None - - - def append(self, n_name, n_id, n_pos, n_property, n_logic): - self.c_names.append(n_name) - self.c_ids.append(n_id) - self.c_pos.append(n_pos) - self.c_properties.append(n_property) - self.c_logic.append(n_logic) - return None - - def edit(self, search, n_name, n_id, n_pos, n_property, n_logic): - position = 0 - for i in range(0, len(self.c_ids), 1): - if self.c_ids[i] == search: - position = i - if n_name != "null": - self.c_names[position] = n_name - - if n_id != "null": - self.c_ids[position] = n_id - - if n_pos != "null": - self.c_pos[position] = n_pos - - if n_property != "null": - self.c_properties[position] = n_property - - if n_logic != "null": - self.c_logic[position] = n_logic - - return None - - def search(self, search): - position = 0 - for i in range(0, len(self.c_ids), 1): - if self.c_ids[i] == search: - position = i - - return [self.c_names[position], self.c_ids[position], self.c_pos[position], self.c_properties[position], self.c_logic[position]] - - def regurgitate(self): - return[self.c_names, self.c_ids, self.c_pos, self.c_properties, self.c_logic] - -class nc_entities: - - c_names = [] - c_ids = [] - c_pos = [] - c_properties = [] - c_effects = [] - - def debug(self): - print ("nc_entities (non-controlable entities) has attributes names, ids, positions, properties, and effects. __init__ takes self, 1d array of names, 1d array of ids, 2d array of positions, 2d array of properties, and 2d array of effects.") - return[self.c_names, self.c_ids, self.c_pos, self.c_properties, self.c_effects] - - def __init__(self, names, ids, pos, properties, effects): - self.c_names = names - self.c_ids = ids - self.c_pos = pos - self.c_properties = properties - self.c_effects = effects - return None - - def append(self, n_name, n_id, n_pos, n_property, n_effect): - self.c_names.append(n_name) - self.c_ids.append(n_id) - self.c_pos.append(n_pos) - self.c_properties.append(n_property) - self.c_effects.append(n_effect) - - return None - - def edit(self, search, n_name, n_id, n_pos, n_property, n_effect): - position = 0 - for i in range(0, len(self.c_ids), 1): - if self.c_ids[i] == search: - position = i - if n_name != "null": - self.c_names[position] = n_name - - if n_id != "null": - self.c_ids[position] = n_id - - if n_pos != "null": - self.c_pos[position] = n_pos - - if n_property != "null": - self.c_properties[position] = n_property - - if n_effect != "null": - self.c_effects[position] = n_effect - - return None - - def search(self, search): - position = 0 - for i in range(0, len(self.c_ids), 1): - if self.c_ids[i] == search: - position = i - - return [self.c_names[position], self.c_ids[position], self.c_pos[position], self.c_properties[position], self.c_effects[position]] - - def regurgitate(self): - - return[self.c_names, self.c_ids, self.c_pos, self.c_properties, self.c_effects] - -class obstacles: - - c_names = [] - c_ids = [] - c_perim = [] - c_effects = [] - - def debug(self): - print("obstacles has atributes names, ids, positions, perimeters, and effects. __init__ takes self, 1d array of names, 1d array of ids, 2d array of position, 3d array of perimeters, 2d array of effects.") - return [self.c_names, self.c_ids, self.c_perim, self.c_effects] - - def __init__(self, names, ids, perims, effects): - self.c_names = names - self.c_ids = ids - self.c_perim = perims - self.c_effects = effects - return None - - def append(self, n_name, n_id, n_perim, n_effect): - self.c_names.append(n_name) - self.c_ids.append(n_id) - self.c_perim.append(n_perim) - self.c_effects.append(n_effect) - return None - - def edit(self, search, n_name, n_id, n_perim, n_effect): - position = 0 - for i in range(0, len(self.c_ids), 1): - if self.c_ids[i] == search: - position = i - - if n_name != "null": - self.c_names[position] = n_name - - if n_id != "null": - self.c_ids[position] = n_id - - if n_perim != "null": - self.c_perim[position] = n_perim - - if n_effect != "null": - self.c_effects[position] = n_effect - - return None - - def search(self, search): - position = 0 - for i in range(0, len(self.c_ids), 1): - if self.c_ids[i] == search: - position = i - - return [self.c_names[position], self.c_ids[position], self.c_perim[position], self.c_effects[position]] - - def regurgitate(self): - return[self.c_names, self.c_ids, self.c_perim, self.c_effects] - -class objectives: - - c_names = [] - c_ids = [] - c_pos = [] - c_effects = [] - - def debug(self): - print("objectives has atributes names, ids, positions, and effects. __init__ takes self, 1d array of names, 1d array of ids, 2d array of position, 1d array of effects.") - return [self.c_names, self.c_ids, self.c_pos, self.c_effects] - - def __init__(self, names, ids, pos, effects): - self.c_names = names - self.c_ids = ids - self.c_pos = pos - self.c_effects = effects - return None - - def append(self, n_name, n_id, n_pos, n_effect): - self.c_names.append(n_name) - self.c_ids.append(n_id) - self.c_pos.append(n_pos) - self.c_effects.append(n_effect) - return None - - def edit(self, search, n_name, n_id, n_pos, n_effect): - position = 0 - print(self.c_ids) - for i in range(0, len(self.c_ids), 1): - if self.c_ids[i] == search: - position = i - - if n_name != "null": - self.c_names[position] = n_name - - if n_id != "null": - self.c_ids[position] = n_id - - if n_pos != "null": - self.c_pos[position] = n_pos - - if n_effect != "null": - self.c_effects[position] = n_effect - - return None - - def search(self, search): - position = 0 - for i in range(0, len(self.c_ids), 1): - if self.c_ids[i] == search: - position = i - - return [self.c_names[position], self.c_ids[position], self.c_pos[position], self.c_effects[position]] - - def regurgitate(self): - return[self.c_names, self.c_ids, self.c_pos, self.c_effects] - -def load_csv(filepath): - with open(filepath, newline = '') as csvfile: - file_array = list(csv.reader(csvfile)) - csvfile.close() - return file_array - -def basic_stats(data, method, arg): # data=array, mode = ['1d':1d_basic_stats, 'column':c_basic_stats, 'row':r_basic_stats], arg for mode 1 or mode 2 for column or row - - if method == 'debug': - return "basic_stats requires 3 args: data, mode, arg; where data is data to be analyzed, mode is an int from 0 - 2 depending on type of analysis (by column or by row) and is only applicable to 2d arrays (for 1d arrays use mode 1), and arg is row/column number for mode 1 or mode 2; function returns: [mean, median, mode, stdev, variance]" - - if method == "1d" or method == 0: - - data_t = [] - - for i in range (0, len(data), 1): - data_t.append(float(data[i])) - - _mean = mean(data_t) - _median = median(data_t) - try: - _mode = mode(data_t) - except: - _mode = None - try: - _stdev = stdev(data_t) - except: - _stdev = None - try: - _variance = variance(data_t) - except: - _variance = None - - return _mean, _median, _mode, _stdev, _variance - - elif method == "column" or method == 1: - - c_data = [] - c_data_sorted = [] - - for i in data: - try: - c_data.append(float(i[arg])) - except: - pass - - _mean = mean(c_data) - _median = median(c_data) - try: - _mode = mode(c_data) - except: - _mode = None - try: - _stdev = stdev(c_data) - except: - _stdev = None - try: - _variance = variance(c_data) - except: - _variance = None - - return _mean, _median, _mode, _stdev, _variance - - elif method == "row" or method == 2: - - r_data = [] - - for i in range(len(data[arg])): - r_data.append(float(data[arg][i])) - - _mean = mean(r_data) - _median = median(r_data) - try: - _mode = mode(r_data) - except: - _mode = None - try: - _stdev = stdev(r_data) - except: - _stdev = None - try: - _variance = variance(r_data) - except: - _variance = None - - return _mean, _median, _mode, _stdev, _variance - - else: - raise error("method error") - -def z_score(point, mean, stdev): #returns z score with inputs of point, mean and standard deviation of spread - score = (point - mean)/stdev - return score - -def z_normalize(x, y, mode): #mode is either 'x' or 'y' or 'both' depending on the variable(s) to be normalized - - x_norm = [] - y_norm = [] - - mean = 0 - stdev = 0 - - if mode == 'x': - _mean, _median, _mode, _stdev, _variance = basic_stats(x, "1d", 0) - - for i in range (0, len(x), 1): - x_norm.append(z_score(x[i], _mean, _stdev)) - - return x_norm, y - - if mode == 'y': - _mean, _median, _mode, _stdev, _variance = basic_stats(y, "1d", 0) - - for i in range (0, len(y), 1): - y_norm.append(z_score(y[i], _mean, _stdev)) - - return x, y_norm - - if mode == 'both': - _mean, _median, _mode, _stdev, _variance = basic_stats(x, "1d", 0) - - for i in range (0, len(x), 1): - x_norm.append(z_score(x[i], _mean, _stdev)) - - _mean, _median, _mode, _stdev, _variance = basic_stats(y, "1d", 0) - - for i in range (0, len(y), 1): - y_norm.append(z_score(y[i], _mean, _stdev)) - - return x_norm, y_norm - - else: - - return error('method error') - -def stdev_z_split(mean, stdev, delta, low_bound, high_bound): #returns n-th percentile of spread given mean, standard deviation, lower z-score, and upper z-score - - z_split = [] - i = low_bound - - while True: - z_split.append(float((1 / (stdev * math.sqrt(2 * math.pi))) * math.e ** (-0.5 * (((i - mean) / stdev) ** 2)))) - i = i + delta - if i > high_bound: - break - - return z_split - -def histo_analysis(hist_data, delta, low_bound, high_bound): - - if hist_data == 'debug': - return ('returns list of predicted values based on historical data; input delta for delta step in z-score and lower and higher bounds in number of standard deviations') - - derivative = [] - - for i in range(0, len(hist_data), 1): - try: - derivative.append(float(hist_data[i - 1]) - float(hist_data [i])) - except: - pass - - derivative_sorted = sorted(derivative, key=int) - mean_derivative = basic_stats(derivative_sorted,"1d", 0)[0] - stdev_derivative = basic_stats(derivative_sorted, "1d", 0)[3] - - predictions = [] - pred_change = 0 - - i = low_bound - - while True: - if i > high_bound: - break - - try: - pred_change = mean_derivative + i * stdev_derivative - except: - pred_change = mean_derivative - - predictions.append(float(hist_data[-1:][0]) + pred_change) - - i = i + delta - - return predictions - -def poly_regression(x, y, power): - - if x == "null": #if x is 'null', then x will be filled with integer points between 1 and the size of y - x = [] - - for i in range(len(y)): - print(i) - x.append(i+1) - - reg_eq = scipy.polyfit(x, y, deg = power) - eq_str = "" - - for i in range(0, len(reg_eq), 1): - if i < len(reg_eq)- 1: - eq_str = eq_str + str(reg_eq[i]) + "*(z**" + str(len(reg_eq) - i - 1) + ")+" - else: - eq_str = eq_str + str(reg_eq[i]) + "*(z**" + str(len(reg_eq) - i - 1) + ")" - - vals = [] - - for i in range(0, len(x), 1): - z = x[i] - - try: - exec("vals.append(" + eq_str + ")") - except: - pass - - _rms = rms(vals, y) - r2_d2 = r_squared(vals, y) - - return [eq_str, _rms, r2_d2] - -def log_regression(x, y, base): - - x_fit = [] - - for i in range(len(x)): - try: - x_fit.append(np.log(x[i]) / np.log(base)) #change of base for logs - except: - pass - - reg_eq = np.polyfit(x_fit, y, 1) # y = reg_eq[0] * log(x, base) + reg_eq[1] - q_str = str(reg_eq[0]) + "* (np.log(z) / np.log(" + str(base) +"))+" + str(reg_eq[1]) - vals = [] - - for i in range(len(x)): - z = x[i] - - try: - exec("vals.append(" + eq_str + ")") - except: - pass - - _rms = rms(vals, y) - r2_d2 = r_squared(vals, y) - - return eq_str, _rms, r2_d2 - -def exp_regression(x, y, base): - - y_fit = [] - - for i in range(len(y)): - try: - y_fit.append(np.log(y[i]) / np.log(base)) #change of base for logs - except: - pass - - reg_eq = np.polyfit(x, y_fit, 1, w=np.sqrt(y_fit)) # y = base ^ (reg_eq[0] * x) * base ^ (reg_eq[1]) - eq_str = "(" + str(base) + "**(" + str(reg_eq[0]) + "*z))*(" + str(base) + "**(" + str(reg_eq[1]) + "))" - vals = [] - - for i in range(len(x)): - z = x[i] - - try: - exec("vals.append(" + eq_str + ")") - except: - pass - - _rms = rms(vals, y) - r2_d2 = r_squared(vals, y) - - return eq_str, _rms, r2_d2 - -def tanh_regression(x, y): - - def tanh (x, a, b, c, d): - - return a * np.tanh(b * (x - c)) + d - - reg_eq = np.float64(curve_fit(tanh, np.array(x), np.array(y))[0]).tolist() - eq_str = str(reg_eq[0]) + " * np.tanh(" + str(reg_eq[1]) + "*(z - " + str(reg_eq[2]) + ")) + " + str(reg_eq[3]) - vals = [] - - for i in range(len(x)): - z = x[i] - try: - exec("vals.append(" + eq_str + ")") - except: - pass - - _rms = rms(vals, y) - r2_d2 = r_squared(vals, y) - - return eq_str, _rms, r2_d2 - -def r_squared(predictions, targets): # assumes equal size inputs - - return metrics.r2_score(np.array(targets), np.array(predictions)) - -def rms(predictions, targets): # assumes equal size inputs - - _sum = 0 - - for i in range(0, len(targets), 1): - _sum = (targets[i] - predictions[i]) ** 2 - - return float(math.sqrt(_sum/len(targets))) - -def calc_overfit(equation, rms_train, r2_train, x_test, y_test): - - #performance overfit = performance(train) - performance(test) where performance is r^2 - #error overfit = error(train) - error(test) where error is rms; biased towards smaller values - - vals = [] - - for i in range(0, len(x_test), 1): - - z = x_test[i] - - exec("vals.append(" + equation + ")") - - r2_test = r_squared(vals, y_test) - rms_test = rms(vals, y_test) - - return r2_train - r2_test - -def strip_data(data, mode): - - if mode == "adam": #x is the row number, y are the data - pass - - if mode == "eve": #x are the data, y is the column number - pass - - else: - raise error("mode error") - -def optimize_regression(x, y, _range, resolution):#_range in poly regression is the range of powers tried, and in log/exp it is the inverse of the stepsize taken from -1000 to 1000 -#usage not: for demonstration purpose only, performance is shit - if type(resolution) != int: - raise error("resolution must be int") - - x_train = x - y_train = [] - - for i in range(len(y)): - y_train.append(float(y[i])) - - x_test = [] - y_test = [] - - for i in range (0, math.floor(len(x) * 0.5), 1): - index = random.randint(0, len(x) - 1) - - x_test.append(x[index]) - y_test.append(float(y[index])) - - x_train.pop(index) - y_train.pop(index) - - #print(x_train, x_test) - #print(y_train, y_test) - - eqs = [] - rmss = [] - r2s = [] - - for i in range (0, _range + 1, 1): - try: - x, y, z = poly_regression(x_train, y_train, i) - eqs.append(x) - rmss.append(y) - r2s.append(z) - except: - pass - - for i in range (1, 100 * resolution + 1): - try: - x, y, z = exp_regression(x_train, y_train, float(i / resolution)) - eqs.append(x) - rmss.append(y) - r2s.append(z) - except: - pass - - for i in range (1, 100 * resolution + 1): - try: - x, y, z = log_regression(x_train, y_train, float(i / resolution)) - eqs.append(x) - rmss.append(y) - r2s.append(z) - except: - pass - - try: - x, y, z = tanh_regression(x_train, y_train) - - eqs.append(x) - rmss.append(y) - r2s.append(z) - except: - pass - - for i in range (0, len(eqs), 1): #marks all equations where r2 = 1 as they 95% of the time overfit the data - if r2s[i] == 1: - eqs[i] = "" - rmss[i] = "" - r2s[i] = "" - - while True: #removes all equations marked for removal - try: - eqs.remove('') - rmss.remove('') - r2s.remove('') - except: - break - - overfit = [] - - for i in range (0, len(eqs), 1): - - overfit.append(calc_overfit(eqs[i], rmss[i], r2s[i], x_test, y_test)) - - return eqs, rmss, r2s, overfit - -def select_best_regression(eqs, rmss, r2s, overfit, selector): - - b_eq = "" - b_rms = 0 - b_r2 = 0 - b_overfit = 0 - - ind = 0 - - if selector == "min_overfit": - - ind = np.argmin(overfit) - - b_eq = eqs[ind] - b_rms = rmss[ind] - b_r2 = r2s[ind] - b_overfit = overfit[ind] - - if selector == "max_r2s": - - ind = np.argmax(r2s) - b_eq = eqs[ind] - b_rms = rmss[ind] - b_r2 = r2s[ind] - b_overfit = overfit[ind] - - return b_eq, b_rms, b_r2, b_overfit - -def p_value(x, y): #takes 2 1d arrays - - return stats.ttest_ind(x, y)[1] - -def basic_analysis(data): #assumes that rows are the independent variable and columns are the dependant. also assumes that time flows from lowest column to highest column. - - row = len(data) - column = [] - - for i in range(0, row, 1): - column.append(len(data[i])) - - column_max = max(column) - row_b_stats = [] - row_histo = [] - - for i in range(0, row, 1): - row_b_stats.append(basic_stats(data, "row", i)) - row_histo.append(histo_analysis(data[i], 0.67449, -0.67449, 0.67449)) - - column_b_stats = [] - - for i in range(0, column_max, 1): - column_b_stats.append(basic_stats(data, "column", i)) - - return[row_b_stats, column_b_stats, row_histo] - - -def benchmark(x, y): - - start_g = time.time() - generate_data("data/data.csv", x, y, -10, 10) - end_g = time.time() - - start_a = time.time() - basic_analysis("data/data.csv") - end_a = time.time() - - return [(end_g - start_g), (end_a - start_a)] - -def generate_data(filename, x, y, low, high): - - file = open(filename, "w") - - for i in range (0, y, 1): - temp = "" - - for j in range (0, x - 1, 1): - temp = str(random.uniform(low, high)) + "," + temp - - temp = temp + str(random.uniform(low, high)) - file.write(temp + "\n") - -class StatisticsError(ValueError): - pass - -def _sum(data, start=0): - count = 0 - n, d = _exact_ratio(start) - partials = {d: n} - partials_get = partials.get - T = _coerce(int, type(start)) - for typ, values in groupby(data, type): - T = _coerce(T, typ) # or raise TypeError - for n,d in map(_exact_ratio, values): - count += 1 - partials[d] = partials_get(d, 0) + n - if None in partials: - - total = partials[None] - assert not _isfinite(total) - else: - - total = sum(Fraction(n, d) for d, n in sorted(partials.items())) - return (T, total, count) - -def _isfinite(x): - try: - return x.is_finite() # Likely a Decimal. - except AttributeError: - return math.isfinite(x) # Coerces to float first. - -def _coerce(T, S): - - assert T is not bool, "initial type T is bool" - - if T is S: return T - - if S is int or S is bool: return T - if T is int: return S - - if issubclass(S, T): return S - if issubclass(T, S): return T - - if issubclass(T, int): return S - if issubclass(S, int): return T - - if issubclass(T, Fraction) and issubclass(S, float): - return S - if issubclass(T, float) and issubclass(S, Fraction): - return T - - msg = "don't know how to coerce %s and %s" - raise TypeError(msg % (T.__name__, S.__name__)) - -def _exact_ratio(x): - - try: - - if type(x) is float or type(x) is Decimal: - return x.as_integer_ratio() - try: - - return (x.numerator, x.denominator) - except AttributeError: - try: - - return x.as_integer_ratio() - except AttributeError: - - pass - except (OverflowError, ValueError): - - assert not _isfinite(x) - return (x, None) - msg = "can't convert type '{}' to numerator/denominator" - raise TypeError(msg.format(type(x).__name__)) - -def _convert(value, T): - - if type(value) is T: - - return value - if issubclass(T, int) and value.denominator != 1: - T = float - try: - - return T(value) - except TypeError: - if issubclass(T, Decimal): - return T(value.numerator)/T(value.denominator) - else: - raise - -def _counts(data): - - table = collections.Counter(iter(data)).most_common() - if not table: - return table - - maxfreq = table[0][1] - for i in range(1, len(table)): - if table[i][1] != maxfreq: - table = table[:i] - break - return table - - -def _find_lteq(a, x): - - i = bisect_left(a, x) - if i != len(a) and a[i] == x: - return i - raise ValueError - - -def _find_rteq(a, l, x): - - i = bisect_right(a, x, lo=l) - if i != (len(a)+1) and a[i-1] == x: - return i-1 - raise ValueError - - -def _fail_neg(values, errmsg='negative value'): - - for x in values: - if x < 0: - raise StatisticsError(errmsg) - yield x - -def mean(data): - - if iter(data) is data: - data = list(data) - n = len(data) - if n < 1: - raise StatisticsError('mean requires at least one data point') - T, total, count = _sum(data) - assert count == n - return _convert(total/n, T) - -def median(data): - - data = sorted(data) - n = len(data) - if n == 0: - raise StatisticsError("no median for empty data") - if n%2 == 1: - return data[n//2] - else: - i = n//2 - return (data[i - 1] + data[i])/2 - -def mode(data): - - table = _counts(data) - if len(table) == 1: - return table[0][0] - elif table: - raise StatisticsError( - 'no unique mode; found %d equally common values' % len(table) - ) - else: - raise StatisticsError('no mode for empty data') - -def _ss(data, c=None): - - if c is None: - c = mean(data) - T, total, count = _sum((x-c)**2 for x in data) - - U, total2, count2 = _sum((x-c) for x in data) - assert T == U and count == count2 - total -= total2**2/len(data) - assert not total < 0, 'negative sum of square deviations: %f' % total - return (T, total) - -def variance(data, xbar=None): - - if iter(data) is data: - data = list(data) - n = len(data) - if n < 2: - raise StatisticsError('variance requires at least two data points') - T, ss = _ss(data, xbar) - return _convert(ss/(n-1), T) - -def stdev(data, xbar=None): - - var = variance(data, xbar) - try: - return var.sqrt() - except AttributeError: - return math.sqrt(var) \ No newline at end of file