-
Notifications
You must be signed in to change notification settings - Fork 6
Expand file tree
/
Copy pathBatchHandler.py
More file actions
275 lines (237 loc) · 13.3 KB
/
BatchHandler.py
File metadata and controls
275 lines (237 loc) · 13.3 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
# Class that handles batch sampling, padding, and data augmentation for one data pool.
# In main this should be instantiated three times, once for each data pool.
# TODO: decide how to implement features such as batch-selection bias in this class.
# Data augmentation also belongs here.
import random
import pandas as pd
import numpy as np
from sklearn import preprocessing
from sklearn.model_selection import StratifiedShuffleSplit
from imblearn.over_sampling import RandomOverSampler
import sys
import time
class BatchHandler:
    """Handles mini-batch sampling, padding and augmentation for one data pool.

    One instance is created per data pool (e.g. train / validation / test).

    training=True  -> batches are drawn at random from a class-balanced
                      index list (minority classes are over-sampled).
    training=False -> data is fed sequentially and exhaustively; the last
                      mini-batch is topped up with junk rows flagged via the
                      ``batchwise_padding`` column, and a ``batch_complete``
                      flag signals when the whole pool has been served.
    """

    def __init__(self, data_pool, parameters, training):
        """
        Args:
            data_pool: pandas DataFrame of samples; must provide
                ``encoder_sample`` and either ``relative_destination`` or
                ``track_class`` columns.
            parameters: configuration dict (``batch_size``,
                ``observation_steps``, ``input_mask``,
                ``ibeo_data_columns``, ...).
            training: True for random class-balanced sampling, False for
                sequential test/validation feeding.
        """
        self.data_pool = data_pool
        self.parameters = parameters
        self.batch_size = parameters['batch_size']
        self.training = training
        self.categorical = True
        self.d_thresh_range = None
        # Cursor into the pool for sequential (test/validation) feeding.
        self.val_minibatch_idx = 0
        self.d_thresh = None
        self.reduced_pool = None
        self.distance_pool_cache = {}
        # Input mask replicated across the observation window, one entry per
        # batch slot.
        # NOTE(review): the all-zero index mirrors the aug/aug_mask Series in
        # get_minibatch; element-wise alignment assumes the pool rows carry
        # matching indices -- confirm against how data_pool is built upstream.
        self.input_mask = pd.Series(
            [np.tile(self.parameters['input_mask'],
                     (self.parameters['observation_steps'], 1))
             for _ in range(self.batch_size)],
            dtype=object, index=[0] * self.batch_size)
        # Build a class-balanced list of row indices; over-sampling means a
        # uniform draw from balanced_idxs is balanced across classes.
        ros = RandomOverSampler()
        if 'relative' in self.parameters['ibeo_data_columns'][0]:
            selection_data = list(data_pool.relative_destination.values)
        else:
            selection_data = list(data_pool.track_class.values)
        le = preprocessing.LabelEncoder()
        indexed_classes = np.array(le.fit_transform(selection_data))
        row_idxs = np.arange(len(indexed_classes)).reshape(-1, 1)
        # imblearn >= 0.4 replaced the deprecated fit()/sample() pair with a
        # single fit_resample() call.
        balanced_idxs, _balanced_classes = ros.fit_resample(row_idxs, indexed_classes)
        self.balanced_idxs = np.squeeze(balanced_idxs)
        return

    def get_input_size(self):
        """Width of a single per-timestep input feature vector."""
        return len(self.data_pool.iloc[0]['encoder_sample'][0])

    def get_num_classes(self):
        """Number of unique destination classes in the pool."""
        return len(self.data_pool['destination'].unique())

    def set_distance_threshold(self, d_thresh=None):
        """Build ``reduced_pool``: for every track, the top-n samples closest
        to (but still under) ``d_thresh``.

        Passing None clears the reduction (``reduced_pool`` becomes None).
        """
        self.d_thresh = d_thresh
        if d_thresh is None:
            self.reduced_pool = None
            return
        rp = []
        for track_idx in self.data_pool['track_idx'].unique():
            pool = self.data_pool[self.data_pool['track_idx'] == track_idx]
            # Thresholded pool - everything that came before d_thresh.
            tp = pool[pool['distance'] < d_thresh]
            try:
                # Closest-to-threshold first: sort descending, take top n.
                record = tp.sort_values('distance', ascending=False).iloc[
                    range(self.parameters['d_thresh_top_n'])]
            except IndexError:
                # Track has fewer than n samples under the threshold.
                continue
            # Only accept the track if its closest point lies within 1 m of
            # the threshold; otherwise there is no data at this distance.
            if record.iloc[0].distance > (d_thresh - 1):
                rp.append(record)
        # Guard the empty case: pd.concat([]) raises ValueError.
        self.reduced_pool = pd.concat(rp) if rp else self.data_pool.iloc[0:0]
        return

    def set_distance_threshold_ranges(self, d_thresh_range):
        """Store the threshold list consumed by generate_distance_pool()."""
        self.d_thresh_range = d_thresh_range
        return

    def format_minibatch_data(self, X, Y, batchwise_padding, trackwise_padding=None):
        """Re-index batch-major sequence data into time-major arrays.

        Converts a batch-size-long list of [time, input_size] sequences into
        a time-length list of [batch, input_size] arrays, as consumed by the
        unrolled encoder/decoder.

        Args:
            X: iterable of encoder sequences, each [observation_steps, input_size].
            Y: iterable of decoder targets / labels.
            batchwise_padding: per-slot flags; True marks junk rows whose
                loss weight is zeroed.
            trackwise_padding: optional per-slot, per-timestep padding flags.

        Returns:
            Tuple of (batch_observation_inputs, batch_future_inputs,
            batch_weights, batch_labels, formatted_trackwise_padding);
            the last element is None when trackwise_padding is None.
        """
        if type(X) is not list:
            X = list(X)
        if type(Y) is not list:
            Y = list(Y)
        if type(batchwise_padding) is not list:
            batchwise_padding = list(batchwise_padding)
        if trackwise_padding is not None:
            if type(trackwise_padding) is not list:
                trackwise_padding = list(trackwise_padding)
        batch_observation_inputs, batch_future_inputs, batch_weights, batch_labels, formatted_trackwise_padding = \
            [], [], [], [], []
        # Batch encoder inputs are just re-indexed encoder_inputs.
        # Re-index to an observation_steps-long list of shape [batch, input_size];
        # currently it is a batch-long list of shape [timesteps, input_size].
        for length_idx in range(self.parameters['observation_steps']):
            batch_observation_inputs.append(
                np.array([X[batch_idx][length_idx]
                          for batch_idx in range(self.batch_size)], dtype=np.float32))
        for length_idx in range(self.parameters['prediction_steps']):
            batch_future_inputs.append(
                np.array([Y[batch_idx][length_idx]
                          for batch_idx in range(self.batch_size)], dtype=np.float32))
            # Zero weight for padded (junk) batch slots.
            batch_weight = np.logical_not(batchwise_padding) * np.ones(self.batch_size, dtype=np.float32)
            batch_weights.append(batch_weight)
            if trackwise_padding is not None:
                formatted_trackwise_padding.append(
                    np.array([trackwise_padding[batch_idx][length_idx]
                              for batch_idx in range(self.batch_size)], dtype=bool))
        # Encapsulate the label data in a list of size 1 to mimic a decoder
        # sequence of length 1.
        if self.parameters['prediction_steps'] == 0:
            batch_labels = [Y]
            batch_weights = [np.logical_not(batchwise_padding) * np.ones(self.batch_size, dtype=np.float32)]
        if trackwise_padding is None:
            formatted_trackwise_padding = None
        # batch_observation_inputs is now a list of len observation_steps,
        # each of shape [batch, input_size]; likewise batch_future_inputs.
        return batch_observation_inputs, batch_future_inputs, batch_weights, batch_labels, formatted_trackwise_padding

    def get_minibatch(self):
        """Return one mini-batch DataFrame for training.

        Training draws without replacement from the class-balanced index
        list; otherwise rows are drawn uniformly at random.  Optional
        additive positional augmentation is applied to the first two feature
        columns, then the configured input mask is applied.  All rows are
        real data, so ``batchwise_padding`` is all-False.
        """
        # TODO Research: bias sampling, importance sampling, weighted sampling.
        if self.training:
            # Balanced between classes; over-sampling in __init__ makes a
            # uniform draw here class-balanced.
            batch_idxs = np.random.choice(self.balanced_idxs, self.batch_size, replace=False)
        else:
            # Select uniformly at random.
            batch_idxs = np.random.choice(range(len(self.data_pool)), self.batch_size, replace=False)
        batch_frame = self.data_pool.iloc[batch_idxs].copy()
        num_columns = batch_frame.encoder_sample.iloc[0].shape[1]
        if self.training and self.parameters['augmentation_chance'] > 0.001:
            # Additive offset matrix per sample: [rand_e, rand_n, 0, ..., 0]
            # tiled over the observation window.
            aug = pd.Series(
                [np.tile([self.parameters['aug_function'](*self.parameters['aug_range']),
                          self.parameters['aug_function'](*self.parameters['aug_range'])]
                         + [0.0] * (num_columns - 2),
                         (self.parameters['observation_steps'], 1))
                 for _ in range(self.batch_size)],
                dtype=object, index=[0] * self.batch_size)
            # Bernoulli gate: each sample is augmented (all-ones mask) with
            # probability augmentation_chance, else left untouched.
            aug_mask = pd.Series(
                [np.tile([np.random.choice(
                              [1.0, 0.0],
                              p=[self.parameters['augmentation_chance'],
                                 1 - self.parameters['augmentation_chance']])]
                         * num_columns,
                         (self.parameters['observation_steps'], 1))
                 for _ in range(self.batch_size)],
                dtype=object, index=[0] * self.batch_size)
            batch_frame.encoder_sample = batch_frame.encoder_sample + (aug * aug_mask)
        # NOTE(review): Series multiplication aligns on index; this assumes
        # batch_frame rows share the all-zero index used by input_mask --
        # confirm against how data_pool is constructed.
        batch_frame.encoder_sample = batch_frame.encoder_sample * self.input_mask
        batch_frame = batch_frame.assign(batchwise_padding=np.zeros(self.batch_size, dtype=bool))
        return batch_frame  # batch_X, batch_Y, batch_weights

    def get_sequential_minibatch(self):
        """Return (batch_frame, batch_complete) for testing / validation.

        Feeds the active pool sequentially in batch_size chunks.  The final
        short chunk is topped up with junk rows recycled from the start of
        the pool; those rows carry ``batchwise_padding == True``.
        ``batch_complete`` is True on the chunk that exhausts the pool,
        after which the cursor resets to zero.
        """
        if self.d_thresh is None:
            data_pool = self.data_pool
        else:
            data_pool = self.reduced_pool
        # A threshold *range* overrides the single-threshold reduction.
        if self.d_thresh_range is not None:
            data_pool = self.generate_distance_pool()
        # If we do not have enough data remaining to fill a batch:
        if (self.val_minibatch_idx + self.batch_size) > len(data_pool):
            # Collect the remaining real data...
            batch_frame = data_pool.iloc[self.val_minibatch_idx:].copy()
            batch_frame = batch_frame.assign(batchwise_padding=np.zeros(len(batch_frame), dtype=bool))
            while len(batch_frame) < self.batch_size:
                # ...then add junk rows from the pool head, looping if the
                # pool is smaller than one batch.
                pad_length = self.batch_size - len(batch_frame)
                # This works because if pad_length > len(data_pool), iloc
                # simply returns the whole pool.
                padding_frame = data_pool.iloc[0:pad_length].copy()
                padding_frame = padding_frame.assign(batchwise_padding=np.ones(len(padding_frame), dtype=bool))
                batch_frame = pd.concat([batch_frame, padding_frame])
            batch_complete = True
            self.val_minibatch_idx = 0
        else:
            batch_frame = data_pool.iloc[self.val_minibatch_idx:self.val_minibatch_idx + self.batch_size].copy()
            batch_frame = batch_frame.assign(batchwise_padding=np.zeros(len(batch_frame), dtype=bool))
            self.val_minibatch_idx += self.batch_size
            # Did we get lucky and have no remainder?
            if self.val_minibatch_idx == len(data_pool):
                batch_complete = True
                self.val_minibatch_idx = 0
            else:
                batch_complete = False
        return batch_frame, batch_complete

    def generate_distance_pool(self):
        """Concatenate the reduced pools for every distance in d_thresh_range.

        Each row is tagged with its threshold via a ``d_thresh`` column so
        downstream code can group results by distance.  Pools are cached per
        threshold tuple because generation is slow.
        """
        pool_list = []
        try:
            return self.distance_pool_cache[tuple(self.d_thresh_range)]
        except KeyError:
            busy_indicator = ['.', 'o', 'O', '@', '*']
            batch_counter = 0
            print('')
            cache_start = time.time()
            for dis in self.d_thresh_range:
                sys.stdout.write("\rGenerating validation data pool cache...%s"
                                 % busy_indicator[batch_counter % len(busy_indicator)])
                sys.stdout.flush()
                self.set_distance_threshold(dis)
                local_pool = self.reduced_pool.copy()
                local_pool = local_pool.assign(d_thresh=np.repeat(dis, len(local_pool)))
                pool_list.append(local_pool)
                batch_counter += 1
            pool_df = pd.concat(pool_list)
            self.distance_pool_cache[tuple(self.d_thresh_range)] = pool_df
            print('')
            print("Time for caching: " + str(time.time() - cache_start))
            return pool_df
    # Unwind all pools into one big pool with partnered distance list.