-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathmy_ai.py
109 lines (87 loc) · 3.63 KB
/
my_ai.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
#AI for self learning car
#Importing libraries
import numpy as np
import random
import os
import torch
import torch.nn as nn
import torch.nn.functional as Fns
import torch.optim as optim
import torch.autograd as autograd
from torch.autograd import Variable
#Creating the architecture of Neural networks
class Network(nn.Module):
    """Two-layer fully connected network producing one Q-value per action.

    Args:
        input_size: Number of features in a state vector.
        nb_action: Number of discrete actions (width of the output layer).
        hidden_size: Width of the single hidden layer. Defaults to 50, the
            value that was previously hard-coded.
    """

    def __init__(self, input_size, nb_action, hidden_size=50):
        super(Network, self).__init__()
        self.input_size = input_size
        self.nb_action = nb_action
        self.hidden_size = hidden_size
        # Layer names are kept as fc1/fc2 so existing checkpoints still load.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, nb_action)

    def forward(self, state):
        """Return the Q-values for ``state``.

        Args:
            state: Tensor whose last dimension equals ``input_size``.

        Returns:
            Tensor with the same leading dimensions as ``state`` and a last
            dimension of ``nb_action``.
        """
        hidden = Fns.relu(self.fc1(state))
        q_values = self.fc2(hidden)
        return q_values
#Implementing Experience Replay
class ReplayMemory(object):
    """Fixed-capacity, first-in-first-out store of past transitions.

    Every event pushed is expected to be a tuple of tensors that share the
    same layout; ``sample`` concatenates them field by field along dim 0.

    Args:
        capacity: Maximum number of events kept. Once exceeded, the oldest
            event is discarded.
    """

    def __init__(self, capacity):
        self.capacity = capacity
        self.memory = []

    def push(self, event):
        """Append ``event`` and drop the oldest one if over capacity."""
        self.memory.append(event)
        if len(self.memory) > self.capacity:
            del self.memory[0]

    def sample(self, batch_size):
        """Draw ``batch_size`` distinct events and batch them per field.

        Args:
            batch_size: Number of events to draw without replacement.

        Returns:
            A list with one tensor per event field, each the concatenation
            (along dim 0) of that field over the sampled events.

        Raises:
            ValueError: If ``batch_size`` exceeds the number of stored events
                (raised by ``random.sample``).
        """
        batch = random.sample(self.memory, batch_size)
        # zip(*batch) regroups [(s1, a1), (s2, a2)] into [(s1, s2), (a1, a2)].
        # A concrete list is returned (not a lazy one-shot ``map``) so callers
        # can index, measure or iterate the result more than once. Tensors
        # need no ``Variable`` wrapper to take part in autograd.
        return [torch.cat(field, 0) for field in zip(*batch)]
#Implementing the Deep Q-Learning
class Dqn():
    """Deep Q-learning agent with experience replay and softmax exploration.

    Args:
        input_size: Number of features in a state vector.
        nb_action: Number of discrete actions.
        gamma: Discount factor applied to the best next-state Q-value.
        memory_capacity: Maximum number of transitions kept for replay.
        learning_rate: Learning rate of the Adam optimizer.
        temperature: Factor the Q-values are multiplied by before the
            softmax. Larger values make the choice greedier; 0 makes every
            action equally likely (the agent effectively stops using what
            it learned).
        batch_size: Number of transitions sampled per learning step.
            Learning only happens once the memory holds more than this many.
    """

    def __init__(self, input_size, nb_action, gamma, memory_capacity=100000,
                 learning_rate=0.001, temperature=300, batch_size=100):
        self.gamma = gamma
        self.temperature = temperature
        self.batch_size = batch_size
        # Most recent rewards, used by score(); trimmed to 1000 in update().
        self.reward_window = []
        self.model = Network(input_size, nb_action)
        self.memory = ReplayMemory(memory_capacity)
        self.optimizer = optim.Adam(self.model.parameters(), lr=learning_rate)
        # torch.Tensor(n) would hand back uninitialised memory (possibly
        # NaN/inf) that ends up in the first stored transition; use zeros.
        self.last_state = torch.zeros(input_size).unsqueeze(0)
        self.last_action = 0
        self.last_reward = 0

    def select_action(self, state):
        """Sample an action from the softmax over the scaled Q-values.

        Args:
            state: Tensor of shape (1, input_size).

        Returns:
            The index of the chosen action as a Python int.
        """
        # Action selection needs no gradients.
        with torch.no_grad():
            q_values = self.model(state)
            probs = Fns.softmax(q_values * self.temperature, dim=1)
            action = probs.multinomial(num_samples=1)
        return action[0, 0].item()

    def learn(self, batch_state, batch_next_state, batch_reward, batch_action):
        """Run one optimisation step on a batch of transitions.

        Args:
            batch_state: States, one row per transition.
            batch_next_state: States reached after acting, one row each.
            batch_reward: Reward per transition (1-D tensor).
            batch_action: Index of the action taken per transition
                (1-D integer tensor).
        """
        # Q-value the model assigns to the action that was actually taken.
        outputs = self.model(batch_state).gather(1, batch_action.unsqueeze(1)).squeeze(1)
        # Best next-state Q-value, detached so the target carries no gradient.
        next_outputs = self.model(batch_next_state).detach().max(1)[0]
        target = self.gamma * next_outputs + batch_reward
        td_loss = Fns.smooth_l1_loss(outputs, target)
        self.optimizer.zero_grad()
        # The graph is rebuilt on every call, so it need not be retained.
        td_loss.backward()
        self.optimizer.step()

    def update(self, reward, new_signal):
        """Record the last transition, pick the next action and learn.

        Args:
            reward: Reward obtained for reaching the new state.
            new_signal: Sequence of numbers describing the new state.

        Returns:
            The index of the action to play next.
        """
        new_state = torch.Tensor(new_signal).float().unsqueeze(0)
        # Stored field order: state, next state, action, reward.
        self.memory.push((
            self.last_state,
            new_state,
            torch.LongTensor([int(self.last_action)]),
            torch.Tensor([self.last_reward]),
        ))
        action = self.select_action(new_state)
        if len(self.memory.memory) > self.batch_size:
            batch_state, batch_next_state, batch_action, batch_reward = \
                self.memory.sample(self.batch_size)
            self.learn(batch_state, batch_next_state, batch_reward, batch_action)
        self.last_action = action
        self.last_state = new_state
        self.last_reward = reward
        self.reward_window.append(reward)
        if len(self.reward_window) > 1000:
            del self.reward_window[0]
        return action

    def score(self):
        """Return the smoothed mean of the rewards in the window.

        The divisor is ``len + 1`` so an empty window yields 0.0 instead of
        raising ZeroDivisionError; the result is therefore slightly below
        the true mean.
        """
        return sum(self.reward_window) / (len(self.reward_window) + 1.0)

    def save(self):
        """Write the model and optimizer state to 'last_train_brain.pth'."""
        torch.save({'state_dict': self.model.state_dict(),
                    'optimizer': self.optimizer.state_dict(),
                    }, 'last_train_brain.pth')

    def load(self):
        """Restore model and optimizer state from 'last_train_brain.pth'.

        Prints a message and leaves the agent unchanged when the file does
        not exist.
        """
        if os.path.isfile('last_train_brain.pth'):
            print("=> loading checkpoint... ")
            # NOTE: torch.load unpickles the file; only load checkpoints
            # that come from a trusted source.
            checkpoint = torch.load('last_train_brain.pth')
            self.model.load_state_dict(checkpoint['state_dict'])
            self.optimizer.load_state_dict(checkpoint['optimizer'])
            print("done !")
        else:
            print("no checkpoint found...")