parameterservermodel.py
import tensorflow as tf
import numpy as np
import time
import json
import cStringIO
import base64
#from memory_profiler import profile
#import sys
# mod: by default, the error log is not written to local disk on the machine
# where the driver is running (set the same flag in tensorspark.py).
isWritingErrorLogOnLocaldisk = False

class ParameterServerModel(object):

    def __init__(self, x, y_, compute_gradients, apply_gradients, minimize, error_rate, session, batch_size):
        self.session = session
        self.batch_size = batch_size
        self.graph = session.graph
        # Enter the session's graph context for the lifetime of the model;
        # the context manager is deliberately never exited.
        self.session.graph.as_default().__enter__()
        self.x = x
        self.y_ = y_
        self.compute_gradients = compute_gradients
        self.apply_gradients = apply_gradients
        self.error_rate = error_rate
        self.error_rate_summary = tf.scalar_summary("error_rate", error_rate)
        self.minimize = minimize
        self.reset_gradients()
        self.gradient_counter = tf.Variable(initial_value=0, trainable=False)
        # One assign op per variable; the gradient tensor of each
        # (gradient, variable) pair doubles as the feed slot through which
        # new parameter values arrive (see assign_parameters).
        self.parameter_assignments = [None] * len(self.compute_gradients)
        for i in xrange(len(self.compute_gradients)):
            gradient = self.compute_gradients[i][0]
            variable = self.compute_gradients[i][1]
            self.parameter_assignments[i] = variable.assign(gradient)
        self.session.run(tf.initialize_all_variables())
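
    # Intended exchange protocol, as far as it can be read from this class
    # alone (the driver/worker wiring is assumed to live in tensorspark.py):
    #   1. a worker calls train() repeatedly, accumulating gradients locally;
    #   2. get_gradients() averages the accumulated gradients and prepends a
    #      timestamp before they are shipped off;
    #   3. the receiving side calls apply() to fold them into its variables;
    #   4. a worker calls assign_parameters() with another model's
    #      get_parameters() output to resynchronize, which also resets its
    #      local gradient accumulator.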

    def get_num_classes(self):
        return self.y_.get_shape().as_list()[1]

    # Accumulate this minibatch's gradients locally instead of applying them;
    # they are averaged and shipped later via get_gradients().
    def train(self, labels, features):
        with self.session.as_default():
            feed = {self.x: features, self.y_: labels}
            for i in range(len(self.compute_gradients)):
                self.gradients[i] += self.compute_gradients[i][0].eval(feed_dict=feed)
            self.num_gradients += 1
            del feed

    def test(self, labels, features):
        with self.session.as_default():
            feed = {self.x: features, self.y_: labels}
            test_error_rate = self.error_rate.eval(feed_dict=feed)
            del feed
            return test_error_rate

    #@profile(stream=sys.stdout)
    def get_parameters(self):
        with self.session.as_default():
            result = [None] * len(self.compute_gradients)
            for i in xrange(len(self.compute_gradients)):
                result[i] = self.compute_gradients[i][1].eval(session=self.session)
            array = np.array(result)
            del result[:]
            del result
            return array

    def assign_parameters(self, parameters):
        with self.session.as_default():
            self.reset_gradients()
            for i, grad_var in enumerate(self.compute_gradients):
                # Feed the new value for each variable through its gradient
                # tensor slot and run the corresponding assign op.
                self.parameter_assignments[i].eval(feed_dict={grad_var[0]: parameters[i]})

    #@profile(stream=sys.stdout)
    def apply(self, gradients):
        with self.graph.as_default():
            feed_dict = {}
            for i, grad_var in enumerate(self.compute_gradients):
                feed_dict[grad_var[0]] = gradients[i]
            self.apply_gradients.run(session=self.session, feed_dict=feed_dict)
            del feed_dict
            del gradients

    def get_gradients(self):
        # Slot 0 carries a timestamp; slots 1..n carry the accumulated
        # gradients averaged over the num_gradients minibatches seen since
        # the last reset.
        result = [None] * (1 + len(self.gradients))
        for i in xrange(len(self.gradients)):
            result[i + 1] = np.divide(self.gradients[i], self.num_gradients).astype('float32')
        result[0] = [time.time()]
        array = np.array(result)
        del result[:]
        del result
        return array

    def reset_gradients(self):
        with self.session.as_default():
            self.gradients = [tf.zeros(g[1].get_shape()).eval() for g in self.compute_gradients]
            self.num_gradients = 0

    def train_warmup(self, partition, error_rates_filename):
        error_rates = []
        iteration = 0
        batch_size = self.batch_size
        for i in range(0, len(partition), batch_size):
            data = partition[i:i + batch_size]
            labels, features = self.process_data(data)
            if len(labels) == 0:
                break
            with self.session.as_default():
                feed = {self.x: features, self.y_: labels}
                self.minimize.run(feed_dict=feed)
                error_rate = self.error_rate.eval(feed_dict=feed)
                t = time.time()
                # mod: optionally log (timestamp, error rate) pairs to local disk.
                if isWritingErrorLogOnLocaldisk:
                    with open(error_rates_filename, 'a') as f:
                        f.write('%f , %f\n' % (t, error_rate))
                error_rates.append(error_rate)
                iteration += 1
                print 'Warmup training iteration %d at %f error_rate' % (iteration, error_rate)
        return error_rates

    # The following two hooks must be implemented by subclasses for their
    # particular input format.
    def process_data(self, data):
        raise NotImplementedError('process_data must be implemented by a subclass')

    def process_partition(self, partition):
        raise NotImplementedError('process_partition must be implemented by a subclass')

    def serialize(self, array):
        # Pickle the ndarray to a byte string for transport between processes.
        return array.dumps()

    def deserialize(self, serialized):
        return np.loads(serialized)
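
# --- Illustrative example (not part of the original file) -------------------
# A minimal, hypothetical subclass sketch showing one way process_data and
# process_partition could be implemented, here for CSV records whose first
# field is an integer class label and whose remaining fields are float
# features. The class name and the CSV layout are assumptions, not the
# project's own model.
class ExampleCsvModel(ParameterServerModel):

    def process_data(self, data):
        num_classes = self.get_num_classes()
        labels = []
        features = []
        for line in data:
            fields = line.strip().split(',')
            if len(fields) < 2:
                continue  # skip malformed or empty records
            one_hot = [0.0] * num_classes
            one_hot[int(fields[0])] = 1.0
            labels.append(one_hot)
            features.append([float(v) for v in fields[1:]])
        return labels, features

    def process_partition(self, partition):
        # Spark hands partitions over as iterators; materialize the iterator
        # and reuse the per-record parser above.
        return self.process_data(list(partition))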