-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathsimulate_data.py
More file actions
132 lines (115 loc) · 4.56 KB
/
simulate_data.py
File metadata and controls
132 lines (115 loc) · 4.56 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
import csv
import math
import random
import time
from pathlib import Path
# Directory that receives the hourly simulated CSV files.
DATA_DIR = Path("/home/pi/ICE_PLANT/data")
# Divisor that turns a sample index into a time offset (i / SAMPLE_RATE);
# presumably samples per second -- TODO confirm against the consumer.
SAMPLE_RATE = 4.0
# Value handed to time.sleep() between payload rows, i.e. seconds per row.
BLOCK_INTERVAL = 1.0
def open_csv_for_hour(ts: float):
    """Create the simulated-frame CSV for the local hour containing ``ts``.

    The file is named ``ctg_frames_sim_<YYYYMMDD_HH00>.csv`` inside
    ``DATA_DIR``. It is opened in write mode, so an existing file for the
    same hour is truncated. A header row is written before returning.

    Args:
        ts: Epoch timestamp in seconds; converted with ``time.localtime``.

    Returns:
        A ``(file, writer, hour_stamp)`` tuple: the open file object, a
        ``csv.writer`` bound to it, and the hour label used in the file name.
        Closing the file is left to the caller.
    """
    label = time.strftime("%Y%m%d_%H00", time.localtime(ts))
    target = DATA_DIR / f"ctg_frames_sim_{label}.csv"

    # newline="" lets the csv module control line endings itself.
    handle = open(target, "w", newline="")
    sink = csv.writer(handle)
    header = ["timestamp", "payload_len", "payload_hex"]
    sink.writerow(header)

    print(f"Writing simulated data to {target}")
    return handle, sink, label
def clamp(value: float, lo: float, hi: float) -> float:
    """Limit ``value`` to the range ``lo``..``hi``.

    The upper bound is applied first and the lower bound second, so ``lo``
    wins if the bounds are passed in the wrong order.
    """
    capped = min(hi, value)
    return max(lo, capped)
def encode_hr_sample(bpm: float, quality: int, fmp: int) -> bytes:
    """Pack one heart-rate sample into two bytes, high byte first.

    The rate is converted to quarter-bpm counts (``bpm / 0.25``, rounded) and
    limited to 0..1200.

    First byte:
        bits 0-2: bits 8-10 of the count
        bits 3-4: low two bits of ``fmp``
        bits 5-6: low two bits of ``quality``
    Second byte: low eight bits of the count.

    Args:
        bpm: Rate in beats per minute.
        quality: Quality code; only the low two bits are kept.
        fmp: FMP code; only the low two bits are kept.

    Returns:
        The two encoded bytes.
    """
    quarter_beats = int(round(bpm / 0.25))
    if quarter_beats < 0:
        quarter_beats = 0
    elif quarter_beats > 1200:
        quarter_beats = 1200

    quality_bits = (quality & 0x03) << 5
    fmp_bits = (fmp & 0x03) << 3
    high_byte = ((quarter_beats >> 8) & 0x07) | quality_bits | fmp_bits
    low_byte = quarter_beats & 0xFF
    return bytes((high_byte, low_byte))
def smooth_step(value: float, step: float, lo: float, hi: float) -> float:
    """Advance a bounded random walk by one step.

    Adds a uniform random offset in ``[-step, step]`` to ``value`` and clamps
    the result to ``lo``..``hi``.
    """
    jitter = random.uniform(-step, step)
    return clamp(value + jitter, lo, hi)
def generate_toco_series(base: float, count: int) -> list[int]:
    """Produce ``count`` simulated toco readings anchored to the wall clock.

    Each reading is ``base`` plus a slow sine drift, a contraction bump and a
    little uniform noise, clamped to 0..127 and rounded to an integer.

    Args:
        base: Resting level the other terms are added to.
        count: Number of readings; reading ``i`` is evaluated at the current
            time plus ``i / SAMPLE_RATE``.

    Returns:
        A list of ``count`` integers in the range 0..127.
    """
    samples = []
    for index in range(count):
        # The clock is re-read for every sample, then offset by the index.
        moment = time.time() + index / SAMPLE_RATE
        drift = 6.0 * math.sin(moment * 2.0 * math.pi * 0.015)

        # Contraction: ~20s rise/fall every ~60s.
        in_contraction = int(moment) % 60 < 20
        if in_contraction:
            surge = 50.0 * math.sin((moment % 20) * math.pi / 20.0)
        else:
            surge = 0.0

        noise = random.uniform(-0.5, 0.5)
        level = clamp(base + drift + surge + noise, 0.0, 127.0)
        samples.append(int(round(level)))
    return samples
def contraction_factor(t: float) -> float:
    """Return the contraction intensity (0.0..1.0) at time ``t``.

    The pattern repeats every 60 seconds. During the first 20 seconds the
    value ramps up over 8 seconds to 1.0 and back down over the remaining
    12 seconds; for the rest of the cycle it is 0.0.

    Args:
        t: Time in seconds.

    Returns:
        The intensity for that instant.
    """
    offset = t % 60.0

    # Asymmetric rise/fall for a more realistic contraction shape.
    ramp_up = offset / 8.0
    ramp_down = (20.0 - offset) / 12.0
    return 0.0 if offset >= 20.0 else min(ramp_up, ramp_down)
# Module-level random-walk state. build_payload() declares these global and
# reassigns them on every call, so the values below are only starting points.
# HR1 baseline; build_payload() keeps it within 120-160.
BASELINE_HR1 = 140.0
# HR2 baseline; build_payload() keeps it within 115-155.
BASELINE_HR2 = 135.0
# MHR baseline; build_payload() keeps it within 65-95.
BASELINE_MHR = 80.0
# SpO2 baseline; build_payload() keeps it within 94-99.
BASELINE_SPO2 = 98.0
def build_payload() -> bytes:
    """Assemble one simulated frame from the current wall-clock time.

    Byte layout, in order:
        * ASCII ``S``
        * ``0x80 0x00``
        * four two-byte HR1 samples (quality 2)
        * four two-byte HR2 samples (quality 1)
        * four two-byte MHR samples (quality 2)
        * four single-byte toco readings
        * ``0x21 0x10``
        * ``0x04``
        * one SpO2 byte (92..99)

    Side effects: advances the module-level ``BASELINE_*`` random walks and
    consumes values from the ``random`` module.

    Returns:
        The frame as an immutable ``bytes`` object.
    """
    global BASELINE_HR1, BASELINE_HR2, BASELINE_MHR, BASELINE_SPO2

    frame = bytearray(b"S")
    frame.extend((0x80, 0x00))

    hr1_trace, hr2_trace, mhr_trace = [], [], []
    for slot in range(4):
        moment = time.time() + slot / SAMPLE_RATE
        strength = contraction_factor(moment)

        # Slow random-walk baselines.
        BASELINE_HR1 = smooth_step(BASELINE_HR1, 0.1, 120.0, 160.0)
        BASELINE_HR2 = smooth_step(BASELINE_HR2, 0.12, 115.0, 155.0)
        BASELINE_MHR = smooth_step(BASELINE_MHR, 0.08, 65.0, 95.0)

        # HR1: mild accelerations during contractions.
        hr1_raw = BASELINE_HR1 + 5.0 * strength + random.uniform(-0.4, 0.4)

        # HR2: subtle late decelerations (delay by ~5s).
        delayed = contraction_factor(moment - 5.0)
        hr2_raw = BASELINE_HR2 - 8.0 * delayed + random.uniform(-0.6, 0.6)

        # MHR: steady adult rate with mild rise during contractions.
        mhr_raw = BASELINE_MHR + 1.5 * strength + random.uniform(-0.3, 0.3)

        hr1_trace.append(clamp(hr1_raw, 110.0, 170.0))
        hr2_trace.append(clamp(hr2_raw, 100.0, 165.0))
        mhr_trace.append(clamp(mhr_raw, 60.0, 110.0))

    # The loop always runs four times, so the last MHR sample exists.
    latest_mhr = mhr_trace[-1]

    # Emit the three heart-rate channels back to back, each with its quality.
    for trace, grade in ((hr1_trace, 2), (hr2_trace, 1), (mhr_trace, 2)):
        for rate in trace:
            frame.extend(encode_hr_sample(rate, quality=grade, fmp=0))

    frame.extend(generate_toco_series(10.0, 4))
    frame.extend((0x21, 0x10))
    frame.append(0x04)

    # FSpO2: mild random-walk around 96-99 with slight dip tracking MHR.
    BASELINE_SPO2 = smooth_step(BASELINE_SPO2, 0.05, 94.0, 99.0)

    # Adverse event: brief dip every ~180s for ~12s.
    now = time.time()
    if int(now) % 180 < 12:
        dip = 3.0 * math.sin((now % 12) * math.pi / 12.0)
    else:
        dip = 0.0

    spo2_raw = BASELINE_SPO2 - 0.02 * (latest_mhr - BASELINE_MHR) - dip + random.uniform(-0.1, 0.1)
    frame.append(int(round(clamp(spo2_raw, 92.0, 99.0))))
    return bytes(frame)
def main() -> None:
    """Write one simulated frame per ``BLOCK_INTERVAL`` to hourly CSV files.

    Creates ``DATA_DIR`` if needed, opens the CSV for the current hour, then
    loops forever. Each pass switches to a new file when the local hour label
    changes, appends a ``[timestamp, payload length, spaced hex]`` row,
    flushes the file and sleeps. The open file is closed when the loop is
    left by an exception (for example ``KeyboardInterrupt``).
    """
    DATA_DIR.mkdir(parents=True, exist_ok=True)
    handle, rows, active_hour = open_csv_for_hour(time.time())
    try:
        while True:
            tick = time.time()
            tick_hour = time.strftime("%Y%m%d_%H00", time.localtime(tick))

            # Hour rolled over: finish the old file and start the next one.
            if tick_hour != active_hour:
                handle.close()
                handle, rows, active_hour = open_csv_for_hour(tick)

            frame = build_payload()
            record = [tick, len(frame), frame.hex(" ")]
            rows.writerow(record)
            # Flush every row so readers of the file see it immediately.
            handle.flush()
            time.sleep(BLOCK_INTERVAL)
    finally:
        handle.close()


# Run the simulator only when executed as a script, not when imported.
if __name__ == "__main__":
    main()