Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions resources/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Resources package for ROS MCP Server
# Functions to register resources with the MCP server instance
from resources.robot_specs import register_robot_spec_resources
from resources.ros_metadata import register_ros_metadata_resources


def register_all_resources(mcp, ws_manager):
"""Register all resources with the MCP server instance."""
register_robot_spec_resources(mcp)
register_ros_metadata_resources(mcp, ws_manager)
45 changes: 45 additions & 0 deletions resources/robot_specs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
"""Resources for robot specifications."""

import json
from pathlib import Path


def register_robot_spec_resources(mcp):
"""Register robot specification resources with the MCP server."""

# Get the robot_specifications directory path
specs_dir = Path(__file__).parent.parent / "robot_specifications"

@mcp.resource("ros-mcp://robot-specs/get_all_robots")
def get_all_robots() -> str:
"""
Get all available robot specifications.

Returns:
str: JSON string with list of available robot names
"""
try:
if not specs_dir.exists():
return json.dumps(
{
"error": f"Robot specifications directory not found: {specs_dir}",
"robot_specifications": [],
}
)

# Find all YAML files
yaml_files = list(specs_dir.glob("*.yaml"))

# Filter out template files only
robot_names = [file.stem for file in yaml_files if not file.stem.startswith("YOUR_")]
robot_names.sort()

return json.dumps({"robot_specifications": robot_names, "count": len(robot_names)})

except Exception as e:
return json.dumps(
{
"error": f"Failed to list robot specifications: {str(e)}",
"robot_specifications": [],
}
)
159 changes: 159 additions & 0 deletions resources/ros_metadata.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
"""Resources for ROS metadata and discovery information."""

import json

from utils.websocket_manager import WebSocketManager


def register_ros_metadata_resources(mcp, ws_manager: WebSocketManager):
"""Register ROS metadata resources with the MCP server."""

@mcp.resource("ros-mcp://ros-metadata/all")
def get_all_ros_metadata() -> str:
"""
Get all ROS metadata including topics, services, nodes, and parameters.

Returns:
str: JSON string with comprehensive ROS system information
"""
try:
metadata = {
"topics": [],
"services": [],
"nodes": [],
"parameters": [],
"ros_version": None,
"errors": [],
}

# Get ROS version
try:
ros2_request = {
"op": "call_service",
"id": "ros2_version_check",
"service": "/rosapi/get_ros_version",
"args": {},
}
with ws_manager:
response = ws_manager.request(ros2_request)
values = response.get("values") if response else None
if isinstance(values, dict) and "version" in values:
metadata["ros_version"] = {
"version": values.get("version"),
"distro": values.get("distro"),
}
else:
# Try ROS1
ros1_request = {
"op": "call_service",
"id": "ros1_distro_check",
"service": "/rosapi/get_param",
"args": {"name": "/rosdistro"},
}
response = ws_manager.request(ros1_request)
value = response.get("values") if response else None
if value:
distro = value.get("value") if isinstance(value, dict) else value
distro_clean = (
str(distro).strip('"').replace("\\n", "").replace("\n", "")
)
metadata["ros_version"] = {"version": "1", "distro": distro_clean}
except Exception as e:
metadata["errors"].append(f"Failed to get ROS version: {str(e)}")

# Get topics
try:
topics_message = {
"op": "call_service",
"service": "/rosapi/topics",
"type": "rosapi/Topics",
"args": {},
"id": "get_topics_request",
}
with ws_manager:
response = ws_manager.request(topics_message)
if response and "values" in response:
topics = response["values"].get("topics", [])
types = response["values"].get("types", [])
metadata["topics"] = [
{"name": topic, "type": topic_type}
for topic, topic_type in zip(topics, types)
]
except Exception as e:
metadata["errors"].append(f"Failed to get topics: {str(e)}")

# Get services
try:
services_message = {
"op": "call_service",
"service": "/rosapi/services",
"type": "rosapi/Services",
"args": {},
"id": "get_services_request",
}
with ws_manager:
response = ws_manager.request(services_message)
if response and "values" in response:
services = response["values"].get("services", [])
types = response["values"].get("types", [])
metadata["services"] = [
{"name": service, "type": service_type}
for service, service_type in zip(services, types)
]
except Exception as e:
metadata["errors"].append(f"Failed to get services: {str(e)}")

# Get nodes
try:
nodes_message = {
"op": "call_service",
"service": "/rosapi/nodes",
"type": "rosapi/Nodes",
"args": {},
"id": "get_nodes_request",
}
with ws_manager:
response = ws_manager.request(nodes_message)
if response and "values" in response:
metadata["nodes"] = response["values"].get("nodes", [])
except Exception as e:
metadata["errors"].append(f"Failed to get nodes: {str(e)}")

# Get parameters (ROS 2 only)
try:
params_message = {
"op": "call_service",
"service": "/rosapi/get_param_names",
"type": "rosapi/GetParamNames",
"args": {},
"id": "get_parameters_request",
}
with ws_manager:
response = ws_manager.request(params_message)
if response and "values" in response:
metadata["parameters"] = response["values"].get("names", [])
except Exception:
# Parameters might not be available in ROS1 or if service doesn't exist
pass

# Add summary counts
metadata["summary"] = {
"total_topics": len(metadata["topics"]),
"total_services": len(metadata["services"]),
"total_nodes": len(metadata["nodes"]),
"total_parameters": len(metadata["parameters"]),
"has_errors": len(metadata["errors"]) > 0,
}

return json.dumps(metadata, indent=2)

except Exception as e:
return json.dumps(
{
"error": f"Failed to get ROS metadata: {str(e)}",
"topics": [],
"services": [],
"nodes": [],
"parameters": [],
}
)
4 changes: 4 additions & 0 deletions server.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
from fastmcp.utilities.types import Image
from PIL import Image as PILImage

from resources import register_all_resources
from utils.config_utils import get_verified_robot_spec_util, get_verified_robots_list_util
from utils.network_utils import ping_ip_and_port
from utils.websocket_manager import WebSocketManager, parse_input
Expand Down Expand Up @@ -42,6 +43,9 @@
) # Increased default timeout for ROS operations


register_all_resources(mcp, ws_manager)


def convert_expects_image_hint(expects_image: str) -> bool | None:
"""
Convert string-based expects_image hint to boolean for internal use.
Expand Down