diff --git a/docs/public/user-guide/pipelines.md b/docs/public/user-guide/pipelines.md index b649c688..62b20edc 100644 --- a/docs/public/user-guide/pipelines.md +++ b/docs/public/user-guide/pipelines.md @@ -12,6 +12,7 @@ Pipelines are displayed in a table with the following columns: |--------|------------| | **Name** | The pipeline name. Click it to open the pipeline in the editor. | | **Status** | Current lifecycle state (see statuses below). | +| **Health** | SLI health badge -- green **Healthy**, yellow **Degraded**, or gray **No SLIs** (see [Pipeline Health SLIs](#pipeline-health-slis) below). | | **Events/sec In** | Live event ingestion rate polled from the agent fleet. | | **Bytes/sec In** | Live byte ingestion rate. | | **Reduction** | Percentage of events reduced by transforms, color-coded green (>50%), amber (>10%), or neutral. | @@ -69,6 +70,57 @@ Every time you deploy a pipeline, a new **version** is created that captures the The pipeline list shows a **Pending deploy** badge when the saved configuration differs from the most recently deployed version, so you always know if there are undeployed changes. +## Pipeline Health SLIs + +Service Level Indicators (SLIs) let you define health thresholds for your deployed pipelines. When SLIs are configured, VectorFlow continuously evaluates pipeline metrics against your thresholds and displays the result as a health badge in the pipeline list and pipeline editor toolbar. + +### Health badges + +| Badge | Meaning | +|-------|---------| +| **Healthy** (green) | All configured SLIs are within their thresholds. | +| **Degraded** (yellow) | One or more SLIs have breached their threshold. | +| **No SLIs** (gray) | No SLI definitions have been configured for this pipeline. | + +Draft pipelines do not show a health badge since they are not deployed and have no metrics. + +### Available metrics + +| Metric | Description | Typical condition | +|--------|-------------|-------------------| +| **Error Rate** | Ratio of errors to total events ingested (`errorsTotal / eventsIn`). | `< 0.01` (less than 1% errors) | +| **Discard Rate** | Ratio of discarded events to total events ingested (`eventsDiscarded / eventsIn`). | `< 0.05` (less than 5% discards) | +| **Throughput Floor** | Events per second averaged over the evaluation window (`eventsIn / windowSeconds`). | `> 100` (at least 100 events/sec) | + +### Configuring SLIs + +{% stepper %} +{% step %} +### Open pipeline settings +In the pipeline editor, click the **Settings** gear icon in the toolbar to open the settings popover. +{% endstep %} +{% step %} +### Expand Health SLIs +Click the **Health SLIs** collapsible section at the bottom of the settings panel. +{% endstep %} +{% step %} +### Add an SLI +Select a **Metric** (Error Rate, Throughput Floor, or Discard Rate), choose a **Condition** (less than or greater than), set a **Threshold** value, and configure the evaluation **Window** in minutes (1--1440). Click **Add SLI** to save. +{% endstep %} +{% step %} +### Review and remove +Existing SLIs are listed above the form. Click the trash icon to remove an SLI. Changes take effect immediately -- the pipeline list and toolbar health indicators update on the next evaluation cycle. +{% endstep %} +{% endstepper %} + +{% hint style="info" %} +Each metric can only have one SLI per pipeline. Adding an SLI for a metric that already has one will update the existing definition. +{% endhint %} + +{% hint style="warning" %} +If no metric data is available for the evaluation window (for example, the pipeline was recently deployed or has no traffic), the SLI is treated as **breached** and the pipeline health will show as **Degraded**. +{% endhint %} + ## Filtering by environment Pipelines are scoped to the currently selected **environment** (shown in the sidebar). Switch environments to view pipelines in a different environment. Each environment maintains its own independent set of pipelines, agent nodes, and secrets. diff --git a/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql b/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql new file mode 100644 index 00000000..c37279fa --- /dev/null +++ b/prisma/migrations/20260308000000_add_pipeline_slis/migration.sql @@ -0,0 +1,22 @@ +-- CreateTable +CREATE TABLE "PipelineSli" ( + "id" TEXT NOT NULL, + "pipelineId" TEXT NOT NULL, + "metric" TEXT NOT NULL, + "condition" TEXT NOT NULL, + "threshold" DOUBLE PRECISION NOT NULL, + "windowMinutes" INTEGER NOT NULL DEFAULT 5, + "enabled" BOOLEAN NOT NULL DEFAULT true, + "createdAt" TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP, + + CONSTRAINT "PipelineSli_pkey" PRIMARY KEY ("id") +); + +-- CreateIndex +CREATE UNIQUE INDEX "PipelineSli_pipelineId_metric_key" ON "PipelineSli"("pipelineId", "metric"); + +-- CreateIndex +CREATE INDEX "PipelineSli_pipelineId_idx" ON "PipelineSli"("pipelineId"); + +-- AddForeignKey +ALTER TABLE "PipelineSli" ADD CONSTRAINT "PipelineSli_pipelineId_fkey" FOREIGN KEY ("pipelineId") REFERENCES "Pipeline"("id") ON DELETE CASCADE ON UPDATE CASCADE; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index f0db424c..d6cd598f 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -213,6 +213,7 @@ model Pipeline { alertRules AlertRule[] sampleRequests EventSampleRequest[] eventSamples EventSample[] + slis PipelineSli[] createdAt DateTime @default(now()) updatedAt DateTime @updatedAt } @@ -258,6 +259,21 @@ model PipelineMetric { @@index([timestamp]) } +model PipelineSli { + id String @id @default(cuid()) + pipelineId String + pipeline Pipeline @relation(fields: [pipelineId], references: [id], onDelete: Cascade) + metric String // "error_rate" | "throughput_floor" | "discard_rate" + condition String // "lt" | "gt" + threshold Float + windowMinutes Int @default(5) + enabled Boolean @default(true) + createdAt DateTime @default(now()) + + @@unique([pipelineId, metric]) + @@index([pipelineId]) +} + model EventSampleRequest { id String @id @default(cuid()) pipelineId String diff --git a/src/app/(dashboard)/pipelines/page.tsx b/src/app/(dashboard)/pipelines/page.tsx index 8de11704..1cb230e6 100644 --- a/src/app/(dashboard)/pipelines/page.tsx +++ b/src/app/(dashboard)/pipelines/page.tsx @@ -80,6 +80,51 @@ function reductionColor(pct: number): string { return "bg-muted text-muted-foreground"; } +/** Lazily fetches SLI health for a single deployed pipeline. */ +function PipelineHealthBadge({ pipelineId }: { pipelineId: string }) { + const trpc = useTRPC(); + const healthQuery = useQuery( + trpc.pipeline.health.queryOptions( + { pipelineId }, + { refetchInterval: 30_000 }, + ), + ); + + const status = healthQuery.data?.status ?? null; + const hasSlis = (healthQuery.data?.slis.length ?? 0) > 0; + + if (healthQuery.isLoading) { + return ; + } + + if (status === "healthy") { + return ( + + Healthy + + ); + } + if (status === "degraded") { + return ( + + Degraded + + ); + } + if (status === "no_data" && hasSlis) { + return ( + + No Data + + ); + } + return ( + + No SLIs + + ); +} + export default function PipelinesPage() { const trpc = useTRPC(); const selectedEnvironmentId = useEnvironmentStore((s) => s.selectedEnvironmentId); @@ -177,6 +222,7 @@ export default function PipelinesPage() { Name Status + Health Events/sec In Bytes/sec In Reduction @@ -218,6 +264,14 @@ export default function PipelinesPage() { )} + {/* Health */} + + {pipeline.isDraft ? ( + -- + ) : ( + + )} + {/* Events/sec In */} {liveRates[pipeline.id] diff --git a/src/components/flow/flow-toolbar.tsx b/src/components/flow/flow-toolbar.tsx index c6423029..506d19cf 100644 --- a/src/components/flow/flow-toolbar.tsx +++ b/src/components/flow/flow-toolbar.tsx @@ -44,7 +44,7 @@ import { cn } from "@/lib/utils"; import { useFlowStore } from "@/stores/flow-store"; import { generateVectorYaml, generateVectorToml, importVectorConfig } from "@/lib/config-generator"; import { useTRPC } from "@/trpc/client"; -import { useMutation } from "@tanstack/react-query"; +import { useMutation, useQuery } from "@tanstack/react-query"; import { VersionHistoryDialog } from "@/components/pipeline/version-history-dialog"; type ProcessStatusValue = "RUNNING" | "STARTING" | "STOPPED" | "CRASHED" | "PENDING"; @@ -113,6 +113,15 @@ export function FlowToolbar({ const [versionsOpen, setVersionsOpen] = useState(false); const trpc = useTRPC(); + + const healthQuery = useQuery( + trpc.pipeline.health.queryOptions( + { pipelineId: pipelineId! }, + { enabled: !!pipelineId && !isDraft && !!deployedAt, refetchInterval: 30_000 }, + ), + ); + const healthStatus = healthQuery.data?.status ?? null; + const validateMutation = useMutation(trpc.validator.validate.mutationOptions({ onSuccess: (result) => { if (result.valid) { @@ -359,7 +368,7 @@ export function FlowToolbar({ Pipeline settings - + @@ -386,6 +395,23 @@ export function FlowToolbar({ {processStatus === "CRASHED" && "Crashed"} {processStatus === "PENDING" && "Pending..."} + {/* Health SLI indicator dot */} + {healthStatus === "healthy" && ( + + + + + All SLIs met + + )} + {healthStatus === "degraded" && ( + + + + + One or more SLIs breached + + )} )} diff --git a/src/components/flow/pipeline-settings.tsx b/src/components/flow/pipeline-settings.tsx index 2bf0aa3d..3948d0a0 100644 --- a/src/components/flow/pipeline-settings.tsx +++ b/src/components/flow/pipeline-settings.tsx @@ -1,12 +1,13 @@ "use client"; import { useState, useEffect } from "react"; -import { ChevronRight } from "lucide-react"; +import { ChevronRight, Plus, Trash2 } from "lucide-react"; import { useFlowStore } from "@/stores/flow-store"; import { Label } from "@/components/ui/label"; import { Button } from "@/components/ui/button"; import { Separator } from "@/components/ui/separator"; import { Badge } from "@/components/ui/badge"; +import { Input } from "@/components/ui/input"; import { Select, SelectContent, @@ -19,8 +20,15 @@ import { CollapsibleContent, CollapsibleTrigger, } from "@/components/ui/collapsible"; +import { useTRPC } from "@/trpc/client"; +import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; +import { toast } from "sonner"; -export function PipelineSettings() { +interface PipelineSettingsProps { + pipelineId?: string; +} + +export function PipelineSettings({ pipelineId }: PipelineSettingsProps) { const globalConfig = useFlowStore((s) => s.globalConfig); const updateGlobalConfig = useFlowStore((s) => s.updateGlobalConfig); const setGlobalConfig = useFlowStore((s) => s.setGlobalConfig); @@ -129,10 +137,216 @@ export function PipelineSettings() { + + {pipelineId && ( + <> + + + > + )} ); } +// --------------------------------------------------------------------------- +// SLI settings sub-component +// --------------------------------------------------------------------------- + +const METRIC_OPTIONS = [ + { value: "error_rate", label: "Error Rate" }, + { value: "throughput_floor", label: "Throughput Floor" }, + { value: "discard_rate", label: "Discard Rate" }, +] as const; + +const CONDITION_OPTIONS = [ + { value: "lt", label: "< (less than)" }, + { value: "gt", label: "> (greater than)" }, +] as const; + +function SliSettings({ pipelineId }: { pipelineId: string }) { + const trpc = useTRPC(); + const queryClient = useQueryClient(); + + const slisQuery = useQuery( + trpc.pipeline.listSlis.queryOptions({ pipelineId }), + ); + const slis = slisQuery.data ?? []; + + const [sliOpen, setSliOpen] = useState(false); + const [newMetric, setNewMetric] = useState("error_rate"); + const [newCondition, setNewCondition] = useState("lt"); + const [newThreshold, setNewThreshold] = useState("0.01"); + const [newWindow, setNewWindow] = useState("5"); + + const upsertMutation = useMutation( + trpc.pipeline.upsertSli.mutationOptions({ + onSuccess: () => { + toast.success("SLI saved"); + queryClient.invalidateQueries({ + queryKey: trpc.pipeline.listSlis.queryKey(), + }); + queryClient.invalidateQueries({ + queryKey: trpc.pipeline.health.queryKey(), + }); + }, + onError: (err) => toast.error(err.message || "Failed to save SLI"), + }), + ); + + const deleteMutation = useMutation( + trpc.pipeline.deleteSli.mutationOptions({ + onSuccess: () => { + toast.success("SLI removed"); + queryClient.invalidateQueries({ + queryKey: trpc.pipeline.listSlis.queryKey(), + }); + queryClient.invalidateQueries({ + queryKey: trpc.pipeline.health.queryKey(), + }); + }, + onError: (err) => toast.error(err.message || "Failed to delete SLI"), + }), + ); + + const handleAdd = () => { + const threshold = parseFloat(newThreshold); + const windowMinutes = parseInt(newWindow, 10); + if (isNaN(threshold) || threshold < 0) { + toast.error("Threshold must be a non-negative number"); + return; + } + if (isNaN(windowMinutes) || windowMinutes < 1) { + toast.error("Window must be at least 1 minute"); + return; + } + upsertMutation.mutate({ + pipelineId, + metric: newMetric as "error_rate" | "throughput_floor" | "discard_rate", + condition: newCondition as "lt" | "gt", + threshold, + windowMinutes, + }); + }; + + const metricLabel = (m: string) => + METRIC_OPTIONS.find((o) => o.value === m)?.label ?? m; + + return ( + + + + Health SLIs + {slis.length > 0 && ( + + {slis.length} + + )} + + + {/* Existing SLIs */} + {slis.length > 0 && ( + + {slis.map((sli) => ( + + + {metricLabel(sli.metric)}{" "} + + {sli.condition === "lt" ? "<" : ">"} {sli.threshold} + {" "} + + ({sli.windowMinutes}m) + + + deleteMutation.mutate({ id: sli.id, pipelineId })} + disabled={deleteMutation.isPending} + > + + + + ))} + + )} + + {/* Add new SLI form */} + + + Metric + + + + + + {METRIC_OPTIONS.map((o) => ( + + {o.label} + + ))} + + + + + + Condition + + + + + + {CONDITION_OPTIONS.map((o) => ( + + {o.label} + + ))} + + + + + Threshold + setNewThreshold(e.target.value)} + className="h-8 text-xs" + /> + + + + Window (minutes) + setNewWindow(e.target.value)} + className="h-8 text-xs" + /> + + + + {upsertMutation.isPending ? "Saving..." : "Add SLI"} + + + + + ); +} + /** * Returns true when globalConfig has content beyond just log_level. * Used by the toolbar to show a dot indicator on the gear icon. diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index 967be2fd..e2483833 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -17,6 +17,7 @@ import { getOrCreateSystemEnvironment } from "@/server/services/system-environme import { copyPipelineGraph } from "@/server/services/copy-pipeline-graph"; import { stripEnvRefs, type StrippedRef } from "@/server/services/strip-env-refs"; import { gitSyncDeletePipeline } from "@/server/services/git-sync"; +import { evaluatePipelineHealth } from "@/server/services/sli-evaluator"; /** Pipeline names must be safe identifiers */ const pipelineNameSchema = z @@ -114,7 +115,7 @@ export const pipelineRouter = router({ orderBy: { updatedAt: "desc" }, }); - return pipelines.map((p) => { + const mapped = await Promise.all(pipelines.map(async (p) => { let hasUndeployedChanges = false; if (!p.isDraft && p.deployedAt) { const latestVersion = p.versions[0]; @@ -178,7 +179,9 @@ export const pipelineRouter = router({ nodeStatuses: p.nodeStatuses, hasUndeployedChanges, }; - }); + })); + + return mapped; }), get: protectedProcedure @@ -986,4 +989,75 @@ export const pipelineRouter = router({ return deduplicated; }), + + listSlis: protectedProcedure + .input(z.object({ pipelineId: z.string() })) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return prisma.pipelineSli.findMany({ + where: { pipelineId: input.pipelineId }, + orderBy: { createdAt: "asc" }, + }); + }), + + upsertSli: protectedProcedure + .input( + z.object({ + pipelineId: z.string(), + metric: z.enum(["error_rate", "throughput_floor", "discard_rate"]), + condition: z.enum(["lt", "gt"]), + threshold: z.number().min(0), + windowMinutes: z.number().int().min(1).max(1440).default(5), + }), + ) + .use(withTeamAccess("EDITOR")) + .use(withAudit("pipeline.sli_upserted", "Pipeline")) + .mutation(async ({ input }) => { + return prisma.pipelineSli.upsert({ + where: { + pipelineId_metric: { + pipelineId: input.pipelineId, + metric: input.metric, + }, + }, + update: { + condition: input.condition, + threshold: input.threshold, + windowMinutes: input.windowMinutes, + }, + create: { + pipelineId: input.pipelineId, + metric: input.metric, + condition: input.condition, + threshold: input.threshold, + windowMinutes: input.windowMinutes, + }, + }); + }), + + deleteSli: protectedProcedure + .input(z.object({ id: z.string(), pipelineId: z.string() })) + .use(withTeamAccess("EDITOR")) + .use(withAudit("pipeline.sli_deleted", "Pipeline")) + .mutation(async ({ input }) => { + const sli = await prisma.pipelineSli.findUnique({ + where: { id: input.id }, + }); + if (!sli || sli.pipelineId !== input.pipelineId) { + throw new TRPCError({ + code: "NOT_FOUND", + message: "SLI not found", + }); + } + return prisma.pipelineSli.delete({ + where: { id: input.id }, + }); + }), + + health: protectedProcedure + .input(z.object({ pipelineId: z.string() })) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + return evaluatePipelineHealth(input.pipelineId); + }), }); diff --git a/src/server/services/sli-evaluator.ts b/src/server/services/sli-evaluator.ts new file mode 100644 index 00000000..5328dcfb --- /dev/null +++ b/src/server/services/sli-evaluator.ts @@ -0,0 +1,100 @@ +import { prisma } from "@/lib/prisma"; + +export type SliStatus = "healthy" | "degraded" | "no_data"; + +export interface SliResult { + metric: string; + status: "met" | "breached" | "no_data"; + value: number | null; + threshold: number; + condition: string; +} + +export async function evaluatePipelineHealth(pipelineId: string): Promise<{ + status: SliStatus; + slis: SliResult[]; +}> { + const sliDefs = await prisma.pipelineSli.findMany({ + where: { pipelineId, enabled: true }, + }); + + if (sliDefs.length === 0) return { status: "no_data", slis: [] }; + + const results: SliResult[] = []; + + for (const sli of sliDefs) { + const since = new Date(Date.now() - sli.windowMinutes * 60_000); + + // Use aggregate to avoid transferring all metric rows to the application + const agg = await prisma.pipelineMetric.aggregate({ + where: { pipelineId, timestamp: { gte: since } }, + _sum: { eventsIn: true, errorsTotal: true, eventsDiscarded: true }, + _count: true, + }); + + if (agg._count === 0) { + results.push({ + metric: sli.metric, + status: "breached", + value: 0, + threshold: sli.threshold, + condition: sli.condition, + }); + continue; + } + + let value: number; + const totalEventsIn = Number(agg._sum.eventsIn ?? 0); + + // For rate-based metrics, zero throughput means no meaningful signal + if (totalEventsIn === 0 && (sli.metric === "error_rate" || sli.metric === "discard_rate")) { + results.push({ + metric: sli.metric, + status: "no_data", + value: null, + threshold: sli.threshold, + condition: sli.condition, + }); + continue; + } + + switch (sli.metric) { + case "error_rate": { + const totalErrors = Number(agg._sum.errorsTotal ?? 0); + value = totalErrors / totalEventsIn; + break; + } + case "discard_rate": { + const totalDiscarded = Number(agg._sum.eventsDiscarded ?? 0); + value = totalDiscarded / totalEventsIn; + break; + } + case "throughput_floor": { + const windowSeconds = sli.windowMinutes * 60; + value = totalEventsIn / windowSeconds; + break; + } + default: + value = 0; + } + + const met = + sli.condition === "lt" ? value < sli.threshold : value > sli.threshold; + results.push({ + metric: sli.metric, + status: met ? "met" : "breached", + value, + threshold: sli.threshold, + condition: sli.condition, + }); + } + + const evaluated = results.filter((r) => r.status !== "no_data"); + const overallStatus: SliStatus = + evaluated.length === 0 + ? "no_data" + : evaluated.every((r) => r.status === "met") + ? "healthy" + : "degraded"; + return { status: overallStatus, slis: results }; +}