1- #!/usr/bin/env python3
1+ # pylint: disable=duplicate-code
22"""Modbus Write Coil Status operation handler"""
3+
34import logging
45
56from pymodbus .exceptions import ConnectionException
@@ -32,12 +33,12 @@ def run(arguments: str | list[str], topic: str | None = None) -> None:
3233 "value": < 0 | 1 >
3334 }
3435
35- 2. New format (coilid -based):
36+ 2. New format (name -based):
3637 {
3738 "timestamp": "2025-09-23T00:00:00Z",
3839 "uuid": "device-id",
3940 "metrics": [{
40- "name": "<coilid >_xxxxxxxx",
41+ "name": "<name >_xxxxxxxx",
4142 "timestamp": "2025-09-23T01:00:00Z",
4243 "value": 0 or 1
4344 }]
@@ -48,53 +49,63 @@ def run(arguments: str | list[str], topic: str | None = None) -> None:
4849 context = Context ()
4950 modbus_config = context .base_config
5051 apply_loglevel (logger , modbus_config )
51- logger .info ("New set coil operation" )
52+ logger .info ("Processing set coil operation" )
5253
54+ # Determine format and extract parameters
5355 if "metrics" in payload and topic :
54- slave_id , coil_number , value_int , ip_address = _process_new_format_coil (
55- payload , topic , context
56- )
56+ params = _process_new_format_coil (payload , topic , context )
5757 else :
58- slave_id , coil_number , value_int , ip_address = _process_legacy_format_coil (
59- payload
60- )
58+ params = _process_legacy_format_coil (payload )
6159
62- # Prepare client (resolve target, backfill defaults, build client)
60+ # Prepare client
6361 client = prepare_client (
64- ip_address ,
65- slave_id ,
62+ params [ " ip_address" ] ,
63+ params [ " slave_id" ] ,
6664 context .config_dir / "devices.toml" ,
6765 modbus_config ,
6866 )
6967
7068 try :
71- coil_value = bool (value_int )
7269 result = client .write_coil (
73- address = coil_number ,
74- value = coil_value ,
75- slave = slave_id ,
70+ address = params [ " coil_number" ] ,
71+ value = bool ( params [ "value" ]) ,
72+ slave = params [ " slave_id" ] ,
7673 )
7774 if result .isError ():
78- raise RuntimeError (f"Failed to write coil { coil_number } : { result } " )
75+ raise RuntimeError (
76+ f"Failed to write coil { params ['coil_number' ]} : { result } "
77+ )
7978 logger .info (
80- "Wrote %s to coil %d on slave %d" , coil_value , coil_number , slave_id
79+ "Wrote %s to coil %d on slave %d" ,
80+ bool (params ["value" ]),
81+ params ["coil_number" ],
82+ params ["slave_id" ],
8183 )
8284 except ConnectionException as err :
83- logger .error ("Connection error while writing to slave %d: %s" , slave_id , err )
85+ logger .error (
86+ "Connection error while writing to slave %d: %s" ,
87+ params ["slave_id" ],
88+ err ,
89+ )
8490 raise
8591 finally :
8692 close_client_quietly (client )
8793
8894
89- def _process_new_format_coil (payload : dict , topic : str , context ):
90- """Process new format coil payload."""
95+ def _process_new_format_coil (payload : dict , topic : str , context ) -> dict :
96+ """Process new format coil payload with metrics array ."""
9197 logger .info ("Processing new format payload with metrics array" )
9298 device_name = extract_device_from_topic (topic )
9399 if not device_name :
94100 raise ValueError (f"Could not extract device name from topic: { topic } " )
95101
96- coil_config , target_device , coil_id , write_value = _match_coil_from_metrics (
97- payload , device_name , context .config_dir / "devices.toml"
102+ coil_config , target_device , coil_id , write_value = _match_from_metrics (
103+ payload ,
104+ device_name ,
105+ context .config_dir / "devices.toml" ,
106+ "coils" ,
107+ "name" ,
108+ int ,
98109 )
99110
100111 if not coil_config :
@@ -103,53 +114,40 @@ def _process_new_format_coil(payload: dict, topic: str, context):
103114 raise ValueError (f"Could not find device '{ device_name } ' in devices.toml" )
104115
105116 logger .info ("Matched CoilID: %s, Value: %s" , coil_id , write_value )
106- coil_num = coil_config .get ("number" )
107- if coil_num is None :
117+
118+ coil_number = coil_config .get ("number" )
119+ if coil_number is None :
108120 raise ValueError (
109- f"Coil configuration missing 'number' field for coilid '{ coil_id } '"
121+ f"Coil configuration missing 'number' field for name '{ coil_id } '"
110122 )
111123
112124 value_int = int (write_value )
113125 if value_int not in (0 , 1 ):
114126 raise ValueError ("Coil value must be 0 or 1" )
115127
116- return (
117- target_device .get ("address " ),
118- coil_num ,
119- value_int ,
120- target_device . get ( "ip" , "" ) ,
121- )
128+ return {
129+ "ip_address" : target_device .get ("ip" , " " ),
130+ "slave_id" : target_device . get ( "address" ) ,
131+ "coil_number" : coil_number ,
132+ "value" : value_int ,
133+ }
122134
123135
124- def _process_legacy_format_coil (payload : dict ):
136+ def _process_legacy_format_coil (payload : dict ) -> dict :
125137 """Process legacy format coil payload."""
126138 logger .info ("Processing legacy format payload" )
127139 try :
128- slave_id = int (payload ["address" ])
129- coil_number = int (payload ["coil" ])
130- value_int = int (payload ["value" ])
140+ value = int (payload ["value" ])
141+ if value not in (0 , 1 ):
142+ raise ValueError ("value must be 0 or 1 for a coil write" )
143+
144+ return {
145+ "ip_address" : payload .get ("ipAddress" , "" ),
146+ "slave_id" : int (payload ["address" ]),
147+ "coil_number" : int (payload ["coil" ]),
148+ "value" : value ,
149+ }
131150 except KeyError as err :
132151 raise ValueError (f"Missing required field: { err } " ) from err
133152 except (TypeError , ValueError ) as err :
134153 raise ValueError (f"Invalid numeric field: { err } " ) from err
135-
136- if value_int not in (0 , 1 ):
137- raise ValueError ("value must be 0 or 1 for a coil write" )
138-
139- return slave_id , coil_number , value_int , payload .get ("ipAddress" , "" )
140-
141-
142- def _match_coil_from_metrics (payload : dict , device_name : str , devices_path ):
143- """Match coil from devices.toml by checking if metric name starts with coilid.
144-
145- Args:
146- payload: Payload containing metrics array
147- device_name: Name of the device to search
148- devices_path: Path to devices.toml
149-
150- Returns:
151- Tuple of (coil_config, target_device, coilid, value)
152- """
153- return _match_from_metrics (
154- payload , device_name , devices_path , "coils" , "coilid" , int
155- )
0 commit comments