Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
1bb0c14
update
tpollina Nov 10, 2025
fa90d49
Merge branch 'main' into update-dashboard
tpollina Nov 10, 2025
cc2ad8c
Merge branch 'main' into update-dashboard
tpollina Nov 10, 2025
b2a4265
Merge branch 'main' into update-dashboard
tpollina Nov 10, 2025
81dac17
Merge branch 'main' into update-dashboard
tpollina Nov 10, 2025
291b924
Merge branch 'main' into update-dashboard
tpollina Nov 10, 2025
1e3df1d
update
tpollina Nov 13, 2025
73e82d8
update
tpollina Nov 19, 2025
fa0ca2a
update
tpollina Nov 20, 2025
7efebd5
update
tpollina Nov 20, 2025
cab29be
update
tpollina Nov 20, 2025
4a881ea
update
tpollina Nov 25, 2025
da8bd58
update
tpollina Nov 25, 2025
c4ff0f2
Merge branch 'main' into update-dashboard
tpollina Nov 25, 2025
742121b
update
tpollina Nov 25, 2025
efbf771
update
tpollina Nov 25, 2025
b8ac594
update
tpollina Nov 26, 2025
1362391
update
tpollina Nov 27, 2025
ad68a28
update
tpollina Nov 27, 2025
1b9f4de
Merge branch 'main' into update-dashboard
sonnyp Nov 27, 2025
7fc6eb4
update
tpollina Nov 29, 2025
f6f22e5
update
tpollina Nov 29, 2025
4e94cb2
update
tpollina Nov 29, 2025
8e80405
update
tpollina Nov 29, 2025
6ce95d2
update
tpollina Nov 30, 2025
055274c
update
tpollina Nov 30, 2025
951abb2
update
tpollina Nov 30, 2025
06ef9b5
update
tpollina Nov 30, 2025
392b194
update
tpollina Nov 30, 2025
c075703
update
tpollina Nov 30, 2025
451f99b
update
tpollina Nov 30, 2025
74627f0
update
tpollina Nov 30, 2025
89ec927
update
tpollina Nov 30, 2025
7be0035
update
tpollina Dec 1, 2025
a7a0a59
update
tpollina Dec 4, 2025
2bb4fe3
Merge branch 'main' into update-dashboard
tpollina Dec 4, 2025
2d9c33c
controller: Increase range for LED value on 2.6
sonnyp Dec 2, 2025
699c459
update
tpollina Dec 8, 2025
a6bb031
update
tpollina Dec 8, 2025
ea9d6c4
f
tpollina Dec 9, 2025
c13eced
save
tpollina Feb 2, 2026
ef9bd25
Merge branch 'main' into update-dashboard
tpollina Feb 2, 2026
e4c28db
update
sonnyp Feb 2, 2026
698f8a2
update
sonnyp Feb 2, 2026
dbf0ce1
feat: live segmentation with acquisition synchronization fixes
babo989 Feb 2, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 15 additions & 1 deletion controller/imager/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ def __init__(self) -> None:
self._stop_receiving_mqtt = threading.Event() # close() was called
self._done = threading.Event() # run_discrete() finished or stop() was called
self._discrete_run = threading.Lock() # mutex on starting the pump
self._waiting_for_pump = False # FIX: Only accept Done after pump command sent

def open(self) -> None:
"""Start the pump MQTT client.
Expand Down Expand Up @@ -412,7 +413,17 @@ def _receive_messages(self) -> None:
self._mqtt.read_message()
continue

# FIX: Only process Done if we are actually waiting for pump to finish
# This prevents retained/stale Done messages from triggering early return
if not self._waiting_for_pump:
loguru.logger.debug(
f"Ignoring pump Done (not waiting for pump): {self._mqtt.msg['payload']}"
)
self._mqtt.read_message()
continue
Comment on lines +416 to +423
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I introduced the retained messages. Can you elaborate on what the issue is? Maybe with a sequence of pseudo events.


loguru.logger.debug(f"The pump has stopped: {self._mqtt.msg['payload']}")
self._waiting_for_pump = False # FIX: Clear waiting flag
self._mqtt.client.unsubscribe("status/pump")
self._mqtt.read_message()
self._done.set()
Expand All @@ -436,6 +447,7 @@ def run_discrete(self, settings: stopflow.DiscretePumpSettings) -> None:
# We ignore the pylint error here because the lock can only be released from a different
# thread (the thread which calls the `handle_status_update()` method):
self._discrete_run.acquire() # pylint: disable=consider-using-with
self._waiting_for_pump = False # FIX: Not waiting yet (ignore retained messages)
self._done.clear()
self._mqtt.client.subscribe("status/pump")
self._mqtt.client.publish(
Expand All @@ -446,9 +458,11 @@ def run_discrete(self, settings: stopflow.DiscretePumpSettings) -> None:
"direction": settings.direction.value,
"flowrate": settings.flowrate,
"volume": settings.volume,
"from_acquisition": True, # FIX: Tag as acquisition command
}
),
)
self._waiting_for_pump = True # FIX: NOW we're waiting for pump Done
self._done.wait()

def stop(self) -> None:
Expand All @@ -457,7 +471,7 @@ def stop(self) -> None:
raise RuntimeError("MQTT client was not initialized yet!")

self._mqtt.client.subscribe("status/pump")
self._mqtt.client.publish("actuator/pump", '{"action": "stop"}')
self._mqtt.client.publish("actuator/pump", '{"action": "stop", "from_acquisition": true}')

def close(self) -> None:
"""Close the pump MQTT client, if it's currently open.
Expand Down
5 changes: 4 additions & 1 deletion controller/imager/stopflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,10 @@ def run_step(self) -> typing.Optional[tuple[int, str]]:
+ f"{capture_path}...",
)
self._camera.capture_file(capture_path)
os.sync()
# FIX: Use fsync on specific file to ensure write completes before MQTT publish
# os.sync() is system-wide and async - doesn't guarantee this file is written
with open(capture_path, "rb") as f:
os.fsync(f.fileno())
Comment on lines +136 to +139
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
# FIX: Use fsync on specific file to ensure write completes before MQTT publish
# os.sync() is system-wide and async - doesn't guarantee this file is written
with open(capture_path, "rb") as f:
os.fsync(f.fileno())
# Use fsync to ensure write completes before MQTT publish
with open(capture_path, "rb") as f:
os.fsync(f.fileno())

makes sense

I'm going to benchmark this as part of https://github.com/fairscope/PlanktoScope3/issues/375 to measure impact but I agree we should do this anyway

at least until we optimize sequence of events and start capturing n+1 before n finished writing

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

os.fsync(f.fileno()) end us being slightly faster than os.sync

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

# Note(ethanjli): updating the integrity file is the responsibility of the code which
# calls this `run_step()` method.

Expand Down
23 changes: 23 additions & 0 deletions controller/pump/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
pump_max_speed = 50

pump_started = False
# FIX: Track acquisition state to prevent UI commands from interrupting acquisition
acquisition_in_progress = False
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is something I wanted to have as well 👍

However it doesn't belong in the controller. controllers should only concern themselves with providing a simple hardware API.

Ideally we have a backend app handling "business logic". Until then we can do that logic in the node-red frontend. WDYT?


pump_stepper = Motor(pin=23, spi_bus=0, spi_device=0)
pump_stepper.acceleration = 2000
Expand Down Expand Up @@ -52,19 +54,40 @@ async def start() -> None:
async with client, task_group:
_ = await asyncio.gather(
client.subscribe("actuator/pump"),
client.subscribe("status/imager"), # FIX: Track acquisition state
# publish_status(),
)
async for message in client.messages:
task_group.create_task(handle_message(message))


async def handle_message(message) -> None:
global acquisition_in_progress

# FIX: Handle imager status messages to track acquisition state
if message.topic.matches("status/imager"):
payload = json.loads(message.payload.decode("utf-8"))
status = payload.get("status", "")
if status == "Started":
print("Acquisition started - locking pump for acquisition commands only")
acquisition_in_progress = True
elif status in ("Done", "Interrupted"):
print("Acquisition ended - unlocking pump for manual control")
acquisition_in_progress = False
return

if not message.topic.matches("actuator/pump"):
return

payload = json.loads(message.payload.decode("utf-8"))
pprint(payload)

# FIX: During acquisition, only accept commands tagged with from_acquisition
is_from_acquisition = payload.get("from_acquisition", False)
if acquisition_in_progress and not is_from_acquisition:
print(f"Ignoring pump command during acquisition (use from_acquisition flag): {payload}")
return

action = payload.get("action")
if action is not None:
await handle_action(action, payload)
Expand Down
6 changes: 3 additions & 3 deletions node-red/projects/dashboard/flows.json
Original file line number Diff line number Diff line change
Expand Up @@ -2183,8 +2183,8 @@
"initialize": "",
"finalize": "",
"libs": [],
"x": 650,
"y": 180,
"x": 710,
"y": 140,
"wires": [
[]
]
Expand Down Expand Up @@ -2557,7 +2557,7 @@
"templateScope": "widget:ui",
"className": "",
"x": 800,
"y": 180,
"y": 200,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please revert unrelated changes (flows.json)

"wires": [
[
"1db1c2e3e19e85ff"
Expand Down
Loading
Loading