Skip to content

Commit eda9e94

Browse files
authored
feat: add component latency metrics end-to-end (#91)
* feat(agent): scrape component_latency_mean_seconds from Vector 0.54.0
* feat(db): add latencyMeanMs column to PipelineMetric
* feat: accept and store component latency in heartbeat pipeline
* feat(api): expose latency data in metrics and dashboard endpoints
* feat(ui): add formatLatency helper and Component Latency chart to dashboard
* feat(ui): add Component Latency chart to pipeline metrics page
* feat(ui): add latency to flow editor overlay and show-metrics panel
* feat(ui): add Avg Latency column to fleet node pipeline table
* feat: add latency_mean SLI metric type for pipeline health evaluation
* fix: use proper mean for latency aggregation and handle zero formatting
  - Fix running-average bug in getNodePipelineRates that gave incorrect latency means when aggregating across >2 components
  - Add early return for formatLatency(0) to display "0ms" instead of "0.000ms"
1 parent f5710ee commit eda9e94

File tree

24 files changed

+240
-35
lines changed

24 files changed

+240
-35
lines changed

agent/internal/agent/heartbeat.go

Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -42,14 +42,15 @@ func buildHeartbeat(sup *supervisor.Supervisor, vectorVersion string, deployment
4242
// Map per-component metrics for editor node overlays
4343
for _, cm := range sr.Components {
4444
ps.ComponentMetrics = append(ps.ComponentMetrics, client.ComponentMetric{
45-
ComponentID: cm.ComponentID,
46-
ComponentKind: cm.ComponentKind,
47-
ReceivedEvents: cm.ReceivedEvents,
48-
SentEvents: cm.SentEvents,
49-
ReceivedBytes: cm.ReceivedBytes,
50-
SentBytes: cm.SentBytes,
51-
ErrorsTotal: cm.ErrorsTotal,
52-
DiscardedEvents: cm.DiscardedEvents,
45+
ComponentID: cm.ComponentID,
46+
ComponentKind: cm.ComponentKind,
47+
ReceivedEvents: cm.ReceivedEvents,
48+
SentEvents: cm.SentEvents,
49+
ReceivedBytes: cm.ReceivedBytes,
50+
SentBytes: cm.SentBytes,
51+
ErrorsTotal: cm.ErrorsTotal,
52+
DiscardedEvents: cm.DiscardedEvents,
53+
LatencyMeanSeconds: cm.LatencyMeanSeconds,
5354
})
5455
}
5556

agent/internal/client/client.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,8 @@ type ComponentMetric struct {
178178
ReceivedBytes int64 `json:"receivedBytes,omitempty"`
179179
SentBytes int64 `json:"sentBytes,omitempty"`
180180
ErrorsTotal int64 `json:"errorsTotal,omitempty"`
181-
DiscardedEvents int64 `json:"discardedEvents,omitempty"`
181+
DiscardedEvents int64 `json:"discardedEvents,omitempty"`
182+
LatencyMeanSeconds float64 `json:"latencyMeanSeconds,omitempty"`
182183
}
183184

184185
// HostMetrics holds system-level metrics from the Vector host

agent/internal/metrics/scraper.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,8 @@ type ComponentMetrics struct {
2828
ReceivedBytes int64
2929
SentBytes int64
3030
ErrorsTotal int64
31-
DiscardedEvents int64
31+
DiscardedEvents int64
32+
LatencyMeanSeconds float64 // mean event time in component (seconds)
3233
}
3334

3435
// HostMetrics holds system-level metrics from Vector's host_metrics source.
@@ -137,6 +138,11 @@ func ScrapePrometheus(metricsPort int) ScrapeResult {
137138
}
138139
getOrCreate(componentMap, componentID, componentKind).DiscardedEvents += v
139140

141+
case "vector_component_latency_mean_seconds", "component_latency_mean_seconds":
142+
if !isInternal {
143+
getOrCreate(componentMap, componentID, componentKind).LatencyMeanSeconds = value
144+
}
145+
140146
// Host metrics – use += to aggregate across CPU cores, devices, interfaces, etc.
141147
case "host_memory_total_bytes":
142148
sr.Host.MemoryTotalBytes += int64(value)
Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
-- AlterTable: Add latencyMeanMs column to PipelineMetric
2+
ALTER TABLE "PipelineMetric" ADD COLUMN "latencyMeanMs" DOUBLE PRECISION;

prisma/schema.prisma

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -321,6 +321,7 @@ model PipelineMetric {
321321
bytesIn BigInt @default(0)
322322
bytesOut BigInt @default(0)
323323
utilization Float @default(0)
324+
latencyMeanMs Float? // pipeline-level weighted mean latency (ms)
324325
325326
@@index([pipelineId, timestamp])
326327
@@index([timestamp])
@@ -330,7 +331,7 @@ model PipelineSli {
330331
id String @id @default(cuid())
331332
pipelineId String
332333
pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade)
333-
metric String // "error_rate" | "throughput_floor" | "discard_rate"
334+
metric String // "error_rate" | "throughput_floor" | "discard_rate" | "latency_mean"
334335
condition String // "lt" | "gt"
335336
threshold Float
336337
windowMinutes Int @default(5)

src/app/(dashboard)/fleet/[nodeId]/page.tsx

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import {
3030
formatCount,
3131
formatBytes,
3232
formatBytesRate,
33+
formatLatency,
3334
} from "@/lib/format";
3435
import { nodeStatusVariant, nodeStatusLabel, pipelineStatusVariant, pipelineStatusLabel } from "@/lib/status";
3536

@@ -521,6 +522,7 @@ export default function NodeDetailPage() {
521522
<TableHead className="text-right">Errors</TableHead>
522523
<TableHead className="text-right">Bytes In</TableHead>
523524
<TableHead className="text-right">Bytes Out</TableHead>
525+
<TableHead className="text-right">Avg Latency</TableHead>
524526
<TableHead className="text-right">Uptime</TableHead>
525527
</TableRow>
526528
</TableHeader>
@@ -557,6 +559,11 @@ export default function NodeDetailPage() {
557559
<div>{formatBytes(ps.bytesOut)}</div>
558560
{rates && <div className="text-xs text-muted-foreground">{formatBytesRate(rates.bytesOutRate)}</div>}
559561
</TableCell>
562+
<TableCell className="text-right font-mono text-sm">
563+
{rates?.latencyMeanMs != null
564+
? formatLatency(rates.latencyMeanMs)
565+
: "—"}
566+
</TableCell>
560567
<TableCell className="text-right font-mono text-sm">
561568
{formatUptime(ps.uptimeSeconds)}
562569
</TableCell>

src/app/(dashboard)/page.tsx

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
Plus,
1616
Pencil,
1717
Trash2,
18+
Timer,
1819
} from "lucide-react";
1920
import { Card, CardContent } from "@/components/ui/card";
2021
import { Skeleton } from "@/components/ui/skeleton";
@@ -29,7 +30,7 @@ import { MetricsSection } from "@/components/dashboard/metrics-section";
2930
import { MetricChart } from "@/components/dashboard/metric-chart";
3031
import { ViewBuilderDialog } from "@/components/dashboard/view-builder-dialog";
3132
import { CustomView } from "@/components/dashboard/custom-view";
32-
import { formatSI, formatBytesRate, formatEventsRate } from "@/lib/format";
33+
import { formatSI, formatBytesRate, formatEventsRate, formatLatency } from "@/lib/format";
3334
import { PageHeader } from "@/components/page-header";
3435
import { cn } from "@/lib/utils";
3536

@@ -380,6 +381,15 @@ export default function DashboardPage() {
380381
timeRange={timeRange}
381382
height={200}
382383
/>
384+
<MetricChart
385+
title="Component Latency"
386+
icon={<Timer className="h-4 w-4" />}
387+
data={chartData.data?.pipeline.latency ?? {}}
388+
variant="area"
389+
yFormatter={formatLatency}
390+
timeRange={timeRange}
391+
height={200}
392+
/>
383393
</MetricsSection>
384394

385395
{/* System Metrics */}

src/app/(dashboard)/pipelines/[id]/metrics/page.tsx

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -115,6 +115,15 @@ export default function PipelineMetricsPage() {
115115
<MetricsChart rows={rows} dataKey="errors" height={220} />
116116
</CardContent>
117117
</Card>
118+
119+
<Card>
120+
<CardHeader>
121+
<CardTitle>Component Latency</CardTitle>
122+
</CardHeader>
123+
<CardContent>
124+
<MetricsChart rows={rows} dataKey="latency" height={220} />
125+
</CardContent>
126+
</Card>
118127
</>
119128
)}
120129

src/app/(dashboard)/pipelines/[id]/page.tsx

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -216,6 +216,7 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) {
216216
...(entry.kind === "TRANSFORM" ? { eventsInPerSec: latest.receivedEventsRate } : {}),
217217
status: eventsPerSec > 0 ? "healthy" : "degraded",
218218
samples: entry.samples,
219+
latencyMs: latest.latencyMeanMs,
219220
});
220221
}
221222

src/app/api/agent/heartbeat/route.ts

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,23 @@ import { deliverToChannels } from "@/server/services/channels";
1414
import { DeploymentMode } from "@/generated/prisma";
1515
import { isVersionOlder } from "@/lib/version";
1616

17+
/**
 * Compute the pipeline-level weighted mean latency, in milliseconds, from
 * per-component metrics.
 *
 * Each component's `latencyMeanSeconds` is converted to milliseconds and
 * weighted by that component's event volume (`receivedEvents + sentEvents`).
 *
 * Components are skipped when:
 * - `latencyMeanSeconds` is missing or zero (presumably "no sample": the agent's
 *   JSON encoding uses `omitempty`, so a zero value is never sent — confirm);
 * - `latencyMeanSeconds` is negative or non-finite (NaN / ±Infinity);
 * - the combined event count is non-finite or not positive.
 *
 * Skipping malformed entries keeps a single bad component from turning the
 * whole pipeline value into NaN/Infinity.
 *
 * @param components Per-component metrics for one pipeline; may be omitted.
 * @returns The weighted mean latency in ms, or `null` when there is no
 *   component with both a usable latency and a positive event count.
 */
function computeWeightedLatency(
  components?: Array<{ receivedEvents: number; sentEvents: number; latencyMeanSeconds?: number }>,
): number | null {
  if (!components || components.length === 0) return null;

  let weightedSumMs = 0;
  let totalEvents = 0;

  for (const cm of components) {
    const latencySeconds = cm.latencyMeanSeconds;
    // Missing, zero, negative, or non-finite latency carries no usable signal.
    if (latencySeconds == null || !Number.isFinite(latencySeconds) || latencySeconds <= 0) continue;

    const events = cm.receivedEvents + cm.sentEvents;
    // A NaN/Infinity or non-positive weight would corrupt both accumulators.
    if (!Number.isFinite(events) || events <= 0) continue;

    weightedSumMs += latencySeconds * 1000 * events; // convert seconds → ms
    totalEvents += events;
  }

  if (totalEvents === 0) return null;
  return weightedSumMs / totalEvents;
}
33+
1734
const heartbeatSchema = z.object({
1835
agentVersion: z.string().max(100).optional(),
1936
vectorVersion: z.string().max(100).optional(),
@@ -39,6 +56,7 @@ const heartbeatSchema = z.object({
3956
sentBytes: z.number().optional(),
4057
errorsTotal: z.number().optional(),
4158
discardedEvents: z.number().optional(),
59+
latencyMeanSeconds: z.number().optional(), // NEW
4260
})).optional(),
4361
utilization: z.number().optional(),
4462
recentLogs: z.array(z.string()).optional(),
@@ -94,6 +112,7 @@ interface PipelineStatus {
94112
sentBytes?: number;
95113
errorsTotal?: number;
96114
discardedEvents?: number;
115+
latencyMeanSeconds?: number; // NEW
97116
}>;
98117
utilization?: number;
99118
recentLogs?: string[];
@@ -319,6 +338,7 @@ export async function POST(request: Request) {
319338
bytesIn: BigInt(p.bytesIn ?? 0),
320339
bytesOut: BigInt(p.bytesOut ?? 0),
321340
utilization: p.utilization ?? 0,
341+
latencyMeanMs: computeWeightedLatency(p.componentMetrics),
322342
}));
323343

324344
if (metricsData.length > 0) {
@@ -336,6 +356,9 @@ export async function POST(request: Request) {
336356
sentEventsTotal: cm.sentEvents,
337357
receivedBytesTotal: cm.receivedBytes ?? 0,
338358
sentBytesTotal: cm.sentBytes ?? 0,
359+
errorsTotal: cm.errorsTotal ?? 0,
360+
discardedTotal: cm.discardedEvents ?? 0,
361+
latencyMeanSeconds: cm.latencyMeanSeconds,
339362
});
340363
}
341364
}

0 commit comments

Comments
 (0)