
M002: Real-Time Monitoring — SSE push, batch writes, live dashboards#109

Merged
TerrifiedBug merged 19 commits into main from milestone/M002
Mar 24, 2026


@TerrifiedBug
Owner

Summary

Transforms VectorFlow monitoring from poll-based (15–30s stale) to push-based (5s live). Agent heartbeats speed up to 5s, all hot-path DB writes are batched, and a single authenticated SSE endpoint streams metrics, fleet status, logs, and status changes to every dashboard page, the pipeline editor, and log viewers. Toast notifications surface critical state transitions. Graceful fallback to 30s polling on SSE disconnect.

73 files changed, 4702 insertions, 256 tests across 20 files.

What Changed

S01: Agent Heartbeat Speedup & Batch Writes

  • Go agent default poll interval: 15s → 5s
  • batchUpsertPipelineStatuses() — single INSERT...ON CONFLICT raw SQL for any pipeline count
  • ingestMetrics() rewritten with $transaction(deleteMany + createMany)
  • Per-heartbeat queries: ~300-400 → ~15-20 at 100 pipelines
  • MetricStore MAX_SAMPLES: 240 → 720 (1hr at 5s)
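
The single-statement upsert listed above can be sketched as a pure function that builds one parameterized INSERT ... ON CONFLICT for any number of rows. This is only an illustration: `buildBatchUpsert`, the table name `node_pipeline_status`, and the column names are assumptions, and the real `batchUpsertPipelineStatuses()` may differ in how it binds parameters.

```typescript
// Hypothetical row shape; the real NodePipelineStatus columns may differ.
interface PipelineStatusRow {
  nodeId: string;
  pipelineId: string;
  status: string;
}

interface BatchUpsert {
  text: string;     // SQL text with $1, $2, ... placeholders
  values: string[]; // bound values, in placeholder order
}

// Builds ONE statement for all rows, so the query count stays at 1
// no matter how many pipelines a heartbeat reports.
function buildBatchUpsert(rows: PipelineStatusRow[]): BatchUpsert | null {
  if (rows.length === 0) return null; // nothing to write

  const values: string[] = [];
  const tuples = rows.map((row, i) => {
    const base = i * 3; // three placeholders per row
    values.push(row.nodeId, row.pipelineId, row.status);
    return `($${base + 1}, $${base + 2}, $${base + 3})`;
  });

  // Table, column, and conflict-target names are assumptions for this sketch.
  const text =
    "INSERT INTO node_pipeline_status (node_id, pipeline_id, status) " +
    `VALUES ${tuples.join(", ")} ` +
    "ON CONFLICT (node_id, pipeline_id) " +
    "DO UPDATE SET status = EXCLUDED.status";

  return { text, values };
}
```

Because every value is bound through a placeholder, the statement grows with the row count while the number of round trips to the database stays constant.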

S02: SSE Infrastructure & MetricStore Pub/Sub

  • src/lib/sse/types.ts — 4 typed SSE events + discriminated union
  • src/server/services/metric-store.ts — subscribe/unsubscribe/flush pub/sub
  • src/server/services/sse-registry.ts — connection registry with environment-scoped permission filtering
  • src/app/api/sse/route.ts — authenticated SSE endpoint with ReadableStream + 30s keepalive
  • src/hooks/use-sse.ts — EventSource lifecycle with exponential backoff reconnect (1s→30s)
  • src/stores/sse-store.ts — Zustand connection status store
  • Heartbeat handler wired with 4 SSE emission points
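
As a rough illustration of the 1s→30s reconnect schedule mentioned for use-sse.ts, the delay can be computed from the attempt number. The doubling factor and the name `reconnectDelay` are assumptions; the PR only states the lower and upper bounds.

```typescript
const BASE_DELAY_MS = 1_000;  // first retry after 1s (from the PR summary)
const MAX_DELAY_MS = 30_000;  // cap at 30s (from the PR summary)

// attempt is 0 for the first reconnect, 1 for the second, and so on.
// Doubling per attempt is an assumption of this sketch.
function reconnectDelay(attempt: number): number {
  const safeAttempt = Math.max(0, attempt);
  return Math.min(MAX_DELAY_MS, BASE_DELAY_MS * Math.pow(2, safeAttempt));
}
```

Under these assumptions the delays run 1s, 2s, 4s, 8s, 16s, and then stay at 30s until the connection is restored.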

S03: Dashboard & Fleet Pages Wired to SSE

  • usePollingInterval — suppresses polling when SSE connected, 30s floor on disconnect
  • useRealtimeInvalidation — maps SSE events → React Query key invalidation (500ms debounce)
  • 9 dashboard/fleet/analytics files converted from hardcoded refetchInterval to SSE-aware polling
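
The polling rule described above is small enough to sketch directly. The function name `getPollingInterval` appears in the review of this PR, but its exact signature and the status names below are assumptions.

```typescript
// Status names are assumptions; the PR only mentions connected,
// disconnected, and reconnecting states.
type SSEStatus = "connected" | "disconnected" | "reconnecting";

const POLLING_FLOOR_MS = 30_000;

// Returns false to suppress polling while SSE is live; otherwise returns
// the base interval, raised to the 30s floor if it is shorter.
function getPollingInterval(
  status: SSEStatus,
  baseIntervalMs: number
): number | false {
  if (status === "connected") return false;
  return Math.max(baseIntervalMs, POLLING_FLOOR_MS);
}
```

The `number | false` return type matches what React Query accepts for `refetchInterval`, so the result can be passed through unchanged.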

S04: Pipeline Editor Live Overlays

  • Fixed cross-cutting bug: useSSE subscribers promoted from per-instance useRef to module-level Map
  • useFlowMetrics(pipelineId) — bridges SSE metric_update events to flow store
  • deriveMetrics — maps source/transform/sink node kinds to correct rate fields
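
The move from a per-instance ref to a module-level Map can be illustrated without React. In this sketch, which uses the hypothetical names `subscribe` and `dispatch`, every caller shares one subscriber table, so a handler registered by one hook instance still receives events delivered through a connection owned by another instance.

```typescript
type Handler = (payload: unknown) => void;

// Module-level: one table per tab, shared by every caller.
// A per-instance ref would give each hook instance its own empty table.
const subscribers = new Map<string, Set<Handler>>();

// Registers a handler and returns an unsubscribe function.
function subscribe(eventType: string, handler: Handler): () => void {
  const handlers = subscribers.get(eventType) ?? new Set<Handler>();
  subscribers.set(eventType, handlers);
  handlers.add(handler);
  return () => {
    handlers.delete(handler);
  };
}

// Fans one event out to every handler registered for its type.
function dispatch(eventType: string, payload: unknown): void {
  const handlers = subscribers.get(eventType);
  if (!handlers) return;
  handlers.forEach((handler) => handler(payload));
}
```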

S05: Streaming Logs & Toast Notifications

  • useStreamingLogs — SSE log_entry subscription with parse/dedup/200-entry buffer
  • parseLogLine — handles JSON (Vector msg/message convention) and plain-text formats
  • useSSEToasts — CRASHED→error, DEPLOYED→success, OFFLINE→warning with 30s cooldown dedup
  • Pipeline status transition detection in heartbeat handler
  • Deploy-complete SSE emission in deploy router
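
A minimal sketch of the log-line parsing described above, assuming a JSON object with `msg` or `message` plus an optional `level`, and a plain-text form with a leading level token. The exact plain-text pattern and the default level of `info` are assumptions of this sketch, not necessarily what parseLogLine does.

```typescript
type LogLevel = "trace" | "debug" | "info" | "warn" | "error";

interface ParsedLog {
  level: LogLevel;
  message: string;
}

// Alias table; the PR mentions warn/warning and err/error.
const LEVEL_ALIASES = new Map<string, LogLevel>([
  ["trace", "trace"], ["debug", "debug"], ["info", "info"],
  ["warn", "warn"], ["warning", "warn"],
  ["err", "error"], ["error", "error"],
]);

function normalizeLevel(raw: unknown): LogLevel {
  if (typeof raw !== "string") return "info"; // assumed default
  return LEVEL_ALIASES.get(raw.toLowerCase()) ?? "info";
}

function parseLogLine(line: string): ParsedLog {
  const trimmed = line.trim();
  if (trimmed.charAt(0) === "{") {
    try {
      const parsed: unknown = JSON.parse(trimmed);
      if (typeof parsed === "object" && parsed !== null) {
        const record = parsed as Record<string, unknown>;
        const msg = record["msg"] ?? record["message"];
        if (typeof msg === "string") {
          return { level: normalizeLevel(record["level"]), message: msg };
        }
      }
    } catch {
      // Malformed JSON: fall through to plain-text handling.
    }
  }
  // Plain text with a leading level token, e.g. "WARN disk almost full".
  const match = /^\[?([A-Za-z]+)\]?[:\s]+(.*)$/.exec(trimmed);
  const token = match?.[1] ?? "";
  if (token !== "" && LEVEL_ALIASES.has(token.toLowerCase())) {
    return { level: normalizeLevel(token), message: match?.[2] ?? "" };
  }
  return { level: "info", message: trimmed };
}
```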

S06: Integration Verification

  • 8 integration tests: MetricStore→SSERegistry permission-scoped delivery, polling state machine, multi-type fan-out

New Files (25)

  • src/app/api/sse/route.ts
  • src/lib/sse/types.ts, src/lib/log-utils.ts
  • src/server/services/heartbeat-batch.ts, src/server/services/sse-registry.ts
  • src/hooks/use-sse.ts, use-polling-interval.ts, use-realtime-invalidation.ts, use-flow-metrics.ts, use-streaming-logs.ts, use-sse-toasts.ts
  • src/stores/sse-store.ts
  • 13 test files

Requirements Validated

R013 (5s heartbeats), R014 (MetricStore pub/sub), R015 (SSE endpoint), R016 (live dashboard), R017 (editor overlays), R018 (streaming logs), R019 (toast notifications), R020 (polling fallback), R021 (batch DB writes), R022 (SSE permission scoping)

Verification

  • tsc --noEmit
  • eslint src/
  • pnpm test — 256/256 pass ✅

@greptile-apps
Contributor

greptile-apps bot commented Mar 24, 2026

Greptile Summary

This PR transforms VectorFlow's monitoring layer from poll-based (15–30 s stale) to push-based (5 s live) by adding a full SSE pipeline: a single authenticated /api/sse endpoint, an SSERegistry for environment-scoped fan-out, a pub/sub layer on MetricStore, and five new React hooks that wire SSE events to React Query invalidation, flow-store metrics overlays, streaming log buffers, and toast notifications. Batch DB writes (batchUpsertPipelineStatuses, the new ingestMetrics transaction) reduce per-heartbeat queries from ~300–400 down to ~15–20 at 100 pipelines.

What's in good shape:

  • The previously flagged ingestMetrics accumulate-within-minute bug is properly fixed with accumulateRow() — deltas are read, accumulated onto existing per-node rows, and then replaced inside a single transaction.
  • SSERegistry.broadcast catches dead-controller exceptions and self-cleans; the keepalive timer calls .unref() so it never blocks process exit.
  • useSSE's module-level activeConnectionCount guard correctly prevents duplicate tab connections and is safe under React Strict Mode double-invoke.
  • usePollingInterval cleanly implements the 30 s polling floor fallback (R020) by returning false when SSE is connected.
  • useSSEToasts' fleet_status OFFLINE path is acknowledged dead code with a forward-looking comment, not an oversight.
  • Auth scoping is correct: environment IDs are resolved server-side at connection time and all broadcasts are filtered against the connection's environmentIds.

Minor residual from previous threads:

  • deploy.ts still emits fromStatus: "" and nodeId: "" on deploy-complete events (partially addressed: "PENDING" was changed to "", but a proper status lookup was not added).

Confidence Score: 5/5

  • Safe to merge — no new correctness, security, or data-loss issues introduced; all previously raised blocking concerns have been resolved.
  • The only open item from prior threads (fromStatus: "" in deploy.ts) is a presentational data quality issue on an SSE event that clients currently only use for toasts — it cannot cause a crash, data loss, or security problem, and was already flagged and acknowledged. The core concerns (ingestMetrics accumulation, SSE auth scoping, MetricStore pub/sub isolation) are all well-implemented. 256/256 tests pass and the architecture is clean.
  • No files require special attention.

Important Files Changed

Filename | Overview
src/app/api/sse/route.ts | New SSE endpoint with proper session authentication, environment-scoped permission resolution, and dual cleanup (ReadableStream cancel + abort signal). No issues found.
src/server/services/sse-registry.ts | Connection registry with environment-scoped broadcast, stale-connection cleanup on enqueue failure, keepalive timer with unref(), and safe double-unregister via stale-check guard. Clean implementation.
src/server/services/metrics-ingest.ts | Refactored to correctly accumulate within-minute deltas using accumulateRow(), which addresses the previous thread's delete-replace concern. Per-node rows are read, delta-accumulated, then replaced inside a single transaction; aggregation rows are recomputed from all nodes' data.
src/server/services/heartbeat-batch.ts | Single INSERT…ON CONFLICT raw SQL batch upsert for NodePipelineStatus. Parameterized via Prisma.sql tagged templates (safe from injection). Uses globalThis.crypto.randomUUID() without import, which is consistent with Node.js 20+ globals and fine given the Next.js 16 minimum.
src/app/api/agent/heartbeat/route.ts | SSE emission points wired correctly (fleet_status, status_change on node/pipeline transitions, metric_update via MetricStore.flush, log_entry). Component latency batch uses deleteMany+createMany, which drops rows for any component absent from the current heartbeat within the same minute (a minor edge case for normally-active pipelines). The fleet_status event is always HEALTHY (no change from previous behaviour).
src/hooks/use-sse.ts | Single-connection-per-tab guard via module-level activeConnectionCount, exponential backoff reconnect (1s→30s), typed event dispatch through module-level subscriber Map. Lifecycle cleanup correctly handles React Strict Mode double-effect and pending reconnect timers.
src/server/services/metric-store.ts | Added pub/sub subscribe/unsubscribe/flush API alongside existing ring-buffer logic. MAX_SAMPLES bumped from 240→720 for 1hr at 5s cadence. flush() batches all component events per pipeline per call (100 pipelines = 100 subscriber calls, not 500).
src/hooks/use-sse-toasts.ts | CRASHED→error and DEPLOYED→success toasts with 30s cooldown dedup. OFFLINE fleet_status path is intentional dead code with an explanatory comment (pending future watchdog service). 30s dedup correctly prevents toast storms during rapid pipeline cycling.
src/hooks/use-streaming-logs.ts | SSE log_entry subscription with 200-entry ring buffer, fingerprint-based dedup (level + 80-char message prefix, 30s window), and pipelineId/nodeId filter. Exported pure helpers are well-tested. filterRef pattern correctly avoids stale closure on options changes.
src/hooks/use-realtime-invalidation.ts | 500ms debounced batch invalidation mapping SSE event types to tRPC query key prefixes. Set-based dedup prevents duplicate invalidations within the window. Cleanup correctly cancels pending timer and clears keys.
src/server/routers/deploy.ts | Added SSE broadcast on deploy completion. The fromStatus: "" and nodeId: "" pattern was flagged in a previous review thread; it remains a partial fix (changed from "PENDING" to ""). No new issues introduced by this diff.
src/hooks/use-polling-interval.ts | Returns false (suppress polling) when SSE is connected, or max(baseInterval, 30_000) as a floor when disconnected/reconnecting. Pure getPollingInterval logic is separately tested.
src/hooks/use-flow-metrics.ts | Bridges SSE metric_update events into flow store's updateNodeMetrics. Per-component sample buffer (capped at 60), kind-specific rate field mapping via deriveMetrics. Node kind is resolved from flow store state rather than event data, which avoids stale events updating the wrong field.
src/lib/log-utils.ts | Parses raw log lines from agent heartbeats. Handles JSON (Vector msg/message convention) and plain-text level-prefix formats. Falls back gracefully on malformed JSON. Level aliases normalize common variants (warn/warning, err/error).
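
The streaming-log buffer described for use-streaming-logs.ts (200 entries, dedup on level plus an 80-character message prefix within a 30s window) can be sketched as below. `LogBuffer`, `fingerprint`, and the entry shape are hypothetical names for illustration only.

```typescript
interface LogEntry {
  level: string;
  message: string;
  timestamp: number; // milliseconds
}

const MAX_ENTRIES = 200;
const DEDUP_WINDOW_MS = 30_000;
const PREFIX_LENGTH = 80;

// Two entries collide when level and the first 80 message chars match.
function fingerprint(entry: LogEntry): string {
  return `${entry.level}:${entry.message.slice(0, PREFIX_LENGTH)}`;
}

class LogBuffer {
  private entries: LogEntry[] = [];
  // Sketch only: this map is never pruned, which a real hook would need.
  private lastSeen = new Map<string, number>();

  // Returns true if the entry was appended, false if suppressed as a duplicate.
  push(entry: LogEntry): boolean {
    const key = fingerprint(entry);
    const seenAt = this.lastSeen.get(key);
    if (seenAt !== undefined && entry.timestamp - seenAt < DEDUP_WINDOW_MS) {
      return false;
    }
    this.lastSeen.set(key, entry.timestamp);
    this.entries.push(entry);
    if (this.entries.length > MAX_ENTRIES) {
      this.entries.shift(); // drop the oldest entry
    }
    return true;
  }

  snapshot(): LogEntry[] {
    return this.entries.slice();
  }
}
```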

Sequence Diagram

sequenceDiagram
    participant Agent as Go Agent
    participant HB as /api/agent/heartbeat
    participant MetricStore as MetricStore
    participant SSEReg as SSERegistry
    participant SSERoute as /api/sse
    participant Browser as Browser (useSSE)

    Browser->>SSERoute: GET /api/sse (authenticated)
    SSERoute->>SSERoute: Resolve environmentIds
    SSERoute->>SSEReg: register(connId, controller, userId, envIds)
    SSERoute-->>Browser: text/event-stream (: connected)

    loop Every 5 seconds
        Agent->>HB: POST /api/agent/heartbeat {metrics, pipelineStatuses}
        HB->>HB: batchUpsertPipelineStatuses()
        HB->>HB: ingestMetrics() → accumulateRow() delta
        HB->>MetricStore: recordTotals() + flush(nodeId, pipelineId)
        MetricStore-->>HB: MetricUpdateEvent[]
        HB->>SSEReg: broadcast(metric_update, envId)
        HB->>SSEReg: broadcast(fleet_status HEALTHY, envId)
        HB->>SSEReg: broadcast(status_change if pipeline transitioned, envId)
        HB->>SSEReg: broadcast(log_entry if recentLogs, envId)
        SSEReg->>Browser: SSE event stream (filtered by envIds)
    end

    Browser->>Browser: useSSE dispatches to subscribers
    Browser->>Browser: useRealtimeInvalidation → debounced React Query invalidation
    Browser->>Browser: useSSEToasts → sonner toast on CRASHED/DEPLOYED
    Browser->>Browser: useFlowMetrics → updateNodeMetrics in flow store
    Browser->>Browser: useStreamingLogs → parsed log buffer

    note over Browser: On SSE disconnect → usePollingInterval returns 30s floor

Reviews (2): Last reviewed commit: "fix: accumulate per-minute metric deltas..."

@TerrifiedBug
Owner Author

@greptile review

- agent/internal/config/config.go
- src/server/services/metric-store.ts
- src/server/services/heartbeat-batch.ts
- src/app/api/agent/heartbeat/route.ts
- src/server/services/__tests__/heartbeat-batch.test.ts
- src/server/services/metrics-ingest.ts
- src/app/api/agent/heartbeat/route.ts
- src/server/services/__tests__/metrics-ingest.test.ts
- src/server/services/__tests__/heartbeat-batch.test.ts
- src/server/services/__tests__/metrics-ingest.test.ts
- src/server/services/metric-store.ts
- src/lib/sse/types.ts
- src/server/services/metric-store.ts
- src/server/services/sse-registry.ts
- src/app/api/sse/route.ts
- src/app/api/agent/heartbeat/route.ts
- src/stores/sse-store.ts
- src/hooks/use-sse.ts
- src/server/services/__tests__/metric-store.test.ts
- src/server/services/__tests__/sse-registry.test.ts
- src/hooks/use-polling-interval.ts
- src/hooks/use-realtime-invalidation.ts
- src/app/(dashboard)/layout.tsx
- src/hooks/__tests__/use-polling-interval.test.ts
- src/hooks/__tests__/use-realtime-invalidation.test.ts
- src/app/(dashboard)/page.tsx
- src/app/(dashboard)/analytics/page.tsx
- src/app/(dashboard)/fleet/[nodeId]/page.tsx
- src/components/fleet/deployment-matrix.tsx
- src/components/fleet/uptime-cards.tsx
- src/components/fleet/node-metrics-charts.tsx
- src/components/fleet/event-log.tsx
- src/components/fleet/status-timeline.tsx
- src/hooks/use-sse.ts
- src/hooks/__tests__/use-sse.test.ts
- src/hooks/use-flow-metrics.ts
- src/hooks/__tests__/use-flow-metrics.test.ts
- src/lib/sse/types.ts
- src/app/api/agent/heartbeat/route.ts
- src/server/routers/deploy.ts
- src/hooks/use-realtime-invalidation.ts
- src/lib/log-utils.ts
- src/lib/__tests__/log-utils.test.ts
- src/hooks/__tests__/use-realtime-invalidation.test.ts
- src/hooks/use-streaming-logs.ts
- src/hooks/__tests__/use-streaming-logs.test.ts
- src/components/pipeline/pipeline-logs.tsx
- src/components/fleet/node-logs.tsx
- src/hooks/use-sse-toasts.ts
- src/hooks/__tests__/use-sse-toasts.test.ts
- src/app/(dashboard)/layout.tsx
- src/server/services/__tests__/sse-integration.test.ts
- src/hooks/__tests__/sse-lifecycle.test.ts
…ranch, fix deploy fromStatus

Fixes three issues flagged by Greptile code review:

1. metrics-ingest.ts: The batch rewrite (S01) replaced findFirst+increment
   with deleteMany+createMany, which discarded accumulated deltas within
   the same minute. At 5s heartbeats, only the last delta survived instead
   of all ~12 per minute — causing ~12x undercount in historical analytics.
   Fix: read existing rows first, add deltas in-memory (accumulateRow),
   then delete+create with accumulated totals. 5 new tests.

2. use-sse-toasts.ts: The status_change OFFLINE branch was dead code —
   the heartbeat handler only emits status_change for recovery (HEALTHY)
   and pipeline transitions (always has pipelineId). A node going offline
   means no heartbeats, so no status_change is emitted. Removed the dead
   branch. The fleet_status OFFLINE branch is retained with a comment
   noting it needs a future server-side watchdog to emit OFFLINE events.

3. deploy.ts: Replaced hardcoded fromStatus 'PENDING' with empty string
   since the deploy action doesn't know the previous pipeline status.
   Added descriptive reason strings distinguishing direct deploy from
   deploy-request approval.
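
The accumulation fix in item 1 can be illustrated with an in-memory stand-in for the per-minute row. The name `accumulateRow` comes from the commit message; the row fields, the `ingest` helper, and the Map that replaces the database are assumptions of this sketch.

```typescript
// Hypothetical per-node, per-minute row; the real columns may differ.
interface MetricRow {
  eventsIn: number;
  eventsOut: number;
  errors: number;
}

// Adds a heartbeat's delta onto whatever is already stored for the minute.
function accumulateRow(existing: MetricRow | undefined, delta: MetricRow): MetricRow {
  if (!existing) return { ...delta };
  return {
    eventsIn: existing.eventsIn + delta.eventsIn,
    eventsOut: existing.eventsOut + delta.eventsOut,
    errors: existing.errors + delta.errors,
  };
}

// Read, accumulate in memory, then write back. The Map write stands in
// for the delete + create step that the PR runs inside one transaction.
function ingest(store: Map<string, MetricRow>, bucketKey: string, delta: MetricRow): void {
  store.set(bucketKey, accumulateRow(store.get(bucketKey), delta));
}

// Twelve 5s heartbeats land in the same minute bucket.
const minuteStore = new Map<string, MetricRow>();
for (let i = 0; i < 12; i++) {
  ingest(minuteStore, "node-1|12:00", { eventsIn: 10, eventsOut: 8, errors: 0 });
}
```

With accumulation the bucket ends at 120 events in; the replace-only version would have kept just the last delta of 10, which is the roughly 12x undercount the commit message describes.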
@TerrifiedBug TerrifiedBug merged commit 33d3215 into main Mar 24, 2026
3 checks passed
@TerrifiedBug TerrifiedBug deleted the milestone/M002 branch March 24, 2026 13:10