diff --git a/config/devices.toml b/config/devices.toml index 8328558..3c29e44 100644 --- a/config/devices.toml +++ b/config/devices.toml @@ -19,6 +19,7 @@ multiplier=1 divisor=1 decimalshiftright=0 input=false # true = Input Register false = Holding Register +name="Test_Int16" #see https://thin-edge.github.io/thin-edge.io/html/architecture/thin-edge-json.html measurementmapping.templatestring="{\"Test\":{\"Int16\":%% }}" # tedge JSON format string, %% will be replaced with the calculated value #measurementmapping.combinemeasurements=true # Overrides device setting; Combines all measurements of a device to reduce the number of created measurements in the cloud @@ -34,6 +35,7 @@ divisor=1 decimalshiftright=0 input=false datatype="float" +name="Test_Float32" measurementmapping.templatestring="{\"Test\":{\"Float32\":%% }}" @@ -42,4 +44,5 @@ number=2 input=false alarmmapping.severity="MAJOR" alarmmapping.text="This alarm should be created once" -alarmmapping.type="TestAlarm" \ No newline at end of file +alarmmapping.type="TestAlarm" +name="TestAlarm" \ No newline at end of file diff --git a/images/simulator/modbus.json b/images/simulator/modbus.json index 9f81658..5056c09 100644 --- a/images/simulator/modbus.json +++ b/images/simulator/modbus.json @@ -34,7 +34,7 @@ } }, "invalid": [1], - "write": [3, 48], + "write": [3, 6, 7, 48], "bits": [ 48 ], diff --git a/tedge_modbus/operations/common.py b/tedge_modbus/operations/common.py index f5e82f4..580e36f 100644 --- a/tedge_modbus/operations/common.py +++ b/tedge_modbus/operations/common.py @@ -4,6 +4,7 @@ import json import logging +import re import toml from pymodbus.client import ModbusSerialClient, ModbusTcpClient @@ -32,24 +33,25 @@ def resolve_target_device( Returns (target_device, protocol). """ if ip_address: - ip = ip_address or "127.0.0.1" - protocol = "TCP" target_device = { "protocol": "TCP", - "ip": ip, + "ip": ip_address or "127.0.0.1", "port": 502, "address": slave_id, } - else: - devices_cfg = toml.load(devices_path) - devices = devices_cfg.get("device", []) or [] - target_device = next( - (d for d in devices if d.get("address") == slave_id), None - ) or next((d for d in devices if d.get("protocol") == "TCP"), None) - if target_device is None: - raise ValueError(f"No suitable device found in {devices_path}") - protocol = target_device.get("protocol") - return target_device, protocol # type: ignore[return-value] + return target_device, "TCP" + + devices_cfg = toml.load(devices_path) + devices = devices_cfg.get("device", []) or [] + target_device = next( + (d for d in devices if d.get("address") == slave_id), None + ) or next((d for d in devices if d.get("protocol") == "TCP"), None) + + if target_device is None: + raise ValueError(f"No suitable device found in {devices_path}") + + protocol = target_device.get("protocol", "TCP") + return target_device, protocol def backfill_serial_defaults( @@ -81,7 +83,7 @@ def build_modbus_client(target_device: dict, protocol: str): parity=target_device["parity"], bytesize=target_device["databits"], ) - raise ValueError("Expected protocol to be RTU or TCP. Got " + str(protocol) + ".") + raise ValueError(f"Expected protocol to be RTU or TCP, got {protocol}") def close_client_quietly(client) -> None: @@ -144,3 +146,138 @@ def compute_masked_value( raise ValueError(f"value must be within 0..{max_value} for noBits={num_bits}") mask = ((1 << num_bits) - 1) << start_bit return (current_value & ~mask) | ((write_value << start_bit) & mask) + + +def extract_device_from_topic(topic: str) -> str: + """Extract device-id from topic. + + Expected topic format: te/device////cmd/modbus_Set{Register|Coil}/ + + Supports both SetRegister and SetCoil operations. + + Returns device_id or empty string + """ + # Match pattern like: + # - te/device/TestCase1///cmd/modbus_SetRegister/c8y-mapper-123 + # - te/device/TestCase1///cmd/modbus_SetCoil/c8y-mapper-123 + match = re.search(r"te/device/([^/]+)///cmd/modbus_Set(?:Register|Coil)/.+$", topic) + if match: + return match.group(1) + return "" + + +def _match_from_metrics( # pylint: disable=too-many-arguments + payload: dict, + device_name: str, + devices_path, + config_type: str, + id_key: str, + value_type, +): + """Generic function to match register/coil from devices.toml. + + Args: + payload: Payload containing metrics array + device_name: Name of the device to search + devices_path: Path to devices.toml + config_type: Type of config ("registers" or "coils") + id_key: ID field name ("name" for registers and coils) + value_type: Type to convert value to (float or int) + + Returns: + Tuple of (config_dict, target_device, matched_id, value) + """ + logger = logging.getLogger(__name__) + + metric_name, value = _extract_metric_from_payload(payload, value_type, logger) + target_device = _load_target_device(devices_path, device_name) + if target_device is None: + return None, None, None, None + + configs = target_device.get(config_type, []) or [] + return _match_config_by_id( + configs, id_key, metric_name, target_device, value, logger + ) + + +def _extract_metric_from_payload( + payload: dict, value_type: type, logger +) -> tuple[str, int | float]: + """Extract metric name and value from payload. + + Args: + payload: Payload dictionary + value_type: Type to convert value to (int or float) + logger: Logger instance + + Returns: + Tuple of (metric_name, converted_value) + """ + metrics = payload.get("metrics", []) + if not metrics: + raise ValueError("No metrics found in payload") + + if len(metrics) > 1: + logger.warning("Multiple metrics found, using first one") + + metric = metrics[0] + return metric.get("name", ""), value_type(metric.get("value", 0)) + + +def _load_target_device(devices_path, device_name: str) -> dict | None: + """Load and find target device from devices.toml by name. + + Args: + devices_path: Path to devices.toml file + device_name: Name of the device to find + + Returns: + Device dictionary or None if not found + """ + devices_cfg = toml.load(devices_path) + devices = devices_cfg.get("device", []) or [] + return next((d for d in devices if d.get("name") == device_name), None) + + +def _match_config_by_id( + configs, id_key, metric_name, target_device, value, logger +): # pylint: disable=too-many-arguments + """Match config by checking if metric name starts with or contains id.""" + partial_match = None + + for config in configs: + config_id = config.get(id_key) + if not config_id: + continue + + # Exact prefix match (preferred) + if metric_name.startswith(config_id): + logger.info( + "Matched metric name '%s' with %s '%s'", + metric_name, + id_key, + config_id, + ) + return config, target_device, config_id, value + + # Partial match (fallback) + if config_id in metric_name and partial_match is None: + partial_match = (config, config_id) + + # Return partial match if found + if partial_match: + config, config_id = partial_match + logger.info( + "Matched metric name '%s' with %s '%s' (partial match)", + metric_name, + id_key, + config_id, + ) + return config, target_device, config_id, value + + logger.warning( + "No matching %s found for metric name '%s'", + type(configs).__name__, + metric_name, + ) + return None, None, None, None diff --git a/tedge_modbus/operations/set_coil.py b/tedge_modbus/operations/set_coil.py index 26f90c0..04690af 100644 --- a/tedge_modbus/operations/set_coil.py +++ b/tedge_modbus/operations/set_coil.py @@ -1,5 +1,6 @@ -#!/usr/bin/env python3 +# pylint: disable=duplicate-code """Modbus Write Coil Status operation handler""" + import logging from pymodbus.exceptions import ConnectionException @@ -9,6 +10,8 @@ prepare_client, apply_loglevel, close_client_quietly, + extract_device_from_topic, + _match_from_metrics, ) logger = logging.getLogger(__name__) @@ -17,60 +20,134 @@ ) -def run(arguments: str | list[str]) -> None: +def run(arguments: str | list[str], topic: str | None = None) -> None: """Run set coil operation handler - Expected arguments (JSON): - { - "input": false, - "address": < Fieldbusaddress >, - "coil": < coilnumber >, - "value": < 0 | 1 > - } - Parse JSON payload""" - payload = parse_json_arguments(arguments) - # Create context with default config directory - context = Context() + Supports two payload formats: + + 1. Explicit address format (full coil details): + { + "input": false, + "address": < Fieldbusaddress >, + "coil": < coilnumber >, + "value": < 0 | 1 > + } - # Load configs and set log level + 2. New format (name-based): + { + "timestamp": "2025-09-23T00:00:00Z", + "uuid": "device-id", + "metrics": [{ + "name": "_xxxxxxxx", + "timestamp": "2025-09-23T01:00:00Z", + "value": 0 or 1 + }] + } + Requires topic: te/device////cmd/modbus_SetCoil/ + """ + payload = parse_json_arguments(arguments) + context = Context() modbus_config = context.base_config apply_loglevel(logger, modbus_config) - logger.info("New set coil operation. args=%s", arguments) - - try: - slave_id = int(payload["address"]) # Fieldbus address - coil_number = int(payload["coil"]) # Coil address - value_int = int(payload["value"]) # 0 or 1 - except KeyError as err: - raise ValueError(f"Missing required field: {err}") from err - except (TypeError, ValueError) as err: - raise ValueError(f"Invalid numeric field: {err}") from err + logger.info("Processing set coil operation") - if value_int not in (0, 1): - raise ValueError("value must be 0 or 1 for a coil write") + # Determine format and extract parameters + if "metrics" in payload and topic: + params = _process_new_format_coil(payload, topic, context) + else: + params = _process_explicit_format_coil(payload) - # Prepare client (resolve target, backfill defaults, build client) + # Prepare client client = prepare_client( - payload["ipAddress"], - slave_id, + params["ip_address"], + params["slave_id"], context.config_dir / "devices.toml", modbus_config, ) try: - coil_value = bool(value_int) result = client.write_coil( - address=coil_number, - value=coil_value, - slave=slave_id, + address=params["coil_number"], + value=bool(params["value"]), + slave=params["slave_id"], ) if result.isError(): - raise RuntimeError(f"Failed to write coil {coil_number}: {result}") + raise RuntimeError( + f"Failed to write coil {params['coil_number']}: {result}" + ) logger.info( - "Wrote %s to coil %d on slave %d", coil_value, coil_number, slave_id + "Wrote %s to coil %d on slave %d", + bool(params["value"]), + params["coil_number"], + params["slave_id"], ) except ConnectionException as err: - logger.error("Connection error while writing to slave %d: %s", slave_id, err) + logger.error( + "Connection error while writing to slave %d: %s", + params["slave_id"], + err, + ) raise finally: close_client_quietly(client) + + +def _process_new_format_coil(payload: dict, topic: str, context) -> dict: + """Process new format coil payload with metrics array.""" + logger.info("Processing new format payload with metrics array") + device_name = extract_device_from_topic(topic) + if not device_name: + raise ValueError(f"Could not extract device name from topic: {topic}") + + coil_config, target_device, coil_id, write_value = _match_from_metrics( + payload, + device_name, + context.config_dir / "devices.toml", + "coils", + "name", + int, + ) + + if not coil_config: + raise ValueError("Could not match any coil for metrics in payload") + if not target_device: + raise ValueError(f"Could not find device '{device_name}' in devices.toml") + + logger.info("Matched CoilID: %s, Value: %s", coil_id, write_value) + + coil_number = coil_config.get("number") + if coil_number is None: + raise ValueError( + f"Coil configuration missing 'number' field for name '{coil_id}'" + ) + + value_int = int(write_value) + if value_int not in (0, 1): + raise ValueError("Coil value must be 0 or 1") + + return { + "ip_address": target_device.get("ip", ""), + "slave_id": target_device.get("address"), + "coil_number": coil_number, + "value": value_int, + } + + +def _process_explicit_format_coil(payload: dict) -> dict: + """Process explicit address format coil payload.""" + logger.info("Processing explicit address format payload") + try: + value = int(payload["value"]) + if value not in (0, 1): + raise ValueError("value must be 0 or 1 for a coil write") + + return { + "ip_address": payload.get("ipAddress", ""), + "slave_id": int(payload["address"]), + "coil_number": int(payload["coil"]), + "value": value, + } + except KeyError as err: + raise ValueError(f"Missing required field: {err}") from err + except (TypeError, ValueError) as err: + raise ValueError(f"Invalid numeric field: {err}") from err diff --git a/tedge_modbus/operations/set_register.py b/tedge_modbus/operations/set_register.py index 998845c..e64cd9c 100644 --- a/tedge_modbus/operations/set_register.py +++ b/tedge_modbus/operations/set_register.py @@ -1,6 +1,9 @@ # pylint: disable=duplicate-code """Modbus Write register status operation handler""" + import logging +import struct +import sys from pymodbus.exceptions import ConnectionException from .context import Context @@ -11,6 +14,8 @@ close_client_quietly, parse_register_params, compute_masked_value, + extract_device_from_topic, + _match_from_metrics, ) logger = logging.getLogger(__name__) @@ -19,34 +24,25 @@ ) -def run(arguments: str | list[str]) -> None: +def run(arguments: str | list[str], topic: str | None = None) -> None: """Run set register operation handler - Expected arguments (JSON): - { - "input": false, - "ipAddress": , - "address": , - "register": , - "startBit": , - "noBits": , - "value": - } - Parse JSON arguments. Depending on the caller, we may receive the JSON as a single - string or a list of comma-split segments. Handle both cases robustly.""" + Supports two payload formats: + 1. Explicit address c8y format (full register details) + 2. New format (name-based) + """ payload = parse_json_arguments(arguments) - - # Create context with default config directory context = Context() - - # Load configs and set log level modbus_config = context.base_config apply_loglevel(logger, modbus_config) - logger.info("New set register operation") + logger.info("Processing set register operation") - # Parse required fields from JSON - params = parse_register_params(payload) + # Determine format and extract parameters + if "metrics" in payload and topic: + params = _process_new_format_register(payload, topic, context) + else: + params = _process_explicit_format_register(payload) - # Prepare client (resolve target, backfill defaults, build client) + # Prepare client client = prepare_client( params["ip_address"], params["slave_id"], @@ -54,42 +50,11 @@ def run(arguments: str | list[str]) -> None: modbus_config, ) - # Validate and compute new value - try: - # Read current register value - read_resp = client.read_holding_registers( - address=params["register"], count=1, slave=params["slave_id"] - ) - if read_resp.isError(): - raise RuntimeError( - f"Failed to read register {params['register']}: {read_resp}" - ) - current_value = read_resp.registers[0] & 0xFFFF - new_value = compute_masked_value( - current_value, - params["start_bit"], - params["num_bits"], - params["write_value"], - ) - - # Write back register - write_resp = client.write_register( - address=params["register"], value=new_value, slave=params["slave_id"] - ) - if write_resp.isError(): - raise RuntimeError( - f"Failed to write register {params['register']}: {write_resp}" - ) - logger.info( - "Updated register %d (bits %d..%d) from 0x%04X to 0x%04X on slave %d", - params["register"], - params["start_bit"], - params["start_bit"] + params["num_bits"] - 1, - current_value, - new_value, - params["slave_id"], - ) + if params["is_float"]: + _write_float_registers(client, params) + else: + _write_integer_register(client, params) except ConnectionException as err: logger.error( "Connection error while writing to slave %d: %s", @@ -99,3 +64,169 @@ def run(arguments: str | list[str]) -> None: raise finally: close_client_quietly(client) + + +def _process_new_format_register(payload: dict, topic: str, context) -> dict: + """Process new format register payload with metrics array""" + logger.info("Processing new format payload with metrics array") + device_name = extract_device_from_topic(topic) + if not device_name: + raise ValueError(f"Could not extract device name from topic: {topic}") + + register_config, target_device, register_id, write_value = _match_from_metrics( + payload, + device_name, + context.config_dir / "devices.toml", + "registers", + "name", + float, + ) + + if not register_config: + raise ValueError("Could not match any register for metrics in payload") + if not target_device: + raise ValueError(f"Could not find device '{device_name}' in devices.toml") + + logger.info("Matched RegisterID: %s, Value: %s", register_id, write_value) + + register_num = register_config.get("number") + if register_num is None: + raise ValueError( + f"Register configuration missing 'number' field for name '{register_id}'" + ) + + is_float = register_config.get("datatype") == "float" + params = { + "ip_address": target_device.get("ip", ""), + "slave_id": target_device.get("address"), + "register": register_num, + "start_bit": register_config.get("startbit", 0), + "num_bits": register_config.get("nobits", 16), + "is_float": is_float, + } + + if is_float: + params["write_value"] = write_value + params["little_word_endian"] = target_device.get("littlewordendian", False) + else: + params["write_value"] = int(write_value) + + return params + + +def _process_explicit_format_register(payload: dict) -> dict: + """Process explicit address format register payload.""" + logger.info("Processing explicit address format payload") + params = parse_register_params(payload) + params["is_float"] = False + return params + + +def _write_float_registers(client, params: dict) -> None: + """Write float values to multiple registers.""" + register_pairs = _float_to_register_pairs( + params["write_value"], + params["num_bits"], + params.get("little_word_endian", False), + ) + + logger.info( + "Writing float value %s to registers starting at %d", + params["write_value"], + params["register"], + ) + + write_resp = client.write_registers( + address=params["register"], + values=register_pairs, + slave=params["slave_id"], + ) + if write_resp.isError(): + raise RuntimeError( + f"Failed to write registers {params['register']}: {write_resp}" + ) + + for i, reg_value in enumerate(register_pairs): + logger.info("Wrote 0x%04X to register %d", reg_value, params["register"] + i) + + +def _write_integer_register(client, params: dict) -> None: + """Write integer value to register with bit masking.""" + # Read current register value first + read_resp = client.read_holding_registers( + address=params["register"], count=1, slave=params["slave_id"] + ) + if read_resp.isError(): + raise RuntimeError(f"Failed to read register {params['register']}: {read_resp}") + + current_value = read_resp.registers[0] & 0xFFFF + new_value = compute_masked_value( + current_value, + params["start_bit"], + params["num_bits"], + params["write_value"], + ) + + # Write updated value + write_resp = client.write_register( + address=params["register"], value=new_value, slave=params["slave_id"] + ) + if write_resp.isError(): + raise RuntimeError( + f"Failed to write register {params['register']}: {write_resp}" + ) + + logger.info( + "Updated register %d (bits %d..%d) from 0x%04X to 0x%04X on slave %d", + params["register"], + params["start_bit"], + params["start_bit"] + params["num_bits"] - 1, + current_value, + new_value, + params["slave_id"], + ) + + +def _float_to_register_pairs( + value: float, num_bits: int = 32, little_word_endian: bool = False +) -> list[int]: + """Convert float to register values based on datatype and endianness. + + Ensures cross-platform consistency by using sys.byteorder (same as reader). + Handles endianness reversal to match buffer_register logic in reader. + + Args: + value: Float value to convert + num_bits: Number of bits (16, 32, or 64) + little_word_endian: Whether device uses little word endian + + Returns: + List of register values (as integers) representing the float + """ + byte_order_suffix = ">" if sys.byteorder == "big" else "<" + + if num_bits == 32: + packed = struct.pack(f"{byte_order_suffix}f", value) + reg_low = struct.unpack("