From 27d1b85a8519c764f562b6c6497fe5d415756065 Mon Sep 17 00:00:00 2001 From: Damon Hayhurst Date: Fri, 23 Jan 2026 15:24:52 +0000 Subject: [PATCH] test: Integration tests for client to daemon --- tests/integration/__init__.py | 1 + tests/integration/data_daemon/__init__.py | 1 + tests/integration/data_daemon/conftest.py | 248 ++++++++++++++++++ .../data_daemon/helpers/__init__.py | 5 + .../helpers/data_type_test_case.py | 17 ++ .../data_daemon/test_client_to_daemon.py | 232 ++++++++++++++++ .../data_daemon/test_client_to_socket.py | 245 +++++++++++++++++ 7 files changed, 749 insertions(+) create mode 100644 tests/integration/__init__.py create mode 100644 tests/integration/data_daemon/__init__.py create mode 100644 tests/integration/data_daemon/conftest.py create mode 100644 tests/integration/data_daemon/helpers/__init__.py create mode 100644 tests/integration/data_daemon/helpers/data_type_test_case.py create mode 100644 tests/integration/data_daemon/test_client_to_daemon.py create mode 100644 tests/integration/data_daemon/test_client_to_socket.py diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py new file mode 100644 index 00000000..b6211566 --- /dev/null +++ b/tests/integration/__init__.py @@ -0,0 +1 @@ +"""Integration tests for neuracore.""" diff --git a/tests/integration/data_daemon/__init__.py b/tests/integration/data_daemon/__init__.py new file mode 100644 index 00000000..ca286b36 --- /dev/null +++ b/tests/integration/data_daemon/__init__.py @@ -0,0 +1 @@ +"""Integration tests for the data daemon.""" diff --git a/tests/integration/data_daemon/conftest.py b/tests/integration/data_daemon/conftest.py new file mode 100644 index 00000000..15649592 --- /dev/null +++ b/tests/integration/data_daemon/conftest.py @@ -0,0 +1,248 @@ +"""Shared fixtures for data daemon integration tests. + +Provides reusable socket and daemon context fixtures for testing +client-to-socket and client-to-daemon communication paths. 
+""" + +from __future__ import annotations + +import logging +import threading +import time +from collections.abc import Generator +from dataclasses import dataclass +from pathlib import Path +from unittest.mock import MagicMock, patch +from uuid import uuid4 + +import pytest +import zmq + +import neuracore as nc +import neuracore.data_daemon.const as const_module +from neuracore.data_daemon.communications_management import ( + communications_manager as comms_module, +) +from neuracore.data_daemon.communications_management.communications_manager import ( + CommunicationsManager, +) +from neuracore.data_daemon.communications_management.data_bridge import Daemon +from neuracore.data_daemon.communications_management.producer import Producer + +logger = logging.getLogger(__name__) + + +class CaptureRDM: + """Mock RecordingDiskManager that captures enqueued messages. + + Use this to verify the actual payload content that would be written to disk. + Access captured messages via the `enqueued` list. 
+ """ + + def __init__(self) -> None: + self.enqueued: list = [] + + def enqueue(self, message) -> None: + """Capture a message instead of writing to disk.""" + self.enqueued.append(message) + + def shutdown(self) -> None: + """No-op shutdown for compatibility.""" + pass + + +@dataclass +class DaemonRDMCapture: + """Context for a running daemon with capture for payload verification.""" + + daemon: Daemon + capture: CaptureRDM + context: zmq.Context + producer: Producer + + +def _wait_for(predicate, timeout: float, interval: float = 0.05) -> bool: + """Poll predicate until it returns True or timeout is reached.""" + deadline = time.monotonic() + timeout + while time.monotonic() < deadline: + if predicate(): + return True + time.sleep(interval) + return predicate() + + +def _run_daemon_loop( + daemon: Daemon, + comm: CommunicationsManager, + stop_event: threading.Event, + ready_event: threading.Event, + error_bucket: list[BaseException], +) -> None: + """Run the daemon message loop in a background thread.""" + try: + comm.start_consumer() + comm.start_publisher() + except BaseException as exc: + error_bucket.append(exc) + ready_event.set() + return + ready_event.set() + if comm.consumer_socket is None: + raise RuntimeError("consumer socket not initialized") + comm.consumer_socket.setsockopt(zmq.RCVTIMEO, 200) + comm.consumer_socket.setsockopt(zmq.LINGER, 0) + + while not stop_event.is_set(): + msg = None + try: + msg = comm.receive_message() + except zmq.Again: + msg = None + + if msg is not None: + daemon.handle_message(msg) + + daemon._cleanup_expired_channels() + daemon._drain_channel_messages() + + comm.cleanup_daemon() + + +@pytest.fixture +def ipc_paths( + tmp_path: Path, monkeypatch: pytest.MonkeyPatch +) -> Generator[Path, None, None]: + """Set up isolated IPC socket paths for testing. + + Uses ipc:// transport with temp files to allow cross-context communication. + Yields the base directory path for recordings. 
+ """ + base_dir = tmp_path / "ndd" + base_dir.mkdir(parents=True, exist_ok=True) + short_id = uuid4().hex[:8] + socket_path = Path(f"/tmp/ndd-{short_id}.sock") + events_path = Path(f"/tmp/nde-{short_id}.sock") + + mpsa = monkeypatch.setattr + + mpsa(const_module, "BASE_DIR", base_dir) + mpsa(const_module, "SOCKET_PATH", socket_path) + mpsa(const_module, "RECORDING_EVENTS_SOCKET_PATH", events_path) + mpsa(comms_module, "BASE_DIR", base_dir) + mpsa(comms_module, "SOCKET_PATH", socket_path) + mpsa(comms_module, "RECORDING_EVENTS_SOCKET_PATH", events_path) + + yield base_dir + + if socket_path.exists(): + socket_path.unlink() + if events_path.exists(): + events_path.unlink() + + +@pytest.fixture +def daemon_with_capture( + ipc_paths: Path, +) -> Generator[DaemonRDMCapture, None, None]: + """Start a daemon with a CaptureRDM to verify payload content. + + Use this when you need to verify the actual payload data that was + reassembled from chunks, not just that data was written. + """ + capture_rdm = CaptureRDM() + context = zmq.Context() + comm = CommunicationsManager(context=context) + daemon = Daemon( + comm_manager=comm, + recording_disk_manager=capture_rdm, + ) + + stop_event = threading.Event() + ready_event = threading.Event() + error_bucket: list[BaseException] = [] + thread = threading.Thread( + target=_run_daemon_loop, + args=(daemon, comm, stop_event, ready_event, error_bucket), + daemon=True, + ) + thread.start() + + assert _wait_for(ready_event.is_set, timeout=2) + if error_bucket: + raise error_bucket[0] + + producer_comm = CommunicationsManager(context=context) + producer = Producer(comm_manager=producer_comm, chunk_size=64) + + try: + yield DaemonRDMCapture( + daemon=daemon, + capture=capture_rdm, + context=context, + producer=producer, + ) + finally: + stop_event.set() + thread.join(timeout=2) + context.destroy(linger=0) + + +@pytest.fixture +def mock_socket() -> Generator[MagicMock, None, None]: + """Mock the ZMQ socket to capture sent bytes. 
+ + Mocks CommunicationsManager.create_producer_socket to return a mock socket. + The real send_message() runs and serializes to bytes, which we capture. + Use send.call_args_list to inspect captured calls. + """ + mock_sock = MagicMock() + + with patch.object( + CommunicationsManager, "create_producer_socket", return_value=mock_sock + ): + yield mock_sock + + +TEST_ROBOT = "basic_test_robot" + + +@pytest.fixture +def stream_to_daemon_with_capture( + daemon_with_capture: DaemonRDMCapture, +) -> Generator[DaemonRDMCapture, None, None]: + """Combine daemon capture with client API streaming setup. + + Setup: + - nc.login(), nc.connect_robot(), nc.create_dataset(), nc.start_recording() + - daemon_with_capture provides real daemon for verification + + Teardown: + - nc.stop_recording() + """ + dataset_name = f"test_dataset_{uuid4().hex[:8]}" + recording_started = False + + logger.info("Setting up streaming test with daemon capture") + nc.login() + nc.connect_robot(TEST_ROBOT) + nc.create_dataset(dataset_name) + + nc.start_recording() + recording_started = True + logger.info(f"Recording started for dataset: {dataset_name}") + + try: + yield daemon_with_capture + finally: + logger.info("Tearing down streaming test") + if recording_started: + try: + nc.stop_recording(wait=True) + logger.info("Recording stopped successfully") + except Exception as exception: + logger.warning(f"Error stopping recording: {exception}") + try: + nc.cancel_recording() + logger.info("Recording cancelled") + except Exception as cancel_error: + logger.error(f"Failed to cancel recording: {cancel_error}") diff --git a/tests/integration/data_daemon/helpers/__init__.py b/tests/integration/data_daemon/helpers/__init__.py new file mode 100644 index 00000000..9319c8af --- /dev/null +++ b/tests/integration/data_daemon/helpers/__init__.py @@ -0,0 +1,5 @@ +"""Shared test utilities for data daemon integration tests.""" + +from .data_type_test_case import DataTypeTestCase + +__all__ = ["DataTypeTestCase"] diff 
--git a/tests/integration/data_daemon/helpers/data_type_test_case.py b/tests/integration/data_daemon/helpers/data_type_test_case.py new file mode 100644 index 00000000..3e8d7c40 --- /dev/null +++ b/tests/integration/data_daemon/helpers/data_type_test_case.py @@ -0,0 +1,17 @@ +"""Test case definition for data type integration tests.""" + +from __future__ import annotations + +from collections.abc import Callable +from dataclasses import dataclass + + +@dataclass +class DataTypeTestCase: + """Test case for a specific data type.""" + + name: str + data_type: str + log_func: Callable[[float], None] + timestamp: float = 1234567890.123 + marks: tuple = () diff --git a/tests/integration/data_daemon/test_client_to_daemon.py b/tests/integration/data_daemon/test_client_to_daemon.py new file mode 100644 index 00000000..271d5f37 --- /dev/null +++ b/tests/integration/data_daemon/test_client_to_daemon.py @@ -0,0 +1,232 @@ +"""Tests for client SDK to daemon communication (E2E). + +Tests the full data flow from the neuracore client API through real ZMQ sockets +to the daemon: +- nc.log_*() API calls +- Real ZMQ socket communication +- Daemon receives and processes messages correctly +- All 13 DataTypes flow correctly + +Uses the stream_to_daemon_with_capture fixture which provides: +- Real backend connection (nc.login, nc.connect_robot, nc.start_recording) +- Real daemon with CaptureRDM for payload verification +""" + +from __future__ import annotations + +import base64 +import json +import logging +import warnings + +import numpy as np +import pytest + +import neuracore as nc +from tests.integration.data_daemon.conftest import DaemonRDMCapture, _wait_for +from tests.integration.data_daemon.helpers import DataTypeTestCase + +logger = logging.getLogger(__name__) + + +def make_test_cases() -> list[DataTypeTestCase]: + """Create test cases for all 13 data types.""" + unit_quat_pose = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0]) + + return [ + DataTypeTestCase( + 
name="joint_positions", + data_type="JOINT_POSITIONS", + log_func=lambda timestamp: nc.log_joint_positions( + positions={"joint1": 0.5}, timestamp=timestamp + ), + ), + DataTypeTestCase( + name="joint_velocities", + data_type="JOINT_VELOCITIES", + log_func=lambda timestamp: nc.log_joint_velocities( + velocities={"joint1": 0.1}, timestamp=timestamp + ), + ), + DataTypeTestCase( + name="joint_torques", + data_type="JOINT_TORQUES", + log_func=lambda timestamp: nc.log_joint_torques( + torques={"joint1": 0.2}, timestamp=timestamp + ), + ), + DataTypeTestCase( + name="joint_target_positions", + data_type="JOINT_TARGET_POSITIONS", + log_func=lambda timestamp: nc.log_joint_target_positions( + target_positions={"joint1": 0.5}, timestamp=timestamp + ), + ), + DataTypeTestCase( + name="pose", + data_type="POSES", + log_func=lambda timestamp: nc.log_pose( + name="test_pose", pose=unit_quat_pose, timestamp=timestamp + ), + ), + DataTypeTestCase( + name="end_effector_pose", + data_type="END_EFFECTOR_POSES", + log_func=lambda timestamp: nc.log_end_effector_pose( + name="ee", pose=unit_quat_pose, timestamp=timestamp + ), + ), + DataTypeTestCase( + name="parallel_gripper_open_amount", + data_type="PARALLEL_GRIPPER_OPEN_AMOUNTS", + log_func=lambda timestamp: nc.log_parallel_gripper_open_amount( + name="gripper", value=0.5, timestamp=timestamp + ), + ), + DataTypeTestCase( + name="parallel_gripper_target_open_amount", + data_type="PARALLEL_GRIPPER_TARGET_OPEN_AMOUNTS", + log_func=lambda timestamp: nc.log_parallel_gripper_target_open_amount( + name="gripper", value=0.75, timestamp=timestamp + ), + marks=( + pytest.mark.xfail( + reason="Daemon 32-byte limit truncates data type name" + ), + ), + ), + DataTypeTestCase( + name="language", + data_type="LANGUAGE", + log_func=lambda timestamp: nc.log_language( + name="instruction", language="pick up the cup", timestamp=timestamp + ), + ), + DataTypeTestCase( + name="custom_1d", + data_type="CUSTOM_1D", + log_func=lambda timestamp: 
nc.log_custom_1d( + name="sensor", data=np.array([1.0, 2.0, 3.0]), timestamp=timestamp + ), + ), + DataTypeTestCase( + name="rgb_image", + data_type="RGB_IMAGES", + log_func=lambda timestamp: nc.log_rgb( + name="camera", + rgb=np.zeros((480, 640, 3), dtype=np.uint8), + timestamp=timestamp, + ), + ), + DataTypeTestCase( + name="depth_image", + data_type="DEPTH_IMAGES", + log_func=lambda timestamp: nc.log_depth( + name="depth_cam", + depth=np.zeros((480, 640), dtype=np.float32), + timestamp=timestamp, + ), + ), + DataTypeTestCase( + name="point_cloud", + data_type="POINT_CLOUDS", + log_func=lambda timestamp: nc.log_point_cloud( + name="lidar", + points=np.zeros((100, 3), dtype=np.float16), + timestamp=timestamp, + ), + ), + ] + + +DATA_TYPE_TEST_CASES = make_test_cases() + + +class TestClientToDaemon: + """E2E tests for client SDK to daemon communication.""" + + @pytest.mark.parametrize( + "test_case", + [pytest.param(tc, marks=tc.marks, id=tc.name) for tc in DATA_TYPE_TEST_CASES], + ) + def test_log_data_type_to_daemon( + self, + test_case: DataTypeTestCase, + stream_to_daemon_with_capture: DaemonRDMCapture, + ) -> None: + """Data logged via nc.log_*() arrives correctly at daemon.""" + stream_to_daemon_with_capture.capture.enqueued.clear() + + with warnings.catch_warnings(): + warnings.simplefilter("ignore") + test_case.log_func(test_case.timestamp) + + assert _wait_for( + lambda: len(stream_to_daemon_with_capture.capture.enqueued) > 0, + timeout=5, + ), f"No messages captured for {test_case.name}" + + captured_message = stream_to_daemon_with_capture.capture.enqueued[0] + + assert captured_message.data_type.value == test_case.data_type, ( + f"Expected data_type {test_case.data_type}, " + f"got {captured_message.data_type.value}" + ) + + if test_case.data_type not in ("RGB_IMAGES", "DEPTH_IMAGES"): + payload = base64.b64decode(captured_message.data) + decoded = json.loads(payload.decode("utf-8")) + assert decoded.get("timestamp") == test_case.timestamp, ( + 
f"Expected timestamp {test_case.timestamp}, " + f"got {decoded.get('timestamp')}" + ) + + def test_multiple_logs_single_trace( + self, + stream_to_daemon_with_capture: DaemonRDMCapture, + ) -> None: + """Multiple logs from same recording should have same trace_id.""" + stream_to_daemon_with_capture.capture.enqueued.clear() + + for i in range(3): + nc.log_joint_positions( + positions={"joint1": float(i) * 0.1}, + timestamp=1234567890.0 + i, + ) + + assert _wait_for( + lambda: len(stream_to_daemon_with_capture.capture.enqueued) >= 3, + timeout=5, + ), "Expected 3 messages" + + trace_ids = { + msg.trace_id for msg in stream_to_daemon_with_capture.capture.enqueued + } + assert ( + len(trace_ids) == 1 + ), f"Expected single trace_id, got {len(trace_ids)}: {trace_ids}" + + def test_recording_id_present_on_all_messages( + self, + stream_to_daemon_with_capture: DaemonRDMCapture, + ) -> None: + """All captured messages should have non-empty recording_id.""" + stream_to_daemon_with_capture.capture.enqueued.clear() + + nc.log_joint_positions(positions={"joint1": 0.5}, timestamp=1234567890.0) + nc.log_joint_velocities(velocities={"joint1": 0.1}, timestamp=1234567890.0) + + assert _wait_for( + lambda: len(stream_to_daemon_with_capture.capture.enqueued) >= 2, + timeout=5, + ), "Expected 2 messages" + + for message in stream_to_daemon_with_capture.capture.enqueued: + assert message.recording_id, "recording_id should be non-empty" + + recording_ids = { + msg.recording_id for msg in stream_to_daemon_with_capture.capture.enqueued + } + assert ( + len(recording_ids) == 1 + ), f"All messages should have same recording_id, got {recording_ids}" diff --git a/tests/integration/data_daemon/test_client_to_socket.py b/tests/integration/data_daemon/test_client_to_socket.py new file mode 100644 index 00000000..a58f18f9 --- /dev/null +++ b/tests/integration/data_daemon/test_client_to_socket.py @@ -0,0 +1,245 @@ +"""Tests for client-to-socket data streaming. 
# --- tests/integration/data_daemon/test_client_to_socket.py ---
"""Tests for client-to-socket data streaming.

Tests data serialization and transmission from the neuracore client
to the ZMQ socket, using a mock socket to capture sent bytes.
The mock_socket fixture is provided by conftest.py.
"""

import json
import logging
import uuid
import warnings
from collections.abc import Generator
from unittest.mock import MagicMock

import numpy as np
import pytest

import neuracore as nc
from neuracore.data_daemon.models import CommandType, DataChunkPayload, MessageEnvelope
from tests.integration.data_daemon.helpers import DataTypeTestCase

logger = logging.getLogger(__name__)

TEST_ROBOT = "basic_test_robot"


@pytest.fixture
def stream_to_daemon() -> Generator[None, None, None]:
    """Handle robot setup and the recording lifecycle around each test.

    Setup: login, connect the test robot, create a unique dataset, and
    start recording. Teardown: always stop the recording (falling back to
    cancel if stopping fails), even when the test body raised.
    """
    dataset_name = f"test_dataset_{uuid.uuid4().hex[:8]}"
    recording_started = False

    logger.info("Setting up streaming test")
    nc.login()
    nc.connect_robot(TEST_ROBOT)
    nc.create_dataset(dataset_name)

    nc.start_recording()
    recording_started = True
    logger.info(f"Recording started for dataset: {dataset_name}")

    try:
        yield
    finally:
        logger.info("Tearing down streaming test")
        if recording_started:
            try:
                nc.stop_recording(wait=True)
                logger.info("Recording stopped successfully")
            except Exception as e:
                logger.warning(f"Error stopping recording: {e}")
                try:
                    nc.cancel_recording()
                    logger.info("Recording cancelled")
                except Exception as cancel_error:
                    logger.error(f"Failed to cancel recording: {cancel_error}")


def make_test_cases() -> list[DataTypeTestCase]:
    """Build one test case per supported data type.

    NOTE(review): this table largely duplicates the one in
    test_client_to_daemon.py — consider hoisting into helpers/.
    """
    # Identity pose: zero translation, unit quaternion (x, y, z, qx, qy, qz, qw).
    unit_quat_pose = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 1.0])

    return [
        DataTypeTestCase(
            name="joint_positions",
            data_type="JOINT_POSITIONS",
            log_func=lambda timestamp: nc.log_joint_positions(
                positions={"joint1": 0.5}, timestamp=timestamp
            ),
        ),
        DataTypeTestCase(
            name="joint_velocities",
            data_type="JOINT_VELOCITIES",
            log_func=lambda timestamp: nc.log_joint_velocities(
                velocities={"joint1": 0.1}, timestamp=timestamp
            ),
        ),
        DataTypeTestCase(
            name="joint_torques",
            data_type="JOINT_TORQUES",
            log_func=lambda timestamp: nc.log_joint_torques(
                torques={"joint1": 0.2}, timestamp=timestamp
            ),
        ),
        DataTypeTestCase(
            name="joint_target_positions",
            data_type="JOINT_TARGET_POSITIONS",
            log_func=lambda timestamp: nc.log_joint_target_positions(
                target_positions={"joint1": 0.5}, timestamp=timestamp
            ),
        ),
        DataTypeTestCase(
            name="pose",
            data_type="POSES",
            log_func=lambda timestamp: nc.log_pose(
                name="test_pose", pose=unit_quat_pose, timestamp=timestamp
            ),
        ),
        DataTypeTestCase(
            name="end_effector_pose",
            data_type="END_EFFECTOR_POSES",
            log_func=lambda timestamp: nc.log_end_effector_pose(
                name="ee", pose=unit_quat_pose, timestamp=timestamp
            ),
        ),
        DataTypeTestCase(
            name="parallel_gripper",
            data_type="PARALLEL_GRIPPER_OPEN_AMOUNTS",
            log_func=lambda timestamp: nc.log_parallel_gripper_open_amount(
                name="gripper", value=0.5, timestamp=timestamp
            ),
        ),
        DataTypeTestCase(
            name="parallel_gripper_target",
            data_type="PARALLEL_GRIPPER_TARGET_OPEN_AMOUNTS",
            log_func=lambda timestamp: nc.log_parallel_gripper_target_open_amount(
                name="gripper", value=0.75, timestamp=timestamp
            ),
        ),
        DataTypeTestCase(
            name="language",
            data_type="LANGUAGE",
            log_func=lambda timestamp: nc.log_language(
                name="instruction", language="pick up the cup", timestamp=timestamp
            ),
        ),
        DataTypeTestCase(
            name="rgb_image",
            data_type="RGB_IMAGES",
            log_func=lambda timestamp: nc.log_rgb(
                name="camera",
                rgb=np.zeros((480, 640, 3), dtype=np.uint8),
                timestamp=timestamp,
            ),
        ),
        DataTypeTestCase(
            name="depth_image",
            data_type="DEPTH_IMAGES",
            log_func=lambda timestamp: nc.log_depth(
                name="depth_cam",
                depth=np.zeros((480, 640), dtype=np.float32),
                timestamp=timestamp,
            ),
        ),
        DataTypeTestCase(
            name="point_cloud",
            data_type="POINT_CLOUDS",
            log_func=lambda timestamp: nc.log_point_cloud(
                name="lidar",
                points=np.zeros((100, 3), dtype=np.float16),
                timestamp=timestamp,
            ),
        ),
        DataTypeTestCase(
            name="custom_1d",
            data_type="CUSTOM_1D",
            log_func=lambda timestamp: nc.log_custom_1d(
                name="sensor", data=np.array([1.0, 2.0, 3.0]), timestamp=timestamp
            ),
        ),
    ]


DATA_TYPE_TEST_CASES = make_test_cases()


class TestClientStreaming:
    """Minimal streaming tests."""

    @pytest.mark.parametrize(
        "test_case",
        DATA_TYPE_TEST_CASES,
        ids=[case.name for case in DATA_TYPE_TEST_CASES],
    )
    def test_stream_data_type_to_socket(
        self,
        test_case: DataTypeTestCase,
        mock_socket: MagicMock,
        stream_to_daemon: None,
    ) -> None:
        """Stream data and verify correct data_type and timestamp are sent.

        Captures raw bytes from socket.send() and deserializes them to
        verify the wire format is correct.
        """
        with warnings.catch_warnings():
            warnings.simplefilter("ignore")
            test_case.log_func(test_case.timestamp)

        sent_frames = [call[0][0] for call in mock_socket.send.call_args_list]
        envelopes = [MessageEnvelope.from_bytes(frame) for frame in sent_frames]

        chunk_envelopes = [
            env for env in envelopes if env.command == CommandType.DATA_CHUNK
        ]
        assert len(chunk_envelopes) >= 1, "No DATA_CHUNK messages sent"

        chunks = [
            DataChunkPayload.from_dict(env.payload["data_chunk"])
            for env in chunk_envelopes
        ]

        assert all(
            chunk.data_type.value == test_case.data_type for chunk in chunks
        ), f"Expected data_type {test_case.data_type} on all chunks"

        # Follow the first chunk's trace: every chunk of one logical sample
        # shares a trace_id.
        target_trace = chunks[0].trace_id
        same_trace = [chunk for chunk in chunks if chunk.trace_id == target_trace]
        assert len(same_trace) >= 1, "No chunks found for target trace_id"

        recording_id = same_trace[0].recording_id
        assert recording_id, "recording_id must be non-empty"
        assert all(
            chunk.recording_id == recording_id for chunk in same_trace
        ), "recording_id inconsistent across chunks"

        total_chunks = same_trace[0].total_chunks
        assert total_chunks == len(
            same_trace
        ), "total_chunks does not match chunk count"

        # Indices must form an exact 0..total_chunks-1 sequence (no gaps,
        # no duplicates) so the daemon can reassemble the payload.
        indices = sorted(chunk.chunk_index for chunk in same_trace)
        assert indices == list(range(total_chunks)), "chunk_index sequence incorrect"

        # Image payloads are encoded binary; only non-image payloads carry a
        # JSON body whose timestamp can be checked directly.
        if test_case.data_type not in ("RGB_IMAGES", "DEPTH_IMAGES"):
            decoded_samples = [json.loads(chunk.data) for chunk in same_trace]
            assert all(
                sample.get("timestamp") == test_case.timestamp
                for sample in decoded_samples
            ), "timestamp mismatch"