-
-
Couldn't load subscription status.
- Fork 668
[BUGF-MCP][Fixed MCP Streaming by adding proper async token-by-token streaming support] #1005
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Conversation
| print(f" Job ID: {result.get('job_id', 'N/A')}") | ||
| print(f" Status: {result.get('status', 'N/A')}") | ||
| print(f" Execution Time: {result.get('execution_time', 0):.2f}s") | ||
| print(f" Total Cost: ${result.get('usage', {}).get('billing_info', {}).get('total_cost', 0):.6f}") |
Check failure
Code scanning / CodeQL
Clear-text logging of sensitive information High
sensitive data (private)
| print(f" Job ID: {result.get('job_id', 'N/A')}") | ||
| print(f" Status: {result.get('status', 'N/A')}") | ||
| print(f" Execution Time: {result.get('execution_time', 0):.2f}s") | ||
| print(f" Total Cost: ${result.get('usage', {}).get('billing_info', {}).get('total_cost', 0):.6f}") |
Check failure
Code scanning / CodeQL
Clear-text logging of sensitive information High
sensitive data (private)
| result = response.json() | ||
| print(f"\n[OK] API call successful!") | ||
| print(f"[TIME] Duration: {end_time - start_time:.2f} seconds") | ||
| print(f"[COST] Total cost: ${result.get('usage', {}).get('billing_info', {}).get('total_cost', 0):.6f}") |
Check failure
Code scanning / CodeQL
Clear-text logging of sensitive information High
sensitive data (private)
| # Add the project root to the path | ||
| sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..', '..')) | ||
|
|
||
| from loguru import logger |
Check failure
Code scanning / Pyre
Undefined import Error
|
|
||
| # Load environment variables from .env file | ||
| try: | ||
| from dotenv import load_dotenv |
Check failure
Code scanning / Pyre
Undefined import Error
| from typing import Any, Dict, List, Literal, Optional, Union, AsyncGenerator | ||
| from urllib.parse import urlparse | ||
|
|
||
| from loguru import logger |
Check failure
Code scanning / Pyre
Undefined import Error
| from urllib.parse import urlparse | ||
|
|
||
| from loguru import logger | ||
| from pydantic import BaseModel, Field |
Check failure
Code scanning / Pyre
Undefined import Error
|
|
||
| # Try to import MCP libraries | ||
| try: | ||
| from mcp import ClientSession |
Check failure
Code scanning / Pyre
Undefined import Error
| return UnifiedTransportConfig( | ||
| transport_type="stdio", | ||
| command=command, | ||
| enable_streaming=True, | ||
| **kwargs | ||
| ) |
Check failure
Code scanning / Pyre
Unexpected keyword Error
| return UnifiedTransportConfig( | ||
| transport_type="http", | ||
| url=url, | ||
| headers=headers, | ||
| enable_streaming=True, | ||
| **kwargs | ||
| ) |
Check failure
Code scanning / Pyre
Unexpected keyword Error
| return UnifiedTransportConfig( | ||
| transport_type="streamable_http", | ||
| url=url, | ||
| headers=headers, | ||
| enable_streaming=True, | ||
| **kwargs | ||
| ) |
Check failure
Code scanning / Pyre
Unexpected keyword Error
| return UnifiedTransportConfig( | ||
| transport_type="sse", | ||
| url=url, | ||
| headers=headers, | ||
| enable_streaming=True, | ||
| **kwargs | ||
| ) |
Check failure
Code scanning / Pyre
Unexpected keyword Error
| return UnifiedTransportConfig( | ||
| transport_type="auto", | ||
| url=url, | ||
| auto_detect=True, | ||
| enable_streaming=True, | ||
| **kwargs | ||
| ) |
Check failure
Code scanning / Pyre
Unexpected keyword Error
swarms/structs/__init__.py
Outdated
| from swarms.tools.mcp_unified_client import ( | ||
| MCPUnifiedClient, | ||
| UnifiedTransportConfig, | ||
| call_tool_streaming_sync, | ||
| execute_tool_call_streaming_unified, | ||
| create_auto_config, | ||
| create_http_config, | ||
| create_streamable_http_config, | ||
| create_stdio_config, | ||
| create_sse_config, | ||
| ) |
Check failure
Code scanning / Pyre
Undefined import Error
| from swarms.tools.mcp_unified_client import ( | ||
| UnifiedMCPClient, | ||
| UnifiedTransportConfig, | ||
| call_tool_streaming, | ||
| call_tool_streaming_sync, | ||
| execute_tool_call_streaming_unified, | ||
| ) |
Check failure
Code scanning / Pyre
Undefined import Error
| from swarms.tools.mcp_unified_client import ( | ||
| UnifiedMCPClient, | ||
| UnifiedTransportConfig, | ||
| call_tool_streaming, | ||
| call_tool_streaming_sync, | ||
| execute_tool_call_streaming_unified, | ||
| ) |
Check failure
Code scanning / Pyre
Undefined import Error
| from swarms.tools.mcp_unified_client import ( | ||
| UnifiedMCPClient, | ||
| UnifiedTransportConfig, | ||
| call_tool_streaming, | ||
| call_tool_streaming_sync, | ||
| execute_tool_call_streaming_unified, | ||
| ) |
Check failure
Code scanning / Pyre
Undefined import Error
| ) | ||
|
|
||
| if use_streaming: | ||
| tool_response = self._handle_mcp_streaming(response, current_loop) |
Check failure
Code scanning / Pyre
Incompatible parameter type Error
| if use_streaming: | ||
| tool_response = self._handle_mcp_streaming(response, current_loop) | ||
| else: | ||
| tool_response = self._handle_mcp_traditional(response, current_loop) |
Check failure
Code scanning / Pyre
Incompatible parameter type Error
| tool_response = self._handle_mcp_traditional(response, current_loop) | ||
|
|
||
| # Process the tool response | ||
| self._process_mcp_response(tool_response, current_loop) |
Check failure
Code scanning / Pyre
Incompatible parameter type Error
| config = UnifiedTransportConfig( | ||
| enable_streaming=True, | ||
| streaming_timeout=self.mcp_streaming_timeout, | ||
| streaming_callback=self.mcp_streaming_callback | ||
| ) |
Check failure
Code scanning / Pyre
Unexpected keyword Error
swarms/structs/agent.py
Outdated
| style="blue" | ||
| ) | ||
|
|
||
| tool_response = call_tool_streaming_sync( |
Check failure
Code scanning / Pyre
Undefined attribute Error
swarms/structs/agent.py
Outdated
| style="blue" | ||
| ) | ||
|
|
||
| tool_response = call_tool_streaming_sync( |
Check failure
Code scanning / Pyre
Undefined attribute Error
swarms/structs/agent.py
Outdated
| tool_response = asyncio.run( | ||
| execute_tool_call_simple( | ||
| response=response, | ||
| server_path=self.mcp_url, |
Check failure
Code scanning / Pyre
Incompatible parameter type Error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should change the exports here from the structs/init to the tools/init
|
@IlumCI there are some conflicts as well, can you help to resolve them please |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
All the tests should be in one file for all the MCP logic or corresponding strucuture.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can remove this try and except stuff. if it can't be exported, then there is a bug within the class/functions itself
Description
This PR implements comprehensive streaming support for the Model Context Protocol (MCP) integration in the Swarms framework. The implementation adds optional Streamable HTTP support alongside existing transport types (STDIO, HTTP, SSE) with auto-detection capabilities and graceful fallback mechanisms. Additionally, this PR integrates MCP streaming functionality directly into the core Agent class, enabling real-time streaming of MCP tool execution.
Issue
Addresses the need for real-time streaming capabilities in MCP operations, enabling token-by-token streaming output for enhanced user experience and better integration with modern AI workflows. The integration into the Agent class provides seamless streaming capabilities for all MCP tool interactions.
Dependencies
pip install mcppip install mcp[streamable-http](optional)pip install httpx(optional)Files Changed
Core Production Files (Required)
swarms/tools/mcp_unified_client.py- New unified MCP client with streaming supportswarms/schemas/mcp_schemas.py- Enhanced schemas with streaming configurationswarms/structs/agent.py- NEW: Integrated MCP streaming into core Agent classswarms/structs/__init__.py- NEW: Added MCP streaming imports for easy accessswarms/tools/mcp_client_call.py- ENHANCED: Restored advanced functionality with complex tool call extraction and multiple tool executionTest Files (New)
test_core_functionality.py- NEW: Comprehensive tests for MCP streaming functionalitytest_riemann_tools.py- NEW: Tests for Riemann Hypothesis mathematical toolssimple_working_example.py- NEW: Working demonstration of MCP streamingworking_swarms_api_mcp_demo.py- NEW: MCP demo with Swarms API integrationExample Files (Enhanced)
excelswarm.py- ENHANCED: Advanced example demonstrating Riemann Hypothesis proof attempt with MCP streamingexamples/mcp/working_mcp_server.py- ENHANCED: MCP server with mathematical tools for excelswarm.pyFiles Not Modified (Dependencies)
Tag Maintainer
@kyegomez
Twitter Handle
https://x.com/IlumTheProtogen
What Was Wrong Before
What Has Been Fixed
1. Comprehensive Streaming Support
2. Agent Class Integration
3. Enhanced MCP Tool Handling
4. Auto-detection and Fallback
5. Comprehensive Error Handling
6. Easy Import Access
7. Type Safety Improvements
8. Advanced Functionality Restoration
9. Comprehensive Testing Suite
10. Real-world Example: excelswarm.py
How It Works Now
1. Unified Transport System
The new system automatically detects the appropriate transport type from URLs:
http://localhost:8000/mcp→streamable_http(if available)stdio:///path/to/server→stdiosse://localhost:8000/events→sse2. Agent Class Integration
The Agent class now includes comprehensive MCP streaming support:
3. Real-time Streaming Output
When MCP streaming is enabled, users see:
4. Backward Compatibility
All existing MCP functionality continues to work:
5. Production-Ready Features
6. Easy Access Through Imports
All MCP streaming functionality is now easily accessible:
7. Advanced Functionality
Usage Examples
Basic MCP Streaming
Advanced Streaming with Callbacks
Runtime Streaming Control
Direct MCP Client Usage
Advanced Mathematical Example (excelswarm.py)
Testing
Unit Tests
Integration Tests
Comprehensive Test Suite
Manual Testing
Performance Impact
Breaking Changes
None - This is a fully backward-compatible enhancement. All existing code continues to work without modification.
Migration Guide
For Existing Users
No migration required. Existing MCP configurations continue to work:
For New Streaming Features
Enable streaming by adding new parameters:
For Direct MCP Client Usage
Use the new unified client for enhanced functionality:
Documentation Updates
Community Impact
Code Quality Improvements
Type Safety
any→Anytype annotationsCallableimportsError Handling
Documentation
Code Structure
Testing Coverage
Recent Fixes and Enhancements
Advanced Functionality Restoration
mcp_client_call.pyComprehensive Testing Suite
Real-world Example: excelswarm.py
Code Cleanliness
Conclusion
This PR successfully integrates comprehensive MCP streaming support into the Swarms framework, providing:
DEMO VIDEO:
Recording.2025-08-15.165350.online-video-cutter.com.mp4