Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 33 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,44 @@ Boxflat for Moza Racing. Control your Moza gear settings... and more!
| Generic devices | Detection fix | |

### Ideas
- Telemetry ingestion through REST API/WebSockets
- Cammus support
- PXN Support
- Simagic support
- H-Pattern and Sequential settings available for arbitrary HID devices

### Telemetry bridge (all racing games)
Boxflat now listens for external telemetry on UDP `127.0.0.1:27194`.
You can change this in the UI under **Other → Application settings** (`Enable telemetry bridge` and `Telemetry bridge UDP port`).
On first launch, `BOXFLAT_TELEMETRY_PORT` is still respected as the initial default.
Most games (including ACC) do not send this JSON format directly, so you must run/enable a telemetry adapter that forwards game telemetry to this UDP port.

#### Why does Pit House on Windows show RPM directly?
Pit House can use game-specific plugins/integrations on Windows, while Boxflat on Linux does not ship those closed game plugins.
So Boxflat uses one generic input path: a local UDP JSON bridge.
If your game is not writing to that bridge, Boxflat has no RPM data to display.

#### ACC specifics
Boxflat now reads ACC RPM directly from ACC shared memory when available, and still supports the UDP JSON bridge on `127.0.0.1:27194`.
So ACC can drive RPM LEDs without a separate adapter in the common local setup.
If shared memory is unavailable, you can still use an external adapter that forwards JSON telemetry to the bridge port.

Send JSON with one of these formats:
- `{"rpm_led_mask": 31}` (direct 10-bit LED mask)
- `{"rpm_percent": 50}` (0-100) or `{"rpm_ratio": 0.5}` (0.0-1.0)
- `{"rpm": 5000, "max_rpm": 10000}`

`rpm_led_mask` accepts arbitrary 10-bit patterns (not only progressive fill patterns), for example `{"rpm_led_mask": 682}` for an alternating LED pattern.

This lets any game/tool drive the RPM indicator by forwarding telemetry in a simple common format.

#### Troubleshooting (no RPM or blinking RPM)
- Start Boxflat from terminal with debug enabled:
- `BOXFLAT_TELEMETRY_DEBUG=1 ./entrypoint.py --local`
- Confirm you see: `Telemetry bridge listening on udp://127.0.0.1:<port>`
- If you then see `Telemetry bridge has not received packets ...`, your game/adapter is not sending to Boxflat's UDP port.
- If you see `dropped packet ...`, incoming payload format is invalid; inspect the adapter payload and match one JSON format above.
- If you only see `ignored duplicate mask ...`, your adapter is sending unchanged RPM mask values (often when game telemetry output is paused/disabled).

### Firmware upgrades
There are some EEPROM functions available, but I need to do more testing to make sure I won't brick anything. For now, just use Pit House on Windows if you can, as FW upgrade support is not coming in the near future.

Expand Down
44 changes: 43 additions & 1 deletion boxflat/app.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,13 @@
from boxflat.connection_manager import MozaConnectionManager
from boxflat.hid_handler import HidHandler
from boxflat.settings_handler import SettingsHandler
from threading import Thread, Event
from boxflat.telemetry_bridge import (
TelemetryBridge,
get_telemetry_bridge_port,
DEFAULT_TELEMETRY_ENABLED,
DEFAULT_TELEMETRY_PORT
)
from threading import Thread, Event, Lock

import os
import subprocess
Expand Down Expand Up @@ -134,6 +140,7 @@ class MyApp(Adw.Application):
def __init__(self, data_path: str, config_path: str, dry_run: bool, custom: bool, autostart: bool,**kwargs):
super().__init__(**kwargs)
self.connect('activate', self.on_activate)
self.connect('shutdown', self._shutdown)

self.Tray = None

Expand All @@ -151,10 +158,24 @@ def __init__(self, data_path: str, config_path: str, dry_run: bool, custom: bool
self._config_path = config_path
self._data_path = data_path
self._held = Event()
self._telemetry_reload_lock = Lock()

self._cm = MozaConnectionManager(os.path.join(data_path, "serial.yml"), dry_run)
self._cm.subscribe("hid-device-connected", self._hid_handler.add_device)
self._cm.subscribe("hid-device-disconnected", self._hid_handler.remove_device)
self._telemetry_bridge = None

telemetry_enabled = self._settings.read_setting("telemetry-bridge-enabled")
if telemetry_enabled is None:
telemetry_enabled = DEFAULT_TELEMETRY_ENABLED
self._settings.write_setting(telemetry_enabled, "telemetry-bridge-enabled")

telemetry_port = self._settings.read_setting("telemetry-bridge-port")
if telemetry_port is None:
telemetry_port = get_telemetry_bridge_port()
self._settings.write_setting(telemetry_port, "telemetry-bridge-port")

self.reload_telemetry_bridge(telemetry_enabled, telemetry_port)

with open(os.path.join(data_path, "version"), "r") as version:
self._version = version.readline().strip()
Expand Down Expand Up @@ -332,12 +353,33 @@ def _prepare_settings(self):


def _shutdown(self, *_) -> None:
if self._telemetry_bridge:
self._telemetry_bridge.shutdown()
for panel in self._panels.values():
panel.shutdown()

self._cm.shutdown()


def reload_telemetry_bridge(self, enabled: bool, port: int):
"""Recreate telemetry bridge using updated settings.

:param enabled: whether telemetry bridge should run.
:param port: UDP port to bind for telemetry packets.
"""
with self._telemetry_reload_lock:
if self._telemetry_bridge:
self._telemetry_bridge.shutdown()

try:
port = int(port)
except (TypeError, ValueError):
port = DEFAULT_TELEMETRY_PORT

port = max(1, min(65535, port))
self._telemetry_bridge = TelemetryBridge(self._cm, port=port, enabled=bool(enabled))


def _activate_default(self) -> SettingsPanel:
self._panels["Home"].button.set_active(True)
return self._panels["Home"]
Expand Down
70 changes: 70 additions & 0 deletions boxflat/panels/others.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from boxflat.connection_manager import MozaConnectionManager
from boxflat.settings_handler import SettingsHandler
from boxflat.telemetry_bridge import DEFAULT_TELEMETRY_ENABLED, DEFAULT_TELEMETRY_PORT
from boxflat.panels import SettingsPanel
from boxflat.widgets import *
from boxflat.bitwise import *
Expand Down Expand Up @@ -83,6 +84,20 @@ def prepare_ui(self):
fix_row.subscribe(self._hid_handler.set_detection_fix_enabled)
fix_row.set_value(self._settings.read_setting("moza-detection-fix-enabled"))

self._add_row(BoxflatSwitchRow("Enable telemetry bridge", "External game telemetry input"))
telemetry_enabled = self._read_setting_default("telemetry-bridge-enabled", DEFAULT_TELEMETRY_ENABLED)
self._current_row.set_value(telemetry_enabled)
self._current_row.subscribe(self._settings.write_setting, "telemetry-bridge-enabled")
self._current_row.subscribe(lambda *_: self._reload_telemetry_bridge())

telemetry_port = Adw.EntryRow()
telemetry_port.set_title("Telemetry bridge UDP port")
telemetry_port_value = self._read_setting_default("telemetry-bridge-port", DEFAULT_TELEMETRY_PORT)
telemetry_port.set_text(str(telemetry_port_value))
telemetry_port.connect("notify::has-focus", lambda widget, _pspec: self._validate_telemetry_port(widget))
telemetry_port.connect("apply", lambda widget: self._save_telemetry_port(widget))
self._add_row(telemetry_port)

# Autostart and background stuff
hidden = BoxflatSwitchRow("Start hidden")
hidden.set_value(self._settings.read_setting("autostart-hidden") or 0)
Expand Down Expand Up @@ -181,3 +196,58 @@ def _autostart_flatpak(self, enabled: bool) -> None:
None,
lambda p, t: p.request_background_finish(t)
)


def _validate_telemetry_port(self, row: Adw.EntryRow):
if row.has_focus():
return

valid, _ = self._get_valid_telemetry_port(row.get_text())
row.remove_css_class("error")
if not valid and row.get_text() != "":
row.add_css_class("error")


def _save_telemetry_port(self, row: Adw.EntryRow) -> None:
valid, result = self._get_valid_telemetry_port(row.get_text())
if not valid:
row.add_css_class("error")
self.show_toast(result, 2)
return

row.remove_css_class("error")
self._settings.write_setting(result, "telemetry-bridge-port")
self._reload_telemetry_bridge()


def _get_valid_telemetry_port(self, value: str) -> tuple[bool, int | str]:
if value == "":
return False, "Telemetry bridge port must not be empty"

try:
port = int(value)
except ValueError:
return False, "Telemetry bridge port must be a number"

if port < 1 or port > 65535:
return False, "Telemetry bridge port must be in range 1-65535"

return True, port


def _reload_telemetry_bridge(self) -> None:
if not self._application:
return

enabled = self._read_setting_default("telemetry-bridge-enabled", DEFAULT_TELEMETRY_ENABLED)
port = self._read_setting_default("telemetry-bridge-port", DEFAULT_TELEMETRY_PORT)

if hasattr(self._application, "reload_telemetry_bridge"):
self._application.reload_telemetry_bridge(enabled, port)


def _read_setting_default(self, key: str, default):
value = self._settings.read_setting(key)
if value is None:
return default
return value
Loading