diff --git a/prisma/migrations/20260308000000_add_pipeline_version_snapshots/migration.sql b/prisma/migrations/20260308000000_add_pipeline_version_snapshots/migration.sql new file mode 100644 index 00000000..96a87ec8 --- /dev/null +++ b/prisma/migrations/20260308000000_add_pipeline_version_snapshots/migration.sql @@ -0,0 +1,3 @@ +-- AlterTable +ALTER TABLE "PipelineVersion" ADD COLUMN "edgesSnapshot" JSONB, +ADD COLUMN "nodesSnapshot" JSONB; diff --git a/prisma/schema.prisma b/prisma/schema.prisma index c530080b..89121e42 100644 --- a/prisma/schema.prisma +++ b/prisma/schema.prisma @@ -379,6 +379,8 @@ model PipelineVersion { configToml String? logLevel String? globalConfig Json? + nodesSnapshot Json? + edgesSnapshot Json? createdById String changelog String? createdAt DateTime @default(now()) diff --git a/src/app/(dashboard)/analytics/page.tsx b/src/app/(dashboard)/analytics/page.tsx index b1fc39c4..35b23489 100644 --- a/src/app/(dashboard)/analytics/page.tsx +++ b/src/app/(dashboard)/analytics/page.tsx @@ -3,7 +3,7 @@ import { useState } from "react"; import { useQuery } from "@tanstack/react-query"; import { useTRPC } from "@/trpc/client"; -import { ArrowUp, ArrowDown, Minus, BarChart3 } from "lucide-react"; +import { ArrowUp, ArrowDown, Minus, BarChart3, Info } from "lucide-react"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Table, @@ -23,6 +23,7 @@ import { AreaChart, Area, XAxis, YAxis, CartesianGrid } from "recharts"; import { useEnvironmentStore } from "@/stores/environment-store"; import { formatBytes, formatTimeAxis } from "@/lib/format"; import { cn } from "@/lib/utils"; +import { Tooltip, TooltipTrigger, TooltipContent } from "@/components/ui/tooltip"; type VolumeRange = "1h" | "6h" | "1d" | "7d" | "30d"; @@ -41,9 +42,10 @@ interface PipelineRow { eventsIn: number; eventsOut: number; reduction: number; + eventsReduced: number; } -type SortKey = "pipelineName" | "bytesIn" | "bytesOut" | "reduction"; +type SortKey = "pipelineName" | "bytesIn" | "bytesOut" | "reduction" | "eventsReduced"; type SortDir = "asc" | "desc"; export default function AnalyticsPage() { @@ -76,6 +78,23 @@ export default function AnalyticsPage() { ? reductionPercent - prevReductionPercent : null; + // Event-based reduction (matches pipelines table formula, clamped at 0%) + const totalEventsIn = Number(data?.current._sum.eventsIn ?? 0); + const totalEventsOut = Number(data?.current._sum.eventsOut ?? 0); + const eventsReducedPercent = totalEventsIn > 0 ? Math.max(0, (1 - totalEventsOut / totalEventsIn) * 100) : null; + + const prevEventsIn = Number(data?.previous._sum.eventsIn ?? 0); + const prevEventsOut = Number(data?.previous._sum.eventsOut ?? 0); + const prevEventsReducedPercent = prevEventsIn > 0 ? Math.max(0, (1 - prevEventsOut / prevEventsIn) * 100) : null; + const eventsReducedDelta = + eventsReducedPercent != null && prevEventsReducedPercent != null + ? eventsReducedPercent - prevEventsReducedPercent + : null; + + // Rename bytes vars for clarity + const bytesSavedPercent = reductionPercent; + const bytesSavedDelta = reductionDelta; + const bytesInTrend = trendPercent(totalBytesIn, prevBytesIn); const bytesOutTrend = trendPercent(totalBytesOut, prevBytesOut); @@ -96,9 +115,10 @@ export default function AnalyticsPage() { // Per-pipeline table with sorting const sortedPipelines = (() => { if (!data?.perPipeline) return []; - const rows: PipelineRow[] = data.perPipeline.map((p: Omit) => ({ + const rows: PipelineRow[] = data.perPipeline.map((p: Omit) => ({ ...p, reduction: p.bytesIn > 0 ? (1 - p.bytesOut / p.bytesIn) * 100 : 0, + eventsReduced: p.eventsIn > 0 ? Math.max(0, (1 - p.eventsOut / p.eventsIn) * 100) : 0, })); return rows.sort((a: PipelineRow, b: PipelineRow) => { const aVal = a[sortKey]; @@ -166,7 +186,7 @@ export default function AnalyticsPage() { {/* KPI Cards */} -
+
{/* Total In */} @@ -205,31 +225,58 @@ export default function AnalyticsPage() { - {/* Reduction % */} + {/* Events Reduced */}
-

Reduction

- +

Events Reduced

+

= 50 + eventsReducedPercent != null && eventsReducedPercent > 50 ? "text-green-600 dark:text-green-400" - : reductionPercent != null && reductionPercent >= 20 + : eventsReducedPercent != null && eventsReducedPercent > 10 ? "text-amber-600 dark:text-amber-400" - : reductionPercent != null - ? "text-red-600 dark:text-red-400" - : "text-muted-foreground", + : "text-muted-foreground", )} > - {reductionPercent != null ? `${reductionPercent.toFixed(1)}%` : "--"} + {eventsReducedPercent != null ? `${eventsReducedPercent.toFixed(1)}%` : "--"}

- {reductionDelta != null && ( + {eventsReducedDelta != null && (

- {reductionDelta >= 0 ? "+" : ""} - {reductionDelta.toFixed(1)} pp vs previous period + {eventsReducedDelta >= 0 ? "+" : ""} + {eventsReducedDelta.toFixed(1)} pp vs last period +

+ )} +
+
+ + {/* Bytes Saved */} + + +
+
+

Bytes Saved

+ + + + + + Total bytes saved including sink compression and encoding + + +
+ +
+

+ {bytesSavedPercent != null ? `${bytesSavedPercent.toFixed(1)}%` : "--"} +

+ {bytesSavedDelta != null && ( +

+ {bytesSavedDelta >= 0 ? "+" : ""} + {bytesSavedDelta.toFixed(1)} pp vs last period

)}
@@ -349,11 +396,17 @@ export default function AnalyticsPage() { > Bytes Out{sortIndicator("bytesOut")} + toggleSort("eventsReduced")} + > + Events Reduced{sortIndicator("eventsReduced")} + toggleSort("reduction")} > - Reduction %{sortIndicator("reduction")} + Bytes Saved{sortIndicator("reduction")} @@ -367,6 +420,26 @@ export default function AnalyticsPage() { {formatBytes(p.bytesOut)} + +
+
+
50 + ? "bg-green-500" + : p.eventsReduced > 10 + ? "bg-amber-500" + : "bg-muted-foreground/30", + )} + style={{ width: `${Math.max(0, Math.min(100, p.eventsReduced))}%` }} + /> +
+ + {p.eventsReduced.toFixed(1)}% + +
+
diff --git a/src/app/(dashboard)/layout.tsx b/src/app/(dashboard)/layout.tsx index 3aab0dff..ef0eea75 100644 --- a/src/app/(dashboard)/layout.tsx +++ b/src/app/(dashboard)/layout.tsx @@ -113,7 +113,7 @@ export default function DashboardLayout({ -
+
{children} diff --git a/src/app/(dashboard)/page.tsx b/src/app/(dashboard)/page.tsx index 600854e2..dd0dc0dc 100644 --- a/src/app/(dashboard)/page.tsx +++ b/src/app/(dashboard)/page.tsx @@ -201,7 +201,7 @@ export default function DashboardPage() {
@@ -459,6 +476,28 @@ function PipelineBuilderInner({ pipelineId }: { pipelineId: string }) { setUndeployOpen(false); }} /> + + + + Discard unsaved changes? + + This will revert the pipeline to its last deployed state. Any saved changes that haven't been deployed will be lost. + + + + + + + +
); } diff --git a/src/app/(dashboard)/settings/page.tsx b/src/app/(dashboard)/settings/page.tsx index 57bf8cb1..f0b76672 100644 --- a/src/app/(dashboard)/settings/page.tsx +++ b/src/app/(dashboard)/settings/page.tsx @@ -31,6 +31,7 @@ import { AlertTriangle, Clock, Link2, + Info, } from "lucide-react"; import { Button } from "@/components/ui/button"; @@ -343,42 +344,22 @@ function AuditLogShippingSection() { Configure sinks - {isDeployed ? ( - - ) : ( - - )} + /> + + {isToggling ? (isDeployed ? "Disabling..." : "Enabling...") : (isDeployed ? "Active" : "Disabled")} + +
) : (
@@ -1664,7 +1645,13 @@ function TeamSettings() { Add an existing user to the team by their email address. - + + {(settingsQuery.data?.scimEnabled || settingsQuery.data?.oidcGroupSyncEnabled) && ( +
+ + SSO users are managed by your identity provider. Only local users can be added manually. +
+ )}
diff --git a/src/app/api/agent/enroll/route.ts b/src/app/api/agent/enroll/route.ts index d2e2ab92..c0bd9ac0 100644 --- a/src/app/api/agent/enroll/route.ts +++ b/src/app/api/agent/enroll/route.ts @@ -2,6 +2,7 @@ import { NextResponse } from "next/server"; import { z } from "zod"; import { prisma } from "@/lib/prisma"; import { verifyEnrollmentToken, generateNodeToken } from "@/server/services/agent-token"; +import { debugLog } from "@/lib/logger"; const enrollSchema = z.object({ token: z.string().min(1), @@ -26,7 +27,7 @@ export async function POST(request: Request) { const { token, hostname, os, agentVersion, vectorVersion } = parsed.data; const safeHostname = hostname.replace(/[\r\n\t"]/g, " "); const safeVersion = (agentVersion ?? "unknown").replace(/[\r\n\t"]/g, " "); - console.log(`[enroll] attempt from hostname="${safeHostname}" agentVersion="${safeVersion}"`); + debugLog("enroll", `attempt from hostname="${safeHostname}" agentVersion="${safeVersion}"`); // Find all environments that have an enrollment token const environments = await prisma.environment.findMany({ @@ -41,7 +42,7 @@ export async function POST(request: Request) { team: { select: { id: true } }, }, }); - console.log(`[enroll] found ${environments.length} candidate environment(s)`); + debugLog("enroll", `found ${environments.length} candidate environment(s)`); // Try each environment's enrollment token let matchedEnv: (typeof environments)[0] | null = null; @@ -79,7 +80,7 @@ export async function POST(request: Request) { metadata: { enrolledVia: "agent" }, }, }); - console.log(`[enroll] SUCCESS -- node ${node.id} enrolled in "${matchedEnv.name}"`); + debugLog("enroll", `SUCCESS -- node ${node.id} enrolled in "${matchedEnv.name}"`); return NextResponse.json({ nodeId: node.id, diff --git a/src/app/api/scim/v2/Groups/[id]/route.ts b/src/app/api/scim/v2/Groups/[id]/route.ts index 37b5bd47..51714f18 100644 --- a/src/app/api/scim/v2/Groups/[id]/route.ts +++ b/src/app/api/scim/v2/Groups/[id]/route.ts @@ -1,6 +1,7 @@ import { NextRequest, NextResponse } from "next/server"; import { prisma } from "@/lib/prisma"; import { writeAuditLog } from "@/server/services/audit"; +import { debugLog } from "@/lib/logger"; import { authenticateScim } from "../../auth"; import { reconcileUserTeamMemberships, @@ -75,6 +76,7 @@ export async function PATCH( try { const body = await req.json(); const operations = body.Operations ?? body.operations ?? []; + debugLog("scim", `PATCH /Groups/${id}`, { displayName: group.displayName, operations: operations.map((o: { op: string; path?: string; value?: unknown }) => ({ op: o.op, path: o.path, valueType: typeof o.value, valueLength: Array.isArray(o.value) ? o.value.length : undefined })) }); await prisma.$transaction(async (tx) => { for (const op of operations) { @@ -211,6 +213,12 @@ export async function PUT( try { const body = await req.json(); + const membersProvided = Array.isArray(body.members); + debugLog("scim", `PUT /Groups/${id}`, { + displayName: body.displayName, + membersProvided, + memberCount: membersProvided ? body.members.length : undefined, + }); await prisma.$transaction(async (tx) => { // Update displayName if provided @@ -223,7 +231,7 @@ export async function PUT( // Full member sync: compute desired set, diff against current const desiredUserIds = new Set(); - if (body.members && Array.isArray(body.members)) { + if (membersProvided) { for (const m of body.members) { const userId = (m as { value?: unknown }).value; if (typeof userId !== "string") continue; @@ -247,10 +255,22 @@ export async function PUT( } } - // Remove absent members - for (const member of currentMembers) { - if (!desiredUserIds.has(member.userId)) { - await tx.scimGroupMember.delete({ where: { id: member.id } }); + // Skip member removal when the incoming member list is empty but + // current members exist. Many SCIM providers (e.g. pocket-id) send + // intermediate PUTs with members:[] during background sync before + // re-adding members individually — treating this as "remove all" causes + // destructive churn (team memberships deleted then re-created). + const skipRemoval = desiredUserIds.size === 0 && currentMembers.length > 0; + if (skipRemoval) { + debugLog("scim", `PUT /Groups/${id}: skipping member removal (empty incoming list with ${currentMembers.length} current members)`); + } + + if (!skipRemoval) { + // Remove absent members + for (const member of currentMembers) { + if (!desiredUserIds.has(member.userId)) { + await tx.scimGroupMember.delete({ where: { id: member.id } }); + } } } diff --git a/src/app/api/scim/v2/Groups/route.ts b/src/app/api/scim/v2/Groups/route.ts index 1d646503..2302416d 100644 --- a/src/app/api/scim/v2/Groups/route.ts +++ b/src/app/api/scim/v2/Groups/route.ts @@ -1,6 +1,7 @@ import { NextRequest, NextResponse } from "next/server"; import { prisma } from "@/lib/prisma"; import { writeAuditLog } from "@/server/services/audit"; +import { debugLog } from "@/lib/logger"; import { authenticateScim } from "../auth"; import { reconcileUserTeamMemberships, @@ -93,6 +94,7 @@ export async function POST(req: NextRequest) { try { const body = await req.json(); + debugLog("scim", `POST /Groups`, { displayName: body.displayName, memberCount: Array.isArray(body.members) ? body.members.length : 0 }); const displayName = body.displayName; if (!displayName || typeof displayName !== "string") { return scimError("displayName is required", 400); diff --git a/src/auth.ts b/src/auth.ts index 06f35a76..bf88d8a6 100644 --- a/src/auth.ts +++ b/src/auth.ts @@ -8,6 +8,7 @@ import { encrypt, decrypt } from "@/server/services/crypto"; import { verifyTotpCode, verifyBackupCode } from "@/server/services/totp"; import { authConfig } from "@/auth.config"; import { writeAuditLog } from "@/server/services/audit"; +import { debugLog } from "@/lib/logger"; import { headers } from "next/headers"; async function getClientIp(): Promise { @@ -253,7 +254,7 @@ async function getAuthInstance() { if (settings?.oidcGroupSyncEnabled) { const groupsClaim = settings.oidcGroupsClaim ?? "groups"; const tokenGroups = (profileData?.[groupsClaim] as string[] | undefined) ?? []; - console.log(`[oidc] User ${user.email} groups (claim "${groupsClaim}"):`, tokenGroups); + debugLog("oidc", `User ${user.email} groups (claim "${groupsClaim}"):`, tokenGroups); let userGroupNames: string[]; @@ -271,7 +272,7 @@ async function getAuthInstance() { userGroupNames = tokenGroups; } - console.log(`[oidc] User ${user.email} scimEnabled=${settings.scimEnabled}, final groups:`, userGroupNames); + debugLog("oidc", `User ${user.email} scimEnabled=${settings.scimEnabled}, final groups:`, userGroupNames); const { reconcileUserTeamMemberships } = await import("@/server/services/group-mappings"); await prisma.$transaction(async (tx) => { await reconcileUserTeamMemberships(tx, dbUser.id, userGroupNames); diff --git a/src/components/flow/detail-panel.tsx b/src/components/flow/detail-panel.tsx index 2159a7df..2ec10d90 100644 --- a/src/components/flow/detail-panel.tsx +++ b/src/components/flow/detail-panel.tsx @@ -224,7 +224,7 @@ export function DetailPanel({ pipelineId, isDeployed }: DetailPanelProps) { ); } - const { componentDef, componentKey, config, disabled, isSystemLocked } = selectedNode.data as { + const { componentDef, config, disabled, isSystemLocked } = selectedNode.data as { componentDef: VectorComponentDef; componentKey: string; // used via displayKey/storeKey above config: Record; diff --git a/src/components/flow/flow-toolbar.tsx b/src/components/flow/flow-toolbar.tsx index 085048a5..4297e2af 100644 --- a/src/components/flow/flow-toolbar.tsx +++ b/src/components/flow/flow-toolbar.tsx @@ -71,6 +71,7 @@ interface FlowToolbarProps { hasRecentErrors?: boolean; processStatus?: ProcessStatusValue | null; gitOpsMode?: string; + onDiscardChanges?: () => void; } function downloadFile(content: string, filename: string) { @@ -101,6 +102,7 @@ export function FlowToolbar({ hasRecentErrors = false, processStatus, gitOpsMode, + onDiscardChanges, }: FlowToolbarProps) { const globalConfig = useFlowStore((s) => s.globalConfig); const canUndo = useFlowStore((s) => s.canUndo); @@ -128,6 +130,8 @@ export function FlowToolbar({ ), ); const healthStatus = healthQuery.data?.status ?? null; + const sliTotal = healthQuery.data?.slis?.length ?? 0; + const slisBreached = healthQuery.data?.slis?.filter((s: { status: string }) => s.status === "breached").length ?? 0; // Query pending deploy requests for this pipeline const pendingRequestsQuery = useQuery({ @@ -466,22 +470,18 @@ export function FlowToolbar({ {processStatus === "CRASHED" && "Crashed"} {processStatus === "PENDING" && "Pending..."} - {/* Health SLI indicator dot */} + {/* Health SLI badge */} {healthStatus === "healthy" && ( - - - - - All SLIs met - + + + SLIs: OK + )} {healthStatus === "degraded" && ( - - - - - One or more SLIs breached - + + + SLIs: {slisBreached}/{sliTotal} breached + )}
)} @@ -532,6 +532,22 @@ export function FlowToolbar({ Changes detected — deploy to update + {onDiscardChanges && ( + + + + + Revert to last deployed state + + )}
diff --git a/src/components/pipeline/version-history-dialog.tsx b/src/components/pipeline/version-history-dialog.tsx index 17b780af..890acaa4 100644 --- a/src/components/pipeline/version-history-dialog.tsx +++ b/src/components/pipeline/version-history-dialog.tsx @@ -144,7 +144,7 @@ export function VersionHistoryDialog({ Version - Changelog + Changelog Created Actions @@ -171,8 +171,8 @@ export function VersionHistoryDialog({ )}
- - + + {version.changelog || "No changelog"} diff --git a/src/lib/logger.ts b/src/lib/logger.ts new file mode 100644 index 00000000..d539426f --- /dev/null +++ b/src/lib/logger.ts @@ -0,0 +1,19 @@ +const level = (process.env.VF_LOG_LEVEL ?? process.env.LOG_LEVEL ?? "info").toLowerCase(); +const isDebug = level === "debug" || level === "trace"; + +function sanitizeLogString(value: string): string { + // Remove newline and carriage return characters to prevent log injection + return value.replace(/[\r\n]/g, ""); +} + +export function debugLog(tag: string, message: string, data?: unknown): void { + if (!isDebug) return; + const ts = new Date().toISOString(); + const safeTag = sanitizeLogString(tag); + const safeMessage = sanitizeLogString(message); + if (data !== undefined) { + console.log("%s [%s] %s", ts, safeTag, safeMessage, data); + } else { + console.log("%s [%s] %s", ts, safeTag, safeMessage); + } +} diff --git a/src/server/routers/admin.ts b/src/server/routers/admin.ts index 5056bbcb..c7a8c8c8 100644 --- a/src/server/routers/admin.ts +++ b/src/server/routers/admin.ts @@ -6,6 +6,7 @@ import { router, protectedProcedure, requireSuperAdmin } from "@/trpc/init"; import { prisma } from "@/lib/prisma"; import { withAudit } from "@/server/middleware/audit"; import { writeAuditLog } from "@/server/services/audit"; +import { assertManualAssignmentAllowed } from "@/server/routers/team"; export const adminRouter = router({ /** List all platform users with their team memberships */ @@ -44,6 +45,8 @@ export const adminRouter = router({ role: z.enum(["VIEWER", "EDITOR", "ADMIN"]), })) .mutation(async ({ input }) => { + await assertManualAssignmentAllowed(input.userId); + const existing = await prisma.teamMember.findUnique({ where: { userId_teamId: { userId: input.userId, teamId: input.teamId } }, }); diff --git a/src/server/routers/pipeline.ts b/src/server/routers/pipeline.ts index fce67850..a0a1d874 100644 --- a/src/server/routers/pipeline.ts +++ b/src/server/routers/pipeline.ts @@ -3,7 +3,7 @@ import { z } from "zod"; import { TRPCError } from "@trpc/server"; import { router, protectedProcedure, withTeamAccess, requireSuperAdmin } from "@/trpc/init"; import { prisma } from "@/lib/prisma"; -import { ComponentKind, LogLevel } from "@/generated/prisma"; +import { ComponentKind, LogLevel, Prisma } from "@/generated/prisma"; import { withAudit } from "@/server/middleware/audit"; import { createVersion, @@ -759,6 +759,89 @@ export const pipelineRouter = router({ }); }), + discardChanges: protectedProcedure + .input(z.object({ pipelineId: z.string() })) + .use(withTeamAccess("EDITOR")) + .use(withAudit("pipeline.changes_discarded", "Pipeline")) + .mutation(async ({ input }) => { + const pipeline = await prisma.pipeline.findUnique({ + where: { id: input.pipelineId }, + select: { isDraft: true, deployedAt: true }, + }); + if (!pipeline) { + throw new TRPCError({ code: "NOT_FOUND", message: "Pipeline not found" }); + } + if (pipeline.isDraft || !pipeline.deployedAt) { + throw new TRPCError({ + code: "PRECONDITION_FAILED", + message: "Cannot discard changes on a pipeline that has never been deployed", + }); + } + + const latestVersion = await prisma.pipelineVersion.findFirst({ + where: { pipelineId: input.pipelineId }, + orderBy: { version: "desc" }, + }); + if (!latestVersion) { + throw new TRPCError({ code: "PRECONDITION_FAILED", message: "No deployed version found" }); + } + if (!latestVersion.nodesSnapshot || !latestVersion.edgesSnapshot) { + throw new TRPCError({ + code: "PRECONDITION_FAILED", + message: "Deployed version has no snapshot — deploy once more to enable discard", + }); + } + + const nodes = latestVersion.nodesSnapshot as Array>; + const edges = latestVersion.edgesSnapshot as Array>; + + await prisma.$transaction(async (tx) => { + await tx.pipeline.update({ + where: { id: input.pipelineId }, + data: { + globalConfig: latestVersion.globalConfig as Prisma.InputJsonValue ?? undefined, + }, + }); + + await tx.pipelineEdge.deleteMany({ where: { pipelineId: input.pipelineId } }); + await tx.pipelineNode.deleteMany({ where: { pipelineId: input.pipelineId } }); + + await Promise.all( + nodes.map((node) => + tx.pipelineNode.create({ + data: { + id: node.id as string, + pipelineId: input.pipelineId, + componentKey: node.componentKey as string, + componentType: node.componentType as string, + kind: node.kind as ComponentKind, + config: node.config as Prisma.InputJsonValue, + positionX: node.positionX as number, + positionY: node.positionY as number, + disabled: (node.disabled as boolean) ?? false, + }, + }) + ) + ); + + await Promise.all( + edges.map((edge) => + tx.pipelineEdge.create({ + data: { + id: edge.id as string, + pipelineId: input.pipelineId, + sourceNodeId: edge.sourceNodeId as string, + targetNodeId: edge.targetNodeId as string, + sourcePort: (edge.sourcePort as string) ?? null, + }, + }) + ) + ); + }); + + return { discarded: true }; + }), + versions: protectedProcedure .input(z.object({ pipelineId: z.string() })) .use(withTeamAccess("VIEWER")) @@ -782,15 +865,39 @@ export const pipelineRouter = router({ } const pipeline = await prisma.pipeline.findUnique({ where: { id: input.pipelineId }, - select: { globalConfig: true }, + select: { globalConfig: true, nodes: true, edges: true }, }); - const logLevel = (pipeline?.globalConfig as Record)?.log_level as string ?? null; + if (!pipeline) { + throw new TRPCError({ code: "NOT_FOUND", message: "Pipeline not found" }); + } + const logLevel = (pipeline.globalConfig as Record)?.log_level as string ?? null; + + const nodesSnapshot = pipeline.nodes.map((n) => ({ + id: n.id, + componentKey: n.componentKey, + componentType: n.componentType, + kind: n.kind, + config: n.config, + positionX: n.positionX, + positionY: n.positionY, + disabled: n.disabled, + })); + const edgesSnapshot = pipeline.edges.map((e) => ({ + id: e.id, + sourceNodeId: e.sourceNodeId, + targetNodeId: e.targetNodeId, + sourcePort: e.sourcePort, + })); + return createVersion( input.pipelineId, input.configYaml, userId, input.changelog, logLevel, + pipeline.globalConfig as Record | null, + nodesSnapshot, + edgesSnapshot, ); }), diff --git a/src/server/routers/team.ts b/src/server/routers/team.ts index e98294f9..f70c441d 100644 --- a/src/server/routers/team.ts +++ b/src/server/routers/team.ts @@ -6,6 +6,35 @@ import bcrypt from "bcryptjs"; import crypto from "crypto"; import { withAudit } from "@/server/middleware/audit"; +/** + * Block manual team assignment/role changes for OIDC users when their + * memberships are managed by an identity provider (SCIM or OIDC group sync). + * Flat SSO deployments (OIDC without group sync) allow manual assignment. + */ +export async function assertManualAssignmentAllowed(userId: string): Promise { + const user = await prisma.user.findUnique({ + where: { id: userId }, + select: { authMethod: true }, + }); + if (!user) { + throw new TRPCError({ code: "NOT_FOUND", message: "User not found" }); + } + if (user.authMethod !== "OIDC") return; + + const settings = await prisma.systemSettings.findUnique({ + where: { id: "singleton" }, + select: { scimEnabled: true, oidcGroupSyncEnabled: true }, + }); + if (settings?.scimEnabled || settings?.oidcGroupSyncEnabled) { + throw new TRPCError({ + code: "BAD_REQUEST", + message: + "This user's team membership is managed by your identity provider. " + + "Update their group assignments in your IdP instead.", + }); + } +} + export const teamRouter = router({ /** Get the current user's highest role across all teams */ myRole: protectedProcedure.query(async ({ ctx }) => { @@ -169,6 +198,8 @@ export const teamRouter = router({ }); } + await assertManualAssignmentAllowed(user.id); + const existing = await prisma.teamMember.findUnique({ where: { userId_teamId: { userId: user.id, teamId: input.teamId } }, }); @@ -228,7 +259,6 @@ export const teamRouter = router({ .mutation(async ({ input }) => { const member = await prisma.teamMember.findUnique({ where: { userId_teamId: { userId: input.userId, teamId: input.teamId } }, - include: { user: { select: { authMethod: true, scimExternalId: true } } }, }); if (!member) { throw new TRPCError({ @@ -236,12 +266,7 @@ export const teamRouter = router({ message: "Team member not found", }); } - if (member.user.authMethod === "OIDC" || member.user.scimExternalId) { - throw new TRPCError({ - code: "FORBIDDEN", - message: "Role is managed by identity provider and cannot be changed manually", - }); - } + await assertManualAssignmentAllowed(input.userId); return prisma.teamMember.update({ where: { id: member.id }, data: { role: input.role }, diff --git a/src/server/services/deploy-agent.ts b/src/server/services/deploy-agent.ts index f431394e..d5882cdc 100644 --- a/src/server/services/deploy-agent.ts +++ b/src/server/services/deploy-agent.ts @@ -103,6 +103,24 @@ export async function deployAgent( // 3. Create pipeline version (also marks pipeline as deployed) const gc = pipeline.globalConfig as Record | null; const logLevel = (gc?.log_level as string) ?? null; + + const nodesSnapshot = pipeline.nodes.map((n) => ({ + id: n.id, + componentKey: n.componentKey, + componentType: n.componentType, + kind: n.kind, + config: n.config, + positionX: n.positionX, + positionY: n.positionY, + disabled: n.disabled, + })); + const edgesSnapshot = pipeline.edges.map((e) => ({ + id: e.id, + sourceNodeId: e.sourceNodeId, + targetNodeId: e.targetNodeId, + sourcePort: e.sourcePort, + })); + const version = await createVersion( pipelineId, configYamlBuilder ?? configYaml, @@ -110,6 +128,8 @@ export async function deployAgent( changelog ?? (pipeline.isSystem ? "Deployed via system vector" : "Deployed via agent mode"), logLevel, gc, + nodesSnapshot, + edgesSnapshot, ); // 3b. Git sync (non-blocking side effect) diff --git a/src/server/services/group-mappings.ts b/src/server/services/group-mappings.ts index d5ba8284..ac7e7df0 100644 --- a/src/server/services/group-mappings.ts +++ b/src/server/services/group-mappings.ts @@ -1,4 +1,5 @@ import { prisma } from "@/lib/prisma"; +import { debugLog } from "@/lib/logger"; export interface GroupMapping { group: string; @@ -56,7 +57,7 @@ export async function reconcileUserTeamMemberships( userGroupNames: string[], ): Promise { const allMappings = await loadGroupMappings(); - console.log(`[reconcile] userId=${userId}, userGroups=${JSON.stringify(userGroupNames)}, mappings=${JSON.stringify(allMappings)}`); + debugLog("reconcile", `userId=${userId}, userGroups=${JSON.stringify(userGroupNames)}, mappings=${JSON.stringify(allMappings)}`); // Compute desired state: for each team, the highest role from any matching group const desiredTeamRoles = new Map(); @@ -70,13 +71,13 @@ export async function reconcileUserTeamMemberships( } } - console.log(`[reconcile] desiredTeamRoles=${JSON.stringify([...desiredTeamRoles.entries()])}`); + debugLog("reconcile", `desiredTeamRoles=${JSON.stringify([...desiredTeamRoles.entries()])}`); // Fetch current group_mapping TeamMembers for this user const existing = await tx.teamMember.findMany({ where: { userId, source: "group_mapping" }, }); - console.log(`[reconcile] existing group_mapping members=${JSON.stringify(existing.map(m => ({ teamId: m.teamId, role: m.role })))}`); + debugLog("reconcile", `existing group_mapping members=${JSON.stringify(existing.map(m => ({ teamId: m.teamId, role: m.role })))}`); const existingByTeam = new Map(existing.map((m) => [m.teamId, m])); diff --git a/src/server/services/pipeline-version.ts b/src/server/services/pipeline-version.ts index 197e0a10..1f2c7231 100644 --- a/src/server/services/pipeline-version.ts +++ b/src/server/services/pipeline-version.ts @@ -1,5 +1,5 @@ import { prisma } from "@/lib/prisma"; -import type { Prisma } from "@/generated/prisma"; +import { type Prisma, type ComponentKind } from "@/generated/prisma"; import { TRPCError } from "@trpc/server"; /** @@ -14,6 +14,8 @@ export async function createVersion( changelog?: string, logLevel?: string | null, globalConfig?: Record | null, + nodesSnapshot?: unknown, + edgesSnapshot?: unknown, ) { // Find the highest existing version number for this pipeline const latest = await prisma.pipelineVersion.findFirst({ @@ -32,6 +34,8 @@ export async function createVersion( configYaml: finalYaml, logLevel: logLevel ?? null, globalConfig: (globalConfig as Prisma.InputJsonValue) ?? undefined, + nodesSnapshot: nodesSnapshot ? (nodesSnapshot as Prisma.InputJsonValue) : undefined, + edgesSnapshot: edgesSnapshot ? (edgesSnapshot as Prisma.InputJsonValue) : undefined, createdById: userId, changelog, }, @@ -100,15 +104,57 @@ export async function rollback( }); } - // Restore globalConfig on the Pipeline model so the editor loads the correct state - if (targetVersion.globalConfig !== undefined) { - await prisma.pipeline.update({ - where: { id: pipelineId }, - data: { - globalConfig: targetVersion.globalConfig as Prisma.InputJsonValue ?? undefined, - }, - }); - } + // Restore pipeline state atomically: globalConfig + nodes/edges from snapshots + await prisma.$transaction(async (tx) => { + if (targetVersion.globalConfig !== undefined) { + await tx.pipeline.update({ + where: { id: pipelineId }, + data: { + globalConfig: targetVersion.globalConfig as Prisma.InputJsonValue ?? undefined, + }, + }); + } + + if (targetVersion.nodesSnapshot && targetVersion.edgesSnapshot) { + const snapshotNodes = targetVersion.nodesSnapshot as Array>; + const snapshotEdges = targetVersion.edgesSnapshot as Array>; + + await tx.pipelineEdge.deleteMany({ where: { pipelineId } }); + await tx.pipelineNode.deleteMany({ where: { pipelineId } }); + + await Promise.all( + snapshotNodes.map((node) => + tx.pipelineNode.create({ + data: { + id: node.id as string, + pipelineId, + componentKey: node.componentKey as string, + componentType: node.componentType as string, + kind: node.kind as ComponentKind, + config: node.config as Prisma.InputJsonValue, + positionX: node.positionX as number, + positionY: node.positionY as number, + disabled: (node.disabled as boolean) ?? false, + }, + }) + ) + ); + + await Promise.all( + snapshotEdges.map((edge) => + tx.pipelineEdge.create({ + data: { + id: edge.id as string, + pipelineId, + sourceNodeId: edge.sourceNodeId as string, + targetNodeId: edge.targetNodeId as string, + sourcePort: (edge.sourcePort as string) ?? null, + }, + }) + ) + ); + } + }); return createVersion( pipelineId, @@ -117,5 +163,7 @@ export async function rollback( `Rollback to version ${targetVersion.version}`, targetVersion.logLevel, targetVersion.globalConfig as Record | null, + targetVersion.nodesSnapshot, + targetVersion.edgesSnapshot, ); } diff --git a/src/server/services/scim.ts b/src/server/services/scim.ts index cbe72360..616f27bd 100644 --- a/src/server/services/scim.ts +++ b/src/server/services/scim.ts @@ -1,5 +1,6 @@ import { prisma } from "@/lib/prisma"; import { writeAuditLog } from "@/server/services/audit"; +import { debugLog } from "@/lib/logger"; import bcrypt from "bcryptjs"; import crypto from "crypto"; @@ -165,6 +166,7 @@ export async function scimCreateUser(scimUser: ScimUser): Promise<{ user: ScimUs } export async function scimUpdateUser(id: string, scimUser: Partial) { + debugLog("scim", `PUT /Users/${id}`, { active: scimUser.active, userName: scimUser.userName, externalId: scimUser.externalId }); const data: Record = {}; if (scimUser.name?.formatted) data.name = scimUser.name.formatted; @@ -219,6 +221,7 @@ export async function scimPatchUser( id: string, operations: ScimPatchOp[], ) { + debugLog("scim", `PATCH /Users/${id}`, { operations: operations.map(o => ({ op: o.op, path: o.path, value: o.value })) }); const data: Record = {}; for (const op of operations) { @@ -305,6 +308,7 @@ export async function scimPatchUser( } export async function scimDeleteUser(id: string) { + debugLog("scim", `DELETE /Users/${id}`); // Don't actually delete -- lock the account // Only claim SCIM ownership if not already locked by another source const existing = await prisma.user.findUnique({ where: { id }, select: { lockedBy: true } }); diff --git a/src/server/services/system-vector.ts b/src/server/services/system-vector.ts index dcbcac45..2afb83f0 100644 --- a/src/server/services/system-vector.ts +++ b/src/server/services/system-vector.ts @@ -3,6 +3,7 @@ import { writeFile, mkdir } from "fs/promises"; import { dirname, join } from "path"; import yaml from "js-yaml"; import { AUDIT_LOG_PATH } from "@/server/services/audit"; +import { debugLog } from "@/lib/logger"; const VECTOR_BIN = process.env.VF_VECTOR_BIN ?? "vector"; const VECTORFLOW_DATA_DIR = join(process.cwd(), ".vectorflow"); @@ -49,7 +50,7 @@ export async function startSystemVector(configYaml: string): Promise { vectorProcess = proc; proc.stdout?.on("data", (data: Buffer) => { - console.log(`[system-vector stdout] ${data.toString().trimEnd()}`); + debugLog("system-vector", data.toString().trimEnd()); }); proc.stderr?.on("data", (data: Buffer) => { @@ -57,9 +58,7 @@ export async function startSystemVector(configYaml: string): Promise { }); proc.on("exit", (code, signal) => { - console.log( - `System Vector process exited with code ${code}, signal ${signal}`, - ); + debugLog("system-vector", `process exited with code ${code}, signal ${signal}`); // Only nullify if this is still the current process (not replaced by a restart) if (vectorProcess === proc) { vectorProcess = null;