diff --git a/docs/public/user-guide/pipeline-editor.md b/docs/public/user-guide/pipeline-editor.md index f5235b67..a5190f5b 100644 --- a/docs/public/user-guide/pipeline-editor.md +++ b/docs/public/user-guide/pipeline-editor.md @@ -101,7 +101,12 @@ When a pipeline is deployed, each node displays live throughput metrics directly Click any node on the canvas to open its configuration in the detail panel on the right. -The panel shows: +The panel has two tabs: + +- **Config** -- The component configuration form (always visible). +- **Live Tail** -- Sample live events flowing through the component (visible only when the pipeline is deployed). See [Live Tail](#live-tail) below. + +The **Config** tab shows: - **Component name and kind** -- The display name, a badge indicating source/transform/sink, and a delete button. - **Component Key** -- A unique identifier for this component within the pipeline (e.g. `traefik_logs`). Must contain only letters, numbers, and underscores. @@ -160,6 +165,46 @@ Toggle the logs panel from the toolbar to view real-time logs from the running p The logs panel only shows data for deployed pipelines. Draft pipelines have no running processes to produce logs. {% endhint %} +## Live Tail + +Live Tail lets you sample real events flowing through any component in a deployed pipeline, directly from the detail panel. This is useful for verifying that data is being ingested, transformed, and routed as expected. + +### How to use Live Tail + +{% stepper %} + +{% step %} +### Deploy the pipeline +Live Tail requires a running pipeline. If the pipeline is still a draft, deploy it first using the **Deploy** button in the toolbar. +{% endstep %} + +{% step %} +### Select a component +Click any node on the canvas to open the detail panel. +{% endstep %} + +{% step %} +### Switch to the Live Tail tab +In the detail panel, click the **Live Tail** tab. This tab only appears for deployed pipelines. +{% endstep %} + +{% step %} +### Sample events +Click **Sample 10 Events** to request a batch of live events from the selected component. The panel polls for results and displays them as they arrive. +{% endstep %} + +{% endstepper %} + +Each sampled event appears as a collapsed row showing a JSON preview. Click any row to expand it and view the full event payload with formatted JSON. + +- Events are shown newest-first, and the panel retains up to 50 events across multiple sampling requests. +- Each sampling request has a two-minute expiry. If no events are captured within that window, the request expires silently. +- You can sample from any component type -- sources, transforms, or sinks. + +{% hint style="info" %} +Live Tail uses the agent's event sampling infrastructure. The agent captures a snapshot of events passing through the selected component and returns them on its next heartbeat. There is no persistent stream -- each click of "Sample 10 Events" triggers a new one-shot capture. +{% endhint %} + ## Pipeline rename Click the pipeline name in the top-left corner of the editor to rename it inline. Press Enter to confirm or Escape to cancel. diff --git a/src/app/(dashboard)/pipelines/[id]/page.tsx b/src/app/(dashboard)/pipelines/[id]/page.tsx index 49bb9bd7..4afb0da3 100644 --- a/src/app/(dashboard)/pipelines/[id]/page.tsx +++ b/src/app/(dashboard)/pipelines/[id]/page.tsx @@ -1,6 +1,6 @@ "use client"; -import { useCallback, useEffect, useMemo, useRef, useState } from "react"; +import { useCallback, useEffect, useRef, useState } from "react"; import { useParams, useRouter } from "next/navigation"; import { useQuery, useMutation, useQueryClient } from "@tanstack/react-query"; import type { NodeMetricsData } from "@/stores/flow-store"; @@ -160,10 +160,8 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) { ); // Lightweight check for recent errors (for toolbar badge) — 24h window - const errorCheckSince = useMemo( + const [errorCheckSince] = useState( () => new Date(Date.now() - 24 * 60 * 60 * 1000), - // eslint-disable-next-line react-hooks/exhaustive-deps - [], ); const recentErrorsQuery = useQuery( trpc.pipeline.logs.queryOptions( @@ -429,7 +427,10 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) {
- + {metricsOpen && (
diff --git a/src/components/flow/detail-panel.tsx b/src/components/flow/detail-panel.tsx index e93a1ad4..8cba6799 100644 --- a/src/components/flow/detail-panel.tsx +++ b/src/components/flow/detail-panel.tsx @@ -1,7 +1,6 @@ "use client"; import { useCallback, useEffect, useMemo, useState } from "react"; -import { useParams } from "next/navigation"; import { Copy, Trash2, Lock, Info } from "lucide-react"; import { useFlowStore } from "@/stores/flow-store"; import { SchemaForm } from "@/components/config-forms/schema-form"; @@ -13,6 +12,8 @@ import { Button } from "@/components/ui/button"; import { Separator } from "@/components/ui/separator"; import { Switch } from "@/components/ui/switch"; import { Badge } from "@/components/ui/badge"; +import { Tabs, TabsContent, TabsList, TabsTrigger } from "@/components/ui/tabs"; +import { LiveTailPanel } from "@/components/flow/live-tail-panel"; import type { VectorComponentDef } from "@/lib/vector/types"; import type { Node, Edge } from "@xyflow/react"; @@ -105,9 +106,12 @@ function getUpstreamSources( /* Component */ /* ------------------------------------------------------------------ */ -export function DetailPanel() { - const params = useParams<{ id: string }>(); - const pipelineId = params?.id; +interface DetailPanelProps { + pipelineId: string; + isDeployed: boolean; +} + +export function DetailPanel({ pipelineId, isDeployed }: DetailPanelProps) { const selectedNodeId = useFlowStore((s) => s.selectedNodeId); const selectedNodeIds = useFlowStore((s) => s.selectedNodeIds); const copySelectedNodes = useFlowStore((s) => s.copySelectedNodes); @@ -230,185 +234,200 @@ export function DetailPanel() { return (
-
-
- {/* ---- System locked banner ---- */} - {isSystemLocked && ( -
- - This source is managed by VectorFlow and cannot be edited. -
- )} - - {/* ---- Header ---- */} - - -
- - {componentDef.displayName} - -
- - {componentDef.kind} - - {isSystemLocked ? ( -
- -
- ) : ( - + {componentDef.kind} + + {isSystemLocked ? ( +
+ +
+ ) : ( + + )} +
+
+
+ + {/* Component Key */} +
+ + handleKeyChange(e.target.value)} + disabled={isSystemLocked} + /> +

+ Letters, numbers, and underscores only (e.g. traefik_logs) +

+
+ + {/* Enabled toggle */} +
+ + { + if (selectedNodeId) toggleNodeDisabled(selectedNodeId); + }} + disabled={isSystemLocked} + /> +
+ + {/* Component Type (read-only) */} +
+ +

{componentDef.type}

+
+
+
+ + + + {/* ---- Configuration ---- */} +
+

Configuration

+ + {isSystemLocked ? ( + /* Read-only config display for locked nodes */ +
+ {Object.entries(config).map(([key, value]) => ( +
+ +

+ {typeof value === "object" ? JSON.stringify(value) : String(value ?? "")} +

+
+ ))} + {Object.keys(config).length === 0 && ( +

No configuration

)}
-
- - - {/* Component Key */} -
- - handleKeyChange(e.target.value)} - disabled={isSystemLocked} - /> -

- Letters, numbers, and underscores only (e.g. traefik_logs) -

-
+ ) : ( + <> + {/* VRL Editor for remap source field */} + {componentDef.type === "remap" && ( +
+ + handleConfigChange({ ...config, source: v })} + sourceTypes={upstream.sourceTypes} + pipelineId={pipelineId} + upstreamSourceKeys={upstream.sourceKeys} + /> +
+ )} - {/* Enabled toggle */} -
- - { - if (selectedNodeId) toggleNodeDisabled(selectedNodeId); - }} - disabled={isSystemLocked} - /> -
+ {/* VRL Editor for filter condition field */} + {componentDef.type === "filter" && ( +
+ + handleConfigChange({ ...config, condition: v })} + sourceTypes={upstream.sourceTypes} + pipelineId={pipelineId} + upstreamSourceKeys={upstream.sourceKeys} + /> +
+ )} - {/* Component Type (read-only) */} -
- -

{componentDef.type}

-
-
- - - - - {/* ---- Configuration ---- */} -
-

Configuration

- - {isSystemLocked ? ( - /* Read-only config display for locked nodes */ -
- {Object.entries(config).map(([key, value]) => ( -
- -

- {typeof value === "object" ? JSON.stringify(value) : String(value ?? "")} -

-
- ))} - {Object.keys(config).length === 0 && ( -

No configuration

- )} -
- ) : ( - <> - {/* VRL Editor for remap source field */} - {componentDef.type === "remap" && ( -
- - handleConfigChange({ ...config, source: v })} - sourceTypes={upstream.sourceTypes} - pipelineId={pipelineId} - upstreamSourceKeys={upstream.sourceKeys} - /> -
- )} - - {/* VRL Editor for filter condition field */} - {componentDef.type === "filter" && ( -
- - handleConfigChange({ ...config, condition: v })} - sourceTypes={upstream.sourceTypes} - pipelineId={pipelineId} - upstreamSourceKeys={upstream.sourceKeys} - /> -
- )} - - {/* VRL Editors for route conditions */} - {componentDef.type === "route" && ( -
- - {Object.entries( - (config.route as Record) ?? {}, - ).map(([routeName, condition]) => ( -
- - - handleConfigChange({ - ...config, - route: { - ...((config.route as Record) ?? {}), - [routeName]: v, - }, - }) - } - height="120px" - sourceTypes={upstream.sourceTypes} - pipelineId={pipelineId} - upstreamSourceKeys={upstream.sourceKeys} - /> -
- ))} -
- )} - - {/* Standard schema form for remaining fields (exclude VRL-managed fields) */} - >; - required?: string[]; - }, - componentDef.type, + {/* VRL Editors for route conditions */} + {componentDef.type === "route" && ( +
+ + {Object.entries( + (config.route as Record) ?? {}, + ).map(([routeName, condition]) => ( +
+ + + handleConfigChange({ + ...config, + route: { + ...((config.route as Record) ?? {}), + [routeName]: v, + }, + }) + } + height="120px" + sourceTypes={upstream.sourceTypes} + pipelineId={pipelineId} + upstreamSourceKeys={upstream.sourceKeys} + /> +
+ ))} +
)} - values={config} - onChange={handleConfigChange} - /> - - )} + + {/* Standard schema form for remaining fields (exclude VRL-managed fields) */} + >; + required?: string[]; + }, + componentDef.type, + )} + values={config} + onChange={handleConfigChange} + /> + + )} +
-
-
+ + + + + +
); } diff --git a/src/components/flow/live-tail-panel.tsx b/src/components/flow/live-tail-panel.tsx new file mode 100644 index 00000000..de0bf893 --- /dev/null +++ b/src/components/flow/live-tail-panel.tsx @@ -0,0 +1,193 @@ +"use client"; + +import { useCallback, useRef, useState } from "react"; +import { useQuery, useMutation } from "@tanstack/react-query"; +import { useTRPC } from "@/trpc/client"; +import { Button } from "@/components/ui/button"; +import { Loader2, Play, ChevronDown, ChevronRight } from "lucide-react"; + +interface LiveTailPanelProps { + pipelineId: string; + componentKey: string; + isDeployed: boolean; +} + +export function LiveTailPanel({ pipelineId, componentKey, isDeployed }: LiveTailPanelProps) { + const trpc = useTRPC(); + const [requestId, setRequestId] = useState(null); + const [events, setEvents] = useState>([]); + const [hasExpired, setHasExpired] = useState(false); + const [completedEmpty, setCompletedEmpty] = useState(false); + + // Track which requestId we've already processed to avoid double-processing + const processedRequestRef = useRef(null); + + // Reset events and any in-flight request when the selected component changes + const [prevComponentKey, setPrevComponentKey] = useState(componentKey); + if (prevComponentKey !== componentKey) { + setPrevComponentKey(componentKey); + setEvents([]); + setRequestId(null); + setHasExpired(false); + setCompletedEmpty(false); + } + + const processResults = useCallback((data: { status: string; samples?: Array<{ componentKey: string; events: unknown }> }) => { + if (!requestId || processedRequestRef.current === requestId) return; + if (data.status !== "COMPLETED" || !data.samples) { + if (data.status === "EXPIRED") { + setHasExpired(true); + processedRequestRef.current = requestId; + setRequestId(null); // allow clean re-sample + } + return; + } + + const newEvents = data.samples + .filter((s) => s.componentKey === componentKey) + .flatMap((s) => { + const evts = (s.events as unknown[]) ?? []; + return evts.map((e) => ({ data: e, expanded: false })); + }); + + if (newEvents.length > 0) { + setEvents((prev) => [...newEvents, ...prev].slice(0, 50)); + setCompletedEmpty(false); + } else { + setCompletedEmpty(true); + } + + processedRequestRef.current = requestId; + setRequestId(null); // Stop polling + }, [requestId, componentKey]); + + // Request samples mutation + const requestMutation = useMutation( + trpc.pipeline.requestSamples.mutationOptions({ + onSuccess: (result) => { + setRequestId(result.requestId); + }, + }) + ); + + // Poll for results when we have a requestId + const resultQuery = useQuery({ + ...trpc.pipeline.sampleResult.queryOptions({ requestId: requestId! }), + enabled: !!requestId, + select: (data) => { + // Process results via select callback (not in an effect) + processResults(data); + return data; + }, + refetchInterval: (query) => { + const data = query.state.data; + if (data?.status === "COMPLETED" || data?.status === "ERROR" || data?.status === "EXPIRED") return false; + return 1000; // Poll every second while pending + }, + }); + + const handleSample = useCallback(() => { + processedRequestRef.current = null; + setRequestId(null); + setHasExpired(false); + setCompletedEmpty(false); + requestMutation.mutate({ + pipelineId, + componentKeys: [componentKey], + limit: 10, + }); + }, [pipelineId, componentKey, requestMutation]); + + const toggleExpand = useCallback((index: number) => { + setEvents((prev) => + prev.map((e, i) => (i === index ? { ...e, expanded: !e.expanded } : e)) + ); + }, []); + + if (!isDeployed) { + return ( +
+ Pipeline must be deployed to sample live events. +
+ ); + } + + const isPending = requestMutation.isPending || (!!requestId && (resultQuery.isFetching || resultQuery.data?.status === "PENDING")); + + return ( +
+
+ Live Events + +
+ + {resultQuery.data?.status === "ERROR" && ( +
+ Sampling failed. The component may not be producing events. +
+ )} + + {hasExpired && events.length === 0 && !isPending && ( +
+ Sampling timed out — no events were captured. Try again or check that the component is receiving data. +
+ )} + + {completedEmpty && events.length === 0 && !isPending && !hasExpired && ( +
+ No matching events found for this component. It may not have received data during the sampling window. +
+ )} + + {events.length === 0 && !isPending && !hasExpired && !completedEmpty && ( +
+ Click "Sample 10 Events" to see data flowing through this component. +
+ )} + +
+ {events.map((event, i) => ( +
+ + {event.expanded && ( +
+                {JSON.stringify(event.data, null, 2)}
+              
+ )} +
+ ))} +
+ + {events.length > 0 && ( +
+ Showing {events.length} event{events.length !== 1 ? "s" : ""} +
+ )} +
+ ); +}