From 0e719af69f72dc3bb1cba4bea1d449ee6e10e890 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 28 Mar 2026 22:27:16 +0000 Subject: [PATCH 01/16] Initial plan From e828da38f18912722860df31655dd64f7f7cc512 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 28 Mar 2026 22:33:43 +0000 Subject: [PATCH 02/16] feat: add UDP telemetry bridge for universal RPM indicator input Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/1c5e11a3-1121-4ded-890a-d44821be9fe5 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 11 +++- boxflat/app.py | 4 ++ boxflat/telemetry_bridge.py | 102 ++++++++++++++++++++++++++++++++++++ 3 files changed, 116 insertions(+), 1 deletion(-) create mode 100644 boxflat/telemetry_bridge.py diff --git a/README.md b/README.md index 8a12e0d..1afad8b 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,21 @@ Boxflat for Moza Racing. Control your Moza gear settings... and more! | Generic devices | Detection fix | | ### Ideas -- Telemetry ingestion through REST API/WebSockets - Cammus support - PXN Support - Simagic support - H-Pattern and Sequential settings available for arbitrary HID devices +### Telemetry bridge (all racing games) +Boxflat now listens for external telemetry on UDP `127.0.0.1:27194` (override with `BOXFLAT_TELEMETRY_PORT`). + +Send JSON with one of these formats: +- `{"rpm_led_mask": 31}` (direct 10-bit LED mask) +- `{"rpm_percent": 50}` or `{"rpm_ratio": 0.5}` +- `{"rpm": 5000, "max_rpm": 10000}` + +This lets any game/tool drive the RPM indicator by forwarding telemetry in a simple common format. + ### Firmware upgrades There are some EEPROM functions available, but I need to do more testing to make sure I won't brick anything. For now, just use Pit House on Windows if you can, as FW upgrade support is not coming in the near future. diff --git a/boxflat/app.py b/boxflat/app.py index 7147462..0009612 100644 --- a/boxflat/app.py +++ b/boxflat/app.py @@ -10,6 +10,7 @@ from boxflat.connection_manager import MozaConnectionManager from boxflat.hid_handler import HidHandler from boxflat.settings_handler import SettingsHandler +from boxflat.telemetry_bridge import TelemetryBridge from threading import Thread, Event import os @@ -134,6 +135,7 @@ class MyApp(Adw.Application): def __init__(self, data_path: str, config_path: str, dry_run: bool, custom: bool, autostart: bool,**kwargs): super().__init__(**kwargs) self.connect('activate', self.on_activate) + self.connect('shutdown', self._shutdown) self.Tray = None @@ -155,6 +157,7 @@ def __init__(self, data_path: str, config_path: str, dry_run: bool, custom: bool self._cm = MozaConnectionManager(os.path.join(data_path, "serial.yml"), dry_run) self._cm.subscribe("hid-device-connected", self._hid_handler.add_device) self._cm.subscribe("hid-device-disconnected", self._hid_handler.remove_device) + self._telemetry_bridge = TelemetryBridge(self._cm) with open(os.path.join(data_path, "version"), "r") as version: self._version = version.readline().strip() @@ -332,6 +335,7 @@ def _prepare_settings(self): def _shutdown(self, *_) -> None: + self._telemetry_bridge.shutdown() for panel in self._panels.values(): panel.shutdown() diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py new file mode 100644 index 0000000..7ed4543 --- /dev/null +++ b/boxflat/telemetry_bridge.py @@ -0,0 +1,102 @@ +# Copyright (c) 2025, Tomasz Pakuła Using Arch BTW + +from __future__ import annotations + +import json +import os +import socket +from threading import Event, Thread +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from boxflat.connection_manager import MozaConnectionManager + + +class TelemetryBridge: + def __init__(self, connection_manager: "MozaConnectionManager") -> None: + self._cm = connection_manager + self._shutdown = Event() + self._last_mask = -1 + + self._host = "127.0.0.1" + self._port = int(os.environ.get("BOXFLAT_TELEMETRY_PORT", "27194")) + + self._thread = Thread(target=self._worker, daemon=True) + self._thread.start() + + + def shutdown(self, *_) -> None: + self._shutdown.set() + + + def _worker(self) -> None: + sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) + try: + sock.bind((self._host, self._port)) + except OSError as e: + print(f"Telemetry bridge disabled: {e}") + sock.close() + return + + sock.settimeout(1) + print(f"Telemetry bridge listening on udp://{self._host}:{self._port}") + + while not self._shutdown.is_set(): + try: + payload, _ = sock.recvfrom(4096) + except socket.timeout: + continue + except OSError: + break + + mask = self._packet_to_mask(payload) + if mask is None or mask == self._last_mask: + continue + + self._last_mask = mask + self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") + self._cm.set_setting(mask, "dash-send-telemetry") + + sock.close() + + + def _packet_to_mask(self, payload: bytes) -> int | None: + try: + data = json.loads(payload.decode(errors="ignore")) + except json.JSONDecodeError: + return None + + if not isinstance(data, dict): + return None + + direct_mask = self._first_number(data, "rpm_led_mask", "rpmMask", "rpm-mask", "led_mask") + if direct_mask is not None: + return max(0, min(1023, int(direct_mask))) + + ratio = self._first_number(data, "rpm_percent", "rpmPercent", "rpm_ratio", "rpmRatio") + if ratio is None: + rpm = self._first_number(data, "rpm", "engine_rpm", "engineRpm", "current_rpm", "currentRpm") + max_rpm = self._first_number(data, "max_rpm", "maxRpm", "maxRPM", "rpm_max", "redline") + if rpm is None or max_rpm is None or max_rpm <= 0: + return None + ratio = rpm / max_rpm + elif ratio > 1: + ratio = ratio / 100 + + ratio = max(0, min(1, ratio)) + lit_leds = int(round(ratio * 10)) + return (1 << lit_leds) - 1 if lit_leds > 0 else 0 + + + def _first_number(self, data: dict, *keys: str) -> float | None: + for key in keys: + if key not in data: + continue + + value = data[key] + if isinstance(value, bool): + continue + + if isinstance(value, int | float): + return value + return None From c3ff497882e2e6169434f059093ea9c68a52b7fa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 28 Mar 2026 22:36:41 +0000 Subject: [PATCH 03/16] docs and bridge polish after review feedback Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/1c5e11a3-1121-4ded-890a-d44821be9fe5 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 2 ++ boxflat/telemetry_bridge.py | 66 +++++++++++++++++++++---------------- 2 files changed, 39 insertions(+), 29 deletions(-) diff --git a/README.md b/README.md index 1afad8b..c3987eb 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,8 @@ Send JSON with one of these formats: - `{"rpm_percent": 50}` or `{"rpm_ratio": 0.5}` - `{"rpm": 5000, "max_rpm": 10000}` +`rpm_led_mask` accepts arbitrary 10-bit patterns (not only progressive fill patterns), for example `{"rpm_led_mask": 682}`. + This lets any game/tool drive the RPM indicator by forwarding telemetry in a simple common format. ### Firmware upgrades diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index 7ed4543..905ee14 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -11,6 +11,8 @@ if TYPE_CHECKING: from boxflat.connection_manager import MozaConnectionManager +PERCENT_SCALE = 100 + class TelemetryBridge: def __init__(self, connection_manager: "MozaConnectionManager") -> None: @@ -19,50 +21,53 @@ def __init__(self, connection_manager: "MozaConnectionManager") -> None: self._last_mask = -1 self._host = "127.0.0.1" - self._port = int(os.environ.get("BOXFLAT_TELEMETRY_PORT", "27194")) + try: + self._port = int(os.environ.get("BOXFLAT_TELEMETRY_PORT", "27194")) + except ValueError: + print("Invalid BOXFLAT_TELEMETRY_PORT, falling back to 27194") + self._port = 27194 self._thread = Thread(target=self._worker, daemon=True) self._thread.start() - def shutdown(self, *_) -> None: + def shutdown(self) -> None: self._shutdown.set() def _worker(self) -> None: - sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) - try: - sock.bind((self._host, self._port)) - except OSError as e: - print(f"Telemetry bridge disabled: {e}") - sock.close() - return - - sock.settimeout(1) - print(f"Telemetry bridge listening on udp://{self._host}:{self._port}") - - while not self._shutdown.is_set(): + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: try: - payload, _ = sock.recvfrom(4096) - except socket.timeout: - continue - except OSError: - break + sock.bind((self._host, self._port)) + except OSError as e: + print(f"Telemetry bridge disabled: {e}") + return - mask = self._packet_to_mask(payload) - if mask is None or mask == self._last_mask: - continue + sock.settimeout(1) + print(f"Telemetry bridge listening on udp://{self._host}:{self._port}") + + while not self._shutdown.is_set(): + try: + payload, _ = sock.recvfrom(4096) + except socket.timeout: + continue + except OSError: + break - self._last_mask = mask - self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") - self._cm.set_setting(mask, "dash-send-telemetry") + mask = self._packet_to_mask(payload) + if mask is None or mask == self._last_mask: + continue - sock.close() + self._last_mask = mask + self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") + self._cm.set_setting(mask, "dash-send-telemetry") def _packet_to_mask(self, payload: bytes) -> int | None: try: - data = json.loads(payload.decode(errors="ignore")) + data = json.loads(payload.decode()) + except UnicodeDecodeError: + return None except json.JSONDecodeError: return None @@ -80,8 +85,11 @@ def _packet_to_mask(self, payload: bytes) -> int | None: if rpm is None or max_rpm is None or max_rpm <= 0: return None ratio = rpm / max_rpm - elif ratio > 1: - ratio = ratio / 100 + # Treat values in (1, 100] as percentages while keeping ratio=1.0 as 100%. + elif ratio > 1 and ratio <= PERCENT_SCALE: + ratio = ratio / PERCENT_SCALE + elif ratio > PERCENT_SCALE: + return None ratio = max(0, min(1, ratio)) lit_leds = int(round(ratio * 10)) From 8c358c04d46749534a3599ec06dc71f7bdd6f1a1 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 28 Mar 2026 22:37:10 +0000 Subject: [PATCH 04/16] docs: clarify alternating rpm_led_mask example Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/1c5e11a3-1121-4ded-890a-d44821be9fe5 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index c3987eb..2aa03ed 100644 --- a/README.md +++ b/README.md @@ -51,7 +51,7 @@ Send JSON with one of these formats: - `{"rpm_percent": 50}` or `{"rpm_ratio": 0.5}` - `{"rpm": 5000, "max_rpm": 10000}` -`rpm_led_mask` accepts arbitrary 10-bit patterns (not only progressive fill patterns), for example `{"rpm_led_mask": 682}`. +`rpm_led_mask` accepts arbitrary 10-bit patterns (not only progressive fill patterns), for example `{"rpm_led_mask": 682}` for an alternating LED pattern. This lets any game/tool drive the RPM indicator by forwarding telemetry in a simple common format. From c16755e659e6a6a3c7a2c7c6847ae23f0559eda7 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 28 Mar 2026 22:40:11 +0000 Subject: [PATCH 05/16] refine telemetry parsing and docs for rpm bridge Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/1c5e11a3-1121-4ded-890a-d44821be9fe5 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 2 +- boxflat/telemetry_bridge.py | 23 ++++++++++++----------- 2 files changed, 13 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index 2aa03ed..b592b48 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,7 @@ Boxflat now listens for external telemetry on UDP `127.0.0.1:27194` (override wi Send JSON with one of these formats: - `{"rpm_led_mask": 31}` (direct 10-bit LED mask) -- `{"rpm_percent": 50}` or `{"rpm_ratio": 0.5}` +- `{"rpm_percent": 50}` (0-100) or `{"rpm_ratio": 0.5}` (0.0-1.0) - `{"rpm": 5000, "max_rpm": 10000}` `rpm_led_mask` accepts arbitrary 10-bit patterns (not only progressive fill patterns), for example `{"rpm_led_mask": 682}` for an alternating LED pattern. diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index 905ee14..9624b3f 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -21,20 +21,19 @@ def __init__(self, connection_manager: "MozaConnectionManager") -> None: self._last_mask = -1 self._host = "127.0.0.1" + env_port = os.environ.get("BOXFLAT_TELEMETRY_PORT", "27194") try: - self._port = int(os.environ.get("BOXFLAT_TELEMETRY_PORT", "27194")) + self._port = int(env_port) except ValueError: - print("Invalid BOXFLAT_TELEMETRY_PORT, falling back to 27194") + print(f"Invalid BOXFLAT_TELEMETRY_PORT value '{env_port}', falling back to 27194") self._port = 27194 self._thread = Thread(target=self._worker, daemon=True) self._thread.start() - def shutdown(self) -> None: self._shutdown.set() - def _worker(self) -> None: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: try: @@ -59,10 +58,10 @@ def _worker(self) -> None: continue self._last_mask = mask + # Wheel command payload is two bytes (LSB/MSB), while dash accepts full int mask. self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") self._cm.set_setting(mask, "dash-send-telemetry") - def _packet_to_mask(self, payload: bytes) -> int | None: try: data = json.loads(payload.decode()) @@ -78,24 +77,26 @@ def _packet_to_mask(self, payload: bytes) -> int | None: if direct_mask is not None: return max(0, min(1023, int(direct_mask))) - ratio = self._first_number(data, "rpm_percent", "rpmPercent", "rpm_ratio", "rpmRatio") + ratio = self._first_number(data, "rpm_percent", "rpmPercent") + if ratio is not None: + if ratio < 0 or ratio > PERCENT_SCALE: + return None + ratio = ratio / PERCENT_SCALE + else: + ratio = self._first_number(data, "rpm_ratio", "rpmRatio") if ratio is None: rpm = self._first_number(data, "rpm", "engine_rpm", "engineRpm", "current_rpm", "currentRpm") max_rpm = self._first_number(data, "max_rpm", "maxRpm", "maxRPM", "rpm_max", "redline") if rpm is None or max_rpm is None or max_rpm <= 0: return None ratio = rpm / max_rpm - # Treat values in (1, 100] as percentages while keeping ratio=1.0 as 100%. - elif ratio > 1 and ratio <= PERCENT_SCALE: - ratio = ratio / PERCENT_SCALE - elif ratio > PERCENT_SCALE: + elif ratio > 1: return None ratio = max(0, min(1, ratio)) lit_leds = int(round(ratio * 10)) return (1 << lit_leds) - 1 if lit_leds > 0 else 0 - def _first_number(self, data: dict, *keys: str) -> float | None: for key in keys: if key not in data: From a266b09470633ac20296ba895a988d6ee7935859 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sat, 28 Mar 2026 22:52:34 +0000 Subject: [PATCH 06/16] feat: add UI controls for telemetry bridge enable and port Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/990b6761-2069-486e-8e15-75594adf62af Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 4 ++- boxflat/app.py | 46 +++++++++++++++++++++--- boxflat/panels/others.py | 70 +++++++++++++++++++++++++++++++++++++ boxflat/telemetry_bridge.py | 39 ++++++++++++++++----- 4 files changed, 145 insertions(+), 14 deletions(-) diff --git a/README.md b/README.md index b592b48..6cd8820 100644 --- a/README.md +++ b/README.md @@ -44,7 +44,9 @@ Boxflat for Moza Racing. Control your Moza gear settings... and more! - H-Pattern and Sequential settings available for arbitrary HID devices ### Telemetry bridge (all racing games) -Boxflat now listens for external telemetry on UDP `127.0.0.1:27194` (override with `BOXFLAT_TELEMETRY_PORT`). +Boxflat now listens for external telemetry on UDP `127.0.0.1:27194`. +You can change this in the UI under **Other → Application settings** (`Enable telemetry bridge` and `Telemetry bridge UDP port`). +On first launch, `BOXFLAT_TELEMETRY_PORT` is still respected as the initial default. Send JSON with one of these formats: - `{"rpm_led_mask": 31}` (direct 10-bit LED mask) diff --git a/boxflat/app.py b/boxflat/app.py index 0009612..b1f532f 100644 --- a/boxflat/app.py +++ b/boxflat/app.py @@ -10,8 +10,13 @@ from boxflat.connection_manager import MozaConnectionManager from boxflat.hid_handler import HidHandler from boxflat.settings_handler import SettingsHandler -from boxflat.telemetry_bridge import TelemetryBridge -from threading import Thread, Event +from boxflat.telemetry_bridge import ( + TelemetryBridge, + get_telemetry_bridge_port, + DEFAULT_TELEMETRY_ENABLED, + DEFAULT_TELEMETRY_PORT +) +from threading import Thread, Event, Lock import os import subprocess @@ -153,11 +158,24 @@ def __init__(self, data_path: str, config_path: str, dry_run: bool, custom: bool self._config_path = config_path self._data_path = data_path self._held = Event() + self._telemetry_reload_lock = Lock() self._cm = MozaConnectionManager(os.path.join(data_path, "serial.yml"), dry_run) self._cm.subscribe("hid-device-connected", self._hid_handler.add_device) self._cm.subscribe("hid-device-disconnected", self._hid_handler.remove_device) - self._telemetry_bridge = TelemetryBridge(self._cm) + self._telemetry_bridge = None + + telemetry_enabled = self._settings.read_setting("telemetry-bridge-enabled") + if telemetry_enabled is None: + telemetry_enabled = DEFAULT_TELEMETRY_ENABLED + self._settings.write_setting(telemetry_enabled, "telemetry-bridge-enabled") + + telemetry_port = self._settings.read_setting("telemetry-bridge-port") + if telemetry_port is None: + telemetry_port = get_telemetry_bridge_port() + self._settings.write_setting(telemetry_port, "telemetry-bridge-port") + + self.reload_telemetry_bridge(telemetry_enabled, telemetry_port) with open(os.path.join(data_path, "version"), "r") as version: self._version = version.readline().strip() @@ -335,13 +353,33 @@ def _prepare_settings(self): def _shutdown(self, *_) -> None: - self._telemetry_bridge.shutdown() + if self._telemetry_bridge: + self._telemetry_bridge.shutdown() for panel in self._panels.values(): panel.shutdown() self._cm.shutdown() + def reload_telemetry_bridge(self, enabled: bool, port: int): + """Recreate telemetry bridge using updated settings. + + :param enabled: whether telemetry bridge should run. + :param port: UDP port to bind for telemetry packets. + """ + with self._telemetry_reload_lock: + if self._telemetry_bridge: + self._telemetry_bridge.shutdown() + + try: + port = int(port) + except (TypeError, ValueError): + port = DEFAULT_TELEMETRY_PORT + + port = max(1, min(65535, port)) + self._telemetry_bridge = TelemetryBridge(self._cm, port=port, enabled=bool(enabled)) + + def _activate_default(self) -> SettingsPanel: self._panels["Home"].button.set_active(True) return self._panels["Home"] diff --git a/boxflat/panels/others.py b/boxflat/panels/others.py index 5451cb2..aeb7ab5 100644 --- a/boxflat/panels/others.py +++ b/boxflat/panels/others.py @@ -2,6 +2,7 @@ from boxflat.connection_manager import MozaConnectionManager from boxflat.settings_handler import SettingsHandler +from boxflat.telemetry_bridge import DEFAULT_TELEMETRY_ENABLED, DEFAULT_TELEMETRY_PORT from boxflat.panels import SettingsPanel from boxflat.widgets import * from boxflat.bitwise import * @@ -83,6 +84,20 @@ def prepare_ui(self): fix_row.subscribe(self._hid_handler.set_detection_fix_enabled) fix_row.set_value(self._settings.read_setting("moza-detection-fix-enabled")) + self._add_row(BoxflatSwitchRow("Enable telemetry bridge", "External game telemetry input")) + telemetry_enabled = self._read_setting_default("telemetry-bridge-enabled", DEFAULT_TELEMETRY_ENABLED) + self._current_row.set_value(telemetry_enabled) + self._current_row.subscribe(self._settings.write_setting, "telemetry-bridge-enabled") + self._current_row.subscribe(lambda *_: self._reload_telemetry_bridge()) + + telemetry_port = Adw.EntryRow() + telemetry_port.set_title("Telemetry bridge UDP port") + telemetry_port_value = self._read_setting_default("telemetry-bridge-port", DEFAULT_TELEMETRY_PORT) + telemetry_port.set_text(str(telemetry_port_value)) + telemetry_port.connect("notify::has-focus", lambda widget, _pspec: self._validate_telemetry_port(widget)) + telemetry_port.connect("apply", lambda widget: self._save_telemetry_port(widget)) + self._add_row(telemetry_port) + # Autostart and background stuff hidden = BoxflatSwitchRow("Start hidden") hidden.set_value(self._settings.read_setting("autostart-hidden") or 0) @@ -181,3 +196,58 @@ def _autostart_flatpak(self, enabled: bool) -> None: None, lambda p, t: p.request_background_finish(t) ) + + + def _validate_telemetry_port(self, row: Adw.EntryRow): + if row.has_focus(): + return + + valid, _ = self._get_valid_telemetry_port(row.get_text()) + row.remove_css_class("error") + if not valid and row.get_text() != "": + row.add_css_class("error") + + + def _save_telemetry_port(self, row: Adw.EntryRow) -> None: + valid, result = self._get_valid_telemetry_port(row.get_text()) + if not valid: + row.add_css_class("error") + self.show_toast(result, 2) + return + + row.remove_css_class("error") + self._settings.write_setting(result, "telemetry-bridge-port") + self._reload_telemetry_bridge() + + + def _get_valid_telemetry_port(self, value: str) -> tuple[bool, int | str]: + if value == "": + return False, "Telemetry bridge port must not be empty" + + try: + port = int(value) + except ValueError: + return False, "Telemetry bridge port must be a number" + + if port < 1 or port > 65535: + return False, "Telemetry bridge port must be in range 1-65535" + + return True, port + + + def _reload_telemetry_bridge(self) -> None: + if not self._application: + return + + enabled = self._read_setting_default("telemetry-bridge-enabled", DEFAULT_TELEMETRY_ENABLED) + port = self._read_setting_default("telemetry-bridge-port", DEFAULT_TELEMETRY_PORT) + + if hasattr(self._application, "reload_telemetry_bridge"): + self._application.reload_telemetry_bridge(enabled, port) + + + def _read_setting_default(self, key: str, default): + value = self._settings.read_setting(key) + if value is None: + return default + return value diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index 9624b3f..b9dc3ac 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -12,27 +12,34 @@ from boxflat.connection_manager import MozaConnectionManager PERCENT_SCALE = 100 +DEFAULT_TELEMETRY_PORT = 27194 +DEFAULT_TELEMETRY_ENABLED = True +TELEMETRY_SHUTDOWN_TIMEOUT = 1 class TelemetryBridge: - def __init__(self, connection_manager: "MozaConnectionManager") -> None: + def __init__(self, connection_manager: "MozaConnectionManager", port: int = DEFAULT_TELEMETRY_PORT, enabled: bool = DEFAULT_TELEMETRY_ENABLED) -> None: self._cm = connection_manager self._shutdown = Event() self._last_mask = -1 + self._enabled = enabled self._host = "127.0.0.1" - env_port = os.environ.get("BOXFLAT_TELEMETRY_PORT", "27194") - try: - self._port = int(env_port) - except ValueError: - print(f"Invalid BOXFLAT_TELEMETRY_PORT value '{env_port}', falling back to 27194") - self._port = 27194 + self._port = port + + if self._enabled: + self._thread = Thread(target=self._worker, daemon=True) + self._thread.start() + else: + self._thread = None - self._thread = Thread(target=self._worker, daemon=True) - self._thread.start() def shutdown(self) -> None: self._shutdown.set() + # Bridge loop uses a 1s socket timeout, so bounded join is sufficient here. + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=TELEMETRY_SHUTDOWN_TIMEOUT) + def _worker(self) -> None: with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: @@ -62,6 +69,7 @@ def _worker(self) -> None: self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") self._cm.set_setting(mask, "dash-send-telemetry") + def _packet_to_mask(self, payload: bytes) -> int | None: try: data = json.loads(payload.decode()) @@ -97,6 +105,7 @@ def _packet_to_mask(self, payload: bytes) -> int | None: lit_leds = int(round(ratio * 10)) return (1 << lit_leds) - 1 if lit_leds > 0 else 0 + def _first_number(self, data: dict, *keys: str) -> float | None: for key in keys: if key not in data: @@ -109,3 +118,15 @@ def _first_number(self, data: dict, *keys: str) -> float | None: if isinstance(value, int | float): return value return None + + +def get_telemetry_bridge_port(default: int = DEFAULT_TELEMETRY_PORT) -> int: + env_port = os.environ.get("BOXFLAT_TELEMETRY_PORT") + if env_port is None: + return default + + try: + return int(env_port) + except ValueError: + print(f"Invalid BOXFLAT_TELEMETRY_PORT value '{env_port}', falling back to {default}") + return default From 684cd685078b26a6bb3c560917b2ecc04cc213dd Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 00:30:59 +0000 Subject: [PATCH 07/16] Fix telemetry bridge handling for invalid ratio/mask inputs with debug logging Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/53b64d6c-9bbf-48a3-b975-9e19b7b5719f Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- boxflat/telemetry_bridge.py | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index b9dc3ac..7970b07 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -3,6 +3,7 @@ from __future__ import annotations import json +import math import os import socket from threading import Event, Thread @@ -26,6 +27,7 @@ def __init__(self, connection_manager: "MozaConnectionManager", port: int = DEFA self._host = "127.0.0.1" self._port = port + self._debug = os.environ.get("BOXFLAT_TELEMETRY_DEBUG", "").lower() in ("1", "true", "yes", "on") if self._enabled: self._thread = Thread(target=self._worker, daemon=True) @@ -74,15 +76,21 @@ def _packet_to_mask(self, payload: bytes) -> int | None: try: data = json.loads(payload.decode()) except UnicodeDecodeError: + self._debug_log("ignored packet: invalid UTF-8 payload") return None except json.JSONDecodeError: + self._debug_log("ignored packet: invalid JSON payload") return None if not isinstance(data, dict): + self._debug_log("ignored packet: JSON root is not an object") return None direct_mask = self._first_number(data, "rpm_led_mask", "rpmMask", "rpm-mask", "led_mask") if direct_mask is not None: + if not math.isfinite(direct_mask): + self._debug_log("ignored packet: rpm_led_mask is non-finite") + return None return max(0, min(1023, int(direct_mask))) ratio = self._first_number(data, "rpm_percent", "rpmPercent") @@ -96,16 +104,26 @@ def _packet_to_mask(self, payload: bytes) -> int | None: rpm = self._first_number(data, "rpm", "engine_rpm", "engineRpm", "current_rpm", "currentRpm") max_rpm = self._first_number(data, "max_rpm", "maxRpm", "maxRPM", "rpm_max", "redline") if rpm is None or max_rpm is None or max_rpm <= 0: + self._debug_log("ignored packet: missing or invalid rpm/max_rpm") return None ratio = rpm / max_rpm - elif ratio > 1: + elif ratio < 0 or ratio > 1: + self._debug_log("ignored packet: rpm_ratio outside 0.0-1.0") return None + if not math.isfinite(ratio): + self._debug_log("ignored packet: non-finite rpm ratio") + return None ratio = max(0, min(1, ratio)) lit_leds = int(round(ratio * 10)) return (1 << lit_leds) - 1 if lit_leds > 0 else 0 + def _debug_log(self, message: str) -> None: + if self._debug: + print(f"Telemetry bridge: {message}") + + def _first_number(self, data: dict, *keys: str) -> float | None: for key in keys: if key not in data: From 58ead5e03e49c5b042b38f4e713d363b73e07e0e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 00:31:37 +0000 Subject: [PATCH 08/16] Harden rpm_led_mask int conversion for extreme numeric inputs Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/53b64d6c-9bbf-48a3-b975-9e19b7b5719f Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- boxflat/telemetry_bridge.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index 7970b07..c6aade7 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -91,7 +91,11 @@ def _packet_to_mask(self, payload: bytes) -> int | None: if not math.isfinite(direct_mask): self._debug_log("ignored packet: rpm_led_mask is non-finite") return None - return max(0, min(1023, int(direct_mask))) + try: + return max(0, min(1023, int(direct_mask))) + except (ValueError, OverflowError): + self._debug_log("ignored packet: invalid rpm_led_mask value") + return None ratio = self._first_number(data, "rpm_percent", "rpmPercent") if ratio is not None: From e0aff06835d2a3e53decbeae101f24f82ea0e514 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 00:32:13 +0000 Subject: [PATCH 09/16] Simplify ratio validation and remove redundant clamping Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/53b64d6c-9bbf-48a3-b975-9e19b7b5719f Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- boxflat/telemetry_bridge.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index c6aade7..1b7fe6f 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -111,14 +111,13 @@ def _packet_to_mask(self, payload: bytes) -> int | None: self._debug_log("ignored packet: missing or invalid rpm/max_rpm") return None ratio = rpm / max_rpm - elif ratio < 0 or ratio > 1: - self._debug_log("ignored packet: rpm_ratio outside 0.0-1.0") - return None if not math.isfinite(ratio): self._debug_log("ignored packet: non-finite rpm ratio") return None - ratio = max(0, min(1, ratio)) + if ratio < 0 or ratio > 1: + self._debug_log("ignored packet: rpm ratio outside 0.0-1.0") + return None lit_leds = int(round(ratio * 10)) return (1 << lit_leds) - 1 if lit_leds > 0 else 0 From af9e007d06e3d45700e27e34b10e31b90f9fa816 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 00:38:43 +0000 Subject: [PATCH 10/16] Improve telemetry bridge diagnostics for missing ACC packets Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/e9cbd4a0-1f2d-4b62-a50d-77a3c8452217 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 9 +++++++++ boxflat/telemetry_bridge.py | 27 +++++++++++++++++++++++++-- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index 6cd8820..4252aac 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ Boxflat for Moza Racing. Control your Moza gear settings... and more! Boxflat now listens for external telemetry on UDP `127.0.0.1:27194`. You can change this in the UI under **Other → Application settings** (`Enable telemetry bridge` and `Telemetry bridge UDP port`). On first launch, `BOXFLAT_TELEMETRY_PORT` is still respected as the initial default. +Most games (including ACC) do not send this JSON format directly, so you must run/enable a telemetry adapter that forwards game telemetry to this UDP port. Send JSON with one of these formats: - `{"rpm_led_mask": 31}` (direct 10-bit LED mask) @@ -57,6 +58,14 @@ Send JSON with one of these formats: This lets any game/tool drive the RPM indicator by forwarding telemetry in a simple common format. +#### Troubleshooting (no RPM or blinking RPM) +- Start Boxflat from terminal with debug enabled: + - `BOXFLAT_TELEMETRY_DEBUG=1 ./entrypoint.py --local` +- Confirm you see: `Telemetry bridge listening on udp://127.0.0.1:` +- If you then see `has not received packets ...`, your game/adapter is not sending to Boxflat's UDP port. +- If you see `dropped packet ...`, incoming payload format is invalid; inspect the adapter payload and match one JSON format above. +- If you only see `ignored duplicate mask ...`, your adapter is sending unchanged RPM mask values (often when game telemetry output is paused/disabled). + ### Firmware upgrades There are some EEPROM functions available, but I need to do more testing to make sure I won't brick anything. For now, just use Pit House on Windows if you can, as FW upgrade support is not coming in the near future. diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index 1b7fe6f..fa0e947 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -16,6 +16,7 @@ DEFAULT_TELEMETRY_PORT = 27194 DEFAULT_TELEMETRY_ENABLED = True TELEMETRY_SHUTDOWN_TIMEOUT = 1 +NO_PACKET_HINT_SECONDS = 10 class TelemetryBridge: @@ -44,6 +45,10 @@ def shutdown(self) -> None: def _worker(self) -> None: + has_received_packet = False + hinted_no_packets = False + waited_for_packets = 0 + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: try: sock.bind((self._host, self._port)) @@ -53,23 +58,41 @@ def _worker(self) -> None: sock.settimeout(1) print(f"Telemetry bridge listening on udp://{self._host}:{self._port}") + print("Telemetry bridge expects UDP JSON packets from a game adapter. Set BOXFLAT_TELEMETRY_DEBUG=1 for packet diagnostics.") while not self._shutdown.is_set(): try: - payload, _ = sock.recvfrom(4096) + payload, source = sock.recvfrom(4096) except socket.timeout: + if not has_received_packet: + waited_for_packets += 1 + if not hinted_no_packets and waited_for_packets >= NO_PACKET_HINT_SECONDS: + print( + f"Telemetry bridge has not received packets on udp://{self._host}:{self._port} yet. " + "Most games (including ACC) need an external telemetry adapter that forwards JSON to this port." + ) + hinted_no_packets = True continue except OSError: break + if not has_received_packet: + print(f"Telemetry bridge received first packet from {source[0]}:{source[1]}") + has_received_packet = True + mask = self._packet_to_mask(payload) - if mask is None or mask == self._last_mask: + if mask is None: + self._debug_log(f"dropped packet from {source[0]}:{source[1]}") + continue + if mask == self._last_mask: + self._debug_log(f"ignored duplicate mask {mask} from {source[0]}:{source[1]}") continue self._last_mask = mask # Wheel command payload is two bytes (LSB/MSB), while dash accepts full int mask. self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") self._cm.set_setting(mask, "dash-send-telemetry") + self._debug_log(f"forwarded mask {mask} from {source[0]}:{source[1]}") def _packet_to_mask(self, payload: bytes) -> int | None: From 118628d217164d1819e9e90a8195bd237ee17183 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 00:39:16 +0000 Subject: [PATCH 11/16] Polish telemetry diagnostics wording and source formatting Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/e9cbd4a0-1f2d-4b62-a50d-77a3c8452217 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 2 +- boxflat/telemetry_bridge.py | 7 ++++--- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 4252aac..b68f925 100644 --- a/README.md +++ b/README.md @@ -62,7 +62,7 @@ This lets any game/tool drive the RPM indicator by forwarding telemetry in a sim - Start Boxflat from terminal with debug enabled: - `BOXFLAT_TELEMETRY_DEBUG=1 ./entrypoint.py --local` - Confirm you see: `Telemetry bridge listening on udp://127.0.0.1:` -- If you then see `has not received packets ...`, your game/adapter is not sending to Boxflat's UDP port. +- If you then see `Telemetry bridge has not received packets ...`, your game/adapter is not sending to Boxflat's UDP port. - If you see `dropped packet ...`, incoming payload format is invalid; inspect the adapter payload and match one JSON format above. - If you only see `ignored duplicate mask ...`, your adapter is sending unchanged RPM mask values (often when game telemetry output is paused/disabled). diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index fa0e947..bb2adda 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -80,19 +80,20 @@ def _worker(self) -> None: print(f"Telemetry bridge received first packet from {source[0]}:{source[1]}") has_received_packet = True + source_text = f"{source[0]}:{source[1]}" mask = self._packet_to_mask(payload) if mask is None: - self._debug_log(f"dropped packet from {source[0]}:{source[1]}") + self._debug_log(f"dropped packet from {source_text}") continue if mask == self._last_mask: - self._debug_log(f"ignored duplicate mask {mask} from {source[0]}:{source[1]}") + self._debug_log(f"ignored duplicate mask {mask} from {source_text}") continue self._last_mask = mask # Wheel command payload is two bytes (LSB/MSB), while dash accepts full int mask. self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") self._cm.set_setting(mask, "dash-send-telemetry") - self._debug_log(f"forwarded mask {mask} from {source[0]}:{source[1]}") + self._debug_log(f"forwarded mask {mask} from {source_text}") def _packet_to_mask(self, payload: bytes) -> int | None: From b615a734b3cf93d940f45619b903259d3538d338 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 00:46:43 +0000 Subject: [PATCH 12/16] Clarify Windows Pit House vs Linux telemetry bridge path Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/b3d48a0e-561d-4f2f-b15d-efdbd9ca3bf5 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 5 +++++ boxflat/telemetry_bridge.py | 9 +++++++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/README.md b/README.md index b68f925..463c82b 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,11 @@ You can change this in the UI under **Other → Application settings** (`Enable On first launch, `BOXFLAT_TELEMETRY_PORT` is still respected as the initial default. Most games (including ACC) do not send this JSON format directly, so you must run/enable a telemetry adapter that forwards game telemetry to this UDP port. +#### Why does Pit House on Windows show RPM directly? +Pit House can use game-specific plugins/integrations on Windows, while Boxflat on Linux does not ship those closed game plugins. +So Boxflat uses one generic input path: a local UDP JSON bridge. +If your game is not writing to that bridge, Boxflat has no RPM data to display. + Send JSON with one of these formats: - `{"rpm_led_mask": 31}` (direct 10-bit LED mask) - `{"rpm_percent": 50}` (0-100) or `{"rpm_ratio": 0.5}` (0.0-1.0) diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index bb2adda..b5cc0e9 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -58,7 +58,11 @@ def _worker(self) -> None: sock.settimeout(1) print(f"Telemetry bridge listening on udp://{self._host}:{self._port}") - print("Telemetry bridge expects UDP JSON packets from a game adapter. Set BOXFLAT_TELEMETRY_DEBUG=1 for packet diagnostics.") + print( + "Telemetry bridge expects UDP JSON packets from a game adapter. " + "On Windows, Pit House can use game-specific plugins directly; Boxflat relies on this bridge input. " + "Set BOXFLAT_TELEMETRY_DEBUG=1 for packet diagnostics." + ) while not self._shutdown.is_set(): try: @@ -69,7 +73,8 @@ def _worker(self) -> None: if not hinted_no_packets and waited_for_packets >= NO_PACKET_HINT_SECONDS: print( f"Telemetry bridge has not received packets on udp://{self._host}:{self._port} yet. " - "Most games (including ACC) need an external telemetry adapter that forwards JSON to this port." + "Most games (including ACC) need an external telemetry adapter that forwards JSON to this port " + "instead of sending directly like Pit House plugins on Windows." ) hinted_no_packets = True continue From a10803b49bdb5598899074b80036489c8a442751 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 00:51:35 +0000 Subject: [PATCH 13/16] Document ACC telemetry adapter requirement for RPM bridge Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/4264d549-93b0-4fb3-9c3c-6a450ca5d85f Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/README.md b/README.md index 463c82b..b3c16af 100644 --- a/README.md +++ b/README.md @@ -54,6 +54,11 @@ Pit House can use game-specific plugins/integrations on Windows, while Boxflat o So Boxflat uses one generic input path: a local UDP JSON bridge. If your game is not writing to that bridge, Boxflat has no RPM data to display. +#### ACC specifics +ACC telemetry is typically exposed via game UDP/shared-memory interfaces, while Boxflat listens only for local UDP JSON on `127.0.0.1:27194`. +That means ACC data needs a small adapter layer (`ACC source -> Boxflat JSON bridge`) before Boxflat can show RPM LEDs. +In practice, this is why tools on Windows can appear "direct" (game-specific integration), but Boxflat keeps one generic bridge input format across games. + Send JSON with one of these formats: - `{"rpm_led_mask": 31}` (direct 10-bit LED mask) - `{"rpm_percent": 50}` (0-100) or `{"rpm_ratio": 0.5}` (0.0-1.0) From 7cf519ae0ac5bd857254f13dbfdadb2ccb6900f9 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 01:08:26 +0000 Subject: [PATCH 14/16] Add direct ACC shared-memory RPM telemetry source Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/6a1c89eb-7ec8-4c9c-96be-4e80c7880975 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- README.md | 6 +- boxflat/telemetry_bridge.py | 159 ++++++++++++++++++++++++++--- tests/test_telemetry_bridge_acc.py | 21 ++++ 3 files changed, 170 insertions(+), 16 deletions(-) create mode 100644 tests/test_telemetry_bridge_acc.py diff --git a/README.md b/README.md index b3c16af..6a89ddb 100644 --- a/README.md +++ b/README.md @@ -55,9 +55,9 @@ So Boxflat uses one generic input path: a local UDP JSON bridge. If your game is not writing to that bridge, Boxflat has no RPM data to display. #### ACC specifics -ACC telemetry is typically exposed via game UDP/shared-memory interfaces, while Boxflat listens only for local UDP JSON on `127.0.0.1:27194`. -That means ACC data needs a small adapter layer (`ACC source -> Boxflat JSON bridge`) before Boxflat can show RPM LEDs. -In practice, this is why tools on Windows can appear "direct" (game-specific integration), but Boxflat keeps one generic bridge input format across games. +Boxflat now reads ACC RPM directly from ACC shared memory when available, and still supports the UDP JSON bridge on `127.0.0.1:27194`. +So ACC can drive RPM LEDs without a separate adapter in the common local setup. +If shared memory is unavailable, you can still use an external adapter that forwards JSON telemetry to the bridge port. Send JSON with one of these formats: - `{"rpm_led_mask": 31}` (direct 10-bit LED mask) diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index b5cc0e9..7283c33 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -4,9 +4,13 @@ import json import math +import mmap import os import socket -from threading import Event, Thread +import struct +import sys +import time +from threading import Event, Lock, Thread from typing import TYPE_CHECKING if TYPE_CHECKING: @@ -17,13 +21,32 @@ DEFAULT_TELEMETRY_ENABLED = True TELEMETRY_SHUTDOWN_TIMEOUT = 1 NO_PACKET_HINT_SECONDS = 10 +ACC_SHARED_MEMORY_POLL_SECONDS = 0.05 +ACC_PHYSICS_RPM_OFFSET = 20 +ACC_STATIC_MAX_RPM_OFFSET = 416 +ACC_INT_SIZE = 4 +ACC_WINDOWS_PHYSICS_MAP = "Local\\acpmf_physics" +ACC_WINDOWS_STATIC_MAP = "Local\\acpmf_static" +ACC_WINDOWS_MAP_SIZE = 4096 +ACC_LINUX_PHYSICS_PATHS = ( + "/dev/shm/acpmf_physics", + "/dev/shm/Local\\acpmf_physics", + "/dev/shm/Local_acpmf_physics", +) +ACC_LINUX_STATIC_PATHS = ( + "/dev/shm/acpmf_static", + "/dev/shm/Local\\acpmf_static", + "/dev/shm/Local_acpmf_static", +) class TelemetryBridge: def __init__(self, connection_manager: "MozaConnectionManager", port: int = DEFAULT_TELEMETRY_PORT, enabled: bool = DEFAULT_TELEMETRY_ENABLED) -> None: self._cm = connection_manager self._shutdown = Event() + self._has_received_input = Event() self._last_mask = -1 + self._mask_lock = Lock() self._enabled = enabled self._host = "127.0.0.1" @@ -33,8 +56,11 @@ def __init__(self, connection_manager: "MozaConnectionManager", port: int = DEFA if self._enabled: self._thread = Thread(target=self._worker, daemon=True) self._thread.start() + self._acc_thread = Thread(target=self._acc_worker, daemon=True) + self._acc_thread.start() else: self._thread = None + self._acc_thread = None def shutdown(self) -> None: @@ -42,6 +68,8 @@ def shutdown(self) -> None: # Bridge loop uses a 1s socket timeout, so bounded join is sufficient here. if self._thread and self._thread.is_alive(): self._thread.join(timeout=TELEMETRY_SHUTDOWN_TIMEOUT) + if self._acc_thread and self._acc_thread.is_alive(): + self._acc_thread.join(timeout=TELEMETRY_SHUTDOWN_TIMEOUT) def _worker(self) -> None: @@ -59,8 +87,7 @@ def _worker(self) -> None: sock.settimeout(1) print(f"Telemetry bridge listening on udp://{self._host}:{self._port}") print( - "Telemetry bridge expects UDP JSON packets from a game adapter. " - "On Windows, Pit House can use game-specific plugins directly; Boxflat relies on this bridge input. " + "Telemetry bridge accepts UDP JSON packets from adapters and also reads ACC shared memory when available. " "Set BOXFLAT_TELEMETRY_DEBUG=1 for packet diagnostics." ) @@ -68,13 +95,13 @@ def _worker(self) -> None: try: payload, source = sock.recvfrom(4096) except socket.timeout: - if not has_received_packet: + if not self._has_received_input.is_set(): waited_for_packets += 1 if not hinted_no_packets and waited_for_packets >= NO_PACKET_HINT_SECONDS: print( f"Telemetry bridge has not received packets on udp://{self._host}:{self._port} yet. " - "Most games (including ACC) need an external telemetry adapter that forwards JSON to this port " - "instead of sending directly like Pit House plugins on Windows." + "For ACC, ensure shared memory is enabled/running. " + "For other games, forward telemetry JSON to this port." ) hinted_no_packets = True continue @@ -84,21 +111,127 @@ def _worker(self) -> None: if not has_received_packet: print(f"Telemetry bridge received first packet from {source[0]}:{source[1]}") has_received_packet = True + self._has_received_input.set() source_text = f"{source[0]}:{source[1]}" mask = self._packet_to_mask(payload) if mask is None: self._debug_log(f"dropped packet from {source_text}") continue - if mask == self._last_mask: - self._debug_log(f"ignored duplicate mask {mask} from {source_text}") + self._forward_mask(mask, source_text) + + + def _acc_worker(self) -> None: + maps: dict | None = None + has_announced = False + while not self._shutdown.is_set(): + if maps is None: + maps = self._open_acc_maps() + if maps is None: + time.sleep(1) continue + if not has_announced: + print("Telemetry bridge ACC source enabled (shared memory).") + has_announced = True + + rpm_data = self._read_acc_rpm_data(maps) + if rpm_data is None: + self._close_acc_maps(maps) + maps = None + continue + + rpm, max_rpm = rpm_data + if max_rpm > 0: + ratio = rpm / max_rpm + if 0 <= ratio <= 1 and math.isfinite(ratio): + lit_leds = int(round(ratio * 10)) + mask = (1 << lit_leds) - 1 if lit_leds > 0 else 0 + self._has_received_input.set() + self._forward_mask(mask, "acc-shm") + time.sleep(ACC_SHARED_MEMORY_POLL_SECONDS) + + if maps is not None: + self._close_acc_maps(maps) + + + def _open_acc_maps(self) -> dict | None: + if sys.platform == "win32": + physics = self._open_windows_map(ACC_WINDOWS_PHYSICS_MAP) + static = self._open_windows_map(ACC_WINDOWS_STATIC_MAP) + if physics is None or static is None: + for mm in (physics, static): + if mm is not None: + mm.close() + return None + return {"physics": physics, "static": static, "files": []} + + physics_file, physics = self._open_linux_map(ACC_LINUX_PHYSICS_PATHS) + static_file, static = self._open_linux_map(ACC_LINUX_STATIC_PATHS) + if physics is None or static is None: + for mm in (physics, static): + if mm is not None: + mm.close() + for handle in (physics_file, static_file): + if handle is not None: + handle.close() + return None + return {"physics": physics, "static": static, "files": [physics_file, static_file]} + + + def _open_windows_map(self, name: str) -> mmap.mmap | None: + try: + return mmap.mmap(-1, ACC_WINDOWS_MAP_SIZE, tagname=name, access=mmap.ACCESS_READ) + except (OSError, TypeError, ValueError): + return None + + + def _open_linux_map(self, paths: tuple[str, ...]) -> tuple[object | None, mmap.mmap | None]: + for path in paths: + try: + file_handle = open(path, "rb") + mm = mmap.mmap(file_handle.fileno(), 0, access=mmap.ACCESS_READ) + return file_handle, mm + except OSError: + continue + return None, None + + + def _close_acc_maps(self, maps: dict) -> None: + for mm in (maps["physics"], maps["static"]): + try: + mm.close() + except OSError: + pass + for file_handle in maps["files"]: + try: + file_handle.close() + except OSError: + pass + + + def _read_acc_rpm_data(self, maps: dict) -> tuple[int, int] | None: + try: + physics = maps["physics"] + static = maps["static"] + rpm = struct.unpack_from(" None: + with self._mask_lock: + if mask == self._last_mask: + self._debug_log(f"ignored duplicate mask {mask} from {source_text}") + return - self._last_mask = mask - # Wheel command payload is two bytes (LSB/MSB), while dash accepts full int mask. - self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") - self._cm.set_setting(mask, "dash-send-telemetry") - self._debug_log(f"forwarded mask {mask} from {source_text}") + self._last_mask = mask + # Wheel command payload is two bytes (LSB/MSB), while dash accepts full int mask. + self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") + self._cm.set_setting(mask, "dash-send-telemetry") + self._debug_log(f"forwarded mask {mask} from {source_text}") def _packet_to_mask(self, payload: bytes) -> int | None: diff --git a/tests/test_telemetry_bridge_acc.py b/tests/test_telemetry_bridge_acc.py new file mode 100644 index 0000000..8711950 --- /dev/null +++ b/tests/test_telemetry_bridge_acc.py @@ -0,0 +1,21 @@ +import unittest + +from boxflat.telemetry_bridge import ACC_PHYSICS_RPM_OFFSET, ACC_STATIC_MAX_RPM_OFFSET, TelemetryBridge + + +class TestTelemetryBridgeACC(unittest.TestCase): + def test_read_acc_rpm_data_reads_expected_offsets(self): + bridge = TelemetryBridge.__new__(TelemetryBridge) + + physics = bytearray(1024) + static = bytearray(1024) + physics[ACC_PHYSICS_RPM_OFFSET : ACC_PHYSICS_RPM_OFFSET + 4] = int(4567).to_bytes(4, "little", signed=True) + static[ACC_STATIC_MAX_RPM_OFFSET : ACC_STATIC_MAX_RPM_OFFSET + 4] = int(9123).to_bytes(4, "little", signed=True) + + rpm_data = TelemetryBridge._read_acc_rpm_data(bridge, {"physics": physics, "static": static}) + + self.assertEqual(rpm_data, (4567, 9123)) + + +if __name__ == "__main__": + unittest.main() From 72cbccececc99614fb439017052f92c205cbc216 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 01:08:45 +0000 Subject: [PATCH 15/16] Polish ACC shared memory telemetry implementation Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/6a1c89eb-7ec8-4c9c-96be-4e80c7880975 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- boxflat/telemetry_bridge.py | 1 - 1 file changed, 1 deletion(-) diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index 7283c33..412857a 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -24,7 +24,6 @@ ACC_SHARED_MEMORY_POLL_SECONDS = 0.05 ACC_PHYSICS_RPM_OFFSET = 20 ACC_STATIC_MAX_RPM_OFFSET = 416 -ACC_INT_SIZE = 4 ACC_WINDOWS_PHYSICS_MAP = "Local\\acpmf_physics" ACC_WINDOWS_STATIC_MAP = "Local\\acpmf_static" ACC_WINDOWS_MAP_SIZE = 4096 From 14759c677e04c5e61e0e428597d30e6f685528a8 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Sun, 29 Mar 2026 01:09:31 +0000 Subject: [PATCH 16/16] Address review feedback for ACC telemetry bridge changes Agent-Logs-Url: https://github.com/SiFuMax09/boxflat/sessions/6a1c89eb-7ec8-4c9c-96be-4e80c7880975 Co-authored-by: SiFuMax09 <131544883+SiFuMax09@users.noreply.github.com> --- boxflat/telemetry_bridge.py | 18 ++++++++++++------ tests/test_telemetry_bridge_acc.py | 4 +--- 2 files changed, 13 insertions(+), 9 deletions(-) diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py index 412857a..5a64aef 100644 --- a/boxflat/telemetry_bridge.py +++ b/boxflat/telemetry_bridge.py @@ -11,7 +11,7 @@ import sys import time from threading import Event, Lock, Thread -from typing import TYPE_CHECKING +from typing import TYPE_CHECKING, BinaryIO if TYPE_CHECKING: from boxflat.connection_manager import MozaConnectionManager @@ -184,14 +184,20 @@ def _open_windows_map(self, name: str) -> mmap.mmap | None: return None - def _open_linux_map(self, paths: tuple[str, ...]) -> tuple[object | None, mmap.mmap | None]: + def _open_linux_map(self, paths: tuple[str, ...]) -> tuple[BinaryIO | None, mmap.mmap | None]: for path in paths: try: file_handle = open(path, "rb") - mm = mmap.mmap(file_handle.fileno(), 0, access=mmap.ACCESS_READ) - return file_handle, mm except OSError: continue + + try: + mm = mmap.mmap(file_handle.fileno(), 0, access=mmap.ACCESS_READ) + except (OSError, ValueError): + file_handle.close() + continue + + return file_handle, mm return None, None @@ -208,14 +214,14 @@ def _close_acc_maps(self, maps: dict) -> None: pass - def _read_acc_rpm_data(self, maps: dict) -> tuple[int, int] | None: + @staticmethod + def _read_acc_rpm_data(maps: dict) -> tuple[int, int] | None: try: physics = maps["physics"] static = maps["static"] rpm = struct.unpack_from("