Skip to content

Commit 71403b2

Browse files
committed
solely rely on tanstack query to vend data and invalidate query key's, remove custom caching
1 parent b079e98 commit 71403b2

File tree

9 files changed

+242
-220
lines changed

9 files changed

+242
-220
lines changed

apps/docs/content/docs/en/tools/a2a.mdx

Lines changed: 0 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -57,31 +57,6 @@ Send a message to an external A2A-compatible agent.
5757
| `artifacts` | array | Structured output artifacts |
5858
| `history` | array | Full message history |
5959

60-
### `a2a_send_message_stream`
61-
62-
Send a message to an external A2A-compatible agent with real-time streaming.
63-
64-
#### Input
65-
66-
| Parameter | Type | Required | Description |
67-
| --------- | ---- | -------- | ----------- |
68-
| `agentUrl` | string | Yes | The A2A agent endpoint URL |
69-
| `message` | string | Yes | Message to send to the agent |
70-
| `taskId` | string | No | Task ID for continuing an existing task |
71-
| `contextId` | string | No | Context ID for conversation continuity |
72-
| `apiKey` | string | No | API key for authentication |
73-
74-
#### Output
75-
76-
| Parameter | Type | Description |
77-
| --------- | ---- | ----------- |
78-
| `content` | string | The text response from the agent |
79-
| `taskId` | string | Task ID for follow-up interactions |
80-
| `contextId` | string | Context ID for conversation continuity |
81-
| `state` | string | Task state |
82-
| `artifacts` | array | Structured output artifacts |
83-
| `history` | array | Full message history |
84-
8560
### `a2a_get_task`
8661

8762
Query the status of an existing A2A task.

apps/sim/app/api/a2a/agents/[agentId]/route.ts

Lines changed: 19 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,3 @@
1-
/**
2-
* A2A Agent Card Endpoint
3-
*
4-
* Returns the Agent Card (discovery document) for an A2A agent.
5-
* Also supports CRUD operations for managing agents.
6-
*/
7-
81
import { db } from '@sim/db'
92
import { a2aAgent, workflow } from '@sim/db/schema'
103
import { createLogger } from '@sim/logger'
@@ -13,6 +6,7 @@ import { type NextRequest, NextResponse } from 'next/server'
136
import { generateAgentCard, generateSkillsFromWorkflow } from '@/lib/a2a/agent-card'
147
import type { AgentCapabilities, AgentSkill } from '@/lib/a2a/types'
158
import { checkHybridAuth } from '@/lib/auth/hybrid'
9+
import { getRedisClient } from '@/lib/core/config/redis'
1610
import { loadWorkflowFromNormalizedTables } from '@/lib/workflows/persistence/utils'
1711

1812
const logger = createLogger('A2AAgentCardAPI')
@@ -226,6 +220,15 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
226220
})
227221
.where(eq(a2aAgent.id, agentId))
228222

223+
const redis = getRedisClient()
224+
if (redis) {
225+
try {
226+
await redis.del(`a2a:agent:${agentId}:card`)
227+
} catch (err) {
228+
logger.warn('Failed to invalidate agent card cache', { agentId, error: err })
229+
}
230+
}
231+
229232
logger.info(`Published A2A agent: ${agentId}`)
230233
return NextResponse.json({ success: true, isPublished: true })
231234
}
@@ -239,6 +242,15 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
239242
})
240243
.where(eq(a2aAgent.id, agentId))
241244

245+
const redis = getRedisClient()
246+
if (redis) {
247+
try {
248+
await redis.del(`a2a:agent:${agentId}:card`)
249+
} catch (err) {
250+
logger.warn('Failed to invalidate agent card cache', { agentId, error: err })
251+
}
252+
}
253+
242254
logger.info(`Unpublished A2A agent: ${agentId}`)
243255
return NextResponse.json({ success: true, isPublished: false })
244256
}

apps/sim/app/api/a2a/serve/[agentId]/route.ts

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ export async function GET(_request: NextRequest, { params }: { params: Promise<R
6060
return NextResponse.json(JSON.parse(cached), {
6161
headers: {
6262
'Content-Type': 'application/json',
63-
'Cache-Control': 'public, max-age=3600',
63+
'Cache-Control': 'private, max-age=60',
6464
'X-Cache': 'HIT',
6565
},
6666
})
@@ -134,7 +134,7 @@ export async function GET(_request: NextRequest, { params }: { params: Promise<R
134134

135135
if (redis) {
136136
try {
137-
await redis.set(cacheKey, JSON.stringify(agentCard), 'EX', 3600)
137+
await redis.set(cacheKey, JSON.stringify(agentCard), 'EX', 60)
138138
} catch (err) {
139139
logger.warn('Redis cache write failed', { agentId, error: err })
140140
}
@@ -143,7 +143,7 @@ export async function GET(_request: NextRequest, { params }: { params: Promise<R
143143
return NextResponse.json(agentCard, {
144144
headers: {
145145
'Content-Type': 'application/json',
146-
'Cache-Control': 'public, max-age=3600',
146+
'Cache-Control': 'private, max-age=60',
147147
'X-Cache': 'MISS',
148148
},
149149
})
@@ -224,7 +224,6 @@ export async function POST(request: NextRequest, { params }: { params: Promise<R
224224
}
225225

226226
const { id, method, params: rpcParams } = body
227-
// Only accept API keys from X-API-Key header to maintain proper auth boundaries
228227
const apiKey = request.headers.get('X-API-Key')
229228

230229
logger.info(`A2A request: ${method} for agent ${agentId}`)
@@ -294,8 +293,6 @@ async function handleMessageSend(
294293
const contextId = message.contextId || uuidv4()
295294

296295
// Distributed lock to prevent concurrent task processing
297-
// Note: When Redis is unavailable, acquireLock returns true (degraded mode).
298-
// In production, ensure Redis is available for proper distributed locking.
299296
const lockKey = `a2a:task:${taskId}:lock`
300297
const lockValue = uuidv4()
301298
const acquired = await acquireLock(lockKey, lockValue, 60)
@@ -332,7 +329,6 @@ async function handleMessageSend(
332329

333330
history.push(message)
334331

335-
// Truncate history to prevent unbounded JSONB growth
336332
if (history.length > A2A_MAX_HISTORY_LENGTH) {
337333
history.splice(0, history.length - A2A_MAX_HISTORY_LENGTH)
338334
}
@@ -492,10 +488,6 @@ async function handleMessageStream(
492488
const taskId = message.taskId || generateTaskId()
493489

494490
// Distributed lock to prevent concurrent task processing
495-
// Note: When Redis is unavailable, acquireLock returns true (degraded mode).
496-
// Lock timeout: 5 minutes for streaming to handle long-running workflows.
497-
// If a streaming request fails without releasing the lock, subsequent requests
498-
// will be blocked until timeout. The lock is released in finally block below.
499491
const lockKey = `a2a:task:${taskId}:lock`
500492
const lockValue = uuidv4()
501493
const acquired = await acquireLock(lockKey, lockValue, 300)
@@ -542,7 +534,6 @@ async function handleMessageStream(
542534

543535
history.push(message)
544536

545-
// Truncate history to prevent unbounded JSONB growth
546537
if (history.length > A2A_MAX_HISTORY_LENGTH) {
547538
history.splice(0, history.length - A2A_MAX_HISTORY_LENGTH)
548539
}
@@ -575,7 +566,14 @@ async function handleMessageStream(
575566
async start(controller) {
576567
const sendEvent = (event: string, data: unknown) => {
577568
try {
578-
controller.enqueue(encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`))
569+
const jsonRpcResponse = {
570+
jsonrpc: '2.0' as const,
571+
id,
572+
result: data,
573+
}
574+
controller.enqueue(
575+
encoder.encode(`event: ${event}\ndata: ${JSON.stringify(jsonRpcResponse)}\n\n`)
576+
)
579577
} catch (error) {
580578
logger.error('Error sending SSE event:', error)
581579
}
@@ -667,7 +665,6 @@ async function handleMessageStream(
667665
}
668666
}
669667

670-
// Use finalContent if available and non-empty, otherwise fall back to accumulated content
671668
const messageContent =
672669
(finalContent !== undefined && finalContent.length > 0
673670
? finalContent
@@ -691,12 +688,13 @@ async function handleMessageStream(
691688
logger.error('Failed to trigger push notification', { taskId, error: err })
692689
})
693690

694-
sendEvent('status', {
695-
kind: 'status',
696-
taskId,
691+
sendEvent('task', {
692+
kind: 'task',
693+
id: taskId,
697694
contextId,
698695
status: { state: 'completed', timestamp: new Date().toISOString() },
699-
final: true,
696+
history,
697+
artifacts: [],
700698
})
701699
} else {
702700
const result = await response.json()
@@ -735,12 +733,13 @@ async function handleMessageStream(
735733
logger.error('Failed to trigger push notification', { taskId, error: err })
736734
})
737735

738-
sendEvent('status', {
739-
kind: 'status',
740-
taskId,
736+
sendEvent('task', {
737+
kind: 'task',
738+
id: taskId,
741739
contextId,
742740
status: { state: 'completed', timestamp: new Date().toISOString() },
743-
final: true,
741+
history,
742+
artifacts,
744743
})
745744
}
746745
} catch (error) {
@@ -914,6 +913,8 @@ async function handleTaskResubscribe(
914913
})
915914
}
916915

916+
const encoder = new TextEncoder()
917+
917918
if (isTerminalState(task.status as TaskState)) {
918919
const completedTask = buildTaskResponse({
919920
taskId: task.id,
@@ -922,14 +923,19 @@ async function handleTaskResubscribe(
922923
history: task.messages as Message[],
923924
artifacts: (task.artifacts as Artifact[]) || [],
924925
})
925-
return NextResponse.json(createResponse(id, completedTask))
926+
const jsonRpcResponse = { jsonrpc: '2.0' as const, id, result: completedTask }
927+
const sseData = `event: task\ndata: ${JSON.stringify(jsonRpcResponse)}\n\n`
928+
const stream = new ReadableStream({
929+
start(controller) {
930+
controller.enqueue(encoder.encode(sseData))
931+
controller.close()
932+
},
933+
})
934+
return new NextResponse(stream, { headers: SSE_HEADERS })
926935
}
927-
928-
const encoder = new TextEncoder()
929936
let isCancelled = false
930937
let pollTimeoutId: ReturnType<typeof setTimeout> | null = null
931938

932-
// Listen for client disconnection via request signal
933939
const abortSignal = request.signal
934940
abortSignal.addEventListener('abort', () => {
935941
isCancelled = true
@@ -944,7 +950,10 @@ async function handleTaskResubscribe(
944950
const sendEvent = (event: string, data: unknown): boolean => {
945951
if (isCancelled || abortSignal.aborted) return false
946952
try {
947-
controller.enqueue(encoder.encode(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`))
953+
const jsonRpcResponse = { jsonrpc: '2.0' as const, id, result: data }
954+
controller.enqueue(
955+
encoder.encode(`event: ${event}\ndata: ${JSON.stringify(jsonRpcResponse)}\n\n`)
956+
)
948957
return true
949958
} catch (error) {
950959
logger.error('Error sending SSE event:', error)
@@ -973,8 +982,8 @@ async function handleTaskResubscribe(
973982
return
974983
}
975984

976-
const pollInterval = 3000 // 3 seconds (reduced from 1s to lower DB load)
977-
const maxPolls = 100 // 5 minutes max (100 * 3s = 300s)
985+
const pollInterval = 3000 // 3 seconds
986+
const maxPolls = 100 // 5 minutes max
978987

979988
let polls = 0
980989
const poll = async () => {

apps/sim/app/workspace/[workspaceId]/w/[workflowId]/components/chat/components/output-select/output-select.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -331,7 +331,7 @@ export function OutputSelect({
331331
disabled={disabled || workflowOutputs.length === 0}
332332
align={align}
333333
maxHeight={maxHeight}
334-
dropdownWidth={220}
334+
dropdownWidth={180}
335335
/>
336336
)
337337
}

0 commit comments

Comments
 (0)