diff --git a/README.md b/README.md index 112c809..de194a1 100644 --- a/README.md +++ b/README.md @@ -1 +1,65 @@ # Frame-Based Monitoring + + + +![Documentation Status](https://readthedocs.org/projects/lotus-ai/badge/?version=latest) +![PyPI - Python Version](https://img.shields.io/pypi/pyversions/lotus-ai) +![PyPI](https://img.shields.io/pypi/v/lotus-ai) +[![GitHub license](https://img.shields.io/badge/License-MIT-blu.svg)][#license-gh-package] + +[#license-gh-package]: https://lbesson.mit-license.org/ + + + +## Key Concept: Static Frame Condensation +This project focuses on condensing and analyzing frames, initially handling static frames for now. This system aims to streamline data from various sources (e.g., NuScenes dataset) by extracting symbolic representations, which can be applied in autonomous systems and object state prediction. + +## Goals +1. **Condense Frames:** Transform dense, complex data into simplified symbolic representations for easier processing. +2. **Dynamic Frame Condensation:** Build upon static frame condensation to enable real-time data updates and adaptive frame structures. +3. **Symbolic Parsing:** Use a symbolic parser to convert data into human-readable text, enhancing interpretability. + +## Folder Structure + +- **`src/condensing`**: Contains the primary code for condensing static frames. +- **`src/symbolic_parser`**: Houses files for parsing and converting frames into symbolic representations. +- **`dynamic_frame_condensation`**: Future work that will expand upon static frame condensation to enable dynamic, real-time updates. + + +- **Autonomous Systems**: Enhancing decision-making by reducing the volume of static data frames processed, thereby optimizing computational resources. +- **Natural Language Processing (NLP)**: Improving text condensation and summarization tasks by filtering repetitive frame data in symbolic language models. + +The repository contains tools and methods for: +- **Identifying Redundant Frames**: Employing statistical analysis and clustering techniques to detect duplicate or similar frames from static datasets. +- **Optimized Data Structures**: Utilizing efficient data structures that facilitate condensation of static frames, enabling faster processing speeds. +- **Real-World Applications**: With applications in AI, robotics, and NLP, this project serves as a foundation for further research into frame-based communication, decision-making models, and symbolic processing. + +The main goal of this repository is to provide a robust framework for researchers and developers working on frame condensation techniques, with a focus on both efficiency and effectiveness in data handling. + +#Installation +- Clone this repository +git clone https://github.com/aiea-lab/frames.git + +- Navigate to the project directory +cd scripts/nuscenes_main.py + +- Install dependencies +pip install -r requirements.txt + +#Usage +- Import the static frame condensation module +from condensation import StaticFrameCondensor + +- Initialize the condensor with your data +condensor = StaticFrameCondensor(data) + +- Run the condensation process +condensed_data = condensor.condense() + +- Save or analyze the condensed data +save(condensed_data, 'output_file_path') + +#Examples + +#Condensation +Contributions are welcome! Please fork this repository, create a branch, and submit a pull request with your changes. diff --git a/__pycache__/nuscenes_utils.cpython-39.pyc b/__pycache__/nuscenes_utils.cpython-39.pyc deleted file mode 100644 index 230618e..0000000 Binary files a/__pycache__/nuscenes_utils.cpython-39.pyc and /dev/null differ diff --git a/frames/__pycache__/frame_structure.cpython-312.pyc b/frames/__pycache__/frame_structure.cpython-312.pyc deleted file mode 100644 index b90270d..0000000 Binary files a/frames/__pycache__/frame_structure.cpython-312.pyc and /dev/null differ diff --git a/frames/__pycache__/frame_structure.cpython-39.pyc b/frames/__pycache__/frame_structure.cpython-39.pyc deleted file mode 100644 index 3bef880..0000000 Binary files a/frames/__pycache__/frame_structure.cpython-39.pyc and /dev/null differ diff --git a/frames/__pycache__/nuscenes_utils.cpython-312.pyc b/frames/__pycache__/nuscenes_utils.cpython-312.pyc deleted file mode 100644 index 1de431b..0000000 Binary files a/frames/__pycache__/nuscenes_utils.cpython-312.pyc and /dev/null differ diff --git a/frames/__pycache__/nuscenes_utils.cpython-39.pyc b/frames/__pycache__/nuscenes_utils.cpython-39.pyc deleted file mode 100644 index 0c28999..0000000 Binary files a/frames/__pycache__/nuscenes_utils.cpython-39.pyc and /dev/null differ diff --git a/frames/__pycache__/test_frame_structure.cpython-39.pyc b/frames/__pycache__/test_frame_structure.cpython-39.pyc deleted file mode 100644 index c71553b..0000000 Binary files a/frames/__pycache__/test_frame_structure.cpython-39.pyc and /dev/null differ diff --git a/frames/condense_frames.py b/frames/condense_frames.py deleted file mode 100644 index a6f57fa..0000000 --- a/frames/condense_frames.py +++ /dev/null @@ -1,114 +0,0 @@ -import numpy as np - -class FrameCondenser: - """ - Class to handle the process of condensing multiple frames into a single scene. - """ - - def __init__(self): - pass - - def condense_frames(self, frames): - """ - Condenses a list of frames into a single scene. - - Args: - frames (list): List of frame objects with coordinates, status, and annotations. - - Returns: - dict: A dictionary representing the condensed scene with aggregated data. - """ - if not frames: - raise ValueError("No frames provided for condensing.") - - # Initialize scene attributes - scene_coords = [0.0, 0.0, 0.0] - scene_status = None - merged_annotations = [] - - # Calculate average coordinates and status - for frame in frames: - # Sum frame coordinates - scene_coords[0] += frame.coordinates[0] - scene_coords[1] += frame.coordinates[1] - scene_coords[2] += frame.coordinates[2] - - # Determine the overall status - if scene_status is None: - scene_status = frame.status - elif scene_status != frame.status: - scene_status = 'MIXED' - - # Average the coordinates - num_frames = len(frames) - scene_coords = [coord / num_frames for coord in scene_coords] - - # Merge object annotations - if frames[0].annotations: - merged_annotations = self._merge_annotations(frames) - - # Construct the final condensed scene - condensed_scene = { - 'coordinates': scene_coords, - 'status': scene_status, - 'annotations': merged_annotations - } - - return condensed_scene - - def _merge_annotations(self, frames): - """ - Merges annotations from multiple frames into a single list of annotations. - - Args: - frames (list): List of frame objects with annotations. - - Returns: - list: Merged annotations for the condensed scene. - """ - merged_annotations = [] - - # Start with annotations from the first frame - base_annotations = frames[0].annotations - - for base_obj in base_annotations: - matching_objs = [base_obj] - - - for frame in frames[1:]: - for other_obj in frame.annotations: - if other_obj.category == base_obj.category: - matching_objs.append(other_obj) - break - - # Calculate average attributes for the merged object - avg_location = np.mean( - [[obj.location.x, obj.location.y, obj.location.z] for obj in matching_objs], axis=0 - ) - avg_size = np.mean([obj.size for obj in matching_objs], axis=0) - avg_rotation = np.mean([obj.rotation for obj in matching_objs], axis=0) - - # Handle velocity, replacing NaN with 0 - avg_velocity = np.nan_to_num( - [np.nanmean([obj.velocity[i] for obj in matching_objs]) for i in range(3)] - ) - - # Sum lidar and radar points - total_lidar_pts = sum(obj.num_lidar_pts for obj in matching_objs) - total_radar_pts = sum(obj.num_radar_pts for obj in matching_objs) - - # Create the merged object annotation - - - merged_obj = { - 'category': base_obj.category, - 'location': avg_location, - 'size': avg_size, - 'rotation': avg_rotation, - 'velocity': avg_velocity, - 'num_lidar_pts': total_lidar_pts, - 'num_radar_pts': total_radar_pts - } - merged_annotations.append(merged_obj) - - return merged_annotations diff --git a/frames/frame_structure.py b/frames/frame_structure.py deleted file mode 100644 index 2515384..0000000 --- a/frames/frame_structure.py +++ /dev/null @@ -1,133 +0,0 @@ -from typing import List, Union, Tuple -from enum import Enum -from datetime import datetime - -class Status(Enum): - Active = 'Active' - Idle = 'Idle' - Operational = 'Operational' - Malfunctioning = 'Malfunctioning' - InProgress = 'InProgress' - Completed = 'Completed' - Aborted = 'Aborted' - Connected = 'Connected' - Disconnected = 'Disconnected' - Planned = 'Planned' - Minor = 'Minor' - Major = 'Major' - Critical = 'Critical' - Collision = 'Collision' - ObstacleDetected = 'ObstacleDetected' - LaneDeparture = 'LaneDeparture' - Object = 'Object' - Event = 'Event' - Camera = 'Camera' - LiDAR = 'LiDAR' - Radar = 'Radar' - Image = 'Image' - PointCloud = 'PointCloud' - RadarSignal = 'RadarSignal' - Info = 'Info' - Warning = 'Warning' - LogError = 'LogError' - SensorReading = 'SensorReading' - LogMessage = 'LogMessage' - ExternalReport = 'ExternalReport' - -class Coordinate: - def __init__(self, latitude: float, longitude: float): - self.latitude = latitude - self.longitude = longitude - - def __repr__(self): - return f"Coordinate(latitude={self.latitude}, longitude={self.longitude})" - - -class Frame: - def __init__(self, frame_name: str, timestamp: datetime, status: Status, coordinates: Coordinate): - self.frame_name = frame_name - self.timestamp = timestamp - self.status = status - self.coordinates = coordinates - self.data = {} - - - def add_data(self, data_type: Status, data: Union[str, List, dict]): - if data_type not in self.data: - self.data[data_type] = [] - self.data[data_type].append(data) - - def get_data(self, data_type: Status): - return self.data.get(data_type, []) - - def check_missing_data(self, required_data_types: List[Status]) -> List[Status]: - missing_data = [data_type for data_type in required_data_types if data_type not in self.data or not self.data[data_type]] - return missing_data - - def __repr__(self): - return f"Frame(name={self.frame_name}, timestamp={self.timestamp}, status={self.status}, coordinates=({self.coordinates.latitude}, {self.coordinates.longitude}))" - -class FrameManager: - def __init__(self): - self.frames = [] - - def create_frame(self, frame_name: str, timestamp: datetime, status: Status, coordinates: Coordinate): - frame = Frame(frame_name, timestamp, status, coordinates) - self.frames.append(frame) - return frame - - def get_frame(self, frame_name: str) -> Union[Frame, None]: - for frame in self.frames: - if frame.frame_name == frame_name: - return frame - return None - - def get_all_frames(self) -> List[Frame]: - return self.frames - - def update_frame_status(self, frame_name: str, new_status: Status): - frame = self.get_frame(frame_name) - if frame: - frame.status = new_status - - def add_frame_data(self, frame_name: str, data_type: Status, data: Union[str, List, dict]): - frame = self.get_frame(frame_name) - if frame: - frame.add_data(data_type, data) - - def get_frame_data(self, frame_name: str, data_type: Status): - frame = self.get_frame(frame_name) - if frame: - return frame.get_data(data_type) - return [] - - -if __name__ == "__main__": - # Create a frame manager - frame_manager = FrameManager() - - # Create a coordinate - coord = Coordinate(latitude=37.7749, longitude=-122.4194) - - # Create a frame - frame = frame_manager.create_frame( - frame_name="Frame1", - timestamp=datetime.now(), - status=Status.Active, - coordinates=coord - ) - - # Add some data to the frame - frame_manager.add_frame_data("Frame1", Status.Camera, {"image": "image_data"}) - frame_manager.add_frame_data("Frame1", Status.SensorReading, {"sensor": "LiDAR", "reading": [1, 2, 3]}) - - # Retrieve and print frame data - camera_data = frame_manager.get_frame_data("Frame1", Status.Camera) - sensor_data = frame_manager.get_frame_data("Frame1", Status.SensorReading) - print(f"Camera Data: {camera_data}") - print(f"Sensor Data: {sensor_data}") - - # Print all frames - print(f"All Frames: {frame_manager.get_all_frames()}") - - \ No newline at end of file diff --git a/frames/main.py b/frames/main.py deleted file mode 100644 index aa6569f..0000000 --- a/frames/main.py +++ /dev/null @@ -1,40 +0,0 @@ -from frame_structure import FrameManager -from nuscenes_utils import initialize_nuscenes, create_frame_from_sample -# from symbolic_parser import symbolicParser - - -dataroot = '/Users/ananya/Documents/frames/frames/v1.0-mini' -version = 'v1.0-mini' # or 'v1.0-trainval' for the full dataset -nusc = initialize_nuscenes(version, dataroot) - -frame_manager = FrameManager() - -scene = nusc.scene[0] -first_sample_token = scene['first_sample_token'] -sample = nusc.get('sample', first_sample_token) - -frame_1 = create_frame_from_sample(nusc, sample, frame_manager) - -next_sample_token = sample['next'] # Assuming the sample has a 'next' token -if next_sample_token: - next_sample = nusc.get('sample', next_sample_token) - frame_2 = create_frame_from_sample(nusc, next_sample, frame_manager) -else: - frame_2 = None # In case there is no next sample - -# parser = symbolicParser() - -# print("Symbolic Representation for Frame 1:") -# symbolic_rep_1 = parser.parse_frame(frame_1) -# for key, value in symbolic_rep_1.items(): -# print(f"{key.name}: {value}") - -# print("Symbolic Representation for Frame 1:") -# symbolic_rep_1 = parser.parse_frame(frame_1) -# for key, value in symbolic_rep_1.items(): -# print(f"{key.name}: {value}") - -# Output both frames -print("Frame 1:", frame_1) -print("Frame 2:", frame_2) - diff --git a/frames/nuscenes_utils.py b/frames/nuscenes_utils.py deleted file mode 100644 index 96abb9f..0000000 --- a/frames/nuscenes_utils.py +++ /dev/null @@ -1,48 +0,0 @@ -from nuscenes.nuscenes import NuScenes -from nuscenes.utils.data_classes import RadarPointCloud, LidarPointCloud -from datetime import datetime -import os - -from frame_structure import Status, Coordinate, FrameManager - -def initialize_nuscenes(version: str, dataroot: str): - return NuScenes(version=version, dataroot=dataroot, verbose=True) - -def extract_timestamp(sample): - return datetime.utcfromtimestamp(sample['timestamp'] / 1e6) - -def create_frame_from_sample(nusc, sample, frame_manager): - frame_name = sample['token'] - timestamp = extract_timestamp(sample) - status = Status.Active # Example status - - # Assuming these coordinates are placeholders; real coordinates need to be sourced correctly - coordinates = Coordinate( - latitude=0.0, - longitude=0.0 - ) - - frame = frame_manager.create_frame( - frame_name=frame_name, - timestamp=timestamp, - status=status, - coordinates=coordinates - ) - - for sensor_name in ['CAM_FRONT', 'CAM_BACK', 'CAM_FRONT_LEFT', 'CAM_FRONT_RIGHT', 'CAM_BACK_LEFT', 'CAM_BACK_RIGHT']: - sensor_token = sample['data'][sensor_name] - sensor_data = nusc.get('sample_data', sensor_token) - frame_manager.add_frame_data(frame_name, Status.Camera, sensor_data) - - lidar_token = sample['data']['LIDAR_TOP'] - lidar_data = nusc.get('sample_data', lidar_token) - lidar_pointcloud = LidarPointCloud.from_file(os.path.join(nusc.dataroot, lidar_data['filename'])) - frame_manager.add_frame_data(frame_name, Status.SensorReading, lidar_pointcloud.points) - - for radar_name in ['RADAR_FRONT', 'RADAR_FRONT_LEFT', 'RADAR_FRONT_RIGHT', 'RADAR_BACK_LEFT', 'RADAR_BACK_RIGHT']: - radar_token = sample['data'][radar_name] - radar_data = nusc.get('sample_data', radar_token) - radar_pointcloud = RadarPointCloud.from_file(os.path.join(nusc.dataroot, radar_data['filename'])) - frame_manager.add_frame_data(frame_name, Status.SensorReading, radar_pointcloud.points) - - return frame \ No newline at end of file diff --git a/frames/readme.md b/frames/readme.md deleted file mode 100644 index 69e0426..0000000 --- a/frames/readme.md +++ /dev/null @@ -1,57 +0,0 @@ -#Frame-Based Monitoring Experiment - -This repository contains the implementation and results of an experiment conducted on the NuScenes dataset, aligning with the objectives outlined in the AFOSR YIP proposal. The experiment leverages a frame-based monitoring system to detect, diagnose, and explain errors in multimodal autonomous systems, particularly focusing on scenarios depicted in the NuScenes dataset. - -Objectives -The primary objective of this experiment is to develop and validate a frame-based monitoring system for multimodal autonomous systems. This work is inspired by the hypothesis that a symbolic, frame-based representation can enhance the precision and explainability of error detection in complex, real-world scenarios. - -Experiment Structure -1. Frame Structure Implementation -Representation: We translated multimodal log data from the NuScenes dataset into a symbolic frame-based representation. -Integration: The representation was integrated with a reasoning engine that compares and diagnoses errors in autonomous driving scenarios. -2. NuScenes Dataset -Initialization: The NuScenes dataset was initialized, and specific samples were selected for analysis. -Visualization: Frames from the dataset were visualized, focusing on sensor data such as camera images and LiDAR point clouds. -3. Experimentation -Setup: A series of experiments were conducted to validate the frame-based monitoring system's effectiveness in real-world autonomous driving scenarios. -Results: The outcomes were analyzed to determine the system's precision, recall, and ability to provide symbolic explanations for detected errors. - -Requirements - -- Python 3.x -- Conda environment with the necessary dependencies - -How to Run the Experiment -Prerequisites -Python 3.x -Conda environment with the following packages: -nuscenes-devkit -matplotlib -opencv-python -Installation -Clone the Repository -Create and Activate the Conda Environment - -Running the Experiment -Initialize the NuScenes Dataset: - -Update the main.py file with the correct path to your NuScenes dataset. - -Run the Frame Visualization Script: - -Execute the script to visualize and diagnose frames: - -View and Analyze Results: - -The results will include visualized frames and diagnostic information, which can be used to analyze the system's performance. -Experiment Results -The experiment successfully demonstrated the feasibility of using a frame-based monitoring system for multimodal autonomous systems. Key results include: - -Improved Precision: The frame-based approach showed higher precision in error detection compared to traditional methods. -Enhanced Explainability: The system provided symbolic explanations that were both intuitive and aligned with human reasoning, increasing user trust. - -Acknowledgments -This work is part of the research supported by the AFOSR Young Investigator Program (YIP). The NuScenes dataset used in this experiment is provided by Motional. - -License -This project is licensed under the MIT License. See the LICENSE file for details. \ No newline at end of file diff --git a/frames/requirements.txt b/frames/requirements.txt deleted file mode 100644 index 7db3b83..0000000 --- a/frames/requirements.txt +++ /dev/null @@ -1 +0,0 @@ -nuscenes-devkit \ No newline at end of file diff --git a/frames/test_frame_structure.py b/frames/test_frame_structure.py deleted file mode 100644 index 3d7c2df..0000000 --- a/frames/test_frame_structure.py +++ /dev/null @@ -1,135 +0,0 @@ -import unittest -from datetime import datetime -from frame_structure import Status, Coordinate, Frame, FrameManager - -class TestFrameStructure(unittest.TestCase): - - def setUp(self): - self.frame_manager = FrameManager() - self.coord = Coordinate(latitude=37.7749, longitude=-122.4194) - self.frame = self.frame_manager.create_frame( - frame_name="Frame1", - timestamp=datetime.now(), - status=Status.Active, - coordinates=self.coord - ) - - def test_create_frame(self): - frame = self.frame_manager.get_frame("Frame1") - self.assertIsNotNone(frame) - self.assertEqual(frame.frame_name, "Frame1") - self.assertEqual(frame.status, Status.Active) - self.assertEqual(frame.coordinates.latitude, 37.7749) - self.assertEqual(frame.coordinates.longitude, -122.4194) - - def test_add_frame_data(self): - self.frame_manager.add_frame_data("Frame1", Status.Camera, {"image": "image_data"}) - self.frame_manager.add_frame_data("Frame1", Status.SensorReading, {"sensor": "LiDAR", "reading": [1, 2, 3]}) - camera_data = self.frame_manager.get_frame_data("Frame1", Status.Camera) - sensor_data = self.frame_manager.get_frame_data("Frame1", Status.SensorReading) - self.assertEqual(len(camera_data), 1) - self.assertEqual(camera_data[0], {"image": "image_data"}) - self.assertEqual(len(sensor_data), 1) - self.assertEqual(sensor_data[0], {"sensor": "LiDAR", "reading": [1, 2, 3]}) - - def test_update_frame_status(self): - self.frame_manager.update_frame_status("Frame1", Status.Completed) - frame = self.frame_manager.get_frame("Frame1") - self.assertEqual(frame.status, Status.Completed) - - def test_get_nonexistent_frame(self): - frame = self.frame_manager.get_frame("NonexistentFrame") - self.assertIsNone(frame) - - def test_get_frame_data_empty(self): - data = self.frame_manager.get_frame_data("Frame1", Status.Warning) - self.assertEqual(data, []) - - def test_get_all_frames(self): - frames = self.frame_manager.get_all_frames() - self.assertEqual(len(frames), 1) - self.assertEqual(frames[0].frame_name, "Frame1") - - def test_check_missing_data(self): - # Test with no data added - required_data_types = [Status.Camera, Status.SensorReading] - missing_data = self.frame.check_missing_data(required_data_types) - self.assertEqual(missing_data, [Status.Camera, Status.SensorReading]) - - # Test with one data type added - self.frame_manager.add_frame_data("Frame1", Status.Camera, {"image": "image_data"}) - missing_data = self.frame.check_missing_data(required_data_types) - self.assertEqual(missing_data, [Status.SensorReading]) - - # Test with all required data added - self.frame_manager.add_frame_data("Frame1", Status.SensorReading, {"sensor": "LiDAR", "reading": [1, 2, 3]}) - missing_data = self.frame.check_missing_data(required_data_types) - self.assertEqual(missing_data, []) - - def test_update_nonexistent_frame(self): - self.frame_manager.update_frame_status("NonexistentFrame", Status.Completed) - frame = self.frame_manager.get_frame("NonexistentFrame") - self.assertIsNone(frame) - - def test_remove_frame_data(self): - self.frame_manager.add_frame_data("Frame1", Status.Camera, {"image": "image_data"}) - frame = self.frame_manager.get_frame("Frame1") - frame.data.pop(Status.Camera, None) - camera_data = self.frame_manager.get_frame_data("Frame1", Status.Camera) - self.assertEqual(camera_data, []) - - # def test_edge_cases_coordinates(self): - # with self.assertRaises(ValueError): - # coord_invalid = Coordinate(latitude=100.0, longitude=190.0) # Invalid coordinates - - def test_frame_manager_initialization_and_cleanup(self): - new_frame_manager = FrameManager() - self.assertEqual(len(new_frame_manager.frames), 0) - new_frame_manager.create_frame( - frame_name="TestFrame", - timestamp=datetime.now(), - status=Status.Active, - coordinates=self.coord - ) - self.assertEqual(len(new_frame_manager.frames), 1) - new_frame_manager.frames.clear() - self.assertEqual(len(new_frame_manager.frames), 0) - - def test_add_data_to_nonexistent_frame(self): - self.frame_manager.add_frame_data("NonexistentFrame", Status.Camera, {"image": "image_data"}) - frame = self.frame_manager.get_frame("NonexistentFrame") - self.assertIsNone(frame) - - # def test_add_invalid_data_type(self): - # with self.assertRaises(TypeError): - # self.frame_manager.add_frame_data("Frame1", "InvalidStatus", {"image": "image_data"}) # Invalid data type - - # def test_frame_with_different_timestamps(self): - # timestamp1 = datetime.now() - # timestamp2 = datetime.now() - # frame1 = self.frame_manager.create_frame("Frame2", timestamp1, Status.Active, self.coord) - # frame2 = self.frame_manager.create_frame("Frame3", timestamp2, Status.Active, self.coord) - # self.assertNotEqual(frame1.timestamp, frame2.timestamp) - - def test_update_frame_data(self): - self.frame_manager.add_frame_data("Frame1", Status.Camera, {"image": "image_data"}) - self.frame_manager.add_frame_data("Frame1", Status.Camera, {"image": "updated_image_data"}) - camera_data = self.frame_manager.get_frame_data("Frame1", Status.Camera) - self.assertEqual(len(camera_data), 2) - self.assertEqual(camera_data[1], {"image": "updated_image_data"}) - - def test_remove_frame(self): - frame_to_remove = self.frame_manager.create_frame("FrameToRemove", datetime.now(), Status.Active, self.coord) - self.frame_manager.frames.remove(frame_to_remove) - frame = self.frame_manager.get_frame("FrameToRemove") - self.assertIsNone(frame) - - def test_retrieve_frames_by_status(self): - self.frame_manager.create_frame("Frame2", datetime.now(), Status.Completed, self.coord) - self.frame_manager.create_frame("Frame3", datetime.now(), Status.Completed, self.coord) - completed_frames = [frame for frame in self.frame_manager.get_all_frames() if frame.status == Status.Completed] - self.assertEqual(len(completed_frames), 2) - - -if __name__ == "__main__": - unittest.main() \ No newline at end of file diff --git a/frames/validate_condensing.py b/frames/validate_condensing.py deleted file mode 100644 index 5e41278..0000000 --- a/frames/validate_condensing.py +++ /dev/null @@ -1,52 +0,0 @@ -from condense_frames import FrameCondenser -from frame_manager import Frame, Annotation, Coordinate -import numpy as np - -def validate_condensing(): - """ - Validates the frame condensing process with sample data. - """ - # Sample frames data - frame1 = Frame( - coordinates=(608.85, 2011.05, 0.0), - status='ACTIVE', - annotations=[ - Annotation( - category='vehicle.car', - location=Coordinate(640.775, 1970.009, 1.143), - size=[2.0, 4.32, 1.49], - rotation=[0.9227, 0.0, 0.0, -0.3855], - velocity=np.array([np.nan, np.nan, np.nan]), - num_lidar_pts=0, - num_radar_pts=1 - ) - ] - ) - - frame2 = Frame( - coordinates=(610.2, 2012.1, 0.0), - status='ACTIVE', - annotations=[ - Annotation( - category='vehicle.car', - location=Coordinate(625.206, 1972.364, 0.772), - size=[2.126, 4.592, 1.641], - rotation=[0.3673, 0.0, 0.0, 0.9301], - velocity=np.array([-6.88, 6.54, -0.36]), - num_lidar_pts=8, - num_radar_pts=2 - ) - ] - ) - - # Initialize the condenser - condenser = FrameCondenser() - - # Condense frames - condensed_scene = condenser.condense_frames([frame1, frame2]) - - # Display the condensed scene - print("Condensed Scene:", condensed_scene) - -if __name__ == "__main__": - validate_condensing() diff --git a/testCoordinate.py b/testCoordinate.py deleted file mode 100644 index 504bd55..0000000 --- a/testCoordinate.py +++ /dev/null @@ -1,40 +0,0 @@ -import unittest -from datetime import datetime - -class TestMissingData(unittest.TestCase): - def setUp(self): - self.frame_manager = FrameManager() - - def test_create_frame_with_missing_name(self): - coord = Coordinate(37.7749, -122.4194) - with self.assertRaises(TypeError): # Assuming TypeError for missing required positional arguments - frame = self.frame_manager.create_frame(None, datetime.now(), Status.Active, coord) - - def test_create_frame_with_missing_timestamp(self): - coord = Coordinate(37.7749, -122.4194) - with self.assertRaises(TypeError): # Assuming TypeError for missing required positional arguments - frame = self.frame_manager.create_frame("Frame1", None, Status.Active, coord) - - def test_create_frame_with_missing_status(self): - coord = Coordinate(37.7749, -122.4194) - with self.assertRaises(TypeError): # Assuming TypeError for missing required positional arguments - frame = self.frame_manager.create_frame("Frame1", datetime.now(), None, coord) - - def test_create_frame_with_missing_coordinates(self): - with self.assertRaises(TypeError): # Assuming TypeError for missing required positional arguments - frame = self.frame_manager.create_frame("Frame1", datetime.now(), Status.Active, None) - - def test_add_data_to_nonexistent_frame(self): - with self.assertRaises(AttributeError): # Assuming AttributeError for operations on NoneType - self.frame_manager.add_frame_data("NonExistentFrame", Status.Camera, {"image": "image_data"}) - - def test_get_data_from_nonexistent_frame(self): - data = self.frame_manager.get_frame_data("NonExistentFrame", Status.Camera) - self.assertEqual(data, []) - - def test_update_status_of_nonexistent_frame(self): - with self.assertRaises(AttributeError): # Assuming AttributeError for operations on NoneType - self.frame_manager.update_frame_status("NonExistentFrame", Status.Completed) - -if __name__ == "__main__": - unittest.main() \ No newline at end of file diff --git a/test_frame_manager.py b/test_frame_manager.py deleted file mode 100644 index 0c06970..0000000 --- a/test_frame_manager.py +++ /dev/null @@ -1,76 +0,0 @@ -import unittest -from datetime import datetime -from enum import Enum -from typing import List, Union - -# Assuming the classes are imported from your module -from your_module import Status, Coordinate, Frame, FrameManager - -class TestCoordinate(unittest.TestCase): - def test_coordinate_initialization(self): - coord = Coordinate(37.7749, -122.4194) - self.assertEqual(coord.latitude, 37.7749) - self.assertEqual(coord.longitude, -122.4194) - -class TestFrame(unittest.TestCase): - def setUp(self): - self.coord = Coordinate(37.7749, -122.4194) - self.frame = Frame( - frame_name="Frame1", - timestamp=datetime.now(), - status=Status.Active, - coordinates=self.coord - ) - - def test_frame_initialization(self): - self.assertEqual(self.frame.frame_name, "Frame1") - self.assertEqual(self.frame.status, Status.Active) - self.assertEqual(self.frame.coordinates.latitude, 37.7749) - self.assertEqual(self.frame.coordinates.longitude, -122.4194) - - def test_add_data(self): - self.frame.add_data(Status.Camera, {"image": "image_data"}) - self.assertEqual(self.frame.get_data(Status.Camera), [{"image": "image_data"}]) - - def test_get_data(self): - self.frame.add_data(Status.SensorReading, {"sensor": "LiDAR", "reading": [1, 2, 3]}) - data = self.frame.get_data(Status.SensorReading) - self.assertEqual(data, [{"sensor": "LiDAR", "reading": [1, 2, 3]}]) - -class TestFrameManager(unittest.TestCase): - def setUp(self): - self.frame_manager = FrameManager() - self.coord = Coordinate(37.7749, -122.4194) - self.frame = self.frame_manager.create_frame( - frame_name="Frame1", - timestamp=datetime.now(), - status=Status.Active, - coordinates=self.coord - ) - - def test_create_frame(self): - self.assertEqual(len(self.frame_manager.get_all_frames()), 1) - self.assertEqual(self.frame_manager.get_all_frames()[0].frame_name, "Frame1") - - def test_get_frame(self): - frame = self.frame_manager.get_frame("Frame1") - self.assertIsNotNone(frame) - self.assertEqual(frame.frame_name, "Frame1") - - def test_update_frame_status(self): - self.frame_manager.update_frame_status("Frame1", Status.Completed) - frame = self.frame_manager.get_frame("Frame1") - self.assertEqual(frame.status, Status.Completed) - - def test_add_frame_data(self): - self.frame_manager.add_frame_data("Frame1", Status.Camera, {"image": "image_data"}) - data = self.frame_manager.get_frame_data("Frame1", Status.Camera) - self.assertEqual(data, [{"image": "image_data"}]) - - def test_get_frame_data(self): - self.frame_manager.add_frame_data("Frame1", Status.SensorReading, {"sensor": "LiDAR", "reading": [1, 2, 3]}) - data = self.frame_manager.get_frame_data("Frame1", Status.SensorReading) - self.assertEqual(data, [{"sensor": "LiDAR", "reading": [1, 2, 3]}]) - -if __name__ == "__main__": - unittest.main()