Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/agents/cms-comp-ops.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ tools:
- search_vectorstore_hybrid
- search_local_files
- search_metadata_index
- fetch_url
---

You are the CMS Comp Ops assistant. You help with operational questions, troubleshooting,
Expand Down
22 changes: 22 additions & 0 deletions src/archi/pipelines/agents/cms_comp_ops_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
create_metadata_search_tool,
create_metadata_schema_tool,
create_retriever_tool,
create_fetch_url_tool,
initialize_mcp_client,
RemoteCatalogClient,
MONITOpenSearchClient,
Expand Down Expand Up @@ -132,6 +133,16 @@ def _tool_definitions(self) -> Dict[str, Dict[str, Any]]:
"builder": self._build_mcp_tools,
"description": "Access tools served via configured MCP servers.",
},
"fetch_url": {
"builder": self._build_fetch_url_tool,
"description": (
"Fetch live content from a URL via HTTP GET request. "
"Input: A valid HTTP or HTTPS URL. "
"Output: The response body text or an error message. "
"Use this to retrieve real-time data from web endpoints, APIs, documentation, or status pages. "
"Examples: checking endpoint status, fetching API data, retrieving documentation."
),
},
}

# Keep this safe for lightweight introspection paths that call
Expand Down Expand Up @@ -182,6 +193,17 @@ def _build_fetch_tool(self) -> Callable:
store_tool_input=getattr(self, "_store_tool_input", None),
)

def _build_fetch_url_tool(self) -> Callable:
    """Construct the fetch-URL tool used to pull live web content over HTTP GET."""
    # Reuse the canonical description from the tool registry so the LLM-facing
    # text stays in one place.
    tool_description = self._tool_definitions()["fetch_url"]["description"]
    # Tracing callback is optional; fall back to None when the agent does not
    # expose _store_tool_input.
    store_callback = getattr(self, "_store_tool_input", None)
    return create_fetch_url_tool(
        name="fetch_url",
        description=tool_description,
        timeout=15.0,
        max_response_chars=40000,
        store_tool_input=store_callback,
    )

def _build_vector_tool_placeholder(self) -> List[Callable]:
return []

Expand Down
2 changes: 2 additions & 0 deletions src/archi/pipelines/agents/tools/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
RemoteCatalogClient,
)
from .retriever import create_retriever_tool
from .fetch_url import create_fetch_url_tool
from .mcp import initialize_mcp_client
from .monit_opensearch import (
MONITOpenSearchClient,
Expand All @@ -23,6 +24,7 @@
"create_metadata_schema_tool",
"RemoteCatalogClient",
"create_retriever_tool",
"create_fetch_url_tool",
"initialize_mcp_client",
"MONITOpenSearchClient",
"create_monit_opensearch_search_tool",
Expand Down
185 changes: 185 additions & 0 deletions src/archi/pipelines/agents/tools/fetch_url.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
"""HTTP GET request tool for fetching live data from URLs."""

from __future__ import annotations

from typing import Callable, Optional
from urllib.parse import urlparse, urlunparse

import requests
from langchain.tools import tool

from src.utils.logging import get_logger
from src.archi.pipelines.agents.tools.base import require_tool_permission

logger = get_logger(__name__)


# Default permission required to use the HTTP GET tool
DEFAULT_REQUIRED_PERMISSION = "tools:http_get"


def _validate_url(url: str) -> tuple[bool, Optional[str]]:
"""
Validate that the URL is well-formed and uses HTTP/HTTPS.

Returns:
(is_valid, error_message) tuple
"""
try:
parsed = urlparse(url)
if parsed.scheme not in ("http", "https"):
return False, f"Invalid URL scheme '{parsed.scheme}'. Only HTTP and HTTPS are supported."
if not parsed.netloc:
return False, "Invalid URL: missing hostname."
return True, None
except Exception as e:
return False, f"Invalid URL: {str(e)}"


def _sanitize_url_for_error(url: str) -> str:
"""
Remove credentials from URL for error messages.

Example: http://user:pass@example.com -> http://***:***@example.com
"""
try:
parsed = urlparse(url)
if parsed.username or parsed.password:
sanitized_netloc = f"***:***@{parsed.hostname}" + (f":{parsed.port}" if parsed.port else "")
return urlunparse((
parsed.scheme,
sanitized_netloc,
parsed.path,
parsed.params,
parsed.query,
parsed.fragment,
))
return url
except Exception:
return "***"


def create_fetch_url_tool(
    *,
    name: str = "fetch_url",
    description: Optional[str] = None,
    timeout: float = 10.0,
    max_response_chars: int = 40000,
    required_permission: Optional[str] = DEFAULT_REQUIRED_PERMISSION,
    store_tool_input: Optional[Callable[[str, object], None]] = None,
) -> Callable[[str], str]:
    """
    Create a LangChain tool that makes HTTP GET requests to fetch live data from URLs.

    This tool allows agents to retrieve real-time information from web endpoints,
    APIs, or documentation URLs. Only GET requests are supported for security reasons.

    Args:
        name: The name of the tool (used by the LLM when selecting tools).
        description: Human-readable description of what the tool does.
            If None, a default description is used.
        timeout: Maximum time in seconds to wait for a response. Default is 10 seconds.
        max_response_chars: Maximum number of characters to return from the response body.
            Responses longer than this are truncated with a "[truncated]" indicator.
            Default is 40000 characters.
        required_permission: The RBAC permission required to use this tool.
            Default is 'tools:http_get'. Set to None to disable permission checks.
        store_tool_input: Optional callback to persist tool inputs for tracing.

    Returns:
        A callable LangChain tool that accepts a URL string and returns either:
        - The response body text (truncated if needed)
        - An error message describing what went wrong

    Security Notes:
        - Only HTTP and HTTPS URLs are accepted
        - Credentials in URLs are sanitized in error messages
        - Response size is limited to prevent context window overflow
        - Timeouts prevent hanging on slow/unresponsive endpoints
        - RBAC permission check is enforced at tool invocation time
    """
    tool_description = description or (
        "Fetch content from a URL via HTTP GET request.\n"
        "Input: A valid HTTP or HTTPS URL string.\n"
        "Output: The response body text (up to {max_chars} characters) or an error message.\n"
        "Use this to retrieve live data from web endpoints, APIs, or documentation URLs.\n"
        "Example input: 'https://example.com/api/status'\n"
        "IMPORTANT: When using this tool, avoid providing general answers from your knowledge. "
        "Instead, if you fail to retrieve the data, inform the user with the error message "
        "returned by this tool and ask if they would like a general answer instead."
    ).format(max_chars=max_response_chars)

    @tool(name, description=tool_description)
    @require_tool_permission(required_permission)
    def _fetch_url_tool(url: str) -> str:
        """Fetch content from a URL via HTTP GET request."""
        if store_tool_input:
            try:
                store_tool_input(name, {"url": url})
            except Exception:
                logger.debug("Failed to store runtime input for tool '%s'", name, exc_info=True)

        # Compute the credential-masked URL once; it is used in every log/error path.
        safe_url = _sanitize_url_for_error(url)

        # Validate URL before touching the network.
        is_valid, error_msg = _validate_url(url)
        if not is_valid:
            # Lazy %-formatting (consistent with the logging call above) avoids
            # doing string work when the log level is disabled.
            logger.warning("Fetch URL tool received invalid URL: %s", safe_url)
            return f"Error: {error_msg}"

        # Make request with error handling
        try:
            logger.info("Fetch URL tool fetching: %s", safe_url)

            response = requests.get(
                url,
                timeout=timeout,
                allow_redirects=True,
            )

            # Check for authentication errors first: 401 gets a dedicated,
            # actionable message since this tool cannot supply credentials.
            if response.status_code == 401:
                logger.warning("Fetch URL tool received 401 Unauthorized from %s", safe_url)
                return (
                    "Error: HTTP 401: Unauthorized. This endpoint requires authentication, "
                    "but the fetch URL tool does not support authentication credentials. "
                    "Please use a public endpoint or provide the user with alternative access methods."
                )

            # Check for other HTTP errors (4xx, 5xx)
            if response.status_code >= 400:
                logger.warning(
                    "Fetch URL tool received status %s from %s", response.status_code, safe_url
                )
                status_text = response.reason or "Error"
                return f"Error: HTTP {response.status_code}: {status_text}"

            # Success - return response text (truncated if needed).
            # NOTE(review): response.text materializes the full body before
            # truncation; if very large responses are a concern, consider
            # stream=True with a byte cap — TODO confirm requirements.
            response_text = response.text
            if len(response_text) > max_response_chars:
                truncated = response_text[:max_response_chars].rstrip()
                logger.info(
                    "Fetch URL tool truncated response from %s to %s chars",
                    len(response_text),
                    max_response_chars,
                )
                return f"{truncated}\n\n... [response truncated at {max_response_chars} characters]"

            logger.info("Fetch URL tool successfully fetched %s chars", len(response_text))
            return response_text

        except requests.exceptions.Timeout:
            logger.warning("Fetch URL tool timeout after %ss: %s", timeout, safe_url)
            return f"Error: Request timed out after {timeout} seconds. The endpoint may be slow or unresponsive."

        except requests.exceptions.ConnectionError as e:
            logger.warning("Fetch URL tool connection error: %s - %s", safe_url, str(e))
            # Was an f-string with no placeholders (ruff F541); plain literal is identical.
            return "Error: Connection failed. The endpoint may be unreachable or the URL may be incorrect."

        except requests.exceptions.RequestException as e:
            logger.warning("Fetch URL tool request error: %s - %s", safe_url, str(e))
            return f"Error: Request failed - {type(e).__name__}. Please check the URL and try again."

        except Exception as e:
            logger.error("Fetch URL tool unexpected error: %s - %s", safe_url, str(e))
            return "Error: An unexpected error occurred while fetching the URL."

    return _fetch_url_tool