diff --git a/README.md b/README.md index 8a12e0d..6a89ddb 100644 --- a/README.md +++ b/README.md @@ -38,12 +38,44 @@ Boxflat for Moza Racing. Control your Moza gear settings... and more! | Generic devices | Detection fix | | ### Ideas -- Telemetry ingestion through REST API/WebSockets - Cammus support - PXN Support - Simagic support - H-Pattern and Sequential settings available for arbitrary HID devices +### Telemetry bridge (all racing games) +Boxflat now listens for external telemetry on UDP `127.0.0.1:27194`. +You can change this in the UI under **Other → Application settings** (`Enable telemetry bridge` and `Telemetry bridge UDP port`). +On first launch, `BOXFLAT_TELEMETRY_PORT` is still respected as the initial default. +Most games (including ACC) do not send this JSON format directly, so you must run/enable a telemetry adapter that forwards game telemetry to this UDP port. + +#### Why does Pit House on Windows show RPM directly? +Pit House can use game-specific plugins/integrations on Windows, while Boxflat on Linux does not ship those closed game plugins. +So Boxflat uses one generic input path: a local UDP JSON bridge. +If your game is not writing to that bridge, Boxflat has no RPM data to display. + +#### ACC specifics +Boxflat now reads ACC RPM directly from ACC shared memory when available, and still supports the UDP JSON bridge on `127.0.0.1:27194`. +So ACC can drive RPM LEDs without a separate adapter in the common local setup. +If shared memory is unavailable, you can still use an external adapter that forwards JSON telemetry to the bridge port. + +Send JSON with one of these formats: +- `{"rpm_led_mask": 31}` (direct 10-bit LED mask) +- `{"rpm_percent": 50}` (0-100) or `{"rpm_ratio": 0.5}` (0.0-1.0) +- `{"rpm": 5000, "max_rpm": 10000}` + +`rpm_led_mask` accepts arbitrary 10-bit patterns (not only progressive fill patterns), for example `{"rpm_led_mask": 682}` for an alternating LED pattern. + +This lets any game/tool drive the RPM indicator by forwarding telemetry in a simple common format. + +#### Troubleshooting (no RPM or blinking RPM) +- Start Boxflat from terminal with debug enabled: + - `BOXFLAT_TELEMETRY_DEBUG=1 ./entrypoint.py --local` +- Confirm you see: `Telemetry bridge listening on udp://127.0.0.1:` +- If you then see `Telemetry bridge has not received packets ...`, your game/adapter is not sending to Boxflat's UDP port. +- If you see `dropped packet ...`, incoming payload format is invalid; inspect the adapter payload and match one JSON format above. +- If you only see `ignored duplicate mask ...`, your adapter is sending unchanged RPM mask values (often when game telemetry output is paused/disabled). + ### Firmware upgrades There are some EEPROM functions available, but I need to do more testing to make sure I won't brick anything. For now, just use Pit House on Windows if you can, as FW upgrade support is not coming in the near future. diff --git a/boxflat/app.py b/boxflat/app.py index 7147462..b1f532f 100644 --- a/boxflat/app.py +++ b/boxflat/app.py @@ -10,7 +10,13 @@ from boxflat.connection_manager import MozaConnectionManager from boxflat.hid_handler import HidHandler from boxflat.settings_handler import SettingsHandler -from threading import Thread, Event +from boxflat.telemetry_bridge import ( + TelemetryBridge, + get_telemetry_bridge_port, + DEFAULT_TELEMETRY_ENABLED, + DEFAULT_TELEMETRY_PORT +) +from threading import Thread, Event, Lock import os import subprocess @@ -134,6 +140,7 @@ class MyApp(Adw.Application): def __init__(self, data_path: str, config_path: str, dry_run: bool, custom: bool, autostart: bool,**kwargs): super().__init__(**kwargs) self.connect('activate', self.on_activate) + self.connect('shutdown', self._shutdown) self.Tray = None @@ -151,10 +158,24 @@ def __init__(self, data_path: str, config_path: str, dry_run: bool, custom: bool self._config_path = config_path self._data_path = data_path self._held = Event() + self._telemetry_reload_lock = Lock() self._cm = MozaConnectionManager(os.path.join(data_path, "serial.yml"), dry_run) self._cm.subscribe("hid-device-connected", self._hid_handler.add_device) self._cm.subscribe("hid-device-disconnected", self._hid_handler.remove_device) + self._telemetry_bridge = None + + telemetry_enabled = self._settings.read_setting("telemetry-bridge-enabled") + if telemetry_enabled is None: + telemetry_enabled = DEFAULT_TELEMETRY_ENABLED + self._settings.write_setting(telemetry_enabled, "telemetry-bridge-enabled") + + telemetry_port = self._settings.read_setting("telemetry-bridge-port") + if telemetry_port is None: + telemetry_port = get_telemetry_bridge_port() + self._settings.write_setting(telemetry_port, "telemetry-bridge-port") + + self.reload_telemetry_bridge(telemetry_enabled, telemetry_port) with open(os.path.join(data_path, "version"), "r") as version: self._version = version.readline().strip() @@ -332,12 +353,33 @@ def _prepare_settings(self): def _shutdown(self, *_) -> None: + if self._telemetry_bridge: + self._telemetry_bridge.shutdown() for panel in self._panels.values(): panel.shutdown() self._cm.shutdown() + def reload_telemetry_bridge(self, enabled: bool, port: int): + """Recreate telemetry bridge using updated settings. + + :param enabled: whether telemetry bridge should run. + :param port: UDP port to bind for telemetry packets. + """ + with self._telemetry_reload_lock: + if self._telemetry_bridge: + self._telemetry_bridge.shutdown() + + try: + port = int(port) + except (TypeError, ValueError): + port = DEFAULT_TELEMETRY_PORT + + port = max(1, min(65535, port)) + self._telemetry_bridge = TelemetryBridge(self._cm, port=port, enabled=bool(enabled)) + + def _activate_default(self) -> SettingsPanel: self._panels["Home"].button.set_active(True) return self._panels["Home"] diff --git a/boxflat/panels/others.py b/boxflat/panels/others.py index 5451cb2..aeb7ab5 100644 --- a/boxflat/panels/others.py +++ b/boxflat/panels/others.py @@ -2,6 +2,7 @@ from boxflat.connection_manager import MozaConnectionManager from boxflat.settings_handler import SettingsHandler +from boxflat.telemetry_bridge import DEFAULT_TELEMETRY_ENABLED, DEFAULT_TELEMETRY_PORT from boxflat.panels import SettingsPanel from boxflat.widgets import * from boxflat.bitwise import * @@ -83,6 +84,20 @@ def prepare_ui(self): fix_row.subscribe(self._hid_handler.set_detection_fix_enabled) fix_row.set_value(self._settings.read_setting("moza-detection-fix-enabled")) + self._add_row(BoxflatSwitchRow("Enable telemetry bridge", "External game telemetry input")) + telemetry_enabled = self._read_setting_default("telemetry-bridge-enabled", DEFAULT_TELEMETRY_ENABLED) + self._current_row.set_value(telemetry_enabled) + self._current_row.subscribe(self._settings.write_setting, "telemetry-bridge-enabled") + self._current_row.subscribe(lambda *_: self._reload_telemetry_bridge()) + + telemetry_port = Adw.EntryRow() + telemetry_port.set_title("Telemetry bridge UDP port") + telemetry_port_value = self._read_setting_default("telemetry-bridge-port", DEFAULT_TELEMETRY_PORT) + telemetry_port.set_text(str(telemetry_port_value)) + telemetry_port.connect("notify::has-focus", lambda widget, _pspec: self._validate_telemetry_port(widget)) + telemetry_port.connect("apply", lambda widget: self._save_telemetry_port(widget)) + self._add_row(telemetry_port) + # Autostart and background stuff hidden = BoxflatSwitchRow("Start hidden") hidden.set_value(self._settings.read_setting("autostart-hidden") or 0) @@ -181,3 +196,58 @@ def _autostart_flatpak(self, enabled: bool) -> None: None, lambda p, t: p.request_background_finish(t) ) + + + def _validate_telemetry_port(self, row: Adw.EntryRow): + if row.has_focus(): + return + + valid, _ = self._get_valid_telemetry_port(row.get_text()) + row.remove_css_class("error") + if not valid and row.get_text() != "": + row.add_css_class("error") + + + def _save_telemetry_port(self, row: Adw.EntryRow) -> None: + valid, result = self._get_valid_telemetry_port(row.get_text()) + if not valid: + row.add_css_class("error") + self.show_toast(result, 2) + return + + row.remove_css_class("error") + self._settings.write_setting(result, "telemetry-bridge-port") + self._reload_telemetry_bridge() + + + def _get_valid_telemetry_port(self, value: str) -> tuple[bool, int | str]: + if value == "": + return False, "Telemetry bridge port must not be empty" + + try: + port = int(value) + except ValueError: + return False, "Telemetry bridge port must be a number" + + if port < 1 or port > 65535: + return False, "Telemetry bridge port must be in range 1-65535" + + return True, port + + + def _reload_telemetry_bridge(self) -> None: + if not self._application: + return + + enabled = self._read_setting_default("telemetry-bridge-enabled", DEFAULT_TELEMETRY_ENABLED) + port = self._read_setting_default("telemetry-bridge-port", DEFAULT_TELEMETRY_PORT) + + if hasattr(self._application, "reload_telemetry_bridge"): + self._application.reload_telemetry_bridge(enabled, port) + + + def _read_setting_default(self, key: str, default): + value = self._settings.read_setting(key) + if value is None: + return default + return value diff --git a/boxflat/telemetry_bridge.py b/boxflat/telemetry_bridge.py new file mode 100644 index 0000000..5a64aef --- /dev/null +++ b/boxflat/telemetry_bridge.py @@ -0,0 +1,320 @@ +# Copyright (c) 2025, Tomasz Pakuła Using Arch BTW + +from __future__ import annotations + +import json +import math +import mmap +import os +import socket +import struct +import sys +import time +from threading import Event, Lock, Thread +from typing import TYPE_CHECKING, BinaryIO + +if TYPE_CHECKING: + from boxflat.connection_manager import MozaConnectionManager + +PERCENT_SCALE = 100 +DEFAULT_TELEMETRY_PORT = 27194 +DEFAULT_TELEMETRY_ENABLED = True +TELEMETRY_SHUTDOWN_TIMEOUT = 1 +NO_PACKET_HINT_SECONDS = 10 +ACC_SHARED_MEMORY_POLL_SECONDS = 0.05 +ACC_PHYSICS_RPM_OFFSET = 20 +ACC_STATIC_MAX_RPM_OFFSET = 416 +ACC_WINDOWS_PHYSICS_MAP = "Local\\acpmf_physics" +ACC_WINDOWS_STATIC_MAP = "Local\\acpmf_static" +ACC_WINDOWS_MAP_SIZE = 4096 +ACC_LINUX_PHYSICS_PATHS = ( + "/dev/shm/acpmf_physics", + "/dev/shm/Local\\acpmf_physics", + "/dev/shm/Local_acpmf_physics", +) +ACC_LINUX_STATIC_PATHS = ( + "/dev/shm/acpmf_static", + "/dev/shm/Local\\acpmf_static", + "/dev/shm/Local_acpmf_static", +) + + +class TelemetryBridge: + def __init__(self, connection_manager: "MozaConnectionManager", port: int = DEFAULT_TELEMETRY_PORT, enabled: bool = DEFAULT_TELEMETRY_ENABLED) -> None: + self._cm = connection_manager + self._shutdown = Event() + self._has_received_input = Event() + self._last_mask = -1 + self._mask_lock = Lock() + self._enabled = enabled + + self._host = "127.0.0.1" + self._port = port + self._debug = os.environ.get("BOXFLAT_TELEMETRY_DEBUG", "").lower() in ("1", "true", "yes", "on") + + if self._enabled: + self._thread = Thread(target=self._worker, daemon=True) + self._thread.start() + self._acc_thread = Thread(target=self._acc_worker, daemon=True) + self._acc_thread.start() + else: + self._thread = None + self._acc_thread = None + + + def shutdown(self) -> None: + self._shutdown.set() + # Bridge loop uses a 1s socket timeout, so bounded join is sufficient here. + if self._thread and self._thread.is_alive(): + self._thread.join(timeout=TELEMETRY_SHUTDOWN_TIMEOUT) + if self._acc_thread and self._acc_thread.is_alive(): + self._acc_thread.join(timeout=TELEMETRY_SHUTDOWN_TIMEOUT) + + + def _worker(self) -> None: + has_received_packet = False + hinted_no_packets = False + waited_for_packets = 0 + + with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock: + try: + sock.bind((self._host, self._port)) + except OSError as e: + print(f"Telemetry bridge disabled: {e}") + return + + sock.settimeout(1) + print(f"Telemetry bridge listening on udp://{self._host}:{self._port}") + print( + "Telemetry bridge accepts UDP JSON packets from adapters and also reads ACC shared memory when available. " + "Set BOXFLAT_TELEMETRY_DEBUG=1 for packet diagnostics." + ) + + while not self._shutdown.is_set(): + try: + payload, source = sock.recvfrom(4096) + except socket.timeout: + if not self._has_received_input.is_set(): + waited_for_packets += 1 + if not hinted_no_packets and waited_for_packets >= NO_PACKET_HINT_SECONDS: + print( + f"Telemetry bridge has not received packets on udp://{self._host}:{self._port} yet. " + "For ACC, ensure shared memory is enabled/running. " + "For other games, forward telemetry JSON to this port." + ) + hinted_no_packets = True + continue + except OSError: + break + + if not has_received_packet: + print(f"Telemetry bridge received first packet from {source[0]}:{source[1]}") + has_received_packet = True + self._has_received_input.set() + + source_text = f"{source[0]}:{source[1]}" + mask = self._packet_to_mask(payload) + if mask is None: + self._debug_log(f"dropped packet from {source_text}") + continue + self._forward_mask(mask, source_text) + + + def _acc_worker(self) -> None: + maps: dict | None = None + has_announced = False + while not self._shutdown.is_set(): + if maps is None: + maps = self._open_acc_maps() + if maps is None: + time.sleep(1) + continue + if not has_announced: + print("Telemetry bridge ACC source enabled (shared memory).") + has_announced = True + + rpm_data = self._read_acc_rpm_data(maps) + if rpm_data is None: + self._close_acc_maps(maps) + maps = None + continue + + rpm, max_rpm = rpm_data + if max_rpm > 0: + ratio = rpm / max_rpm + if 0 <= ratio <= 1 and math.isfinite(ratio): + lit_leds = int(round(ratio * 10)) + mask = (1 << lit_leds) - 1 if lit_leds > 0 else 0 + self._has_received_input.set() + self._forward_mask(mask, "acc-shm") + time.sleep(ACC_SHARED_MEMORY_POLL_SECONDS) + + if maps is not None: + self._close_acc_maps(maps) + + + def _open_acc_maps(self) -> dict | None: + if sys.platform == "win32": + physics = self._open_windows_map(ACC_WINDOWS_PHYSICS_MAP) + static = self._open_windows_map(ACC_WINDOWS_STATIC_MAP) + if physics is None or static is None: + for mm in (physics, static): + if mm is not None: + mm.close() + return None + return {"physics": physics, "static": static, "files": []} + + physics_file, physics = self._open_linux_map(ACC_LINUX_PHYSICS_PATHS) + static_file, static = self._open_linux_map(ACC_LINUX_STATIC_PATHS) + if physics is None or static is None: + for mm in (physics, static): + if mm is not None: + mm.close() + for handle in (physics_file, static_file): + if handle is not None: + handle.close() + return None + return {"physics": physics, "static": static, "files": [physics_file, static_file]} + + + def _open_windows_map(self, name: str) -> mmap.mmap | None: + try: + return mmap.mmap(-1, ACC_WINDOWS_MAP_SIZE, tagname=name, access=mmap.ACCESS_READ) + except (OSError, TypeError, ValueError): + return None + + + def _open_linux_map(self, paths: tuple[str, ...]) -> tuple[BinaryIO | None, mmap.mmap | None]: + for path in paths: + try: + file_handle = open(path, "rb") + except OSError: + continue + + try: + mm = mmap.mmap(file_handle.fileno(), 0, access=mmap.ACCESS_READ) + except (OSError, ValueError): + file_handle.close() + continue + + return file_handle, mm + return None, None + + + def _close_acc_maps(self, maps: dict) -> None: + for mm in (maps["physics"], maps["static"]): + try: + mm.close() + except OSError: + pass + for file_handle in maps["files"]: + try: + file_handle.close() + except OSError: + pass + + + @staticmethod + def _read_acc_rpm_data(maps: dict) -> tuple[int, int] | None: + try: + physics = maps["physics"] + static = maps["static"] + rpm = struct.unpack_from(" None: + with self._mask_lock: + if mask == self._last_mask: + self._debug_log(f"ignored duplicate mask {mask} from {source_text}") + return + + self._last_mask = mask + # Wheel command payload is two bytes (LSB/MSB), while dash accepts full int mask. + self._cm.set_setting([mask & 255, mask >> 8], "wheel-send-rpm-telemetry") + self._cm.set_setting(mask, "dash-send-telemetry") + self._debug_log(f"forwarded mask {mask} from {source_text}") + + + def _packet_to_mask(self, payload: bytes) -> int | None: + try: + data = json.loads(payload.decode()) + except UnicodeDecodeError: + self._debug_log("ignored packet: invalid UTF-8 payload") + return None + except json.JSONDecodeError: + self._debug_log("ignored packet: invalid JSON payload") + return None + + if not isinstance(data, dict): + self._debug_log("ignored packet: JSON root is not an object") + return None + + direct_mask = self._first_number(data, "rpm_led_mask", "rpmMask", "rpm-mask", "led_mask") + if direct_mask is not None: + if not math.isfinite(direct_mask): + self._debug_log("ignored packet: rpm_led_mask is non-finite") + return None + try: + return max(0, min(1023, int(direct_mask))) + except (ValueError, OverflowError): + self._debug_log("ignored packet: invalid rpm_led_mask value") + return None + + ratio = self._first_number(data, "rpm_percent", "rpmPercent") + if ratio is not None: + if ratio < 0 or ratio > PERCENT_SCALE: + return None + ratio = ratio / PERCENT_SCALE + else: + ratio = self._first_number(data, "rpm_ratio", "rpmRatio") + if ratio is None: + rpm = self._first_number(data, "rpm", "engine_rpm", "engineRpm", "current_rpm", "currentRpm") + max_rpm = self._first_number(data, "max_rpm", "maxRpm", "maxRPM", "rpm_max", "redline") + if rpm is None or max_rpm is None or max_rpm <= 0: + self._debug_log("ignored packet: missing or invalid rpm/max_rpm") + return None + ratio = rpm / max_rpm + + if not math.isfinite(ratio): + self._debug_log("ignored packet: non-finite rpm ratio") + return None + if ratio < 0 or ratio > 1: + self._debug_log("ignored packet: rpm ratio outside 0.0-1.0") + return None + lit_leds = int(round(ratio * 10)) + return (1 << lit_leds) - 1 if lit_leds > 0 else 0 + + + def _debug_log(self, message: str) -> None: + if self._debug: + print(f"Telemetry bridge: {message}") + + + def _first_number(self, data: dict, *keys: str) -> float | None: + for key in keys: + if key not in data: + continue + + value = data[key] + if isinstance(value, bool): + continue + + if isinstance(value, int | float): + return value + return None + + +def get_telemetry_bridge_port(default: int = DEFAULT_TELEMETRY_PORT) -> int: + env_port = os.environ.get("BOXFLAT_TELEMETRY_PORT") + if env_port is None: + return default + + try: + return int(env_port) + except ValueError: + print(f"Invalid BOXFLAT_TELEMETRY_PORT value '{env_port}', falling back to {default}") + return default diff --git a/tests/test_telemetry_bridge_acc.py b/tests/test_telemetry_bridge_acc.py new file mode 100644 index 0000000..094d59d --- /dev/null +++ b/tests/test_telemetry_bridge_acc.py @@ -0,0 +1,19 @@ +import unittest + +from boxflat.telemetry_bridge import ACC_PHYSICS_RPM_OFFSET, ACC_STATIC_MAX_RPM_OFFSET, TelemetryBridge + + +class TestTelemetryBridgeACC(unittest.TestCase): + def test_read_acc_rpm_data_reads_expected_offsets(self): + physics = bytearray(1024) + static = bytearray(1024) + physics[ACC_PHYSICS_RPM_OFFSET : ACC_PHYSICS_RPM_OFFSET + 4] = int(4567).to_bytes(4, "little", signed=True) + static[ACC_STATIC_MAX_RPM_OFFSET : ACC_STATIC_MAX_RPM_OFFSET + 4] = int(9123).to_bytes(4, "little", signed=True) + + rpm_data = TelemetryBridge._read_acc_rpm_data({"physics": physics, "static": static}) + + self.assertEqual(rpm_data, (4567, 9123)) + + +if __name__ == "__main__": + unittest.main()