Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 16 additions & 1 deletion openscan_firmware/config/light.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,9 @@ class LightConfig(BaseModel):
default=False,
description="Indicates whether this light hardware can handle PWM (otherwise only on/off).",
)
pwm_frequency: float = Field(10000.0, ge=50.0, le=100000.0, description="PWM frequency for led driver.")
pwm_min: float = Field(0.0, ge=0, le=3.3, description="Minimum pwm voltage for led driver.")
pwm_max: float = Field(0.0, ge=0, le=3.3, description="Maximum pwm voltage for led driver.")

@model_validator(mode="before")
@classmethod
Expand All @@ -37,4 +40,16 @@ def ensure_pins(cls, values):
merged_pins.append(pin)

values["pins"] = list(dict.fromkeys(merged_pins))
return values
return values

@model_validator(mode="after")
def validate_pwm_range(self):
"""
Ensures pwm_min <= pwm_max and consistency with pwm_support.
"""

if self.pwm_min >= self.pwm_max:
raise ValueError("pwm_min must be less than pwm_max")


return self
98 changes: 86 additions & 12 deletions openscan_firmware/controllers/hardware/gpio.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import logging

from gpiozero import DigitalOutputDevice, Button
from gpiozero import DigitalOutputDevice, PWMOutputDevice, Button
from typing import Dict, List, Optional, Callable

# hardware PWM module
from openscan_firmware.utils.pwm_hardware import hwpwm

logger = logging.getLogger(__name__)

# Track pins and buttons
_output_pins = {}
_pwm_pins = {}
_buttons = {}


Expand All @@ -15,8 +19,10 @@ def initialize_output_pins(pins: List[int]):
for pin in pins:
if pin in _output_pins:
logger.warning(f"Warning: Output pin {pin} already initialized.")
elif pin in _pwm_pins:
logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as PWM.")
elif pin in _buttons:
logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as Button.")
logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as button.")
else:
try:
_output_pins[pin] = DigitalOutputDevice(pin, initial_value=False)
Expand All @@ -27,7 +33,6 @@ def initialize_output_pins(pins: List[int]):
if pin in _output_pins:
del _output_pins[pin]


def toggle_output_pin(pin: int):
"""Toggles the state of an output pin."""
if pin in _output_pins:
Expand All @@ -44,14 +49,6 @@ def set_output_pin(pin: int, status: bool):
logger.warning(f"Warning: Cannot set pin {pin}. Not initialized as output.")


def get_initialized_pins() -> Dict[str, List[int]]:
"""Returns a dictionary listing initialized output pins and buttons."""
return {
"output_pins": list(_output_pins.keys()),
"buttons": list(_buttons.keys())
}


def get_output_pin(pin: int):
"""Returns the state of an output pin."""
if pin in _output_pins:
Expand All @@ -61,6 +58,63 @@ def get_output_pin(pin: int):
return None


def initialize_pwm_pins(pins: List[int], freq: int):
"""Initializes one or more GPIO pins as pwm outputs."""
for pin in pins:
if pin in _pwm_pins:
logger.warning(f"Warning: PWM pin {pin} already initialized.")
elif pin in _output_pins:
logger.error(f"Error: Cannot initialize pin {pin} as PWM. Already initialized as output.")
elif pin in _buttons:
logger.error(f"Error: Cannot initialize pin {pin} as PWM. Already initialized as button.")
else:
try:
if hwpwm.supports(pin):
_pwm_pins[pin] = pin
hwpwm.setup(pin)
hwpwm.set_frequency(pin, freq)
logger.info(f"Initialized pin {pin} as hardware PWM.")
else:
_pwm_pins[pin] = PWMOutputDevice(pin, active_high=True, initial_value=0.0, frequency=freq)
logger.info(f"Initialized pin {pin} as software PWM.")
except Exception as e:
logger.error(f"Error initializing PWM pin {pin}: {e}", exc_info=True)
# Clean up if initialization failed partially
if pin in _pwm_pins:
del _pwm_pins[pin]

def set_pwm_pin(pin: int, value: float):
"""Sets the value of a PWM pin."""
if pin in _pwm_pins:
dev = _pwm_pins[pin]

# on hw pwm we store just pin number here, not the device
if isinstance(dev, int):
# hw pwm
hwpwm.set_duty_cycle(dev, value)
else:
# soft pwm
_pwm_pins[pin].value = value
else:
logger.warning(f"Warning: Cannot set pin {pin}. Not initialized as PWM.")

def get_pwm_pin(pin: int):
"""Returns the state of an output pin."""
if pin in _pwm_pins:
dev = _pwm_pins[pin]

# on hw pwm we store just pin number here, not the device
if isinstance(dev, int):
# hw pwm
return hwpwm.get_duty_cycle(dev)
else:
# soft pwm
return _pwm_pins[pin].value
else:
logger.warning(f"Warning: Pin {pin} not initialized as PWM.")
return None


def initialize_button(pin: int, pull_up: Optional[bool] = True, bounce_time: Optional[float] = 0.05):
"""
Initializes a GPIO pin as button input using gpiozero.Button.
Expand All @@ -76,6 +130,8 @@ def initialize_button(pin: int, pull_up: Optional[bool] = True, bounce_time: Opt
logger.warning(f"Warning: Button on pin {pin} already initialized.")
elif pin in _output_pins:
logger.error(f"Error: Cannot initialize pin {pin} as Button. Already initialized as output.")
elif pin in _pwm_pins:
logger.error(f"Error: Cannot initialize pin {pin} as output. Already initialized as PWM.")
else:
try:
_buttons[pin] = Button(pin, pull_up=pull_up, bounce_time=bounce_time, hold_time=0.01)
Expand Down Expand Up @@ -160,10 +216,28 @@ def is_button_pressed(pin: int) -> Optional[bool]:
# Returning None indicates it's not a known button.
return None

def get_initialized_pins() -> Dict[str, List[int]]:
"""Returns a dictionary listing initialized output pins and buttons."""
return {
"output_pins": list(_output_pins.keys()),
"pwm_pins": list(_pwm_pins.keys()),
"buttons": list(_buttons.keys())
}

def cleanup_all_pins():
"""Closes all initialized GPIO devices (output pins and buttons)."""
logger.debug("Cleaning up GPIO resources...")

# Close PWM pins
pins_to_remove = list(_pwm_pins.keys()) # Create a copy of keys to iterate over
for pin in pins_to_remove:
try:
_pwm_pins[pin].close()
del _pwm_pins[pin] # Remove from tracking dict after successful close
logger.debug(f"Output pin {pin} closed.")
except Exception as e:
logger.error(f"Error closing output pin {pin}: {e}", exc_info=True)

# Close output pins
pins_to_remove = list(_output_pins.keys()) # Create a copy of keys to iterate over
for pin in pins_to_remove:
Expand All @@ -188,4 +262,4 @@ def cleanup_all_pins():
if not _output_pins and not _buttons:
logger.info("GPIO cleanup successful. All tracked pins released.")
else:
logger.warning(f"GPIO cleanup potentially incomplete. Remaining outputs: {list(_output_pins.keys())}, Remaining buttons: {list(_buttons.keys())}")
logger.warning(f"GPIO cleanup potentially incomplete. Remaining outputs: {list(_output_pins.keys())}, Remaining buttons: {list(_buttons.keys())}")
48 changes: 42 additions & 6 deletions openscan_firmware/controllers/hardware/lights.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
from openscan_firmware.controllers.hardware.interfaces import HardwareEvent, SwitchableHardware, SleepCapableHardware, create_controller_registry
from openscan_firmware.controllers.services.device_events import schedule_device_status_broadcast

from openscan_firmware.utils.inactivity_timer import inactivity_timer

logger = logging.getLogger(__name__)

class LightController(SwitchableHardware, SleepCapableHardware):
Expand All @@ -26,17 +28,25 @@ def __init__(self, light: Light):
on_change=self._apply_settings_to_hardware
)
self._is_on = False
# idle helpers must exist before first refresh
self.is_idle = lambda: False
self._value = self.settings.pwm_max

# no idle callbacks
self.is_idle = lambda: True
self.send_event = None

self._apply_settings_to_hardware(self.settings.model)
logger.debug(f"Light controller for '{self.model.name}' initialized.")

def _apply_settings_to_hardware(self, settings: LightConfig):
"""Apply settings to hardware and preserve light state."""
self.model.settings = settings

gpio.initialize_output_pins(self.settings.pins)
if self.settings.pwm_support:
logger.info(f"Light '{self.model.name}' initializing PWM.")
gpio.initialize_pwm_pins(self.settings.pins, self.settings.pwm_frequency)
else:
logger.info(f"Light '{self.model.name}' initializing digital.")
gpio.initialize_output_pins(self.settings.pins)
# Re-apply desired state synchronously; refresh handles idle logic
self.refresh()

Expand All @@ -47,21 +57,32 @@ def get_status(self):
return {
"name": self.model.name,
"is_on": self.is_on,
"value": self._value,
"settings": self.get_config().model_dump()
}

def get_config(self) -> LightConfig:
return self.settings.model

def refresh(self):
inactivity_timer.reset()
if self.is_idle():
logger.info(f"Light '{self.model.name}' idle.")
for pin in self.settings.pins:
gpio.set_output_pin(pin, False)
if self.settings.pwm_support:
gpio.set_pwm_pin(pin, self.settings.pwm_min)
else:
gpio.set_output_pin(pin, False)
else:
logger.info(f"Light '{self.model.name}' active.")
for pin in self.settings.pins:
gpio.set_output_pin(pin, self._is_on)
if self.settings.pwm_support:
_minVal = self.settings.pwm_min / 3.3
_maxVal = self.settings.pwm_max / 3.3
_val = self._value / 100.0 * (_maxVal - _minVal) + _minVal
gpio.set_pwm_pin(pin, _val if self._is_on else _minVal)
else:
gpio.set_output_pin(pin, self._is_on)


def set_idle_callbacks(self, is_idle: Callable[[], bool], send_event: Callable[[HardwareEvent], Awaitable[None]]) -> None:
Expand Down Expand Up @@ -91,6 +112,21 @@ async def turn_off(self):
await self._wake_if_idle(HardwareEvent.LIGHT_EVENT)
logger.info(f"Light '{self.model.name}' turned off.")
schedule_device_status_broadcast([f"lights.{self.model.name}.is_on"])

async def set_value(self, value: float):
if value < 0:
self._value = 0
elif value > 100:
self._value = 100
else:
self._value = value
#resume from idle
if self.is_idle():
logger.info("Device idle, must exit before")
await self.send_event(HardwareEvent.LIGHT_EVENT)
else:
self.refresh()
logger.info(f"Light '{self.model.name}' value set to {self._value}.")


create_light_controller, get_light_controller, remove_light_controller, _light_registry = create_controller_registry(LightController)
Expand Down
27 changes: 26 additions & 1 deletion openscan_firmware/routers/next/lights.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from fastapi import APIRouter, HTTPException
from fastapi import APIRouter, HTTPException, Query
from pydantic import BaseModel

from openscan_firmware.controllers.hardware.lights import get_light_controller, get_all_light_controllers
Expand Down Expand Up @@ -102,6 +102,31 @@ async def toggle_light(light_name: str):
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))

@router.put("/{light_name}/intensity", response_model=LightStatusResponse)
async def pwm_light(
light_name: str,
value: float = Query(
100,
description=(
"sets light intensity, from 0 to 100%"
),
),
):
"""Set light intensity

Args:
light_name: The name of the light to toggle
value: intensity of light, from 0% to 100%

Returns:
LightStatusResponse: A response object containing the status of the light after the toggle operation
"""
try:
controller = get_light_controller(light_name)
await controller.set_value(value)
return controller.get_status()
except ValueError as e:
raise HTTPException(status_code=404, detail=str(e))

create_settings_endpoints(
router=router,
Expand Down
Loading