mirror of
https://github.com/titanscouting/tra-analysis.git
synced 2024-11-13 22:56:18 +00:00
179 lines
7.2 KiB
Python
179 lines
7.2 KiB
Python
|
#Titan Robotics Team 2022: ML Module
|
||
|
#Written by Arthur Lu & Jacob Levine
|
||
|
#Notes:
|
||
|
# this should be imported as a python module using 'import titanlearn'
|
||
|
# this should be included in the local directory or environment variable
|
||
|
# this module has not been optimized for multhreaded computing
|
||
|
# this module learns from its mistakes far faster than 2022's captains
|
||
|
#setup:
|
||
|
|
||
|
__version__ = "1.0.0.001"
|
||
|
|
||
|
#changelog should be viewed using print(analysis.__changelog__)
|
||
|
__changelog__ = """changelog:
|
||
|
1.0.0.xxx:
|
||
|
-added generation of ANNS, basic SGD training"""
|
||
|
__author__ = (
|
||
|
"Arthur Lu <arthurlu@ttic.edu>, "
|
||
|
"Jacob Levine <jlevine@ttic.edu>,"
|
||
|
)
|
||
|
__all__ = [
|
||
|
'linear_nn',
|
||
|
'train_sgd_minibatch',
|
||
|
'train_sgd_simple'
|
||
|
]
|
||
|
#imports
|
||
|
import torch
|
||
|
import warnings
|
||
|
from collections import OrderedDict
|
||
|
from sklearn import metrics
|
||
|
import numpy as np
|
||
|
import matplotlib.pyplot as plt
|
||
|
import math
|
||
|
|
||
|
#enable CUDA if possible
|
||
|
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
|
||
|
|
||
|
#linear_nn: creates a fully connected network given params
|
||
|
def linear_nn(in_dim, hidden_dim, out_dim, num_hidden, act_fn="tanh", end="softmax"):
|
||
|
if act_fn.lower()=="tanh":
|
||
|
k=OrderedDict([("in", torch.nn.Linear(in_dim,hidden_dim)), ('tanh0', torch.nn.Tanh())])
|
||
|
for i in range(num_hidden):
|
||
|
k.update({"lin"+str(i+1): torch.nn.Linear(hidden_dim,hidden_dim), "tanh"+str(i+1):torch.nn.Tanh()})
|
||
|
|
||
|
elif act_fn.lower()=="sigmoid":
|
||
|
k=OrderedDict([("in", torch.nn.Linear(in_dim,hidden_dim)), ('sig0', torch.nn.Sigmoid())])
|
||
|
for i in range(num_hidden):
|
||
|
k.update({"lin"+str(i+1): torch.nn.Linear(hidden_dim,hidden_dim), "sig"+str(i+1):torch.nn.Sigmoid()})
|
||
|
|
||
|
elif act_fn.lower()=="relu":
|
||
|
k=OrderedDict([("in", torch.nn.Linear(in_dim,hidden_dim)), ('relu0', torch.nn.ReLU())])
|
||
|
for i in range(num_hidden):
|
||
|
k.update({"lin"+str(i+1): torch.nn.Linear(hidden_dim,hidden_dim), "relu"+str(i+1):torch.nn.ReLU()})
|
||
|
|
||
|
elif act_fn.lower()=="leaky relu":
|
||
|
k=OrderedDict([("in", torch.nn.Linear(in_dim,hidden_dim)), ('lre0', torch.nn.LeakyReLU())])
|
||
|
for i in range(num_hidden):
|
||
|
k.update({"lin"+str(i+1): torch.nn.Linear(hidden_dim,hidden_dim), "lre"+str(i+1):torch.nn.LeakyReLU()})
|
||
|
else:
|
||
|
warnings.warn("Did not specify a valid inner activation function. Returning nothing.")
|
||
|
return None
|
||
|
|
||
|
if end.lower()=="softmax":
|
||
|
k.update({"out": torch.nn.Linear(hidden_dim,out_dim), "softmax": torch.nn.Softmax()})
|
||
|
elif end.lower()=="none":
|
||
|
k.update({"out": torch.nn.Linear(hidden_dim,out_dim)})
|
||
|
elif end.lower()=="sigmoid":
|
||
|
k.update({"out": torch.nn.Linear(hidden_dim,out_dim), "sigmoid": torch.nn.Sigmoid()})
|
||
|
else:
|
||
|
warnings.warn("Did not specify a valid final activation function. Returning nothing.")
|
||
|
return None
|
||
|
|
||
|
return torch.nn.Sequential(k)
|
||
|
|
||
|
#train_sgd_simple: trains network using SGD
|
||
|
def train_sgd_simple(net, data, ground, dev=None, devg=None, iters=1000, learnrate=1e-4, testevery=1, graphsaveloc=None, modelsaveloc=None, loss="mse"):
|
||
|
model=net.to(device)
|
||
|
data=data.to(device)
|
||
|
ground=ground.to(device)
|
||
|
if dev != None:
|
||
|
dev=dev.to(device)
|
||
|
losses=[]
|
||
|
dev_losses=[]
|
||
|
if loss.lower()=="mse":
|
||
|
loss_fn = torch.nn.MSELoss(reduction='sum')
|
||
|
elif loss.lower()=="cross entropy":
|
||
|
loss_fn = torch.nn.CrossEntropyLoss()
|
||
|
elif loss.lower()=="nll":
|
||
|
loss_fn = torch.nn.NLLLoss()
|
||
|
elif loss.lower()=="poisson nll":
|
||
|
loss_fn = torch.nn.PoissonNLLLoss()
|
||
|
else:
|
||
|
warnings.warn("Did not specify a valid loss function. Returning nothing.")
|
||
|
return None
|
||
|
optimizer=torch.optim.SGD(model.parameters(), lr=learnrate)
|
||
|
for i in range(iters):
|
||
|
if i%testevery==0:
|
||
|
with torch.no_grad():
|
||
|
output = model(data)
|
||
|
ap = metrics.average_precision_score(ground.numpy(), output.numpy())
|
||
|
losses.append(ap)
|
||
|
print(str(i)+": "+str(ap))
|
||
|
plt.plot(np.array(range(0,i+1,testevery)),np.array(losses), label="train AP")
|
||
|
if dev != None:
|
||
|
output = model(dev)
|
||
|
ap = metrics.average_precision_score(devg.numpy(), output.numpy())
|
||
|
dev_losses.append(ap)
|
||
|
plt.plot(np.array(range(0,i+1,testevery)),np.array(losses), label="dev AP")
|
||
|
if graphsaveloc != None:
|
||
|
plt.savefig(graphsaveloc+".pdf")
|
||
|
with torch.enable_grad():
|
||
|
optimizer.zero_grad()
|
||
|
output = model(data)
|
||
|
loss = loss_fn(output, ground)
|
||
|
loss.backward()
|
||
|
optimizer.step()
|
||
|
if modelsaveloc != None:
|
||
|
torch.save(model, modelsaveloc)
|
||
|
plt.show()
|
||
|
return model
|
||
|
|
||
|
#train_sgd_minibatch: same as above, but with minibatches
|
||
|
def train_sgd_minibatch(net, data, ground, dev=None, devg=None, epoch=100, batchsize=20, learnrate=1e-4, testevery=20, graphsaveloc=None, modelsaveloc=None, loss="mse"):
|
||
|
model=net.to(device)
|
||
|
data=data.to(device)
|
||
|
ground=ground.to(device)
|
||
|
if dev != None:
|
||
|
dev=dev.to(device)
|
||
|
losses=[]
|
||
|
dev_losses=[]
|
||
|
if loss.lower()=="mse":
|
||
|
loss_fn = torch.nn.MSELoss(reduction='sum')
|
||
|
elif loss.lower()=="cross entropy":
|
||
|
loss_fn = torch.nn.CrossEntropyLoss()
|
||
|
elif loss.lower()=="nll":
|
||
|
loss_fn = torch.nn.NLLLoss()
|
||
|
elif loss.lower()=="poisson nll":
|
||
|
loss_fn = torch.nn.PoissonNLLLoss()
|
||
|
else:
|
||
|
warnings.warn("Did not specify a valid loss function. Returning nothing.")
|
||
|
return None
|
||
|
optimizer=torch.optim.SGD(model.parameters(), lr=learnrate)
|
||
|
itercount=0
|
||
|
for i in range(epoch):
|
||
|
print("EPOCH "+str(i)+" OF "+str(epoch-1))
|
||
|
batches=math.ceil(data.size()[0].item()/batchsize)
|
||
|
for j in range(batches):
|
||
|
batchdata=[]
|
||
|
batchground=[]
|
||
|
for k in range(j*batchsize, min((j+1)*batchsize, data.size()[0].item()),1):
|
||
|
batchdata.append(data[k])
|
||
|
batchground.append(ground[k])
|
||
|
batchdata=torch.stack(batchdata)
|
||
|
batchground=torch.stack(batchground)
|
||
|
if itercount%testevery==0:
|
||
|
with torch.no_grad():
|
||
|
output = model(data)
|
||
|
ap = metrics.average_precision_score(ground.numpy(), output.numpy())
|
||
|
losses.append(ap)
|
||
|
print(str(i)+": "+str(ap))
|
||
|
plt.plot(np.array(range(0,i+1,testevery)),np.array(losses))
|
||
|
if dev != None:
|
||
|
output = model(dev)
|
||
|
ap = metrics.average_precision_score(devg.numpy(), output.numpy())
|
||
|
dev_losses.append(ap)
|
||
|
plt.plot(np.array(range(0,i+1,testevery)),np.array(losses), label="dev AP")
|
||
|
if graphsaveloc != None:
|
||
|
plt.savefig(graphsaveloc+".pdf")
|
||
|
with torch.enable_grad():
|
||
|
optimizer.zero_grad()
|
||
|
output = model(batchdata)
|
||
|
loss = loss_fn(output, ground)
|
||
|
loss.backward()
|
||
|
optimizer.step()
|
||
|
itercount +=1
|
||
|
if modelsaveloc != None:
|
||
|
torch.save(model, modelsaveloc)
|
||
|
plt.show()
|
||
|
return model
|