-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathProfile.py
More file actions
249 lines (199 loc) · 10.8 KB
/
Profile.py
File metadata and controls
249 lines (199 loc) · 10.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
import json
import os
from os.path import isfile, join
import logging
import time
import copy
from typing import Union, Tuple
log = logging.getLogger(__name__)
log.level = logging.DEBUG
def convert_old_profile(old_profile: dict) -> dict:
new_segments = []
for i, t_t in enumerate(old_profile['data']):
new_segments.append({"time": t_t[0], "temperature": t_t[1]})
new_profile = {"name": old_profile['name'], "segments": new_segments}
return new_profile
def save_old_profile_as_new(old_profile: dict):
new_profile = convert_old_profile(old_profile)
last_seg_time = 0
for segment in new_profile['segments']:
seg_time = segment['time'] - last_seg_time
last_seg_time = segment['time']
segment['time'] = seg_time / 60
new_profile['segments'] = new_profile['segments'][1:]
file_name = new_profile['name'] + '.' + 'json'
path = os.path.abspath(os.path.join(os.path.dirname(__file__), 'NewProfiles/', file_name))
with open(path, 'w') as f:
f.write(json.dumps(new_profile))
def convert_old_profile_ms(name: str, segments: list, start_ms: float) -> dict:
new_segments = []
for i, t_t in enumerate(segments):
new_segments.append({"time": t_t[0], "temperature": t_t[1], "time_ms": t_t[0] * 1000 + start_ms})
new_profile = {"name": name, "segments": new_segments}
return new_profile
class Profile:
def __init__(self):
self.profiles_directory = os.path.abspath(os.path.join(os.path.dirname(__file__), '.', 'Profiles'))
self.original = None
self.data = None
self.name = None
self.current_segment = None
def load_profile_by_name(self, file: str):
profile_path = os.path.join(self.profiles_directory, file)
log.debug(profile_path)
with open(profile_path) as infile:
profile_json = json.dumps(json.load(infile))
prf = json.loads(profile_json)
self.name = prf["name"]
self.data = sorted(prf["data"])
self.original = copy.deepcopy(self.data)
def get_profiles_names(self) -> list:
profile_names = []
files = [f for f in os.listdir(self.profiles_directory) if isfile(join(self.profiles_directory, f))]
for file in files:
profile_names.append({'name': file.split('.')[0]})
return profile_names
def get_duration(self) -> int:
return max([t for (t, x) in self.data])
# x = (y-y1)(x2-x1)/(y2-y1) + x1
@staticmethod
def find_x_given_y_on_line_from_two_points(y: float, point1: list, point2: list) -> float:
if point1[0] > point2[0]:
return 0 # time2 before time1 makes no sense in kiln segment
if point1[1] >= point2[1]:
return 0 # Zero will crash. Negative temperature slope, we don't want to seek a time.
x = (y - point1[1]) * (point2[0] - point1[0]) / (point2[1] - point1[1]) + point1[0]
return x
def find_next_time_from_temperature(self, temperature: float) -> Tuple[float, int]:
time = 0 # The seek function will not do anything if this returns zero, no useful intersection was found
segment = 0
for index, point in enumerate(self.data):
if point[1] >= temperature:
if index > 0: # Zero here would be before the first segment
if self.data[index - 1][1] <= temperature: # We have an intersection
time = self.find_x_given_y_on_line_from_two_points(temperature, self.data[index - 1], point)
segment = index - 1
if time == 0:
if self.data[index - 1][1] == point[1]: # It's a flat segment that matches the temperature
time = self.data[index - 1][0]
break
return time, segment
def hot_start(self, temperature: float) -> float:
t_time, segment = self.find_next_time_from_temperature(temperature)
self.current_segment = segment
self.segment_start = time.time() / 1000 - self.data[segment][0]
return t_time
def get_surrounding_points(self, time):
if time > self.get_duration():
return None, None
# Each segment must have a start and an end.
# current_segment is an index to the profile data. current_segment + 1 data has to exist to make a segment.
prev_point = self.data[self.current_segment]
next_point = self.data[self.current_segment + 1]
return prev_point, next_point
def is_segment_cooling(self) -> bool:
cooling = False
if self.current_segment is not None:
prev_point = self.data[self.current_segment]
next_point = self.data[self.current_segment + 1]
if next_point[1] - prev_point[1] < 0:
cooling = True
return cooling
def get_target_temperature(self, time: float, future=False) -> Union[float, str]:
if self.current_segment is None:
return self.data[0][1]
if time > self.get_duration(): # Return the final temperature
if self.current_segment != len(self.data) - 2: # Return the max of the last two, this is not the current segment
return max(self.data[len(self.data) -1][1], self.data[len(self.data) -2][1])
return self.data[len(self.data) -1][1]
prev_point, next_point = self.get_surrounding_points(time)
if time > next_point[0]: # Looking ahead in time into the next segment
if future:
next_point_plus = self.data[self.current_segment + 2] # This has to exist if the time is within the duration.
if next_point_plus[1] - next_point[1] >= 0: # Only if the next segment is heating or flat.
incl = float(next_point_plus[1] - next_point[1]) / float(next_point_plus[0] - next_point[0])
temp = next_point[1] + (time - next_point[0]) * incl
else: # Next segment is cooling.
if next_point[1] > prev_point[1]: # this is a temperature peak. Extropolate to avoid a delay. (Causing an overshoot.)
incl = float(next_point[1] - prev_point[1]) / float(next_point[0] - prev_point[0])
temp = prev_point[1] + (time - prev_point[0]) * incl
else: # Use the hottest temp in the next segment so it switches segments
temp = next_point[1]
else:
temp = next_point[1]
else:
incl = float(next_point[1] - prev_point[1]) / float(next_point[0] - prev_point[0])
temp = prev_point[1] + (time - prev_point[0]) * incl
# log.debug('Target temperature: ' + str(temp))
return temp
def get_target_slope(self, time_seconds: float) -> float:
if time_seconds > self.get_duration():
return 0
if self.current_segment is None:
return 0
(prev_point, next_point) = self.get_surrounding_points(time_seconds)
slope = (next_point[1] - prev_point[1]) / (next_point[0] - prev_point[0])
return slope
def check_shift_profile(self, time_since_start, min_temp, zone) -> bool:
update = False
slope = zone.slope
if not isinstance(slope, str):
target_slope = self.get_target_slope(time_since_start)
if slope > 0 and slope < target_slope: # Not catching up, reduce the target slope.
if self.current_segment is not None:
prev_point = self.data[self.current_segment]
next_point = self.data[self.current_segment + 1]
if min_temp - prev_point[1] > 5: # Avoid divide by small number
min_temp += 5 # Stop temperature drop from PID (?)
if slope / target_slope < 0.9: # The slope is not very stable, don't over do it
slope = target_slope * 0.9
_, delta_t_next = self.delta_t_from_slope(slope, prev_point, next_point,
time_since_start, min_temp)
if delta_t_next <= 0:
log.debug('Slope: ' + str(slope) + ' Target slope: ' + str(target_slope))
log.debug('delta_t_next: ' + str(delta_t_next))
for index, time_temp in enumerate(self.data):
if index > self.current_segment:
time_temp[0] += delta_t_next
update = True
# Add a new segment (for this firing only) to the profile so it doesn't change history.
self.data.insert(self.current_segment + 1, [time_since_start, min_temp])
self.check_switch_segment(time_since_start + 1)
# log.debug('Target slope: ' + str(self.get_target_slope(time_since_start)))
return update
def delta_t_from_slope(self, desired_slope, prev_point, next_point, time_since_start, min_temp) -> Tuple[
float, float]:
# desired_slope is the maximum achievable slope. Move the current segment start and end times to go through the
# current point at this slope.
# T = mt + b, m=slope
# This point: t0=time_since_start, T0=min_temp. b = T0-mt0
# Previous point: t1 is unknown, T1=prev_point[1]. t1=(T1-b)/m
# Next point: t2 is unknown, T2=next_point[1]. t2=(T2-b)/m
m = desired_slope # Degrees C per second
b = min_temp - m * time_since_start
t0 = (prev_point[1] - b) / m
t1 = (next_point[1] - b) / m
delta_t_prev = t0 - prev_point[0]
delta_t_next = t1 - next_point[0]
return delta_t_prev, delta_t_next # Seconds
def check_switch_segment(self, time_since_start: float) -> Tuple[bool, bool, bool]:
segment_change = False
update = False
firing_finished = False
if self.current_segment is None: # Firing has not started yet
self.current_segment = 0
segment_change = True
for time_temp in self.data:
time_temp[0] += time_since_start
update = True # Shift profile start on start of first segment
else:
log.debug('Time since start: ' + str(time_since_start))
# Require both time and temperature to switch to the next segment
if time_since_start >= self.data[self.current_segment + 1][0]:
self.current_segment += 1
segment_change = True
if self.current_segment >= len(self.data) - 1: # Last segment, finish
firing_finished = True
log.info('Segment: ' + str(self.current_segment))
log.info('Profile data: ' + str(self.data))
return segment_change, update, firing_finished