Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,12 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added
- PR template and YAML issue forms (`.github/`)
- `output_schema` field on `SelectableItem` — stores MCP `outputSchema` JSON Schema (#102)
- `mcp_tool_to_selectable()` now maps `outputSchema` to `SelectableItem.output_schema`
- `mcp_result_to_envelope()` handles `audio` content type (base64-decoded, stored as binary artifact)
- `mcp_result_to_envelope()` handles `resource_link` content type (URI reference as `ArtifactRef`)
- `mcp_result_to_envelope()` handles top-level `structuredContent` (stored as `application/json` artifact with fact extraction)
- `mcp_result_to_envelope()` collects per-part `annotations` (`audience`, `priority`) into `provenance["content_annotations"]`

## [0.1.4] - 2026-03-06

Expand Down
58 changes: 55 additions & 3 deletions docs/integration_mcp.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,15 +23,26 @@ mcp_tool = {
"query": {"type": "string"},
"limit": {"type": "integer", "default": 10}
}
},
"outputSchema": {
"type": "object",
"properties": {
"results": {"type": "array"},
"total": {"type": "integer"}
}
}
}

item = mcp_tool_to_selectable(mcp_tool)
# item.id == "mcp:search_database"
# item.kind == "tool"
# item.name == "search_database"
# item.id == "mcp:search_database"
# item.kind == "tool"
# item.name == "search_database"
# item.output_schema == {"type": "object", ...}
```

If the tool definition includes an `outputSchema`, it is preserved in
`item.output_schema`. When absent the field is `None`.

The namespace is inferred automatically from the tool name prefix:

| Tool name | Inferred namespace |
Expand Down Expand Up @@ -63,6 +74,47 @@ envelope, binaries, full_text = mcp_result_to_envelope(mcp_result, "search_datab
# binaries maps handle → (raw_bytes, media_type, label)
```

#### Supported content types

| Content type | Handling |
|-----------------|-----------------------------------------------------------------------------------------------|
| `text` | Concatenated into `full_text` and `summary` |
| `image` | Base64-decoded; stored as binary artifact |
| `audio` | Base64-decoded; stored as binary artifact (e.g. `audio/wav`) |
| `resource` | Text extracted into `full_text`; raw bytes stored as artifact |
| `resource_link` | URI stored as `ArtifactRef`; URI string in `binaries` for caller resolution |

#### Structured content

If the result contains a top-level `structuredContent` dict, it is
serialized as a JSON artifact and its top-level keys are extracted as
facts:

```python
mcp_result = {
"content": [{"type": "text", "text": "query done"}],
"structuredContent": {"count": 42, "status": "done"},
}
envelope, binaries, _ = mcp_result_to_envelope(mcp_result, "query")
# binaries["mcp:query:structured_content"] → JSON bytes
# envelope.facts includes "count: 42", "status: done"
```

#### Content-part annotations

Per-part `annotations` (with `audience` and `priority` fields) are
collected into `envelope.provenance["content_annotations"]`:

```python
mcp_result = {
"content": [
{"type": "text", "text": "...", "annotations": {"audience": ["human"], "priority": 0.9}},
],
}
envelope, _, _ = mcp_result_to_envelope(mcp_result, "tool")
# envelope.provenance["content_annotations"] == [{"part_index": 0, "audience": ["human"], ...}]
```

### `load_mcp_session_jsonl(path)`

Loads a JSONL session file containing MCP-style events and returns a
Expand Down
130 changes: 107 additions & 23 deletions src/contextweaver/adapters/mcp.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ def mcp_tool_to_selectable(tool_def: dict[str, Any]) -> SelectableItem:
- ``name`` (required)
- ``description`` (required)
- ``inputSchema`` (optional JSON Schema dict)
- ``outputSchema`` (optional JSON Schema dict for structured output)
- ``annotations`` (optional dict with ``title``, ``readOnlyHint``,
``destructiveHint``, ``costHint``, etc.)

Expand All @@ -86,6 +87,10 @@ def mcp_tool_to_selectable(tool_def: dict[str, Any]) -> SelectableItem:

annotations: dict[str, Any] = tool_def.get("annotations") or {}
input_schema: dict[str, Any] = tool_def.get("inputSchema") or {}
output_schema_raw: dict[str, Any] | None = tool_def.get("outputSchema")
output_schema: dict[str, Any] | None = (
dict(output_schema_raw) if output_schema_raw is not None else None
)

# Derive tags from annotation hints
tags: list[str] = ["mcp"]
Expand All @@ -105,12 +110,40 @@ def mcp_tool_to_selectable(tool_def: dict[str, Any]) -> SelectableItem:
tags=sorted(tags),
namespace=infer_namespace(str(name)),
args_schema=dict(input_schema),
output_schema=output_schema,
side_effects=side_effects,
cost_hint=cost_hint,
metadata={k: v for k, v in annotations.items() if k != "costHint"},
)


def _decode_binary_part(
part: dict[str, Any],
tool_name: str,
index: int,
default_mime: str,
kind: str,
) -> tuple[ArtifactRef, tuple[bytes, str, str]]:
"""Decode a base64-encoded content part (image, audio, etc.)."""
import base64 as _b64

mime = part.get("mimeType", default_mime)
data_str = part.get("data") or ""
handle = f"mcp:{tool_name}:{kind}:{index}"
label = f"{kind} from {tool_name}"
try:
raw = _b64.b64decode(data_str, validate=True)
except Exception: # noqa: BLE001
raw = data_str if isinstance(data_str, bytes) else str(data_str).encode("utf-8")
ref = ArtifactRef(
handle=handle,
media_type=mime,
size_bytes=len(raw),
label=label,
)
return ref, (raw, mime, label)


def mcp_result_to_envelope(
result: dict[str, Any],
tool_name: str,
Expand All @@ -120,12 +153,19 @@ def mcp_result_to_envelope(
The MCP result dict is expected to have:

- ``content`` — a list of content parts, each with ``type`` and
``text`` (or ``data`` / ``resource``).
``text`` (or ``data`` / ``resource``). Supported content types:
``text``, ``image``, ``resource``, ``resource_link``, ``audio``.
- ``structuredContent`` (optional) — a JSON value with typed tool
output; stored as a structured artifact. Only dicts get fact extraction.
- ``isError`` (optional bool) — if ``True``, status becomes ``"error"``.

Returns the envelope, a dict of binary data extracted from image and
resource content parts, and the full (untruncated) text. Use
:meth:`ContextManager.ingest_mcp_result` for the full happy path
Each content part may carry per-part ``annotations`` with ``audience``
and ``priority`` fields. These are collected into the envelope's
``provenance["content_annotations"]`` list.

Returns the envelope, a dict of binary data extracted from image,
audio, and resource content parts, and the full (untruncated) text.
Use :meth:`ContextManager.ingest_mcp_result` for the full happy path
that persists artifacts automatically.

Args:
Expand All @@ -137,37 +177,36 @@ def mcp_result_to_envelope(
maps ``handle -> (raw_bytes, media_type, label)`` and *full_text*
is the complete untruncated text content.
"""
import base64 as _b64
import json as _json

is_error = bool(result.get("isError", False))
content_parts: list[dict[str, Any]] = result.get("content") or []
# MCP spec allows any JSON value; only dicts get fact extraction.
structured_content: Any = result.get("structuredContent")

text_parts: list[str] = []
artifacts: list[ArtifactRef] = []
binaries: dict[str, tuple[bytes, str, str]] = {}
content_annotations: list[dict[str, Any]] = []

for i, part in enumerate(content_parts):
part_type = part.get("type", "text")

# Collect per-part annotations (audience / priority)
part_annotations = part.get("annotations")
if isinstance(part_annotations, dict) and part_annotations:
content_annotations.append({"part_index": i, **part_annotations})

if part_type == "text":
text_parts.append(part.get("text", ""))
elif part_type == "image":
mime = part.get("mimeType", "image/png")
data_str = part.get("data") or ""
handle = f"mcp:{tool_name}:image:{i}"
# Decode base64 image data; fall back to raw bytes on error.
try:
raw = _b64.b64decode(data_str)
except Exception: # noqa: BLE001
raw = data_str if isinstance(data_str, bytes) else str(data_str).encode("utf-8")
artifacts.append(
ArtifactRef(
handle=handle,
media_type=mime,
size_bytes=len(raw),
label=f"image from {tool_name}",
)
)
binaries[handle] = (raw, mime, f"image from {tool_name}")
ref, blob = _decode_binary_part(part, tool_name, i, "image/png", "image")
artifacts.append(ref)
binaries[ref.handle] = blob
elif part_type == "audio":
ref, blob = _decode_binary_part(part, tool_name, i, "audio/wav", "audio")
artifacts.append(ref)
binaries[ref.handle] = blob
elif part_type == "resource":
resource: dict[str, Any] = part.get("resource", {})
mime = resource.get("mimeType", "application/octet-stream")
Expand All @@ -187,6 +226,47 @@ def mcp_result_to_envelope(
)
)
binaries[handle] = (raw, mime, label)
elif part_type == "resource_link":
uri = part.get("uri", "")
mime = part.get("mimeType", "application/octet-stream")
name = part.get("name", "")
handle = f"mcp:{tool_name}:resource_link:{i}"
label = name or uri or f"resource link from {tool_name}"
uri_bytes = uri.encode("utf-8")
artifacts.append(
ArtifactRef(
handle=handle,
media_type=mime,
size_bytes=len(uri_bytes),
label=label,
)
)
# No dereferenced payload — resource_link is a URI reference only.
# Store the URI bytes so callers can resolve the URI themselves.
# Use text/uri-list (RFC 2483) since the payload is a URI, not the
# linked resource; the resource's declared MIME stays in ArtifactRef.
binaries[handle] = (uri_bytes, "text/uri-list", label)

# Handle structuredContent (top-level JSON output per MCP spec)
if structured_content is not None:
sc_handle = f"mcp:{tool_name}:structured_content"
sc_bytes = _json.dumps(structured_content, sort_keys=True).encode("utf-8")
artifacts.append(
ArtifactRef(
handle=sc_handle,
media_type="application/json",
size_bytes=len(sc_bytes),
label=f"structured content from {tool_name}",
)
)
binaries[sc_handle] = (sc_bytes, "application/json", f"structured content from {tool_name}")
# Extract facts from top-level keys when structured_content is a mapping.
if isinstance(structured_content, dict):
for key, value in structured_content.items():
rendered = str(value)
if len(rendered) < 200:
facts_line = f"{key}: {rendered}"
text_parts.append(facts_line)

full_text = "\n".join(text_parts) if text_parts else "(no content)"

Expand All @@ -200,12 +280,16 @@ def mcp_result_to_envelope(
if ":" in stripped and len(stripped) < 200:
facts.append(stripped)

provenance: dict[str, Any] = {"tool": tool_name, "protocol": "mcp"}
if content_annotations:
provenance["content_annotations"] = content_annotations

envelope = ResultEnvelope(
status=status,
summary=full_text[:500] if len(full_text) > 500 else full_text,
facts=facts[:20],
artifacts=artifacts,
provenance={"tool": tool_name, "protocol": "mcp"},
provenance=provenance,
)
return envelope, binaries, full_text

Expand Down
5 changes: 5 additions & 0 deletions src/contextweaver/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class SelectableItem:
tags: list[str] = field(default_factory=list)
namespace: str = ""
args_schema: dict[str, Any] = field(default_factory=dict)
output_schema: dict[str, Any] | None = None
examples: list[str] = field(default_factory=list)
constraints: dict[str, Any] = field(default_factory=dict)
side_effects: bool = False
Expand All @@ -87,6 +88,7 @@ def to_dict(self) -> dict[str, Any]:
"tags": list(self.tags),
"namespace": self.namespace,
"args_schema": dict(self.args_schema),
"output_schema": dict(self.output_schema) if self.output_schema is not None else None,
"examples": list(self.examples),
"constraints": dict(self.constraints),
"side_effects": self.side_effects,
Expand All @@ -105,6 +107,9 @@ def from_dict(cls, data: dict[str, Any]) -> SelectableItem:
tags=list(data.get("tags", [])),
namespace=data.get("namespace", ""),
args_schema=dict(data.get("args_schema", {})),
output_schema=dict(data["output_schema"])
if data.get("output_schema") is not None
else None,
examples=list(data.get("examples", [])),
constraints=dict(data.get("constraints", {})),
side_effects=bool(data.get("side_effects", False)),
Expand Down
Loading