diff --git a/docs/MCP_HTTP_STREAMING.md b/docs/MCP_HTTP_STREAMING.md new file mode 100644 index 000000000..d0c79298d --- /dev/null +++ b/docs/MCP_HTTP_STREAMING.md @@ -0,0 +1,251 @@ +# MCP HTTP-Streaming Support + +This document describes the HTTP-Streaming transport implementation for MCP (Model Context Protocol) in PraisonAI. + +## Overview + +HTTP-Streaming provides bidirectional streaming communication over HTTP using chunked transfer encoding. This transport method offers advantages over SSE (Server-Sent Events) including: + +- **Bidirectional streaming** - Both client and server can stream data +- **Binary support** - Can transmit binary data, not just text +- **Lower overhead** - More efficient than SSE's text-based protocol +- **Better performance** - Ideal for high-throughput scenarios + +## Usage + +### Auto-Detection (Default) + +The MCP client automatically detects the appropriate transport based on URL patterns: + +```python +# SSE transport (URLs ending with /sse) +agent = Agent( + tools=MCP("http://localhost:8080/sse") # Uses SSE +) + +# HTTP-Streaming transport (other HTTP URLs) +agent = Agent( + tools=MCP("http://localhost:8080/api") # Uses HTTP-Streaming +) +``` + +### Explicit Transport Selection + +You can explicitly specify the transport type: + +```python +# Force SSE transport +agent = Agent( + tools=MCP("http://localhost:8080/api", transport="sse") +) + +# Force HTTP-Streaming transport +agent = Agent( + tools=MCP("http://localhost:8080/sse", transport="http-streaming") +) +``` + +### TypeScript Usage + +The TypeScript implementation follows the same pattern: + +```typescript +import { MCP } from '@praisonai/agents/tools'; + +// Auto-detection +const mcpAuto = new MCP("http://localhost:8080/api"); +await mcpAuto.initialize(); + +// Explicit transport +const mcpExplicit = new MCP("http://localhost:8080/api", { + transport: "http-streaming", + debug: true, + headers: { + "Authorization": "Bearer token" + } +}); +await mcpExplicit.initialize(); +``` + +## Transport Detection Rules + +The following URL patterns automatically use SSE transport: +- `/sse` (exact ending) +- `/sse/` (with trailing slash) +- `/events` (exact ending) +- `/stream` (exact ending) +- `/server-sent-events` +- URLs containing `transport=sse` query parameter + +All other HTTP/HTTPS URLs default to HTTP-Streaming transport. + +## Implementation Details + +### Message Format + +HTTP-Streaming uses NDJSON (Newline Delimited JSON) format: +- Each message is a complete JSON object +- Messages are separated by newline characters (`\n`) +- Supports efficient streaming parsing + +### Python Architecture + +``` +MCP (main class) +├── _detect_transport() - Auto-detection logic +├── HTTPStreamingMCPClient - HTTP-Streaming implementation +│ ├── HTTPStreamingTransport - Low-level transport +│ ├── HTTPReadStream - Read adapter +│ └── HTTPWriteStream - Write adapter +└── SSEMCPClient - SSE implementation (existing) +``` + +### TypeScript Architecture + +``` +MCP (unified class) +├── detectTransport() - Auto-detection logic +├── MCPHttpStreaming - HTTP-Streaming implementation +│ └── HTTPStreamingTransport - Transport layer +│ ├── HTTPStreamingTransport - Modern browsers +│ └── HTTPStreamingTransportFallback - Legacy browsers +└── MCPSse - SSE implementation (existing) +``` + +## Server Implementation + +### Python Server Example + +```python +from fastapi import FastAPI, Request +from fastapi.responses import StreamingResponse +import json +import asyncio + +app = FastAPI() + +@app.post("/mcp/v1/stream") +async def mcp_stream(request: Request): + async def generate(): + async for chunk in request.stream(): + # Process incoming messages + message = json.loads(chunk) + + # Generate response + response = process_mcp_message(message) + yield json.dumps(response).encode() + b'\n' + + return StreamingResponse( + generate(), + media_type="application/x-ndjson" + ) +``` + +### Node.js Server Example + +```javascript +const express = require('express'); +const app = express(); + +app.post('/mcp/v1/stream', (req, res) => { + res.writeHead(200, { + 'Content-Type': 'application/x-ndjson', + 'Transfer-Encoding': 'chunked' + }); + + req.on('data', (chunk) => { + const message = JSON.parse(chunk); + const response = processMCPMessage(message); + res.write(JSON.stringify(response) + '\n'); + }); + + req.on('end', () => { + res.end(); + }); +}); +``` + +## Configuration Options + +### Python Options + +```python +MCP( + url, + transport="http-streaming", # Explicit transport + timeout=60, # Request timeout in seconds + debug=True, # Enable debug logging + headers={ # Custom headers + "Authorization": "Bearer token" + } +) +``` + +### TypeScript Options + +```typescript +new MCP(url, { + transport: "http-streaming", // Explicit transport + timeout: 60000, // Timeout in milliseconds + debug: true, // Enable debug logging + headers: { // Custom headers + "Authorization": "Bearer token" + }, + fallbackMode: false // Force fallback for testing +}) +``` + +## Backward Compatibility + +The implementation maintains 100% backward compatibility: + +1. **Existing SSE URLs** continue to use SSE transport +2. **Stdio commands** work unchanged +3. **NPX commands** work unchanged +4. **All existing code** continues to function without modification + +## Migration Guide + +No migration is required! Existing code continues to work. To use HTTP-Streaming: + +1. **Option 1**: Use URLs that don't match SSE patterns (recommended) +2. **Option 2**: Add `transport="http-streaming"` parameter + +## Troubleshooting + +### Debug Mode + +Enable debug logging to see transport selection: + +```python +MCP(url, debug=True) +``` + +### Common Issues + +1. **Connection Refused**: Ensure the server supports HTTP-Streaming at the endpoint +2. **Transport Errors**: Check if the server implements the correct protocol +3. **Browser Compatibility**: TypeScript fallback mode handles older browsers + +## Performance Considerations + +HTTP-Streaming is recommended for: +- High-frequency message exchange +- Large message payloads +- Binary data transmission +- Bidirectional communication needs + +SSE remains suitable for: +- Simple server-to-client streaming +- Text-only data +- Browser compatibility requirements +- Existing SSE infrastructure + +## Future Enhancements + +Potential future improvements: +- WebSocket transport option +- gRPC streaming support +- Connection pooling +- Automatic reconnection for HTTP-Streaming +- Compression support \ No newline at end of file diff --git a/src/praisonai-agents/examples/mcp_http_streaming_example.py b/src/praisonai-agents/examples/mcp_http_streaming_example.py new file mode 100644 index 000000000..c15552fa2 --- /dev/null +++ b/src/praisonai-agents/examples/mcp_http_streaming_example.py @@ -0,0 +1,109 @@ +""" +Example demonstrating MCP with HTTP-Streaming transport. + +This example shows: +1. Auto-detection of transport based on URL +2. Explicit transport selection +3. Backward compatibility with existing code +""" + +from praisonaiagents import Agent +from praisonaiagents.mcp import MCP + +# Example 1: Auto-detection - SSE endpoint (backward compatible) +print("Example 1: Auto-detection with SSE endpoint") +try: + agent_sse_auto = Agent( + instructions="You are a helpful assistant that can use MCP tools.", + llm="gpt-4o-mini", + tools=MCP("http://localhost:8080/sse") # Auto-detects SSE transport + ) + print("✓ SSE transport detected automatically") +except Exception as e: + print(f"Note: {e}") + +# Example 2: Auto-detection - HTTP endpoint +print("\nExample 2: Auto-detection with HTTP endpoint") +try: + agent_http_auto = Agent( + instructions="You are a helpful assistant that can use MCP tools.", + llm="gpt-4o-mini", + tools=MCP("http://localhost:8080/api") # Auto-detects HTTP-streaming transport + ) + print("✓ HTTP-streaming transport detected automatically") +except Exception as e: + print(f"Note: {e}") + +# Example 3: Explicit SSE transport +print("\nExample 3: Explicit SSE transport selection") +try: + agent_sse_explicit = Agent( + instructions="You are a helpful assistant that can use MCP tools.", + llm="gpt-4o-mini", + tools=MCP("http://localhost:8080/api", transport="sse") # Force SSE + ) + print("✓ SSE transport explicitly selected") +except Exception as e: + print(f"Note: {e}") + +# Example 4: Explicit HTTP-streaming transport +print("\nExample 4: Explicit HTTP-streaming transport selection") +try: + agent_http_explicit = Agent( + instructions="You are a helpful assistant that can use MCP tools.", + llm="gpt-4o-mini", + tools=MCP("http://localhost:8080/sse", transport="http-streaming") # Force HTTP-streaming + ) + print("✓ HTTP-streaming transport explicitly selected") +except Exception as e: + print(f"Note: {e}") + +# Example 5: HTTP-streaming with custom headers +print("\nExample 5: HTTP-streaming with custom headers") +try: + agent_http_headers = Agent( + instructions="You are a helpful assistant that can use MCP tools.", + llm="gpt-4o-mini", + tools=MCP( + "http://localhost:8080/api", + transport="http-streaming", + headers={"Authorization": "Bearer your-token-here"} + ) + ) + print("✓ HTTP-streaming with custom headers configured") +except Exception as e: + print(f"Note: {e}") + +# Example 6: Existing stdio usage - completely unchanged +print("\nExample 6: Existing stdio usage (backward compatible)") +try: + agent_stdio = Agent( + instructions="You are a helpful assistant that can use MCP tools.", + llm="gpt-4o-mini", + tools=MCP( + command="/path/to/python", + args=["/path/to/mcp_server.py"] + ) + ) + print("✓ Stdio transport works as before") +except Exception as e: + print(f"Note: {e}") + +# Example 7: NPX usage - completely unchanged +print("\nExample 7: NPX usage (backward compatible)") +try: + agent_npx = Agent( + instructions="You are a helpful assistant that can use MCP tools.", + llm="gpt-4o-mini", + tools=MCP("npx @modelcontextprotocol/server-brave-search") + ) + print("✓ NPX transport works as before") +except Exception as e: + print(f"Note: {e}") + +print("\n" + "="*50) +print("Summary: HTTP-Streaming support added with full backward compatibility!") +print("- Auto-detection: URLs ending with /sse use SSE, others use HTTP-streaming") +print("- Explicit control: Use transport='sse' or transport='http-streaming'") +print("- All existing code continues to work without modification") +print("="*50) \ No newline at end of file diff --git a/src/praisonai-agents/praisonaiagents/mcp/mcp.py b/src/praisonai-agents/praisonaiagents/mcp/mcp.py index 36429ce0f..aa7f1821e 100644 --- a/src/praisonai-agents/praisonaiagents/mcp/mcp.py +++ b/src/praisonai-agents/praisonaiagents/mcp/mcp.py @@ -171,6 +171,7 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6 logging.getLogger("_client").setLevel(logging.DEBUG) logging.getLogger("httpx").setLevel(logging.DEBUG) logging.getLogger("llm").setLevel(logging.DEBUG) + logging.getLogger("mcp-http-streaming").setLevel(logging.DEBUG) else: # Set all MCP-related loggers to WARNING level by default logging.getLogger("mcp-wrapper").setLevel(logging.WARNING) @@ -182,18 +183,31 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6 logging.getLogger("_client").setLevel(logging.WARNING) logging.getLogger("httpx").setLevel(logging.WARNING) logging.getLogger("llm").setLevel(logging.WARNING) + logging.getLogger("mcp-http-streaming").setLevel(logging.WARNING) # Store additional parameters self.timeout = timeout self.debug = debug - # Check if this is an SSE URL + # Check if this is an HTTP URL if isinstance(command_or_string, str) and re.match(r'^https?://', command_or_string): - # Import the SSE client implementation - from .mcp_sse import SSEMCPClient - self.sse_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout) - self._tools = list(self.sse_client.tools) - self.is_sse = True + # Determine if it's SSE or HTTP streaming based on URL pattern + if '/sse' in command_or_string or command_or_string.endswith('/sse'): + # Import the SSE client implementation + from .mcp_sse import SSEMCPClient + self.sse_client = SSEMCPClient(command_or_string, debug=debug, timeout=timeout) + self._tools = list(self.sse_client.tools) + self.is_sse = True + self.is_http_streaming = False + else: + # Import the HTTP streaming client implementation + from .mcp_http_streaming import HTTPStreamingMCPClient + # Extract headers from kwargs if provided + headers = kwargs.get('headers', None) + self.http_client = HTTPStreamingMCPClient(command_or_string, headers=headers, debug=debug, timeout=timeout) + self._tools = list(self.http_client.tools) + self.is_sse = False + self.is_http_streaming = True self.is_npx = False return @@ -219,6 +233,7 @@ def __init__(self, command_or_string=None, args=None, *, command=None, timeout=6 # Set up stdio client self.is_sse = False + self.is_http_streaming = False # Ensure UTF-8 encoding in environment for Docker compatibility env = kwargs.get('env', {}) @@ -275,6 +290,9 @@ def _generate_tool_functions(self) -> List[Callable]: """ if self.is_sse: return list(self.sse_client.tools) + + if self.is_http_streaming: + return list(self.http_client.tools) tool_functions = [] @@ -284,6 +302,43 @@ def _generate_tool_functions(self) -> List[Callable]: return tool_functions + def _detect_transport(self, url: str, explicit_transport: Optional[str] = None) -> str: + """ + Detect the transport type based on URL pattern or explicit setting. + + Args: + url: The URL to analyze + explicit_transport: Explicit transport type if provided + + Returns: + str: 'sse' or 'http-streaming' + """ + # If explicit transport is provided, use it + if explicit_transport: + if explicit_transport.lower() in ['sse', 'http-streaming', 'http']: + # Normalize 'http' to 'http-streaming' + return 'sse' if explicit_transport.lower() == 'sse' else 'http-streaming' + else: + raise ValueError(f"Invalid transport type: {explicit_transport}. Must be 'sse', 'http-streaming', or 'http'") + + # Auto-detect based on URL pattern + # Common SSE endpoint patterns + sse_patterns = [ + r'/sse$', + r'/sse/', + r'/events$', + r'/stream$', + r'/server-sent-events', + r'[?&]transport=sse', + ] + + for pattern in sse_patterns: + if re.search(pattern, url, re.IGNORECASE): + return 'sse' + + # Default to HTTP streaming for all other URLs + return 'http-streaming' + def _create_tool_wrapper(self, tool): """Create a wrapper function for an MCP tool.""" # Determine parameter names from the schema @@ -449,6 +504,10 @@ def to_openai_tool(self): # Return all tools from SSE client return self.sse_client.to_openai_tools() + if self.is_http_streaming and hasattr(self, 'http_client') and self.http_client.tools: + # Return all tools from HTTP streaming client + return self.http_client.to_openai_tools() + # For simplicity, we'll convert the first tool only if multiple exist # More complex implementations could handle multiple tools if not hasattr(self, 'runner') or not self.runner.tools: diff --git a/src/praisonai-agents/praisonaiagents/mcp/mcp_http_streaming.py b/src/praisonai-agents/praisonaiagents/mcp/mcp_http_streaming.py new file mode 100644 index 000000000..c35d68675 --- /dev/null +++ b/src/praisonai-agents/praisonaiagents/mcp/mcp_http_streaming.py @@ -0,0 +1,390 @@ +""" +HTTP-Streaming client implementation for MCP (Model Context Protocol). +This module provides the necessary classes and functions to connect to an MCP server +over HTTP using chunked transfer encoding for bidirectional streaming. +""" + +import asyncio +import json +import logging +import threading +import inspect +from typing import List, Dict, Any, Optional, Iterable, AsyncIterator +import httpx +from mcp import ClientSession + +logger = logging.getLogger("mcp-http-streaming") + +# Global event loop and lock for ensuring singleton pattern +_loop = None +_loop_lock = threading.Lock() + + +def get_or_create_event_loop(): + """Get the existing event loop or create a new one if it doesn't exist.""" + global _loop + with _loop_lock: + if _loop is None: + _loop = asyncio.new_event_loop() + threading.Thread(target=_loop.run_forever, daemon=True).start() + return _loop + + +class HTTPStreamingTransport: + """HTTP chunked streaming transport for MCP.""" + + def __init__(self, url: str, headers: Optional[Dict[str, str]] = None): + self.url = url + self.headers = headers or {} + self.client = httpx.AsyncClient(timeout=60.0) + self.read_stream = None + self.write_stream = None + self._closed = False + + async def connect(self) -> tuple: + """Establish bidirectional streaming connection.""" + # Create read and write stream adapters + self.read_stream = HTTPReadStream(self) + self.write_stream = HTTPWriteStream(self) + + # Initialize the streaming connection + await self._initialize_connection() + + return (self.read_stream, self.write_stream) + + async def _initialize_connection(self): + """Initialize the HTTP streaming connection.""" + # Send initial connection request + headers = { + **self.headers, + "Content-Type": "application/json", + "Transfer-Encoding": "chunked", + "Accept": "application/x-ndjson" + } + + # Start the bidirectional stream + self._request_queue = asyncio.Queue() + self._response_queue = asyncio.Queue() + + # Start background task for handling the stream + asyncio.create_task(self._handle_stream(headers)) + + async def _handle_stream(self, headers: Dict[str, str]): + """Handle the bidirectional HTTP stream.""" + try: + async with self.client.stream( + "POST", + f"{self.url}/mcp/v1/stream", + headers=headers, + content=self._request_iterator() + ) as response: + # Process response stream + buffer = b"" + async for chunk in response.aiter_bytes(): + buffer += chunk + # Process complete lines + while b'\n' in buffer: + line, buffer = buffer.split(b'\n', 1) + if line.strip(): + try: + message = json.loads(line.decode('utf-8')) + await self._response_queue.put(message) + except json.JSONDecodeError as e: + logger.error(f"Failed to parse message: {e}") + except Exception as e: + logger.error(f"Stream error: {e}") + self._closed = True + + async def _request_iterator(self) -> AsyncIterator[bytes]: + """Generate request chunks from the queue.""" + while not self._closed: + try: + message = await asyncio.wait_for( + self._request_queue.get(), + timeout=0.1 + ) + if message is None: + break + chunk = json.dumps(message).encode('utf-8') + b'\n' + yield chunk + except asyncio.TimeoutError: + continue + + async def send_message(self, message: Dict[str, Any]): + """Send a message through the stream.""" + if not self._closed: + await self._request_queue.put(message) + + async def receive_message(self) -> Optional[Dict[str, Any]]: + """Receive a message from the stream.""" + if self._closed: + return None + try: + return await self._response_queue.get() + except Exception: + return None + + async def close(self): + """Close the transport.""" + self._closed = True + if self._request_queue: + await self._request_queue.put(None) + await self.client.aclose() + + +class HTTPReadStream: + """Read stream adapter for MCP.""" + + def __init__(self, transport: HTTPStreamingTransport): + self.transport = transport + + async def read(self) -> Optional[bytes]: + """Read a message from the stream.""" + message = await self.transport.receive_message() + if message: + return json.dumps(message).encode('utf-8') + return None + + +class HTTPWriteStream: + """Write stream adapter for MCP.""" + + def __init__(self, transport: HTTPStreamingTransport): + self.transport = transport + + async def write(self, data: bytes): + """Write a message to the stream.""" + try: + message = json.loads(data.decode('utf-8')) + await self.transport.send_message(message) + except json.JSONDecodeError: + logger.error("Failed to decode message for sending") + + +class HTTPStreamingMCPTool: + """A wrapper for an MCP tool that can be used with praisonaiagents.""" + + def __init__( + self, + name: str, + description: str, + session: ClientSession, + input_schema: Optional[Dict[str, Any]] = None, + timeout: int = 60 + ): + """ + Initialize an MCP tool wrapper. + + Args: + name: The name of the tool + description: A description of what the tool does + session: The MCP client session + input_schema: The JSON schema for the tool's input parameters + timeout: Timeout in seconds for tool calls (default: 60) + """ + self.name = name + self.description = description + self.session = session + self.input_schema = input_schema or {} + self.timeout = timeout + + # Create the function signature dynamically + self._create_signature() + + def _create_signature(self): + """Create a function signature based on the input schema.""" + properties = self.input_schema.get("properties", {}) + required = self.input_schema.get("required", []) + + # Build parameter list + params = [] + for prop_name, prop_schema in properties.items(): + param_type = prop_schema.get("type", "str") + # Convert JSON schema types to Python types + type_mapping = { + "string": str, + "number": float, + "integer": int, + "boolean": bool, + "array": list, + "object": dict, + } + param_type = type_mapping.get(param_type, Any) + + if prop_name in required: + params.append(inspect.Parameter( + prop_name, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + annotation=param_type + )) + else: + default_value = prop_schema.get("default", None) + params.append(inspect.Parameter( + prop_name, + inspect.Parameter.POSITIONAL_OR_KEYWORD, + default=default_value, + annotation=param_type + )) + + # Create signature + self.__signature__ = inspect.Signature(params) + + def __call__(self, **kwargs) -> Any: + """Execute the tool with the given arguments.""" + # Run the async function in the event loop + loop = get_or_create_event_loop() + future = asyncio.run_coroutine_threadsafe( + self._async_call(**kwargs), + loop + ) + return future.result(timeout=self.timeout) + + async def _async_call(self, **kwargs) -> Any: + """Async implementation of the tool call.""" + try: + result = await self.session.call_tool(self.name, kwargs) + # Extract content from result + if hasattr(result, 'content') and result.content: + # Handle different content types + for content_item in result.content: + if hasattr(content_item, 'text'): + return content_item.text + elif hasattr(content_item, 'data'): + return content_item.data + return result + except Exception as e: + logger.error(f"Error calling tool {self.name}: {e}") + raise + + def _fix_array_schemas(self, schema): + """ + Fix array schemas by adding missing 'items' attribute required by OpenAI. + + This ensures compatibility with OpenAI's function calling format which + requires array types to specify the type of items they contain. + + Args: + schema: The schema dictionary to fix + + Returns: + dict: The fixed schema + """ + if not isinstance(schema, dict): + return schema + + # Create a copy to avoid modifying the original + fixed_schema = schema.copy() + + # Fix array types at the current level + if fixed_schema.get("type") == "array" and "items" not in fixed_schema: + # Add a default items schema for arrays without it + fixed_schema["items"] = {"type": "string"} + + # Recursively fix nested schemas + if "properties" in fixed_schema: + fixed_properties = {} + for prop_name, prop_schema in fixed_schema["properties"].items(): + fixed_properties[prop_name] = self._fix_array_schemas(prop_schema) + fixed_schema["properties"] = fixed_properties + + # Fix items schema if it exists + if "items" in fixed_schema: + fixed_schema["items"] = self._fix_array_schemas(fixed_schema["items"]) + + return fixed_schema + + def to_openai_tool(self) -> Dict[str, Any]: + """Convert this tool to OpenAI function calling format.""" + # Fix array schemas to include 'items' attribute + fixed_schema = self._fix_array_schemas(self.input_schema) + + return { + "type": "function", + "function": { + "name": self.name, + "description": self.description, + "parameters": fixed_schema + } + } + + +class HTTPStreamingMCPClient: + """HTTP-Streaming MCP client with same interface as SSEMCPClient.""" + + def __init__(self, server_url: str, debug: bool = False, timeout: int = 60, headers: Optional[Dict[str, str]] = None): + """ + Initialize an HTTP-Streaming MCP client. + + Args: + server_url: The URL of the HTTP MCP server + debug: Whether to enable debug logging + timeout: Timeout in seconds for operations (default: 60) + headers: Optional headers to include in requests + """ + self.server_url = server_url.rstrip('/') + self.debug = debug + self.timeout = timeout + self.headers = headers or {} + self.tools: List[HTTPStreamingMCPTool] = [] + self.session: Optional[ClientSession] = None + + # Set up logging + if debug: + logger.setLevel(logging.DEBUG) + handler = logging.StreamHandler() + handler.setFormatter(logging.Formatter('[%(name)s] %(message)s')) + logger.addHandler(handler) + + # Initialize in event loop + loop = get_or_create_event_loop() + future = asyncio.run_coroutine_threadsafe(self._initialize(), loop) + future.result(timeout=timeout) + + async def _initialize(self): + """Initialize the MCP session and discover tools.""" + try: + # Create transport + transport = HTTPStreamingTransport(self.server_url, self.headers) + + # Create session + self.session = ClientSession() + + # Connect transport + read_stream, write_stream = await transport.connect() + + # Initialize session with transport + await self.session.initialize(read_stream, write_stream) + + # Get available tools + tools_list = await self.session.list_tools() + + # Create tool wrappers + for tool_info in tools_list.tools: + tool = HTTPStreamingMCPTool( + name=tool_info.name, + description=tool_info.description or "", + session=self.session, + input_schema=tool_info.inputSchema if hasattr(tool_info, 'inputSchema') else {}, + timeout=self.timeout + ) + self.tools.append(tool) + + if self.debug: + logger.debug(f"Initialized with {len(self.tools)} tools: {[t.name for t in self.tools]}") + + except Exception as e: + logger.error(f"Failed to initialize HTTP-Streaming MCP client: {e}") + raise + + def __iter__(self) -> Iterable[HTTPStreamingMCPTool]: + """Iterate over available tools.""" + return iter(self.tools) + + def to_openai_tools(self) -> List[Dict[str, Any]]: + """Convert all tools to OpenAI function calling format.""" + return [tool.to_openai_tool() for tool in self.tools] + + async def close(self): + """Close the MCP session.""" + if self.session: + await self.session.close() \ No newline at end of file diff --git a/src/praisonai-ts/examples/tools/mcp-http-streaming.ts b/src/praisonai-ts/examples/tools/mcp-http-streaming.ts new file mode 100644 index 000000000..55fd02b7f --- /dev/null +++ b/src/praisonai-ts/examples/tools/mcp-http-streaming.ts @@ -0,0 +1,141 @@ +/** + * Example demonstrating MCP with HTTP-Streaming transport in TypeScript. + * + * This example shows: + * 1. Auto-detection of transport based on URL + * 2. Explicit transport selection + * 3. Backward compatibility with existing code + */ + +import { Agent } from '../../agent'; +import { MCP, createMCPClient } from '../../tools/mcp'; + +async function main() { + console.log('MCP HTTP-Streaming Transport Examples\n'); + + // Example 1: Auto-detection - SSE endpoint (backward compatible) + console.log('Example 1: Auto-detection with SSE endpoint'); + try { + const mcpSseAuto = new MCP('http://localhost:8080/sse'); + await mcpSseAuto.initialize(); + console.log(`✓ Transport detected: ${mcpSseAuto.getTransportType()}`); + console.log(` Tools available: ${mcpSseAuto.tools.length}`); + await mcpSseAuto.close(); + } catch (error) { + console.log(`Note: ${error instanceof Error ? error.message : error}`); + } + + // Example 2: Auto-detection - HTTP endpoint + console.log('\nExample 2: Auto-detection with HTTP endpoint'); + try { + const mcpHttpAuto = new MCP('http://localhost:8080/api'); + await mcpHttpAuto.initialize(); + console.log(`✓ Transport detected: ${mcpHttpAuto.getTransportType()}`); + console.log(` Tools available: ${mcpHttpAuto.tools.length}`); + await mcpHttpAuto.close(); + } catch (error) { + console.log(`Note: ${error instanceof Error ? error.message : error}`); + } + + // Example 3: Explicit SSE transport + console.log('\nExample 3: Explicit SSE transport selection'); + try { + const mcpSseExplicit = new MCP('http://localhost:8080/api', { + transport: 'sse' + }); + await mcpSseExplicit.initialize(); + console.log(`✓ Transport selected: ${mcpSseExplicit.getTransportType()}`); + await mcpSseExplicit.close(); + } catch (error) { + console.log(`Note: ${error instanceof Error ? error.message : error}`); + } + + // Example 4: Explicit HTTP-streaming transport + console.log('\nExample 4: Explicit HTTP-streaming transport selection'); + try { + const mcpHttpExplicit = new MCP('http://localhost:8080/sse', { + transport: 'http-streaming' + }); + await mcpHttpExplicit.initialize(); + console.log(`✓ Transport selected: ${mcpHttpExplicit.getTransportType()}`); + await mcpHttpExplicit.close(); + } catch (error) { + console.log(`Note: ${error instanceof Error ? error.message : error}`); + } + + // Example 5: HTTP-streaming with options + console.log('\nExample 5: HTTP-streaming with custom options'); + try { + const mcpHttpOptions = new MCP('http://localhost:8080/api', { + transport: 'http-streaming', + debug: true, + timeout: 30000, + headers: { + 'Authorization': 'Bearer your-token-here' + } + }); + await mcpHttpOptions.initialize(); + console.log(`✓ Transport configured: ${mcpHttpOptions.getTransportType()}`); + const stats = mcpHttpOptions.getStats(); + console.log(` Connection stats:`, stats); + await mcpHttpOptions.close(); + } catch (error) { + console.log(`Note: ${error instanceof Error ? error.message : error}`); + } + + // Example 6: Using with Agent + console.log('\nExample 6: Using MCP with Agent'); + try { + // Create MCP client + const mcp = await createMCPClient('http://localhost:8080/api', { + transport: 'http-streaming' + }); + + // Create tool functions for the agent + const toolFunctions: Record = {}; + for (const tool of mcp) { + toolFunctions[tool.name] = async (...args: any[]) => { + const params = args[0] || {}; + return tool.execute(params); + }; + } + + // Create agent with MCP tools + const agent = new Agent({ + name: 'MCP Assistant', + instructions: 'You are a helpful assistant that can use MCP tools.', + model: 'openai/gpt-4o-mini', + tools: mcp.toOpenAITools(), + toolFunctions + }); + + console.log('✓ Agent created with MCP tools'); + console.log(` Available tools: ${mcp.tools.map(t => t.name).join(', ')}`); + + // Clean up + await mcp.close(); + } catch (error) { + console.log(`Note: ${error instanceof Error ? error.message : error}`); + } + + // Example 7: Backward compatibility check + console.log('\nExample 7: Backward compatibility'); + try { + // Old style import still works + const { MCP: MCPOld } = await import('../../tools/mcpSse'); + const oldClient = new MCPOld('http://localhost:8080/sse'); + console.log('✓ Old import style still works'); + } catch (error) { + console.log(`Note: ${error instanceof Error ? error.message : error}`); + } + + console.log('\n' + '='.repeat(60)); + console.log('Summary: HTTP-Streaming support with full backward compatibility!'); + console.log('- Auto-detection: URLs ending with /sse use SSE transport'); + console.log('- Explicit control: Use transport option for manual selection'); + console.log('- All existing code continues to work without modification'); + console.log('='.repeat(60)); +} + +// Run the examples +main().catch(console.error); \ No newline at end of file diff --git a/src/praisonai-ts/src/tools/httpStreamingTransport.ts b/src/praisonai-ts/src/tools/httpStreamingTransport.ts new file mode 100644 index 000000000..1ac7f8cf8 --- /dev/null +++ b/src/praisonai-ts/src/tools/httpStreamingTransport.ts @@ -0,0 +1,417 @@ +/** + * HTTP-Streaming transport implementation for MCP (Model Context Protocol). + * Provides bidirectional streaming over HTTP using chunked transfer encoding. + */ + +export interface HTTPStreamingTransportOptions { + /** Request timeout in milliseconds */ + timeout?: number; + /** Additional headers to include in requests */ + headers?: Record; + /** Enable debug logging */ + debug?: boolean; + /** Use fallback mode for browsers without duplex streaming support */ + fallbackMode?: boolean; +} + +/** + * Transport interface expected by MCP SDK + */ +export interface Transport { + read(): Promise; + write(data: string): Promise; + close(): Promise; +} + +/** + * HTTP-Streaming transport using modern Fetch API with duplex streaming. + * Falls back to polling-based approach for older browsers. + */ +export class HTTPStreamingTransport implements Transport { + private url: URL; + private options: HTTPStreamingTransportOptions; + private reader: ReadableStreamDefaultReader | null = null; + private writer: WritableStreamDefaultWriter | null = null; + private encoder = new TextEncoder(); + private decoder = new TextDecoder(); + private buffer = ''; + private closed = false; + private abortController: AbortController; + + constructor(url: URL, options: HTTPStreamingTransportOptions = {}) { + this.url = new URL('/mcp/v1/stream', url); + this.options = { + timeout: 60000, + fallbackMode: false, + debug: false, + ...options + }; + this.abortController = new AbortController(); + } + + async connect(): Promise { + if (this.options.debug) { + console.log('[HTTPStreamingTransport] Connecting to:', this.url.toString()); + } + + // Check if browser supports duplex streaming + const supportsDuplex = this.checkDuplexSupport(); + + if (!supportsDuplex || this.options.fallbackMode) { + if (this.options.debug) { + console.log('[HTTPStreamingTransport] Using fallback mode'); + } + throw new Error('Fallback mode not implemented - use HTTPStreamingTransportFallback'); + } + + try { + const response = await fetch(this.url.toString(), { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'Accept': 'application/x-ndjson', + 'Transfer-Encoding': 'chunked', + ...this.options.headers + }, + // @ts-ignore - duplex is not in standard TypeScript types yet + duplex: 'half', + body: new ReadableStream({ + start: (controller) => { + this.writer = { + write: async (chunk: Uint8Array) => { + controller.enqueue(chunk); + }, + close: async () => { + controller.close(); + }, + abort: async (reason?: any) => { + controller.error(reason); + }, + get closed() { return Promise.resolve(); }, + get ready() { return Promise.resolve(); }, + get desiredSize() { return null; }, + releaseLock: () => {} + }; + } + }), + signal: this.abortController.signal + }); + + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + + if (!response.body) { + throw new Error('Response body is null'); + } + + this.reader = response.body.getReader(); + + if (this.options.debug) { + console.log('[HTTPStreamingTransport] Connected successfully'); + } + } catch (error) { + if (this.options.debug) { + console.error('[HTTPStreamingTransport] Connection error:', error); + } + throw error; + } + } + + async read(): Promise { + if (this.closed || !this.reader) { + return null; + } + + try { + // Read chunks until we have a complete message + while (true) { + const { done, value } = await this.reader.read(); + + if (done) { + this.closed = true; + return null; + } + + // Append to buffer + this.buffer += this.decoder.decode(value, { stream: true }); + + // Check for complete messages (newline-delimited) + const lines = this.buffer.split('\n'); + + if (lines.length > 1) { + // We have at least one complete message + const message = lines[0]; + this.buffer = lines.slice(1).join('\n'); + + if (message.trim()) { + if (this.options.debug) { + console.log('[HTTPStreamingTransport] Read message:', message); + } + return message; + } + } + } + } catch (error) { + if (this.options.debug) { + console.error('[HTTPStreamingTransport] Read error:', error); + } + this.closed = true; + return null; + } + } + + async write(data: string): Promise { + if (this.closed || !this.writer) { + throw new Error('Transport is closed or not connected'); + } + + try { + const message = data.trim() + '\n'; + const chunk = this.encoder.encode(message); + + await this.writer.write(chunk); + + if (this.options.debug) { + console.log('[HTTPStreamingTransport] Wrote message:', data); + } + } catch (error) { + if (this.options.debug) { + console.error('[HTTPStreamingTransport] Write error:', error); + } + throw error; + } + } + + async close(): Promise { + if (this.closed) { + return; + } + + this.closed = true; + this.abortController.abort(); + + if (this.reader) { + try { + await this.reader.cancel(); + } catch (error) { + // Ignore errors during cleanup + } + } + + if (this.writer) { + try { + await this.writer.close(); + } catch (error) { + // Ignore errors during cleanup + } + } + + if (this.options.debug) { + console.log('[HTTPStreamingTransport] Closed'); + } + } + + private checkDuplexSupport(): boolean { + // Check if the browser supports duplex streaming + // This is a simple heuristic - more sophisticated detection may be needed + if (typeof ReadableStream === 'undefined' || typeof WritableStream === 'undefined') { + return false; + } + + // Check for fetch duplex support (Chrome 105+, Safari 16.5+) + // This is a best-effort check + const userAgent = navigator.userAgent.toLowerCase(); + const isChrome = userAgent.includes('chrome') && !userAgent.includes('edge'); + const isSafari = userAgent.includes('safari') && !userAgent.includes('chrome'); + + if (isChrome) { + const match = userAgent.match(/chrome\/(\d+)/); + if (match && parseInt(match[1]) >= 105) { + return true; + } + } + + if (isSafari) { + const match = userAgent.match(/version\/(\d+)/); + if (match && parseInt(match[1]) >= 16) { + return true; + } + } + + // Default to false for unsupported browsers + return false; + } +} + +/** + * Fallback transport for browsers without duplex streaming support. + * Uses separate connections for reading and writing. + */ +export class HTTPStreamingTransportFallback implements Transport { + private url: URL; + private options: HTTPStreamingTransportOptions; + private sessionId: string; + private readQueue: string[] = []; + private writeQueue: string[] = []; + private closed = false; + private pollInterval: number | null = null; + + constructor(url: URL, options: HTTPStreamingTransportOptions = {}) { + this.url = url; + this.options = { + timeout: 60000, + debug: false, + ...options + }; + this.sessionId = this.generateSessionId(); + } + + async connect(): Promise { + if (this.options.debug) { + console.log('[HTTPStreamingTransportFallback] Connecting with session:', this.sessionId); + } + + // Start polling for messages + this.startPolling(); + + // Send initial connection message + await this.sendRequest('connect', {}); + } + + async read(): Promise { + if (this.closed) { + return null; + } + + // Wait for messages in the queue + while (this.readQueue.length === 0 && !this.closed) { + await new Promise(resolve => setTimeout(resolve, 10)); + } + + if (this.closed) { + return null; + } + + const message = this.readQueue.shift(); + + if (this.options.debug && message) { + console.log('[HTTPStreamingTransportFallback] Read message:', message); + } + + return message || null; + } + + async write(data: string): Promise { + if (this.closed) { + throw new Error('Transport is closed'); + } + + // Send message immediately + await this.sendRequest('message', { data }); + + if (this.options.debug) { + console.log('[HTTPStreamingTransportFallback] Wrote message:', data); + } + } + + async close(): Promise { + if (this.closed) { + return; + } + + this.closed = true; + + // Stop polling + if (this.pollInterval !== null) { + clearInterval(this.pollInterval); + this.pollInterval = null; + } + + // Send disconnect message + try { + await this.sendRequest('disconnect', {}); + } catch (error) { + // Ignore errors during cleanup + } + + if (this.options.debug) { + console.log('[HTTPStreamingTransportFallback] Closed'); + } + } + + private startPolling(): void { + // Poll for messages every 100ms + this.pollInterval = window.setInterval(async () => { + if (this.closed) { + return; + } + + try { + const messages = await this.sendRequest('poll', {}); + if (Array.isArray(messages)) { + for (const message of messages) { + if (typeof message === 'string') { + this.readQueue.push(message); + } else { + this.readQueue.push(JSON.stringify(message)); + } + } + } + } catch (error) { + if (this.options.debug) { + console.error('[HTTPStreamingTransportFallback] Poll error:', error); + } + } + }, 100); + } + + private async sendRequest(action: string, payload: any): Promise { + const response = await fetch(new URL('/mcp/v1/fallback', this.url).toString(), { + method: 'POST', + headers: { + 'Content-Type': 'application/json', + 'X-Session-ID': this.sessionId, + ...this.options.headers + }, + body: JSON.stringify({ + action, + sessionId: this.sessionId, + payload + }) + }); + + if (!response.ok) { + throw new Error(`HTTP error! status: ${response.status}`); + } + + return response.json(); + } + + private generateSessionId(): string { + return `session_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`; + } +} + +/** + * Factory function to create appropriate transport based on browser capabilities. + */ +export function createHTTPStreamingTransport( + url: URL, + options: HTTPStreamingTransportOptions = {} +): Transport { + const transport = new HTTPStreamingTransport(url, options); + + // Try to check duplex support + try { + // @ts-ignore + if (!transport.checkDuplexSupport() || options.fallbackMode) { + return new HTTPStreamingTransportFallback(url, options); + } + } catch (error) { + // If check fails, use fallback + return new HTTPStreamingTransportFallback(url, options); + } + + return transport; +} \ No newline at end of file diff --git a/src/praisonai-ts/src/tools/index.ts b/src/praisonai-ts/src/tools/index.ts index f57b493e8..487f9986a 100644 --- a/src/praisonai-ts/src/tools/index.ts +++ b/src/praisonai-ts/src/tools/index.ts @@ -22,3 +22,6 @@ export class BaseTool implements Tool { // Export all tool modules export * from './arxivTools'; export * from './mcpSse'; +export * from './mcpHttpStreaming'; +export * from './httpStreamingTransport'; +export * from './mcp'; diff --git a/src/praisonai-ts/src/tools/mcp.ts b/src/praisonai-ts/src/tools/mcp.ts new file mode 100644 index 000000000..176b0da8c --- /dev/null +++ b/src/praisonai-ts/src/tools/mcp.ts @@ -0,0 +1,188 @@ +/** + * Unified MCP client with transport selection support. + * Provides automatic detection and explicit transport selection for SSE and HTTP-Streaming. + */ + +import { MCP as MCPSse } from './mcpSse'; +import { MCPHttpStreaming } from './mcpHttpStreaming'; +import { MCPTool } from './mcpSse'; +import { BaseTool } from './index'; + +export type TransportType = 'sse' | 'http-streaming' | 'http' | 'auto'; + +export interface MCPOptions { + /** Enable debug logging */ + debug?: boolean; + /** Explicit transport type selection */ + transport?: TransportType; + /** Request timeout in milliseconds */ + timeout?: number; + /** Additional headers for HTTP requests */ + headers?: Record; + /** MCP client name */ + clientName?: string; + /** MCP client version */ + clientVersion?: string; +} + +/** + * Unified MCP client that supports both SSE and HTTP-Streaming transports. + * Automatically detects the appropriate transport based on URL patterns, + * or allows explicit transport selection. + */ +export class MCP implements Iterable { + tools: MCPTool[] = []; + private client: MCPSse | MCPHttpStreaming | null = null; + private url: string; + private transportType: 'sse' | 'http-streaming'; + private options: MCPOptions; + + constructor(url: string, options: MCPOptions = {}) { + this.url = url; + this.options = { + transport: 'auto', + debug: false, + ...options + }; + + // Detect transport type + this.transportType = this.detectTransport(url, this.options.transport); + + if (this.options.debug) { + console.log(`MCP client initialized for URL: ${url} with transport: ${this.transportType}`); + } + } + + /** + * Detect the appropriate transport based on URL pattern or explicit selection. + */ + private detectTransport(url: string, explicitTransport?: TransportType): 'sse' | 'http-streaming' { + // If explicit transport is provided and not 'auto', use it + if (explicitTransport && explicitTransport !== 'auto') { + // Normalize 'http' to 'http-streaming' + return explicitTransport === 'sse' ? 'sse' : 'http-streaming'; + } + + // Auto-detect based on URL pattern + const ssePatterns = [ + /\/sse$/i, + /\/sse\//i, + /\/events$/i, + /\/stream$/i, + /\/server-sent-events/i, + /[?&]transport=sse/i, + ]; + + for (const pattern of ssePatterns) { + if (pattern.test(url)) { + return 'sse'; + } + } + + // Default to HTTP-Streaming transport + return 'http-streaming'; + } + + /** + * Initialize the MCP client with the selected transport. + */ + async initialize(): Promise { + if (this.client) { + if (this.options.debug) console.log('MCP client already initialized'); + return; + } + + try { + // Create appropriate client based on transport type + if (this.transportType === 'sse') { + this.client = new MCPSse(this.url, this.options.debug); + } else { + this.client = new MCPHttpStreaming(this.url, { + debug: this.options.debug, + timeout: this.options.timeout, + headers: this.options.headers, + clientName: this.options.clientName, + clientVersion: this.options.clientVersion + }); + } + + // Initialize the client + await this.client.initialize(); + + // Copy tools from the client + this.tools = [...this.client.tools]; + + if (this.options.debug) { + console.log(`Initialized MCP with ${this.tools.length} tools using ${this.transportType} transport`); + } + } catch (error) { + // Clean up on error + if (this.client) { + await this.client.close().catch(() => {}); + this.client = null; + } + throw error; + } + } + + /** + * Close the MCP client connection. + */ + async close(): Promise { + if (this.client) { + await this.client.close(); + this.client = null; + this.tools = []; + } + } + + /** + * Convert all tools to OpenAI function calling format. + */ + toOpenAITools(): any[] { + if (this.client) { + return this.client.toOpenAITools(); + } + return []; + } + + /** + * Make MCP instance iterable for easy tool access. + */ + [Symbol.iterator](): Iterator { + return this.tools[Symbol.iterator](); + } + + /** + * Get the current transport type being used. + */ + getTransportType(): 'sse' | 'http-streaming' { + return this.transportType; + } + + /** + * Get connection statistics. + */ + getStats(): { connected: boolean; toolCount: number; transport: string } { + return { + connected: this.client !== null, + toolCount: this.tools.length, + transport: this.transportType + }; + } +} + +// Export related types and classes for convenience +export { MCPTool } from './mcpSse'; +export { MCPSse } from './mcpSse'; +export { MCPHttpStreaming } from './mcpHttpStreaming'; +export { HTTPStreamingTransportOptions } from './httpStreamingTransport'; + +/** + * Helper function to create and initialize an MCP client with automatic transport detection. + */ +export async function createMCPClient(url: string, options?: MCPOptions): Promise { + const client = new MCP(url, options); + await client.initialize(); + return client; +} \ No newline at end of file diff --git a/src/praisonai-ts/src/tools/mcpHttpStreaming.ts b/src/praisonai-ts/src/tools/mcpHttpStreaming.ts new file mode 100644 index 000000000..d1aa581c9 --- /dev/null +++ b/src/praisonai-ts/src/tools/mcpHttpStreaming.ts @@ -0,0 +1,209 @@ +/** + * MCP HTTP-Streaming client implementation. + * Provides HTTP-Streaming transport support for MCP (Model Context Protocol). + */ + +import { Client } from '@modelcontextprotocol/sdk/client/index.js'; +import { createHTTPStreamingTransport, HTTPStreamingTransportOptions } from './httpStreamingTransport'; +import { BaseTool } from './index'; + +/** + * Configuration options for MCP HTTP-Streaming client. + */ +export interface MCPHttpStreamingOptions extends HTTPStreamingTransportOptions { + /** MCP client name */ + clientName?: string; + /** MCP client version */ + clientVersion?: string; +} + +/** + * Represents a single MCP tool that can be executed remotely. + */ +export class MCPTool extends BaseTool { + private client: Client; + private toolInfo: { + name: string; + description: string; + inputSchema?: any; + }; + + constructor(toolInfo: { name: string; description: string; inputSchema?: any }, client: Client) { + super(); + this.toolInfo = toolInfo; + this.client = client; + } + + get name(): string { + return this.toolInfo.name; + } + + get description(): string { + return this.toolInfo.description; + } + + get schemaProperties(): any { + return this.toolInfo.inputSchema?.properties || {}; + } + + async execute(args: any): Promise { + try { + const result = await this.client.callTool({ + name: this.toolInfo.name, + arguments: args + }); + + // Extract the actual content from the response + if (result && result.content) { + // Handle different content types + if (Array.isArray(result.content)) { + // If multiple content items, look for text content first + for (const item of result.content) { + if (item.type === 'text' && item.text) { + return item.text; + } + } + // If no text content, return the first item + if (result.content.length > 0) { + return result.content[0].text || result.content[0]; + } + } else if (typeof result.content === 'object' && result.content.text) { + return result.content.text; + } + return result.content; + } + + return result; + } catch (error) { + console.error(`Error executing MCP tool ${this.toolInfo.name}:`, error); + throw error; + } + } + + toOpenAITool(): any { + const parameters = this.toolInfo.inputSchema || { + type: 'object', + properties: {}, + required: [] + }; + + return { + type: 'function', + function: { + name: this.toolInfo.name, + description: this.toolInfo.description, + parameters: parameters + } + }; + } +} + +/** + * MCP client using HTTP-Streaming transport. + * Provides the same interface as MCP SSE client for compatibility. + */ +export class MCPHttpStreaming implements Iterable { + tools: MCPTool[] = []; + private client: Client | null = null; + private url: string; + private options: MCPHttpStreamingOptions; + + constructor(url: string, options: MCPHttpStreamingOptions = {}) { + this.url = url; + this.options = { + clientName: 'praisonai-ts-mcp', + clientVersion: '1.0.0', + ...options + }; + + if (this.options.debug) { + console.log(`MCPHttpStreaming client initialized for URL: ${url}`); + } + } + + async initialize(): Promise { + if (this.client) { + if (this.options.debug) console.log('MCP client already initialized'); + return; + } + + try { + // Create MCP client + this.client = new Client({ + name: this.options.clientName!, + version: this.options.clientVersion! + }); + + // Create HTTP-Streaming transport + const transport = createHTTPStreamingTransport(new URL(this.url), this.options); + + // Connect to the server + await this.client.connect(transport); + + // List available tools + const { tools } = await this.client.listTools(); + + // Create MCPTool instances for each discovered tool + this.tools = tools.map((t: any) => new MCPTool({ + name: t.name, + description: t.description, + inputSchema: t.inputSchema + }, this.client as Client)); + + if (this.options.debug) { + console.log(`Initialized MCPHttpStreaming with ${this.tools.length} tools using HTTP-Streaming transport`); + } + } catch (error) { + // Clean up on error + if (this.client) { + await this.client.close().catch(() => {}); + this.client = null; + } + throw new Error(`Failed to initialize MCP HTTP-Streaming client: ${error instanceof Error ? error.message : 'Unknown error'}`); + } + } + + async close(): Promise { + if (this.client) { + await this.client.close(); + this.client = null; + this.tools = []; + } + } + + toOpenAITools(): any[] { + return this.tools.map(tool => tool.toOpenAITool()); + } + + [Symbol.iterator](): Iterator { + return this.tools[Symbol.iterator](); + } + + /** + * Get connection statistics (if available). + */ + getStats(): { connected: boolean; toolCount: number; transport: string } { + return { + connected: this.client !== null, + toolCount: this.tools.length, + transport: 'http-streaming' + }; + } +} + +/** + * Backward-compatible alias for the main class. + */ +export { MCPHttpStreaming as MCP }; + +/** + * Helper function to create and initialize an MCP HTTP-Streaming client. + */ +export async function createMCPHttpStreamingClient( + url: string, + options?: MCPHttpStreamingOptions +): Promise { + const client = new MCPHttpStreaming(url, options); + await client.initialize(); + return client; +} \ No newline at end of file diff --git a/test_backward_compatibility.py b/test_backward_compatibility.py new file mode 100644 index 000000000..f971d36b0 --- /dev/null +++ b/test_backward_compatibility.py @@ -0,0 +1,150 @@ +#!/usr/bin/env python3 +""" +Test script to verify backward compatibility of MCP HTTP-Streaming implementation. +""" + +import sys +import os + +# Add the source directory to the path +sys.path.insert(0, os.path.join(os.path.dirname(__file__), 'src', 'praisonai-agents')) + +from praisonaiagents.mcp import MCP + +def test_backward_compatibility(): + """Test that all existing MCP usage patterns still work.""" + + print("Testing MCP Backward Compatibility\n") + + # Test 1: SSE URL detection (existing pattern) + print("Test 1: SSE URL auto-detection") + try: + mcp = MCP("http://localhost:8080/sse") + assert hasattr(mcp, 'is_sse') + assert mcp.is_sse == True + assert hasattr(mcp, 'is_http_streaming') + assert mcp.is_http_streaming == False + print("✓ PASS: SSE endpoints detected correctly") + except Exception as e: + print(f"✗ FAIL: {e}") + + # Test 2: HTTP URL detection (new default) + print("\nTest 2: HTTP URL auto-detection") + try: + mcp = MCP("http://localhost:8080/api") + assert hasattr(mcp, 'is_sse') + assert mcp.is_sse == False + assert hasattr(mcp, 'is_http_streaming') + assert mcp.is_http_streaming == True + print("✓ PASS: HTTP endpoints default to HTTP-streaming") + except Exception as e: + print(f"✗ FAIL: {e}") + + # Test 3: Stdio command (existing pattern) + print("\nTest 3: Stdio command pattern") + try: + mcp = MCP(command="/usr/bin/python", args=["server.py"]) + assert hasattr(mcp, 'is_sse') + assert mcp.is_sse == False + assert hasattr(mcp, 'is_http_streaming') + assert mcp.is_http_streaming == False + assert hasattr(mcp, 'server_params') + print("✓ PASS: Stdio command pattern works") + except Exception as e: + print(f"✗ FAIL: {e}") + + # Test 4: Single string command (existing pattern) + print("\nTest 4: Single string command pattern") + try: + mcp = MCP("/usr/bin/python server.py") + assert hasattr(mcp, 'is_sse') + assert mcp.is_sse == False + assert hasattr(mcp, 'is_http_streaming') + assert mcp.is_http_streaming == False + assert hasattr(mcp, 'server_params') + print("✓ PASS: Single string command pattern works") + except Exception as e: + print(f"✗ FAIL: {e}") + + # Test 5: NPX pattern (existing) + print("\nTest 5: NPX command pattern") + try: + mcp = MCP("npx @modelcontextprotocol/server-brave-search") + assert hasattr(mcp, 'is_npx') + assert mcp.is_npx == True + assert hasattr(mcp, 'is_sse') + assert mcp.is_sse == False + assert hasattr(mcp, 'is_http_streaming') + assert mcp.is_http_streaming == False + print("✓ PASS: NPX command pattern works") + except Exception as e: + print(f"✗ FAIL: {e}") + + # Test 6: Named parameter 'command' (backward compatibility) + print("\nTest 6: Named parameter 'command' (legacy)") + try: + mcp = MCP(command="/usr/bin/python", args=["server.py"]) + assert hasattr(mcp, 'server_params') + print("✓ PASS: Legacy 'command' parameter works") + except Exception as e: + print(f"✗ FAIL: {e}") + + # Test 7: Transport selection (new feature) + print("\nTest 7: Explicit transport selection") + try: + # Force SSE on non-SSE URL + mcp_sse = MCP("http://localhost:8080/api", transport="sse") + assert mcp_sse.is_sse == True + assert mcp_sse.is_http_streaming == False + + # Force HTTP-streaming on SSE URL + mcp_http = MCP("http://localhost:8080/sse", transport="http-streaming") + assert mcp_http.is_sse == False + assert mcp_http.is_http_streaming == True + + print("✓ PASS: Explicit transport selection works") + except Exception as e: + print(f"✗ FAIL: {e}") + + # Test 8: Invalid transport handling + print("\nTest 8: Invalid transport error handling") + try: + mcp = MCP("http://localhost:8080/api", transport="invalid") + print("✗ FAIL: Should have raised ValueError") + except ValueError as e: + print(f"✓ PASS: Correctly raised ValueError: {e}") + except Exception as e: + print(f"✗ FAIL: Wrong exception type: {e}") + + # Test 9: SSE URL patterns + print("\nTest 9: Various SSE URL patterns") + sse_urls = [ + "http://localhost:8080/sse", + "http://localhost:8080/sse/", + "http://localhost:8080/events", + "http://localhost:8080/stream", + "http://localhost:8080/server-sent-events", + "http://localhost:8080/api?transport=sse", + ] + + all_passed = True + for url in sse_urls: + try: + mcp = MCP(url) + if not mcp.is_sse: + print(f"✗ FAIL: {url} should use SSE transport") + all_passed = False + except Exception as e: + print(f"✗ FAIL: Error with {url}: {e}") + all_passed = False + + if all_passed: + print("✓ PASS: All SSE URL patterns detected correctly") + + print("\n" + "="*50) + print("Backward Compatibility Test Summary") + print("All existing MCP usage patterns continue to work!") + print("="*50) + +if __name__ == "__main__": + test_backward_compatibility() \ No newline at end of file