From 6098b7cba40a7f637fbddbf489b6eb789d3aaad2 Mon Sep 17 00:00:00 2001 From: esto-openscan <193278706+esto-openscan@users.noreply.github.com> Date: Thu, 5 Mar 2026 17:25:59 +0000 Subject: [PATCH 1/3] initial pwm support --- openscan_firmware/config/light.py | 17 ++++- .../controllers/hardware/gpio.py | 73 ++++++++++++++++--- .../controllers/hardware/lights.py | 48 ++++++++++-- openscan_firmware/routers/next/lights.py | 27 ++++++- 4 files changed, 145 insertions(+), 20 deletions(-) diff --git a/openscan_firmware/config/light.py b/openscan_firmware/config/light.py index b8cd511..4ac77b8 100644 --- a/openscan_firmware/config/light.py +++ b/openscan_firmware/config/light.py @@ -12,6 +12,9 @@ class LightConfig(BaseModel): default=False, description="Indicates whether this light hardware can handle PWM (otherwise only on/off).", ) + pwm_frequency: float = Field(10000.0, ge=50.0, le=100000.0, description="PWM frequency for led driver.") + pwm_min: float = Field(0.0, ge=0, le=3.3, description="Minimum pwm voltage for led driver.") + pwm_max: float = Field(0.0, ge=0, le=3.3, description="Maximum pwm voltage for led driver.") @model_validator(mode="before") @classmethod @@ -37,4 +40,16 @@ def ensure_pins(cls, values): merged_pins.append(pin) values["pins"] = list(dict.fromkeys(merged_pins)) - return values \ No newline at end of file + return values + + @model_validator(mode="after") + def validate_pwm_range(self): + """ + Ensures pwm_min <= pwm_max and consistency with pwm_support. + """ + + if self.pwm_min >= self.pwm_max: + raise ValueError("pwm_min must be less than pwm_max") + + + return self diff --git a/openscan_firmware/controllers/hardware/gpio.py b/openscan_firmware/controllers/hardware/gpio.py index e2e0d55..5189e6e 100644 --- a/openscan_firmware/controllers/hardware/gpio.py +++ b/openscan_firmware/controllers/hardware/gpio.py @@ -1,12 +1,13 @@ import logging -from gpiozero import DigitalOutputDevice, Button +from gpiozero import DigitalOutputDevice, PWMOutputDevice, Button from typing import Dict, List, Optional, Callable logger = logging.getLogger(__name__) # Track pins and buttons _output_pins = {} +_pwm_pins = {} _buttons = {} @@ -15,8 +16,10 @@ def initialize_output_pins(pins: List[int]): for pin in pins: if pin in _output_pins: logger.warning(f"Warning: Output pin {pin} already initialized.") + elif pin in _pwm_pins: + logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as PWM.") elif pin in _buttons: - logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as Button.") + logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as button.") else: try: _output_pins[pin] = DigitalOutputDevice(pin, initial_value=False) @@ -27,7 +30,6 @@ def initialize_output_pins(pins: List[int]): if pin in _output_pins: del _output_pins[pin] - def toggle_output_pin(pin: int): """Toggles the state of an output pin.""" if pin in _output_pins: @@ -44,14 +46,6 @@ def set_output_pin(pin: int, status: bool): logger.warning(f"Warning: Cannot set pin {pin}. Not initialized as output.") -def get_initialized_pins() -> Dict[str, List[int]]: - """Returns a dictionary listing initialized output pins and buttons.""" - return { - "output_pins": list(_output_pins.keys()), - "buttons": list(_buttons.keys()) - } - - def get_output_pin(pin: int): """Returns the state of an output pin.""" if pin in _output_pins: @@ -61,6 +55,41 @@ def get_output_pin(pin: int): return None +def initialize_pwm_pins(pins: List[int], freq: int): + """Initializes one or more GPIO pins as pwm outputs.""" + for pin in pins: + if pin in _pwm_pins: + logger.warning(f"Warning: PWM pin {pin} already initialized.") + elif pin in _output_pins: + logger.error(f"Error: Cannot initialize pin {pin} as PWM. Already initialized as output.") + elif pin in _buttons: + logger.error(f"Error: Cannot initialize pin {pin} as PWM. Already initialized as button.") + else: + try: + _pwm_pins[pin] = PWMOutputDevice(pin, active_high=True, initial_value=0.0, frequency=freq) + logger.debug(f"Initialized pin {pin} as PWM.") + except Exception as e: + logger.error(f"Error initializing PWM pin {pin}: {e}", exc_info=True) + # Clean up if initialization failed partially + if pin in _pwm_pins: + del _pwm_pins[pin] + +def set_pwm_pin(pin: int, value: float): + """Sets the value of a PWM pin.""" + if pin in _pwm_pins: + _pwm_pins[pin].value = value + else: + logger.warning(f"Warning: Cannot set pin {pin}. Not initialized as PWM.") + +def get_pwm_pin(pin: int): + """Returns the state of an output pin.""" + if pin in _pwm_pins: + return _pwm_pins[pin].value + else: + logger.warning(f"Warning: Pin {pin} not initialized as PWM.") + return None + + def initialize_button(pin: int, pull_up: Optional[bool] = True, bounce_time: Optional[float] = 0.05): """ Initializes a GPIO pin as button input using gpiozero.Button. @@ -76,6 +105,8 @@ def initialize_button(pin: int, pull_up: Optional[bool] = True, bounce_time: Opt logger.warning(f"Warning: Button on pin {pin} already initialized.") elif pin in _output_pins: logger.error(f"Error: Cannot initialize pin {pin} as Button. Already initialized as output.") + elif pin in _pwm_pins: + logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as PWM.") else: try: _buttons[pin] = Button(pin, pull_up=pull_up, bounce_time=bounce_time, hold_time=0.01) @@ -160,10 +191,28 @@ def is_button_pressed(pin: int) -> Optional[bool]: # Returning None indicates it's not a known button. return None +def get_initialized_pins() -> Dict[str, List[int]]: + """Returns a dictionary listing initialized output pins and buttons.""" + return { + "output_pins": list(_output_pins.keys()), + "pwm_pins": list(_pwm_pins.keys()), + "buttons": list(_buttons.keys()) + } + def cleanup_all_pins(): """Closes all initialized GPIO devices (output pins and buttons).""" logger.debug("Cleaning up GPIO resources...") + # Close PWM pins + pins_to_remove = list(_pwm_pins.keys()) # Create a copy of keys to iterate over + for pin in pins_to_remove: + try: + _pwm_pins[pin].close() + del _pwm_pins[pin] # Remove from tracking dict after successful close + logger.debug(f"Output pin {pin} closed.") + except Exception as e: + logger.error(f"Error closing output pin {pin}: {e}", exc_info=True) + # Close output pins pins_to_remove = list(_output_pins.keys()) # Create a copy of keys to iterate over for pin in pins_to_remove: @@ -188,4 +237,4 @@ def cleanup_all_pins(): if not _output_pins and not _buttons: logger.info("GPIO cleanup successful. All tracked pins released.") else: - logger.warning(f"GPIO cleanup potentially incomplete. Remaining outputs: {list(_output_pins.keys())}, Remaining buttons: {list(_buttons.keys())}") \ No newline at end of file + logger.warning(f"GPIO cleanup potentially incomplete. Remaining outputs: {list(_output_pins.keys())}, Remaining buttons: {list(_buttons.keys())}") diff --git a/openscan_firmware/controllers/hardware/lights.py b/openscan_firmware/controllers/hardware/lights.py index 0c08dff..dd6fd41 100644 --- a/openscan_firmware/controllers/hardware/lights.py +++ b/openscan_firmware/controllers/hardware/lights.py @@ -16,6 +16,8 @@ from openscan_firmware.controllers.hardware.interfaces import HardwareEvent, SwitchableHardware, SleepCapableHardware, create_controller_registry from openscan_firmware.controllers.services.device_events import schedule_device_status_broadcast +from openscan_firmware.utils.inactivity_timer import inactivity_timer + logger = logging.getLogger(__name__) class LightController(SwitchableHardware, SleepCapableHardware): @@ -26,17 +28,25 @@ def __init__(self, light: Light): on_change=self._apply_settings_to_hardware ) self._is_on = False - # idle helpers must exist before first refresh - self.is_idle = lambda: False + self._value = self.settings.pwm_max + + # no idle callbacks + self.is_idle = lambda: True self.send_event = None + self._apply_settings_to_hardware(self.settings.model) logger.debug(f"Light controller for '{self.model.name}' initialized.") - + def _apply_settings_to_hardware(self, settings: LightConfig): """Apply settings to hardware and preserve light state.""" self.model.settings = settings - gpio.initialize_output_pins(self.settings.pins) + if self.settings.pwm_support: + logger.info(f"Light '{self.model.name}' initializing PWM.") + gpio.initialize_pwm_pins(self.settings.pins, self.settings.pwm_frequency) + else: + logger.info(f"Light '{self.model.name}' initializing digital.") + gpio.initialize_output_pins(self.settings.pins) # Re-apply desired state synchronously; refresh handles idle logic self.refresh() @@ -47,6 +57,7 @@ def get_status(self): return { "name": self.model.name, "is_on": self.is_on, + "value": self._value, "settings": self.get_config().model_dump() } @@ -54,14 +65,24 @@ def get_config(self) -> LightConfig: return self.settings.model def refresh(self): + inactivity_timer.reset() if self.is_idle(): logger.info(f"Light '{self.model.name}' idle.") for pin in self.settings.pins: - gpio.set_output_pin(pin, False) + if self.settings.pwm_support: + gpio.set_pwm_pin(pin, self.settings.pwm_min) + else: + gpio.set_output_pin(pin, False) else: logger.info(f"Light '{self.model.name}' active.") for pin in self.settings.pins: - gpio.set_output_pin(pin, self._is_on) + if self.settings.pwm_support: + _minVal = self.settings.pwm_min / 3.3 + _maxVal = self.settings.pwm_max / 3.3 + _val = self._value / 100.0 * (_maxVal - _minVal) + _minVal + gpio.set_pwm_pin(pin, _val if self._is_on else _minVal) + else: + gpio.set_output_pin(pin, self._is_on) def set_idle_callbacks(self, is_idle: Callable[[], bool], send_event: Callable[[HardwareEvent], Awaitable[None]]) -> None: @@ -91,6 +112,21 @@ async def turn_off(self): await self._wake_if_idle(HardwareEvent.LIGHT_EVENT) logger.info(f"Light '{self.model.name}' turned off.") schedule_device_status_broadcast([f"lights.{self.model.name}.is_on"]) + + async def set_value(self, value: float): + if value < 0: + self._value = 0 + elif value > 100: + self._value = 100 + else: + self._value = value + #resume from idle + if self.is_idle(): + logger.info("Device idle, must exit before") + await self.send_event(HardwareEvent.LIGHT_EVENT) + else: + self.refresh() + logger.info(f"Light '{self.model.name}' value set to {self._value}.") create_light_controller, get_light_controller, remove_light_controller, _light_registry = create_controller_registry(LightController) diff --git a/openscan_firmware/routers/next/lights.py b/openscan_firmware/routers/next/lights.py index 8bc8a08..5e467f4 100644 --- a/openscan_firmware/routers/next/lights.py +++ b/openscan_firmware/routers/next/lights.py @@ -1,4 +1,4 @@ -from fastapi import APIRouter, HTTPException +from fastapi import APIRouter, HTTPException, Query from pydantic import BaseModel from openscan_firmware.controllers.hardware.lights import get_light_controller, get_all_light_controllers @@ -102,6 +102,31 @@ async def toggle_light(light_name: str): except ValueError as e: raise HTTPException(status_code=404, detail=str(e)) +@router.put("/{light_name}/intensity", response_model=LightStatusResponse) +async def pwm_light( + light_name: str, + value: float = Query( + 100, + description=( + "sets light intensity, from 0 to 100%" + ), + ), +): + """Set light intensity + + Args: + light_name: The name of the light to toggle + value: intensity of light, from 0% to 100% + + Returns: + LightStatusResponse: A response object containing the status of the light after the toggle operation + """ + try: + controller = get_light_controller(light_name) + await controller.set_value(value) + return controller.get_status() + except ValueError as e: + raise HTTPException(status_code=404, detail=str(e)) create_settings_endpoints( router=router, From e57ffd61e1027434b2645d76d953eb13dad2fd97 Mon Sep 17 00:00:00 2001 From: esto-openscan <193278706+esto-openscan@users.noreply.github.com> Date: Fri, 6 Mar 2026 17:15:58 +0000 Subject: [PATCH 2/3] added hardware pwm --- .../controllers/hardware/gpio.py | 33 ++- openscan_firmware/utils/pwm_hardware.py | 203 ++++++++++++++++++ 2 files changed, 232 insertions(+), 4 deletions(-) create mode 100644 openscan_firmware/utils/pwm_hardware.py diff --git a/openscan_firmware/controllers/hardware/gpio.py b/openscan_firmware/controllers/hardware/gpio.py index 5189e6e..08a9de2 100644 --- a/openscan_firmware/controllers/hardware/gpio.py +++ b/openscan_firmware/controllers/hardware/gpio.py @@ -3,6 +3,9 @@ from gpiozero import DigitalOutputDevice, PWMOutputDevice, Button from typing import Dict, List, Optional, Callable +# hardware PWM module +from openscan_firmware.utils.pwm_hardware import hwpwm + logger = logging.getLogger(__name__) # Track pins and buttons @@ -66,8 +69,14 @@ def initialize_pwm_pins(pins: List[int], freq: int): logger.error(f"Error: Cannot initialize pin {pin} as PWM. Already initialized as button.") else: try: - _pwm_pins[pin] = PWMOutputDevice(pin, active_high=True, initial_value=0.0, frequency=freq) - logger.debug(f"Initialized pin {pin} as PWM.") + if hwpwm.supports(pin): + _pwm_pins[pin] = pin + hwpwm.setup(pin) + hwpwm.set_frequency(pin, freq) + logger.info(f"Initialized pin {pin} as hardware PWM.") + else: + _pwm_pins[pin] = PWMOutputDevice(pin, active_high=True, initial_value=0.0, frequency=freq) + logger.info(f"Initialized pin {pin} as software PWM.") except Exception as e: logger.error(f"Error initializing PWM pin {pin}: {e}", exc_info=True) # Clean up if initialization failed partially @@ -77,14 +86,30 @@ def initialize_pwm_pins(pins: List[int], freq: int): def set_pwm_pin(pin: int, value: float): """Sets the value of a PWM pin.""" if pin in _pwm_pins: - _pwm_pins[pin].value = value + dev = _pwm_pins[pin] + + # on hw pwm we store just pin number here, not the device + if isinstance(dev, int): + # hw pwm + hwpwm.set_duty_cycle(dev, value) + else: + # soft pwm + _pwm_pins[pin].value = value else: logger.warning(f"Warning: Cannot set pin {pin}. Not initialized as PWM.") def get_pwm_pin(pin: int): """Returns the state of an output pin.""" if pin in _pwm_pins: - return _pwm_pins[pin].value + dev = _pwm_pins[pin] + + # on hw pwm we store just pin number here, not the device + if isinstance(dev, int): + # hw pwm + return hwpwm.get_duty_cycle(dev) + else: + # soft pwm + return _pwm_pins[pin].value else: logger.warning(f"Warning: Pin {pin} not initialized as PWM.") return None diff --git a/openscan_firmware/utils/pwm_hardware.py b/openscan_firmware/utils/pwm_hardware.py new file mode 100644 index 0000000..fabb51a --- /dev/null +++ b/openscan_firmware/utils/pwm_hardware.py @@ -0,0 +1,203 @@ +import subprocess +from pathlib import Path + +from dataclasses import dataclass + +import atexit +import signal +import sys + +@dataclass +class _HwPWM: + + _PWMCHIP = Path("/sys/class/pwm/pwmchip0") + + _PIN_INFO = { + 12: {"channel": 0, "alt": "a0"}, + 18: {"channel": 0, "alt": "a5"}, + 13: {"channel": 1, "alt": "a0"}, + 19: {"channel": 1, "alt": "a5"}, + } + + _pins = {} + + # register cleanup at exit + def __init__(self): + atexit.register(_HwPWM._cleanup) + signal.signal(signal.SIGTERM, _HwPWM._signal_handler) + signal.signal(signal.SIGINT, _HwPWM._signal_handler) + + @staticmethod + def _run(cmd): + result = subprocess.run(cmd, check=True, capture_output=True, text=True).stdout + try: + return result.split(":", 1)[1].split()[0] + except: + return "" + + @staticmethod + def _pwm_path(channel): + return _HwPWM._PWMCHIP / f"pwm{channel}" + + + @staticmethod + def _write(path, value): + path.write_text(str(value)) + + + @staticmethod + def _export(channel): + p = _HwPWM._pwm_path(channel) + if not p.exists(): + (_HwPWM._PWMCHIP / "export").write_text(str(channel)) + + + @staticmethod + def _unexport(channel): + p = _HwPWM._pwm_path(channel) + if p.exists(): + (_HwPWM._PWMCHIP / "unexport").write_text(str(channel)) + + + @staticmethod + def supports(pin: int): + # first check if pin is a supported one + if not pin in _HwPWM._PIN_INFO: + return False + + # then check if its PWM is not already in use + chan = _HwPWM._PIN_INFO[pin]["channel"] + for p in _HwPWM._pins.keys(): + # harmless re-set already set pin + if p == pin: + return True + # if using same channel as already setup pin don't accept it + if chan == _HwPWM._PIN_INFO[p]["channel"]: + return False + + # available PWM pin and channel not used, ok + return True + + @staticmethod + def setup(pin: int): + if not _HwPWM.supports(pin): + raise ValueError("unsupported pin or pwm channel in use") + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + # configure pin mux + old_func = _HwPWM._run(["pinctrl", str(pin)]) + _HwPWM._run(["pinctrl", str(pin), info["alt"]]) + + # enable pwm channel + _HwPWM._export(ch) + + pwm = _HwPWM._pwm_path(ch) + + # ensure disabled before configuration + try: + _HwPWM._write(pwm / "enable", 0) + except: + pass + + _HwPWM._pins[pin] = { "freq": 20000.0, "duty": 1.0, "oldfunc": old_func } + + + @staticmethod + def release(pin: int): + if not pin in _HwPWM._pins: + return + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + pwm = _HwPWM._pwm_path(ch) + + if pwm.exists(): + try: + _HwPWM.write(pwm / "enable", 0) + except: + pass + + # return pin to input + _HwPWM._run(["pinctrl", str(pin), _HwPWM._pins[pin]["oldfunc"]]) + + del _HwPWM._pins[pin] + + @staticmethod + def _set_freq_duty(pin: int, freq: float, duty: float): + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + pwm = _HwPWM._pwm_path(ch) + + period_ns = int(1_000_000_000 / freq) + duty_val = int(period_ns * duty) + + _HwPWM._write(pwm / "enable", 0) + _HwPWM._write(pwm / "period", period_ns) + _HwPWM._write(pwm / "duty_cycle", duty_val) + _HwPWM._write(pwm / "enable", 1) + + _HwPWM._pins[pin]["freq"] = freq + _HwPWM._pins[pin]["duty"] = duty + + @staticmethod + def set_frequency(pin: int, freq: float): + if not pin in _HwPWM._pins: + raise ValueError("pwm pin not initialized") + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + duty = _HwPWM._pins[pin]["duty"] + _HwPWM._set_freq_duty(pin, freq, duty) + + @staticmethod + def get_frequency(pin: int): + if not pin in _HwPWM._pins: + raise ValueError("pwm pin not initialized") + + return _HwPWM._pins[pin]["freq"] + + @staticmethod + def set_duty_cycle(pin: int, duty: float): + if not pin in _HwPWM._pins: + raise ValueError("pwm pin not initialized") + + info = _HwPWM._PIN_INFO[pin] + ch = info["channel"] + + freq = _HwPWM._pins[pin]["freq"] + _HwPWM._set_freq_duty(pin, freq, duty) + + + @staticmethod + def get_duty_cycle(pin: int): + if not pin in _HwPWM._pins: + raise ValueError("pwm pin not initialized") + + return _HwPWM._pins[pin]["duty"] + + # cleanup routines -- resets PWM pins + + @staticmethod + def _cleanup(): + to_clean = [] + for pin in _HwPWM._pins.keys(): + to_clean.append(pin) + for pin in to_clean: + _HwPWM.release(pin) + + def _signal_handler(signum, frame): + _HwPWM._cleanup() + + +# ========================================================== +# SINGLETON +# ========================================================== + +# hardware pw, singleton +hwpwm = _HwPWM() From fc2de7c5ca31890311cbfede830f9e8c0ed32eb4 Mon Sep 17 00:00:00 2001 From: esto-openscan <193278706+esto-openscan@users.noreply.github.com> Date: Fri, 6 Mar 2026 17:37:54 +0000 Subject: [PATCH 3/3] updated example device config --- settings/device/example_custom.json | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/settings/device/example_custom.json b/settings/device/example_custom.json index 6236f8c..fee54b0 100644 --- a/settings/device/example_custom.json +++ b/settings/device/example_custom.json @@ -26,10 +26,15 @@ } }, "lights": { - "Openscan.eu Ringlight": { - "pins": [12], - "pwm_support": true - } + "Openscan.eu Ringlight": { + "pins": [ + 12 + ], + "pwm_support": true, + "pwm_frequency": 50000.0, + "pwm_min": 0.0, + "pwm_max": 3.3 + } }, "endstops": { "rotor-endstop": { @@ -44,7 +49,7 @@ } } }, - "motors_timeout": 180, + "motors_timeout": 30.0, "startup_mode": "startup_idle", - "calibrate_mode": "calibrate_on_wake" + "calibrate_mode": "calibrate_on_home" }