Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,4 +25,4 @@ jobs:
- name: Run tests
run: |
pip install pytest
pytest tests/test_robot_arm_trajectory_executor.py -v
pytest tests/test_trajectory_executor.py -v
26 changes: 22 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,16 @@
[![PyPI version](https://img.shields.io/pypi/v/trajectory-executor)](https://pypi.org/project/trajectory-executor/)
<!-- [![Documentation](https://img.shields.io/github/actions/workflow/status/bxtbold/trajectory-executor/docs.yml?branch=main&label=docs)](https://bxtbold.github.io/trajectory-executor/) -->

Execute time-based joint trajectories for robot arms with rate-limited, thread-safe updates.
A lightweight executor for offline, time-based joint trajectories in robotic applications.
Supports smooth interpolation between waypoints, callback-based control, and optional feedback monitoring.

## Features

- Time-parameterized trajectory execution
- Linear interpolation between joint waypoints
- Rate-limited command publishing
- Thread-safe callbacks for sending commands and receiving feedback
- Minimal dependencies, easy to integrate

## Installation

Expand All @@ -26,18 +35,27 @@ pip install -e .
- `numpy` (>=1.20.0)
- `loop-rate-limiters` (>=0.1.0)

## Examples

Explore usage examples integrated with different systems:

- [Basic example](examples/dummy_example.py)
- [Genesis example](examples/genesis_example.py)
- [Mujoco example](examples/mujoco_example.py)
- [Ruckig example](examples/ruckig_example.py)

## Usage

The `RobotArmTrajectoryExecutor` class executes joint trajectories for a robot arm with specified degrees of freedom (DOF). It interpolates positions, sends commands via a callback, and supports optional feedback processing.
The `TrajectoryExecutor` class executes joint trajectories for a robot arm with specified degrees of freedom (DOF). It interpolates positions, sends commands via a callback, and supports optional feedback processing.

```python
import numpy as np
from trajectory_executor import RobotArmTrajectoryExecutor
from trajectory_executor import TrajectoryExecutor

def update_callback(joints: np.ndarray):
print(f"Joint command: {joints}")

executor = RobotArmTrajectoryExecutor(dof=3, update_callback=update_callback)
executor = TrajectoryExecutor(dof=3, update_callback=update_callback)
points = np.array([[0.0, 0.0, 0.0], [1.0, 1.0, 1.0]])
times = np.array([0.0, 1.0])
executor.execute(points, times)
Expand Down
4 changes: 2 additions & 2 deletions examples/dummy_example.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import random
import numpy as np
from trajectory_executor import RobotArmTrajectoryExecutor
from trajectory_executor import TrajectoryExecutor


current_pos = [0.0, 0.0, 0.0]
Expand All @@ -20,7 +20,7 @@ def monitor(cmd, feedback, t):
print(f"[{t:.2f}s] Error: {error:.4f}")


executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=3,
update_callback=send_command,
feedback_callback=dummy_feedback,
Expand Down
4 changes: 2 additions & 2 deletions examples/genesis_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import torch
from typing import List

from trajectory_executor import RobotArmTrajectoryExecutor
from trajectory_executor import TrajectoryExecutor


# === 1. Initialize Genesis ===
Expand Down Expand Up @@ -45,7 +45,7 @@ def monitor(cmd, feedback, t):
print(f"[{t:.2f}s] Tracking error: {error:.4f}")


executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=6,
update_callback=send_joint_command,
feedback_callback=get_joint_feedback,
Expand Down
4 changes: 2 additions & 2 deletions examples/mujoco_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import time
from typing import List

from trajectory_executor import RobotArmTrajectoryExecutor
from trajectory_executor import TrajectoryExecutor


# === 1. Initialize Mujoco ===
Expand All @@ -27,7 +27,7 @@ def monitor(cmd, feedback, t):
print(f"[{t:.2f}s] Tracking error: {error:.4f}")


executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=6,
update_callback=send_joint_command,
feedback_callback=get_joint_feedback,
Expand Down
45 changes: 45 additions & 0 deletions examples/ruckig_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import numpy as np
from ruckig import InputParameter, Ruckig, Trajectory, Result
from trajectory_executor import TrajectoryExecutor


# === 1. Initialize Ruckig ===
inp = InputParameter(6)
inp.current_position = [0.0, 0.0, 0.0, 0.0, 0.0, 0.0]
inp.target_position = [0.0, -1.01, -1.7, -1.82, 1.45, 0.0]
inp.max_velocity = [3.14, 3.14, 3.14, 3.14, 3.14, 3.14]
inp.max_acceleration = [1.0, 1.0, 1.0, 1.0, 1.0, 1.0]
inp.max_jerk = [4.0, 4.0, 4.0, 4.0, 4.0, 4.0]
inp.minimum_duration = 5.0

otg = Ruckig(6)
trajectory = Trajectory(6)

# === 2. Calculate trajectory ===
result = otg.calculate(inp, trajectory)
if result == Result.ErrorInvalidInput:
raise Exception("Invalid input!")

times = []
positions = []

for new_time in np.linspace(0, trajectory.duration, 10):
# Then, we can calculate the kinematic state at a given time
pos, _, _ = trajectory.at_time(new_time)
times.append(new_time)
positions.append(pos)


# === 3. Define callback and trajectory executor ===
def update_callback(target_state):
print(f"Target position: {target_state}")


executor = TrajectoryExecutor(
dof=trajectory.degrees_of_freedom,
update_callback=update_callback,
loop_rate_hz=50.0,
)

# === 4. Execute the trajectory ===
executor.execute(positions, times)
5 changes: 2 additions & 3 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ build-backend = "setuptools.build_meta"

[project]
name = "trajectory-executor"
version = "0.1.1"
description = "Execute time-based joint trajectories for robot arms with rate-limited, thread-safe updates"
version = "0.2.0"
description = "A lightweight executor for offline, time-based joint trajectories in robotic applications."
readme = "README.md"
requires-python = ">=3.10"
authors = [
Expand Down Expand Up @@ -45,6 +45,5 @@ dev = [

[project.urls]
Homepage = "https://github.com/bxtbold/trajectory-executor"
Documentation = "https://bxtbold.github.io/trajectory-executor"
Repository = "https://github.com/bxtbold/trajectory-executor"
Issues = "https://github.com/bxtbold/trajectory-executor/issues"
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import numpy as np
import time
import threading
from trajectory_executor import RobotArmTrajectoryExecutor
from trajectory_executor import TrajectoryExecutor


# Mock RateLimiter class
Expand All @@ -21,7 +21,7 @@ def executor():
def update_callback(joints: np.ndarray):
pass # Default no-op callback

return RobotArmTrajectoryExecutor(
return TrajectoryExecutor(
dof=3, update_callback=update_callback, loop_rate_hz=100.0
)

Expand All @@ -37,7 +37,7 @@ def feedback_callback() -> np.ndarray:
def on_feedback(cmd: np.ndarray, fb: np.ndarray, t: float):
pass

executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=2,
update_callback=update_callback,
feedback_callback=feedback_callback,
Expand All @@ -52,7 +52,7 @@ def on_feedback(cmd: np.ndarray, fb: np.ndarray, t: float):


def test_init_no_callbacks():
executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=3, update_callback=lambda x: None, feedback_callback=None, on_feedback=None
)
assert executor.has_callbacks["update"] is True
Expand Down Expand Up @@ -104,7 +104,7 @@ def test_execute_single_point():
def update_callback(joints: np.ndarray):
commands.append(joints.copy())

executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=3, update_callback=update_callback, loop_rate_hz=100.0
)
executor.loop_rate = MockRateLimiter(100.0)
Expand All @@ -121,7 +121,7 @@ def test_execute_multiple_points():
def update_callback(joints: np.ndarray):
commands.append(joints.copy())

executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=3, update_callback=update_callback, loop_rate_hz=100.0
)
executor.loop_rate = MockRateLimiter(100.0)
Expand All @@ -139,7 +139,7 @@ def test_execute_unsorted_times():
def update_callback(joints: np.ndarray):
commands.append(joints.copy())

executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=3, update_callback=update_callback, loop_rate_hz=100.0
)
executor.loop_rate = MockRateLimiter(100.0)
Expand All @@ -166,7 +166,7 @@ def on_feedback(cmd: np.ndarray, fb: np.ndarray, t: float):
feedbacks.append(fb.copy())
times_recorded.append(t)

executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=3,
update_callback=update_callback,
feedback_callback=feedback_callback,
Expand All @@ -192,7 +192,7 @@ def update_callback(joints: np.ndarray):
with lock:
commands.append(joints.copy())

executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=3, update_callback=update_callback, loop_rate_hz=100.0
)
executor.loop_rate = MockRateLimiter(100.0)
Expand Down Expand Up @@ -238,10 +238,10 @@ def test_execute_wrong_dof(executor):
def test_execute_non_numpy_input():
commands = []

def update_callback(joints: np.ndarray):
commands.append(joints.copy())
def update_callback(command: np.ndarray):
commands.append(command.copy())

executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=3, update_callback=update_callback, loop_rate_hz=100.0
)
executor.loop_rate = MockRateLimiter(100.0)
Expand All @@ -260,7 +260,7 @@ def test_execute_rate_limited():
def update_callback(joints: np.ndarray):
start_times.append(time.time())

executor = RobotArmTrajectoryExecutor(
executor = TrajectoryExecutor(
dof=3, update_callback=update_callback, loop_rate_hz=100.0
)
executor.loop_rate = MockRateLimiter(100.0)
Expand Down
2 changes: 1 addition & 1 deletion trajectory_executor/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
from .robot_arm import RobotArmTrajectoryExecutor
from .trajectory_executor import TrajectoryExecutor
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,17 @@
from loop_rate_limiters import RateLimiter


class RobotArmTrajectoryExecutor:
"""Executes a trajectory for a robot arm with rate-limited updates and thread-safe callbacks.
class TrajectoryExecutor:
"""Executes a trajectory with rate-limited updates and thread-safe callbacks.

This class manages the execution of a time-based joint trajectory for a robot arm. It interpolates
joint positions between trajectory points, invokes user-provided callbacks for updates and feedback,
This class manages the execution of a time-based trajectory. It interpolates
positions between trajectory points, invokes user-provided callbacks for updates and feedback,
and ensures thread-safe operations using a lock. The execution is rate-limited to a specified frequency.

Args:
dof (int): Number of degrees of freedom.
update_callback (Callable[[np.ndarray], None]): Function to send joint commands to the robot.
feedback_callback (Optional[Callable[[], np.ndarray]], optional): Function to get joint feedback.
update_callback (Callable[[np.ndarray], None]): Function to send commands to the robot.
feedback_callback (Optional[Callable[[], np.ndarray]], optional): Function to get feedback.
Defaults to None.
on_feedback (Optional[Callable[[np.ndarray, np.ndarray, float], None]], optional): Function to
handle command, feedback, and time. Defaults to None.
Expand Down Expand Up @@ -65,16 +65,19 @@ def _interpolate(
ratio = (t - t0) / (t1 - t0)
return q0 + ratio * (q1 - q0)

def execute(self, points: np.ndarray, times: np.ndarray):
def execute(
self, points: np.ndarray, times: np.ndarray, wait_until_done: bool = True
):
"""Executes the provided trajectory by interpolating points and invoking callbacks.

The trajectory is array of points and correspoding time to reach to the point. The method
interpolates joint positions at the current time, sends commands via the update callback, and handles
interpolates positions at the current time, sends commands via the update callback, and handles
feedback if provided. Execution is rate-limited and thread-safe.

Args:
points (np.ndarray): Trajectory points to be executed.
times (np.ndarray): Time each point to be executed.
wait_until_done (bool, optional): If True, waits until the trajectory is fully executed.

Returns:
None
Expand Down Expand Up @@ -119,8 +122,9 @@ def execute(self, points: np.ndarray, times: np.ndarray):
self.update_callback(target_state)

# Handle feedback
if self.has_callbacks["feedback"] and self.has_callbacks["on_feedback"]:
if self.has_callbacks["feedback"]:
current_state = self.feedback_callback()
if self.has_callbacks["on_feedback"]:
self.on_feedback(target_state, current_state, current_time)

loop_rate.sleep()
Expand Down