diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index d9f0c2c..6df82a6 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -25,4 +25,4 @@ jobs: - name: Run tests run: | pip install pytest - pytest tests/test_robot_arm_trajectory_executor.py -v + pytest tests/test_trajectory_executor.py -v diff --git a/README.md b/README.md index f702996..8045fef 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,16 @@ [![PyPI version](https://img.shields.io/pypi/v/trajectory-executor)](https://pypi.org/project/trajectory-executor/) -Execute time-based joint trajectories for robot arms with rate-limited, thread-safe updates. +A lightweight executor for offline, time-based joint trajectories in robotic applications. +Supports smooth interpolation between waypoints, callback-based control, and optional feedback monitoring. + +## Features + +- Time-parameterized trajectory execution +- Linear interpolation between joint waypoints +- Rate-limited command publishing +- Thread-safe callbacks for sending commands and receiving feedback +- Minimal dependencies, easy to integrate ## Installation @@ -26,18 +35,27 @@ pip install -e . - `numpy` (>=1.20.0) - `loop-rate-limiters` (>=0.1.0) +## Examples + +Explore usage examples integrated with different systems: + +- [Basic example](examples/dummy_example.py) +- [Genesis example](examples/genesis_example.py) +- [Mujoco example](examples/mujoco_example.py) +- [Ruckig example](examples/ruckig_example.py) + ## Usage -The `RobotArmTrajectoryExecutor` class executes joint trajectories for a robot arm with specified degrees of freedom (DOF). It interpolates positions, sends commands via a callback, and supports optional feedback processing. +The `TrajectoryExecutor` class executes joint trajectories for a robot arm with specified degrees of freedom (DOF). It interpolates positions, sends commands via a callback, and supports optional feedback processing. ```python import numpy as np -from trajectory_executor import RobotArmTrajectoryExecutor +from trajectory_executor import TrajectoryExecutor def update_callback(joints: np.ndarray): print(f"Joint command: {joints}") -executor = RobotArmTrajectoryExecutor(dof=3, update_callback=update_callback) +executor = TrajectoryExecutor(dof=3, update_callback=update_callback) points = np.array([[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]]) times = np.array([0.0, 1.0]) executor.execute(points, times) diff --git a/examples/dummy_example.py b/examples/dummy_example.py index 204187a..4ddc903 100644 --- a/examples/dummy_example.py +++ b/examples/dummy_example.py @@ -1,6 +1,6 @@ import random import numpy as np -from trajectory_executor import RobotArmTrajectoryExecutor +from trajectory_executor import TrajectoryExecutor current_pos = [0.0, 0.0, 0.0] @@ -20,7 +20,7 @@ def monitor(cmd, feedback, t): print(f"[{t:.2f}s] Error: {error:.4f}") -executor = RobotArmTrajectoryExecutor( +executor = TrajectoryExecutor( dof=3, update_callback=send_command, feedback_callback=dummy_feedback, diff --git a/examples/genesis_example.py b/examples/genesis_example.py index df5ee21..80393ca 100644 --- a/examples/genesis_example.py +++ b/examples/genesis_example.py @@ -5,7 +5,7 @@ import torch from typing import List -from trajectory_executor import RobotArmTrajectoryExecutor +from trajectory_executor import TrajectoryExecutor # === 1. Initialize Genesis === @@ -45,7 +45,7 @@ def monitor(cmd, feedback, t): print(f"[{t:.2f}s] Tracking error: {error:.4f}") -executor = RobotArmTrajectoryExecutor( +executor = TrajectoryExecutor( dof=6, update_callback=send_joint_command, feedback_callback=get_joint_feedback, diff --git a/examples/mujoco_example.py b/examples/mujoco_example.py index f9e50a3..b43ad33 100644 --- a/examples/mujoco_example.py +++ b/examples/mujoco_example.py @@ -5,7 +5,7 @@ import time from typing import List -from trajectory_executor import RobotArmTrajectoryExecutor +from trajectory_executor import TrajectoryExecutor # === 1. Initialize Mujoco === @@ -27,7 +27,7 @@ def monitor(cmd, feedback, t): print(f"[{t:.2f}s] Tracking error: {error:.4f}") -executor = RobotArmTrajectoryExecutor( +executor = TrajectoryExecutor( dof=6, update_callback=send_joint_command, feedback_callback=get_joint_feedback, diff --git a/examples/ruckig_example.py b/examples/ruckig_example.py new file mode 100644 index 0000000..fb857bb --- /dev/null +++ b/examples/ruckig_example.py @@ -0,0 +1,45 @@ +import numpy as np +from ruckig import InputParameter, Ruckig, Trajectory, Result +from trajectory_executor import TrajectoryExecutor + + +# === 1. Initialize Ruckig === +inp = InputParameter(6) +inp.current_position = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0] +inp.target_position = [0.0, -1.01, -1.7, -1.82, 1.45, 0.0] +inp.max_velocity = [3.14, 3.14, 3.14, 3.14, 3.14, 3.14] +inp.max_acceleration = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0] +inp.max_jerk = [4.0, 4.0, 4.0, 4.0, 4.0, 4.0] +inp.minimum_duration = 5.0 + +otg = Ruckig(6) +trajectory = Trajectory(6) + +# === 2. Calculate trajectory === +result = otg.calculate(inp, trajectory) +if result == Result.ErrorInvalidInput: + raise Exception("Invalid input!") + +times = [] +positions = [] + +for new_time in np.linspace(0, trajectory.duration, 10): + # Then, we can calculate the kinematic state at a given time + pos, _, _ = trajectory.at_time(new_time) + times.append(new_time) + positions.append(pos) + + +# === 3. Define callback and trajectory executor === +def update_callback(target_state): + print(f"Target position: {target_state}") + + +executor = TrajectoryExecutor( + dof=trajectory.degrees_of_freedom, + update_callback=update_callback, + loop_rate_hz=50.0, +) + +# === 4. Execute the trajectory === +executor.execute(positions, times) diff --git a/pyproject.toml b/pyproject.toml index 1446f54..9720f73 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta" [project] name = "trajectory-executor" -version = "0.1.1" -description = "Execute time-based joint trajectories for robot arms with rate-limited, thread-safe updates" +version = "0.2.0" +description = "A lightweight executor for offline, time-based joint trajectories in robotic applications." readme = "README.md" requires-python = ">=3.10" authors = [ @@ -45,6 +45,5 @@ dev = [ [project.urls] Homepage = "https://github.com/bxtbold/trajectory-executor" -Documentation = "https://bxtbold.github.io/trajectory-executor" Repository = "https://github.com/bxtbold/trajectory-executor" Issues = "https://github.com/bxtbold/trajectory-executor/issues" diff --git a/tests/test_robot_arm_trajectory_executor.py b/tests/test_trajectory_executor.py similarity index 93% rename from tests/test_robot_arm_trajectory_executor.py rename to tests/test_trajectory_executor.py index 58799dc..50b70a0 100644 --- a/tests/test_robot_arm_trajectory_executor.py +++ b/tests/test_trajectory_executor.py @@ -2,7 +2,7 @@ import numpy as np import time import threading -from trajectory_executor import RobotArmTrajectoryExecutor +from trajectory_executor import TrajectoryExecutor # Mock RateLimiter class @@ -21,7 +21,7 @@ def executor(): def update_callback(joints: np.ndarray): pass # Default no-op callback - return RobotArmTrajectoryExecutor( + return TrajectoryExecutor( dof=3, update_callback=update_callback, loop_rate_hz=100.0 ) @@ -37,7 +37,7 @@ def feedback_callback() -> np.ndarray: def on_feedback(cmd: np.ndarray, fb: np.ndarray, t: float): pass - executor = RobotArmTrajectoryExecutor( + executor = TrajectoryExecutor( dof=2, update_callback=update_callback, feedback_callback=feedback_callback, @@ -52,7 +52,7 @@ def on_feedback(cmd: np.ndarray, fb: np.ndarray, t: float): def test_init_no_callbacks(): - executor = RobotArmTrajectoryExecutor( + executor = TrajectoryExecutor( dof=3, update_callback=lambda x: None, feedback_callback=None, on_feedback=None ) assert executor.has_callbacks["update"] is True @@ -104,7 +104,7 @@ def test_execute_single_point(): def update_callback(joints: np.ndarray): commands.append(joints.copy()) - executor = RobotArmTrajectoryExecutor( + executor = TrajectoryExecutor( dof=3, update_callback=update_callback, loop_rate_hz=100.0 ) executor.loop_rate = MockRateLimiter(100.0) @@ -121,7 +121,7 @@ def test_execute_multiple_points(): def update_callback(joints: np.ndarray): commands.append(joints.copy()) - executor = RobotArmTrajectoryExecutor( + executor = TrajectoryExecutor( dof=3, update_callback=update_callback, loop_rate_hz=100.0 ) executor.loop_rate = MockRateLimiter(100.0) @@ -139,7 +139,7 @@ def test_execute_unsorted_times(): def update_callback(joints: np.ndarray): commands.append(joints.copy()) - executor = RobotArmTrajectoryExecutor( + executor = TrajectoryExecutor( dof=3, update_callback=update_callback, loop_rate_hz=100.0 ) executor.loop_rate = MockRateLimiter(100.0) @@ -166,7 +166,7 @@ def on_feedback(cmd: np.ndarray, fb: np.ndarray, t: float): feedbacks.append(fb.copy()) times_recorded.append(t) - executor = RobotArmTrajectoryExecutor( + executor = TrajectoryExecutor( dof=3, update_callback=update_callback, feedback_callback=feedback_callback, @@ -192,7 +192,7 @@ def update_callback(joints: np.ndarray): with lock: commands.append(joints.copy()) - executor = RobotArmTrajectoryExecutor( + executor = TrajectoryExecutor( dof=3, update_callback=update_callback, loop_rate_hz=100.0 ) executor.loop_rate = MockRateLimiter(100.0) @@ -238,10 +238,10 @@ def test_execute_wrong_dof(executor): def test_execute_non_numpy_input(): commands = [] - def update_callback(joints: np.ndarray): - commands.append(joints.copy()) + def update_callback(command: np.ndarray): + commands.append(command.copy()) - executor = RobotArmTrajectoryExecutor( + executor = TrajectoryExecutor( dof=3, update_callback=update_callback, loop_rate_hz=100.0 ) executor.loop_rate = MockRateLimiter(100.0) @@ -260,7 +260,7 @@ def test_execute_rate_limited(): def update_callback(joints: np.ndarray): start_times.append(time.time()) - executor = RobotArmTrajectoryExecutor( + executor = TrajectoryExecutor( dof=3, update_callback=update_callback, loop_rate_hz=100.0 ) executor.loop_rate = MockRateLimiter(100.0) diff --git a/trajectory_executor/__init__.py b/trajectory_executor/__init__.py index 4e4940a..407c086 100644 --- a/trajectory_executor/__init__.py +++ b/trajectory_executor/__init__.py @@ -1 +1 @@ -from .robot_arm import RobotArmTrajectoryExecutor +from .trajectory_executor import TrajectoryExecutor diff --git a/trajectory_executor/robot_arm.py b/trajectory_executor/trajectory_executor.py similarity index 85% rename from trajectory_executor/robot_arm.py rename to trajectory_executor/trajectory_executor.py index c1680b0..38db9b2 100644 --- a/trajectory_executor/robot_arm.py +++ b/trajectory_executor/trajectory_executor.py @@ -5,17 +5,17 @@ from loop_rate_limiters import RateLimiter -class RobotArmTrajectoryExecutor: - """Executes a trajectory for a robot arm with rate-limited updates and thread-safe callbacks. +class TrajectoryExecutor: + """Executes a trajectory with rate-limited updates and thread-safe callbacks. - This class manages the execution of a time-based joint trajectory for a robot arm. It interpolates - joint positions between trajectory points, invokes user-provided callbacks for updates and feedback, + This class manages the execution of a time-based trajectory. It interpolates + positions between trajectory points, invokes user-provided callbacks for updates and feedback, and ensures thread-safe operations using a lock. The execution is rate-limited to a specified frequency. Args: dof (int): Number of degrees of freedom. - update_callback (Callable[[np.ndarray], None]): Function to send joint commands to the robot. - feedback_callback (Optional[Callable[[], np.ndarray]], optional): Function to get joint feedback. + update_callback (Callable[[np.ndarray], None]): Function to send commands to the robot. + feedback_callback (Optional[Callable[[], np.ndarray]], optional): Function to get feedback. Defaults to None. on_feedback (Optional[Callable[[np.ndarray, np.ndarray, float], None]], optional): Function to handle command, feedback, and time. Defaults to None. @@ -65,16 +65,19 @@ def _interpolate( ratio = (t - t0) / (t1 - t0) return q0 + ratio * (q1 - q0) - def execute(self, points: np.ndarray, times: np.ndarray): + def execute( + self, points: np.ndarray, times: np.ndarray, wait_until_done: bool = True + ): """Executes the provided trajectory by interpolating points and invoking callbacks. The trajectory is array of points and correspoding time to reach to the point. The method - interpolates joint positions at the current time, sends commands via the update callback, and handles + interpolates positions at the current time, sends commands via the update callback, and handles feedback if provided. Execution is rate-limited and thread-safe. Args: points (np.ndarray): Trajectory points to be executed. times (np.ndarray): Time each point to be executed. + wait_until_done (bool, optional): If True, waits until the trajectory is fully executed. Returns: None @@ -119,8 +122,9 @@ def execute(self, points: np.ndarray, times: np.ndarray): self.update_callback(target_state) # Handle feedback - if self.has_callbacks["feedback"] and self.has_callbacks["on_feedback"]: + if self.has_callbacks["feedback"]: current_state = self.feedback_callback() + if self.has_callbacks["on_feedback"]: self.on_feedback(target_state, current_state, current_time) loop_rate.sleep()