diff --git a/python_otbr_api/models.py b/python_otbr_api/models.py index b1e9b93..cd07dbd 100644 --- a/python_otbr_api/models.py +++ b/python_otbr_api/models.py @@ -44,6 +44,9 @@ } +_PASCAL_TO_CAMEL: dict[str, str] = {v: k for k, v in _CAMEL_TO_PASCAL.items()} + + def _normalize_keys(data: Any) -> Any: """Normalize camelCase JSON keys to PascalCase. @@ -59,6 +62,21 @@ def _normalize_keys(data: Any) -> Any: } +def _to_camel_keys(data: dict) -> dict: + """Convert PascalCase JSON keys to camelCase for serialization. + + The OTBR REST API expects camelCase keys (per the OpenAPI spec in + ot-br-posix). This function converts the internal PascalCase keys to + camelCase. Unknown keys and non-dict values pass through unchanged. + """ + return { + _PASCAL_TO_CAMEL.get(k, k): ( + _to_camel_keys(v) if isinstance(v, dict) else v + ) + for k, v in data.items() + } + + @dataclass class Timestamp: """Timestamp.""" @@ -84,7 +102,7 @@ def as_json(self) -> dict: result["Seconds"] = self.seconds if self.ticks is not None: result["Ticks"] = self.ticks - return result + return _to_camel_keys(result) @classmethod def from_json(cls, json_data: Any) -> Timestamp: @@ -151,7 +169,7 @@ def as_json(self) -> dict: result["Routers"] = self.routers if self.to_ble_link is not None: result["TobleLink"] = self.to_ble_link - return result + return _to_camel_keys(result) @classmethod def from_json(cls, json_data: Any) -> SecurityPolicy: @@ -225,7 +243,7 @@ def as_json(self) -> dict: result["PSKc"] = self.psk_c if self.security_policy is not None: result["SecurityPolicy"] = self.security_policy.as_json() - return result + return _to_camel_keys(result) @classmethod def from_json(cls, json_data: Any) -> ActiveDataSet: @@ -278,7 +296,7 @@ def as_json(self) -> dict: result["Delay"] = self.delay if self.pending_timestamp is not None: result["PendingTimestamp"] = self.pending_timestamp.as_json() - return result + return _to_camel_keys(result) @classmethod def from_json(cls, json_data: Any) -> PendingDataSet: diff --git a/tests/test_init.py b/tests/test_init.py index 94e8e16..40a4e07 100644 --- a/tests/test_init.py +++ b/tests/test_init.py @@ -170,7 +170,31 @@ async def test_get_active_dataset(aioclient_mock: AiohttpClientMocker): DATASET_JSON["PSKc"], security_policy, ) - assert active_dataset.as_json() == DATASET_JSON + # as_json() now emits camelCase keys matching the OTBR REST API spec + camel_dataset = { + "activeTimestamp": {"authoritative": False, "seconds": 1, "ticks": 0}, + "channelMask": 134215680, + "channel": 15, + "extPanId": "8478E3379E047B92", + "meshLocalPrefix": "fd89:bde7:42ed:a901::/64", + "networkKey": "96271D6ECC78749114AB6A591E0D06F1", + "networkName": "OpenThread HA", + "panId": 33991, + "pskc": "9760C89414D461AC717DCD105EB87E5B", + "securityPolicy": { + "autonomousEnrollment": False, + "commercialCommissioning": False, + "externalCommissioning": True, + "nativeCommissioning": True, + "networkKeyProvisioning": False, + "nonCcmRouters": False, + "obtainNetworkKey": True, + "rotationTime": 672, + "routers": True, + "tobleLink": True, + }, + } + assert active_dataset.as_json() == camel_dataset async def test_get_active_dataset_empty(aioclient_mock: AiohttpClientMocker): @@ -245,7 +269,7 @@ async def test_create_active_dataset(aioclient_mock: AiohttpClientMocker): assert aioclient_mock.call_count == 2 assert aioclient_mock.mock_calls[-1][0] == "PUT" assert aioclient_mock.mock_calls[-1][1].path == "/node/dataset/active" - assert aioclient_mock.mock_calls[-1][2] == {"NetworkName": "OpenThread HA"} + assert aioclient_mock.mock_calls[-1][2] == {"networkName": "OpenThread HA"} await otbr.create_active_dataset( python_otbr_api.ActiveDataSet(network_name="OpenThread HA", channel=15) @@ -254,8 +278,8 @@ async def test_create_active_dataset(aioclient_mock: AiohttpClientMocker): assert aioclient_mock.mock_calls[-1][0] == "PUT" assert aioclient_mock.mock_calls[-1][1].path == "/node/dataset/active" assert aioclient_mock.mock_calls[-1][2] == { - "NetworkName": "OpenThread HA", - "Channel": 15, + "networkName": "OpenThread HA", + "channel": 15, } @@ -295,11 +319,11 @@ async def test_create_pending_dataset(aioclient_mock: AiohttpClientMocker): assert aioclient_mock.mock_calls[-1][0] == "PUT" assert aioclient_mock.mock_calls[-1][1].path == "/node/dataset/pending" assert aioclient_mock.mock_calls[-1][2] == { - "ActiveDataset": { - "NetworkName": "OpenThread HA", + "activeDataset": { + "networkName": "OpenThread HA", }, - "Delay": 12345, - "PendingTimestamp": {}, + "delay": 12345, + "pendingTimestamp": {}, } await otbr.create_pending_dataset( @@ -312,11 +336,11 @@ async def test_create_pending_dataset(aioclient_mock: AiohttpClientMocker): assert aioclient_mock.mock_calls[-1][0] == "PUT" assert aioclient_mock.mock_calls[-1][1].path == "/node/dataset/pending" assert aioclient_mock.mock_calls[-1][2] == { - "ActiveDataset": { - "Channel": 15, - "NetworkName": "OpenThread HA", + "activeDataset": { + "channel": 15, + "networkName": "OpenThread HA", }, - "Delay": 23456, + "delay": 23456, } @@ -340,16 +364,6 @@ async def test_set_channel(aioclient_mock: AiohttpClientMocker) -> None: aioclient_mock.get(f"{BASE_URL}/node/dataset/active", json=DATASET_JSON) aioclient_mock.put(f"{BASE_URL}/node/dataset/pending", status=HTTPStatus.CREATED) new_channel = 16 - expected_active_timestamp = DATASET_JSON["ActiveTimestamp"] | {"Seconds": 2} - expected_pending_dataset = { - "ActiveDataset": DATASET_JSON - | { - "ActiveTimestamp": expected_active_timestamp, - "Channel": new_channel, - }, - "Delay": 1234, - } - assert new_channel != DATASET_JSON["Channel"] await otbr.set_channel(new_channel, 1234) assert aioclient_mock.call_count == 2 @@ -357,7 +371,10 @@ async def test_set_channel(aioclient_mock: AiohttpClientMocker) -> None: assert aioclient_mock.mock_calls[0][1].path == "/node/dataset/active" assert aioclient_mock.mock_calls[1][0] == "PUT" assert aioclient_mock.mock_calls[1][1].path == "/node/dataset/pending" - assert aioclient_mock.mock_calls[1][2] == expected_pending_dataset + pending = aioclient_mock.mock_calls[1][2] + assert pending["delay"] == 1234 + assert pending["activeDataset"]["channel"] == new_channel + assert pending["activeDataset"]["activeTimestamp"]["seconds"] == 2 async def test_set_channel_default_delay(aioclient_mock: AiohttpClientMocker) -> None: @@ -367,16 +384,6 @@ async def test_set_channel_default_delay(aioclient_mock: AiohttpClientMocker) -> aioclient_mock.get(f"{BASE_URL}/node/dataset/active", json=DATASET_JSON) aioclient_mock.put(f"{BASE_URL}/node/dataset/pending", status=HTTPStatus.CREATED) new_channel = 16 - expected_active_timestamp = DATASET_JSON["ActiveTimestamp"] | {"Seconds": 2} - expected_pending_dataset = { - "ActiveDataset": DATASET_JSON - | { - "ActiveTimestamp": expected_active_timestamp, - "Channel": new_channel, - }, - "Delay": 300000, - } - assert new_channel != DATASET_JSON["Channel"] await otbr.set_channel(new_channel) assert aioclient_mock.call_count == 2 @@ -384,7 +391,10 @@ async def test_set_channel_default_delay(aioclient_mock: AiohttpClientMocker) -> assert aioclient_mock.mock_calls[0][1].path == "/node/dataset/active" assert aioclient_mock.mock_calls[1][0] == "PUT" assert aioclient_mock.mock_calls[1][1].path == "/node/dataset/pending" - assert aioclient_mock.mock_calls[1][2] == expected_pending_dataset + pending = aioclient_mock.mock_calls[1][2] + assert pending["delay"] == 300000 + assert pending["activeDataset"]["channel"] == new_channel + assert pending["activeDataset"]["activeTimestamp"]["seconds"] == 2 async def test_set_channel_no_timestamp(aioclient_mock: AiohttpClientMocker) -> None: @@ -397,16 +407,6 @@ async def test_set_channel_no_timestamp(aioclient_mock: AiohttpClientMocker) -> aioclient_mock.get(f"{BASE_URL}/node/dataset/active", json=dataset_json) aioclient_mock.put(f"{BASE_URL}/node/dataset/pending", status=HTTPStatus.CREATED) new_channel = 16 - expected_active_timestamp = {"Authoritative": False, "Seconds": 1, "Ticks": 0} - expected_pending_dataset = { - "ActiveDataset": DATASET_JSON - | { - "ActiveTimestamp": expected_active_timestamp, - "Channel": new_channel, - }, - "Delay": 300000, - } - assert new_channel != DATASET_JSON["Channel"] await otbr.set_channel(new_channel) assert aioclient_mock.call_count == 2 @@ -414,7 +414,10 @@ async def test_set_channel_no_timestamp(aioclient_mock: AiohttpClientMocker) -> assert aioclient_mock.mock_calls[0][1].path == "/node/dataset/active" assert aioclient_mock.mock_calls[1][0] == "PUT" assert aioclient_mock.mock_calls[1][1].path == "/node/dataset/pending" - assert aioclient_mock.mock_calls[1][2] == expected_pending_dataset + pending = aioclient_mock.mock_calls[1][2] + assert pending["delay"] == 300000 + assert pending["activeDataset"]["channel"] == new_channel + assert pending["activeDataset"]["activeTimestamp"]["seconds"] == 1 async def test_set_channel_invalid_channel(aioclient_mock: AiohttpClientMocker) -> None: diff --git a/tests/test_models.py b/tests/test_models.py index 152056b..6256a17 100644 --- a/tests/test_models.py +++ b/tests/test_models.py @@ -1,6 +1,7 @@ """Test data models.""" import python_otbr_api +from python_otbr_api.models import SecurityPolicy def test_deserialize_pending_dataset(): @@ -66,3 +67,62 @@ def test_deserialize_pending_dataset_camelcase(): 12345, python_otbr_api.Timestamp(), ) + + +def test_serialize_active_dataset_camelcase(): + """Test that as_json() emits camelCase keys matching the OTBR REST API.""" + dataset = python_otbr_api.ActiveDataSet( + active_timestamp=python_otbr_api.Timestamp( + authoritative=False, seconds=1, ticks=0 + ), + network_key="00112233445566778899aabbccddeeff", + network_name="OpenThread-1234", + extended_pan_id="dead00beef00cafe", + mesh_local_prefix="fd11:2222:3333::/64", + pan_id=12345, + channel=15, + psk_c="aabbccddeeff00112233445566778899", + security_policy=SecurityPolicy( + rotation_time=672, obtain_network_key=True, routers=True + ), + channel_mask=134215680, + ) + result = dataset.as_json() + # All top-level keys must be camelCase + assert "activeTimestamp" in result + assert "networkKey" in result + assert "networkName" in result + assert "extPanId" in result + assert "meshLocalPrefix" in result + assert "panId" in result + assert "channel" in result + assert "pskc" in result + assert "securityPolicy" in result + assert "channelMask" in result + # Nested keys must also be camelCase + assert "seconds" in result["activeTimestamp"] + assert "rotationTime" in result["securityPolicy"] + assert "obtainNetworkKey" in result["securityPolicy"] + + +def test_serialize_pending_dataset_camelcase(): + """Test that PendingDataSet.as_json() emits camelCase keys.""" + pending = python_otbr_api.PendingDataSet( + active_dataset=python_otbr_api.ActiveDataSet(network_name="OpenThread HA"), + delay=30000, + pending_timestamp=python_otbr_api.Timestamp(seconds=2, ticks=0), + ) + result = pending.as_json() + assert "activeDataset" in result + assert "delay" in result + assert "pendingTimestamp" in result + assert "networkName" in result["activeDataset"] + + +def test_roundtrip_camelcase(): + """Test that from_json(as_json(x)) preserves data.""" + original = python_otbr_api.ActiveDataSet( + network_name="Test", channel=15, pan_id=4660 + ) + roundtripped = python_otbr_api.ActiveDataSet.from_json(original.as_json()) + assert roundtripped == original