diff --git a/config/default.yaml b/config/default.yaml
index 89877c3..0ddbd01 100644
--- a/config/default.yaml
+++ b/config/default.yaml
@@ -38,6 +38,15 @@ blue_sky_air_traffic_simulator_settings:
   single_or_multiple_sensors: "multiple" # this setting specifies whether the traffic data is submitted from a single sensor or multiple sensors
   sensor_ids: ["562e6297036a4adebb4848afcd1ede90"] # List of sensor IDs to use when 'multiple' is selected
 
+# AMQP/RabbitMQ configuration for event monitoring
+# Set AMQP_URL environment variable or configure here
+amqp:
+  url: "" # e.g., amqp://guest:guest@localhost:5672/ (can also use AMQP_URL env var)
+  exchange_name: "operational_events"
+  exchange_type: "direct"
+  routing_key: "#" # Flight declaration ID or '#' for all messages
+  queue_name: "" # Empty means auto-generate exclusive queue
+
 data_files:
   trajectory: "config/bern/trajectory_f1.json" # Path to flight declarations JSON file
   simulation: "config/bern/blue_sky_sim_example.scn" # Path to air traffic simulation scenario file, used for BlueSky enabled simulations only.
diff --git a/pyproject.toml b/pyproject.toml
index e0653ab..94d8432 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -66,36 +66,28 @@ dev = [
     "types-pyyaml",
     "types-redis",
     "types-requests",
+    "pytest-cov>=7.0.0",
 ]
 
 [build-system]
-requires = [
-    "hatchling",
-]
+requires = ["hatchling"]
 build-backend = "hatchling.build"
 
 [tool.hatch.build.targets.wheel]
-packages = [
-    "src/openutm_verification",
-]
+packages = ["src/openutm_verification"]
# (artifacts configuration removed; docs are included via force-include)
 
 [tool.hatch.build.targets.wheel.force-include]
 "docs/scenarios" = "openutm_verification/docs/scenarios"
 
 [tool.pytest.ini_options]
-pythonpath = [
-    ".",
-    "src/openutm_verification",
-]
-testpaths = [
-    "tests",
-]
+pythonpath = [".", "src/openutm_verification"]
+testpaths = ["tests"]
+markers = ["asyncio: mark test as asyncio"]
+
 [tool.coverage.run]
-source = [
-    "src/openutm_verification",
-]
+source = ["src/openutm_verification"]
 
 [tool.pylint."messages control"]
diff --git a/pytest.ini b/pytest.ini
deleted file mode 100644
index 6bbac54..0000000
--- a/pytest.ini
+++ /dev/null
@@ -1,4 +0,0 @@
-[pytest]
-markers =
-    asyncio: mark test as asyncio
-addopts = -q --tb=no
diff --git a/scenarios/F1_happy_path.yaml b/scenarios/F1_happy_path.yaml
index 68b0ab8..0b60484 100644
--- a/scenarios/F1_happy_path.yaml
+++ b/scenarios/F1_happy_path.yaml
@@ -1,23 +1,25 @@
 name: F1_happy_path
-description: >
-  This scenario verifies the nominal flow (Happy Path) for a flight operation.
-  It walks through the lifecycle of a flight from declaration to activation,
-  submission of telemetry, and finally ending the operation.
+description: 'This scenario verifies the nominal flow (Happy Path) for a flight operation.
+  It walks through the lifecycle of a flight from declaration to activation, submission
+  of telemetry, and finally ending the operation.
+
+  '
 steps:
+- step: Cleanup Flight Declarations
 - step: Setup Flight Declaration
   description: Creates a fresh flight declaration in the DSS.
 - step: Update Operation State
-  description: Activates the flight operation, transitioning it to the active state.
   arguments:
     state: ACTIVATED
+  description: Activates the flight operation, transitioning it to the active state.
 - step: Submit Telemetry
-  description: Simulates the broadcast of telemetry data for 30 seconds.
   arguments:
     duration: 30
+  description: Simulates the broadcast of telemetry data for 30 seconds.
 - id: update_state_ended
   step: Update Operation State
-  description: Marks the operation as ended after the flight is complete.
   arguments:
     state: ENDED
+  description: Marks the operation as ended after the flight is complete.
 - step: Teardown Flight Declaration
   description: Cleans up the flight declaration and any associated resources.
diff --git a/src/openutm_verification/core/clients/amqp/__init__.py b/src/openutm_verification/core/clients/amqp/__init__.py
new file mode 100644
index 0000000..28aa692
--- /dev/null
+++ b/src/openutm_verification/core/clients/amqp/__init__.py
@@ -0,0 +1,15 @@
+"""AMQP client module for RabbitMQ queue monitoring."""
+
+from openutm_verification.core.clients.amqp.amqp_client import (
+    AMQPClient,
+    AMQPMessage,
+    AMQPSettings,
+    create_amqp_settings,
+)
+
+__all__ = [
+    "AMQPClient",
+    "AMQPMessage",
+    "AMQPSettings",
+    "create_amqp_settings",
+]
diff --git a/src/openutm_verification/core/clients/amqp/amqp_client.py b/src/openutm_verification/core/clients/amqp/amqp_client.py
new file mode 100644
index 0000000..5e39ff6
--- /dev/null
+++ b/src/openutm_verification/core/clients/amqp/amqp_client.py
@@ -0,0 +1,518 @@
+"""AMQP client for monitoring RabbitMQ queues in verification scenarios."""
+
+from __future__ import annotations
+
+import asyncio
+import json
+import os
+import threading
+import time
+from dataclasses import dataclass, field
+from datetime import datetime
+from typing import Any
+from urllib.parse import urlparse
+
+import pika
+import requests
+from loguru import logger
+from pika.adapters.blocking_connection import BlockingChannel, BlockingConnection
+from pika.exceptions import AMQPConnectionError, ChannelClosedByBroker
+
+from openutm_verification.core.execution.config_models import get_settings
+from openutm_verification.core.execution.scenario_runner import scenario_step
+
+
+@dataclass
+class AMQPSettings:
+    """Settings for AMQP connection."""
+
+    url: str = ""
+    exchange_name: str = "operational_events"
+    exchange_type: str = "direct"
+    routing_key: str = "#"  # Flight declaration ID or '#' for all
+    queue_name: str = ""
+    heartbeat: int = 600
+    blocked_connection_timeout: int = 300
+    auto_discover: bool = False
+
+
+@dataclass
+class AMQPMessage:
+    """Represents a received AMQP message."""
+
+    body: bytes
+    routing_key: str
+    exchange: str
+    delivery_tag: int
+    content_type: str | None = None
+    correlation_id: str | None = None
+    timestamp: str = ""
+
+    def body_str(self) -> str:
+        """Decode message body as UTF-8 string."""
+        return self.body.decode("utf-8", errors="replace")
+
+    def body_json(self) -> dict[str, Any] | list[Any] | None:
+        """Attempt to parse body as JSON, return None if invalid."""
+        try:
+            data = json.loads(self.body_str())
+            # Unpack nested 'body' JSON if present
+            if isinstance(data, dict) and "body" in data:
+                try:
+                    inner_body = json.loads(data["body"])
+                    data["body"] = inner_body
+                except (json.JSONDecodeError, TypeError):
+                    pass
+            return data
+        except (json.JSONDecodeError, TypeError):
+            return None
+
+    def to_dict(self) -> dict[str, Any]:
+        """Convert message to dictionary for reporting."""
+        return {
+            "routing_key": self.routing_key,
+            "exchange": self.exchange,
+            "content_type": self.content_type,
+            "correlation_id": self.correlation_id,
+            "timestamp": self.timestamp,
+            "body": self.body_json() or self.body_str(),
+        }
+
+
+@dataclass
+class AMQPConsumerState:
+    """State for an active AMQP consumer."""
+
+    connection: BlockingConnection | None = None
+    channel: BlockingChannel | None = None
+    queue_name: str = ""
+    messages: list[AMQPMessage] = field(default_factory=list)
+    consuming: bool = False
+    error: str | None = None
+    consumer_thread: threading.Thread | None = None
+    stop_event: threading.Event = field(default_factory=threading.Event)
+
+
+def create_amqp_settings() -> AMQPSettings:
+    """Create AMQP settings from configuration or environment.
+
+    Priority: environment variables > config file > defaults
+    (environment values are applied last, so they override the config file).
+    """
+    settings = AMQPSettings()
+
+    # Try to get from config first
+    try:
+        config = get_settings()
+        if hasattr(config, "amqp") and config.amqp:
+            amqp_config = config.amqp
+            settings.url = amqp_config.url or settings.url
+            settings.exchange_name = amqp_config.exchange_name or settings.exchange_name
+            settings.exchange_type = amqp_config.exchange_type or settings.exchange_type
+            settings.routing_key = amqp_config.routing_key or settings.routing_key
+            settings.queue_name = amqp_config.queue_name or settings.queue_name
+    except Exception:
+        pass  # Config not available, use env vars
+
+    # Environment overrides
+    settings.url = os.environ.get("AMQP_URL", settings.url)
+    settings.routing_key = os.environ.get("AMQP_ROUTING_KEY", settings.routing_key)
+    settings.queue_name = os.environ.get("AMQP_QUEUE", settings.queue_name)
+    settings.auto_discover = os.environ.get("AMQP_AUTO_DISCOVER", "").lower() in (
+        "1",
+        "true",
+        "yes",
+    )
+
+    return settings
+
+
+class AMQPClient:
+    """AMQP client for monitoring RabbitMQ queues in verification scenarios.
+
+    This client can be used to:
+    - Start background queue monitoring
+    - Collect messages during scenario execution
+    - Filter and verify received messages
+
+    Example usage in YAML scenarios:
+        - step: Start AMQP Queue Monitor
+          arguments:
+            queue_name: "my-queue"
+            background: true
+        - step: Submit Telemetry
+          arguments:
+            duration: 30
+        - step: Stop AMQP Queue Monitor
+        - step: Get AMQP Messages
+
+    Example usage in Python scenarios:
+        amqp_task = asyncio.create_task(
+            amqp_client.start_queue_monitor(queue_name="my-queue", duration=60)
+        )
+        # ... perform other operations ...
+        await amqp_task
+        messages = await amqp_client.get_received_messages()
+    """
+
+    def __init__(self, settings: AMQPSettings):
+        self.settings = settings
+        self._state = AMQPConsumerState()
+        self._lock = threading.Lock()
+
+    def _get_connection_parameters(self) -> pika.URLParameters:
+        """Create pika connection parameters from settings."""
+        if not self.settings.url:
+            raise ValueError("AMQP URL not configured. Set AMQP_URL environment variable or configure 'amqp.url' in config yaml.")
+
+        parameters = pika.URLParameters(self.settings.url)
+        parameters.heartbeat = self.settings.heartbeat
+        parameters.blocked_connection_timeout = self.settings.blocked_connection_timeout
+        return parameters
+
+    def _discover_queues_with_messages(self) -> list[str]:
+        """Use RabbitMQ Management API to find queues with messages."""
+        if not self.settings.url:
+            return []
+
+        parsed = urlparse(self.settings.url)
+        username = parsed.username or "guest"
+        password = parsed.password or "guest"
+        host = parsed.hostname or "localhost"
+        vhost = parsed.path.lstrip("/") or "%2f"
+
+        mgmt_ports = [15672, 443, 15671]
+
+        for port in mgmt_ports:
+            try:
+                scheme = "https" if port == 443 else "http"
+                api_url = f"{scheme}://{host}:{port}/api/queues/{vhost}"
+
+                response = requests.get(api_url, auth=(username, password), timeout=5)
+
+                if response.status_code == 200:
+                    queues = response.json()
+                    queues_with_msgs = [q for q in queues if q.get("messages", 0) > 0 and not q.get("name", "").startswith("amq.gen-")]
+                    queues_with_msgs.sort(key=lambda q: q.get("messages", 0), reverse=True)
+                    return [q["name"] for q in queues_with_msgs]
+            except requests.RequestException:
+                continue
+
+        return []
+
+    def _on_message(
+        self,
+        ch: BlockingChannel,
+        method: Any,
+        properties: pika.BasicProperties,
+        body: bytes,
+    ) -> None:
+        """Internal callback for processing received messages."""
+        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")[:-3]
+
+        msg = AMQPMessage(
+            body=body,
+            routing_key=method.routing_key,
+            exchange=method.exchange or "(default)",
+            delivery_tag=method.delivery_tag,
+            content_type=properties.content_type,
+            correlation_id=properties.correlation_id,
+            timestamp=timestamp,
+        )
+
+        with self._lock:
+            self._state.messages.append(msg)
+
+        logger.debug(f"AMQP message received - routing_key={method.routing_key}, size={len(body)} bytes")
+
+        # Acknowledge the message
+        ch.basic_ack(delivery_tag=method.delivery_tag)
+
+    def _consumer_loop(
+        self,
+        queue_name: str | None,
+        routing_key: str | None,
+        duration: int | None,
+    ) -> None:
+        """Background consumer loop running in separate thread."""
+        connection = None
+        try:
+            parameters = self._get_connection_parameters()
+            connection = pika.BlockingConnection(parameters)
+            channel = connection.channel()
+
+            with self._lock:
+                self._state.connection = connection
+                self._state.channel = channel
+
+            target_queue = queue_name or self.settings.queue_name
+
+            # Auto-discover if enabled
+            if self.settings.auto_discover and not target_queue:
+                discovered = self._discover_queues_with_messages()
+                if discovered:
+                    target_queue = discovered[0]
+                    logger.info(f"Auto-discovered queue: {target_queue}")
+
+            if target_queue:
+                self._state.queue_name = target_queue
+                logger.info(f"Consuming from queue: {target_queue}")
+            else:
+                # Create exclusive queue and bind to exchange
+                result = channel.queue_declare(queue="", exclusive=True)
+                self._state.queue_name = result.method.queue
+
+                rk = routing_key or self.settings.routing_key
+                channel.queue_bind(
+                    exchange=self.settings.exchange_name,
+                    queue=self._state.queue_name,
+                    routing_key=rk,
+                )
+                logger.info(f"Bound to exchange '{self.settings.exchange_name}' with routing key '{rk}'")
+
+            channel.basic_qos(prefetch_count=1)
+            channel.basic_consume(
+                queue=self._state.queue_name,
+                on_message_callback=self._on_message,
+            )
+
+            self._state.consuming = True
+            logger.info("AMQP consumer started")
+
+            start_time = time.time()
+            while not self._state.stop_event.is_set():
+                # Check duration limit
+                if duration and (time.time() - start_time) >= duration:
+                    logger.info(f"AMQP monitor duration ({duration}s) reached")
+                    break
+
+                # Process pending events with short timeout
+                connection.process_data_events(time_limit=1)
+
+        except AMQPConnectionError as e:
+            logger.error(f"AMQP connection error: {e}")
+            self._state.error = str(e)
+        except ChannelClosedByBroker as e:
+            logger.error(f"Channel closed by broker: {e}")
+            self._state.error = str(e)
+        except Exception as e:
+            logger.error(f"AMQP consumer error: {e}")
+            self._state.error = str(e)
+        finally:
+            self._state.consuming = False
+            if connection and connection.is_open:
+                try:
+                    connection.close()
+                except Exception:
+                    pass
+            logger.info("AMQP consumer stopped")
+
+    @scenario_step("Start AMQP Queue Monitor")
+    async def start_queue_monitor(
+        self,
+        queue_name: str | None = None,
+        routing_key: str | None = None,
+        duration: int | None = None,
+    ) -> dict[str, Any]:
+        """Start monitoring an AMQP queue for messages.
+
+        This step starts a background consumer that collects messages from the
+        specified queue (or creates an exclusive queue bound to the exchange).
+
+        Args:
+            queue_name: Specific queue to monitor. If not provided, creates an
+                exclusive queue bound to the exchange with the routing key.
+            routing_key: Flight declaration ID to filter messages, or '#' for all.
+            duration: Optional duration in seconds to monitor. If not set,
+                runs until stop_queue_monitor is called.
+
+        Returns:
+            Dictionary with monitoring status and queue name.
+        """
+        # Clear previous state
+        self._state.messages.clear()
+        self._state.error = None
+        self._state.stop_event.clear()
+
+        # Start consumer in background thread
+        self._state.consumer_thread = threading.Thread(
+            target=self._consumer_loop,
+            args=(queue_name, routing_key, duration),
+            daemon=True,
+        )
+        self._state.consumer_thread.start()
+
+        # Wait briefly for connection to establish
+        await asyncio.sleep(0.5)
+
+        if self._state.error:
+            raise RuntimeError(f"Failed to start AMQP monitor: {self._state.error}")
+
+        return {
+            "status": "started",
+            "queue_name": self._state.queue_name or "pending",
+            "exchange": self.settings.exchange_name,
+            "routing_key": routing_key or self.settings.routing_key,
+            "duration": duration,
+        }
+
+    @scenario_step("Stop AMQP Queue Monitor")
+    async def stop_queue_monitor(self) -> dict[str, Any]:
+        """Stop the AMQP queue monitor.
+
+        Returns:
+            Dictionary with final status and message count.
+        """
+        if not self._state.consumer_thread:
+            return {"status": "not_running", "message_count": 0}
+
+        # Signal thread to stop
+        self._state.stop_event.set()
+
+        # Wait for thread to finish (with timeout)
+        self._state.consumer_thread.join(timeout=5.0)
+
+        message_count = len(self._state.messages)
+        logger.info(f"AMQP monitor stopped, collected {message_count} messages")
+
+        return {
+            "status": "stopped",
+            "queue_name": self._state.queue_name,
+            "message_count": message_count,
+            "error": self._state.error,
+        }
+
+    @scenario_step("Get AMQP Messages")
+    async def get_received_messages(
+        self,
+        routing_key_filter: str | None = None,
+        limit: int | None = None,
+    ) -> list[dict[str, Any]]:
+        """Get messages received by the AMQP monitor.
+
+        Args:
+            routing_key_filter: Optional flight declaration ID to filter messages.
+            limit: Maximum number of messages to return.
+
+        Returns:
+            List of message dictionaries with body, routing_key, etc.
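+
+        Example (illustrative sketch; the filter value is a hypothetical
+        flight declaration ID):
+
+            messages = await amqp_client.get_received_messages(
+                routing_key_filter="a1b2c3d4-0000-0000-0000-000000000000",
+                limit=10,
+            )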
+ """ + with self._lock: + messages = self._state.messages.copy() + + # Apply routing key filter + if routing_key_filter: + # Simple pattern matching for AMQP-style wildcards + import fnmatch + + pattern = routing_key_filter.replace("#", "*").replace(".", "\\.") + messages = [m for m in messages if fnmatch.fnmatch(m.routing_key, pattern)] + + # Apply limit + if limit: + messages = messages[:limit] + + return [m.to_dict() for m in messages] + + @scenario_step("Wait for AMQP Messages") + async def wait_for_messages( + self, + count: int = 1, + timeout: int = 30, + routing_key_filter: str | None = None, + ) -> dict[str, Any]: + """Wait for a specific number of messages to be received. + + Args: + count: Number of messages to wait for. + timeout: Maximum time to wait in seconds. + routing_key_filter: Optional flight declaration ID to filter messages. + + Returns: + Dictionary with success status and messages. + """ + start_time = time.time() + + while (time.time() - start_time) < timeout: + messages = await self.get_received_messages(routing_key_filter=routing_key_filter) + if len(messages) >= count: + return { + "success": True, + "message_count": len(messages), + "messages": messages[:count], + "waited_seconds": time.time() - start_time, + } + await asyncio.sleep(0.5) + + # Timeout reached + messages = await self.get_received_messages(routing_key_filter=routing_key_filter) + return { + "success": False, + "message_count": len(messages), + "messages": messages, + "timeout": timeout, + "error": f"Timed out waiting for {count} messages, got {len(messages)}", + } + + @scenario_step("Clear AMQP Messages") + async def clear_messages(self) -> dict[str, Any]: + """Clear the collected messages buffer. + + Returns: + Dictionary with number of messages cleared. + """ + with self._lock: + count = len(self._state.messages) + self._state.messages.clear() + + logger.info(f"Cleared {count} AMQP messages") + return {"cleared_count": count} + + @scenario_step("Check AMQP Connection") + async def check_connection(self) -> dict[str, Any]: + """Check if AMQP connection can be established. + + Raises: + RuntimeError: If connection cannot be established. + + Returns: + Dictionary with connection status and server info. 
+ """ + try: + parameters = self._get_connection_parameters() + connection = pika.BlockingConnection(parameters) + + # Get server version info if available + version = "unknown" + try: + # Access internal implementation for server properties + if hasattr(connection, "_impl") and hasattr(connection._impl, "server_properties"): + server_props = connection._impl.server_properties + version_bytes = server_props.get("version", b"unknown") + if isinstance(version_bytes, bytes): + version = version_bytes.decode("utf-8") + elif isinstance(version_bytes, str): + version = version_bytes + except (AttributeError, KeyError): + pass + + connection.close() + + return { + "connected": True, + "server_version": version, + "url_host": urlparse(self.settings.url).hostname, + } + except (AMQPConnectionError, ChannelClosedByBroker) as e: + raise RuntimeError(f"AMQP connection failed: {e}") from e + except ValueError as e: + # Handle configuration errors (e.g., missing URL) + raise RuntimeError(f"AMQP configuration error: {e}") from e + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + # Stop consumer if running + if self._state.consumer_thread and self._state.consumer_thread.is_alive(): + self._state.stop_event.set() + self._state.consumer_thread.join(timeout=2.0) diff --git a/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py b/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py index 8ed772b..fc5783c 100644 --- a/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py +++ b/src/openutm_verification/core/clients/flight_blender/flight_blender_client.py @@ -244,20 +244,13 @@ async def upload_flight_declaration(self, declaration: str | BaseModel) -> dict[ logger.debug("Uploading flight declaration from model") flight_declaration = declaration.model_dump(mode="json") - # Adjust datetimes to current time + offsets - now = arrow.now() - few_seconds_from_now = now.shift(seconds=5) - four_minutes_from_now = now.shift(minutes=4) - - flight_declaration["start_datetime"] = few_seconds_from_now.isoformat() - flight_declaration["end_datetime"] = four_minutes_from_now.isoformat() - response = await self.post(endpoint, json=flight_declaration) logger.info(f"Flight declaration upload response: {response.status_code}") response_json = response.json() if not response_json.get("is_approved"): + logger.error(f"Flight declaration not approved. Full Response: {json.dumps(response_json, indent=2)}") logger.error(f"Flight declaration not approved. State: {OperationState(response_json.get('state')).name}") raise FlightBlenderError(f"Flight declaration not approved. 
State: {OperationState(response_json.get('state')).name}") # Store latest declaration id for later use @@ -303,13 +296,7 @@ async def upload_flight_declaration_via_operational_intent(self, declaration: st logger.debug("Uploading flight declaration from model") flight_declaration = declaration.model_dump(mode="json") - # Adjust datetimes to current time + offsets - now = arrow.now() - few_seconds_from_now = now.shift(seconds=5) - four_minutes_from_now = now.shift(minutes=4) - - flight_declaration["start_datetime"] = few_seconds_from_now.isoformat() - flight_declaration["end_datetime"] = four_minutes_from_now.isoformat() + # Use provided datetimes from the declaration response = await self.post(endpoint, json=flight_declaration) logger.info(f"Flight declaration upload response: {response.status_code}") @@ -398,6 +385,7 @@ async def _submit_telemetry_states_impl(self, states: list[RIDAircraftState], du Raises: FlightBlenderError: If maximum waiting time is exceeded due to rate limits. + asyncio.CancelledError: If the task is cancelled (propagated, not caught). """ duration_seconds = parse_duration(duration) endpoint = "/flight_stream/set_telemetry" @@ -412,32 +400,36 @@ async def _submit_telemetry_states_impl(self, states: list[RIDAircraftState], du billable_time_elapsed = 0.0 sleep_interval = 1.0 logger.info(f"Starting telemetry submission for {len(states)} states") - for i, state in enumerate(states): - if duration_seconds and billable_time_elapsed >= duration_seconds: - logger.info(f"Telemetry submission duration of {duration_seconds} seconds has passed.") - break - - payload = { - "observations": [ - { - "current_states": [state], - "flight_details": asdict(rid_operator_details), - } - ] - } - response = await self.put(endpoint, json=payload, silent_status=[400]) - request_duration = response.elapsed.total_seconds() - if response.status_code == 201: - logger.info(f"Telemetry point {i + 1} submitted, sleeping {sleep_interval} seconds... {billable_time_elapsed:.2f}s elapsed") - billable_time_elapsed += request_duration + sleep_interval - else: - logger.warning(f"{response.status_code} {response.json()}") - waiting_time_elapsed += request_duration + sleep_interval - if waiting_time_elapsed >= maximum_waiting_time + sleep_interval: - logger.error(f"Maximum waiting time of {maximum_waiting_time} seconds exceeded.") - raise FlightBlenderError(f"Maximum waiting time of {maximum_waiting_time} seconds exceeded.") - last_response = response.json() - await asyncio.sleep(sleep_interval) + try: + for i, state in enumerate(states): + if duration_seconds and billable_time_elapsed >= duration_seconds: + logger.info(f"Telemetry submission duration of {duration_seconds} seconds has passed.") + break + + payload = { + "observations": [ + { + "current_states": [state], + "flight_details": asdict(rid_operator_details), + } + ] + } + response = await self.put(endpoint, json=payload, silent_status=[400]) + request_duration = response.elapsed.total_seconds() + if response.status_code == 201: + logger.info(f"Telemetry point {i + 1} submitted, sleeping {sleep_interval} seconds... 
{billable_time_elapsed:.2f}s elapsed") + billable_time_elapsed += request_duration + sleep_interval + else: + logger.warning(f"{response.status_code} {response.json()}") + waiting_time_elapsed += request_duration + sleep_interval + if waiting_time_elapsed >= maximum_waiting_time + sleep_interval: + logger.error(f"Maximum waiting time of {maximum_waiting_time} seconds exceeded.") + raise FlightBlenderError(f"Maximum waiting time of {maximum_waiting_time} seconds exceeded.") + last_response = response.json() + await asyncio.sleep(sleep_interval) + except asyncio.CancelledError: + logger.info("Telemetry submission cancelled") + raise logger.info("Telemetry submission completed") return last_response @@ -552,14 +544,69 @@ async def check_operation_state_connected( f"Operation {self.latest_flight_declaration_id} did not reach expected state {expected_state.name} within {duration_seconds} seconds" ) + @scenario_step("Cleanup Flight Declarations") + async def cleanup_flight_declarations(self) -> dict[str, Any]: + """Specific cleanup for flight declarations in the active volume. + + This method lists all existing flight declarations and deletes them one by one. + This is useful for cleaning up 'zombie' declarations from previous failed runs. + """ + endpoint = "/flight_declaration_ops/flight_declaration" + logger.info("Cleaning up existing flight declarations...") + + try: + response = await self.get(endpoint) + if response.status_code != 200: + logger.warning(f"Failed to list flight declarations: {response.status_code}") + return {"cleaned": False, "reason": "List failed"} + + data = response.json() + # Handle pagination + if isinstance(data, dict) and "results" in data: + declarations = data["results"] + elif isinstance(data, list): + declarations = data + else: + logger.warning("Unexpected response format for flight declaration list") + return {"cleaned": False, "reason": "Invalid format"} + + logger.info(f"Found {len(declarations)} existing flight declarations") + deleted_count = 0 + + for declaration in declarations: + dec_id = declaration.get("id") + if dec_id: + logger.debug(f"Deleting cleanup target: {dec_id}") + # Direct delete to avoid nested steps and cluttering reports + cleanup_endpoint = f"/flight_declaration_ops/flight_declaration/{dec_id}/delete" + delete_response = await self.delete(cleanup_endpoint) + if delete_response.status_code == 204: + deleted_count += 1 + else: + logger.warning(f"Failed to delete {dec_id}: {delete_response.status_code}") + + logger.info(f"Cleanup complete. Deleted {deleted_count} declarations.") + # Yield for backend consistency + if deleted_count > 0: + await asyncio.sleep(2) + return {"cleaned": True, "count": deleted_count} + + except Exception as e: + logger.error(f"Error during flight declaration cleanup: {e}") + return {"cleaned": False, "error": str(e)} + @scenario_step("Delete Flight Declaration") - async def delete_flight_declaration(self) -> dict[str, Any]: + async def delete_flight_declaration(self, flight_declaration_id: str | None = None) -> dict[str, Any]: """Delete a flight declaration by ID. + Args: + flight_declaration_id: Optional ID of the flight declaration to delete. If not provided, + uses the latest uploaded flight declaration ID. + Returns: A dictionary with deletion status, including whether it was successful. 
""" - op_id = self.latest_flight_declaration_id + op_id = flight_declaration_id or self.latest_flight_declaration_id if not op_id: logger.warning("No flight declaration ID available for deletion") return { @@ -587,8 +634,27 @@ async def submit_simulated_air_traffic( self, observations: list[list[FlightObservationSchema]], single_or_multiple_sensors: str = "single", - ) -> bool: - now = arrow.now() + ) -> dict[str, Any]: + """Submit simulated air traffic observations to the Flight Blender API. + + Plays back observations in real-time, submitting one observation per aircraft per second. + + Args: + observations: List of observation lists, one per aircraft. + single_or_multiple_sensors: Whether to use single or multiple sensor IDs. + + Returns: + Dictionary with submission statistics. + """ + if not observations: + logger.warning("No air traffic observations to submit.") + return { + "success": True, + "aircraft_count": 0, + "observations_submitted": 0, + "duration_seconds": 0, + } + number_of_aircraft = len(observations) logger.debug(f"Submitting simulated air traffic for {number_of_aircraft} aircraft") @@ -603,11 +669,24 @@ async def submit_simulated_air_traffic( continue start_times.append(arrow.get(aircraft_obs[0].timestamp)) end_times.append(arrow.get(aircraft_obs[-1].timestamp)) + + if not start_times: + logger.warning("No valid start/end times found in observations.") + return { + "success": True, + "aircraft_count": number_of_aircraft, + "observations_submitted": 0, + "duration_seconds": 0, + "warning": "No valid start/end times found in observations", + } + simulation_start = min(start_times) simulation_end = max(end_times) now = arrow.now() start_time = now + observations_submitted = 0 + submission_errors = 0 current_simulation_time = simulation_start # Loop through the simulation time from start to end, advancing by 1 second each iteration @@ -635,12 +714,26 @@ async def submit_simulated_air_traffic( payload = {"observations": [obs.model_dump(mode="json") for obs in filtered_observation]} ScenarioContext.add_air_traffic_data(filtered_observation) - response = await self.post(endpoint, json=payload) - logger.debug(f"Air traffic submission response: {response.text}") - logger.info(f"Observations submitted for aircraft {filtered_observation[0].icao_address} at time {current_simulation_time}") + try: + response = await self.post(endpoint, json=payload) + logger.debug(f"Air traffic submission response: {response.text}") + logger.info(f"Observations submitted for aircraft {filtered_observation[0].icao_address} at time {current_simulation_time}") + observations_submitted += 1 + except Exception as e: + logger.error(f"Failed to submit observation: {e}") + submission_errors += 1 # Advance the simulation time by 1 second current_simulation_time = current_simulation_time.shift(seconds=1) - return True + + duration_seconds = (arrow.now() - start_time).total_seconds() + return { + "success": submission_errors == 0, + "aircraft_count": number_of_aircraft, + "observations_submitted": observations_submitted, + "submission_errors": submission_errors, + "duration_seconds": round(duration_seconds, 2), + "simulation_duration_seconds": (simulation_end - simulation_start).total_seconds(), + } @scenario_step("Submit Air Traffic") async def submit_air_traffic(self, observations: list[FlightObservationSchema]) -> dict[str, Any]: @@ -850,16 +943,29 @@ async def setup_flight_declaration_via_operational_intent( self, flight_declaration_via_operational_intent_path: str, trajectory_path: str, - ) -> None: - 
"""Generates data and uploads flight declaration via Operational Intent.""" + ) -> dict[str, Any]: + """Generates data and uploads flight declaration via Operational Intent. + + Returns: + Dictionary with flight declaration info including 'id'. + """ from openutm_verification.scenarios.common import ( generate_flight_declaration_via_operational_intent, generate_telemetry, ) + # Synchronize start times + now = arrow.now() + start_time = now.shift(seconds=2) + end_time = now.shift(minutes=60) + flight_declaration = generate_flight_declaration_via_operational_intent(flight_declaration_via_operational_intent_path) - telemetry_states = generate_telemetry(trajectory_path) + # Update flight declaration times to match synchronized start time + flight_declaration.start_datetime = start_time.isoformat() + flight_declaration.end_datetime = end_time.isoformat() + + telemetry_states = generate_telemetry(trajectory_path, reference_time=start_time.isoformat()) self.telemetry_states = telemetry_states @@ -873,13 +979,24 @@ async def setup_flight_declaration_via_operational_intent( logger.error(f"Flight declaration upload failed: {upload_result}") raise FlightBlenderError("Failed to upload flight declaration during setup_flight_declaration_via_operational_intent") + # Return flight declaration info for use in subsequent steps + return { + "id": self.latest_flight_declaration_id, + "start_datetime": flight_declaration.start_datetime, + "end_datetime": flight_declaration.end_datetime, + } + @scenario_step("Setup Flight Declaration") async def setup_flight_declaration( self, flight_declaration_path: str | None = None, trajectory_path: str | None = None, - ) -> None: - """Generates data and uploads flight declaration.""" + ) -> dict[str, Any]: + """Generates data and uploads flight declaration. + + Returns: + Dictionary with flight declaration info including 'id'. 
+ """ from openutm_verification.scenarios.common import ( generate_flight_declaration, @@ -896,8 +1013,17 @@ async def setup_flight_declaration( if not trajectory_path: raise ValueError("trajectory_path not provided and not found in config") + # Synchronize start times + now = arrow.now() + start_time = now.shift(seconds=2) + end_time = now.shift(minutes=60) + flight_declaration = generate_flight_declaration(flight_declaration_path) - telemetry_states = generate_telemetry(trajectory_path) + # Update flight declaration times to match synchronized start time + flight_declaration.start_datetime = start_time.isoformat() + flight_declaration.end_datetime = end_time.isoformat() + + telemetry_states = generate_telemetry(trajectory_path, reference_time=start_time.isoformat()) self.telemetry_states = telemetry_states @@ -911,12 +1037,21 @@ async def setup_flight_declaration( logger.error(f"Flight declaration upload failed: {upload_result}") raise FlightBlenderError("Failed to upload flight declaration during setup_flight_declaration") + # Return flight declaration info for use in subsequent steps + return { + "id": self.latest_flight_declaration_id, + "start_datetime": flight_declaration.start_datetime, + "end_datetime": flight_declaration.end_datetime, + } + @asynccontextmanager async def create_flight_declaration(self): """Context manager to setup and teardown a flight operation based on scenario config.""" assert self.flight_declaration_path is not None, "Flight declaration file path must be provided" assert self.trajectory_path is not None, "Trajectory file path must be provided" - await self.setup_flight_declaration(self.flight_declaration_path, self.trajectory_path) + result = await self.setup_flight_declaration(self.flight_declaration_path, self.trajectory_path) + if result.status == Status.FAIL: + raise FlightBlenderError(f"Setup Flight Declaration failed: {result.error_message}") try: yield finally: diff --git a/src/openutm_verification/core/clients/opensky/opensky_client.py b/src/openutm_verification/core/clients/opensky/opensky_client.py index 907f280..0519d11 100644 --- a/src/openutm_verification/core/clients/opensky/opensky_client.py +++ b/src/openutm_verification/core/clients/opensky/opensky_client.py @@ -76,7 +76,8 @@ def process_flight_data(self, flight_df: pd.DataFrame) -> list[FlightObservation """Process flight DataFrame into observation format.""" observations = [] for _, row in flight_df.iterrows(): - altitude = 0.0 if row["baro_altitude"] == "No Data" else row["baro_altitude"] + # OpenSky baro_altitude is in meters, convert to millimeters + altitude_m = 0.0 if row["baro_altitude"] == "No Data" else float(row["baro_altitude"]) # Create observation using Pydantic model observation = FlightObservationSchema( @@ -86,7 +87,7 @@ def process_flight_data(self, flight_df: pd.DataFrame) -> list[FlightObservation source_type=1, # Aircraft lat_dd=float(row["lat"]), lon_dd=float(row["long"]), - altitude_mm=float(altitude), + altitude_mm=altitude_m * 1000, # Convert m -> mm metadata={"velocity": row["velocity"]}, ) observations.append(observation) diff --git a/src/openutm_verification/core/execution/conditions.py b/src/openutm_verification/core/execution/conditions.py index 7cc328c..79ffba3 100644 --- a/src/openutm_verification/core/execution/conditions.py +++ b/src/openutm_verification/core/execution/conditions.py @@ -70,7 +70,7 @@ def _replace_functions(self, condition: str) -> str: """Replace GitHub Actions-style functions.""" # success() - previous step succeeded if "success()" in 
condition: - result = self.last_step_status == Status.PASS if self.last_step_status else True + result = self.last_step_status in (Status.PASS, Status.RUNNING) if self.last_step_status else True condition = condition.replace("success()", str(result)) # failure() - previous step failed diff --git a/src/openutm_verification/core/execution/config_models.py b/src/openutm_verification/core/execution/config_models.py index f318a6e..6a29c77 100644 --- a/src/openutm_verification/core/execution/config_models.py +++ b/src/openutm_verification/core/execution/config_models.py @@ -54,6 +54,16 @@ class BlueSkyAirTrafficSimulatorSettings(StrictBaseModel): sensor_ids: list[str] = Field(default_factory=list) +class AMQPConfig(StrictBaseModel): + """AMQP/RabbitMQ connection configuration.""" + + url: str = "" # AMQP URL, e.g., amqp://guest:guest@localhost:5672/ + exchange_name: str = "operational_events" + exchange_type: str = "direct" + routing_key: str = "#" # '#' matches all routing keys + queue_name: str = "" # Empty means auto-generate exclusive queue + + class OpenSkyConfig(StrictBaseModel): """OpenSky Network connection details.""" @@ -159,6 +169,7 @@ class AppConfig(StrictBaseModel): opensky: OpenSkyConfig air_traffic_simulator_settings: AirTrafficSimulatorSettings blue_sky_air_traffic_simulator_settings: BlueSkyAirTrafficSimulatorSettings + amqp: AMQPConfig | None = None # Optional AMQP/RabbitMQ configuration data_files: DataFiles suites: dict[str, SuiteConfig] = Field(default_factory=dict) reporting: ReportingConfig diff --git a/src/openutm_verification/core/execution/dependencies.py b/src/openutm_verification/core/execution/dependencies.py index 7539d3c..59ddd44 100644 --- a/src/openutm_verification/core/execution/dependencies.py +++ b/src/openutm_verification/core/execution/dependencies.py @@ -19,6 +19,10 @@ from openutm_verification.core.clients.air_traffic.blue_sky_client import ( BlueSkyClient, ) +from openutm_verification.core.clients.amqp import ( + AMQPClient, + create_amqp_settings, +) from openutm_verification.core.clients.common.common_client import CommonClient from openutm_verification.core.clients.flight_blender.flight_blender_client import ( FlightBlenderClient, @@ -223,3 +227,11 @@ async def bluesky_client() -> AsyncGenerator[BlueSkyClient, None]: settings = create_blue_sky_air_traffic_settings() async with BlueSkyClient(settings) as client: yield client + + +@dependency(AMQPClient) +async def amqp_client() -> AsyncGenerator[AMQPClient, None]: + """Provides an AMQPClient instance for dependency injection.""" + settings = create_amqp_settings() + async with AMQPClient(settings) as client: + yield client diff --git a/src/openutm_verification/core/execution/execution.py b/src/openutm_verification/core/execution/execution.py index e636fea..370f877 100644 --- a/src/openutm_verification/core/execution/execution.py +++ b/src/openutm_verification/core/execution/execution.py @@ -2,7 +2,9 @@ Core execution logic for running verification scenarios. 
""" +import importlib import json +import pkgutil from datetime import datetime, timezone from pathlib import Path from typing import TYPE_CHECKING @@ -10,6 +12,7 @@ from loguru import logger from pydantic import ValidationError +import openutm_verification.scenarios from openutm_verification.core.clients.air_traffic.base_client import ( AirTrafficError, ) @@ -18,15 +21,29 @@ ) from openutm_verification.core.execution.config_models import AppConfig from openutm_verification.core.execution.dependencies import scenarios -from openutm_verification.core.execution.dependency_resolution import CONTEXT +from openutm_verification.core.execution.dependency_resolution import CONTEXT, call_with_dependencies from openutm_verification.core.execution.scenario_loader import load_yaml_scenario_definition from openutm_verification.core.reporting.reporting import _sanitize_config, create_report_data, generate_reports from openutm_verification.core.reporting.reporting_models import ( ScenarioResult, Status, ) +from openutm_verification.scenarios.registry import SCENARIO_REGISTRY from openutm_verification.utils.paths import get_docs_directory + +def _import_python_scenarios(): + """Import all python scenarios to populate the registry.""" + path = list(openutm_verification.scenarios.__path__) + prefix = openutm_verification.scenarios.__name__ + "." + + for _, name, _ in pkgutil.iter_modules(path, prefix): + try: + importlib.import_module(name) + except Exception as e: + logger.warning(f"Failed to import scenario module {name}: {e}") + + if TYPE_CHECKING: from openutm_verification.server.runner import SessionManager @@ -51,14 +68,37 @@ async def run_verification_scenarios(config: AppConfig, config_path: Path, sessi session_manager = RunnerSessionManager(config_path=str(config_path)) + # Import Python scenarios to populate registry + _import_python_scenarios() + scenario_results = [] for scenario_id in scenarios(): try: # Initialize session with the current context await session_manager.initialize_session() - scenario_def = load_yaml_scenario_definition(scenario_id) - await session_manager.run_scenario(scenario_def) + if scenario_id in SCENARIO_REGISTRY: + logger.info(f"Running Python scenario: {scenario_id}") + wrapper = SCENARIO_REGISTRY[scenario_id]["func"] + # Unwrap to get the original function for dependency injection + func_to_call = getattr(wrapper, "__wrapped__", wrapper) + + # Execute within the session context + # session_manager.initialize_session() already sets up session_context but doesn't enter it + # We need to manually enter the context or use run_scenario logic + # session_context is a ScenarioContext. + # ScenarioContext.__enter__ sets the thread-local state. 
+ + if not session_manager.session_context: + raise RuntimeError("Session context not initialized") + + with session_manager.session_context: + await call_with_dependencies(func_to_call, resolver=session_manager.session_resolver) + + else: + scenario_def = load_yaml_scenario_definition(scenario_id) + await session_manager.run_scenario(scenario_def) + state = session_manager.session_context.state if session_manager.session_context else None steps = state.steps if state else [] failed = any(s.status == Status.FAIL for s in steps) diff --git a/src/openutm_verification/core/execution/scenario_runner.py b/src/openutm_verification/core/execution/scenario_runner.py index a039008..018bb81 100644 --- a/src/openutm_verification/core/execution/scenario_runner.py +++ b/src/openutm_verification/core/execution/scenario_runner.py @@ -2,7 +2,7 @@ import inspect import time import uuid -from asyncio import Queue +from asyncio import CancelledError, Queue from dataclasses import dataclass, field from functools import wraps from pathlib import Path @@ -104,11 +104,26 @@ def __exit__(self, exc_type, exc_val, exc_tb): def add_result(cls, result: StepResult[Any]) -> None: state = _scenario_state.get() if state and state.active: - if result.id and state.step_results.get(result.id): - state.steps.remove(state.step_results[result.id]) + # Remove all existing entries with same ID (handles duplicates from multiple sources) + if result.id: + state.steps = [s for s in state.steps if s.id != result.id] state.steps.append(result) state.added_results.put_nowait(result) + def update_result(self, result: StepResult[Any]) -> None: + """ + Instance method to update result in the bound state. + This is useful for background tasks where context vars might not be available. + Note: This method does not check if state is active, since background tasks + may complete after the scenario context has exited. 
+ """ + if self._state: + # Remove all existing entries with same ID (handles duplicates from multiple sources) + if result.id: + self._state.steps = [s for s in self._state.steps if s.id != result.id] + self._state.steps.append(result) + self._state.added_results.put_nowait(result) + @classmethod def set_flight_declaration_data(cls, data: FlightDeclaration) -> None: state = _scenario_state.get() @@ -264,7 +279,7 @@ def log_filter(record): handler_id = logger.add(lambda msg: captured_logs.append(msg), filter=log_filter, format="{time:HH:mm:ss} | {level} | {message}") - step_result: StepResult[Any] + step_result: StepResult[Any] | None = None try: with logger.contextualize(step_execution_id=step_execution_id): logger.info("-" * 50) @@ -273,6 +288,9 @@ def log_filter(record): try: result = await func(*args, **kwargs) step_result = handle_result(result, start_time) + except CancelledError: + logger.info(f"Step '{step_name}' was cancelled") + raise except Exception as e: step_result = handle_exception(e, start_time) finally: diff --git a/src/openutm_verification/core/reporting/reporting.py b/src/openutm_verification/core/reporting/reporting.py index 13099da..455567e 100644 --- a/src/openutm_verification/core/reporting/reporting.py +++ b/src/openutm_verification/core/reporting/reporting.py @@ -22,23 +22,43 @@ T = TypeVar("T") +def _mask_url_password(url: str) -> str: + """Mask password in URL if present (e.g., amqp://user:password@host).""" + import re + + # Match URLs with credentials: scheme://user:password@host + pattern = r"((?:amqp|amqps|http|https|rabbitmq)://[^:]+:)([^@]+)(@.+)" + match = re.match(pattern, url, re.IGNORECASE) + if match: + return f"{match.group(1)}***MASKED***{match.group(3)}" + return url + + def _sanitize_config(data: Any) -> Any: """ Recursively sanitize sensitive fields in the configuration data. 
""" sensitive_mask = "***MASKED***" - sensitive_keys = ["client_id", "client_secret", "audience", "scopes"] + sensitive_keys = ["client_id", "client_secret", "audience", "scopes", "password", "token"] + # Keys that may contain URLs with embedded passwords + url_keys = ["url", "amqp_url", "connection_string", "broker_url"] if isinstance(data, dict): sanitized = {} for key, value in data.items(): if key in sensitive_keys: sanitized[key] = sensitive_mask + elif key.lower() in url_keys and isinstance(value, str) and "@" in value: + # URL with potential embedded credentials + sanitized[key] = _mask_url_password(value) else: sanitized[key] = _sanitize_config(value) return sanitized elif isinstance(data, list): return [_sanitize_config(item) for item in data] + elif isinstance(data, str) and "://" in data and "@" in data: + # Standalone URL string with potential credentials + return _mask_url_password(data) else: return data diff --git a/src/openutm_verification/core/reporting/reporting_models.py b/src/openutm_verification/core/reporting/reporting_models.py index 6e6833c..bb829b7 100644 --- a/src/openutm_verification/core/reporting/reporting_models.py +++ b/src/openutm_verification/core/reporting/reporting_models.py @@ -25,6 +25,7 @@ class Status(StrEnum): PASS = "success" FAIL = "failure" RUNNING = "running" + WAITING = "waiting" SKIP = "skipped" diff --git a/src/openutm_verification/core/templates/report_template.html b/src/openutm_verification/core/templates/report_template.html index 475a2a8..bce14b4 100644 --- a/src/openutm_verification/core/templates/report_template.html +++ b/src/openutm_verification/core/templates/report_template.html @@ -24,6 +24,32 @@ .steps-table th, .steps-table td { border: 1px solid #eee; padding: 8px; text-align: left; } .steps-table th { background-color: #fafafa; } pre { background-color: #f1f1f1; padding: 10px; border-radius: 5px; white-space: pre-wrap; word-wrap: break-word; } + .result-wrapper { position: relative; } + .result-collapsed { max-height: 200px; overflow: hidden; } + .result-collapsed::after { + content: ""; + position: absolute; + bottom: 0; + left: 0; + right: 0; + height: 40px; + background: linear-gradient(transparent, #f1f1f1); + pointer-events: none; + } + .toggle-btn { + position: absolute; + top: 5px; + right: 5px; + z-index: 10; + padding: 4px 12px; + font-size: 12px; + background-color: rgba(233, 236, 239, 0.95); + border: 1px solid #ced4da; + border-radius: 4px; + cursor: pointer; + color: #495057; + } + .toggle-btn:hover { background-color: rgba(222, 226, 230, 0.95); }
@@ -63,7 +89,8 @@
-                    <pre>{{ step.result | tojson(indent=2) }}</pre>
+                    {% if step.result %}
+                    {% set result_json = step.result | tojson(indent=2) %}
+                    <div class="result-wrapper">
+                        <button class="toggle-btn">Expand</button>
+                        <pre class="result-collapsed">{{ result_json }}</pre>
+                    </div>
+                    {% endif %}