Skip to content

Commit e2eddb9

Browse files
authored
MCP Queue Example (#99)
* MCP Queue Example * mock AI mode, KV rename, cleanup, docs * chore(lockfile): update for queues-agent deps * fix test * fixing test errors for queue
1 parent bf0db7f commit e2eddb9

File tree

16 files changed

+471
-0
lines changed

16 files changed

+471
-0
lines changed
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
USE_MOCK_AI=true
2+
3+
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# http://editorconfig.org
2+
root = true
3+
4+
[*]
5+
indent_style = tab
6+
end_of_line = lf
7+
charset = utf-8
8+
trim_trailing_whitespace = true
9+
insert_final_newline = true
10+
11+
[*.yml]
12+
indent_style = space
13+
14+

examples/queues-agent/.gitignore

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
# Local development artifacts
2+
.wrangler/
3+
.dev.vars
4+
data/
5+
node_modules/
6+
7+

examples/queues-agent/.prettierrc

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"printWidth": 140,
3+
"singleQuote": true,
4+
"semi": true,
5+
"useTabs": true
6+
}
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
{
2+
"files.associations": {
3+
"wrangler.json": "jsonc"
4+
}
5+
}

examples/queues-agent/README.md

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
## Queues Agent Example
2+
3+
Showcase of using Cloudflare Queues with the NullShot Agent Toolkit:
4+
5+
- HTTP producer endpoint enqueues chat jobs
6+
- Queue consumer triggers inside the same Worker and forwards each job directly to a Durable Object Agent to process with AI
7+
8+
### Architecture
9+
10+
- Producer: `POST /enqueue` → pushes `{ sessionId, messages }` into `REQUEST_QUEUE`
11+
- Consumer: `queue()` handler → forwards each message to `AGENT` Durable Object at `/agent/chat/:sessionId`
12+
- Agent: `QueueAgent` extends the toolkit `AiSdkAgent` and streams an AI response (Workers AI by default)
13+
14+
### Files
15+
16+
- `src/index.ts` – Worker with producer route, queue consumer, and DO Agent
17+
- `wrangler.jsonc` – Bindings for Queue, Durable Object, Workers AI
18+
19+
### Prerequisites
20+
21+
- Node.js 18+
22+
- Wrangler CLI
23+
- Cloudflare account (optional for local mock; required for Workers AI and cloud)
24+
25+
### Setup
26+
27+
Run modes
28+
29+
- Local (Free): Uses Miniflare’s local queue simulation. Workers AI still requires login to produce real model output; without login you’ll see “Not logged in” in logs, but the queue flow runs end-to-end.
30+
- Tip: Set `USE_MOCK_AI=true` in `.dev.vars` to run locally without a Cloudflare account. The agent returns a deterministic mock response.
31+
- Cloud (Paid): Uses real Cloudflare Queues and Workers AI on your account. Requires a paid Workers plan for Queues.
32+
33+
1. Install deps
34+
35+
```bash
36+
pnpm install
37+
```
38+
39+
2. Create a Queue and (optionally) a DLQ (Cloud only, Paid)
40+
41+
Cloud deployment only. Not required for local dev (--local). Requires a paid Workers plan and `npx wrangler login` first.
42+
43+
```bash
44+
# Create main queue (cloud)
45+
npx wrangler queues create request-queue
46+
47+
# Optional: create a dead letter queue (cloud)
48+
npx wrangler queues create request-queue-dlq
49+
```
50+
51+
3. Configure `wrangler.jsonc`
52+
53+
Edit the queue names if you used different names in step 2. The default config expects:
54+
55+
- producer/consumer queue: `request-queue`
56+
- dead letter queue: `request-queue-dlq`
57+
58+
4. Authenticate and run with real services
59+
60+
```bash
61+
# Login interactively (recommended for dev)
62+
npx wrangler login
63+
64+
# Or set a token for non-interactive shells
65+
export CLOUDFLARE_API_TOKEN=... # least-privilege token
66+
67+
# Use real edge runtime (queues + Workers AI)
68+
npx wrangler dev --remote
69+
70+
# Deploy to Cloudflare
71+
npx wrangler deploy
72+
```
73+
74+
5. Privacy and OSS hygiene
75+
76+
- Never commit secrets or tokens. Use Wrangler secrets or environment variables.
77+
- This repo example does not include any secrets. Avoid adding `.dev.vars` to git.
78+
- For local only, you can create `.dev.vars` (excluded by `.gitignore`) to store non-sensitive vars.
79+
80+
### Usage
81+
82+
Enqueue a chat job (producer):
83+
84+
```bash
85+
curl -X POST "http://127.0.0.1:8787/enqueue" \
86+
-H "Content-Type: application/json" \
87+
-d '{
88+
"sessionId": "demo-session-1",
89+
"messages": [
90+
{ "role": "user", "content": "Say hello in one sentence." }
91+
]
92+
}'
93+
```
94+
95+
The consumer will receive queue messages and route them to the Agent Durable Object. You can tail logs to observe processing:
96+
97+
```bash
98+
npx wrangler tail
99+
```
100+
101+
### Configuration Notes
102+
103+
- `compatibility_date`: set to 2025-02-11 per repo rules
104+
- `compatibility_flags`: `["nodejs_compat"]`
105+
- Observability enabled with `head_sampling_rate = 1`
106+
- Uses Workers AI by default; set `USE_MOCK_AI=true` for zero‑auth local output.
107+
108+
### Retrieve results (optional KV persistence)
109+
110+
- This example stores the latest agent response per `sessionId` in KV (binding `RESULTS`).
111+
- Fetch the persisted output:
112+
113+
```bash
114+
curl "http://127.0.0.1:8787/result/demo-session-1"
115+
```
116+
117+
Returns:
118+
119+
```json
120+
{ "result": "... assistant text ..." }
121+
```
122+
123+
### Production checklist
124+
125+
- Auth: `npx wrangler login` (or set `CLOUDFLARE_API_TOKEN`).
126+
- Queues: create `request-queue` and optional `request-queue-dlq` (Paid plan required for cloud queues).
127+
- Retries/DLQ: keep `retry_delay` configured; monitor DLQ.
128+
- Persistence: KV (included), or D1/R2 for richer storage.
129+
- Security: no secrets in code; use Wrangler secrets/env vars.
130+
- Observability: `wrangler tail`, dashboard logs, metrics.
131+
- Limits: model usage, queue throughput, DO CPU limits.
132+
133+
Note: Workers AI requires Cloudflare auth even in local dev. Without login you’ll see “Not logged in” in logs.
134+
If you prefer zero-auth local output, set `USE_MOCK_AI=true` in `.dev.vars`.
135+
136+
### Example Payload
137+
138+
```json
139+
{
140+
"sessionId": "demo-session-1",
141+
"messages": [{ "role": "user", "content": "Summarize Cloudflare Queues in one line." }]
142+
}
143+
```

examples/queues-agent/package.json

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
{
2+
"name": "queues-agent",
3+
"version": "0.0.1",
4+
"private": true,
5+
"scripts": {
6+
"dev": "wrangler dev",
7+
"deploy": "wrangler deploy",
8+
"build": "wrangler build",
9+
"test": "vitest run"
10+
},
11+
"devDependencies": {
12+
"typescript": "catalog:",
13+
"wrangler": "catalog:",
14+
"vitest": "catalog:",
15+
"@cloudflare/vitest-pool-workers": "catalog:",
16+
"@nullshot/test-utils": "workspace:*"
17+
},
18+
"dependencies": {
19+
"@nullshot/agent": "workspace:*",
20+
"ai": "catalog:",
21+
"hono": "^4.7.7",
22+
"workers-ai-provider": "catalog:"
23+
}
24+
}

examples/queues-agent/src/index.ts

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
import { Hono } from 'hono';
2+
import { cors } from 'hono/cors';
3+
import { applyPermissionlessAgentSessionRouter } from '@nullshot/agent';
4+
import { AiSdkAgent, AIUISDKMessage } from '@nullshot/agent/aisdk';
5+
import { Service } from '@nullshot/agent';
6+
import { ToolboxService } from '@nullshot/agent/services';
7+
import { createWorkersAI } from 'workers-ai-provider';
8+
9+
// Minimal agent that echoes a short response via Workers AI
10+
export class QueueAgent extends AiSdkAgent<Env> {
11+
constructor(state: DurableObjectState, env: Env) {
12+
// If USE_MOCK_AI is enabled, we don't require the Workers AI binding
13+
let model: any;
14+
if (env.USE_MOCK_AI === 'true') {
15+
// Provide a dummy model; processMessage will short-circuit in mock mode
16+
model = {} as any;
17+
} else {
18+
if (!env.AI) throw new Error('AI binding missing. Configure Workers AI in wrangler.jsonc');
19+
const workersai = createWorkersAI({ binding: env.AI });
20+
model = workersai('@cf/meta/llama-3.1-8b-instruct' as any);
21+
}
22+
const services: Service[] = [new ToolboxService(env)];
23+
super(state, env, model, services);
24+
}
25+
26+
async processMessage(sessionId: string, messages: AIUISDKMessage): Promise<Response> {
27+
// Mock mode: return deterministic response without calling Workers AI
28+
if (this.env.USE_MOCK_AI === 'true') {
29+
const last = messages.messages[messages.messages.length - 1];
30+
const userText = typeof last?.content === 'string' ? last.content : 'Hello';
31+
const reply = `Mock response: ${userText}`;
32+
return new Response(reply, { headers: { 'Content-Type': 'text/plain' } });
33+
}
34+
35+
const result = await this.streamTextWithMessages(sessionId, messages.messages, {
36+
system: 'You are a helpful assistant. Keep responses concise.',
37+
maxSteps: 5,
38+
});
39+
return result.toTextStreamResponse();
40+
}
41+
}
42+
43+
// Hono app for producer and agent gateway
44+
const app = new Hono<{ Bindings: Env }>();
45+
app.use('*', cors());
46+
47+
// Simple enqueue endpoint: { sessionId, messages }
48+
app.post('/enqueue', async (c) => {
49+
const body = await c.req.json<any>();
50+
const sessionId: string = body.sessionId || crypto.randomUUID();
51+
const messages = body.messages || [{ role: 'user', content: 'Hello!' }];
52+
53+
await c.env.REQUEST_QUEUE.send({ sessionId, messages });
54+
55+
return c.json({ enqueued: true, sessionId });
56+
});
57+
58+
// Fetch latest result for session
59+
app.get('/result/:sessionId', async (c) => {
60+
const sessionId = c.req.param('sessionId');
61+
const value = await c.env.RESULTS_KV.get(`result:${sessionId}`);
62+
if (!value) return c.json({ result: null }, 200);
63+
return c.json({ result: value }, 200);
64+
});
65+
66+
// Route /agent/chat/:sessionId to the DO agent
67+
applyPermissionlessAgentSessionRouter(app);
68+
69+
export default {
70+
async fetch(request: Request, env: Env, ctx: ExecutionContext): Promise<Response> {
71+
return app.fetch(request, env, ctx);
72+
},
73+
74+
// Queue consumer: run messages through the Agent DO
75+
async queue(batch: MessageBatch<any>, env: Env, ctx: ExecutionContext) {
76+
for (const msg of batch.messages) {
77+
try {
78+
const { sessionId, messages } = msg.body || {};
79+
if (!sessionId || !messages) {
80+
console.warn('Invalid queue message, skipping');
81+
continue;
82+
}
83+
const id = env.AGENT.idFromName(sessionId);
84+
const req = new Request('https://internal/agent/chat/' + sessionId, {
85+
method: 'POST',
86+
headers: { 'Content-Type': 'application/json' },
87+
body: JSON.stringify({ id: crypto.randomUUID(), messages }),
88+
});
89+
// Synchronously fetch the agent and persist full text to KV for retrieval
90+
const resp = await env.AGENT.get(id).fetch(req);
91+
const text = await resp.text();
92+
ctx.waitUntil(
93+
env.RESULTS_KV.put(`result:${sessionId}`, text, {
94+
expirationTtl: 60 * 60,
95+
}),
96+
);
97+
} catch (e) {
98+
console.error('Queue processing error:', e);
99+
throw e;
100+
}
101+
}
102+
},
103+
};
Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
declare module 'cloudflare:test' {
2+
interface ProvidedEnv extends Env {}
3+
}
Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,40 @@
1+
import { expect, it } from 'vitest';
2+
import { SELF } from 'cloudflare:test';
3+
// Use the in-memory worker provided by the Vitest Workers pool
4+
const BASE = 'https://example.com';
5+
6+
it(
7+
'enqueues and returns a result (may be null without Workers AI auth)',
8+
async () => {
9+
const sessionId = `test-${crypto.randomUUID()}`;
10+
11+
const enqueue = await SELF.fetch(`${BASE}/enqueue`, {
12+
method: 'POST',
13+
headers: { 'Content-Type': 'application/json' },
14+
body: JSON.stringify({
15+
sessionId,
16+
messages: [{ role: 'user', content: 'One sentence about queues.' }],
17+
}),
18+
});
19+
expect(enqueue.ok).toBe(true);
20+
const ej = await enqueue.json();
21+
expect(ej.enqueued).toBe(true);
22+
23+
// Poll result up to ~8s
24+
let result: any = null;
25+
for (let i = 0; i < 8; i++) {
26+
const r = await SELF.fetch(`${BASE}/result/${sessionId}`);
27+
expect(r.ok).toBe(true);
28+
const j = await r.json();
29+
if (j.result) {
30+
result = j.result;
31+
break;
32+
}
33+
await new Promise((res) => setTimeout(res, 1000));
34+
}
35+
36+
// We allow null when not authenticated to Workers AI locally
37+
expect(result === null || typeof result === 'string').toBe(true);
38+
},
39+
{ timeout: 20000 },
40+
);

0 commit comments

Comments
 (0)