diff --git a/src/app/(dashboard)/fleet/overview/page.tsx b/src/app/(dashboard)/fleet/overview/page.tsx new file mode 100644 index 00000000..341302db --- /dev/null +++ b/src/app/(dashboard)/fleet/overview/page.tsx @@ -0,0 +1,177 @@ +"use client"; + +import { useState } from "react"; +import Link from "next/link"; +import { useQuery } from "@tanstack/react-query"; +import { useTRPC } from "@/trpc/client"; +import { useEnvironmentStore } from "@/stores/environment-store"; +import { usePollingInterval } from "@/hooks/use-polling-interval"; +import { FleetKpiCards } from "@/components/fleet/fleet-kpi-cards"; +import { FleetVolumeChart } from "@/components/fleet/fleet-volume-chart"; +import { FleetThroughputChart } from "@/components/fleet/fleet-throughput-chart"; +import { FleetCapacityChart } from "@/components/fleet/fleet-capacity-chart"; +import { DataLossTable } from "@/components/fleet/data-loss-table"; +import { DeploymentMatrix } from "@/components/fleet/deployment-matrix"; +import { EmptyState } from "@/components/empty-state"; +import { QueryError } from "@/components/query-error"; +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { cn } from "@/lib/utils"; +import { ArrowLeft } from "lucide-react"; + +type TimeRange = "1h" | "6h" | "1d" | "7d" | "30d"; + +export default function FleetOverviewPage() { + const trpc = useTRPC(); + const { selectedEnvironmentId } = useEnvironmentStore(); + const [range, setRange] = useState("1d"); + const [lossThreshold, setLossThreshold] = useState(0.05); + const polling = usePollingInterval(15_000); + + const overview = useQuery({ + ...trpc.fleet.overview.queryOptions({ + environmentId: selectedEnvironmentId ?? "", + range, + }), + enabled: !!selectedEnvironmentId, + refetchInterval: polling, + }); + + const volumeTrend = useQuery({ + ...trpc.fleet.volumeTrend.queryOptions({ + environmentId: selectedEnvironmentId ?? "", + range, + }), + enabled: !!selectedEnvironmentId, + refetchInterval: polling, + }); + + const nodeThroughput = useQuery({ + ...trpc.fleet.nodeThroughput.queryOptions({ + environmentId: selectedEnvironmentId ?? "", + range, + }), + enabled: !!selectedEnvironmentId, + refetchInterval: polling, + }); + + const nodeCapacity = useQuery({ + ...trpc.fleet.nodeCapacity.queryOptions({ + environmentId: selectedEnvironmentId ?? "", + range, + }), + enabled: !!selectedEnvironmentId, + refetchInterval: polling, + }); + + const dataLoss = useQuery({ + ...trpc.fleet.dataLoss.queryOptions({ + environmentId: selectedEnvironmentId ?? "", + range, + threshold: lossThreshold, + }), + enabled: !!selectedEnvironmentId, + refetchInterval: polling, + }); + + const matrixThroughput = useQuery({ + ...trpc.fleet.matrixThroughput.queryOptions({ + environmentId: selectedEnvironmentId ?? "", + range, + }), + enabled: !!selectedEnvironmentId, + refetchInterval: polling, + }); + + if (!selectedEnvironmentId) { + return ( +
+ +
+ ); + } + + if (overview.isError) { + return ( +
+ overview.refetch()} + /> +
+ ); + } + + return ( +
+
+
+ + + Fleet + +

Fleet Overview

+
+
+ {(["1h", "6h", "1d", "7d", "30d"] as const).map((v) => ( + + ))} +
+
+ + + + + + + + + + + + + + Deployment Matrix + + + + + +
+ ); +} diff --git a/src/app/(dashboard)/fleet/page.tsx b/src/app/(dashboard)/fleet/page.tsx index 45ecc1d0..32a1a019 100644 --- a/src/app/(dashboard)/fleet/page.tsx +++ b/src/app/(dashboard)/fleet/page.tsx @@ -126,6 +126,18 @@ export default function FleetPage() { return (
+
+ + Overview + + + Nodes + +
+ {isLoading ? (
{Array.from({ length: 3 }).map((_, i) => ( diff --git a/src/components/fleet/data-loss-table.tsx b/src/components/fleet/data-loss-table.tsx new file mode 100644 index 00000000..ced9ea08 --- /dev/null +++ b/src/components/fleet/data-loss-table.tsx @@ -0,0 +1,162 @@ +"use client"; + +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Skeleton } from "@/components/ui/skeleton"; +import { Badge } from "@/components/ui/badge"; +import { Input } from "@/components/ui/input"; +import { AlertTriangle, CheckCircle2 } from "lucide-react"; +import Link from "next/link"; +import { formatCount, formatPercent } from "@/lib/format"; + +interface PipelineDataLoss { + pipelineId: string; + pipelineName: string; + nodeId: string | null; + nodeName: string | null; + eventsIn: number; + eventsOut: number; + lossRate: number; +} + +interface DataLossTableProps { + data: PipelineDataLoss[] | undefined; + isLoading: boolean; + threshold: number; + onThresholdChange: (value: number) => void; +} + +export function DataLossTable({ + data, + isLoading, + threshold, + onThresholdChange, +}: DataLossTableProps) { + if (isLoading) { + return ( + + + + + + + + + + + ); + } + + const thresholdPct = Math.round(threshold * 100); + + return ( + + +
+ Data Loss Detection + {data && data.length > 0 ? ( + + {data.length} flagged + + ) : ( + + No loss detected + + )} +
+
+ + Threshold + + { + const v = Number(e.target.value); + if (v >= 1 && v <= 50) onThresholdChange(v / 100); + }} + className="h-7 w-14 text-xs text-center" + min={1} + max={50} + /> + % +
+
+ + {!data || data.length === 0 ? ( +
+ +

+ No pipelines exceed the {thresholdPct}% loss threshold +

+
+ ) : ( +
+ + + + + + + + + + + + + {data.map((row) => { + const severity = row.lossRate >= 0.2 ? "critical" : row.lossRate >= 0.1 ? "warning" : "minor"; + return ( + + + + + + + + + ); + })} + +
PipelineNodeEvents InEvents OutLoss RateSeverity
+ + {row.pipelineName} + + + {row.nodeName ?? "—"} + + {formatCount(row.eventsIn)} + + {formatCount(row.eventsOut)} + + {formatPercent(row.lossRate)} + +
+ +
+
+
+ )} +
+
+ ); +} diff --git a/src/components/fleet/deployment-matrix.tsx b/src/components/fleet/deployment-matrix.tsx index 4517d068..5faf9cb5 100644 --- a/src/components/fleet/deployment-matrix.tsx +++ b/src/components/fleet/deployment-matrix.tsx @@ -9,12 +9,29 @@ import Link from "next/link"; import { StatusDot } from "@/components/ui/status-dot"; import { pipelineStatusVariant, pipelineStatusLabel } from "@/lib/status"; import { usePollingInterval } from "@/hooks/use-polling-interval"; +import { formatEventsRate } from "@/lib/format"; +import type { TimeRange } from "@/server/services/fleet-data"; + +interface MatrixCellThroughput { + pipelineId: string; + nodeId: string; + eventsPerSec: number; + bytesPerSec: number; + lossRate: number; +} interface DeploymentMatrixProps { environmentId: string; + range?: TimeRange; + lossThreshold?: number; + throughputData?: MatrixCellThroughput[]; } -export function DeploymentMatrix({ environmentId }: DeploymentMatrixProps) { +export function DeploymentMatrix({ + environmentId, + lossThreshold = 0.05, + throughputData, +}: DeploymentMatrixProps) { const trpc = useTRPC(); const polling = usePollingInterval(15_000); @@ -49,6 +66,14 @@ export function DeploymentMatrix({ environmentId }: DeploymentMatrixProps) { return null; } + // Index throughput data by pipelineId:nodeId for O(1) lookup + const throughputMap = new Map(); + if (throughputData) { + for (const cell of throughputData) { + throughputMap.set(`${cell.pipelineId}:${cell.nodeId}`, cell); + } + } + return (
@@ -93,6 +118,12 @@ export function DeploymentMatrix({ environmentId }: DeploymentMatrixProps) { const ps = node.pipelineStatuses.find( (s) => s.pipelineId === pipeline.id ); + const cellThroughput = throughputMap.get( + `${pipeline.id}:${node.id}` + ); + const hasLoss = + cellThroughput != null && + cellThroughput.lossRate > lossThreshold; if (!ps) { return ( @@ -107,7 +138,16 @@ export function DeploymentMatrix({ environmentId }: DeploymentMatrixProps) { const isOutdated = ps.version < pipeline.latestVersion; return ( - ); diff --git a/src/components/fleet/fleet-capacity-chart.tsx b/src/components/fleet/fleet-capacity-chart.tsx new file mode 100644 index 00000000..cccab047 --- /dev/null +++ b/src/components/fleet/fleet-capacity-chart.tsx @@ -0,0 +1,282 @@ +"use client"; + +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Skeleton } from "@/components/ui/skeleton"; +import { + ChartContainer, + ChartTooltip, + ChartTooltipContent, + ChartLegend, + ChartLegendContent, + type ChartConfig, +} from "@/components/ui/chart"; +import { LineChart, Line, XAxis, YAxis, CartesianGrid } from "recharts"; +import { formatPercent, formatTimeAxis } from "@/lib/format"; +import { Inbox } from "lucide-react"; + +interface NodeCapacityBucket { + bucket: string; + memoryPct: number; + diskPct: number; + cpuLoad: number; +} + +interface NodeCapacity { + nodeId: string; + nodeName: string; + buckets: NodeCapacityBucket[]; +} + +interface FleetCapacityChartProps { + data: NodeCapacity[] | undefined; + isLoading: boolean; + range: string; +} + +// 8-color palette for distinguishing nodes +const NODE_COLORS = [ + "oklch(0.55 0.24 265)", + "oklch(0.65 0.17 163)", + "oklch(0.60 0.20 30)", + "oklch(0.55 0.22 310)", + "oklch(0.65 0.18 90)", + "oklch(0.58 0.20 200)", + "oklch(0.62 0.15 50)", + "oklch(0.52 0.22 350)", +]; + +type MetricKey = "memoryPct" | "diskPct" | "cpuLoad"; + +function buildChartData( + nodes: NodeCapacity[], + metric: MetricKey, +): { t: number; [key: string]: number }[] { + // Collect all unique timestamps across all nodes + const timeSet = new Set(); + for (const node of nodes) { + for (const b of node.buckets) { + timeSet.add(b.bucket); + } + } + const times = Array.from(timeSet).sort(); + + // Build lookup maps per node + const nodeLookups = nodes.map((node) => { + const map = new Map(); + for (const b of node.buckets) { + map.set(b.bucket, b[metric]); + } + return { nodeId: node.nodeId, map }; + }); + + return times.map((t) => { + const point: { t: number; [key: string]: number } = { t: new Date(t).getTime() }; + for (const { nodeId, map } of nodeLookups) { + point[nodeId] = map.get(t) ?? 0; + } + return point; + }); +} + +function buildConfig(nodes: NodeCapacity[]): ChartConfig { + const config: ChartConfig = {}; + for (let i = 0; i < nodes.length; i++) { + config[nodes[i].nodeId] = { + label: nodes[i].nodeName, + color: NODE_COLORS[i % NODE_COLORS.length], + }; + } + return config; +} + +function CapacityMetricChart({ + title, + nodes, + metric, + range, + yFormatter, + yDomain, +}: { + title: string; + nodes: NodeCapacity[]; + metric: MetricKey; + range: string; + yFormatter: (v: number) => string; + yDomain?: [number, number]; +}) { + const chartData = buildChartData(nodes, metric); + const chartConfig = buildConfig(nodes); + + if (chartData.length === 0) { + return ( + + + {title} + + +
+ +

No data

+
+
+
+ ); + } + + // Find peak node for this metric + let peakNodeId = ""; + let peakValue = -1; + for (const node of nodes) { + for (const b of node.buckets) { + if (b[metric] > peakValue) { + peakValue = b[metric]; + peakNodeId = node.nodeId; + } + } + } + const peakNodeName = nodes.find((n) => n.nodeId === peakNodeId)?.nodeName; + + return ( + + + + {title} + {nodes.length > 1 && peakNodeName && ( + + Peak: {peakNodeName} ({yFormatter(peakValue)}) + + )} + + + + + + + formatTimeAxis(v, range)} + interval="preserveStartEnd" + /> + + ) => { + const timestamp = payload?.[0]?.payload?.t; + if (!timestamp) return ""; + return new Date(Number(timestamp)).toLocaleString([], { + month: "short", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + }); + }} + formatter={(value, name) => ( +
+ + {chartConfig[name as string]?.label ?? name} + + + {yFormatter(value as number)} + +
+ )} + /> + } + /> + } /> + {nodes.map((node, i) => ( + + ))} +
+
+
+
+ ); +} + +export function FleetCapacityChart({ data, isLoading, range }: FleetCapacityChartProps) { + if (isLoading) { + return ( +
+ {Array.from({ length: 3 }).map((_, i) => ( + + + + + + + + + + + ))} +
+ ); + } + + const nodes = data ?? []; + + if (nodes.length === 0) { + return ( + + + Node Capacity Utilization + + +
+ +

No capacity data available

+
+
+
+ ); + } + + return ( +
+ formatPercent(v)} + yDomain={[0, 100]} + /> + formatPercent(v)} + yDomain={[0, 100]} + /> + v.toFixed(1)} + /> +
+ ); +} diff --git a/src/components/fleet/fleet-kpi-cards.tsx b/src/components/fleet/fleet-kpi-cards.tsx new file mode 100644 index 00000000..c33bb0d5 --- /dev/null +++ b/src/components/fleet/fleet-kpi-cards.tsx @@ -0,0 +1,92 @@ +"use client"; + +import { Card, CardContent } from "@/components/ui/card"; +import { Skeleton } from "@/components/ui/skeleton"; +import { formatBytes, formatCount, formatPercent } from "@/lib/format"; +import { ArrowDownToLine, ArrowUpFromLine, Activity, Gauge } from "lucide-react"; + +interface FleetKpiCardsProps { + data: + | { + bytesIn: number; + bytesOut: number; + eventsIn: number; + eventsOut: number; + errorRate: number; + nodeCount: number; + } + | undefined; + isLoading: boolean; +} + +export function FleetKpiCards({ data, isLoading }: FleetKpiCardsProps) { + if (isLoading) { + return ( +
+ {Array.from({ length: 4 }).map((_, i) => ( + + ))} +
+ ); + } + + return ( +
+ + +
+ + Total Bytes In +
+

+ {formatBytes(data?.bytesIn ?? 0)} +

+
+
+ + + +
+ + Total Bytes Out +
+

+ {formatBytes(data?.bytesOut ?? 0)} +

+
+
+ + + +
+ + Events In / Out +
+

+ {formatCount(data?.eventsIn ?? 0)}{" "} + /{" "} + {formatCount(data?.eventsOut ?? 0)} +

+
+
+ + + +
+ + Fleet Health +
+

+ {data?.nodeCount ?? 0}{" "} + + {data?.nodeCount === 1 ? "node" : "nodes"} + +

+

+ {formatPercent((data?.errorRate ?? 0) * 100)} error rate +

+
+
+
+ ); +} diff --git a/src/components/fleet/fleet-throughput-chart.tsx b/src/components/fleet/fleet-throughput-chart.tsx new file mode 100644 index 00000000..18f232bf --- /dev/null +++ b/src/components/fleet/fleet-throughput-chart.tsx @@ -0,0 +1,130 @@ +"use client"; + +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Skeleton } from "@/components/ui/skeleton"; +import { + ChartContainer, + ChartTooltip, + ChartTooltipContent, + type ChartConfig, +} from "@/components/ui/chart"; +import { BarChart, Bar, XAxis, YAxis, CartesianGrid } from "recharts"; +import { formatBytes } from "@/lib/format"; +import { Inbox } from "lucide-react"; + +interface NodeThroughput { + nodeId: string; + nodeName: string; + bytesIn: number; + bytesOut: number; + eventsIn: number; + eventsOut: number; +} + +interface FleetThroughputChartProps { + data: NodeThroughput[] | undefined; + isLoading: boolean; +} + +const chartConfig: ChartConfig = { + bytesIn: { label: "Bytes In", color: "oklch(0.55 0.24 265)" }, + bytesOut: { label: "Bytes Out", color: "oklch(0.65 0.17 163)" }, +}; + +export function FleetThroughputChart({ data, isLoading }: FleetThroughputChartProps) { + if (isLoading) { + return ( + + + Node Throughput Comparison + + + + + + ); + } + + const chartData = (data ?? []).map((d) => ({ + name: d.nodeName, + bytesIn: d.bytesIn, + bytesOut: d.bytesOut, + })); + + // Find the node with highest total throughput for highlighting + const maxIdx = chartData.reduce( + (max, d, i) => (d.bytesIn + d.bytesOut > (chartData[max]?.bytesIn ?? 0) + (chartData[max]?.bytesOut ?? 0) ? i : max), + 0, + ); + + return ( + + + + Node Throughput Comparison + {chartData.length > 1 && chartData[maxIdx] && ( + + Bottleneck: {chartData[maxIdx].name} + + )} + + + + {chartData.length === 0 ? ( +
+ +

No node throughput data

+
+ ) : ( + + + + 6 ? -45 : 0} + textAnchor={chartData.length > 6 ? "end" : "middle"} + height={chartData.length > 6 ? 60 : 30} + /> + + ( +
+ + {chartConfig[name as string]?.label ?? name} + + + {formatBytes(value as number)} + +
+ )} + /> + } + /> + + +
+
+ )} +
+
+ ); +} diff --git a/src/components/fleet/fleet-volume-chart.tsx b/src/components/fleet/fleet-volume-chart.tsx new file mode 100644 index 00000000..6fa22a10 --- /dev/null +++ b/src/components/fleet/fleet-volume-chart.tsx @@ -0,0 +1,134 @@ +"use client"; + +import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; +import { Skeleton } from "@/components/ui/skeleton"; +import { + ChartContainer, + ChartTooltip, + ChartTooltipContent, + type ChartConfig, +} from "@/components/ui/chart"; +import { AreaChart, Area, XAxis, YAxis, CartesianGrid } from "recharts"; +import { formatBytes, formatTimeAxis } from "@/lib/format"; +import { Inbox } from "lucide-react"; + +interface FleetVolumeChartProps { + data: + | { + bucket: string; + bytesIn: number; + bytesOut: number; + eventsIn: number; + eventsOut: number; + }[] + | undefined; + isLoading: boolean; + range: string; +} + +const chartConfig: ChartConfig = { + bytesIn: { label: "Bytes In", color: "oklch(0.55 0.24 265)" }, + bytesOut: { label: "Bytes Out", color: "oklch(0.65 0.17 163)" }, +}; + +export function FleetVolumeChart({ data, isLoading, range }: FleetVolumeChartProps) { + if (isLoading) { + return ( + + + Data Volume Trend + + + + + + ); + } + + const chartData = (data ?? []).map((d) => ({ + t: new Date(d.bucket).getTime(), + bytesIn: d.bytesIn, + bytesOut: d.bytesOut, + })); + + return ( + + + Data Volume Trend + + + {chartData.length === 0 ? ( +
+ +

No data for selected time range

+
+ ) : ( + + + + formatTimeAxis(v, range)} + interval="preserveStartEnd" + /> + + ) => { + const timestamp = payload?.[0]?.payload?.t; + if (!timestamp) return ""; + return new Date(Number(timestamp)).toLocaleString([], { + month: "short", + day: "numeric", + hour: "2-digit", + minute: "2-digit", + }); + }} + formatter={(value, name) => ( +
+ + {chartConfig[name as string]?.label ?? name} + + + {formatBytes(value as number)} + +
+ )} + /> + } + /> + + +
+
+ )} +
+
+ ); +} diff --git a/src/hooks/__tests__/use-realtime-invalidation.test.ts b/src/hooks/__tests__/use-realtime-invalidation.test.ts index 092d6bb2..aa2cb796 100644 --- a/src/hooks/__tests__/use-realtime-invalidation.test.ts +++ b/src/hooks/__tests__/use-realtime-invalidation.test.ts @@ -2,15 +2,21 @@ import { describe, it, expect } from "vitest"; import { getInvalidationKeys } from "../use-realtime-invalidation"; describe("getInvalidationKeys", () => { - it("maps metric_update to 6 query key prefixes", () => { + it("maps metric_update to 12 query key prefixes", () => { const keys = getInvalidationKeys("metric_update"); - expect(keys).toHaveLength(6); + expect(keys).toHaveLength(12); expect(keys).toContainEqual(["dashboard", "stats"]); expect(keys).toContainEqual(["dashboard", "pipelineCards"]); expect(keys).toContainEqual(["dashboard", "chartMetrics"]); expect(keys).toContainEqual(["dashboard", "volumeAnalytics"]); expect(keys).toContainEqual(["metrics", "getNodePipelineRates"]); expect(keys).toContainEqual(["fleet", "nodeMetrics"]); + expect(keys).toContainEqual(["fleet", "overview"]); + expect(keys).toContainEqual(["fleet", "volumeTrend"]); + expect(keys).toContainEqual(["fleet", "nodeThroughput"]); + expect(keys).toContainEqual(["fleet", "nodeCapacity"]); + expect(keys).toContainEqual(["fleet", "dataLoss"]); + expect(keys).toContainEqual(["fleet", "matrixThroughput"]); }); it("maps fleet_status to 7 query key prefixes", () => { diff --git a/src/hooks/use-realtime-invalidation.ts b/src/hooks/use-realtime-invalidation.ts index 0eec9c29..1b4da3d3 100644 --- a/src/hooks/use-realtime-invalidation.ts +++ b/src/hooks/use-realtime-invalidation.ts @@ -29,6 +29,12 @@ export function getInvalidationKeys( ["dashboard", "volumeAnalytics"], ["metrics", "getNodePipelineRates"], ["fleet", "nodeMetrics"], + ["fleet", "overview"], + ["fleet", "volumeTrend"], + ["fleet", "nodeThroughput"], + ["fleet", "nodeCapacity"], + ["fleet", "dataLoss"], + ["fleet", "matrixThroughput"], ]; case "fleet_status": diff --git a/src/server/routers/fleet.ts b/src/server/routers/fleet.ts index bc882c31..990ae6d2 100644 --- a/src/server/routers/fleet.ts +++ b/src/server/routers/fleet.ts @@ -6,6 +6,7 @@ import { LogLevel } from "@/generated/prisma"; import { withAudit } from "@/server/middleware/audit"; import { checkDevAgentVersion } from "@/server/services/version-check"; import { pushRegistry } from "@/server/services/push-registry"; +import { getFleetOverview, getVolumeTrend, getNodeThroughput, getNodeCapacity, getDataLoss, getMatrixThroughput } from "@/server/services/fleet-data"; export const fleetRouter = router({ list: protectedProcedure @@ -526,4 +527,77 @@ export const fleetRouter = router({ })), }; }), + + overview: protectedProcedure + .input( + z.object({ + environmentId: z.string(), + range: z.enum(["1h", "6h", "1d", "7d", "30d"]).default("1d"), + }), + ) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return getFleetOverview(input.environmentId, input.range); + }), + + volumeTrend: protectedProcedure + .input( + z.object({ + environmentId: z.string(), + range: z.enum(["1h", "6h", "1d", "7d", "30d"]).default("1d"), + }), + ) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return getVolumeTrend(input.environmentId, input.range); + }), + + nodeThroughput: protectedProcedure + .input( + z.object({ + environmentId: z.string(), + range: z.enum(["1h", "6h", "1d", "7d", "30d"]).default("1d"), + }), + ) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return getNodeThroughput(input.environmentId, input.range); + }), + + nodeCapacity: protectedProcedure + .input( + z.object({ + environmentId: z.string(), + range: z.enum(["1h", "6h", "1d", "7d", "30d"]).default("1d"), + }), + ) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return getNodeCapacity(input.environmentId, input.range); + }), + + dataLoss: protectedProcedure + .input( + z.object({ + environmentId: z.string(), + range: z.enum(["1h", "6h", "1d", "7d", "30d"]).default("1d"), + threshold: z.number().min(0).max(1).default(0.05), + }), + ) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return getDataLoss(input.environmentId, input.range, input.threshold); + }), + + matrixThroughput: protectedProcedure + .input( + z.object({ + environmentId: z.string(), + range: z.enum(["1h", "6h", "1d", "7d", "30d"]).default("1d"), + }), + ) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return getMatrixThroughput(input.environmentId, input.range); + }), }); diff --git a/src/server/services/__tests__/fleet-data.test.ts b/src/server/services/__tests__/fleet-data.test.ts new file mode 100644 index 00000000..2e6d3112 --- /dev/null +++ b/src/server/services/__tests__/fleet-data.test.ts @@ -0,0 +1,457 @@ +import { describe, it, expect, vi, beforeEach } from "vitest"; + +vi.mock("@/lib/prisma", () => ({ + prisma: { $queryRaw: vi.fn() }, +})); + +import { prisma } from "@/lib/prisma"; +import { + getFleetOverview, + getVolumeTrend, + getNodeThroughput, + getNodeCapacity, + getDataLoss, + getMatrixThroughput, + type TimeRange, +} from "@/server/services/fleet-data"; + +const mockQueryRaw = prisma.$queryRaw as ReturnType; + +beforeEach(() => { + vi.clearAllMocks(); +}); + +describe("getFleetOverview", () => { + it("returns computed KPIs from aggregated metrics", async () => { + mockQueryRaw + .mockResolvedValueOnce([ + { + bytes_in: BigInt(1000), + bytes_out: BigInt(800), + events_in: BigInt(500), + events_out: BigInt(490), + errors_total: BigInt(10), + }, + ]) + .mockResolvedValueOnce([{ count: BigInt(3) }]); + + const result = await getFleetOverview("env-1", "7d"); + + expect(result).toEqual({ + bytesIn: 1000, + bytesOut: 800, + eventsIn: 500, + eventsOut: 490, + errorRate: 10 / 500, + nodeCount: 3, + }); + }); + + it("returns zeros when no data exists", async () => { + mockQueryRaw + .mockResolvedValueOnce([ + { + bytes_in: null, + bytes_out: null, + events_in: null, + events_out: null, + errors_total: null, + }, + ]) + .mockResolvedValueOnce([{ count: BigInt(0) }]); + + const result = await getFleetOverview("env-1", "1d"); + + expect(result).toEqual({ + bytesIn: 0, + bytesOut: 0, + eventsIn: 0, + eventsOut: 0, + errorRate: 0, + nodeCount: 0, + }); + }); + + it("computes error rate as errorsTotal / eventsIn", async () => { + mockQueryRaw + .mockResolvedValueOnce([ + { + bytes_in: BigInt(0), + bytes_out: BigInt(0), + events_in: BigInt(200), + events_out: BigInt(180), + errors_total: BigInt(20), + }, + ]) + .mockResolvedValueOnce([{ count: BigInt(1) }]); + + const result = await getFleetOverview("env-1", "1h"); + + expect(result.errorRate).toBe(0.1); + }); +}); + +describe("getVolumeTrend", () => { + it("returns daily-bucketed volume data with number conversion", async () => { + const buckets = [ + { + bucket: new Date("2026-03-24T00:00:00Z"), + bytes_in: BigInt(500), + bytes_out: BigInt(400), + events_in: BigInt(100), + events_out: BigInt(90), + }, + { + bucket: new Date("2026-03-25T00:00:00Z"), + bytes_in: BigInt(600), + bytes_out: BigInt(500), + events_in: BigInt(120), + events_out: BigInt(110), + }, + { + bucket: new Date("2026-03-26T00:00:00Z"), + bytes_in: BigInt(700), + bytes_out: BigInt(600), + events_in: BigInt(140), + events_out: BigInt(130), + }, + ]; + mockQueryRaw.mockResolvedValueOnce(buckets); + + const result = await getVolumeTrend("env-1", "7d"); + + expect(result).toHaveLength(3); + expect(result[0]).toEqual({ + bucket: "2026-03-24T00:00:00.000Z", + bytesIn: 500, + bytesOut: 400, + eventsIn: 100, + eventsOut: 90, + }); + expect(result[2]).toEqual({ + bucket: "2026-03-26T00:00:00.000Z", + bytesIn: 700, + bytesOut: 600, + eventsIn: 140, + eventsOut: 130, + }); + }); + + it("returns empty array when no data exists", async () => { + mockQueryRaw.mockResolvedValueOnce([]); + + const result = await getVolumeTrend("env-1", "30d"); + + expect(result).toEqual([]); + }); + + it("accepts all five range values", async () => { + const ranges: TimeRange[] = ["1h", "6h", "1d", "7d", "30d"]; + for (const range of ranges) { + mockQueryRaw.mockResolvedValueOnce([]); + const result = await getVolumeTrend("env-1", range); + expect(result).toEqual([]); + } + expect(mockQueryRaw).toHaveBeenCalledTimes(5); + }); +}); + +describe("getNodeThroughput", () => { + it("returns per-node throughput with BigInt conversion", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + node_id: "node-1", + node_name: "us-east-1", + bytes_in: BigInt(50000), + bytes_out: BigInt(45000), + events_in: BigInt(1000), + events_out: BigInt(980), + }, + { + node_id: "node-2", + node_name: "eu-west-1", + bytes_in: BigInt(30000), + bytes_out: BigInt(28000), + events_in: BigInt(600), + events_out: BigInt(590), + }, + ]); + + const result = await getNodeThroughput("env-1", "1d"); + + expect(result).toHaveLength(2); + expect(result[0]).toEqual({ + nodeId: "node-1", + nodeName: "us-east-1", + bytesIn: 50000, + bytesOut: 45000, + eventsIn: 1000, + eventsOut: 980, + }); + expect(result[1]).toEqual({ + nodeId: "node-2", + nodeName: "eu-west-1", + bytesIn: 30000, + bytesOut: 28000, + eventsIn: 600, + eventsOut: 590, + }); + }); + + it("returns empty array when no nodes have metrics", async () => { + mockQueryRaw.mockResolvedValueOnce([]); + + const result = await getNodeThroughput("env-1", "7d"); + + expect(result).toEqual([]); + }); + + it("handles null metric values", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + node_id: "node-1", + node_name: "node-a", + bytes_in: null, + bytes_out: null, + events_in: null, + events_out: null, + }, + ]); + + const result = await getNodeThroughput("env-1", "1h"); + + expect(result[0]).toEqual({ + nodeId: "node-1", + nodeName: "node-a", + bytesIn: 0, + bytesOut: 0, + eventsIn: 0, + eventsOut: 0, + }); + }); +}); + +describe("getNodeCapacity", () => { + it("returns per-node bucketed capacity utilization", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + node_id: "node-1", + node_name: "us-east-1", + bucket: new Date("2026-03-25T10:00:00Z"), + memory_pct: 72.5, + disk_pct: 45.3, + cpu_load: 1.25, + }, + { + node_id: "node-1", + node_name: "us-east-1", + bucket: new Date("2026-03-25T11:00:00Z"), + memory_pct: 75.0, + disk_pct: 45.5, + cpu_load: 1.80, + }, + { + node_id: "node-2", + node_name: "eu-west-1", + bucket: new Date("2026-03-25T10:00:00Z"), + memory_pct: 60.0, + disk_pct: 30.0, + cpu_load: 0.50, + }, + ]); + + const result = await getNodeCapacity("env-1", "1d"); + + expect(result).toHaveLength(2); + expect(result[0].nodeId).toBe("node-1"); + expect(result[0].nodeName).toBe("us-east-1"); + expect(result[0].buckets).toHaveLength(2); + expect(result[0].buckets[0]).toEqual({ + bucket: "2026-03-25T10:00:00.000Z", + memoryPct: 72.5, + diskPct: 45.3, + cpuLoad: 1.25, + }); + expect(result[1].nodeId).toBe("node-2"); + expect(result[1].buckets).toHaveLength(1); + }); + + it("returns empty array when no node metrics exist", async () => { + mockQueryRaw.mockResolvedValueOnce([]); + + const result = await getNodeCapacity("env-1", "7d"); + + expect(result).toEqual([]); + }); + + it("handles null utilization values", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + node_id: "node-1", + node_name: "node-a", + bucket: new Date("2026-03-25T00:00:00Z"), + memory_pct: null, + disk_pct: null, + cpu_load: null, + }, + ]); + + const result = await getNodeCapacity("env-1", "1d"); + + expect(result[0].buckets[0]).toEqual({ + bucket: "2026-03-25T00:00:00.000Z", + memoryPct: 0, + diskPct: 0, + cpuLoad: 0, + }); + }); +}); + +describe("getDataLoss", () => { + it("returns pipelines exceeding the loss threshold", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + pipeline_id: "p1", + pipeline_name: "ingest-logs", + node_id: "node-1", + node_name: "us-east-1", + events_in: BigInt(1000), + events_out: BigInt(800), // 20% loss + }, + { + pipeline_id: "p2", + pipeline_name: "metrics-agg", + node_id: "node-1", + node_name: "us-east-1", + events_in: BigInt(500), + events_out: BigInt(490), // 2% loss + }, + ]); + + const result = await getDataLoss("env-1", "1d", 0.05); + + expect(result).toHaveLength(1); + expect(result[0]).toEqual({ + pipelineId: "p1", + pipelineName: "ingest-logs", + nodeId: "node-1", + nodeName: "us-east-1", + eventsIn: 1000, + eventsOut: 800, + lossRate: 0.2, + }); + }); + + it("skips pipelines with zero throughput", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + pipeline_id: "p1", + pipeline_name: "idle-pipeline", + node_id: null, + node_name: null, + events_in: BigInt(0), + events_out: BigInt(0), + }, + ]); + + const result = await getDataLoss("env-1", "7d", 0.05); + + expect(result).toEqual([]); + }); + + it("returns empty array when no data loss detected", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + pipeline_id: "p1", + pipeline_name: "healthy", + node_id: "node-1", + node_name: "us-east-1", + events_in: BigInt(1000), + events_out: BigInt(999), // 0.1% loss + }, + ]); + + const result = await getDataLoss("env-1", "1d", 0.05); + + expect(result).toEqual([]); + }); + + it("sorts results by loss rate descending", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + pipeline_id: "p1", + pipeline_name: "moderate-loss", + node_id: "node-1", + node_name: "us-east-1", + events_in: BigInt(1000), + events_out: BigInt(850), // 15% loss + }, + { + pipeline_id: "p2", + pipeline_name: "severe-loss", + node_id: "node-1", + node_name: "us-east-1", + events_in: BigInt(1000), + events_out: BigInt(500), // 50% loss + }, + ]); + + const result = await getDataLoss("env-1", "1d", 0.05); + + expect(result).toHaveLength(2); + expect(result[0].pipelineName).toBe("severe-loss"); + expect(result[1].pipelineName).toBe("moderate-loss"); + }); +}); + +describe("getMatrixThroughput", () => { + it("returns per-cell throughput rates and loss", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + pipeline_id: "p1", + node_id: "node-1", + events_in: BigInt(86400), // 1 evt/sec over 1d + events_out: BigInt(82080), // 5% loss + bytes_in: BigInt(864000), + bytes_out: BigInt(820800), + }, + ]); + + const result = await getMatrixThroughput("env-1", "1d"); + + expect(result).toHaveLength(1); + expect(result[0].pipelineId).toBe("p1"); + expect(result[0].nodeId).toBe("node-1"); + expect(result[0].eventsPerSec).toBe(1); + expect(result[0].lossRate).toBe(0.05); + expect(result[0].bytesPerSec).toBe( + Math.round((864000 + 820800) / 86400) + ); + }); + + it("returns empty array when no metrics exist", async () => { + mockQueryRaw.mockResolvedValueOnce([]); + + const result = await getMatrixThroughput("env-1", "7d"); + + expect(result).toEqual([]); + }); + + it("handles zero events gracefully", async () => { + mockQueryRaw.mockResolvedValueOnce([ + { + pipeline_id: "p1", + node_id: "node-1", + events_in: BigInt(0), + events_out: BigInt(0), + bytes_in: BigInt(0), + bytes_out: BigInt(0), + }, + ]); + + const result = await getMatrixThroughput("env-1", "1d"); + + expect(result[0].eventsPerSec).toBe(0); + expect(result[0].lossRate).toBe(0); + }); +}); diff --git a/src/server/services/fleet-data.ts b/src/server/services/fleet-data.ts new file mode 100644 index 00000000..e556c636 --- /dev/null +++ b/src/server/services/fleet-data.ts @@ -0,0 +1,388 @@ +import { prisma } from "@/lib/prisma"; +import { Prisma } from "@/generated/prisma"; + +// ─── Types ────────────────────────────────────────────────────────────────── + +export type TimeRange = "1h" | "6h" | "1d" | "7d" | "30d"; + +export interface FleetOverview { + bytesIn: number; + bytesOut: number; + eventsIn: number; + eventsOut: number; + errorRate: number; + nodeCount: number; +} + +export interface VolumeBucket { + bucket: string; + bytesIn: number; + bytesOut: number; + eventsIn: number; + eventsOut: number; +} + +export interface NodeThroughput { + nodeId: string; + nodeName: string; + bytesIn: number; + bytesOut: number; + eventsIn: number; + eventsOut: number; +} + +export interface NodeCapacityBucket { + bucket: string; + memoryPct: number; + diskPct: number; + cpuLoad: number; +} + +export interface NodeCapacity { + nodeId: string; + nodeName: string; + buckets: NodeCapacityBucket[]; +} + +export interface PipelineDataLoss { + pipelineId: string; + pipelineName: string; + nodeId: string | null; + nodeName: string | null; + eventsIn: number; + eventsOut: number; + lossRate: number; +} + +export interface MatrixCellThroughput { + pipelineId: string; + nodeId: string; + eventsPerSec: number; + bytesPerSec: number; + lossRate: number; +} + +// ─── Helpers ──────────────────────────────────────────────────────────────── + +const RANGE_MS: Record = { + "1h": 60 * 60 * 1000, + "6h": 6 * 60 * 60 * 1000, + "1d": 24 * 60 * 60 * 1000, + "7d": 7 * 24 * 60 * 60 * 1000, + "30d": 30 * 24 * 60 * 60 * 1000, +}; + +const BUCKET_SIZE: Record = { + "1h": "minute", + "6h": "hour", + "1d": "hour", + "7d": "day", + "30d": "day", +}; + +function sinceDate(range: TimeRange): Date { + return new Date(Date.now() - RANGE_MS[range]); +} + +// ─── Fleet Overview ───────────────────────────────────────────────────────── + +export async function getFleetOverview( + environmentId: string, + range: TimeRange, +): Promise { + const since = sinceDate(range); + + const [metricRows, nodeRows] = await Promise.all([ + prisma.$queryRaw< + { + bytes_in: bigint | null; + bytes_out: bigint | null; + events_in: bigint | null; + events_out: bigint | null; + errors_total: bigint | null; + }[] + >(Prisma.sql` + SELECT + SUM("bytesIn") AS bytes_in, + SUM("bytesOut") AS bytes_out, + SUM("eventsIn") AS events_in, + SUM("eventsOut") AS events_out, + SUM("errorsTotal") AS errors_total + FROM "PipelineMetric" + WHERE "componentId" IS NULL + AND "timestamp" >= ${since} + AND "pipelineId" IN ( + SELECT "id" FROM "Pipeline" WHERE "environmentId" = ${environmentId} + ) + `), + prisma.$queryRaw<{ count: bigint }[]>(Prisma.sql` + SELECT COUNT(*) AS count + FROM "VectorNode" + WHERE "environmentId" = ${environmentId} + `), + ]); + + const m = metricRows[0]; + const bytesIn = Number(m?.bytes_in ?? 0); + const bytesOut = Number(m?.bytes_out ?? 0); + const eventsIn = Number(m?.events_in ?? 0); + const eventsOut = Number(m?.events_out ?? 0); + const errorsTotal = Number(m?.errors_total ?? 0); + const nodeCount = Number(nodeRows[0]?.count ?? 0); + const errorRate = eventsIn > 0 ? errorsTotal / eventsIn : 0; + + return { bytesIn, bytesOut, eventsIn, eventsOut, errorRate, nodeCount }; +} + +// ─── Volume Trend ─────────────────────────────────────────────────────────── + +export async function getVolumeTrend( + environmentId: string, + range: TimeRange, +): Promise { + const since = sinceDate(range); + const bucket = BUCKET_SIZE[range]; + + const rows = await prisma.$queryRaw< + { + bucket: Date; + bytes_in: bigint | null; + bytes_out: bigint | null; + events_in: bigint | null; + events_out: bigint | null; + }[] + >(Prisma.sql` + SELECT + date_trunc(${bucket}, "timestamp") AS bucket, + SUM("bytesIn") AS bytes_in, + SUM("bytesOut") AS bytes_out, + SUM("eventsIn") AS events_in, + SUM("eventsOut") AS events_out + FROM "PipelineMetric" + WHERE "componentId" IS NULL + AND "timestamp" >= ${since} + AND "pipelineId" IN ( + SELECT "id" FROM "Pipeline" WHERE "environmentId" = ${environmentId} + ) + GROUP BY 1 + ORDER BY 1 + `); + + return rows.map((r) => ({ + bucket: (r.bucket instanceof Date ? r.bucket : new Date(r.bucket)).toISOString(), + bytesIn: Number(r.bytes_in ?? 0), + bytesOut: Number(r.bytes_out ?? 0), + eventsIn: Number(r.events_in ?? 0), + eventsOut: Number(r.events_out ?? 0), + })); +} + +// ─── Node Throughput Comparison ────────────────────────────────────────────── + +export async function getNodeThroughput( + environmentId: string, + range: TimeRange, +): Promise { + const since = sinceDate(range); + + const rows = await prisma.$queryRaw< + { + node_id: string; + node_name: string; + bytes_in: bigint | null; + bytes_out: bigint | null; + events_in: bigint | null; + events_out: bigint | null; + }[] + >(Prisma.sql` + SELECT + n."id" AS node_id, + n."name" AS node_name, + SUM(pm."bytesIn") AS bytes_in, + SUM(pm."bytesOut") AS bytes_out, + SUM(pm."eventsIn") AS events_in, + SUM(pm."eventsOut") AS events_out + FROM "PipelineMetric" pm + JOIN "VectorNode" n ON n."id" = pm."nodeId" + WHERE pm."componentId" IS NULL + AND pm."nodeId" IS NOT NULL + AND pm."timestamp" >= ${since} + AND n."environmentId" = ${environmentId} + GROUP BY n."id", n."name" + ORDER BY SUM(pm."bytesIn") DESC + `); + + return rows.map((r) => ({ + nodeId: r.node_id, + nodeName: r.node_name, + bytesIn: Number(r.bytes_in ?? 0), + bytesOut: Number(r.bytes_out ?? 0), + eventsIn: Number(r.events_in ?? 0), + eventsOut: Number(r.events_out ?? 0), + })); +} + +// ─── Node Capacity Utilization ─────────────────────────────────────────────── + +export async function getNodeCapacity( + environmentId: string, + range: TimeRange, +): Promise { + const since = sinceDate(range); + const bucket = BUCKET_SIZE[range]; + + const rows = await prisma.$queryRaw< + { + node_id: string; + node_name: string; + bucket: Date; + memory_pct: number | null; + disk_pct: number | null; + cpu_load: number | null; + }[] + >(Prisma.sql` + SELECT + nm."nodeId" AS node_id, + n."name" AS node_name, + date_trunc(${bucket}, nm."timestamp") AS bucket, + AVG(CASE WHEN nm."memoryTotalBytes" > 0 + THEN nm."memoryUsedBytes"::float / nm."memoryTotalBytes" * 100 + ELSE 0 END) AS memory_pct, + AVG(CASE WHEN nm."fsTotalBytes" > 0 + THEN nm."fsUsedBytes"::float / nm."fsTotalBytes" * 100 + ELSE 0 END) AS disk_pct, + AVG(nm."loadAvg1") AS cpu_load + FROM "NodeMetric" nm + JOIN "VectorNode" n ON n."id" = nm."nodeId" + WHERE n."environmentId" = ${environmentId} + AND nm."timestamp" >= ${since} + GROUP BY nm."nodeId", n."name", date_trunc(${bucket}, nm."timestamp") + ORDER BY nm."nodeId", bucket + `); + + // Group flat rows into per-node capacity objects + const nodeMap = new Map(); + for (const r of rows) { + let node = nodeMap.get(r.node_id); + if (!node) { + node = { nodeId: r.node_id, nodeName: r.node_name, buckets: [] }; + nodeMap.set(r.node_id, node); + } + node.buckets.push({ + bucket: (r.bucket instanceof Date ? r.bucket : new Date(r.bucket)).toISOString(), + memoryPct: Math.round((r.memory_pct ?? 0) * 10) / 10, + diskPct: Math.round((r.disk_pct ?? 0) * 10) / 10, + cpuLoad: Math.round((r.cpu_load ?? 0) * 100) / 100, + }); + } + + return Array.from(nodeMap.values()); +} + +// ─── Data Loss Detection ──────────────────────────────────────────────────── + +export async function getDataLoss( + environmentId: string, + range: TimeRange, + threshold: number = 0.05, +): Promise { + const since = sinceDate(range); + + const rows = await prisma.$queryRaw< + { + pipeline_id: string; + pipeline_name: string; + node_id: string | null; + node_name: string | null; + events_in: bigint | null; + events_out: bigint | null; + }[] + >(Prisma.sql` + SELECT + p."id" AS pipeline_id, + p."name" AS pipeline_name, + n."id" AS node_id, + n."name" AS node_name, + SUM(pm."eventsIn") AS events_in, + SUM(pm."eventsOut") AS events_out + FROM "PipelineMetric" pm + JOIN "Pipeline" p ON p."id" = pm."pipelineId" + LEFT JOIN "VectorNode" n ON n."id" = pm."nodeId" + WHERE pm."componentId" IS NULL + AND pm."timestamp" >= ${since} + AND p."environmentId" = ${environmentId} + GROUP BY p."id", p."name", n."id", n."name" + ORDER BY p."name", n."name" + `); + + const results: PipelineDataLoss[] = []; + for (const r of rows) { + const eventsIn = Number(r.events_in ?? 0); + const eventsOut = Number(r.events_out ?? 0); + if (eventsIn === 0) continue; + const lossRate = (eventsIn - eventsOut) / eventsIn; + if (lossRate <= threshold) continue; + results.push({ + pipelineId: r.pipeline_id, + pipelineName: r.pipeline_name, + nodeId: r.node_id, + nodeName: r.node_name, + eventsIn, + eventsOut, + lossRate: Math.round(lossRate * 10000) / 10000, + }); + } + + return results.sort((a, b) => b.lossRate - a.lossRate); +} + +// ─── Matrix Throughput ────────────────────────────────────────────────────── + +export async function getMatrixThroughput( + environmentId: string, + range: TimeRange, +): Promise { + const since = sinceDate(range); + const windowSeconds = RANGE_MS[range] / 1000; + + const rows = await prisma.$queryRaw< + { + pipeline_id: string; + node_id: string; + events_in: bigint | null; + events_out: bigint | null; + bytes_in: bigint | null; + bytes_out: bigint | null; + }[] + >(Prisma.sql` + SELECT + pm."pipelineId" AS pipeline_id, + pm."nodeId" AS node_id, + SUM(pm."eventsIn") AS events_in, + SUM(pm."eventsOut") AS events_out, + SUM(pm."bytesIn") AS bytes_in, + SUM(pm."bytesOut") AS bytes_out + FROM "PipelineMetric" pm + JOIN "Pipeline" p ON p."id" = pm."pipelineId" + WHERE pm."componentId" IS NULL + AND pm."nodeId" IS NOT NULL + AND pm."timestamp" >= ${since} + AND p."environmentId" = ${environmentId} + GROUP BY pm."pipelineId", pm."nodeId" + `); + + return rows.map((r) => { + const eventsIn = Number(r.events_in ?? 0); + const eventsOut = Number(r.events_out ?? 0); + const bytesIn = Number(r.bytes_in ?? 0); + const bytesOut = Number(r.bytes_out ?? 0); + const lossRate = eventsIn > 0 ? (eventsIn - eventsOut) / eventsIn : 0; + return { + pipelineId: r.pipeline_id, + nodeId: r.node_id, + eventsPerSec: Math.round((eventsIn / windowSeconds) * 100) / 100, + bytesPerSec: Math.round((bytesIn + bytesOut) / windowSeconds), + lossRate: Math.round(Math.max(0, lossRate) * 10000) / 10000, + }; + }); +}
+
{isOutdated ? (
)} + {cellThroughput != null && ( +
+ {formatEventsRate(cellThroughput.eventsPerSec)} + {hasLoss && ( + + ⚠ + + )} +
+ )}