From d8f6f3febdff472912298ea9ec65d1731e03619c Mon Sep 17 00:00:00 2001 From: Aaron Nathan Date: Wed, 24 Dec 2025 23:44:48 -0800 Subject: [PATCH 1/2] adds a simple tool to do baseline device configuration over ip --- bin/config_wizard.py | 340 +++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 340 insertions(+) create mode 100755 bin/config_wizard.py diff --git a/bin/config_wizard.py b/bin/config_wizard.py new file mode 100755 index 0000000..252cc96 --- /dev/null +++ b/bin/config_wizard.py @@ -0,0 +1,340 @@ +#!/usr/bin/env python3 + +""" +Interactive configuration wizard for Point One devices. + +This wizard guides users through configuring key device parameters: +- IMU to body lever arm (X, Y, Z) +- GPS to body lever arm (X, Y, Z) +- Device orientation (Z axis direction, X axis direction) +""" + +import os +import socket +import sys +from urllib.parse import urlparse + +from fusion_engine_client.messages import * + +# Add the parent directory to the search path to enable p1_runner imports. +repo_root = os.path.normpath(os.path.join(os.path.dirname(__file__), '..')) +sys.path.append(repo_root) +sys.path.append(os.path.dirname(__file__)) + +from p1_runner import trace as logging +from p1_runner.data_source import SocketDataSource +from p1_runner.device_interface import DeviceInterface + +logger = logging.getLogger('point_one.config_wizard') + +DEFAULT_TCP_PORT = 30200 + +# Direction options for orientation +DIRECTION_OPTIONS = { + 'forward': Direction.FORWARD, + 'backward': Direction.BACKWARD, + 'left': Direction.LEFT, + 'right': Direction.RIGHT, + 'up': Direction.UP, + 'down': Direction.DOWN, +} + +DIRECTION_NAMES = {v: k for k, v in DIRECTION_OPTIONS.items()} + + +def get_direction_name(direction: Direction) -> str: + """Convert Direction enum to human-readable name.""" + return DIRECTION_NAMES.get(direction, str(direction)) + + +def connect_to_device(ip_address: str) -> tuple: + """ + Connect to device via TCP. + + Returns: + Tuple of (data_source, config_interface) or (None, None) on failure. + """ + try: + # Parse the address + if '://' not in ip_address: + ip_address = f'tcp://{ip_address}' + + parts = urlparse(ip_address) + address = parts.hostname + port = parts.port if parts.port is not None else DEFAULT_TCP_PORT + + if address is None: + print(f"Error: Invalid IP address format.") + return None, None + + print(f"Connecting to {address}:{port}...") + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(5.0) + s.connect((address, port)) + s.settimeout(None) + + data_source = SocketDataSource(s) + config_interface = DeviceInterface(data_source) + + print("Connected successfully.") + return data_source, config_interface + + except Exception as e: + print(f"Error connecting to device: {e}") + return None, None + + +def query_config(config_interface: DeviceInterface, config_type) -> object: + """Query a configuration value from the device.""" + config_interface.get_config(ConfigurationSource.ACTIVE, config_type.GetType()) + resp = config_interface.wait_for_message(ConfigResponseMessage.MESSAGE_TYPE) + + if resp is None: + return None + if resp.response != Response.OK: + return None + + return resp.config_object + + +def apply_config(config_interface: DeviceInterface, config_object, save: bool = False) -> bool: + """Apply a configuration value to the device.""" + config_interface.set_config(config_object, save=save) + resp = config_interface.wait_for_message(CommandResponseMessage.MESSAGE_TYPE) + + if not isinstance(resp, CommandResponseMessage): + return False + if resp.response != Response.OK: + print(f"Error: {resp.response}") + return False + + return True + + +def save_all_config(config_interface: DeviceInterface) -> bool: + """Save all configuration to persistent storage.""" + config_interface.send_save(SaveAction.SAVE) + resp = config_interface.wait_for_message(CommandResponseMessage.MESSAGE_TYPE) + + if not isinstance(resp, CommandResponseMessage): + return False + if resp.response != Response.OK: + print(f"Error saving: {resp.response}") + return False + + return True + + +def prompt_float(prompt: str, current_value: float) -> float: + """Prompt for a float value, showing current value as default.""" + while True: + response = input(f"{prompt} [{current_value:.3f}]: ").strip() + if response == '': + return current_value + try: + return float(response) + except ValueError: + print("Please enter a valid number.") + + +def prompt_direction(prompt: str, current_value: Direction, valid_options: list = None) -> Direction: + """Prompt for a direction value.""" + if valid_options is None: + valid_options = list(DIRECTION_OPTIONS.keys()) + + current_name = get_direction_name(current_value) + options_str = ', '.join(valid_options) + + while True: + response = input(f"{prompt} ({options_str}) [{current_name}]: ").strip().lower() + if response == '': + return current_value + if response in DIRECTION_OPTIONS and response in valid_options: + return DIRECTION_OPTIONS[response] + print(f"Please enter one of: {options_str}") + + +def prompt_yes_no(prompt: str, default: bool = True) -> bool: + """Prompt for a yes/no response.""" + default_str = "Y/n" if default else "y/N" + while True: + response = input(f"{prompt} [{default_str}]: ").strip().lower() + if response == '': + return default + if response in ('y', 'yes'): + return True + if response in ('n', 'no'): + return False + print("Please enter y or n.") + + +def print_section(title: str): + """Print a section header.""" + print() + print("=" * 60) + print(f" {title}") + print("=" * 60) + + +def print_lever_arm(name: str, config): + """Print lever arm values.""" + print(f" {name}: X={config.x:.3f}m, Y={config.y:.3f}m, Z={config.z:.3f}m") + + +def print_orientation(config): + """Print orientation values.""" + x_name = get_direction_name(config.x_direction) + z_name = get_direction_name(config.z_direction) + print(f" Orientation: X-axis={x_name}, Z-axis={z_name}") + + +def main(): + logging.basicConfig( + level=logging.WARNING, + format='%(message)s', + stream=sys.stdout + ) + + print() + print("=" * 60) + print(" Point One Device Configuration Wizard") + print("=" * 60) + print() + + # Get device IP + ip_address = input("Enter device IP address [192.168.0.1]: ").strip() + if ip_address == '': + ip_address = "192.168.0.1" + + # Connect to device + data_source, config_interface = connect_to_device(ip_address) + if config_interface is None: + sys.exit(1) + + try: + # Query current values + print() + print("Querying current configuration...") + + device_lever_arm = query_config(config_interface, DeviceLeverArmConfig) + gnss_lever_arm = query_config(config_interface, GNSSLeverArmConfig) + orientation = query_config(config_interface, DeviceCourseOrientationConfig) + + if device_lever_arm is None or gnss_lever_arm is None or orientation is None: + print("Error: Failed to query current configuration.") + sys.exit(1) + + # Display current values + print_section("Current Configuration") + print_lever_arm("IMU Lever Arm", device_lever_arm) + print_lever_arm("GPS Lever Arm", gnss_lever_arm) + print_orientation(orientation) + + # Track what changed + changes = [] + + # IMU Lever Arm + print_section("IMU to Body Lever Arm (meters)") + print("Enter the offset from the vehicle body origin to the IMU.") + print(" +X = Forward, +Y = Left, +Z = Up") + print() + + new_device_x = prompt_float(" X (forward)", device_lever_arm.x) + new_device_y = prompt_float(" Y (left)", device_lever_arm.y) + new_device_z = prompt_float(" Z (up)", device_lever_arm.z) + + if (new_device_x != device_lever_arm.x or + new_device_y != device_lever_arm.y or + new_device_z != device_lever_arm.z): + new_device_lever_arm = DeviceLeverArmConfig(new_device_x, new_device_y, new_device_z) + changes.append(('IMU Lever Arm', new_device_lever_arm)) + + # GPS Lever Arm + print_section("GPS Antenna to Body Lever Arm (meters)") + print("Enter the offset from the vehicle body origin to the GPS antenna.") + print(" +X = Forward, +Y = Left, +Z = Up") + print() + + new_gnss_x = prompt_float(" X (forward)", gnss_lever_arm.x) + new_gnss_y = prompt_float(" Y (left)", gnss_lever_arm.y) + new_gnss_z = prompt_float(" Z (up)", gnss_lever_arm.z) + + if (new_gnss_x != gnss_lever_arm.x or + new_gnss_y != gnss_lever_arm.y or + new_gnss_z != gnss_lever_arm.z): + new_gnss_lever_arm = GNSSLeverArmConfig(new_gnss_x, new_gnss_y, new_gnss_z) + changes.append(('GPS Lever Arm', new_gnss_lever_arm)) + + # Orientation + print_section("Device Orientation") + print("Specify how the device is mounted relative to the vehicle body.") + print(" Vehicle body: +X = Forward, +Y = Left, +Z = Up") + print() + + new_z_dir = prompt_direction( + " Device Z-axis points", + orientation.z_direction, + ['up', 'down', 'forward', 'backward', 'left', 'right'] + ) + new_x_dir = prompt_direction( + " Device X-axis points", + orientation.x_direction, + ['forward', 'backward', 'left', 'right', 'up', 'down'] + ) + + if new_z_dir != orientation.z_direction or new_x_dir != orientation.x_direction: + new_orientation = DeviceCourseOrientationConfig(new_x_dir, new_z_dir) + changes.append(('Orientation', new_orientation)) + + # Summary and confirmation + if not changes: + print() + print("No changes made.") + sys.exit(0) + + print_section("Summary of Changes") + for name, config in changes: + if isinstance(config, DeviceCourseOrientationConfig): + x_name = get_direction_name(config.x_direction) + z_name = get_direction_name(config.z_direction) + print(f" {name}: X-axis={x_name}, Z-axis={z_name}") + else: + print(f" {name}: X={config.x:.3f}m, Y={config.y:.3f}m, Z={config.z:.3f}m") + + print() + if not prompt_yes_no("Apply these changes?", default=True): + print("Changes cancelled.") + sys.exit(0) + + # Apply changes + print() + print("Applying changes...") + for name, config in changes: + print(f" Setting {name}...", end=' ') + if apply_config(config_interface, config): + print("OK") + else: + print("FAILED") + sys.exit(1) + + # Save to persistent storage + print() + if prompt_yes_no("Save changes to persistent storage?", default=True): + print("Saving configuration...", end=' ') + if save_all_config(config_interface): + print("OK") + else: + print("FAILED") + sys.exit(1) + else: + print("Changes applied but NOT saved. They will be lost on reboot.") + + print() + print("Configuration complete!") + + finally: + data_source.stop() + + +if __name__ == "__main__": + main() From d8edfbeed5e6af5cd36313498692f31b65b4f3bb Mon Sep 17 00:00:00 2001 From: Aaron Date: Thu, 25 Dec 2025 18:12:22 -0800 Subject: [PATCH 2/2] add serial port support to wizard updates readme and clarifies some language adds spaces for units adds configurable serial baud, minor reademe tweaks --- README.md | 25 ++++++- bin/config_wizard.py | 156 +++++++++++++++++++++++++++++++++---------- 2 files changed, 143 insertions(+), 38 deletions(-) diff --git a/README.md b/README.md index 6d3e29d..12fcfc5 100644 --- a/README.md +++ b/README.md @@ -16,8 +16,10 @@ This application and additional documentation on Point One devices, protocols, a - [`p1_runner` - Log Data And Receive GNSS Corrections In Real Time](#p1_runner---log-data-and-receive-gnss-corrections-in-real-time) - [Basic Usage](#basic-usage) - [Sending GNSS Corrections](#sending-gnss-corrections) - - [`config_tool` - Read/Write Device Configuration](#config_tool---readwrite-device-configuration) + - [`config_wizard` - Simple Setup Wizard](#config_wizard---simple-setup-wizard) - [Basic Usage](#basic-usage-1) + - [`config_tool` - Read/Write Device Configuration](#config_tool---readwrite-device-configuration) + - [Basic Usage](#basic-usage-2) - [Saving Changes](#saving-changes) - [`device_bridge` - Connect Two Devices Through The Host Computer](#device_bridge---connect-two-devices-through-the-host-computer) @@ -87,7 +89,7 @@ The following sections cover the most common use case. See `runner.py --help` fo ### Basic Usage -1. Connect a USB cable from the Point One device to your host computer. +1. If your device is USB, connect it your host computer. 2. If used, activate the Python virtual environment as described in [Setup / Installation](#setup--installation). 3. Run p1_runner to connect the device. @@ -123,6 +125,23 @@ To use another NTRIP service, use the `--ntrip` argument, specifying URL, mountp $ python3 bin/runner.py --ntrip http://corrections.com:2101,my_mountpoint,my_username,my_password ``` +## `config_wizard` - Simple Setup Wizard + +`config_wizard.py` is an easy to use tool to query and update basic settings and stored data of a Point One device. + +### Basic Usage + +1. Connect a USB cable from the Point One device to your host computer (if USB). For IP based devices, get your device's IP address. +2. If used, activate the Python virtual environment as described in [Setup / Installation](#setup--installation). +3. Run config_tool to connect the device. + + Linux: `python3 bin/config_wizard.py ADDRESS` + + Windows: `python bin/config_wizard.py ADDRESS` + + Where `ADDRESS` is something like 192.168.0.1, COM3, /dev/ttyUSB0, or serial://COM3. + + ## `config_tool` - Read/Write Device Configuration `config_tool.py` can be used to query and update the setting and stored data on a Point One device. @@ -131,7 +150,7 @@ The following sections cover the most common use case. See `config_tool.py --hel ### Basic Usage -1. Connect a USB cable from the Point One device to your host computer. +1. Connect a USB cable from the Point One device to your host computer (if used). For IP based devices, get your device's IP address. 2. If used, activate the Python virtual environment as described in [Setup / Installation](#setup--installation). 3. Run config_tool to connect the device. diff --git a/bin/config_wizard.py b/bin/config_wizard.py index 252cc96..b4b664a 100755 --- a/bin/config_wizard.py +++ b/bin/config_wizard.py @@ -4,8 +4,8 @@ Interactive configuration wizard for Point One devices. This wizard guides users through configuring key device parameters: -- IMU to body lever arm (X, Y, Z) -- GPS to body lever arm (X, Y, Z) +- Body to IMU lever arm (X, Y, Z) +- Body to GPS lever arm (X, Y, Z) - Device orientation (Z axis direction, X axis direction) """ @@ -14,6 +14,7 @@ import sys from urllib.parse import urlparse +import serial from fusion_engine_client.messages import * # Add the parent directory to the search path to enable p1_runner imports. @@ -22,12 +23,13 @@ sys.path.append(os.path.dirname(__file__)) from p1_runner import trace as logging -from p1_runner.data_source import SocketDataSource +from p1_runner.data_source import SocketDataSource, SerialDataSource from p1_runner.device_interface import DeviceInterface logger = logging.getLogger('point_one.config_wizard') DEFAULT_TCP_PORT = 30200 +DEFAULT_SERIAL_BAUD = 460800 # Direction options for orientation DIRECTION_OPTIONS = { @@ -47,37 +49,116 @@ def get_direction_name(direction: Direction) -> str: return DIRECTION_NAMES.get(direction, str(direction)) -def connect_to_device(ip_address: str) -> tuple: +def parse_serial_address(address: str) -> tuple: """ - Connect to device via TCP. + Parse a serial port address with optional baud rate. + + Supports formats: + - "COM3" or "/dev/ttyUSB0" (uses default baud) + - "COM3:115200" or "/dev/ttyUSB0:115200" (explicit baud) Returns: - Tuple of (data_source, config_interface) or (None, None) on failure. + Tuple of (port, baud_rate) """ - try: - # Parse the address - if '://' not in ip_address: - ip_address = f'tcp://{ip_address}' + # Check for baud rate suffix (e.g., COM3:115200 or /dev/ttyUSB0:115200) + # For /dev paths, only split on the last colon to handle the path correctly + if address.startswith('/dev/'): + # Linux/Mac path - baud rate comes after the device name + if ':' in address[5:]: # Check for colon after /dev/ + last_colon = address.rfind(':') + port = address[:last_colon] + try: + baud = int(address[last_colon + 1:]) + return port, baud + except ValueError: + pass + return address, DEFAULT_SERIAL_BAUD + else: + # Windows COM port or other + if ':' in address: + port, baud_str = address.rsplit(':', 1) + try: + baud = int(baud_str) + return port, baud + except ValueError: + pass + return address, DEFAULT_SERIAL_BAUD + + +def connect_to_device(address: str) -> tuple: + """ + Connect to device via TCP or serial port. - parts = urlparse(ip_address) - address = parts.hostname - port = parts.port if parts.port is not None else DEFAULT_TCP_PORT + Supports multiple formats: + - IP address: "192.168.0.1" or "tcp://192.168.0.1:30200" + - Serial port: "COM3", "COM3:115200", "/dev/ttyUSB0", "/dev/ttyUSB0:115200" + - Serial URL: "serial://COM3:115200" or "serial:///dev/ttyUSB0:115200" - if address is None: - print(f"Error: Invalid IP address format.") - return None, None + IP connections are the default if no scheme is specified and the address looks like an IP. - print(f"Connecting to {address}:{port}...") - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - s.settimeout(5.0) - s.connect((address, port)) - s.settimeout(None) + Returns: + Tuple of (data_source, config_interface) or (None, None) on failure. + """ + try: + # Determine connection type + is_serial = False + baud_rate = DEFAULT_SERIAL_BAUD + + if '://' in address: + parts = urlparse(address) + scheme = parts.scheme.lower() + + if scheme == 'serial': + is_serial = True + # Handle both serial://COM3 and serial:///dev/ttyUSB0 + serial_addr = parts.path if parts.path else parts.netloc + serial_port, baud_rate = parse_serial_address(serial_addr) + elif scheme == 'tcp': + is_serial = False + host = parts.hostname + port = parts.port if parts.port is not None else DEFAULT_TCP_PORT + else: + print(f"Error: Unknown scheme '{scheme}'. Use 'tcp://' or 'serial://'") + return None, None + else: + # No scheme specified - determine based on the address format + # If it looks like a COM port or /dev path, treat as serial + if address.upper().startswith('COM') or address.startswith('/dev/'): + is_serial = True + serial_port, baud_rate = parse_serial_address(address) + else: + # Default to TCP for IP addresses + is_serial = False + host = address + port = DEFAULT_TCP_PORT + + # Connect based on type + if is_serial: + print(f"Connecting to serial port {serial_port} at {baud_rate} baud...") + ser = serial.Serial( + port=serial_port, + baudrate=baud_rate, + timeout=1.0 + ) + + data_source = SerialDataSource(ser) + data_source.start_read_thread() + config_interface = DeviceInterface(data_source) + + print("Connected successfully.") + return data_source, config_interface + else: + print(f"Connecting to {host}:{port}...") + s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + s.settimeout(5.0) + s.connect((host, port)) + s.settimeout(None) - data_source = SocketDataSource(s) - config_interface = DeviceInterface(data_source) + data_source = SocketDataSource(s) + config_interface = DeviceInterface(data_source) - print("Connected successfully.") - return data_source, config_interface + print("Connected successfully.") + return data_source, config_interface except Exception as e: print(f"Error connecting to device: {e}") @@ -178,7 +259,7 @@ def print_section(title: str): def print_lever_arm(name: str, config): """Print lever arm values.""" - print(f" {name}: X={config.x:.3f}m, Y={config.y:.3f}m, Z={config.z:.3f}m") + print(f" {name}: X={config.x:.3f} m, Y={config.y:.3f} m, Z={config.z:.3f} m") def print_orientation(config): @@ -201,13 +282,16 @@ def main(): print("=" * 60) print() - # Get device IP - ip_address = input("Enter device IP address [192.168.0.1]: ").strip() - if ip_address == '': - ip_address = "192.168.0.1" + # Get device address (IP or serial port) + print("Connect via IP address or serial port.") + print(" Examples: 192.168.0.1, COM3, /dev/ttyUSB0:115200") + print() + address = input("Enter device address/port [127.0.0.1]: ").strip() + if address == '': + address = "127.0.0.1" # Connect to device - data_source, config_interface = connect_to_device(ip_address) + data_source, config_interface = connect_to_device(address) if config_interface is None: sys.exit(1) @@ -235,7 +319,8 @@ def main(): # IMU Lever Arm print_section("IMU to Body Lever Arm (meters)") - print("Enter the offset from the vehicle body origin to the IMU.") + print("Enter the distance from the vehicle body origin to the IMU.") + print("The vehicle body origin is usually the center of the rear axle.") print(" +X = Forward, +Y = Left, +Z = Up") print() @@ -250,8 +335,9 @@ def main(): changes.append(('IMU Lever Arm', new_device_lever_arm)) # GPS Lever Arm - print_section("GPS Antenna to Body Lever Arm (meters)") - print("Enter the offset from the vehicle body origin to the GPS antenna.") + print_section("Body to GPS Antenna Lever Arm (meters)") + print("Enter the distance from the vehicle body origin to the GPS antenna.") + print("The vehicle body origin is usually the center of the rear axle.") print(" +X = Forward, +Y = Left, +Z = Up") print() @@ -299,7 +385,7 @@ def main(): z_name = get_direction_name(config.z_direction) print(f" {name}: X-axis={x_name}, Z-axis={z_name}") else: - print(f" {name}: X={config.x:.3f}m, Y={config.y:.3f}m, Z={config.z:.3f}m") + print(f" {name}: X={config.x:.3f} m, Y={config.y:.3f} m, Z={config.z:.3f} m") print() if not prompt_yes_no("Apply these changes?", default=True):