diff --git a/docs/public/reference/agent.md b/docs/public/reference/agent.md index f103c0d5..c78ef7c6 100644 --- a/docs/public/reference/agent.md +++ b/docs/public/reference/agent.md @@ -62,6 +62,7 @@ After each poll, the agent sends a heartbeat (`POST /api/agent/heartbeat`) that - Host system metrics (CPU, memory, disk, network) - Recent stdout/stderr log lines from each pipeline process - Agent and Vector version information +- Node labels (optional key-value metadata for selective deployment) --- @@ -75,6 +76,7 @@ After each poll, the agent sends a heartbeat (`POST /api/agent/heartbeat`) that | `VF_VECTOR_BIN` | No | `vector` | Path to the Vector binary. Use if Vector is not on the system `PATH`. | | `VF_POLL_INTERVAL` | No | `15s` | How often to poll the server for config changes. Accepts Go duration syntax (e.g., `10s`, `1m`). | | `VF_LOG_LEVEL` | No | `info` | Agent log level: `debug`, `info`, `warn`, `error` | +| `VF_LABELS` | No | -- | Comma-separated key=value pairs reported to the server on each heartbeat (e.g., `region=us-east-1,tier=production`). Labels set via the UI take precedence over agent-reported values. Used for selective pipeline deployment. | {% hint style="warning" %} `VF_URL` is the only strictly required variable. However, `VF_TOKEN` must be set on the first run for enrollment. After the agent writes its node token to disk, `VF_TOKEN` can be removed. @@ -167,6 +169,10 @@ Key fields: - **`certFiles`**: Certificate data written to `/certs/` before starting the pipeline. - **`pendingAction`**: Server-initiated action (currently only `self_update`). +{% hint style="info" %} +When a pipeline has a **node selector** configured (via the deploy dialog), the config response only includes pipelines whose selector labels match this node's labels. A pipeline with no node selector deploys to all nodes. +{% endhint %} + {% hint style="info" %} When a node is in **maintenance mode**, the config response returns an empty `pipelines` array. The agent stops all running pipelines but continues sending heartbeats. See [Fleet Management](../user-guide/fleet.md#maintenance-mode) for details. {% endhint %} @@ -177,6 +183,9 @@ Called after every poll. Sends status and metrics for all managed pipelines. **Headers:** `Authorization: Bearer `, `Content-Type: application/json` +Key fields: +- **`labels`** (optional): Key-value pairs describing this node. Labels set via the `VF_LABELS` environment variable are reported here. The server merges them with any labels set through the UI, with UI-set labels taking precedence. + **Request:** ```json { @@ -212,7 +221,11 @@ Called after every poll. Sends status and metrics for all managed pipelines. }, "agentVersion": "0.5.0", "vectorVersion": "vector 0.41.1", - "deploymentMode": "STANDALONE" + "deploymentMode": "STANDALONE", + "labels": { + "region": "us-east-1", + "tier": "production" + } } ``` diff --git a/docs/public/user-guide/fleet.md b/docs/public/user-guide/fleet.md index fd34daf8..0526e7fd 100644 --- a/docs/public/user-guide/fleet.md +++ b/docs/public/user-guide/fleet.md @@ -13,6 +13,7 @@ All enrolled agent nodes are displayed in a table with the following columns: | **Name** | The node name. Click it to open the node detail page. You can rename nodes from the detail view. | | **Host:Port** | The hostname or IP address and API port the agent is listening on. | | **Environment** | The environment the node is enrolled in. | +| **Labels** | Key-value labels assigned to the node, shown as `key=value` badges. See [Node Labels](#node-labels) below. | | **Version** | The Vector version running on the node. | | **Agent Version** | The VectorFlow agent version, plus deployment mode (Docker or Binary). An **Update available** badge appears when a newer version exists. | | **Status** | Current health status (see statuses below). | @@ -104,6 +105,53 @@ Docker-based agents are updated by pulling the latest image. The **Update** butt Below the node list, the **Pipeline Deployment Matrix** shows a grid of all deployed pipelines across all nodes in the environment. This lets you see at a glance which pipelines are running on which nodes and their current status. +## Node labels + +Labels are key-value pairs you can attach to nodes for organization and selective deployment. Common uses include tagging nodes by region, role, tier, or any custom dimension relevant to your infrastructure. + +### Viewing labels + +Labels appear as `key=value` badges in the **Labels** column of the fleet table. Nodes with no labels show an empty column. + +### Adding and editing labels + +{% stepper %} +{% step %} +### Open the node detail page +Click a node name in the fleet table to open its detail page. +{% endstep %} +{% step %} +### Edit labels +In the **Labels** card, click the **Edit** button. +{% endstep %} +{% step %} +### Add or modify labels +Use the key-value input pairs to add, modify, or remove labels. Click **Add Label** to add a new pair, or click the **X** button to remove a row. +{% endstep %} +{% step %} +### Save +Click **Save Labels** to persist the changes. +{% endstep %} +{% endstepper %} + +{% hint style="info" %} +Editing labels requires the **Editor** role or above on the team. +{% endhint %} + +### Agent-reported labels + +Agents can also report labels in their heartbeat payload. When a label is reported by the agent and also set via the UI, the **UI value takes precedence**. This lets you override agent-reported labels without them being overwritten on the next heartbeat. + +### Selective deployment with labels + +When deploying a pipeline, you can optionally restrict deployment to nodes matching specific labels. In the deploy dialog, the **Target Nodes** selector lets you pick from all labels in the environment. Selected labels are combined with AND logic -- a node must have all selected labels to receive the pipeline. + +The deploy dialog shows a live count of matching nodes (e.g., "3 of 5 nodes match") so you can verify your selection before deploying. When no labels are selected, the pipeline deploys to all nodes in the environment (backward compatible). + +{% hint style="warning" %} +Changing a pipeline's node selector on a subsequent deploy updates the targeting. Nodes that no longer match will stop the pipeline on their next poll. +{% endhint %} + ## Maintenance mode Maintenance mode lets you temporarily stop all pipelines on a node without removing it from the fleet. This is useful for host upgrades, kernel patches, disk maintenance, or any situation where you need the node idle but still connected. diff --git a/prisma/migrations/20260307100000_add_node_labels/migration.sql b/prisma/migrations/20260307100000_add_node_labels/migration.sql new file mode 100644 index 00000000..3151759d --- /dev/null +++ b/prisma/migrations/20260307100000_add_node_labels/migration.sql @@ -0,0 +1,5 @@ +-- AlterTable +ALTER TABLE "VectorNode" ADD COLUMN "labels" JSONB DEFAULT '{}'; + +-- AlterTable +ALTER TABLE "Pipeline" ADD COLUMN "nodeSelector" JSONB; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index f0db424c..027b9da0 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -107,6 +107,7 @@ model VectorNode { pendingAction Json? maintenanceMode Boolean @default(false) maintenanceModeAt DateTime? + labels Json? @default("{}") pipelineStatuses NodePipelineStatus[] nodeMetrics NodeMetric[] pipelineLogs PipelineLog[] @@ -200,6 +201,7 @@ model Pipeline { edges PipelineEdge[] versions PipelineVersion[] globalConfig Json? + nodeSelector Json? isDraft Boolean @default(true) isSystem Boolean @default(false) deployedAt DateTime? diff --git a/src/app/(dashboard)/fleet/[nodeId]/page.tsx b/src/app/(dashboard)/fleet/[nodeId]/page.tsx index fe7e0b47..88814521 100644 --- a/src/app/(dashboard)/fleet/[nodeId]/page.tsx +++ b/src/app/(dashboard)/fleet/[nodeId]/page.tsx @@ -3,11 +3,12 @@ import { useParams, useRouter } from "next/navigation"; import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; import { useTRPC } from "@/trpc/client"; -import { ArrowLeft, ShieldOff, Trash2, Activity, Terminal, Server, Pencil, Check, X, Wrench } from "lucide-react"; +import { ArrowLeft, ShieldOff, Trash2, Activity, Terminal, Server, Pencil, Check, X, Wrench, Plus, Tag } from "lucide-react"; import { NodeLogs } from "@/components/fleet/node-logs"; import { toast } from "sonner"; import { useState } from "react"; +import { Badge } from "@/components/ui/badge"; import { Button } from "@/components/ui/button"; import { StatusBadge } from "@/components/ui/status-badge"; import { Input } from "@/components/ui/input"; @@ -57,6 +58,8 @@ export default function NodeDetailPage() { const [isRenaming, setIsRenaming] = useState(false); const [editName, setEditName] = useState(""); + const [isEditingLabels, setIsEditingLabels] = useState(false); + const [editLabels, setEditLabels] = useState>([]); const nodeQuery = useQuery( trpc.fleet.get.queryOptions( @@ -141,6 +144,35 @@ export default function NodeDetailPage() { }), ); + const labelsMutation = useMutation( + trpc.fleet.updateLabels.mutationOptions({ + onSuccess: () => { + queryClient.invalidateQueries({ queryKey: trpc.fleet.get.queryKey({ id: params.nodeId }) }); + queryClient.invalidateQueries({ queryKey: trpc.fleet.list.queryKey() }); + toast.success("Labels updated"); + setIsEditingLabels(false); + }, + }), + ); + + function handleStartEditLabels() { + const labels = (node?.labels as Record) ?? {}; + const entries = Object.entries(labels).map(([key, value]) => ({ key, value })); + if (entries.length === 0) entries.push({ key: "", value: "" }); + setEditLabels(entries); + setIsEditingLabels(true); + } + + function handleSaveLabels() { + if (!node) return; + const labels: Record = {}; + for (const { key, value } of editLabels) { + const k = key.trim(); + if (k) labels[k] = value.trim(); + } + labelsMutation.mutate({ nodeId: node.id, labels }); + } + function handleMaintenanceToggle() { if (!node) return; if (!node.maintenanceMode) { @@ -357,6 +389,96 @@ export default function NodeDetailPage() { + {/* Node Labels */} + + + + + + Labels + + {!isEditingLabels && ( + + )} + + + + {isEditingLabels ? ( +
+ {editLabels.map((label, idx) => ( +
+ { + const next = [...editLabels]; + next[idx] = { ...next[idx], key: e.target.value }; + setEditLabels(next); + }} + className="flex-1" + /> + = + { + const next = [...editLabels]; + next[idx] = { ...next[idx], value: e.target.value }; + setEditLabels(next); + }} + className="flex-1" + /> + +
+ ))} + +
+ + +
+
+ ) : ( +
+ {Object.entries((node.labels as Record) ?? {}).length > 0 ? ( + Object.entries((node.labels as Record) ?? {}).map( + ([k, v]) => ( + + {k}={v} + + ), + ) + ) : ( +

No labels assigned

+ )} +
+ )} +
+
+ diff --git a/src/app/(dashboard)/fleet/page.tsx b/src/app/(dashboard)/fleet/page.tsx index b20644e7..e40fb7da 100644 --- a/src/app/(dashboard)/fleet/page.tsx +++ b/src/app/(dashboard)/fleet/page.tsx @@ -119,6 +119,7 @@ export default function FleetPage() { Name Host:Port Environment + Labels Version Agent Version Status @@ -143,6 +144,17 @@ export default function FleetPage() { {node.environment.name} + +
+ {Object.entries( + (node.labels as Record) ?? {}, + ).map(([k, v]) => ( + + {k}={v} + + ))} +
+
{node.vectorVersion?.split(" ")[1] ?? "—"} diff --git a/src/app/api/agent/config/route.ts b/src/app/api/agent/config/route.ts index 86aae744..739e05c1 100644 --- a/src/app/api/agent/config/route.ts +++ b/src/app/api/agent/config/route.ts @@ -16,7 +16,7 @@ export async function GET(request: Request) { // Fetch the node to check for pending actions (e.g., self-update) const node = await prisma.vectorNode.findUnique({ where: { id: agent.nodeId }, - select: { pendingAction: true, maintenanceMode: true }, + select: { pendingAction: true, maintenanceMode: true, labels: true }, }); if (node?.maintenanceMode) { @@ -53,7 +53,7 @@ export async function GET(request: Request) { // Agents receive the config snapshot from PipelineVersion — NOT live node/edge // data — so that saving in the editor doesn't affect agents until an explicit // deploy confirms the change. - const pipelines = await prisma.pipeline.findMany({ + const deployedPipelines = await prisma.pipeline.findMany({ where: { environmentId: agent.environmentId, isDraft: false, @@ -62,6 +62,7 @@ export async function GET(request: Request) { select: { id: true, name: true, + nodeSelector: true, versions: { orderBy: { version: "desc" }, take: 1, @@ -70,6 +71,16 @@ export async function GET(request: Request) { }, }); + // Filter pipelines by nodeSelector matching this node's labels. + // Pipelines without a nodeSelector (or empty selector) deploy to all nodes. + const nodeLabels = (node?.labels as Record) ?? {}; + const pipelines = deployedPipelines.filter((p) => { + const selector = (p.nodeSelector as Record) ?? {}; + return Object.entries(selector).every( + ([key, value]) => nodeLabels[key] === value, + ); + }); + const pipelineConfigs = []; const certBasePath = "/var/lib/vf-agent/certs"; diff --git a/src/app/api/agent/heartbeat/route.ts b/src/app/api/agent/heartbeat/route.ts index f0cc20b4..067e3932 100644 --- a/src/app/api/agent/heartbeat/route.ts +++ b/src/app/api/agent/heartbeat/route.ts @@ -67,6 +67,7 @@ const heartbeatSchema = z.object({ error: z.string().optional(), })).optional(), updateError: z.string().max(500).optional(), + labels: z.record(z.string(), z.string()).optional(), }); let lastCleanup = 0; @@ -151,7 +152,7 @@ export async function POST(request: Request) { } // Update node heartbeat and metadata - await prisma.vectorNode.update({ + const node = await prisma.vectorNode.update({ where: { id: agent.nodeId }, data: { lastHeartbeat: now, @@ -166,6 +167,18 @@ export async function POST(request: Request) { }, }); + // Merge agent-reported labels with existing UI-set labels. + // UI-set labels take precedence over agent-reported labels. + // Uses a single atomic operation to avoid TOCTOU race with fleet.updateLabels: + // agent labels are the base, existing DB labels override on top. + if (parsed.data.labels) { + await prisma.$executeRaw` + UPDATE "VectorNode" + SET labels = ${JSON.stringify(parsed.data.labels)}::jsonb || labels + WHERE id = ${node.id} + `; + } + // Read previous snapshots BEFORE upserting so we can compute deltas correctly const prevSnapshots = new Map([]); + const [labelPopoverOpen, setLabelPopoverOpen] = useState(false); const previewQuery = useQuery({ ...trpc.deploy.preview.queryOptions({ pipelineId }), @@ -40,6 +56,64 @@ export function DeployDialog({ pipelineId, open, onOpenChange }: DeployDialogPro enabled: open, }); + const environmentId = envQuery.data?.environmentId; + + const labelsQuery = useQuery({ + ...trpc.fleet.listLabels.queryOptions( + { environmentId: environmentId! }, + ), + enabled: open && !!environmentId, + }); + + // Seed selectedLabels from existing pipeline nodeSelector when dialog opens + useEffect(() => { + if (!open) return; + const existing = previewQuery.data?.nodeSelector ?? {}; + // eslint-disable-next-line react-hooks/set-state-in-effect + setSelectedLabels( + Object.entries(existing).map(([k, v]) => `${k}=${v}`), + ); + }, [open, previewQuery.data?.nodeSelector]); + + // Build flat list of "key=value" options from the label map + const availableLabelOptions = useMemo(() => { + const data = labelsQuery.data; + if (!data) return []; + const options: string[] = []; + for (const [key, values] of Object.entries(data)) { + for (const val of values as string[]) { + options.push(`${key}=${val}`); + } + } + return options.sort(); + }, [labelsQuery.data]); + + // Build nodeSelector from selected labels + const nodeSelector = useMemo(() => { + const sel: Record = {}; + for (const label of selectedLabels) { + const idx = label.indexOf("="); + if (idx > 0) { + sel[label.slice(0, idx)] = label.slice(idx + 1); + } + } + return sel; + }, [selectedLabels]); + + // Compute matching node count + const matchingNodeCount = useMemo(() => { + const nodes = envQuery.data?.nodes ?? []; + if (selectedLabels.length === 0) return nodes.length; + return nodes.filter((n) => { + const nodeLabels = (n.labels as Record) ?? {}; + return Object.entries(nodeSelector).every( + ([key, value]) => nodeLabels[key] === value, + ); + }).length; + }, [envQuery.data?.nodes, selectedLabels, nodeSelector]); + + const totalNodeCount = envQuery.data?.nodes.length ?? 0; + const agentMutation = useMutation( trpc.deploy.agent.mutationOptions({ onSuccess: (result) => { @@ -79,11 +153,15 @@ export function DeployDialog({ pipelineId, open, onOpenChange }: DeployDialogPro function handleDeploy() { setDeploying(true); - agentMutation.mutate({ pipelineId, changelog: changelog.trim() }); + agentMutation.mutate({ + pipelineId, + changelog: changelog.trim(), + ...(selectedLabels.length > 0 ? { nodeSelector } : { nodeSelector: {} }), + }); } return ( - { if (deploying) return; if (!val) setChangelog(""); onOpenChange(val); }}> + { if (deploying) return; if (!val) { setChangelog(""); setSelectedLabels([]); } onOpenChange(val); }}> @@ -112,6 +190,95 @@ export function DeployDialog({ pipelineId, open, onOpenChange }: DeployDialogPro )} + {/* Node selector */} + {availableLabelOptions.length > 0 && ( +
+ + + + + + + + + + No labels found. + + {(() => { + const selectedKeys = new Set(selectedLabels.map((l) => l.split("=")[0])); + return availableLabelOptions.map((option) => { + const optionKey = option.split("=")[0]; + const isSelected = selectedLabels.includes(option); + const keyAlreadyUsed = selectedKeys.has(optionKey) && !isSelected; + return ( + { + if (keyAlreadyUsed) return; + setSelectedLabels((prev) => + prev.includes(option) + ? prev.filter((l) => l !== option) + : [...prev, option], + ); + }} + > + + {option} + + ); + }); + })()} + + + + + + + {selectedLabels.length > 0 && ( +
+ {selectedLabels.map((label) => ( + + {label} + + + ))} +
+ )} + +

+ {matchingNodeCount} of {totalNodeCount} node{totalNodeCount !== 1 ? "s" : ""} match +

+
+ )} + {preview && ( diff --git a/src/server/routers/deploy.ts b/src/server/routers/deploy.ts index a62816b5..068faecb 100644 --- a/src/server/routers/deploy.ts +++ b/src/server/routers/deploy.ts @@ -1,5 +1,6 @@ import { z } from "zod"; import { TRPCError } from "@trpc/server"; +import { Prisma } from "@/generated/prisma"; import { router, protectedProcedure, withTeamAccess } from "@/trpc/init"; import { prisma } from "@/lib/prisma"; import { deployAgent, undeployAgent } from "@/server/services/deploy-agent"; @@ -64,11 +65,18 @@ export const deployRouter = router({ currentVersion: latestVersion?.version ?? null, currentLogLevel: latestVersion?.logLevel ?? "info", newLogLevel: ((pipeline.globalConfig as Record)?.log_level as string) ?? "info", + nodeSelector: pipeline.nodeSelector as Record | null, }; }), agent: protectedProcedure - .input(z.object({ pipelineId: z.string(), changelog: z.string().min(1) })) + .input( + z.object({ + pipelineId: z.string(), + changelog: z.string().min(1), + nodeSelector: z.record(z.string(), z.string()).optional(), + }), + ) .use(withTeamAccess("EDITOR")) .use(withAudit("deploy.agent", "Pipeline")) .mutation(async ({ input, ctx }) => { @@ -77,7 +85,22 @@ export const deployRouter = router({ throw new TRPCError({ code: "UNAUTHORIZED" }); } - return deployAgent(input.pipelineId, userId, input.changelog); + const result = await deployAgent(input.pipelineId, userId, input.changelog); + + // Only persist nodeSelector if the deploy actually succeeded + if (result.success && input.nodeSelector !== undefined) { + await prisma.pipeline.update({ + where: { id: input.pipelineId }, + data: { + nodeSelector: + Object.keys(input.nodeSelector).length > 0 + ? input.nodeSelector + : Prisma.DbNull, + }, + }); + } + + return result; }), undeploy: protectedProcedure @@ -112,6 +135,7 @@ export const deployRouter = router({ host: true, apiPort: true, status: true, + labels: true, }, }, }, diff --git a/src/server/routers/fleet.ts b/src/server/routers/fleet.ts index 14e9160b..4990abcd 100644 --- a/src/server/routers/fleet.ts +++ b/src/server/routers/fleet.ts @@ -264,6 +264,43 @@ export const fleetRouter = router({ }); }), + updateLabels: protectedProcedure + .input( + z.object({ + nodeId: z.string(), + labels: z.record(z.string(), z.string()), + }), + ) + .use(withTeamAccess("EDITOR")) + .use(withAudit("vectorNode.updated", "VectorNode")) + .mutation(async ({ input }) => { + return prisma.vectorNode.update({ + where: { id: input.nodeId }, + data: { labels: input.labels }, + }); + }), + + listLabels: protectedProcedure + .input(z.object({ environmentId: z.string() })) + .use(withTeamAccess("VIEWER")) + .query(async ({ input }) => { + const nodes = await prisma.vectorNode.findMany({ + where: { environmentId: input.environmentId }, + select: { labels: true }, + }); + const labelMap: Record> = {}; + for (const node of nodes) { + const labels = (node.labels as Record) ?? {}; + for (const [key, value] of Object.entries(labels)) { + if (!labelMap[key]) labelMap[key] = new Set(); + labelMap[key].add(value); + } + } + return Object.fromEntries( + Object.entries(labelMap).map(([k, v]) => [k, [...v].sort()]), + ); + }), + setMaintenanceMode: protectedProcedure .input( z.object({