Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
name: E2E Tests

on:
push:
paths:
- "charts/**"
- "cmd/snmcp-e2e/**"
- "scripts/e2e-test.sh"
- ".github/workflows/e2e.yaml"
pull_request:
paths:
- "charts/**"
- "cmd/snmcp-e2e/**"
- "scripts/e2e-test.sh"
- ".github/workflows/e2e.yaml"
workflow_dispatch:

permissions:
contents: read

jobs:
e2e:
runs-on: ubuntu-latest
steps:
- name: Check out code
uses: actions/checkout@v4

- name: Set up Go
uses: actions/setup-go@v5
with:
go-version-file: "go.mod"

- name: Set up Helm
uses: azure/setup-helm@v4

- name: Set up Kind
uses: helm/kind-action@v1
with:
cluster_name: kind

- name: Download dependencies
run: go mod download

- name: Run E2E tests
run: ./scripts/e2e-test.sh all

- name: Cleanup
if: always()
run: ./scripts/e2e-test.sh cleanup
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -11,3 +11,4 @@ vendor
.cursor/
agents/
.serena/
.envrc
2 changes: 2 additions & 0 deletions AGENTS.md
Original file line number Diff line number Diff line change
Expand Up @@ -135,3 +135,5 @@ This file follows the AGENTS.md spec described in the Codex system message (scop
---

Happy hacking! 🚀

@CLAUDE.md as reference.
125 changes: 108 additions & 17 deletions CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ This file provides guidance to Claude Code (claude.ai/code) when working with co

```bash
make build # Build server binary to bin/snmcp
make docker-build # Build local Docker image (both streamnative/mcp-server and streamnative/snmcp tags)
make docker-build-push # Build and push multi-platform image (linux/amd64,linux/arm64)
make docker-build-multiplatform # Build multi-platform image locally
make docker-buildx-setup # Setup Docker buildx for multi-platform builds
make license-check # Check license headers
make license-fix # Fix license headers
go test -race ./... # Run all tests with race detection
go test -race ./pkg/mcp/builders/... # Run specific package tests
go test -v -run TestName ./pkg/... # Run a single test
make license-check # Check license headers
make license-fix # Fix license headers
make docker-build # Build local Docker image
make docker-build-push # Build and push multi-platform image
```

## Architecture Overview
Expand All @@ -24,9 +26,11 @@ The StreamNative MCP Server implements the Model Context Protocol using the `mar
```
Client Request → MCP Server (pkg/mcp/server.go)
SSE/stdio transport layer (pkg/cmd/mcp/)
Tool Handler (from builders)
Session Context (pkg/mcp/ctx.go)
Context Functions (pkg/mcp/ctx.go)
Service Client (Kafka/Pulsar/SNCloud)
```
Expand All @@ -38,29 +42,44 @@ Client Request → MCP Server (pkg/mcp/server.go)
- Sessions provide lazy-initialized clients for each service
- Context functions (`pkg/mcp/ctx.go`) inject/retrieve sessions from request context

2. **Tool Builders** (`pkg/mcp/builders/`)
2. **Tool Builders Framework** (`pkg/mcp/builders/`)
- `ToolBuilder` interface: `GetName()`, `GetRequiredFeatures()`, `BuildTools()`, `Validate()`
- `BaseToolBuilder` provides common feature validation logic
- Each builder creates `[]server.ServerTool` with tool definitions and handlers
- Builders in `builders/kafka/` and `builders/pulsar/` implement service-specific tools
- `ToolRegistry` manages all tool builders with concurrent-safe registration
- `ToolBuildConfig` specifies build parameters (ReadOnly, Features, Options)
- `ToolMetadata` describes tool information (Name, Version, Description, Category, Tags)

3. **Tool Builders Organization**
- `builders/kafka/` - Kafka-specific tool builders (connect, consume, groups, partitions, produce, schema_registry, topics)
- `builders/pulsar/` - Pulsar-specific tool builders (brokers, brokers_stats, cluster, functions, functions_worker, namespace, namespace_policy, nsisolationpolicy, packages, resourcequotas, schema, sinks, sources, subscription, tenant, topic, topic_policy)
- `builders/streamnative/` - StreamNative Cloud tool builders

3. **Tool Registration** (`pkg/mcp/*_tools.go`)
4. **Tool Registration** (`pkg/mcp/*_tools.go`)
- Each `*_tools.go` file creates a builder, builds tools, and adds them to the server
- Tools are conditionally registered based on `--features` flag
- Feature constants defined in `pkg/mcp/features.go`

4. **PFTools - Functions as Tools** (`pkg/mcp/pftools/`)
5. **PFTools - Functions as Tools** (`pkg/mcp/pftools/`)
- `PulsarFunctionManager` dynamically converts Pulsar Functions to MCP tools
- Polls for function changes and auto-registers/unregisters tools
- Circuit breaker pattern (`circuit_breaker.go`) for fault tolerance
- Schema conversion (`schema.go`) for input/output handling

6. **Session Management** (`pkg/mcp/session/`)
- `pulsar_session_manager.go` - LRU session cache with TTL cleanup for multi-session mode

7. **Transport Layer** (`pkg/cmd/mcp/`)
- `sse.go` - SSE transport with health endpoints (`/healthz`, `/readyz`) and auth middleware
- `server.go` - Stdio transport and common server initialization

### Key Design Patterns

- **Builder Pattern**: Tool builders create tools based on features and read-only mode
- **Registry Pattern**: ToolRegistry provides centralized management of all builders
- **Context Injection**: Sessions passed via `context.Context` using typed keys
- **Feature Flags**: Tools enabled/disabled via string feature identifiers
- **Circuit Breaker**: PFTools uses failure thresholds to prevent cascading failures
- **Multi-Session Pattern**: Per-user Pulsar sessions with LRU caching for SSE mode

## Adding New Tools

Expand Down Expand Up @@ -106,7 +125,7 @@ Client Request → MCP Server (pkg/mcp/server.go)

4. **Get Session in Handler**:
```go
session := mcpCtx.GetKafkaSession(ctx) // or GetPulsarSession
session := mcp.GetKafkaSession(ctx) // or GetPulsarSession
if session == nil {
return mcp.NewToolResultError("session not found"), nil
}
Expand All @@ -115,10 +134,13 @@ Client Request → MCP Server (pkg/mcp/server.go)

## Session Context Access

Handlers receive sessions via context (see `pkg/mcp/internal/context/ctx.go`):
- `mcpCtx.GetKafkaSession(ctx)` → `*kafka.Session`
- `mcpCtx.GetPulsarSession(ctx)` → `*pulsar.Session`
- `mcpCtx.GetSNCloudSession(ctx)` → `*config.Session`
Handlers receive sessions via context (see `pkg/mcp/ctx.go`):
- `mcp.GetKafkaSession(ctx)` → `*kafka.Session`
- `mcp.GetPulsarSession(ctx)` → `*pulsar.Session`
- `mcp.GetSNCloudSession(ctx)` → `*config.Session`
- `mcp.GetSNCloudOrganization(ctx)` → organization string
- `mcp.GetSNCloudInstance(ctx)` → instance string
- `mcp.GetSNCloudCluster(ctx)` → cluster string

From sessions:
- `session.GetAdminClient()` / `session.GetAdminV3Client()` for Pulsar admin
Expand All @@ -144,13 +166,82 @@ When `--multi-session-pulsar` is enabled (SSE server with external Pulsar only):
- **Session management**: See `pkg/mcp/session/pulsar_session_manager.go`

Key files:
- `pkg/cmd/mcp/sse.go` - Auth middleware wraps SSEHandler()/MessageHandler()
- `pkg/cmd/mcp/sse.go` - Auth middleware wraps SSEHandler()/MessageHandler(), health endpoints
- `pkg/mcp/session/pulsar_session_manager.go` - LRU session cache with TTL cleanup
- `pkg/cmd/mcp/server.go` - Skips global PulsarSession when multi-session enabled

### Health Endpoints

SSE server exposes health check endpoints:
- `GET /mcp/healthz` - Liveness probe (always returns "ok")
- `GET /mcp/readyz` - Readiness probe (always returns "ready")

## Feature Flags

Available feature flags (defined in `pkg/mcp/features.go`):

| Feature | Description |
|---------|-------------|
| `all` | Enable all features |
| `all-kafka` | All Kafka features |
| `all-pulsar` | All Pulsar features |
| `kafka-client` | Kafka produce/consume |
| `kafka-admin` | Kafka admin operations (all admin tools) |
| `kafka-admin-schema-registry` | Schema Registry |
| `kafka-admin-kafka-connect` | Kafka Connect |
| `kafka-admin-topics` | Manage Kafka topics |
| `kafka-admin-partitions` | Manage Kafka partitions |
| `kafka-admin-groups` | Manage Kafka consumer groups |
| `pulsar-admin` | Pulsar admin operations (all admin tools) |
| `pulsar-client` | Pulsar produce/consume |
| `pulsar-admin-brokers` | Manage Pulsar brokers |
| `pulsar-admin-brokers-status` | Pulsar broker status |
| `pulsar-admin-broker-stats` | Access Pulsar broker statistics |
| `pulsar-admin-clusters` | Manage Pulsar clusters |
| `pulsar-admin-functions` | Manage Pulsar Functions |
| `pulsar-admin-functions-worker` | Manage Pulsar Function workers |
| `pulsar-admin-namespaces` | Manage Pulsar namespaces |
| `pulsar-admin-namespace-policy` | Configure namespace policies |
| `pulsar-admin-ns-isolation-policy` | Manage namespace isolation policies |
| `pulsar-admin-packages` | Manage Pulsar packages |
| `pulsar-admin-resource-quotas` | Configure resource quotas |
| `pulsar-admin-schemas` | Manage Pulsar schemas |
| `pulsar-admin-subscriptions` | Manage Pulsar subscriptions |
| `pulsar-admin-tenants` | Manage Pulsar tenants |
| `pulsar-admin-topics` | Manage Pulsar topics |
| `pulsar-admin-sinks` | Manage Pulsar IO sinks |
| `pulsar-admin-sources` | Manage Pulsar Sources |
| `pulsar-admin-topic-policy` | Configure topic policies |
| `streamnative-cloud` | StreamNative Cloud context management |
| `functions-as-tools` | Dynamic Pulsar Functions as MCP tools |

## Helm Chart

The project includes a Helm chart for Kubernetes deployment at `charts/snmcp/`:

```bash
# Basic installation
helm install snmcp ./charts/snmcp \
--set pulsar.webServiceURL=http://pulsar.example.com:8080

# With TLS
helm install snmcp ./charts/snmcp \
--set pulsar.webServiceURL=https://pulsar:8443 \
--set pulsar.tls.enabled=true \
--set pulsar.tls.secretName=pulsar-tls
```

The chart runs MCP Server in Multi-Session Pulsar mode with authentication via `Authorization: Bearer <token>` header.

## SDK Packages

The project includes generated SDK packages:
- `sdk/sdk-apiserver/` - StreamNative Cloud API server client
- `sdk/sdk-kafkaconnect/` - Kafka Connect client

## Error Handling

- Wrap errors: `fmt.Errorf("failed to X: %w", err)`
- Return tool errors: `mcp.NewToolResultError("message")`
- Check session nil before operations
- For PFTools, use circuit breaker to handle repeated failures
- For PFTools, use circuit breaker to handle repeated failures
18 changes: 18 additions & 0 deletions charts/snmcp/Chart.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
apiVersion: v2
name: snmcp
description: A Helm chart for StreamNative MCP Server with Multi-Session Pulsar support
type: application
version: 0.1.0
appVersion: "0.0.1"
home: https://github.com/streamnative/streamnative-mcp-server
sources:
- https://github.com/streamnative/streamnative-mcp-server
maintainers:
- name: StreamNative
url: https://streamnative.io
keywords:
- mcp
- streamnative
- pulsar
- ai
- llm
Loading
Loading