-
Notifications
You must be signed in to change notification settings - Fork 41
[TRCL-658][feat] SPARC: support for LabCube after spectrograph #3339
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
601253c
f5a055b
9a246e9
12498ce
c441dba
3c5bae0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,99 @@ | ||||||||||||||||||||||||||||||||
| # -*- coding: utf-8 -*- | ||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||
| Created on 5 Dev 2025 | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| @author: Éric Piel | ||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||
| Gives ability to acquire a spectrum data, while keeping the raw CCD image (ie, without vertical binning) | ||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||
| Gives ability to acquire a spectrum data, while keeping the raw CCD image (ie, without vertical binning) | |
| Provides live display and control of photo-detector signals for SPARC systems using monochromator settings. |
Copilot
AI
Feb 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in docstring: "add to to all compatible viewports" should be "add to all compatible viewports" (remove duplicate "to").
| """ Create a Monochromator stream, using a photo-detector and add to to all compatible viewports""" | |
| """ Create a Monochromator stream, using a photo-detector and add to all compatible viewports""" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🧩 Analysis chain
🏁 Script executed:
#!/bin/bash
# First, locate and examine the file
find . -name "photo_det_live.py" -type fRepository: delmic/odemis
Length of output: 84
🏁 Script executed:
#!/bin/bash
# Get file size to determine how to read it
wc -l plugins/photo_det_live.pyRepository: delmic/odemis
Length of output: 85
🏁 Script executed:
#!/bin/bash
# Read the relevant lines (33-80 to see full context)
sed -n '33,80p' plugins/photo_det_live.pyRepository: delmic/odemis
Length of output: 2272
🏁 Script executed:
#!/bin/bash
# Search for StreamController class definition and usage
rg "class StreamController" --type py -A 5Repository: delmic/odemis
Length of output: 462
🏁 Script executed:
#!/bin/bash
# Search for add_photo_det_stream method to see what it returns
rg "def add_photo_det_stream" --type py -A 20Repository: delmic/odemis
Length of output: 1618
🏁 Script executed:
#!/bin/bash
# Read the complete add_photo_det_stream method (lines 59-99)
sed -n '59,99p' plugins/photo_det_live.pyRepository: delmic/odemis
Length of output: 1670
Add class and method docstrings in reST format and return type hints.
The class and both methods lack proper documentation and return type annotations per Python guidelines:
- Class
PhotoDetectorLivePluginhas no docstring __init__is missing both docstring and return type hint (-> None)add_photo_det_streamhas an improperly formatted docstring (single-line, non-reST) and is missing a return type hint; the method returnsstream_conton line 97
✏️ Suggested update
class PhotoDetectorLivePlugin(Plugin):
+ """
+ Live-display plugin for photo-detector monochromator streams.
+ """
name = "Photo-detector live display"
@@
- def __init__(self, microscope: model.Microscope, main_app: OdemisGUIApp):
+ def __init__(self, microscope: model.Microscope, main_app: OdemisGUIApp) -> None:
+ """
+ Initialize the plugin and register photo-detector stream actions.
+
+ :param microscope: Active microscope instance.
+ :param main_app: Main GUI application.
+ """
@@
- def add_photo_det_stream(self, name: str, detector: model.Detector):
- """ Create a Monochromator stream, using a photo-detector and add to to all compatible viewports"""
+ def add_photo_det_stream(self, name: str, detector: model.Detector) -> "StreamController":
+ """
+ Create a monochromator stream using a photo-detector and add it to compatible viewports.
+
+ :param name: Stream name.
+ :param detector: Photo-detector to bind.
+ :return: The created stream controller.
+ """🤖 Prompt for AI Agents
In `@plugins/photo_det_live.py` around lines 33 - 59, Add reST docstrings and
return type hints: add a class-level reST docstring to PhotoDetectorLivePlugin
describing purpose and authorship; add an __init__ docstring in reST and
annotate __init__ with -> None; replace the single-line docstring in
add_photo_det_stream with a proper multi-line reST docstring describing
parameters (name, detector), behavior and return value, and add a return type
hint for add_photo_det_stream matching the actual returned object (the variable
stream_cont) — if the concrete type isn't available, import typing.Any or
typing.Optional and annotate accordingly; update imports if needed.
Copilot
AI
Feb 2, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The code unconditionally adds spectrograph axes to the axes dictionary without checking if spg is None. When no spectrograph affects the detector and main_data.spectrograph is also None (which can happen in some microscope configurations), spg will be None, leading to axis map entries with None components. This pattern is inconsistent with addTimeCorrelator (line 2009 in stream_bar.py) which checks if spg is truthy before adding spectrograph axes. Consider wrapping the axes.update() call in an if spg: condition similar to how it's done in addTimeCorrelator.
| axes.update({ | |
| "wavelength": ("wavelength", spg), | |
| "grating": ("grating", spg), | |
| "iris-in": ("iris-in", spg), | |
| "slit-in": ("slit-in", spg), | |
| "slit-monochromator": ("slit-monochromator", spg), | |
| }) | |
| if spg: | |
| axes.update({ | |
| "wavelength": ("wavelength", spg), | |
| "grating": ("grating", spg), | |
| "iris-in": ("iris-in", spg), | |
| "slit-in": ("slit-in", spg), | |
| "slit-monochromator": ("slit-monochromator", spg), | |
| }) |
| Original file line number | Diff line number | Diff line change | ||||||||
|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -38,6 +38,7 @@ | |||||||||
| import threading | ||||||||||
| from collections import OrderedDict | ||||||||||
| from concurrent.futures import CancelledError | ||||||||||
| from typing import Dict | ||||||||||
|
|
||||||||||
| try: | ||||||||||
| import canopen | ||||||||||
|
|
@@ -443,20 +444,28 @@ def __init__(self, name, role, port, axes, ustepsize, address=None, | |||||||||
| # Add digital output axes | ||||||||||
| self._do_axes = do_axes or {} | ||||||||||
| self._led_prot_do = led_prot_do or {} | ||||||||||
| self._expected_do_pos: Dict[str, float] = {} # DO positions, as requested by the user | ||||||||||
| for channel, (an, hpos, lpos, dur) in self._do_axes.items(): | ||||||||||
| if an in self._name_to_axis or an in self._name_to_do_axis: | ||||||||||
| raise ValueError("Axis %s specified multiple times" % an) | ||||||||||
| if not 0 <= dur < 1000: | ||||||||||
| raise ValueError("Axis %s duration %s should be in seconds" % (an, dur)) | ||||||||||
| axes_def[an] = model.Axis(choices={lpos, hpos}) | ||||||||||
| self._name_to_do_axis[an] = channel | ||||||||||
| self._expected_do_pos[an] = lpos # Always set to low at init via call to _releaseRefSwitch() just after | ||||||||||
|
|
||||||||||
| for channel, pos in self._led_prot_do.items(): | ||||||||||
| if channel not in self._do_axes: | ||||||||||
| raise ValueError("led_prot_do channel %s is not specified as a do-axis" % channel) | ||||||||||
| if pos not in self._do_axes[channel][1:3]: | ||||||||||
| raise ValueError("led_prot_do of channel %d has position %s, not in do_axes" % (channel, pos)) | ||||||||||
|
|
||||||||||
| if self._led_prot_do: | ||||||||||
| # Add a VA to allow forcing the LED protection on | ||||||||||
| # If it's False: normal operation, the protection is activated during referencing | ||||||||||
| # If it's True: the protection is always active | ||||||||||
| self.protection = model.BooleanVA(False, setter=self._set_protection) | ||||||||||
|
|
||||||||||
| model.Actuator.__init__(self, name, role, axes=axes_def, **kwargs) | ||||||||||
|
|
||||||||||
| driver_name = driver.getSerialDriver(self._portpattern) | ||||||||||
|
|
@@ -479,7 +488,7 @@ def __init__(self, name, role, port, axes, ustepsize, address=None, | |||||||||
| logging.warning("Acceleration of axis %s is null, most probably due to a bad hardware configuration", n) | ||||||||||
|
|
||||||||||
| # Check state of refswitch on startup | ||||||||||
| self._expected_do_pos = {} # do positions before referencing, will be reset after refswitch is released | ||||||||||
| # Use _refswitch_lock to access this attribute | ||||||||||
| self._leds_on = any(self.GetIO(2, rs) for rs in self._refswitch.values()) | ||||||||||
| if self._leds_on: | ||||||||||
| logging.debug("Refswitch is on during initialization, releasing refswitch for all axes.") | ||||||||||
|
|
@@ -1450,30 +1459,65 @@ def _cancelReferencing2xFF(self, axis): | |||||||||
| gparam = 128 + axis | ||||||||||
| self.SetGlobalParam(2, gparam, 3) # 3 => indicate cancelled | ||||||||||
|
|
||||||||||
| def _switch_led_prot(self, protected: bool) -> None: | ||||||||||
| """ | ||||||||||
| Blocks until the move duration is passed, and will update .position based on the actual | ||||||||||
| state of the DO axes. | ||||||||||
| Must be called with _refswitch_lock held. | ||||||||||
| :param protected: If True, force the shutters closed. If False, set them | ||||||||||
| to the expected position (if _leds_on is False). | ||||||||||
| """ | ||||||||||
| tsleep = 0 # max transition period for all shutters | ||||||||||
| if protected: | ||||||||||
| logging.debug("Forcing the protection active") | ||||||||||
| for channel, val in self._led_prot_do.items(): | ||||||||||
| do_an, hpos, lpos, dur = self._do_axes[channel] | ||||||||||
| # If the shutter is already in the right position, no need to wait for it to move | ||||||||||
| if self.position.value[do_an] == val: | ||||||||||
| logging.debug("Shutter on axis %s already in protected position", do_an) | ||||||||||
| else: | ||||||||||
| tsleep = max(tsleep, dur) | ||||||||||
| # Set the DO to the "protected" position even if the position reports it's already | ||||||||||
| # there to be really sure. It's very fast anyway. | ||||||||||
| self.SetIO(2, channel, val == hpos) | ||||||||||
| else: | ||||||||||
| # Set the shutter in the "right" position: | ||||||||||
| # if _leds_on is False, set to the expected position | ||||||||||
| # if _leds_on is True, leave them closed (ie, do nothing) | ||||||||||
| if not self._leds_on: | ||||||||||
| # Set digital axis outputs to latest requested value | ||||||||||
| for an, val in self._expected_do_pos.items(): | ||||||||||
| channel = self._name_to_do_axis[an] | ||||||||||
| if channel not in self._led_prot_do: | ||||||||||
| continue | ||||||||||
| _, hpos, lpos, dur = self._do_axes[channel] | ||||||||||
| if self.position.value[an] == val: | ||||||||||
| logging.debug("Shutter on axis %s already in protected position", an) | ||||||||||
| else: | ||||||||||
| tsleep = max(tsleep, dur) | ||||||||||
| self.SetIO(2, channel, val == hpos) | ||||||||||
|
|
||||||||||
| time.sleep(tsleep) | ||||||||||
| self._updatePosition(axes={}) | ||||||||||
|
|
||||||||||
| def _set_protection(self, protected: bool) -> bool: | ||||||||||
| if self.protection.value == protected: | ||||||||||
| return protected # no change | ||||||||||
|
|
||||||||||
| with self._refswitch_lock: | ||||||||||
| self._switch_led_prot(protected) | ||||||||||
|
|
||||||||||
| return protected | ||||||||||
|
Comment on lines
+1462
to
+1510
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Add a docstring for
🛠️ Proposed fix for an, val in self._expected_do_pos.items():
channel = self._name_to_do_axis[an]
if channel not in self._led_prot_do:
continue
- _, hpos, lpos, dur = self._do_axes[channel]
+ _, hpos, _lpos, dur = self._do_axes[channel]
if self.position.value[an] == val:
logging.debug("Shutter on axis %s already in protected position", an)
else:
tsleep = max(tsleep, dur)
self.SetIO(2, channel, val == hpos)
@@
- def _set_protection(self, protected: bool) -> bool:
+ def _set_protection(self, protected: bool) -> bool:
+ """
+ Toggle LED protection state.
+
+ :param protected: True to force protection, False to restore expected DO state.
+ :return: The requested protection state.
+ """
if self.protection.value == protected:
return protected # no change🧰 Tools🪛 Ruff (0.14.14)[warning] 1493-1493: Unpacked variable Prefix it with an underscore or any other dummy variable pattern (RUF059) 🤖 Prompt for AI Agents |
||||||||||
|
|
||||||||||
| def _requestRefSwitch(self, axis): | ||||||||||
| refswitch = self._refswitch.get(axis) | ||||||||||
| if refswitch is None: | ||||||||||
| return | ||||||||||
|
|
||||||||||
| with self._refswitch_lock: | ||||||||||
| # Set _leds_on attribute before closing shutters to make sure they are not | ||||||||||
| # opened again in a concurrent thread | ||||||||||
| leds_were_on = self._leds_on | ||||||||||
| self._leds_on = True # do this before closing shutters | ||||||||||
| # Close shutters | ||||||||||
| tsleep = 0 # max transition period for all shutters | ||||||||||
| for channel, val in self._led_prot_do.items(): | ||||||||||
| do_an, hpos, lpos, dur = self._do_axes[channel] | ||||||||||
| if not leds_were_on: | ||||||||||
| self._expected_do_pos[do_an] = self.position.value[do_an] | ||||||||||
| # TODO: ideally, for each DO, we should know when was the last time it | ||||||||||
| # was set, and if it's been set to the requested value for long | ||||||||||
| # enough, we don't need to do the extra sleep | ||||||||||
| self.SetIO(2, channel, val == hpos) | ||||||||||
| tsleep = max(tsleep, dur) | ||||||||||
|
|
||||||||||
| time.sleep(tsleep) | ||||||||||
| self._updatePosition() | ||||||||||
| self._leds_on = True | ||||||||||
| # Activate protection (ie, force the shutters closed) | ||||||||||
| self._switch_led_prot(protected=True) | ||||||||||
|
|
||||||||||
| self._active_refswitchs.add(axis) | ||||||||||
| logging.debug("Activating ref switch power line %d (for axis %d)", refswitch, axis) | ||||||||||
|
|
@@ -1510,15 +1554,8 @@ def _releaseRefSwitch(self, axis): | |||||||||
| logging.debug("Leaving ref switch power line %d active", refswitch) | ||||||||||
|
|
||||||||||
| # Set digital axis outputs to latest requested value | ||||||||||
| if not self._leds_on: | ||||||||||
| tsleep = 0 # max transition period for all shutters | ||||||||||
| for an, val in self._expected_do_pos.items(): | ||||||||||
| channel = self._name_to_do_axis[an] | ||||||||||
| _, hpos, lpos, dur = self._do_axes[channel] | ||||||||||
| self.SetIO(2, channel, val == hpos) | ||||||||||
| tsleep = max(tsleep, dur) | ||||||||||
| time.sleep(tsleep) | ||||||||||
| self._updatePosition() | ||||||||||
| if not hasattr(self, "protection") or not self.protection.value: | ||||||||||
| self._switch_led_prot(protected=False) | ||||||||||
|
|
||||||||||
| def _startReferencingStd(self, axis): | ||||||||||
| """ | ||||||||||
|
|
@@ -1951,15 +1988,16 @@ def _doMoveAbs(self, future, pos): | |||||||||
| # Check if it's a digital output | ||||||||||
| if an in self._name_to_do_axis: | ||||||||||
| channel = self._name_to_do_axis[an] | ||||||||||
| _, hpos, lpos, dur = self._do_axes[channel] | ||||||||||
| with self._refswitch_lock: # don't start do move at the same time as referencing | ||||||||||
| if self._leds_on and channel in self._led_prot_do: | ||||||||||
| # don't move protected do axis now if leds are on, schedule for later | ||||||||||
| self._expected_do_pos[an] = v | ||||||||||
| with self._refswitch_lock: | ||||||||||
| self._expected_do_pos[an] = v # Update user-requested position | ||||||||||
| if channel in self._led_prot_do and (self.protection.value or self._leds_on): | ||||||||||
|
||||||||||
| if channel in self._led_prot_do and (self.protection.value or self._leds_on): | |
| protection_attr = getattr(self, "protection", None) | |
| protection_active = bool(protection_attr.value) if protection_attr is not None else False | |
| if channel in self._led_prot_do and (protection_active or self._leds_on): |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Typo in date: "Dev" should be "Dec" (December).