Streaming firewall with progressive summarization #47

@dgenio

Description

Milestone: v0.3.0 | Tier: Creative Bet | Effort: Medium–Large

Problem

Firewall.apply() waits for the complete driver response before transforming. For large results (database queries, LLM-generated tool outputs, file listings), this:

  • Blocks until the entire response arrives — high latency for large payloads
  • Wastes memory — entire response must fit in memory before any processing
  • Prevents incremental delivery — caller can't start processing until everything is ready
  • Can't adapt — if budget is exceeded mid-stream, there's no way to switch to more aggressive summarization for remaining chunks

Proposed Change

1. Streaming firewall method

from collections.abc import AsyncIterator

class Firewall:
    async def apply_stream(
        self,
        response_chunks: AsyncIterator[dict],
        *,
        response_mode: ResponseMode,
        budget: int,
        sensitivity: SensitivityTag,
        allowed_fields: list[str] | None = None,
    ) -> AsyncIterator[dict]:
        """Stream response chunks through the firewall with progressive summarization."""

2. Progressive summarization strategy

As chunks arrive:

  1. Phase 1 (remaining budget > 60%): Pass chunks through with redaction only (minimal transformation).
  2. Phase 2 (remaining budget 30%–60%): Switch to table/summary mode for remaining chunks.
  3. Phase 3 (remaining budget 10%–30%): Aggressive summarization — emit only key-value summaries.
  4. Phase 4 (remaining budget < 10%): Emit handle references only — store full data in HandleStore for later expand().

Budget tracking is incremental — each yielded chunk decrements the remaining budget.
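The phase schedule above can be sketched as a pure function of the remaining-budget fraction. This is a hypothetical illustration: `Phase` and `select_phase` are not existing names in the codebase, and the exact boundary semantics (inclusive vs. exclusive) are still open.

```python
from enum import Enum


class Phase(Enum):
    """Summarization aggressiveness, escalating as the budget drains."""
    REDACT_ONLY = 1   # remaining budget > 60%: redaction only
    SUMMARY = 2       # 30%-60%: table/summary mode
    AGGRESSIVE = 3    # 10%-30%: key-value summaries only
    HANDLE_ONLY = 4   # < 10%: handle references only


def select_phase(remaining: int, total: int) -> Phase:
    """Map the remaining budget fraction to a summarization phase."""
    frac = remaining / total if total else 0.0
    if frac > 0.60:
        return Phase.REDACT_ONLY
    if frac > 0.30:
        return Phase.SUMMARY
    if frac > 0.10:
        return Phase.AGGRESSIVE
    return Phase.HANDLE_ONLY
```

Because the function depends only on (remaining, total), each yielded chunk can re-evaluate it after decrementing the budget, which makes mid-stream escalation automatic.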

3. Streaming driver protocol

Extend the Driver protocol to support streaming:

from collections.abc import AsyncIterator
from typing import Any, Protocol

class StreamingDriver(Protocol):
    async def execute_stream(
        self,
        operation: str,
        params: dict[str, Any],
        constraints: dict[str, Any],
    ) -> AsyncIterator[dict[str, Any]]:
        """Execute an operation and yield response chunks."""

Drivers can implement execute_stream() optionally. The kernel detects support and uses streaming when available, falling back to execute() and wrapping its single result as a one-chunk stream.

4. Kernel integration

async def invoke_stream(
    self, token: str, params: dict,
) -> AsyncIterator[Frame]:
    """Invoke a capability with streaming response."""

Each yielded Frame contains a chunk of the response. The final frame has is_final=True.
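Caller-side consumption could then look like the following sketch, with a stubbed stream standing in for the kernel (the `Frame` dataclass and `fake_invoke_stream` here are illustrative stand-ins; the real model would live in src/agent_kernel/models.py):

```python
from dataclasses import dataclass


@dataclass
class Frame:
    """Illustrative stand-in for the proposed streaming Frame model."""
    chunk: dict
    is_final: bool = False


async def fake_invoke_stream(token: str, params: dict):
    """Stub kernel stream: yields two chunks, the last marked final."""
    yield Frame(chunk={"rows": [1, 2]})
    yield Frame(chunk={"rows": [3]}, is_final=True)


async def consume(token: str, params: dict) -> list[Frame]:
    """Process frames incrementally; stop once the final frame arrives."""
    frames: list[Frame] = []
    async for frame in fake_invoke_stream(token, params):
        frames.append(frame)  # each chunk is usable as soon as it arrives
        if frame.is_final:
            break
    return frames
```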

Acceptance Criteria

  • apply_stream() yields transformed chunks as they arrive (not buffered)
  • Peak memory for a 10 MB response stays under 1 MB
  • Budget transitions happen mid-stream (mode escalation is visible in output)
  • Redaction is applied to every chunk (PII never leaks even in streaming mode)
  • StreamingDriver protocol is optional — non-streaming drivers still work
  • invoke_stream() works end-to-end with a streaming driver
  • Fallback: non-streaming drivers produce a single-chunk stream
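The first criterion (chunks are yielded as they arrive, not buffered) can be checked by interleaving producer and consumer log entries. A sketch with a pass-through stub in place of the not-yet-written apply_stream():

```python
from collections.abc import AsyncIterator


async def passthrough_stream(chunks: AsyncIterator[dict]) -> AsyncIterator[dict]:
    """Stand-in for apply_stream(): yields each chunk immediately."""
    async for chunk in chunks:
        yield chunk


async def main() -> list[str]:
    log: list[str] = []

    async def source() -> AsyncIterator[dict]:
        for i in range(2):
            log.append(f"produced {i}")
            yield {"i": i}

    async for chunk in passthrough_stream(source()):
        log.append(f"consumed {chunk['i']}")
    return log
```

If the stream buffered internally, every "produced" entry would precede the first "consumed" entry; with true streaming they strictly alternate.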

Affected Files

  • src/agent_kernel/firewall/transform.py (add apply_stream())
  • src/agent_kernel/drivers/base.py (add StreamingDriver protocol)
  • src/agent_kernel/kernel.py (add invoke_stream())
  • src/agent_kernel/models.py (add is_final to Frame or new streaming model)
  • tests/test_firewall.py (streaming tests with mock async iterators)
  • tests/test_kernel.py (end-to-end streaming tests)

Dependencies

Risk

Medium — streaming + summarization interaction is complex. Edge cases around chunk boundaries and partial records need careful testing.
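One way to contain the partial-record risk is a small reassembly buffer that releases only complete records to the transformer. A sketch assuming newline-delimited records (the real chunk framing is an open design question):

```python
def reassemble(buffer: str, chunk: str) -> tuple[list[str], str]:
    """Append a raw chunk to the carry-over buffer, then split off the
    complete newline-terminated records; the trailing partial record
    (possibly empty) becomes the new carry-over buffer."""
    buffer += chunk
    *records, buffer = buffer.split("\n")
    return records, buffer
```

Redaction and summarization then operate on whole records, so a PII value split across a chunk boundary cannot slip through half-matched.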

Metadata

Assignees

No one assigned

    Labels

    complexity:average (Moderate effort, some design needed)
    phase:auth (Registry, tokens, policy)
    priority:low (Nice to have)
    size:M (Medium change, 50 to 200 lines)
    type:feature (New functionality)

    Projects

    No projects

    Relationships

    None yet

    Development

    No branches or pull requests
