-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathHTDAQ.py
More file actions
155 lines (125 loc) · 4.71 KB
/
HTDAQ.py
File metadata and controls
155 lines (125 loc) · 4.71 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
import socket
import u3
import numpy as np
from time import sleep
import threading
import platform
import matplotlib.pyplot as plt
class HTDAQ:
    """Record LabJack U3 analog inputs while triggering cameras over UDP.

    The object streams four analog inputs (AIN0-AIN3) from a LabJack U3 on a
    background thread and sends plain-text start/stop messages to every
    address in ``ip_address_list``.

    When ``DEBUG`` is true no LabJack device is opened and the logging
    methods skip all hardware calls; the UDP messages are still sent.
    """

    # Number of analog inputs configured in create_LabJack_tasks().
    _N_CHANNELS = 4

    def __init__(self, experiment_id, DEBUG):
        """Set up the UDP socket and, unless DEBUG is set, the LabJack stream.

        Args:
            experiment_id: Identifier embedded in every camera message.
            DEBUG: When true, skip opening and configuring the LabJack.
        """
        self.experiment_id = experiment_id
        self.DEBUG = DEBUG
        self.ip_address_list = ["137.82.137.183"]
        # The original expression compared the `platform` module to "win32"
        # (always False) and both branches were 1001, so the port is constant.
        self.port = 1001
        self.sampling_rate = 5000
        # Default target for stop_everything(); callers may overwrite it.
        self.ni_log_filename = f"{experiment_id}_labjack_log.npy"
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        self.data = []
        self.acquisition_thread = None
        self.running = False
        if not self.DEBUG:
            self.create_LabJack_tasks()

    def __del__(self):
        """Release the LabJack handle (if one was opened) and the socket."""
        # getattr guards: __init__ may have failed before these were assigned.
        device = getattr(self, "u3_device", None)
        if device is not None:
            device.close()
        sock = getattr(self, "sock", None)
        if sock is not None:
            sock.close()

    def create_LabJack_tasks(self):
        """Open the U3 and configure a 4-channel stream at ``sampling_rate``."""
        self.u3_device = u3.U3()
        self.u3_device.configU3()
        self.u3_device.getCalibrationData()
        # 0x0F marks FIO0-FIO3 as analog inputs.
        self.u3_device.configIO(FIOAnalog=0x0F)
        self.u3_device.streamConfig(
            NumChannels=4,
            PChannels=[0, 1, 2, 3],
            NChannels=[31, 31, 31, 31],
            Resolution=3,
            ScanFrequency=self.sampling_rate,
        )

    def data_read_callback(self):
        """Thread target: append stream packets to ``self.data`` until stopped.

        Each packet contributes one entry holding the AIN0..AIN3 values.
        The loop ends when the stream yields ``None`` or ``running`` is
        cleared.
        """
        for packet in self.u3_device.streamData():
            if packet is None:
                break
            self.data.append(
                [packet['AIN%d' % i] for i in range(self._N_CHANNELS)]
            )
            if not self.running:
                break

    def start_logging(self):
        """Start the LabJack stream and the background reader thread.

        In DEBUG mode only the ``running`` flag is set, because no device
        exists to stream from.
        """
        self.running = True
        if self.DEBUG:
            return
        self.u3_device.streamStart()
        self.acquisition_thread = threading.Thread(target=self.data_read_callback)
        self.acquisition_thread.start()

    def stop_logging(self):
        """Stop the reader thread, then stop the LabJack stream."""
        try:
            self.running = False
            reader = self.acquisition_thread
            if reader and reader.is_alive():
                # Let the reader leave its loop before the stream is stopped.
                reader.join()
            if not self.DEBUG:
                self.u3_device.streamStop()
        except IndexError as e:
            # NOTE(review): only IndexError is treated as non-fatal here;
            # presumably raised by the driver during shutdown - confirm.
            print(f"Error stopping stream: {e}")

    def save_log(self, filename):
        """Write the recorded samples to ``filename`` as a float16 ``.npy``.

        Packets are joined along the sample axis, so the saved array is
        laid out as (n_channels, n_samples), assuming each packet holds one
        sequence of samples per channel. ``self.data`` is left as a list so
        logging can continue after a save. An empty buffer is saved as a
        (4, 0) array instead of raising.

        Args:
            filename: Path passed to ``numpy.save``.
        """
        if self.data:
            log = np.hstack(self.data).astype(np.float16)
        else:
            log = np.empty((self._N_CHANNELS, 0), dtype=np.float16)
        np.save(filename, log)
        print("Saved LabJack log (size): ", filename, log.shape)

    def send_message_to_list(self, message):
        """Send ``message`` as a UDP datagram to every configured address."""
        message = message.encode()
        for address in self.ip_address_list:
            print(message, address, self.port)
            self.sock.sendto(message, (address, self.port))

    def start_cameras(self):
        """Send the ExpStart message followed by the StimStart message."""
        message1 = "ExpStart {} 1 1".format(self.experiment_id)
        message2 = "StimStart {} 1 1 1".format(self.experiment_id)
        self.send_message_to_list(message1)
        self.send_message_to_list(message2)

    def stop_cameras(self):
        """Send the StimEnd message followed by the ExpEnd message."""
        message1 = "StimEnd {} 1 1 1".format(self.experiment_id)
        message2 = "ExpEnd {} 1 1".format(self.experiment_id)
        self.send_message_to_list(message1)
        self.send_message_to_list(message2)

    def start_everything(self):
        """Start logging, wait one second, start the cameras, wait again."""
        self.start_logging()
        sleep(1)
        self.start_cameras()
        sleep(1)

    def stop_everything(self):
        """Stop the cameras, keep logging 3 s longer, then stop and save.

        The log is written to ``self.ni_log_filename``.
        """
        self.stop_cameras()
        print("Waiting additional 3 seconds before stopping logging.")
        sleep(3)
        self.stop_logging()
        self.save_log(self.ni_log_filename)
        # Legacy flag kept for external readers; nothing in this class reads it.
        self.acquisition_running = False
if __name__ == "__main__":
    # Manual hardware check: log the LabJack inputs for about 12 seconds while
    # sending the camera start/stop messages, save the log, then plot it.
    experiment_id = "test_experiment"
    DEBUG = False

    # Create an instance of the HTDAQ class
    daq = HTDAQ(experiment_id, DEBUG)

    input("Press enter to start the fake experiment.")

    try:
        # Start data acquisition
        daq.start_logging()
        print("Data acquisition started. Running for 10 seconds...")
        sleep(1)
        daq.start_cameras()

        # Wait for 10 seconds
        sleep(10)

        daq.stop_cameras()
        sleep(1)

        # Stop data acquisition
        daq.stop_logging()
        print("Data acquisition stopped.")

        # Save the logged data
        filename = "labjack_data.npy"
        daq.save_log(filename)
        print("Data saved to", filename)

        # Load the saved data
        saved_data = np.load(filename)

        # Plot one subplot per channel; axis 0 is the channel, axis 1 the sample.
        plt.figure(figsize=(10, 8))
        n_channels = saved_data.shape[0]
        time_axis = np.arange(saved_data.shape[1]) / float(daq.sampling_rate)

        for i in range(n_channels):
            plt.subplot(n_channels, 1, i+1)
            plt.plot(time_axis, saved_data[i, :])
            plt.title(f'Channel {i}')
            plt.xlabel('Time (s)')
            plt.ylabel('Voltage (V)')

        plt.tight_layout()
        plt.show()

    finally:
        # If the run failed part-way, the reader thread is still alive and
        # would keep the process from exiting; stop it before dropping the
        # reference.
        if daq.running:
            try:
                daq.stop_logging()
            except Exception as exc:
                # Report but do not mask the error that got us here.
                print(f"Cleanup failed while stopping logging: {exc}")
        # Clean up
        del daq