From ddd50609fb5f5be7d1b6543b20e21a82f395890e Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Sat, 7 Mar 2026 13:10:28 +0000 Subject: [PATCH 1/9] feat: add PipelineSli model and migration Add PipelineSli table to track per-pipeline health SLI definitions (error_rate, throughput_floor, discard_rate) with configurable thresholds, conditions, and evaluation windows. --- .../migration.sql | 19 +++++++++++++++++++ prisma/schema.prisma | 15 +++++++++++++++ 2 files changed, 34 insertions(+) create mode 100644 prisma/migrations/20260308000000_add_pipeline_slis/migration.sql diff --git a/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql b/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql new file mode 100644 index 00000000..bcac7961 --- /dev/null +++ b/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql @@ -0,0 +1,19 @@ +-- CreateTable +CREATE TABLE "PipelineSli" ( + "id" TEXT NOT NULL, + "pipelineId" TEXT NOT NULL, + "metric" TEXT NOT NULL, + "condition" TEXT NOT NULL, + "threshold" DOUBLE PRECISION NOT NULL, + "windowMinutes" INTEGER NOT NULL DEFAULT 5, + "enabled" BOOLEAN NOT NULL DEFAULT true, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "PipelineSli_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE INDEX "PipelineSli_pipelineId_idx" ON "PipelineSli"("pipelineId"); + +-- AddForeignKey +ALTER TABLE "PipelineSli" ADD CONSTRAINT "PipelineSli_pipelineId_fkey" FOREIGN KEY ("pipelineId") REFERENCES "Pipeline"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index f0db424c..c50b6b0b 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -213,6 +213,7 @@ model Pipeline { alertRules AlertRule[] sampleRequests EventSampleRequest[] eventSamples EventSample[] + slis PipelineSli[] createdAt DateTime @default(now()) updatedAt DateTime @updatedAt } @@ -258,6 +259,20 @@ model PipelineMetric { @@index([timestamp]) } +model PipelineSli { + id String @id @default(cuid()) + pipelineId String + pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade) + metric String // "error_rate" | "throughput_floor" | "discard_rate" + condition String // "lt" | "gt" + threshold Float + windowMinutes Int @default(5) + enabled Boolean @default(true) + createdAt DateTime @default(now()) + + @@index([pipelineId]) +} + model EventSampleRequest { id String @id @default(cuid()) pipelineId String From b0dfb34fad3f123e3651452cf306e1162aabaa05 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Sat, 7 Mar 2026 13:10:58 +0000 Subject: [PATCH 2/9] feat: add SLI evaluation service Evaluates pipeline health by checking configured SLI definitions (error_rate, discard_rate, throughput_floor) against recent pipeline metrics within the configured time window. --- src/server/services/sli-evaluator.ts | 89 ++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 src/server/services/sli-evaluator.ts diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts new file mode 100644 index 00000000..42c15519 --- /dev/null +++ b/src/server/services/sli-evaluator.ts @@ -0,0 +1,89 @@ +import { prisma } from "@/lib/prisma"; + +export type SliStatus = "healthy" | "degraded" | "no_data"; + +export interface SliResult { + metric: string; + status: "met" | "breached"; + value: number; + threshold: number; + condition: string; +} + +export async function evaluatePipelineHealth(pipelineId: string): Promise<{ + status: SliStatus; + slis: SliResult[]; +}> { + const sliDefs = await prisma.pipelineSli.findMany({ + where: { pipelineId, enabled: true }, + }); + + if (sliDefs.length === 0) return { status: "no_data", slis: [] }; + + const results: SliResult[] = []; + + for (const sli of sliDefs) { + const since = new Date(Date.now() - sli.windowMinutes * 60_000); + const metrics = await prisma.pipelineMetric.findMany({ + where: { pipelineId, timestamp: { gte: since } }, + }); + + if (metrics.length === 0) { + results.push({ + metric: sli.metric, + status: "breached", + value: 0, + threshold: sli.threshold, + condition: sli.condition, + }); + continue; + } + + let value: number; + const totalEventsIn = metrics.reduce( + (s, m) => s + Number(m.eventsIn ?? 0), + 0, + ); + + switch (sli.metric) { + case "error_rate": { + const totalErrors = metrics.reduce( + (s, m) => s + Number(m.errorsTotal ?? 0), + 0, + ); + value = totalEventsIn > 0 ? totalErrors / totalEventsIn : 0; + break; + } + case "discard_rate": { + const totalDiscarded = metrics.reduce( + (s, m) => s + Number(m.eventsDiscarded ?? 0), + 0, + ); + value = totalEventsIn > 0 ? totalDiscarded / totalEventsIn : 0; + break; + } + case "throughput_floor": { + const windowSeconds = sli.windowMinutes * 60; + value = totalEventsIn / windowSeconds; + break; + } + default: + value = 0; + } + + const met = + sli.condition === "lt" ? value < sli.threshold : value > sli.threshold; + results.push({ + metric: sli.metric, + status: met ? "met" : "breached", + value, + threshold: sli.threshold, + condition: sli.condition, + }); + } + + const overallStatus: SliStatus = results.every((r) => r.status === "met") + ? "healthy" + : "degraded"; + return { status: overallStatus, slis: results }; +} From 2d50ff6ad1eaf5e6f80f38a3a661b4b8d0394dfe Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Sat, 7 Mar 2026 13:12:39 +0000 Subject: [PATCH 3/9] feat: add SLI CRUD and health query to pipeline router Add listSlis, upsertSli, deleteSli, and health procedures to the pipeline router. Update the list query to include healthStatus (healthy/degraded/no_data) for each deployed pipeline. --- src/server/routers/pipeline.ts | 94 +++++++++++++++++++++++++++++++++- 1 file changed, 92 insertions(+), 2 deletions(-) diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index 967be2fd..e777da0e 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -17,6 +17,7 @@ import { getOrCreateSystemEnvironment } from "@/server/services/system-environme import { copyPipelineGraph } from "@/server/services/copy-pipeline-graph"; import { stripEnvRefs, type StrippedRef } from "@/server/services/strip-env-refs"; import { gitSyncDeletePipeline } from "@/server/services/git-sync"; +import { evaluatePipelineHealth } from "@/server/services/sli-evaluator"; /** Pipeline names must be safe identifiers */ const pipelineNameSchema = z @@ -114,7 +115,7 @@ export const pipelineRouter = router({ orderBy: { updatedAt: "desc" }, }); - return pipelines.map((p) => { + const mapped = await Promise.all(pipelines.map(async (p) => { let hasUndeployedChanges = false; if (!p.isDraft && p.deployedAt) { const latestVersion = p.versions[0]; @@ -165,6 +166,17 @@ export const pipelineRouter = router({ } } + // Evaluate pipeline health for deployed pipelines + let healthStatus: "healthy" | "degraded" | "no_data" = "no_data"; + if (!p.isDraft && p.deployedAt) { + try { + const health = await evaluatePipelineHealth(p.id); + healthStatus = health.status; + } catch { + healthStatus = "no_data"; + } + } + return { id: p.id, name: p.name, @@ -177,8 +189,11 @@ export const pipelineRouter = router({ updatedBy: p.updatedBy, nodeStatuses: p.nodeStatuses, hasUndeployedChanges, + healthStatus, }; - }); + })); + + return mapped; }), get: protectedProcedure @@ -986,4 +1001,79 @@ export const pipelineRouter = router({ return deduplicated; }), + + listSlis: protectedProcedure + .input(z.object({ pipelineId: z.string() })) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return prisma.pipelineSli.findMany({ + where: { pipelineId: input.pipelineId }, + orderBy: { createdAt: "asc" }, + }); + }), + + upsertSli: protectedProcedure + .input( + z.object({ + pipelineId: z.string(), + metric: z.enum(["error_rate", "throughput_floor", "discard_rate"]), + condition: z.enum(["lt", "gt"]), + threshold: z.number().min(0), + windowMinutes: z.number().int().min(1).max(1440).default(5), + }), + ) + .use(withTeamAccess("EDITOR")) + .use(withAudit("pipeline.sli_upserted", "Pipeline")) + .mutation(async ({ input }) => { + const existing = await prisma.pipelineSli.findFirst({ + where: { pipelineId: input.pipelineId, metric: input.metric }, + }); + + if (existing) { + return prisma.pipelineSli.update({ + where: { id: existing.id }, + data: { + condition: input.condition, + threshold: input.threshold, + windowMinutes: input.windowMinutes, + }, + }); + } + + return prisma.pipelineSli.create({ + data: { + pipelineId: input.pipelineId, + metric: input.metric, + condition: input.condition, + threshold: input.threshold, + windowMinutes: input.windowMinutes, + }, + }); + }), + + deleteSli: protectedProcedure + .input(z.object({ id: z.string() })) + .use(withTeamAccess("EDITOR")) + .use(withAudit("pipeline.sli_deleted", "Pipeline")) + .mutation(async ({ input }) => { + const sli = await prisma.pipelineSli.findUnique({ + where: { id: input.id }, + }); + if (!sli) { + throw new TRPCError({ + code: "NOT_FOUND", + message: "SLI not found", + }); + } + return prisma.pipelineSli.delete({ + where: { id: input.id }, + }); + }), + + health: protectedProcedure + .input(z.object({ pipelineId: z.string() })) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return evaluatePipelineHealth(input.pipelineId); + }), }); From dc10450cf5254187495b248aa7379fc62b69734b Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Sat, 7 Mar 2026 13:17:19 +0000 Subject: [PATCH 4/9] feat: add health badges and SLI configuration UI - Add Health column to pipeline list with green/yellow/gray badges - Add SLI configuration form to pipeline settings (metric, condition, threshold, window with add/remove) - Add health indicator dot next to process status in flow toolbar --- src/app/(dashboard)/pipelines/page.tsx | 19 ++ src/components/flow/flow-toolbar.tsx | 30 ++- src/components/flow/pipeline-settings.tsx | 218 +++++++++++++++++++++- 3 files changed, 263 insertions(+), 4 deletions(-) diff --git a/src/app/(dashboard)/pipelines/page.tsx b/src/app/(dashboard)/pipelines/page.tsx index 8de11704..e1909698 100644 --- a/src/app/(dashboard)/pipelines/page.tsx +++ b/src/app/(dashboard)/pipelines/page.tsx @@ -177,6 +177,7 @@ export default function PipelinesPage() { Name Status + Health Events/sec In Bytes/sec In Reduction @@ -218,6 +219,24 @@ export default function PipelinesPage() { )} + {/* Health */} + + {pipeline.isDraft ? ( + -- + ) : pipeline.healthStatus === "healthy" ? ( + + Healthy + + ) : pipeline.healthStatus === "degraded" ? ( + + Degraded + + ) : ( + + No SLIs + + )} + {/* Events/sec In */} {liveRates[pipeline.id] diff --git a/src/components/flow/flow-toolbar.tsx b/src/components/flow/flow-toolbar.tsx index c6423029..506d19cf 100644 --- a/src/components/flow/flow-toolbar.tsx +++ b/src/components/flow/flow-toolbar.tsx @@ -44,7 +44,7 @@ import { cn } from "@/lib/utils"; import { useFlowStore } from "@/stores/flow-store"; import { generateVectorYaml, generateVectorToml, importVectorConfig } from "@/lib/config-generator"; import { useTRPC } from "@/trpc/client"; -import { useMutation } from "@tanstack/react-query"; +import { useMutation, useQuery } from "@tanstack/react-query"; import { VersionHistoryDialog } from "@/components/pipeline/version-history-dialog"; type ProcessStatusValue = "RUNNING" | "STARTING" | "STOPPED" | "CRASHED" | "PENDING"; @@ -113,6 +113,15 @@ export function FlowToolbar({ const [versionsOpen, setVersionsOpen] = useState(false); const trpc = useTRPC(); + + const healthQuery = useQuery( + trpc.pipeline.health.queryOptions( + { pipelineId: pipelineId! }, + { enabled: !!pipelineId && !isDraft && !!deployedAt, refetchInterval: 30_000 }, + ), + ); + const healthStatus = healthQuery.data?.status ?? null; + const validateMutation = useMutation(trpc.validator.validate.mutationOptions({ onSuccess: (result) => { if (result.valid) { @@ -359,7 +368,7 @@ export function FlowToolbar({ Pipeline settings - + @@ -386,6 +395,23 @@ export function FlowToolbar({ {processStatus === "CRASHED" && "Crashed"} {processStatus === "PENDING" && "Pending..."} + {/* Health SLI indicator dot */} + {healthStatus === "healthy" && ( + + + + + All SLIs met + + )} + {healthStatus === "degraded" && ( + + + + + One or more SLIs breached + + )} )} diff --git a/src/components/flow/pipeline-settings.tsx b/src/components/flow/pipeline-settings.tsx index 2bf0aa3d..69fc11ba 100644 --- a/src/components/flow/pipeline-settings.tsx +++ b/src/components/flow/pipeline-settings.tsx @@ -1,12 +1,13 @@ "use client"; import { useState, useEffect } from "react"; -import { ChevronRight } from "lucide-react"; +import { ChevronRight, Plus, Trash2 } from "lucide-react"; import { useFlowStore } from "@/stores/flow-store"; import { Label } from "@/components/ui/label"; import { Button } from "@/components/ui/button"; import { Separator } from "@/components/ui/separator"; import { Badge } from "@/components/ui/badge"; +import { Input } from "@/components/ui/input"; import { Select, SelectContent, @@ -19,8 +20,15 @@ import { CollapsibleContent, CollapsibleTrigger, } from "@/components/ui/collapsible"; +import { useTRPC } from "@/trpc/client"; +import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { toast } from "sonner"; -export function PipelineSettings() { +interface PipelineSettingsProps { + pipelineId?: string; +} + +export function PipelineSettings({ pipelineId }: PipelineSettingsProps) { const globalConfig = useFlowStore((s) => s.globalConfig); const updateGlobalConfig = useFlowStore((s) => s.updateGlobalConfig); const setGlobalConfig = useFlowStore((s) => s.setGlobalConfig); @@ -129,10 +137,216 @@ export function PipelineSettings() { + + {pipelineId && ( + <> + + + + )} ); } +// --------------------------------------------------------------------------- +// SLI settings sub-component +// --------------------------------------------------------------------------- + +const METRIC_OPTIONS = [ + { value: "error_rate", label: "Error Rate" }, + { value: "throughput_floor", label: "Throughput Floor" }, + { value: "discard_rate", label: "Discard Rate" }, +] as const; + +const CONDITION_OPTIONS = [ + { value: "lt", label: "< (less than)" }, + { value: "gt", label: "> (greater than)" }, +] as const; + +function SliSettings({ pipelineId }: { pipelineId: string }) { + const trpc = useTRPC(); + const queryClient = useQueryClient(); + + const slisQuery = useQuery( + trpc.pipeline.listSlis.queryOptions({ pipelineId }), + ); + const slis = slisQuery.data ?? []; + + const [sliOpen, setSliOpen] = useState(false); + const [newMetric, setNewMetric] = useState("error_rate"); + const [newCondition, setNewCondition] = useState("lt"); + const [newThreshold, setNewThreshold] = useState("0.01"); + const [newWindow, setNewWindow] = useState("5"); + + const upsertMutation = useMutation( + trpc.pipeline.upsertSli.mutationOptions({ + onSuccess: () => { + toast.success("SLI saved"); + queryClient.invalidateQueries({ + queryKey: trpc.pipeline.listSlis.queryKey(), + }); + queryClient.invalidateQueries({ + queryKey: trpc.pipeline.health.queryKey(), + }); + }, + onError: (err) => toast.error(err.message || "Failed to save SLI"), + }), + ); + + const deleteMutation = useMutation( + trpc.pipeline.deleteSli.mutationOptions({ + onSuccess: () => { + toast.success("SLI removed"); + queryClient.invalidateQueries({ + queryKey: trpc.pipeline.listSlis.queryKey(), + }); + queryClient.invalidateQueries({ + queryKey: trpc.pipeline.health.queryKey(), + }); + }, + onError: (err) => toast.error(err.message || "Failed to delete SLI"), + }), + ); + + const handleAdd = () => { + const threshold = parseFloat(newThreshold); + const windowMinutes = parseInt(newWindow, 10); + if (isNaN(threshold) || threshold < 0) { + toast.error("Threshold must be a non-negative number"); + return; + } + if (isNaN(windowMinutes) || windowMinutes < 1) { + toast.error("Window must be at least 1 minute"); + return; + } + upsertMutation.mutate({ + pipelineId, + metric: newMetric as "error_rate" | "throughput_floor" | "discard_rate", + condition: newCondition as "lt" | "gt", + threshold, + windowMinutes, + }); + }; + + const metricLabel = (m: string) => + METRIC_OPTIONS.find((o) => o.value === m)?.label ?? m; + + return ( + + + + Health SLIs + {slis.length > 0 && ( + + {slis.length} + + )} + + + {/* Existing SLIs */} + {slis.length > 0 && ( +
+ {slis.map((sli) => ( +
+
+ {metricLabel(sli.metric)}{" "} + + {sli.condition === "lt" ? "<" : ">"} {sli.threshold} + {" "} + + ({sli.windowMinutes}m) + +
+ +
+ ))} +
+ )} + + {/* Add new SLI form */} +
+
+ + +
+
+
+ + +
+
+ + setNewThreshold(e.target.value)} + className="h-8 text-xs" + /> +
+
+
+ + setNewWindow(e.target.value)} + className="h-8 text-xs" + /> +
+ +
+
+
+ ); +} + /** * Returns true when globalConfig has content beyond just log_level. * Used by the toolbar to show a dot indicator on the gear icon. From 7aa43548465cd01230e8e375b28238450af11cd2 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Sat, 7 Mar 2026 13:17:57 +0000 Subject: [PATCH 5/9] docs: add pipeline health SLIs section to pipelines page Document available metrics (error_rate, discard_rate, throughput_floor), health badges, and step-by-step SLI configuration instructions. --- docs/public/user-guide/pipelines.md | 52 +++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) diff --git a/docs/public/user-guide/pipelines.md b/docs/public/user-guide/pipelines.md index b649c688..62b20edc 100644 --- a/docs/public/user-guide/pipelines.md +++ b/docs/public/user-guide/pipelines.md @@ -12,6 +12,7 @@ Pipelines are displayed in a table with the following columns: |--------|------------| | **Name** | The pipeline name. Click it to open the pipeline in the editor. | | **Status** | Current lifecycle state (see statuses below). | +| **Health** | SLI health badge -- green **Healthy**, yellow **Degraded**, or gray **No SLIs** (see [Pipeline Health SLIs](#pipeline-health-slis) below). | | **Events/sec In** | Live event ingestion rate polled from the agent fleet. | | **Bytes/sec In** | Live byte ingestion rate. | | **Reduction** | Percentage of events reduced by transforms, color-coded green (>50%), amber (>10%), or neutral. | @@ -69,6 +70,57 @@ Every time you deploy a pipeline, a new **version** is created that captures the The pipeline list shows a **Pending deploy** badge when the saved configuration differs from the most recently deployed version, so you always know if there are undeployed changes. +## Pipeline Health SLIs + +Service Level Indicators (SLIs) let you define health thresholds for your deployed pipelines. When SLIs are configured, VectorFlow continuously evaluates pipeline metrics against your thresholds and displays the result as a health badge in the pipeline list and pipeline editor toolbar. + +### Health badges + +| Badge | Meaning | +|-------|---------| +| **Healthy** (green) | All configured SLIs are within their thresholds. | +| **Degraded** (yellow) | One or more SLIs have breached their threshold. | +| **No SLIs** (gray) | No SLI definitions have been configured for this pipeline. | + +Draft pipelines do not show a health badge since they are not deployed and have no metrics. + +### Available metrics + +| Metric | Description | Typical condition | +|--------|-------------|-------------------| +| **Error Rate** | Ratio of errors to total events ingested (`errorsTotal / eventsIn`). | `< 0.01` (less than 1% errors) | +| **Discard Rate** | Ratio of discarded events to total events ingested (`eventsDiscarded / eventsIn`). | `< 0.05` (less than 5% discards) | +| **Throughput Floor** | Events per second averaged over the evaluation window (`eventsIn / windowSeconds`). | `> 100` (at least 100 events/sec) | + +### Configuring SLIs + +{% stepper %} +{% step %} +### Open pipeline settings +In the pipeline editor, click the **Settings** gear icon in the toolbar to open the settings popover. +{% endstep %} +{% step %} +### Expand Health SLIs +Click the **Health SLIs** collapsible section at the bottom of the settings panel. +{% endstep %} +{% step %} +### Add an SLI +Select a **Metric** (Error Rate, Throughput Floor, or Discard Rate), choose a **Condition** (less than or greater than), set a **Threshold** value, and configure the evaluation **Window** in minutes (1--1440). Click **Add SLI** to save. +{% endstep %} +{% step %} +### Review and remove +Existing SLIs are listed above the form. Click the trash icon to remove an SLI. Changes take effect immediately -- the pipeline list and toolbar health indicators update on the next evaluation cycle. +{% endstep %} +{% endstepper %} + +{% hint style="info" %} +Each metric can only have one SLI per pipeline. Adding an SLI for a metric that already has one will update the existing definition. +{% endhint %} + +{% hint style="warning" %} +If no metric data is available for the evaluation window (for example, the pipeline was recently deployed or has no traffic), the SLI is treated as **breached** and the pipeline health will show as **Degraded**. +{% endhint %} + ## Filtering by environment Pipelines are scoped to the currently selected **environment** (shown in the sidebar). Switch environments to view pipelines in a different environment. Each environment maintains its own independent set of pipelines, agent nodes, and secrets. From 83af85a39d116eed7d7fe6bfaeabf1e9bc84568e Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Sat, 7 Mar 2026 13:27:03 +0000 Subject: [PATCH 6/9] fix: address Greptile review findings - Add @@unique([pipelineId, metric]) constraint to prevent duplicate SLIs and use atomic Prisma upsert instead of read-then-write - Add pipelineId to deleteSli input so withTeamAccess can resolve team context, and verify pipelineId matches the SLI being deleted - Replace unbounded findMany with aggregate in SLI evaluator to avoid transferring all metric rows to the application layer --- .../migration.sql | 3 ++ prisma/schema.prisma | 1 + src/components/flow/pipeline-settings.tsx | 2 +- src/server/routers/pipeline.ts | 32 ++++++++----------- src/server/services/sli-evaluator.ts | 23 ++++++------- 5 files changed, 28 insertions(+), 33 deletions(-) diff --git a/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql b/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql index bcac7961..c37279fa 100644 --- a/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql +++ b/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql @@ -12,6 +12,9 @@ CREATE TABLE "PipelineSli" ( CONSTRAINT "PipelineSli_pkey" PRIMARY KEY ("id") ); +-- CreateIndex +CREATE UNIQUE INDEX "PipelineSli_pipelineId_metric_key" ON "PipelineSli"("pipelineId", "metric"); + -- CreateIndex CREATE INDEX "PipelineSli_pipelineId_idx" ON "PipelineSli"("pipelineId"); diff --git a/prisma/schema.prisma b/prisma/schema.prisma index c50b6b0b..d6cd598f 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -270,6 +270,7 @@ model PipelineSli { enabled Boolean @default(true) createdAt DateTime @default(now()) + @@unique([pipelineId, metric]) @@index([pipelineId]) } diff --git a/src/components/flow/pipeline-settings.tsx b/src/components/flow/pipeline-settings.tsx index 69fc11ba..3948d0a0 100644 --- a/src/components/flow/pipeline-settings.tsx +++ b/src/components/flow/pipeline-settings.tsx @@ -266,7 +266,7 @@ function SliSettings({ pipelineId }: { pipelineId: string }) { variant="ghost" size="icon" className="h-6 w-6 text-muted-foreground hover:text-destructive" - onClick={() => deleteMutation.mutate({ id: sli.id })} + onClick={() => deleteMutation.mutate({ id: sli.id, pipelineId })} disabled={deleteMutation.isPending} > diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index e777da0e..631a7c7d 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -1025,23 +1025,19 @@ export const pipelineRouter = router({ .use(withTeamAccess("EDITOR")) .use(withAudit("pipeline.sli_upserted", "Pipeline")) .mutation(async ({ input }) => { - const existing = await prisma.pipelineSli.findFirst({ - where: { pipelineId: input.pipelineId, metric: input.metric }, - }); - - if (existing) { - return prisma.pipelineSli.update({ - where: { id: existing.id }, - data: { - condition: input.condition, - threshold: input.threshold, - windowMinutes: input.windowMinutes, + return prisma.pipelineSli.upsert({ + where: { + pipelineId_metric: { + pipelineId: input.pipelineId, + metric: input.metric, }, - }); - } - - return prisma.pipelineSli.create({ - data: { + }, + update: { + condition: input.condition, + threshold: input.threshold, + windowMinutes: input.windowMinutes, + }, + create: { pipelineId: input.pipelineId, metric: input.metric, condition: input.condition, @@ -1052,14 +1048,14 @@ export const pipelineRouter = router({ }), deleteSli: protectedProcedure - .input(z.object({ id: z.string() })) + .input(z.object({ id: z.string(), pipelineId: z.string() })) .use(withTeamAccess("EDITOR")) .use(withAudit("pipeline.sli_deleted", "Pipeline")) .mutation(async ({ input }) => { const sli = await prisma.pipelineSli.findUnique({ where: { id: input.id }, }); - if (!sli) { + if (!sli || sli.pipelineId !== input.pipelineId) { throw new TRPCError({ code: "NOT_FOUND", message: "SLI not found", diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts index 42c15519..9dc767d2 100644 --- a/src/server/services/sli-evaluator.ts +++ b/src/server/services/sli-evaluator.ts @@ -24,11 +24,15 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{ for (const sli of sliDefs) { const since = new Date(Date.now() - sli.windowMinutes * 60_000); - const metrics = await prisma.pipelineMetric.findMany({ + + // Use aggregate to avoid transferring all metric rows to the application + const agg = await prisma.pipelineMetric.aggregate({ where: { pipelineId, timestamp: { gte: since } }, + _sum: { eventsIn: true, errorsTotal: true, eventsDiscarded: true }, + _count: true, }); - if (metrics.length === 0) { + if (agg._count === 0) { results.push({ metric: sli.metric, status: "breached", @@ -40,25 +44,16 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{ } let value: number; - const totalEventsIn = metrics.reduce( - (s, m) => s + Number(m.eventsIn ?? 0), - 0, - ); + const totalEventsIn = Number(agg._sum.eventsIn ?? 0); switch (sli.metric) { case "error_rate": { - const totalErrors = metrics.reduce( - (s, m) => s + Number(m.errorsTotal ?? 0), - 0, - ); + const totalErrors = Number(agg._sum.errorsTotal ?? 0); value = totalEventsIn > 0 ? totalErrors / totalEventsIn : 0; break; } case "discard_rate": { - const totalDiscarded = metrics.reduce( - (s, m) => s + Number(m.eventsDiscarded ?? 0), - 0, - ); + const totalDiscarded = Number(agg._sum.eventsDiscarded ?? 0); value = totalEventsIn > 0 ? totalDiscarded / totalEventsIn : 0; break; } From dbfc897abfc739b438804db2d6f8548825af5243 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Sat, 7 Mar 2026 13:53:33 +0000 Subject: [PATCH 7/9] fix: address pipeline SLIs review findings --- src/server/routers/pipeline.ts | 13 +------------ 1 file changed, 1 insertion(+), 12 deletions(-) diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index 631a7c7d..54645b67 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -166,17 +166,6 @@ export const pipelineRouter = router({ } } - // Evaluate pipeline health for deployed pipelines - let healthStatus: "healthy" | "degraded" | "no_data" = "no_data"; - if (!p.isDraft && p.deployedAt) { - try { - const health = await evaluatePipelineHealth(p.id); - healthStatus = health.status; - } catch { - healthStatus = "no_data"; - } - } - return { id: p.id, name: p.name, @@ -189,7 +178,7 @@ export const pipelineRouter = router({ updatedBy: p.updatedBy, nodeStatuses: p.nodeStatuses, hasUndeployedChanges, - healthStatus, + healthStatus: null as "healthy" | "degraded" | "no_data" | null, }; })); From a4e554585b580d2d67e57c6bfd4814e705be0c9e Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Sat, 7 Mar 2026 14:16:43 +0000 Subject: [PATCH 8/9] fix: repair health column in pipeline list, handle zero-throughput SLIs --- src/app/(dashboard)/pipelines/page.tsx | 49 ++++++++++++++++++++------ src/server/routers/pipeline.ts | 1 - src/server/services/sli-evaluator.ts | 30 ++++++++++++---- 3 files changed, 61 insertions(+), 19 deletions(-) diff --git a/src/app/(dashboard)/pipelines/page.tsx b/src/app/(dashboard)/pipelines/page.tsx index e1909698..86380585 100644 --- a/src/app/(dashboard)/pipelines/page.tsx +++ b/src/app/(dashboard)/pipelines/page.tsx @@ -80,6 +80,43 @@ function reductionColor(pct: number): string { return "bg-muted text-muted-foreground"; } +/** Lazily fetches SLI health for a single deployed pipeline. */ +function PipelineHealthBadge({ pipelineId }: { pipelineId: string }) { + const trpc = useTRPC(); + const healthQuery = useQuery( + trpc.pipeline.health.queryOptions( + { pipelineId }, + { refetchInterval: 30_000 }, + ), + ); + + const status = healthQuery.data?.status ?? null; + + if (healthQuery.isLoading) { + return ; + } + + if (status === "healthy") { + return ( + + Healthy + + ); + } + if (status === "degraded") { + return ( + + Degraded + + ); + } + return ( + + No SLIs + + ); +} + export default function PipelinesPage() { const trpc = useTRPC(); const selectedEnvironmentId = useEnvironmentStore((s) => s.selectedEnvironmentId); @@ -223,18 +260,8 @@ export default function PipelinesPage() { {pipeline.isDraft ? ( -- - ) : pipeline.healthStatus === "healthy" ? ( - - Healthy - - ) : pipeline.healthStatus === "degraded" ? ( - - Degraded - ) : ( - - No SLIs - + )} {/* Events/sec In */} diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index 54645b67..e2483833 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -178,7 +178,6 @@ export const pipelineRouter = router({ updatedBy: p.updatedBy, nodeStatuses: p.nodeStatuses, hasUndeployedChanges, - healthStatus: null as "healthy" | "degraded" | "no_data" | null, }; })); diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts index 9dc767d2..5328dcfb 100644 --- a/src/server/services/sli-evaluator.ts +++ b/src/server/services/sli-evaluator.ts @@ -4,8 +4,8 @@ export type SliStatus = "healthy" | "degraded" | "no_data"; export interface SliResult { metric: string; - status: "met" | "breached"; - value: number; + status: "met" | "breached" | "no_data"; + value: number | null; threshold: number; condition: string; } @@ -46,15 +46,27 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{ let value: number; const totalEventsIn = Number(agg._sum.eventsIn ?? 0); + // For rate-based metrics, zero throughput means no meaningful signal + if (totalEventsIn === 0 && (sli.metric === "error_rate" || sli.metric === "discard_rate")) { + results.push({ + metric: sli.metric, + status: "no_data", + value: null, + threshold: sli.threshold, + condition: sli.condition, + }); + continue; + } + switch (sli.metric) { case "error_rate": { const totalErrors = Number(agg._sum.errorsTotal ?? 0); - value = totalEventsIn > 0 ? totalErrors / totalEventsIn : 0; + value = totalErrors / totalEventsIn; break; } case "discard_rate": { const totalDiscarded = Number(agg._sum.eventsDiscarded ?? 0); - value = totalEventsIn > 0 ? totalDiscarded / totalEventsIn : 0; + value = totalDiscarded / totalEventsIn; break; } case "throughput_floor": { @@ -77,8 +89,12 @@ export async function evaluatePipelineHealth(pipelineId: string): Promise<{ }); } - const overallStatus: SliStatus = results.every((r) => r.status === "met") - ? "healthy" - : "degraded"; + const evaluated = results.filter((r) => r.status !== "no_data"); + const overallStatus: SliStatus = + evaluated.length === 0 + ? "no_data" + : evaluated.every((r) => r.status === "met") + ? "healthy" + : "degraded"; return { status: overallStatus, slis: results }; } From 5cf87e0200aedecc4470412171aa804f1aeceee9 Mon Sep 17 00:00:00 2001 From: TerrifiedBug Date: Sat, 7 Mar 2026 14:27:33 +0000 Subject: [PATCH 9/9] fix: add "No Data" badge for SLIs with no traffic --- src/app/(dashboard)/pipelines/page.tsx | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/src/app/(dashboard)/pipelines/page.tsx b/src/app/(dashboard)/pipelines/page.tsx index 86380585..1cb230e6 100644 --- a/src/app/(dashboard)/pipelines/page.tsx +++ b/src/app/(dashboard)/pipelines/page.tsx @@ -91,6 +91,7 @@ function PipelineHealthBadge({ pipelineId }: { pipelineId: string }) { ); const status = healthQuery.data?.status ?? null; + const hasSlis = (healthQuery.data?.slis.length ?? 0) > 0; if (healthQuery.isLoading) { return ; @@ -110,6 +111,13 @@ function PipelineHealthBadge({ pipelineId }: { pipelineId: string }) { ); } + if (status === "no_data" && hasSlis) { + return ( + + No Data + + ); + } return ( No SLIs