-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathread_write_trajectory.py
More file actions
154 lines (124 loc) · 4.43 KB
/
read_write_trajectory.py
File metadata and controls
154 lines (124 loc) · 4.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
import csv
import numpy as np
import pygame
from PIL import Image
import os
import cv2
def read_coords(fname):
    """Load a comma-separated file of numbers as a list of float rows.

    Args:
        fname: Path of the CSV file to read.

    Returns:
        A list with one entry per CSV row; each entry is a list holding
        that row's fields converted with ``float``.

    Raises:
        OSError: If the file cannot be opened.
        ValueError: If a field cannot be converted to ``float``.
    """
    with open(fname) as coords_file:
        parsed_rows = csv.reader(coords_file, delimiter=',')
        return [[float(field) for field in fields] for fields in parsed_rows]
def read_traffic_data(fname):
    """Load a comma-separated file as a list of rows of raw strings.

    Args:
        fname: Path of the CSV file to read.

    Returns:
        A list with one entry per CSV row; each entry is a list of the
        row's fields, left as the strings produced by ``csv.reader``.

    Raises:
        OSError: If the file cannot be opened.
    """
    with open(fname) as traffic_file:
        parsed_rows = csv.reader(traffic_file, delimiter=',')
        return [list(fields) for fields in parsed_rows]
def read_data_from_dataset(fname, desired_data):
    """Collect selected columns from a tab-separated dataset file.

    Args:
        fname: Path of the dataset file. It is first passed to
            ``check_valid_csv``.
        desired_data: Container of column names to keep; membership is
            tested with ``in``.

    Returns:
        A dict mapping each requested header that occurs in the file to the
        list of its values (as read by ``csv.DictReader``), in row order.
        Requested headers that never occur are absent from the dict.

    Raises:
        ValueError: If ``check_valid_csv(fname)`` returns ``'empty'``.
    """
    if check_valid_csv(fname) == 'empty':
        raise ValueError('empty dataset csv')

    columns = {}
    with open(fname) as dataset_file:
        for record in csv.DictReader(dataset_file, delimiter='\t'):
            for column, cell in record.items():
                if column in desired_data:
                    # First sighting of a column starts its list.
                    columns.setdefault(column, []).append(cell)
    return columns
def write_data(fname, *args):
    """Append one CSV row built from all positional arguments.

    Each argument (scalar or array-like) is flattened and appended, in order,
    to a NumPy array that starts out as ``np.array([])``; the resulting
    elements become the fields of a single row appended to ``fname``.
    Because the accumulator starts as an empty float array, plain integers
    are written in float form (``1`` becomes ``1.0``).

    Args:
        fname: Path of the CSV file; opened in append mode.
        *args: Values to place in the row.
    """
    with open(fname, "a") as out_file:
        flattened = np.array([])
        # Append one argument at a time: NumPy's dtype promotion depends on
        # the order in which values are merged.
        for chunk in args:
            flattened = np.append(flattened, chunk)
        row = list(flattened)
        csv.writer(out_file, lineterminator='\n').writerow(row)
def check_valid_csv(fname):
    """Make sure ``fname`` exists and report whether it holds any CSV row.

    Missing parent directories are created, and the file itself is created
    empty if it does not exist yet. Existing content is never modified.

    Args:
        fname: Path of the CSV file to check. May be a bare file name.

    Returns:
        ``'empty'`` if ``csv.reader`` yields no row for the file,
        ``'not_empty'`` otherwise.

    Raises:
        OSError: If the file can neither be created nor opened for reading.
    """
    parent_dir = os.path.dirname(fname)
    # A bare file name has no parent directory. Calling makedirs on the name
    # itself would create a *directory* where the file is supposed to live.
    if parent_dir:
        try:
            os.makedirs(parent_dir, exist_ok=True)
        except OSError:
            # Best effort: if the directory really is unusable, the open()
            # calls below raise a more precise error.
            pass

    try:
        # Mode 'x' creates the file only if it is missing; the context
        # manager closes the handle immediately.
        with open(fname, 'x'):
            pass
    except FileExistsError:
        pass

    with open(fname, 'r') as csvfile:
        has_row = next(csv.reader(csvfile), None) is not None
    return 'not_empty' if has_row else 'empty'
def write_state_buf(fname, args):
    """Append one car-state row to a tab-separated log file.

    The file is prepared with ``check_valid_csv``. When that call reports the
    file as ``'empty'`` the header line is written first, followed by the
    row. Previously the first call wrote only the header and silently
    dropped its state.

    Args:
        fname: Path of the state log file.
        args: Indexable sequence whose first five items are, in order,
            CarPositionX, CarPositionY, CarAngle, Acceleration and Velocity.
            Any further items are ignored.

    Writing stays best-effort: I/O problems or a malformed ``args`` do not
    raise, but they are reported through ``warnings.warn`` instead of being
    discarded.
    """
    import warnings

    fieldnames = ['CarPositionX', 'CarPositionY', 'CarAngle', 'Acceleration', 'Velocity']
    needs_header = check_valid_csv(fname) == 'empty'
    try:
        # Build the row before touching the file so a short ``args`` cannot
        # leave a header-only or half-written file behind.
        row = {name: args[position] for position, name in enumerate(fieldnames)}
        with open(fname, 'a') as csvfile:
            writer = csv.DictWriter(csvfile, fieldnames=fieldnames, delimiter='\t', lineterminator='\n')
            if needs_header:
                writer.writeheader()
            writer.writerow(row)
    except (OSError, csv.Error, IndexError, KeyError, TypeError) as err:
        warnings.warn('could not write state to %s: %r' % (fname, err), RuntimeWarning)
def save_frame(screen, frame_name, path):
    """Save the pixels of a pygame surface as an image file.

    Args:
        screen: Surface handed to ``pygame.surfarray.array3d``.
        frame_name: File name of the image (PIL's ``Image.save`` receives
            ``path + '/' + frame_name``).
        path: Directory to save into; created if ``os.makedirs`` succeeds.
    """
    try:
        os.makedirs(path)
    except OSError:
        # Ignored on purpose, most likely because the directory already
        # exists; a genuinely unusable path fails in save() below.
        pass

    pixels = pygame.surfarray.array3d(screen)
    # rot90 followed by flipud swaps the first two axes of the array,
    # presumably turning pygame's (x, y) indexing into the (row, column)
    # layout the image writer expects. NOTE(review): confirm.
    upright = np.flipud(np.rot90(pixels, axes=(0, 1)))
    Image.fromarray(upright).save(path + '/' + frame_name)
def resize_image(image, size):
    """Return a new pygame surface holding ``image`` resized with OpenCV.

    Args:
        image: Surface handed to ``pygame.surfarray.array3d``.
        size: Target size, passed unchanged as the second argument of
            ``cv2.resize``.

    Returns:
        The surface built by ``pygame.surfarray.make_surface`` from the
        resized pixel array.
    """
    # NOTE(review): surfarray arrays are presumably indexed (x, y) while
    # cv2.resize interprets ``size`` as (columns, rows), so the two entries
    # of ``size`` may end up swapped in the result. Verify against callers.
    pixels = pygame.surfarray.array3d(image)
    return pygame.surfarray.make_surface(cv2.resize(pixels, size))
def write_distances_as_int(fname, distances_vector):
    """Append one row of integer distances to a CSV file.

    The file is prepared with ``check_valid_csv`` before writing.

    Args:
        fname: Path of the distances file; opened in append mode.
        distances_vector: Iterable of values; each one is converted with
            ``int`` before being written.

    Raises:
        OSError: If converting or writing fails. The original exception is
            attached as ``__cause__``. Unlike a bare ``except``,
            ``KeyboardInterrupt`` and ``SystemExit`` now pass through
            untouched.
    """
    # Called only for its side effect of making sure the file exists.
    check_valid_csv(fname)
    try:
        # Convert eagerly so a bad value is detected before the file is
        # opened, not in the middle of writerow().
        int_row = [int(distance) for distance in distances_vector]
        with open(fname, 'a') as csvfile:
            csv.writer(csvfile, lineterminator='\n').writerow(int_row)
    except Exception as err:
        # Exception type and message are kept for existing callers; the
        # real cause travels along via explicit chaining.
        raise OSError('path to distances.txt does not exists') from err
def read_distances(fname):
    """Read a comma-separated distances file into a NumPy array.

    Args:
        fname: Path of the distances file.

    Returns:
        ``np.asanyarray`` of the per-row arrays. Fields are not converted,
        so the elements are the strings produced by ``csv.reader``. An empty
        file yields an empty array.

    Raises:
        OSError: If reading or array construction fails. The original
            exception is attached as ``__cause__``. Unlike a bare
            ``except``, ``KeyboardInterrupt`` and ``SystemExit`` now pass
            through untouched.
    """
    try:
        with open(fname, 'r') as csvfile:
            # csv.reader ignores ``lineterminator``, so it is not passed.
            rows = [np.asarray(row) for row in csv.reader(csvfile)]
        return np.asanyarray(rows)
    except Exception as err:
        # Exception type and message are kept for existing callers; the
        # real cause travels along via explicit chaining.
        raise OSError('path to distances.txt does not exists') from err