From c1942ba518860d24538719b2bd0c62d2a7b92219 Mon Sep 17 00:00:00 2001 From: Wojtek Majewski Date: Fri, 7 Nov 2025 11:15:28 +0100 Subject: [PATCH] feat: demo - copy and explanation panel tweaks --- apps/demo/src/lib/components/CodePanel.svelte | 89 +-- apps/demo/src/lib/components/DAGNode.svelte | 32 ++ .../lib/components/DAGVisualization.svelte | 25 +- .../lib/components/ExplanationPanel.svelte | 524 +++++++++++------- apps/demo/src/lib/components/MiniDAG.svelte | 77 ++- apps/demo/src/lib/components/PulseDot.svelte | 71 ++- .../src/lib/components/WelcomeModal.svelte | 184 +++--- apps/demo/src/lib/stores/pulse-dots.svelte.ts | 22 +- apps/demo/src/routes/+page.svelte | 495 ++++++++++++++--- apps/demo/supabase/config.toml | 2 +- .../article_flow_worker/article_flow.ts | 29 +- .../functions/article_flow_worker/index.ts | 2 +- .../watchdog_article_worker.sql} | 0 13 files changed, 1084 insertions(+), 468 deletions(-) create mode 100644 apps/demo/src/lib/components/DAGNode.svelte rename apps/demo/supabase/{migrations/20251103204242_watchdog_article_flow_worker.sql => seeds/watchdog_article_worker.sql} (100%) diff --git a/apps/demo/src/lib/components/CodePanel.svelte b/apps/demo/src/lib/components/CodePanel.svelte index 430e092c0..548778c0e 100644 --- a/apps/demo/src/lib/components/CodePanel.svelte +++ b/apps/demo/src/lib/components/CodePanel.svelte @@ -1,11 +1,11 @@ + +
+ + {data.label} + + +
+ + diff --git a/apps/demo/src/lib/components/DAGVisualization.svelte b/apps/demo/src/lib/components/DAGVisualization.svelte index 1bc110427..e1e0224d5 100644 --- a/apps/demo/src/lib/components/DAGVisualization.svelte +++ b/apps/demo/src/lib/components/DAGVisualization.svelte @@ -3,6 +3,7 @@ import { SvelteFlow } from '@xyflow/svelte'; import '@xyflow/svelte/dist/style.css'; import type { createFlowState } from '$lib/stores/pgflow-state-improved.svelte'; + import DAGNode from './DAGNode.svelte'; interface Props { flowState: ReturnType; @@ -15,6 +16,11 @@ let containerElement: HTMLDivElement | undefined = $state(undefined); let shouldFitView = $state(true); + // Custom node types with PulseDot + const nodeTypes = { + dagNode: DAGNode + }; + // Re-center when container or window resizes onMount(() => { const handleResize = () => { @@ -88,12 +94,12 @@ } // Define the 4-step DAG structure - reactive to step states and selection - // Vertical spacing between nodes (81px between levels) + // Vertical spacing between nodes (110px between levels) // Shifted up by 30px to center better in viewport let nodes = $derived([ { id: 'fetchArticle', - type: 'default', + type: 'dagNode', position: { x: 150, y: -30 }, data: { label: 'fetchArticle' }, class: getNodeClass('fetchArticle'), @@ -101,24 +107,24 @@ }, { id: 'summarize', - type: 'default', - position: { x: 50, y: 51 }, + type: 'dagNode', + position: { x: 50, y: 80 }, data: { label: 'summarize' }, class: getNodeClass('summarize'), draggable: false }, { id: 'extractKeywords', - type: 'default', - position: { x: 250, y: 51 }, + type: 'dagNode', + position: { x: 250, y: 80 }, data: { label: 'extractKeywords' }, class: getNodeClass('extractKeywords'), draggable: false }, { id: 'publish', - type: 'default', - position: { x: 150, y: 132 }, + type: 'dagNode', + position: { x: 150, y: 190 }, data: { label: 'publish' }, class: getNodeClass('publish'), draggable: false @@ -223,8 +229,9 @@ import { createEventDispatcher, onMount } from 'svelte'; - import { Card, CardContent, CardHeader, CardTitle } from '$lib/components/ui/card'; + import { fade } from 'svelte/transition'; import { Button } from '$lib/components/ui/button'; - import { Badge } from '$lib/components/ui/badge'; import { codeToHtml } from 'shiki'; import type { createFlowState } from '$lib/stores/pgflow-state-improved.svelte'; import StatusBadge from '$lib/components/StatusBadge.svelte'; + import MiniDAG from '$lib/components/MiniDAG.svelte'; + import { Clock, Workflow, Play, Hourglass } from '@lucide/svelte'; interface Props { selectedStep: string | null; - hoveredStep: string | null; visible: boolean; flowState: ReturnType; + showMobileHeader?: boolean; } - let { selectedStep, hoveredStep, visible, flowState }: Props = $props(); + let { selectedStep, visible, flowState, showMobileHeader = true }: Props = $props(); const dispatch = createEventDispatcher<{ close: void; @@ -57,28 +58,26 @@ // Step-level concept explanations (how pgflow works internally) const stepConcepts: Record = { fetchArticle: - "This is a root step with no dependencies. When start_flow() is called, SQL Core immediately " + - "pushes a message to the queue. The Worker polls, gets the message, executes the handler, " + - "and calls complete_task() with the return value. SQL Core acknowledges completion, saves the output, " + - "and checks which dependent steps now have all their dependencies satisfied.", + 'Root step with no dependencies. start_flow() creates a task and queues a message containing the task ID. ' + + 'Worker polls the queue, calls start_tasks() with the task ID to reserve the task and get its input, ' + + 'executes the handler, calls complete_task() to save the output. Tasks exist independently from queue messages, ' + + 'enabling both automated polling and manual reservation (future: human approval steps).', summarize: - "Depends on fetchArticle. After fetchArticle completes, SQL Core checks if this step's " + - "dependencies are met. Since fetchArticle is the only dependency, this step becomes ready " + - "immediately. SQL Core pushes a message with the input payload (fetchArticle's output). " + - "Worker polls, executes the handler, calls complete_task(). SQL Core acknowledges completion and saves output.", + 'Depends on fetchArticle. When fetchArticle completes, SQL Core checks dependencies, creates a task, and queues a message. ' + + "Worker polls, calls start_tasks() which assembles input from both fetchArticle's output and the original run input, " + + 'executes the handler, calls complete_task().', extractKeywords: - "Also depends only on fetchArticle, so it becomes ready at the same time as summarize. " + - "Both messages hit the queue simultaneously - whichever Worker polls first starts execution. " + - "This is how pgflow achieves parallel execution: SQL Core identifies ready steps and pushes messages, " + - "Workers execute independently.", + 'Also depends on fetchArticle, so becomes ready alongside summarize. ' + + 'Both messages hit the queue simultaneously—parallel execution happens naturally as workers ' + + 'independently poll and start whichever task they receive first.', publish: - "Depends on both summarize AND extractKeywords. This step remains blocked until both " + - "dependencies complete. After the second one finishes, complete_task() acknowledges completion, " + - "saves output, checks dependencies, and finds publish is now ready. SQL Core pushes the message " + - "with both outputs as input. After publish completes, no dependent steps remain - the run is marked completed." + 'Depends on both summarize AND extractKeywords—blocked until both complete. ' + + 'After the second finishes, SQL Core finds publish ready, creates a task and queues a message. ' + + 'start_tasks() assembles input from both dependency outputs. After completion, no dependents remain ' + + 'and the run is marked completed.' }; // Step metadata for explanation @@ -98,7 +97,7 @@ name: 'fetchArticle', displayName: 'Fetch Article', whatItDoes: - 'Fetches article content from the provided URL using r.jina.ai. Returns both the article text and title for downstream processing.', + 'Fetches article content from the provided URL using Jina Reader API. Returns structured content with title and text for downstream processing.', dependsOn: [], dependents: ['summarize', 'extractKeywords'], inputType: `{ @@ -115,7 +114,7 @@ name: 'summarize', displayName: 'Summarize', whatItDoes: - 'Uses an LLM (Groq) to generate a concise summary of the article content. Runs in parallel with keyword extraction for efficiency.', + 'Generates a concise summary of the article content using an LLM. Runs in parallel with keyword extraction.', dependsOn: ['fetchArticle'], dependents: ['publish'], inputType: `{ @@ -130,7 +129,7 @@ name: 'extractKeywords', displayName: 'Extract Keywords', whatItDoes: - 'Uses an LLM (Groq) to extract key terms and topics from the article. Runs in parallel with summarization for efficiency.', + 'Extracts key terms and topics from the article using an LLM. Runs in parallel with summarization.', dependsOn: ['fetchArticle'], dependents: ['publish'], inputType: `{ @@ -144,7 +143,7 @@ name: 'publish', displayName: 'Publish', whatItDoes: - 'Combines the summary and keywords and publishes the processed article. In this demo, generates a mock article ID—in production, this would save to a database.', + 'Combines the summary and keywords and publishes the processed article. In this demo, returns a mock article ID—in production, this would insert into a database.', dependsOn: ['summarize', 'extractKeywords'], dependents: [], inputType: `{ @@ -156,43 +155,44 @@ }; let panelElement: HTMLElement | undefined = $state(undefined); - let highlightedInputType = $state(''); - let highlightedReturnType = $state(''); + let highlightedInput = $state(''); let highlightedOutput = $state(''); - // Generate syntax-highlighted types whenever step changes - $effect(() => { - const info = currentStepInfo; - const isFlowLevel = !selectedStep; - - if (isFlowLevel) { - // Show flow-level input type - codeToHtml(flowInfo.inputType, { - lang: 'typescript', - theme: 'night-owl' - }).then((html) => { - highlightedInputType = html; - }); - highlightedReturnType = ''; - } else if (info) { - // Highlight step input type - codeToHtml(info.inputType, { - lang: 'typescript', - theme: 'night-owl' - }).then((html) => { - highlightedInputType = html; - }); + // Replace long strings with placeholders for mobile display + function truncateDeep(obj: unknown, maxLength = 80): unknown { + if (typeof obj === 'string') { + if (obj.length > maxLength) { + return ``; + } + return obj; + } + if (Array.isArray(obj)) { + return obj.map((item) => truncateDeep(item, maxLength)); + } + if (obj !== null && typeof obj === 'object') { + const truncated: Record = {}; + for (const [key, value] of Object.entries(obj)) { + truncated[key] = truncateDeep(value, maxLength); + } + return truncated; + } + return obj; + } - // Highlight return type - codeToHtml(info.returns, { - lang: 'typescript', + // Generate syntax-highlighted input whenever input changes + $effect(() => { + const input = stepInput; + if (input) { + const truncated = truncateDeep(input); + const jsonString = JSON.stringify(truncated, null, 2); + codeToHtml(jsonString, { + lang: 'json', theme: 'night-owl' }).then((html) => { - highlightedReturnType = html; + highlightedInput = html; }); } else { - highlightedInputType = ''; - highlightedReturnType = ''; + highlightedInput = ''; } }); @@ -200,7 +200,8 @@ $effect(() => { const output = stepOutput; if (output) { - const jsonString = JSON.stringify(output, null, 2); + const truncated = truncateDeep(output); + const jsonString = JSON.stringify(truncated, null, 2); codeToHtml(jsonString, { lang: 'json', theme: 'night-owl' @@ -231,13 +232,56 @@ selectedStep && stepInfo[selectedStep] ? stepInfo[selectedStep] : null ); - // Get step output from events - const stepOutput = $derived.by(() => { - if (!selectedStep) return null; - // Find the latest completed event for this step - const stepEvents = flowState.events.filter((e) => e.data?.step_slug === selectedStep).reverse(); - const completedEvent = stepEvents.find((e) => e.event_type === 'step:completed'); - return completedEvent?.data?.output || null; + // Build step outputs map from events + const stepOutputs = $derived.by(() => { + const outputs: Record = {}; + flowState.events.forEach((event) => { + if ( + event.event_type === 'step:completed' && + event.data?.step_slug && + event.data?.output !== undefined + ) { + outputs[event.data.step_slug as string] = event.data.output; + } + }); + return outputs; + }); + + // Get step output + const stepOutput = $derived(selectedStep ? stepOutputs[selectedStep] : null); + + // Get actual input for the selected step + const stepInput = $derived.by(() => { + if (!selectedStep || !currentStepInfo) return null; + + // Only show input if step is started or completed + const stepStatus = flowState.stepStatuses[selectedStep]; + if (stepStatus !== 'started' && stepStatus !== 'completed') return null; + + // For steps with dependencies, check if all dependencies are completed + if (currentStepInfo.dependsOn.length > 0) { + const allDepsCompleted = currentStepInfo.dependsOn.every((dep) => { + return flowState.stepStatuses[dep] === 'completed'; + }); + if (!allDepsCompleted) return null; + } + + const input: Record = {}; + + // Always add run input (URL) from flowState.run if available + if (flowState.run?.input) { + input.run = flowState.run.input; + } + + // Add outputs from dependencies + for (const dep of currentStepInfo.dependsOn) { + const depOutput = stepOutputs[dep]; + if (depOutput !== undefined) { + input[dep] = depOutput; + } + } + + return Object.keys(input).length > 0 ? input : null; }); // Get current step status @@ -292,22 +336,11 @@ - -
-
- {#if currentStepInfo} -

- Step {currentStepInfo.name} -

- {:else} -

- Flow {flowInfo.name} -

- {/if} -
-
+ + {#if showMobileHeader} +
{#if stepStatus} {/if} @@ -315,97 +348,194 @@ >✕
-
+ {/if}
{#if currentStepInfo} - -
-

- {currentStepInfo.whatItDoes} -

-
- - -
- - 📚 How this step works in pgflow - -
- {stepConcepts[currentStepInfo.name]} -
-
- - -
- -
- -
-
Depends On
- {#if currentStepInfo.dependsOn.length === 0} - None - {:else} -
- {#each currentStepInfo.dependsOn as dep (dep)} - - {/each} -
+ {#key selectedStep} +
+ +
+

+ {currentStepInfo.whatItDoes} +

+ {#if selectedStep !== 'flow_config'} + {/if}
- -
-
Dependents
- {#if currentStepInfo.dependents.length === 0} - None - {:else} -
- {#each currentStepInfo.dependents as dep (dep)} - - {/each} -
- {/if} -
-
- - -
- -
-
Input Type
-
- - {@html highlightedInputType} + + Waits for +
+
+ {#each currentStepInfo.dependsOn as dep (dep)} + + {/each} +
+
+ {/if} + + + {#if currentStepInfo.dependents.length > 0} +
+
+ + Required by +
+
+ {#each currentStepInfo.dependents as dep (dep)} + + {/each} +
+
+ {/if}
-
- -
-
Return Type
-
- - {@html highlightedReturnType} + +
+ +
+
Input
+ {#if highlightedInput} +
+ + {@html highlightedInput} +
+ {:else if flowState.status === 'idle'} +
+ + Run the workflow to see input +
+ {:else if currentStepInfo && currentStepInfo.dependsOn.length > 0} + {@const incompleteDeps = currentStepInfo.dependsOn.filter( + (dep) => getStepStatus(dep) !== 'completed' + )} + {#if incompleteDeps.length > 0} +
+ + Waiting for {incompleteDeps.join(', ')} to complete +
+ {:else} +
+ + Waiting for step to start +
+ {/if} + {:else} +
+ + Waiting for step to start +
+ {/if} +
+ + +
+
Output
+ {#if highlightedOutput} +
+ + {@html highlightedOutput} +
+ {:else if flowState.status === 'idle'} +
+ + Run the workflow to see output +
+ {:else if stepStatus === 'started'} +
+ + Step is running... +
+ {:else} +
+ + Waiting for step to complete +
+ {/if} +
-
+ {/key} {:else}
@@ -422,7 +552,9 @@ > ⚙️ How SQL Core orchestrates this flow -
+

When you call start_flow('article_flow', {'{url}'}) As Workers execute handlers and call complete_task(), SQL Core acknowledges completion, saves outputs, checks dependent steps, and starts - those with all dependencies satisfied. The run completes when + >, SQL Core acknowledges completion, saves outputs, checks dependent steps, and + starts those with all dependencies satisfied. The run completes when remaining_steps = 0.

- This demo uses Supabase Realtime to broadcast graph state changes from SQL Core back to - the browser in real-time. + This demo uses Supabase Realtime to broadcast graph state changes from SQL Core back + to the browser in real-time.

@@ -449,7 +581,7 @@ 🛡️ Reliability Configuration
- {#each flowInfo.reliabilityFeatures as feature} + {#each flowInfo.reliabilityFeatures as feature (feature.setting)}
{feature.setting}

{feature.explanation}

@@ -458,18 +590,6 @@
- -
-
Flow Input Type
-
- - {@html highlightedInputType} -
-

- Start this flow with a URL object. SQL Core passes this input to root steps. -

-
-
Steps
@@ -488,24 +608,12 @@
{/if} - - - {#if stepOutput && highlightedOutput} -
-
Output
-
- - {@html highlightedOutput} -
-
- {/if}
{/if} diff --git a/apps/demo/src/lib/components/MiniDAG.svelte b/apps/demo/src/lib/components/MiniDAG.svelte index ab83b87f3..5a0cd411c 100644 --- a/apps/demo/src/lib/components/MiniDAG.svelte +++ b/apps/demo/src/lib/components/MiniDAG.svelte @@ -1,4 +1,6 @@ - + - {#each edgePaths as edge} + {#each edgePaths as edge (edge?.id)} {#if edge} {/if} {/each} - {#each nodes as node} + {#each nodes as node (node.id)} {#if visible} -
+
{/if} diff --git a/apps/demo/src/lib/components/WelcomeModal.svelte b/apps/demo/src/lib/components/WelcomeModal.svelte index 73a110a61..dd0327647 100644 --- a/apps/demo/src/lib/components/WelcomeModal.svelte +++ b/apps/demo/src/lib/components/WelcomeModal.svelte @@ -1,7 +1,6 @@ {#if visible} - -