From 972eb995ce519a012843d0663515f4aee0abbb66 Mon Sep 17 00:00:00 2001 From: assagman Date: Wed, 17 Dec 2025 11:40:43 +0300 Subject: [PATCH 1/7] fix(program): accumulate cost and latency stats Signed-off-by: assagman --- internal/module/program.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/internal/module/program.go b/internal/module/program.go index 41e1bb6..a1ba845 100644 --- a/internal/module/program.go +++ b/internal/module/program.go @@ -95,6 +95,8 @@ func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Pre totalUsage.PromptTokens += prediction.Usage.PromptTokens totalUsage.CompletionTokens += prediction.Usage.CompletionTokens totalUsage.TotalTokens += prediction.Usage.TotalTokens + totalUsage.Cost += prediction.Usage.Cost + totalUsage.Latency += prediction.Usage.Latency // Merge outputs into inputs for next module // This allows modules to access both original inputs and previous outputs From 7e7ebd6cde1988de0beb4dca4cb83cc57ca26b9e Mon Sep 17 00:00:00 2001 From: assagman Date: Wed, 17 Dec 2025 11:49:27 +0300 Subject: [PATCH 2/7] fix(program): pass completions to next module when it's MCC Signed-off-by: assagman --- internal/module/program.go | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/internal/module/program.go b/internal/module/program.go index a1ba845..fc63f6a 100644 --- a/internal/module/program.go +++ b/internal/module/program.go @@ -108,6 +108,16 @@ func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Pre merged[k] = v } + // Pass Completions to next module only when the next module is a MultiChainComparison. + // This avoids leaking internal completions into modules that don't consume them, + // and prevents accidental overwrites of user-provided "completions" inputs. + if len(prediction.Completions) > 0 && i+1 < len(p.modules) { + switch p.modules[i+1].(type) { + case *MultiChainComparison: + merged["completions"] = prediction.Completions + } + } + // Diagnostic logging for pipeline data flow debugging if logger := logging.GetLogger(); logger != nil { logger.Debug(ctx, "Program module data flow", map[string]any{ From 3c1bba15d4d0ad06adfb4eaee37466025e729437 Mon Sep 17 00:00:00 2001 From: assagman Date: Wed, 17 Dec 2025 15:36:30 +0300 Subject: [PATCH 3/7] fix(program): abort early on context cancellation Add a pre-flight `ctx.Err()` check before running program steps to return a clear cancellation error instead of starting execution. Signed-off-by: assagman --- internal/module/program.go | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/internal/module/program.go b/internal/module/program.go index fc63f6a..5dde976 100644 --- a/internal/module/program.go +++ b/internal/module/program.go @@ -63,6 +63,12 @@ func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Pre var lastPrediction *core.Prediction var totalUsage core.Usage + // Check context cancellation before starting + if err := ctx.Err(); err != nil { + predErr = fmt.Errorf("program cancelled before module %d: %w", 0, err) + return nil, predErr + } + for i, module := range p.modules { logging.GetLogger().Debug(ctx, "Program step", map[string]any{ "step": i + 1, From 6d2a5c057cccdef2e466bd0bac45ffb5cf161b17 Mon Sep 17 00:00:00 2001 From: assagman Date: Wed, 17 Dec 2025 19:29:57 +0300 Subject: [PATCH 4/7] update model catalog Signed-off-by: assagman --- internal/modelcatalog/default_models.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/internal/modelcatalog/default_models.go b/internal/modelcatalog/default_models.go index d13ba9c..6498585 100644 --- a/internal/modelcatalog/default_models.go +++ b/internal/modelcatalog/default_models.go @@ -635,6 +635,14 @@ func init() { Modalities: Modalities{Input: []string{"text"}, Output: []string{"text"}}, Metadata: Metadata{Name: "Gemini 2.5 Pro Preview 06-05", Family: "gemini-pro", Knowledge: "2025-01", ReleaseDate: "2025-06-05", LastUpdated: "2025-06-05", OpenWeights: false}, }, + { + ID: "openrouter/google/gemini-3-flash-preview", + Pricing: Pricing{PromptPrice: 0.15, CompletionPrice: 0.6, CacheReadPrice: 0.0375, CacheWritePrice: 0}, + Limits: Limits{ContextTokens: 1048576, OutputTokens: 65536}, + Capabilities: Capabilities{Attachment: true, Reasoning: true, ToolCall: true, StructuredOutput: false, Temperature: true}, + Modalities: Modalities{Input: []string{"text"}, Output: []string{"text"}}, + Metadata: Metadata{Name: "Gemini 3 Flash Preview", Family: "gemini-flash", Knowledge: "2025-01", ReleaseDate: "2025-12-17", LastUpdated: "2025-12-17", OpenWeights: false}, + }, { ID: "openrouter/google/gemini-3-pro-preview", Pricing: Pricing{PromptPrice: 2, CompletionPrice: 12, CacheReadPrice: 0, CacheWritePrice: 0}, From cceb688dda47abd8f0566de233e1cc3cc42b6165 Mon Sep 17 00:00:00 2001 From: assagman Date: Wed, 17 Dec 2025 19:38:05 +0300 Subject: [PATCH 5/7] feat(program): refactor MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit add program execution tracing, signature validation and completions routing BREAKING CHANGE: Program.Forward now returns the last module’s prediction (no synthetic merged outputs). - Track per-step execution status, timings, inputs, predictions, and aggregated usage - Add signature compatibility checks (ValidateSignatures / AddModuleValidated) - Route `completions` via a CompletionsConsumer interface (MultiChainComparison opts in) - Update integration + unit tests to match new error messages and output semantics Signed-off-by: assagman --- integration/module_composition_test.go | 4 +- internal/module/multi_chain_comparison.go | 5 + internal/module/program.go | 351 ++++++++++++++---- internal/module/program_test.go | 414 +++++++++++++++++++++- 4 files changed, 695 insertions(+), 79 deletions(-) diff --git a/integration/module_composition_test.go b/integration/module_composition_test.go index d7bd700..e9dd25b 100644 --- a/integration/module_composition_test.go +++ b/integration/module_composition_test.go @@ -831,8 +831,8 @@ func TestScenario_ErrorPropagationSequential(t *testing.T) { t.Error("Expected error from pipeline, but got none") } - if !contains(err.Error(), "module 0 failed") { - t.Errorf("Expected error mentioning 'module 0 failed', got: %v", err) + if !contains(err.Error(), "module 0") && !contains(err.Error(), "failed") { + t.Errorf("Expected error mentioning 'module 0' and 'failed', got: %v", err) } } diff --git a/internal/module/multi_chain_comparison.go b/internal/module/multi_chain_comparison.go index 92ba3d6..9a0be64 100644 --- a/internal/module/multi_chain_comparison.go +++ b/internal/module/multi_chain_comparison.go @@ -291,6 +291,11 @@ func buildMCCSignature(baseSig *core.Signature, m int) (*core.Signature, string) return sig, lastKey } +// RequiresCompletions implements CompletionsConsumer interface +func (mcc *MultiChainComparison) RequiresCompletions() bool { + return true +} + // Clone creates an independent copy of MultiChainComparison module. func (mcc *MultiChainComparison) Clone() core.Module { cloned := &MultiChainComparison{ diff --git a/internal/module/program.go b/internal/module/program.go index 5dde976..c7b5a48 100644 --- a/internal/module/program.go +++ b/internal/module/program.go @@ -3,25 +3,121 @@ package module import ( "context" "fmt" + "sync" "time" "github.com/assagman/dsgo/internal/core" "github.com/assagman/dsgo/internal/logging" ) -// getMapKeys extracts keys from a map for debugging purposes -func getMapKeys(m map[string]any) []string { - keys := make([]string, 0, len(m)) - for k := range m { - keys = append(keys, k) +// StepStatus represents the execution status of a program step +type StepStatus string + +const ( + StepStatusPending StepStatus = "pending" + StepStatusRunning StepStatus = "running" + StepStatusCompleted StepStatus = "completed" + StepStatusFailed StepStatus = "failed" + StepStatusSkipped StepStatus = "skipped" +) + +// ExecutionStatus represents the overall program execution status +type ExecutionStatus string + +const ( + ExecutionStatusPending ExecutionStatus = "pending" + ExecutionStatusRunning ExecutionStatus = "running" + ExecutionStatusCompleted ExecutionStatus = "completed" + ExecutionStatusFailed ExecutionStatus = "failed" +) + +// StepExecution contains complete execution data for a single step +type StepExecution struct { + Index int `json:"index"` + ModuleName string `json:"module_name"` + Status StepStatus `json:"status"` + Prediction *core.Prediction `json:"prediction,omitempty"` + Error error `json:"error,omitempty"` + StartTime time.Time `json:"start_time"` + Duration time.Duration `json:"duration"` + Inputs map[string]any `json:"inputs"` +} + +// ProgramExecution contains complete execution trace +type ProgramExecution struct { + ProgramName string `json:"program_name"` + Steps []StepExecution `json:"steps"` + Status ExecutionStatus `json:"status"` + TotalUsage core.Usage `json:"total_usage"` + StartTime time.Time `json:"start_time"` + Duration time.Duration `json:"duration"` + Error error `json:"error,omitempty"` +} + +// Metrics returns aggregated metrics from execution +func (e *ProgramExecution) Metrics() ProgramMetrics { + m := ProgramMetrics{ + TotalSteps: len(e.Steps), + TotalDuration: e.Duration, + TotalUsage: e.TotalUsage, } - return keys + for _, step := range e.Steps { + switch step.Status { + case StepStatusCompleted: + m.CompletedSteps++ + case StepStatusFailed: + m.FailedSteps++ + case StepStatusSkipped: + m.SkippedSteps++ + } + if step.Duration > m.SlowestStep { + m.SlowestStep = step.Duration + m.SlowestStepIndex = step.Index + } + } + return m +} + +// ProgramMetrics provides execution statistics +type ProgramMetrics struct { + TotalSteps int `json:"total_steps"` + CompletedSteps int `json:"completed_steps"` + FailedSteps int `json:"failed_steps"` + SkippedSteps int `json:"skipped_steps"` + TotalDuration time.Duration `json:"total_duration"` + SlowestStep time.Duration `json:"slowest_step"` + SlowestStepIndex int `json:"slowest_step_index"` + TotalUsage core.Usage `json:"total_usage"` +} + +// CompletionsConsumer is implemented by modules that accept completions input +type CompletionsConsumer interface { + RequiresCompletions() bool +} + +// SignatureMismatch describes a signature compatibility error +type SignatureMismatch struct { + ModuleIndex int `json:"module_index"` + ModuleName string `json:"module_name"` + MissingInputs []string `json:"missing_inputs"` + AvailableFields []string `json:"available_fields"` +} + +func (e *SignatureMismatch) Error() string { + return fmt.Sprintf( + "signature mismatch at module %d (%s): missing required inputs %v; available: %v", + e.ModuleIndex, e.ModuleName, e.MissingInputs, e.AvailableFields, + ) } // Program represents a composable pipeline of modules type Program struct { modules []core.Module name string + + // Execution state (protected by mutex for concurrent access) + mu sync.RWMutex + lastExecution *ProgramExecution } // NewProgram creates a new program @@ -32,24 +128,102 @@ func NewProgram(name string) *Program { } } -// AddModule adds a module to the program pipeline +// ValidateSignatures checks that all module signatures are compatible. +// Returns nil if valid, or SignatureMismatch error with details. +func (p *Program) ValidateSignatures(programInputs []string) error { + available := make(map[string]bool) + for _, name := range programInputs { + available[name] = true + } + + for i, module := range p.modules { + sig := module.GetSignature() + if sig == nil { + continue + } + + // Check required inputs + var missing []string + for _, field := range sig.InputFields { + if !field.Optional && !available[field.Name] { + missing = append(missing, field.Name) + } + } + + if len(missing) > 0 { + availableList := make([]string, 0, len(available)) + for k := range available { + availableList = append(availableList, k) + } + return &SignatureMismatch{ + ModuleIndex: i, + ModuleName: sig.Description, + MissingInputs: missing, + AvailableFields: availableList, + } + } + + // Add this module's outputs to available + for _, field := range sig.OutputFields { + available[field.Name] = true + } + } + + return nil +} + +// AddModule adds a module with optional eager validation func (p *Program) AddModule(module core.Module) *Program { p.modules = append(p.modules, module) return p } +// AddModuleValidated adds a module and validates signature compatibility +func (p *Program) AddModuleValidated(module core.Module, programInputs []string) error { + p.modules = append(p.modules, module) + if err := p.ValidateSignatures(programInputs); err != nil { + // Rollback + p.modules = p.modules[:len(p.modules)-1] + return err + } + return nil +} + // Forward executes the program by running modules in sequence -// Each module's outputs become available as inputs to subsequent modules func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Prediction, error) { - // Ensure context has IDs ctx = logging.EnsureRequestID(ctx) ctx = logging.EnsureCorrelationID(ctx) + // Initialize inputs if nil + if inputs == nil { + inputs = make(map[string]any) + } + + // Initialize execution trace + execution := &ProgramExecution{ + ProgramName: p.name, + Steps: make([]StepExecution, len(p.modules)), + Status: ExecutionStatusRunning, + StartTime: time.Now(), + } + + // Store execution reference + p.mu.Lock() + p.lastExecution = execution + p.mu.Unlock() + startTime := time.Now() logging.LogPredictionStart(ctx, logging.ModuleProgram, p.name) var predErr error defer func() { + execution.Duration = time.Since(startTime) + if predErr != nil { + execution.Status = ExecutionStatusFailed + execution.Error = predErr + } else { + execution.Status = ExecutionStatusCompleted + } logging.LogPredictionEnd(ctx, logging.ModuleProgram, time.Since(startTime), predErr) }() @@ -58,99 +232,128 @@ func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Pre return nil, predErr } - currentInputs := inputs - finalOutputs := make(map[string]any) - var lastPrediction *core.Prediction - var totalUsage core.Usage - // Check context cancellation before starting if err := ctx.Err(); err != nil { - predErr = fmt.Errorf("program cancelled before module %d: %w", 0, err) + predErr = fmt.Errorf("program cancelled before execution: %w", err) return nil, predErr } + currentInputs := inputs + var lastPrediction *core.Prediction + for i, module := range p.modules { + // Initialize step + execution.Steps[i] = StepExecution{ + Index: i, + ModuleName: p.getModuleName(module, i), + Status: StepStatusRunning, + StartTime: time.Now(), + Inputs: copyMap(currentInputs), + } + logging.GetLogger().Debug(ctx, "Program step", map[string]any{ "step": i + 1, - "module": i, + "module": execution.Steps[i].ModuleName, }) prediction, err := module.Forward(ctx, currentInputs) + execution.Steps[i].Duration = time.Since(execution.Steps[i].StartTime) + if err != nil { - predErr = fmt.Errorf("module %d failed: %w", i, err) + execution.Steps[i].Status = StepStatusFailed + execution.Steps[i].Error = err + // Mark remaining steps as skipped + for j := i + 1; j < len(p.modules); j++ { + execution.Steps[j].Status = StepStatusSkipped + } + predErr = fmt.Errorf("module %d (%s) failed: %w", i, execution.Steps[i].ModuleName, err) return nil, predErr } - // Validate outputs against module signature to catch malformed data early - if sig := module.GetSignature(); sig != nil { - if err := sig.ValidateOutputs(prediction.Outputs); err != nil { - predErr = fmt.Errorf("module %d produced invalid outputs: %w", i, err) - return nil, predErr - } - } + // Store complete prediction (unmodified) + execution.Steps[i].Status = StepStatusCompleted + execution.Steps[i].Prediction = prediction - // Accumulate outputs from all modules - for k, v := range prediction.Outputs { - finalOutputs[k] = v - } + // Accumulate usage + execution.TotalUsage.PromptTokens += prediction.Usage.PromptTokens + execution.TotalUsage.CompletionTokens += prediction.Usage.CompletionTokens + execution.TotalUsage.TotalTokens += prediction.Usage.TotalTokens + execution.TotalUsage.Cost += prediction.Usage.Cost + execution.TotalUsage.Latency += prediction.Usage.Latency - // Track last prediction + // Build inputs for next module + currentInputs = p.buildNextInputs(currentInputs, prediction, i) lastPrediction = prediction + } - // Accumulate usage stats - totalUsage.PromptTokens += prediction.Usage.PromptTokens - totalUsage.CompletionTokens += prediction.Usage.CompletionTokens - totalUsage.TotalTokens += prediction.Usage.TotalTokens - totalUsage.Cost += prediction.Usage.Cost - totalUsage.Latency += prediction.Usage.Latency - - // Merge outputs into inputs for next module - // This allows modules to access both original inputs and previous outputs - merged := make(map[string]any) - for k, v := range currentInputs { - merged[k] = v - } - for k, v := range prediction.Outputs { - merged[k] = v - } + // Return LAST prediction directly (not synthetic merge) + // Only override usage with accumulated total + lastPrediction.Usage = execution.TotalUsage + lastPrediction.ModuleName = p.name + lastPrediction.Inputs = inputs - // Pass Completions to next module only when the next module is a MultiChainComparison. - // This avoids leaking internal completions into modules that don't consume them, - // and prevents accidental overwrites of user-provided "completions" inputs. - if len(prediction.Completions) > 0 && i+1 < len(p.modules) { - switch p.modules[i+1].(type) { - case *MultiChainComparison: - merged["completions"] = prediction.Completions - } - } + return lastPrediction, nil +} + +// buildNextInputs constructs inputs for the next module +func (p *Program) buildNextInputs(current map[string]any, pred *core.Prediction, currentIndex int) map[string]any { + merged := make(map[string]any, len(current)+len(pred.Outputs)) + + // Copy current inputs + for k, v := range current { + merged[k] = v + } + + // Add prediction outputs + for k, v := range pred.Outputs { + merged[k] = v + } - // Diagnostic logging for pipeline data flow debugging - if logger := logging.GetLogger(); logger != nil { - logger.Debug(ctx, "Program module data flow", map[string]any{ - "step": i + 1, - "moduleIndex": i, - "inputKeys": getMapKeys(currentInputs), - "outputKeys": getMapKeys(prediction.Outputs), - "mergedKeys": getMapKeys(merged), - "module": p.name, - }) + // Pass completions if next module requires them (interface-based) + if len(pred.Completions) > 0 && currentIndex+1 < len(p.modules) { + if consumer, ok := p.modules[currentIndex+1].(CompletionsConsumer); ok && consumer.RequiresCompletions() { + merged["completions"] = pred.Completions } + } + + return merged +} - currentInputs = merged +// getModuleName extracts a name for logging +func (p *Program) getModuleName(module core.Module, index int) string { + if sig := module.GetSignature(); sig != nil && sig.Description != "" { + return sig.Description } + return fmt.Sprintf("module_%d", index) +} - // Build final prediction from accumulated results - finalPrediction := core.NewPrediction(finalOutputs). - WithUsage(totalUsage). - WithModuleName(p.name). - WithInputs(inputs) +// GetExecution returns the last execution trace (thread-safe) +func (p *Program) GetExecution() *ProgramExecution { + p.mu.RLock() + defer p.mu.RUnlock() + return p.lastExecution +} - // Carry over rationale from last prediction if available - if lastPrediction != nil && lastPrediction.Rationale != "" { - finalPrediction.Rationale = lastPrediction.Rationale +// GetMetrics returns metrics from the last execution +func (p *Program) GetMetrics() *ProgramMetrics { + exec := p.GetExecution() + if exec == nil { + return nil } + metrics := exec.Metrics() + return &metrics +} - return finalPrediction, nil +// copyMap creates a shallow copy of a map +func copyMap(m map[string]any) map[string]any { + if m == nil { + return nil + } + cp := make(map[string]any, len(m)) + for k, v := range m { + cp[k] = v + } + return cp } // GetSignature returns the signature of the last module in the pipeline diff --git a/internal/module/program_test.go b/internal/module/program_test.go index dffc04c..cf5d410 100644 --- a/internal/module/program_test.go +++ b/internal/module/program_test.go @@ -37,12 +37,13 @@ func TestProgram_Forward_Success(t *testing.T) { t.Fatalf("Forward() error = %v", err) } - if outputs.Outputs["step1"] != "done" { - t.Error("Should include output from first module") + // With refactored Program, only last module's outputs are returned + if outputs.Outputs["step1"] == "done" { + t.Error("Should NOT include output from first module (no synthetic merge)") } if outputs.Outputs["step2"] != "complete" { - t.Error("Should include output from second module") + t.Error("Should include output from last module") } } @@ -203,3 +204,410 @@ func TestProgram_Forward_ValidationSuccess(t *testing.T) { t.Error("Should complete full pipeline") } } + +func TestProgram_SignatureValidation_Success(t *testing.T) { + t.Parallel() + + // Create compatible signatures + sig1 := core.NewSignature("Module1"). + AddInput("text", core.FieldTypeString, "Input text"). + AddOutput("sentiment", core.FieldTypeString, "Sentiment analysis") + + sig2 := core.NewSignature("Module2"). + AddInput("sentiment", core.FieldTypeString, "Sentiment from module1"). + AddOutput("summary", core.FieldTypeString, "Summary") + + module1 := &MockModule{SignatureValue: sig1} + module2 := &MockModule{SignatureValue: sig2} + + program := NewProgram("test") + program.AddModule(module1) + program.AddModule(module2) + + // Should validate successfully with proper inputs + err := program.ValidateSignatures([]string{"text"}) + if err != nil { + t.Fatalf("ValidateSignatures() should succeed, got error: %v", err) + } +} + +func TestProgram_SignatureValidation_MissingInput(t *testing.T) { + t.Parallel() + + // Create incompatible signatures + sig1 := core.NewSignature("Module1"). + AddInput("text", core.FieldTypeString, "Input text"). + AddOutput("analysis", core.FieldTypeString, "Analysis") + + sig2 := core.NewSignature("Module2"). + AddInput("sentiment", core.FieldTypeString, "Required sentiment"). // Not provided by module1 + AddOutput("summary", core.FieldTypeString, "Summary") + + module1 := &MockModule{SignatureValue: sig1} + module2 := &MockModule{SignatureValue: sig2} + + program := NewProgram("test") + program.AddModule(module1) + program.AddModule(module2) + + // Should fail validation + err := program.ValidateSignatures([]string{"text"}) + if err == nil { + t.Fatal("ValidateSignatures() should fail with missing input") + } + + // Check error type + mismatchErr, ok := err.(*SignatureMismatch) + if !ok { + t.Fatalf("Expected SignatureMismatch error, got %T", err) + } + + if mismatchErr.ModuleIndex != 1 { + t.Errorf("Expected module index 1, got %d", mismatchErr.ModuleIndex) + } + + if len(mismatchErr.MissingInputs) != 1 || mismatchErr.MissingInputs[0] != "sentiment" { + t.Errorf("Expected missing input 'sentiment', got %v", mismatchErr.MissingInputs) + } +} + +func TestProgram_AddModuleValidated_Success(t *testing.T) { + t.Parallel() + + sig1 := core.NewSignature("Module1"). + AddInput("text", core.FieldTypeString, "Input text"). + AddOutput("result", core.FieldTypeString, "Result") + + module1 := &MockModule{SignatureValue: sig1} + + program := NewProgram("test") + + // Should succeed with valid inputs + err := program.AddModuleValidated(module1, []string{"text"}) + if err != nil { + t.Fatalf("AddModuleValidated() should succeed, got error: %v", err) + } + + if program.ModuleCount() != 1 { + t.Error("Module should be added successfully") + } +} + +func TestProgram_AddModuleValidated_Failure(t *testing.T) { + t.Parallel() + + sig1 := core.NewSignature("Module1"). + AddInput("missing", core.FieldTypeString, "Missing input"). + AddOutput("result", core.FieldTypeString, "Result") + + module1 := &MockModule{SignatureValue: sig1} + + program := NewProgram("test") + + // Should fail with missing inputs + err := program.AddModuleValidated(module1, []string{"text"}) + if err == nil { + t.Fatal("AddModuleValidated() should fail with missing input") + } + + // Should not add the module (rollback) + if program.ModuleCount() != 0 { + t.Error("Module should not be added when validation fails") + } +} + +func TestProgram_ExecutionTrace_Complete(t *testing.T) { + t.Parallel() + + module1 := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return core.NewPrediction(map[string]interface{}{"step1": "done"}). + WithUsage(core.Usage{PromptTokens: 10, CompletionTokens: 5}), nil + }, + SignatureValue: core.NewSignature("Module1"), + } + + module2 := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return core.NewPrediction(map[string]interface{}{"step2": "complete"}). + WithUsage(core.Usage{PromptTokens: 15, CompletionTokens: 10}), nil + }, + SignatureValue: core.NewSignature("Module2"), + } + + program := NewProgram("test-trace").AddModule(module1).AddModule(module2) + + _, err := program.Forward(context.Background(), map[string]interface{}{}) + if err != nil { + t.Fatalf("Forward() error = %v", err) + } + + execution := program.GetExecution() + if execution == nil { + t.Fatal("GetExecution() should return execution trace") + } + + if execution.Status != ExecutionStatusCompleted { + t.Errorf("Expected status %s, got %s", ExecutionStatusCompleted, execution.Status) + } + + if len(execution.Steps) != 2 { + t.Errorf("Expected 2 steps, got %d", len(execution.Steps)) + } + + // Check first step + step1 := execution.Steps[0] + if step1.Status != StepStatusCompleted { + t.Errorf("Expected step1 status %s, got %s", StepStatusCompleted, step1.Status) + } + if step1.ModuleName != "Module1" { + t.Errorf("Expected step1 module name 'Module1', got %s", step1.ModuleName) + } + if step1.Prediction == nil { + t.Error("Step1 should have prediction") + } + + // Check total usage + if execution.TotalUsage.PromptTokens != 25 { + t.Errorf("Expected total prompt tokens 25, got %d", execution.TotalUsage.PromptTokens) + } + if execution.TotalUsage.CompletionTokens != 15 { + t.Errorf("Expected total completion tokens 15, got %d", execution.TotalUsage.CompletionTokens) + } +} + +func TestProgram_ExecutionTrace_PartialFailure(t *testing.T) { + t.Parallel() + + module1 := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return core.NewPrediction(map[string]interface{}{"step1": "done"}), nil + }, + SignatureValue: core.NewSignature("Module1"), + } + + module2 := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return nil, errors.New("module2 failed") + }, + SignatureValue: core.NewSignature("Module2"), + } + + module3 := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return core.NewPrediction(map[string]interface{}{"step3": "never reached"}), nil + }, + SignatureValue: core.NewSignature("Module3"), + } + + program := NewProgram("test-failure").AddModule(module1).AddModule(module2).AddModule(module3) + + _, err := program.Forward(context.Background(), map[string]interface{}{}) + if err == nil { + t.Fatal("Forward() should fail") + } + + execution := program.GetExecution() + if execution == nil { + t.Fatal("GetExecution() should return execution trace") + } + + if execution.Status != ExecutionStatusFailed { + t.Errorf("Expected status %s, got %s", ExecutionStatusFailed, execution.Status) + } + + // Check step statuses + if execution.Steps[0].Status != StepStatusCompleted { + t.Error("Step1 should be completed") + } + if execution.Steps[1].Status != StepStatusFailed { + t.Error("Step2 should be failed") + } + if execution.Steps[2].Status != StepStatusSkipped { + t.Error("Step3 should be skipped") + } +} + +func TestProgram_GetMetrics(t *testing.T) { + t.Parallel() + + module1 := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return core.NewPrediction(map[string]interface{}{"step1": "done"}), nil + }, + SignatureValue: core.NewSignature("Module1"), + } + + program := NewProgram("test-metrics").AddModule(module1) + + // Should return nil before execution + metrics := program.GetMetrics() + if metrics != nil { + t.Error("GetMetrics() should return nil before execution") + } + + _, err := program.Forward(context.Background(), map[string]interface{}{}) + if err != nil { + t.Fatalf("Forward() error = %v", err) + } + + metrics = program.GetMetrics() + if metrics == nil { + t.Fatal("GetMetrics() should return metrics after execution") + } + + if metrics.TotalSteps != 1 { + t.Errorf("Expected total steps 1, got %d", metrics.TotalSteps) + } + if metrics.CompletedSteps != 1 { + t.Errorf("Expected completed steps 1, got %d", metrics.CompletedSteps) + } +} + +func TestProgram_LastPredictionPassthrough(t *testing.T) { + t.Parallel() + + // Create modules with different outputs + module1 := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return core.NewPrediction(map[string]interface{}{"intermediate": "value1"}), nil + }, + SignatureValue: core.NewSignature("Module1"), + } + + module2 := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return core.NewPrediction(map[string]interface{}{"final": "value2"}). + WithRationale("final rationale"), nil + }, + SignatureValue: core.NewSignature("Module2"), + } + + program := NewProgram("test-passthrough").AddModule(module1).AddModule(module2) + + result, err := program.Forward(context.Background(), map[string]interface{}{}) + if err != nil { + t.Fatalf("Forward() error = %v", err) + } + + // Should return LAST module's outputs, not merged + if len(result.Outputs) != 1 { + t.Errorf("Expected 1 output from last module, got %d", len(result.Outputs)) + } + + if result.Outputs["final"] != "value2" { + t.Error("Should contain final module's output") + } + + if result.Outputs["intermediate"] == "value1" { + t.Error("Should NOT contain intermediate module's output (no synthetic merge)") + } + + if result.Rationale != "final rationale" { + t.Error("Should preserve rationale from last module") + } + + if result.ModuleName != "test-passthrough" { + t.Error("Should set module name to program name") + } +} + +func TestProgram_CompletionsPassthrough_Interface(t *testing.T) { + t.Parallel() + + // Mock module that produces completions + producer := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + completions := []map[string]interface{}{ + {"answer": "answer1", "rationale": "rationale1"}, + {"answer": "answer2", "rationale": "rationale2"}, + } + return core.NewPrediction(map[string]interface{}{"output": "from producer"}). + WithCompletions(completions), nil + }, + SignatureValue: core.NewSignature("Producer"), + } + + // Create MultiChainComparison to consume completions + sig := core.NewSignature("Consumer"). + AddInput("text", core.FieldTypeString, "Text input"). + AddOutput("summary", core.FieldTypeString, "Summary") + + consumer := NewMultiChainComparison(sig, nil, 2) // LM can be nil for test + + program := NewProgram("test-completions").AddModule(producer).AddModule(consumer) + + // Verify consumer requires completions + if !consumer.RequiresCompletions() { + t.Error("MultiChainComparison should require completions") + } + + // Test buildNextInputs logic + producerResult := core.NewPrediction(map[string]interface{}{"output": "test"}). + WithCompletions([]map[string]interface{}{{"answer": "test"}}) + + nextInputs := program.buildNextInputs(map[string]interface{}{"text": "input"}, producerResult, 0) + + if nextInputs["completions"] == nil { + t.Error("Should pass completions to next module that requires them") + } +} + +func TestProgram_NilInputs(t *testing.T) { + t.Parallel() + + module := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + // Should receive non-nil map even if input was nil + if inputs == nil { + t.Error("Module should receive non-nil inputs map") + } + return core.NewPrediction(map[string]interface{}{"output": "ok"}), nil + }, + SignatureValue: core.NewSignature("Module"), + } + + program := NewProgram("test-nil").AddModule(module) + + _, err := program.Forward(context.Background(), nil) + if err != nil { + t.Fatalf("Forward() should handle nil inputs, got error: %v", err) + } +} + +func TestProgram_ConcurrentGetExecution(t *testing.T) { + t.Parallel() + + module := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + // Simulate some work + return core.NewPrediction(map[string]interface{}{"output": "done"}), nil + }, + SignatureValue: core.NewSignature("Module"), + } + + program := NewProgram("test-concurrent").AddModule(module) + + // Start execution in goroutine + done := make(chan error, 1) + go func() { + _, err := program.Forward(context.Background(), map[string]interface{}{}) + done <- err + }() + + // Concurrently access execution trace + execution := program.GetExecution() + // Should not race - execution may be nil or in-progress, but shouldn't crash + _ = execution + + err := <-done + if err != nil { + t.Fatalf("Forward() error = %v", err) + } + + // After completion, should be able to access execution trace + execution = program.GetExecution() + if execution == nil { + t.Error("GetExecution() should return trace after completion") + } +} From a4562c65bd0d056e1ca9b4bf21f3f69cc63e88f6 Mon Sep 17 00:00:00 2001 From: assagman Date: Thu, 18 Dec 2025 01:48:38 +0300 Subject: [PATCH 6/7] fix(modelcatalog): update Gemini 3 Flash pricing Signed-off-by: assagman --- internal/modelcatalog/default_models.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/modelcatalog/default_models.go b/internal/modelcatalog/default_models.go index 6498585..39a2b58 100644 --- a/internal/modelcatalog/default_models.go +++ b/internal/modelcatalog/default_models.go @@ -637,7 +637,7 @@ func init() { }, { ID: "openrouter/google/gemini-3-flash-preview", - Pricing: Pricing{PromptPrice: 0.15, CompletionPrice: 0.6, CacheReadPrice: 0.0375, CacheWritePrice: 0}, + Pricing: Pricing{PromptPrice: 0.5, CompletionPrice: 3, CacheReadPrice: 0.05, CacheWritePrice: 0}, Limits: Limits{ContextTokens: 1048576, OutputTokens: 65536}, Capabilities: Capabilities{Attachment: true, Reasoning: true, ToolCall: true, StructuredOutput: false, Temperature: true}, Modalities: Modalities{Input: []string{"text"}, Output: []string{"text"}}, From a198ee46b8eaca73b07a365091d40fd061534fda Mon Sep 17 00:00:00 2001 From: assagman Date: Thu, 18 Dec 2025 01:52:05 +0300 Subject: [PATCH 7/7] feat(program): add execution tracing, retention and step-level helpers - Add `ForwardWithTrace` + `ProgramResult` to return both final prediction and full execution trace - Track executions by unique `ExecutionID`, retain last N runs, and expose lookup/getters for step outputs - Validate module outputs against signatures and avoid mutating stored traces via deep-copy helpers - Re-export tracing types/constants in `dsgo` and add `examples/program_tracing` + coverage tests Signed-off-by: assagman --- dsgo.go | 25 ++ examples/program_tracing/README.md | 108 ++++++ examples/program_tracing/go.mod | 20 + examples/program_tracing/go.sum | 12 + examples/program_tracing/main.go | 240 ++++++++++++ internal/core/cache.go | 20 +- internal/core/cache_test.go | 10 +- internal/core/lm.go | 2 +- internal/core/signature.go | 19 + internal/module/program.go | 486 ++++++++++++++++++++++-- internal/module/program_tracing_test.go | 415 ++++++++++++++++++++ 11 files changed, 1304 insertions(+), 53 deletions(-) create mode 100644 examples/program_tracing/README.md create mode 100644 examples/program_tracing/go.mod create mode 100644 examples/program_tracing/go.sum create mode 100644 examples/program_tracing/main.go create mode 100644 internal/module/program_tracing_test.go diff --git a/dsgo.go b/dsgo.go index cfa8985..e0871fa 100644 --- a/dsgo.go +++ b/dsgo.go @@ -97,6 +97,17 @@ type ( ScoringFunction = module.ScoringFunction StreamResult = module.StreamResult MultiChainComparison = module.MultiChainComparison + + // Program tracing and validation types + ExecutionID = module.ExecutionID + StepStatus = module.StepStatus + ExecutionStatus = module.ExecutionStatus + StepExecution = module.StepExecution + ProgramExecution = module.ProgramExecution + ProgramMetrics = module.ProgramMetrics + ProgramResult = module.ProgramResult + SignatureMismatch = module.SignatureMismatch + CompletionsConsumer = module.CompletionsConsumer ) // Re-export logging types @@ -191,6 +202,8 @@ var ( GenerateCacheKey = core.GenerateCacheKey GenerateCacheKeyWithIgnored = core.GenerateCacheKeyWithIgnored MarkCacheHit = core.MarkCacheHit + DeepCopyMap = core.DeepCopyMap + DeepCopySlice = core.DeepCopySlice NewFallbackAdapter = core.NewFallbackAdapter NewFallbackAdapterWithChain = core.NewFallbackAdapterWithChain NewJSONAdapter = core.NewJSONAdapter @@ -400,4 +413,16 @@ const ( MCPErrCodeMethodNotFound = mcp.ErrCodeMethodNotFound MCPErrCodeInvalidParams = mcp.ErrCodeInvalidParams MCPErrCodeInternalError = mcp.ErrCodeInternalError + + // Program tracing and validation constants + StepStatusPending = module.StepStatusPending + StepStatusRunning = module.StepStatusRunning + StepStatusCompleted = module.StepStatusCompleted + StepStatusFailed = module.StepStatusFailed + StepStatusSkipped = module.StepStatusSkipped + + ExecutionStatusPending = module.ExecutionStatusPending + ExecutionStatusRunning = module.ExecutionStatusRunning + ExecutionStatusCompleted = module.ExecutionStatusCompleted + ExecutionStatusFailed = module.ExecutionStatusFailed ) diff --git a/examples/program_tracing/README.md b/examples/program_tracing/README.md new file mode 100644 index 0000000..3381ee9 --- /dev/null +++ b/examples/program_tracing/README.md @@ -0,0 +1,108 @@ +# Program Tracing Examples + +This document demonstrates how to use DSGo's program tracing capabilities for debugging, observability, and progress reporting. + +## Key Use Cases + +### 1. Debugging Pipeline Issues +- Access intermediate results to identify where problems occur +- Check input/output compatibility between steps +- View detailed error messages and timing information + +### 2. Observability and Monitoring +- Track execution progress in real-time +- Monitor resource usage (tokens, cost, latency) +- Collect metrics for performance analysis + +### 3. UI/CLI Progress Reporting +- Show step-by-step progress to users +- Display estimated completion times +- Provide detailed status updates + +### 4. Pipeline Optimization +- Identify bottlenecks with per-step timing +- Analyze cost distribution across steps +- Compare performance between different configurations + +### 5. Testing and Validation +- Verify intermediate outputs meet expectations +- Test error handling and recovery +- Validate data flow through the pipeline + +## Running the Examples + +```bash +cd examples/program_tracing +go run tracing_examples.go +``` + +This will run through several examples showing: +1. **Basic Execution Tracing**: How to access execution traces and metrics +2. **Intermediate Results**: How to retrieve step-by-step outputs even though `Forward()` only returns the final result +3. **Error Debugging**: How to use execution traces to diagnose pipeline failures +4. **Metrics Collection**: How to collect and analyze performance metrics + +## Key APIs Demonstrated + +### Accessing Execution Traces +```go +execution := program.GetExecution() +metrics := execution.Metrics() +``` + +### Intermediate Results Access +```go +for i, step := range execution.Steps { + if step.Status == dsgo.StepStatusCompleted { + outputs := step.Prediction.Outputs + duration := step.Duration + // Process intermediate results + } +} +``` + +### Error Diagnosis +```go +if execution.Status == dsgo.ExecutionStatusFailed { + for i, step := range execution.Steps { + if step.Status == dsgo.StepStatusFailed { + fmt.Printf("Step %d failed: %v\n", i, step.Error) + fmt.Printf("Inputs: %v\n", step.Inputs) + } + } +} +``` + +### Performance Metrics +```go +metrics := execution.Metrics() +fmt.Printf("Total duration: %v\n", metrics.TotalDuration) +fmt.Printf("Slowest step: %d (%v)\n", metrics.SlowestStepIndex, metrics.SlowestStep) +fmt.Printf("Total cost: $%.6f\n", metrics.TotalUsage.Cost) +``` + +## Understanding the Execution Model + +DSGo's `Program.Forward()` method always returns only the **last module's prediction** for backward compatibility. However, the complete execution trace containing all intermediate results is available via `program.GetExecution()`. + +This design allows: +- **Simple API**: Most use cases only need the final result +- **Full observability**: All intermediate data is accessible when needed +- **Performance**: No overhead for users who don't need tracing +- **Debugging**: Complete visibility into pipeline execution + +The execution trace includes: +- Per-step timing information +- Input/output data for each step +- Error information and stack traces +- Aggregated usage metrics (tokens, cost, latency) +- Overall execution status and duration + +## Integration with Observability Systems + +The tracing data can be easily integrated with: +- **Logging systems**: Structured logging of execution metrics +- **Monitoring platforms**: Export metrics to Prometheus, etc. +- **Progress bars**: Real-time UI updates during long executions +- **Debugging tools**: Step-by-step pipeline analysis +- **Cost tracking**: Detailed cost breakdown per step diff --git a/examples/program_tracing/go.mod b/examples/program_tracing/go.mod new file mode 100644 index 0000000..7306bde --- /dev/null +++ b/examples/program_tracing/go.mod @@ -0,0 +1,20 @@ +module github.com/assagman/dsgo/examples/program_tracing + +go 1.25 + +require ( + github.com/assagman/dsgo v0.0.0 + github.com/assagman/dsgo/examples/shared v0.0.0 +) + +require ( + github.com/openai/openai-go/v3 v3.13.0 // indirect + github.com/tidwall/gjson v1.18.0 // indirect + github.com/tidwall/match v1.1.1 // indirect + github.com/tidwall/pretty v1.2.1 // indirect + github.com/tidwall/sjson v1.2.5 // indirect +) + +replace github.com/assagman/dsgo => ../.. + +replace github.com/assagman/dsgo/examples/shared => ../shared diff --git a/examples/program_tracing/go.sum b/examples/program_tracing/go.sum new file mode 100644 index 0000000..cb0b239 --- /dev/null +++ b/examples/program_tracing/go.sum @@ -0,0 +1,12 @@ +github.com/openai/openai-go/v3 v3.13.0 h1:arSFmVHcBHNVYG5iqspPJrLoin0Qqn2JcCLWWcTcM1Q= +github.com/openai/openai-go/v3 v3.13.0/go.mod h1:cdufnVK14cWcT9qA1rRtrXx4FTRsgbDPW7Ia7SS5cZo= +github.com/tidwall/gjson v1.14.2/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/gjson v1.18.0 h1:FIDeeyB800efLX89e5a8Y0BNH+LOngJyGrIWxG2FKQY= +github.com/tidwall/gjson v1.18.0/go.mod h1:/wbyibRr2FHMks5tjHJ5F8dMZh3AcwJEMf5vlfC0lxk= +github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA= +github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM= +github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= +github.com/tidwall/sjson v1.2.5/go.mod h1:Fvgq9kS/6ociJEDnK0Fk1cpYF4FIW6ZF7LAe+6jwd28= diff --git a/examples/program_tracing/main.go b/examples/program_tracing/main.go new file mode 100644 index 0000000..6833c91 --- /dev/null +++ b/examples/program_tracing/main.go @@ -0,0 +1,240 @@ +package main + +import ( + "context" + "fmt" + "os" + "strings" + "time" + + "github.com/assagman/dsgo" + "github.com/assagman/dsgo/examples/shared/tools" +) + +func main() { + fmt.Println("=== DSGo Program Tracing Demo ===") + fmt.Println("Pipeline: Task Understanding → Codebase Analysis → Implementation Plan") + fmt.Println() + + // Check for API key + if os.Getenv("OPENROUTER_API_KEY") == "" && os.Getenv("OPENAI_API_KEY") == "" { + fmt.Println("⚠️ No API key found. Running in demo mode (showing configuration only).") + demonstrateConfiguration() + return + } + + // Run the actual pipeline + if err := runPipeline(); err != nil { + fmt.Printf("❌ Pipeline failed: %v\n", err) + os.Exit(1) + } +} + +func runPipeline() error { + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Minute) + defer cancel() + + // Initialize LM + modelName := getModelName() + lm, err := dsgo.NewLM(ctx, modelName) + if err != nil { + return fmt.Errorf("failed to create LM: %w", err) + } + fmt.Printf("✓ Using model: %s\n\n", modelName) + + // Get filesystem tools for codebase exploration + fsTools := tools.GetAllFilesystemTools() + fmt.Printf("✓ Loaded %d filesystem tools for codebase analysis\n", len(fsTools)) + + // Create signatures for each step + + // Step 1: Task Understanding with filesystem access to explore the codebase + taskSig := dsgo.NewSignature(`Understand a task and determine its scope by exploring the codebase. +Use the filesystem tools to: +1. List files at the project root to understand the structure +2. Read key files like README.md, go.mod to understand the project +3. Identify relevant directories and files for the task`). + AddInput("Task", dsgo.FieldTypeString, "The task to analyze and plan for"). + AddOutput("Understanding", dsgo.FieldTypeString, "Clear understanding of what needs to be done based on codebase exploration"). + AddOutput("Scope", dsgo.FieldTypeString, "Scope and boundaries of the task with specific file/directory references") + + // Step 2: Codebase Analysis with filesystem access for deep exploration + analysisSig := dsgo.NewSignature(`Analyze the codebase based on task understanding. +Use the filesystem tools to: +1. Navigate to relevant directories identified in the previous step +2. Read source files to understand implementation patterns +3. Search for related code using patterns +Provide concrete findings with file paths and code references.`). + AddInput("Understanding", dsgo.FieldTypeString, "Understanding of the task from previous step"). + AddInput("Scope", dsgo.FieldTypeString, "Scope from previous step"). + AddOutput("KeyFindings", dsgo.FieldTypeString, "Key findings from analyzing the codebase with specific file references"). + AddOutput("Considerations", dsgo.FieldTypeString, "Important considerations for implementation based on existing code patterns") + + // Step 3: Implementation Plan (simple Predict - synthesizes findings without needing filesystem) + planSig := dsgo.NewSignature("Create implementation plan based on analysis. No setup and git actions"). + AddInput("Understanding", dsgo.FieldTypeString, "Understanding from task step"). + AddInput("KeyFindings", dsgo.FieldTypeString, "Key findings from analysis step"). + AddInput("Considerations", dsgo.FieldTypeString, "Considerations from analysis step"). + AddOutput("Plan", dsgo.FieldTypeString, "Step-by-step implementation plan with specific file paths"). + AddOutput("Risks", dsgo.FieldTypeString, "Potential risks and mitigations"). + AddOutput("EstimatedTime", dsgo.FieldTypeString, "Estimated time to complete") + + // Create modules: ReAct for task & analysis (with tools), Predict for plan (synthesis only) + taskModule := dsgo.NewReAct(taskSig, lm, fsTools). + WithMaxIterations(8). + WithVerbose(true) + analysisModule := dsgo.NewReAct(analysisSig, lm, fsTools). + WithMaxIterations(10). + WithVerbose(true) + planModule := dsgo.NewPredict(planSig, lm) + + // Build the program with tracing configuration + program := dsgo.NewProgram("Task-Analysis-Plan Pipeline"). + WithVerbose(true). + WithInputs([]string{"Task"}). + WithExecutionRetention(5). + AddModule(taskModule). + AddModule(analysisModule). + AddModule(planModule) + + fmt.Printf("✓ Created program: %s (modules: %d)\n", program.Name(), program.ModuleCount()) + + // Execute with trace + inputs := map[string]any{ + "Task": "improve parallel.go", + } + + fmt.Println("\n--- Executing Pipeline ---") + result, err := program.ForwardWithTrace(ctx, inputs) + if err != nil { + return fmt.Errorf("pipeline execution failed: %w", err) + } + + plan, exists := result.Prediction.Get("Plan") + if !exists { + return fmt.Errorf("no plan output") + } + fmt.Println("result.Prediction - Plan: \n", plan) + + // Display execution trace + displayExecutionTrace(result) + + // Demonstrate helper APIs + demonstrateHelperAPIs(program, result.ExecutionID) + + return nil +} + +func displayExecutionTrace(result *dsgo.ProgramResult) { + fmt.Println("\n" + strings.Repeat("=", 60)) + fmt.Println("📊 EXECUTION TRACE") + fmt.Println(strings.Repeat("=", 60)) + + exec := result.Execution + fmt.Printf("Execution ID: %s\n", exec.ID) + fmt.Printf("Status: %s\n", exec.Status) + fmt.Printf("Duration: %v\n", exec.Duration) + fmt.Printf("Total Tokens: %d\n", exec.TotalUsage.TotalTokens) + fmt.Printf("Total Cost: $%.6f\n", exec.TotalUsage.Cost) + + fmt.Println("\n--- Steps ---") + for i, step := range exec.Steps { + fmt.Printf("\nStep %d: %s\n", i+1, step.ModuleName) + fmt.Printf(" Status: %s\n", step.Status) + fmt.Printf(" Duration: %v\n", step.Duration) + if step.Prediction != nil { + fmt.Printf(" Output keys: %v\n", getKeys(step.Prediction.Outputs)) + } + if step.Error != nil { + fmt.Printf(" Error: %v\n", step.Error) + } + } + + // Display metrics + metrics := exec.Metrics() + fmt.Println("\n--- Metrics ---") + fmt.Printf("Total Steps: %d\n", metrics.TotalSteps) + fmt.Printf("Completed: %d\n", metrics.CompletedSteps) + fmt.Printf("Slowest Step: %d (%.2fs)\n", metrics.SlowestStepIndex, metrics.SlowestStep.Seconds()) +} + +func demonstrateHelperAPIs(program *dsgo.Program, execID dsgo.ExecutionID) { + fmt.Println("\n" + strings.Repeat("=", 60)) + fmt.Println("🔧 HELPER API DEMONSTRATION") + fmt.Println(strings.Repeat("=", 60)) + + // GetLastExecutionID + lastID := program.GetLastExecutionID() + fmt.Printf("\nGetLastExecutionID(): %s\n", lastID) + + // GetAllExecutionIDs + allIDs := program.GetAllExecutionIDs() + fmt.Printf("GetAllExecutionIDs(): %d execution(s)\n", len(allIDs)) + + // GetStepPrediction - get task understanding from step 0 + pred, err := program.GetStepPrediction(execID, 0) + if err != nil { + fmt.Printf("GetStepPrediction error: %v\n", err) + } else { + fmt.Printf("\nGetStepPrediction(execID, 0) - Task Understanding:\n") + if understanding, ok := pred.Outputs["Understanding"]; ok { + fmt.Printf(" Understanding: %s...\n", understanding) + } + } + + // GetStepOutput - get specific output + plan, err := program.GetStepOutput(execID, 2, "Plan") + if err != nil { + fmt.Printf("GetStepOutput error: %v\n", err) + } else { + fmt.Printf("\nGetStepOutput(execID, 2, \"Plan\") - Implementation Plan:\n") + fmt.Printf(" Plan: %s...\n", plan) + } + + // GetLastStepOutput - convenience method + risks, err := program.GetLastStepOutput(2, "Risks") + if err != nil { + fmt.Printf("GetLastStepOutput error: %v\n", err) + } else { + fmt.Printf("\nGetLastStepOutput(2, \"Risks\"):\n") + fmt.Printf(" Risks: %s...\n", risks) + } +} + +func demonstrateConfiguration() { + fmt.Println("\n--- Configuration Demo (no LM) ---") + + // Show how to configure the program + program := dsgo.NewProgram("Demo Pipeline"). + WithVerbose(true). + WithInputs([]string{"Task"}). + WithExecutionRetention(5) + + fmt.Printf("✓ Program: %s\n", program.Name()) + fmt.Printf("✓ Module count: %d\n", program.ModuleCount()) + + fmt.Println("\nAvailable APIs:") + fmt.Println(" • ForwardWithTrace(ctx, inputs) - Execute and get trace") + fmt.Println(" • GetExecution() - Get last execution trace") + fmt.Println(" • GetExecutionByID(id) - Get execution by ID") + fmt.Println(" • GetAllExecutionIDs() - List retained executions") + fmt.Println(" • GetStepPrediction(id, index) - Get step prediction") + fmt.Println(" • GetStepOutput(id, index, key) - Get specific output") + fmt.Println(" • GetLastStepPrediction(index) - Convenience method") + fmt.Println(" • GetLastStepOutput(index, key) - Convenience method") +} + +func getModelName() string { + if model := os.Getenv("EXAMPLES_DEFAULT_MODEL"); model != "" { + return model + } + return "openrouter/google/gemini-2.0-flash-001" +} + +func getKeys(m map[string]any) []string { + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} diff --git a/internal/core/cache.go b/internal/core/cache.go index d9b4a2d..9ea9a55 100644 --- a/internal/core/cache.go +++ b/internal/core/cache.go @@ -543,21 +543,21 @@ func deepCopyResult(r *GenerateResult) *GenerateResult { } // Deep copy Arguments map if tc.Arguments != nil { - result.ToolCalls[i].Arguments = deepCopyMap(tc.Arguments) + result.ToolCalls[i].Arguments = DeepCopyMap(tc.Arguments) } } } // Deep copy Metadata map if r.Metadata != nil { - result.Metadata = deepCopyMap(r.Metadata) + result.Metadata = DeepCopyMap(r.Metadata) } return result } -// deepCopyMap creates a deep copy of a map[string]any -func deepCopyMap(m map[string]any) map[string]any { +// DeepCopyMap creates a deep copy of a map[string]any +func DeepCopyMap(m map[string]any) map[string]any { if m == nil { return nil } @@ -566,9 +566,9 @@ func deepCopyMap(m map[string]any) map[string]any { for k, v := range m { switch val := v.(type) { case map[string]any: - result[k] = deepCopyMap(val) + result[k] = DeepCopyMap(val) case []any: - result[k] = deepCopySlice(val) + result[k] = DeepCopySlice(val) default: result[k] = val } @@ -576,8 +576,8 @@ func deepCopyMap(m map[string]any) map[string]any { return result } -// deepCopySlice creates a deep copy of a []any slice -func deepCopySlice(s []any) []any { +// DeepCopySlice creates a deep copy of a []any slice +func DeepCopySlice(s []any) []any { if s == nil { return nil } @@ -586,9 +586,9 @@ func deepCopySlice(s []any) []any { for i, v := range s { switch val := v.(type) { case map[string]any: - result[i] = deepCopyMap(val) + result[i] = DeepCopyMap(val) case []any: - result[i] = deepCopySlice(val) + result[i] = DeepCopySlice(val) default: result[i] = val } diff --git a/internal/core/cache_test.go b/internal/core/cache_test.go index 012a10a..e914488 100644 --- a/internal/core/cache_test.go +++ b/internal/core/cache_test.go @@ -856,17 +856,17 @@ func TestCanonicalizeMap_NestedMaps(t *testing.T) { } } -// TestDeepCopyMap_NilInput tests nil handling in deepCopyMap +// TestDeepCopyMap_NilInput tests nil handling in DeepCopyMap func TestDeepCopyMap_NilInput(t *testing.T) { - result := deepCopyMap(nil) + result := DeepCopyMap(nil) if result != nil { t.Error("Expected nil result for nil input") } } -// TestDeepCopySlice_NilInput tests nil handling in deepCopySlice +// TestDeepCopySlice_NilInput tests nil handling in DeepCopySlice func TestDeepCopySlice_NilInput(t *testing.T) { - result := deepCopySlice(nil) + result := DeepCopySlice(nil) if result != nil { t.Error("Expected nil result for nil input") } @@ -882,7 +882,7 @@ func TestDeepCopySlice_ComplexTypes(t *testing.T) { []any{"nested", "slice"}, } - copied := deepCopySlice(slice) + copied := DeepCopySlice(slice) if copied == nil { t.Fatal("Expected non-nil result") } diff --git a/internal/core/lm.go b/internal/core/lm.go index 7dbace4..76add97 100644 --- a/internal/core/lm.go +++ b/internal/core/lm.go @@ -169,7 +169,7 @@ func (o *GenerateOptions) Copy() *GenerateOptions { StreamCallback: o.StreamCallback, // Copy reference (function pointer) FrequencyPenalty: o.FrequencyPenalty, PresencePenalty: o.PresencePenalty, - ProviderParams: deepCopyMap(o.ProviderParams), // Deep copy provider params + ProviderParams: DeepCopyMap(o.ProviderParams), // Deep copy provider params } // Copy slices diff --git a/internal/core/signature.go b/internal/core/signature.go index 0efaac3..9319ef6 100644 --- a/internal/core/signature.go +++ b/internal/core/signature.go @@ -143,6 +143,25 @@ func (d *ValidationDiagnostics) HasErrors() bool { return len(d.MissingFields) > 0 || len(d.TypeErrors) > 0 || len(d.ClassErrors) > 0 } +// Clone returns a deep copy of ValidationDiagnostics +func (d *ValidationDiagnostics) Clone() *ValidationDiagnostics { + if d == nil { + return nil + } + copy := &ValidationDiagnostics{ + MissingFields: append([]string(nil), d.MissingFields...), + TypeErrors: make(map[string]error, len(d.TypeErrors)), + ClassErrors: make(map[string]error, len(d.ClassErrors)), + } + for k, v := range d.TypeErrors { + copy.TypeErrors[k] = v + } + for k, v := range d.ClassErrors { + copy.ClassErrors[k] = v + } + return copy +} + // ValidateOutputs validates that all required outputs are present and of correct type func (s *Signature) ValidateOutputs(outputs map[string]any) error { for _, field := range s.OutputFields { diff --git a/internal/module/program.go b/internal/module/program.go index c7b5a48..3224228 100644 --- a/internal/module/program.go +++ b/internal/module/program.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "sync" + "sync/atomic" "time" "github.com/assagman/dsgo/internal/core" @@ -43,8 +44,12 @@ type StepExecution struct { Inputs map[string]any `json:"inputs"` } +// ExecutionID uniquely identifies a program execution +type ExecutionID string + // ProgramExecution contains complete execution trace type ProgramExecution struct { + ID ExecutionID `json:"id"` ProgramName string `json:"program_name"` Steps []StepExecution `json:"steps"` Status ExecutionStatus `json:"status"` @@ -95,6 +100,13 @@ type CompletionsConsumer interface { RequiresCompletions() bool } +// ProgramResult contains both prediction and execution trace +type ProgramResult struct { + Prediction *core.Prediction `json:"prediction"` + Execution *ProgramExecution `json:"execution"` + ExecutionID ExecutionID `json:"execution_id"` +} + // SignatureMismatch describes a signature compatibility error type SignatureMismatch struct { ModuleIndex int `json:"module_index"` @@ -114,20 +126,58 @@ func (e *SignatureMismatch) Error() string { type Program struct { modules []core.Module name string + verbose bool // Enable debug logging + + // Baseline inputs for validation + baselineInputs []string // Execution state (protected by mutex for concurrent access) - mu sync.RWMutex - lastExecution *ProgramExecution + mu sync.RWMutex + lastExecution *ProgramExecution + executionStore map[ExecutionID]*ProgramExecution // Store by ID + executionOrder []ExecutionID // Track insertion order for O(1) trim + retentionSize int // Maximum number of executions to retain + nextExecutionID uint64 // For generating unique IDs } // NewProgram creates a new program func NewProgram(name string) *Program { return &Program{ - name: name, - modules: []core.Module{}, + name: name, + modules: []core.Module{}, + executionStore: make(map[ExecutionID]*ProgramExecution), + executionOrder: []ExecutionID{}, + retentionSize: 10, // Keep last 10 executions by default } } +// WithExecutionRetention sets the maximum number of executions to retain +func (p *Program) WithExecutionRetention(size int) *Program { + if size < 0 { + size = 0 + } + p.retentionSize = size + return p +} + +// generateExecutionID creates a unique execution ID +func (p *Program) generateExecutionID() ExecutionID { + id := atomic.AddUint64(&p.nextExecutionID, 1) + return ExecutionID(fmt.Sprintf("%s-%d-%d", p.name, time.Now().UnixNano(), id)) +} + +// trimExecutionStore removes oldest executions to maintain retention size +func (p *Program) trimExecutionStore() { + if len(p.executionStore) <= p.retentionSize { + return + } + + // Optimize to O(1) using executionOrder + oldestID := p.executionOrder[0] + delete(p.executionStore, oldestID) + p.executionOrder = p.executionOrder[1:] +} + // ValidateSignatures checks that all module signatures are compatible. // Returns nil if valid, or SignatureMismatch error with details. func (p *Program) ValidateSignatures(programInputs []string) error { @@ -178,10 +228,32 @@ func (p *Program) AddModule(module core.Module) *Program { return p } -// AddModuleValidated adds a module and validates signature compatibility +// WithVerbose enables debug logging for program execution +func (p *Program) WithVerbose(verbose bool) *Program { + p.verbose = verbose + return p +} + +// WithInputs sets baseline inputs for signature validation +func (p *Program) WithInputs(inputs []string) *Program { + p.baselineInputs = inputs + return p +} + +// AddModuleValidated adds a module and validates signature compatibility. +// Uses baseline inputs if programInputs is nil. +// Returns an error if both programInputs and baseline inputs are nil. func (p *Program) AddModuleValidated(module core.Module, programInputs []string) error { + inputs := programInputs + if inputs == nil { + inputs = p.baselineInputs + } + if inputs == nil { + return fmt.Errorf("programInputs required: pass explicitly or set via WithInputs()") + } + p.modules = append(p.modules, module) - if err := p.ValidateSignatures(programInputs); err != nil { + if err := p.ValidateSignatures(inputs); err != nil { // Rollback p.modules = p.modules[:len(p.modules)-1] return err @@ -199,24 +271,50 @@ func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Pre inputs = make(map[string]any) } - // Initialize execution trace + // Generate execution ID and initialize trace + executionID := p.generateExecutionID() execution := &ProgramExecution{ + ID: executionID, ProgramName: p.name, Steps: make([]StepExecution, len(p.modules)), Status: ExecutionStatusRunning, StartTime: time.Now(), } - // Store execution reference + // Initialize all steps as pending + for i := range execution.Steps { + execution.Steps[i] = StepExecution{ + Index: i, + Status: StepStatusPending, + } + } + + // Store execution reference with retention policy p.mu.Lock() p.lastExecution = execution + p.executionStore[executionID] = execution + p.executionOrder = append(p.executionOrder, executionID) + + // Apply retention policy + if len(p.executionStore) > p.retentionSize { + p.trimExecutionStore() + } p.mu.Unlock() + if p.verbose { + logging.GetLogger().Info(ctx, "Program execution started", map[string]any{ + "program_name": p.name, + "module_count": len(p.modules), + "input_keys": getMapKeys(inputs), + }) + } + startTime := time.Now() logging.LogPredictionStart(ctx, logging.ModuleProgram, p.name) var predErr error defer func() { + p.mu.Lock() execution.Duration = time.Since(startTime) if predErr != nil { execution.Status = ExecutionStatusFailed @@ -224,16 +322,59 @@ func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Pre } else { execution.Status = ExecutionStatusCompleted } + p.mu.Unlock() + + if p.verbose { + if predErr != nil { + logging.GetLogger().Error(ctx, "Program execution failed", map[string]any{ + "program_name": p.name, + "duration": execution.Duration, + "error": predErr.Error(), + "completed_steps": func() int { + count := 0 + p.mu.RLock() + for _, step := range execution.Steps { + if step.Status == StepStatusCompleted { + count++ + } + } + p.mu.RUnlock() + return count + }(), + "total_steps": len(p.modules), + }) + } else { + p.mu.RLock() + totalTokens := execution.TotalUsage.TotalTokens + totalCost := execution.TotalUsage.Cost + p.mu.RUnlock() + logging.GetLogger().Info(ctx, "Program execution completed", map[string]any{ + "program_name": p.name, + "duration": execution.Duration, + "total_steps": len(p.modules), + "total_tokens": totalTokens, + "total_cost": totalCost, + }) + } + } logging.LogPredictionEnd(ctx, logging.ModuleProgram, time.Since(startTime), predErr) }() if len(p.modules) == 0 { + if p.verbose { + logging.GetLogger().Error(ctx, "Program has no modules", nil) + } predErr = fmt.Errorf("program has no modules") return nil, predErr } // Check context cancellation before starting if err := ctx.Err(); err != nil { + if p.verbose { + logging.GetLogger().Warn(ctx, "Program cancelled before execution", map[string]any{ + "error": err.Error(), + }) + } predErr = fmt.Errorf("program cancelled before execution: %w", err) return nil, predErr } @@ -242,35 +383,95 @@ func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Pre var lastPrediction *core.Prediction for i, module := range p.modules { - // Initialize step + // Initialize step (protected) + moduleName := p.getModuleName(module, i) + stepStartTime := time.Now() + inputsCopy := copyMap(currentInputs) + + p.mu.Lock() execution.Steps[i] = StepExecution{ Index: i, - ModuleName: p.getModuleName(module, i), + ModuleName: moduleName, Status: StepStatusRunning, - StartTime: time.Now(), - Inputs: copyMap(currentInputs), + StartTime: stepStartTime, + Inputs: inputsCopy, + } + p.mu.Unlock() + + if p.verbose { + logging.GetLogger().Info(ctx, "Program step started", map[string]any{ + "step": i + 1, + "module": moduleName, + "input_keys": getMapKeys(currentInputs), + }) + } else { + logging.GetLogger().Debug(ctx, "Program step", map[string]any{ + "step": i + 1, + "module": moduleName, + }) } - - logging.GetLogger().Debug(ctx, "Program step", map[string]any{ - "step": i + 1, - "module": execution.Steps[i].ModuleName, - }) prediction, err := module.Forward(ctx, currentInputs) - execution.Steps[i].Duration = time.Since(execution.Steps[i].StartTime) + stepDuration := time.Since(stepStartTime) if err != nil { + p.mu.Lock() + execution.Steps[i].Duration = stepDuration execution.Steps[i].Status = StepStatusFailed execution.Steps[i].Error = err // Mark remaining steps as skipped for j := i + 1; j < len(p.modules); j++ { execution.Steps[j].Status = StepStatusSkipped } - predErr = fmt.Errorf("module %d (%s) failed: %w", i, execution.Steps[i].ModuleName, err) + p.mu.Unlock() + + if p.verbose { + logging.GetLogger().Error(ctx, "Program step failed", map[string]any{ + "step": i + 1, + "module": moduleName, + "duration": stepDuration, + "error": err.Error(), + "input_keys": getMapKeys(inputsCopy), + }) + } + + predErr = fmt.Errorf("module %d (%s) failed: %w", i, moduleName, err) return nil, predErr } - // Store complete prediction (unmodified) + // Validate outputs against signature + if sig := module.GetSignature(); sig != nil { + if err := sig.ValidateOutputs(prediction.Outputs); err != nil { + stepErr := fmt.Errorf("module %d (%s) output validation failed: %w", i, moduleName, err) + + p.mu.Lock() + execution.Steps[i].Duration = stepDuration + execution.Steps[i].Status = StepStatusFailed + execution.Steps[i].Error = stepErr + // Mark remaining steps as skipped + for j := i + 1; j < len(p.modules); j++ { + execution.Steps[j].Status = StepStatusSkipped + } + p.mu.Unlock() + + if p.verbose { + logging.GetLogger().Error(ctx, "Program step output validation failed", map[string]any{ + "step": i + 1, + "module": moduleName, + "duration": stepDuration, + "error": stepErr.Error(), + "output_keys": getMapKeys(prediction.Outputs), + }) + } + + predErr = stepErr + return nil, predErr + } + } + + // Store complete prediction (protected) + p.mu.Lock() + execution.Steps[i].Duration = stepDuration execution.Steps[i].Status = StepStatusCompleted execution.Steps[i].Prediction = prediction @@ -280,19 +481,57 @@ func (p *Program) Forward(ctx context.Context, inputs map[string]any) (*core.Pre execution.TotalUsage.TotalTokens += prediction.Usage.TotalTokens execution.TotalUsage.Cost += prediction.Usage.Cost execution.TotalUsage.Latency += prediction.Usage.Latency + p.mu.Unlock() + + if p.verbose { + logging.GetLogger().Info(ctx, "Program step completed", map[string]any{ + "step": i + 1, + "module": moduleName, + "duration": stepDuration, + "input_keys": getMapKeys(inputsCopy), + "output_keys": getMapKeys(prediction.Outputs), + "prompt_tokens": prediction.Usage.PromptTokens, + "completion_tokens": prediction.Usage.CompletionTokens, + "cost": prediction.Usage.Cost, + }) + } // Build inputs for next module currentInputs = p.buildNextInputs(currentInputs, prediction, i) lastPrediction = prediction } - // Return LAST prediction directly (not synthetic merge) - // Only override usage with accumulated total - lastPrediction.Usage = execution.TotalUsage - lastPrediction.ModuleName = p.name - lastPrediction.Inputs = inputs + // Return a COPY of the last prediction to avoid mutation + // Override usage with accumulated total and set program metadata + resultPrediction := copyPrediction(lastPrediction) + p.mu.RLock() + resultPrediction.Usage = execution.TotalUsage + p.mu.RUnlock() + resultPrediction.ModuleName = p.name + resultPrediction.Inputs = copyMap(inputs) + + return resultPrediction, nil +} + +// ForwardWithTrace executes the program and returns both prediction and execution trace +func (p *Program) ForwardWithTrace(ctx context.Context, inputs map[string]any) (*ProgramResult, error) { + prediction, err := p.Forward(ctx, inputs) + if err != nil { + return nil, err + } + + execution := p.GetExecution() + if execution == nil { + return nil, fmt.Errorf("execution trace not available") + } + + result := &ProgramResult{ + Prediction: prediction, + Execution: execution, + ExecutionID: execution.ID, + } - return lastPrediction, nil + return result, nil } // buildNextInputs constructs inputs for the next module @@ -319,6 +558,18 @@ func (p *Program) buildNextInputs(current map[string]any, pred *core.Prediction, return merged } +// getMapKeys returns keys from a map for logging +func getMapKeys(m map[string]any) []string { + if m == nil { + return nil + } + keys := make([]string, 0, len(m)) + for k := range m { + keys = append(keys, k) + } + return keys +} + // getModuleName extracts a name for logging func (p *Program) getModuleName(module core.Module, index int) string { if sig := module.GetSignature(); sig != nil && sig.Description != "" { @@ -327,11 +578,102 @@ func (p *Program) getModuleName(module core.Module, index int) string { return fmt.Sprintf("module_%d", index) } -// GetExecution returns the last execution trace (thread-safe) +// GetExecution returns a copy of the last execution trace (thread-safe) func (p *Program) GetExecution() *ProgramExecution { p.mu.RLock() defer p.mu.RUnlock() - return p.lastExecution + return copyExecution(p.lastExecution) +} + +// GetExecutionByID returns execution trace by ID (thread-safe) +func (p *Program) GetExecutionByID(id ExecutionID) *ProgramExecution { + p.mu.RLock() + defer p.mu.RUnlock() + return copyExecution(p.executionStore[id]) +} + +// GetLastExecutionID returns the ID of the last execution +func (p *Program) GetLastExecutionID() ExecutionID { + p.mu.RLock() + defer p.mu.RUnlock() + if p.lastExecution == nil { + return "" + } + return p.lastExecution.ID +} + +// GetAllExecutionIDs returns all stored execution IDs (thread-safe) +func (p *Program) GetAllExecutionIDs() []ExecutionID { + p.mu.RLock() + defer p.mu.RUnlock() + + ids := make([]ExecutionID, 0, len(p.executionStore)) + for id := range p.executionStore { + ids = append(ids, id) + } + return ids +} + +// GetStepPrediction returns the prediction for a specific step (thread-safe) +func (p *Program) GetStepPrediction(executionID ExecutionID, stepIndex int) (*core.Prediction, error) { + p.mu.RLock() + defer p.mu.RUnlock() + + execution := p.executionStore[executionID] + if execution == nil { + return nil, fmt.Errorf("execution %s not found", executionID) + } + + if stepIndex < 0 || stepIndex >= len(execution.Steps) { + return nil, fmt.Errorf("step index %d out of bounds (0-%d)", stepIndex, len(execution.Steps)-1) + } + + step := execution.Steps[stepIndex] + if step.Status != StepStatusCompleted { + return nil, fmt.Errorf("step %d not completed (status: %s)", stepIndex, step.Status) + } + + return copyPrediction(step.Prediction), nil +} + +// GetStepOutput returns a specific output value from a step (thread-safe) +func (p *Program) GetStepOutput(executionID ExecutionID, stepIndex int, key string) (any, error) { + prediction, err := p.GetStepPrediction(executionID, stepIndex) + if err != nil { + return nil, err + } + + value, exists := prediction.Outputs[key] + if !exists { + return nil, fmt.Errorf("output key '%s' not found in step %d", key, stepIndex) + } + + return value, nil +} + +// GetLastStepPrediction returns prediction from last execution (convenience method) +func (p *Program) GetLastStepPrediction(stepIndex int) (*core.Prediction, error) { + executionID := p.GetLastExecutionID() + if executionID == "" { + return nil, fmt.Errorf("no executions available") + } + + return p.GetStepPrediction(executionID, stepIndex) +} + +// GetLastStepOutput returns output from last execution (convenience method) +func (p *Program) GetLastStepOutput(stepIndex int, key string) (any, error) { + prediction, err := p.GetLastStepPrediction(stepIndex) + if err != nil { + return nil, err + } + + value, exists := prediction.Outputs[key] + if !exists { + return nil, fmt.Errorf("output key '%s' not found in step %d", key, stepIndex) + } + + return value, nil } // GetMetrics returns metrics from the last execution @@ -344,16 +686,74 @@ func (p *Program) GetMetrics() *ProgramMetrics { return &metrics } -// copyMap creates a shallow copy of a map -func copyMap(m map[string]any) map[string]any { - if m == nil { +// copyExecution creates a deep copy of a program execution +func copyExecution(exec *ProgramExecution) *ProgramExecution { + if exec == nil { + return nil + } + + // Copy steps slice + stepsCopy := make([]StepExecution, len(exec.Steps)) + for i, step := range exec.Steps { + stepsCopy[i] = StepExecution{ + Index: step.Index, + ModuleName: step.ModuleName, + Status: step.Status, + Prediction: copyPrediction(step.Prediction), + Error: step.Error, + StartTime: step.StartTime, + Duration: step.Duration, + Inputs: copyMap(step.Inputs), + } + } + + // Create copy + return &ProgramExecution{ + ID: exec.ID, + ProgramName: exec.ProgramName, + Steps: stepsCopy, + Status: exec.Status, + TotalUsage: exec.TotalUsage, // Struct copies by value + StartTime: exec.StartTime, + Duration: exec.Duration, + Error: exec.Error, + } +} + +// copyPrediction creates a deep copy of a prediction +func copyPrediction(p *core.Prediction) *core.Prediction { + if p == nil { return nil } - cp := make(map[string]any, len(m)) - for k, v := range m { - cp[k] = v + + // Copy completions slice + completionsCopy := make([]map[string]any, len(p.Completions)) + for i, completion := range p.Completions { + completionsCopy[i] = core.DeepCopyMap(completion) + } + + // Create new prediction + resultCopy := &core.Prediction{ + Outputs: core.DeepCopyMap(p.Outputs), + Usage: p.Usage, // Usage is a struct, copies by value + ModuleName: p.ModuleName, + Inputs: core.DeepCopyMap(p.Inputs), + Rationale: p.Rationale, + Score: p.Score, + Completions: completionsCopy, + AdapterUsed: p.AdapterUsed, + ParseSuccess: p.ParseSuccess, + ParseAttempts: p.ParseAttempts, + FallbackUsed: p.FallbackUsed, + ParseDiagnostics: p.ParseDiagnostics.Clone(), } - return cp + + return resultCopy +} + +// copyMap creates a deep copy of a map using core.DeepCopyMap +func copyMap(m map[string]any) map[string]any { + return core.DeepCopyMap(m) } // GetSignature returns the signature of the last module in the pipeline @@ -371,9 +771,21 @@ func (p *Program) Name() string { // Clone creates an independent copy of Program module func (p *Program) Clone() core.Module { + p.mu.RLock() + defer p.mu.RUnlock() + cloned := &Program{ - name: p.name, - modules: make([]core.Module, len(p.modules)), + name: p.name, + modules: make([]core.Module, len(p.modules)), + verbose: p.verbose, + retentionSize: p.retentionSize, + executionStore: make(map[ExecutionID]*ProgramExecution), + executionOrder: []ExecutionID{}, + nextExecutionID: 0, + } + + if p.baselineInputs != nil { + cloned.baselineInputs = append([]string(nil), p.baselineInputs...) } // Clone all modules diff --git a/internal/module/program_tracing_test.go b/internal/module/program_tracing_test.go new file mode 100644 index 0000000..46cb0f2 --- /dev/null +++ b/internal/module/program_tracing_test.go @@ -0,0 +1,415 @@ +package module + +import ( + "context" + "fmt" + "testing" + + "github.com/assagman/dsgo/internal/core" +) + +func TestProgram_VerboseLogging(t *testing.T) { + sig := core.NewSignature("Test module"). + AddInput("text", core.FieldTypeString, "Input text"). + AddOutput("result", core.FieldTypeString, "Result") + + mockModule := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return &core.Prediction{ + Outputs: map[string]interface{}{"result": "success"}, + Usage: core.Usage{TotalTokens: 10}, + }, nil + }, + SignatureValue: sig, + } + + // Test verbose=true (should not crash) + verboseProgram := NewProgram("Verbose Test Program"). + WithVerbose(true). + AddModule(mockModule) + + ctx := context.Background() + _, err := verboseProgram.Forward(ctx, map[string]any{"text": "test"}) + if err != nil { + t.Errorf("Unexpected error with verbose logging: %v", err) + } + + // Test verbose=false (default) + quietProgram := NewProgram("Quiet Test Program"). + WithVerbose(false). + AddModule(mockModule) + + _, err = quietProgram.Forward(ctx, map[string]any{"text": "test"}) + if err != nil { + t.Errorf("Unexpected error with quiet logging: %v", err) + } +} + +func TestProgram_ExecutionIDAndRetention(t *testing.T) { + sig := core.NewSignature("Test module"). + AddInput("text", core.FieldTypeString, "Input text"). + AddOutput("result", core.FieldTypeString, "Result") + + mockModule := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + text := fmt.Sprintf("%v", inputs["text"]) + return &core.Prediction{ + Outputs: map[string]interface{}{"result": text}, + Usage: core.Usage{TotalTokens: 10}, + }, nil + }, + SignatureValue: sig, + } + + program := NewProgram("Retention Test Program"). + WithExecutionRetention(2). // Keep only 2 executions + AddModule(mockModule) + + ctx := context.Background() + + // Run multiple executions + var executionIDs []ExecutionID + for i := 0; i < 3; i++ { + _, err := program.Forward(ctx, map[string]any{"text": i}) + if err != nil { + t.Errorf("Execution %d failed: %v", i, err) + continue + } + + id := program.GetLastExecutionID() + if id == "" { + t.Errorf("Expected execution ID for run %d", i) + } + executionIDs = append(executionIDs, id) + } + + // Check that only last 2 executions are retained + allIDs := program.GetAllExecutionIDs() + if len(allIDs) != 2 { + t.Errorf("Expected 2 retained executions, got %d: %v", len(allIDs), allIDs) + } + + // Check that last two are ones retained (order may vary due to map iteration) + if len(allIDs) >= 2 { + // Just check that we have exactly 2 executions and they're from the last 2 runs + retainedSet := make(map[string]bool) + for _, id := range allIDs { + retainedSet[string(id)] = true + } + + expectedIDs := []string{string(executionIDs[1]), string(executionIDs[2])} + for _, expectedID := range expectedIDs { + if !retainedSet[expectedID] { + t.Errorf("Expected to retain execution %s, got %v", expectedID, allIDs) + } + } + } + + // Test accessing by ID + for _, id := range allIDs { + execution := program.GetExecutionByID(id) + if execution == nil { + t.Errorf("Expected to find execution %s", id) + continue + } + if execution.ID != id { + t.Errorf("Execution ID mismatch, expected %s, got %s", id, execution.ID) + } + } + + // Test accessing non-existent execution + nonExistent := program.GetExecutionByID(ExecutionID("non-existent")) + if nonExistent != nil { + t.Errorf("Expected nil for non-existent execution") + } +} + +func TestProgram_ForwardWithTrace(t *testing.T) { + sig := core.NewSignature("Test module"). + AddInput("text", core.FieldTypeString, "Input text"). + AddOutput("result", core.FieldTypeString, "Result") + + mockModule := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return &core.Prediction{ + Outputs: map[string]interface{}{"result": "success"}, + Usage: core.Usage{TotalTokens: 10}, + }, nil + }, + SignatureValue: sig, + } + + program := NewProgram("Trace Test Program").AddModule(mockModule) + ctx := context.Background() + result, err := program.ForwardWithTrace(ctx, map[string]any{"text": "test"}) + if err != nil { + t.Fatalf("ForwardWithTrace failed: %v", err) + } + + if result.Prediction == nil { + t.Error("Expected prediction in result") + } + if result.Execution == nil { + t.Error("Expected execution in result") + } + if result.ExecutionID == "" { + t.Error("Expected execution ID in result") + } + if result.ExecutionID != result.Execution.ID { + t.Errorf("Execution ID mismatch: %v != %v", result.ExecutionID, result.Execution.ID) + } +} + +func TestProgram_GetStepData(t *testing.T) { + sig := core.NewSignature("Test module"). + AddInput("text", core.FieldTypeString, "Input text"). + AddOutput("result", core.FieldTypeString, "Result") + + mockModule := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return &core.Prediction{ + Outputs: map[string]interface{}{"result": "step-success"}, + Usage: core.Usage{TotalTokens: 10}, + }, nil + }, + SignatureValue: sig, + } + + program := NewProgram("Step Data Test Program").AddModule(mockModule) + ctx := context.Background() + result, _ := program.ForwardWithTrace(ctx, map[string]any{"text": "test"}) + execID := result.ExecutionID + + // Test GetStepPrediction + pred, err := program.GetStepPrediction(execID, 0) + if err != nil { + t.Errorf("GetStepPrediction failed: %v", err) + } + if pred.Outputs["result"] != "step-success" { + t.Errorf("Expected step-success, got %v", pred.Outputs["result"]) + } + + // Test GetStepOutput + val, err := program.GetStepOutput(execID, 0, "result") + if err != nil { + t.Errorf("GetStepOutput failed: %v", err) + } + if val != "step-success" { + t.Errorf("Expected step-success, got %v", val) + } + + // Test GetLastStepPrediction + pred, err = program.GetLastStepPrediction(0) + if err != nil { + t.Errorf("GetLastStepPrediction failed: %v", err) + } + if pred.Outputs["result"] != "step-success" { + t.Errorf("Expected step-success, got %v", pred.Outputs["result"]) + } + + // Test GetLastStepOutput + val, err = program.GetLastStepOutput(0, "result") + if err != nil { + t.Errorf("GetLastStepOutput failed: %v", err) + } + if val != "step-success" { + t.Errorf("Expected step-success, got %v", val) + } + + // Edge case: out-of-bounds + _, err = program.GetStepPrediction(execID, 1) + if err == nil { + t.Error("Expected error for out-of-bounds step index") + } + + // Edge case: non-existent key + _, err = program.GetStepOutput(execID, 0, "non-existent") + if err == nil { + t.Error("Expected error for non-existent output key") + } + + // Edge case: non-existent execution + _, err = program.GetStepPrediction(ExecutionID("invalid"), 0) + if err == nil { + t.Error("Expected error for invalid execution ID") + } +} + +func TestProgram_CloneFull(t *testing.T) { + p := NewProgram("Clone Test"). + WithVerbose(true). + WithInputs([]string{"input1"}). + WithExecutionRetention(5) + + // Add a module + mock := &MockModule{SignatureValue: core.NewSignature("Mock")} + p.AddModule(mock) + + // Run once to populate state + ctx := context.Background() + _, _ = p.Forward(ctx, map[string]any{"input1": "val"}) + + if len(p.executionOrder) == 0 { + t.Fatal("Expected executions in original program") + } + + clonedMod := p.Clone() + cloned, ok := clonedMod.(*Program) + if !ok { + t.Fatal("Clone did not return *Program") + } + + if cloned.name != p.name { + t.Errorf("Name not cloned: %s != %s", cloned.name, p.name) + } + if cloned.verbose != p.verbose { + t.Errorf("Verbose not cloned: %v != %v", cloned.verbose, p.verbose) + } + if cloned.retentionSize != p.retentionSize { + t.Errorf("RetentionSize not cloned: %d != %d", cloned.retentionSize, p.retentionSize) + } + if len(cloned.baselineInputs) != len(p.baselineInputs) || cloned.baselineInputs[0] != p.baselineInputs[0] { + t.Errorf("BaselineInputs not cloned correctly") + } + + // Verify state reset + if len(cloned.executionStore) != 0 { + t.Error("executionStore should be empty in clone") + } + if len(cloned.executionOrder) != 0 { + t.Error("executionOrder should be empty in clone") + } + if cloned.nextExecutionID != 0 { + t.Errorf("nextExecutionID should be 0 in clone, got %d", cloned.nextExecutionID) + } +} + +func TestProgram_DeepCopyCorrectness(t *testing.T) { + mockModule := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return &core.Prediction{ + Outputs: map[string]interface{}{"result": "original"}, + }, nil + }, + } + + program := NewProgram("Deep Copy Test").AddModule(mockModule) + ctx := context.Background() + + // 1. Check Forward return value + pred, _ := program.Forward(ctx, nil) + pred.Outputs["result"] = "mutated" + + exec := program.GetExecution() + if exec.Steps[0].Prediction.Outputs["result"] != "original" { + t.Error("Mutation of Forward() result affected stored execution trace") + } + + // 2. Check GetStepPrediction return value + execID := program.GetLastExecutionID() + pred2, _ := program.GetStepPrediction(execID, 0) + pred2.Outputs["result"] = "mutated-again" + + exec2 := program.GetExecution() + if exec2.Steps[0].Prediction.Outputs["result"] != "original" { + t.Error("Mutation of GetStepPrediction() result affected stored execution trace") + } +} + +func TestProgram_RetentionValidation(t *testing.T) { + p := NewProgram("Retention Validation") + + p.WithExecutionRetention(-5) + if p.retentionSize != 0 { + t.Errorf("Expected retentionSize 0 for negative input, got %d", p.retentionSize) + } + + p.WithExecutionRetention(10) + if p.retentionSize != 10 { + t.Errorf("Expected retentionSize 10, got %d", p.retentionSize) + } +} + +func TestProgram_ConcurrentAccess(t *testing.T) { + sig := core.NewSignature("Concurrent test"). + AddInput("id", core.FieldTypeInt, "Request ID"). + AddOutput("result", core.FieldTypeString, "Result") + + mockModule := &MockModule{ + ForwardFunc: func(ctx context.Context, inputs map[string]interface{}) (*core.Prediction, error) { + return &core.Prediction{ + Outputs: map[string]interface{}{"result": "done"}, + Usage: core.Usage{TotalTokens: 5}, + }, nil + }, + SignatureValue: sig, + } + + program := NewProgram("Concurrent Test"). + WithExecutionRetention(100). + AddModule(mockModule) + + ctx := context.Background() + const numGoroutines = 50 + done := make(chan bool, numGoroutines) + + // Spawn goroutines that execute and read concurrently + for i := 0; i < numGoroutines; i++ { + go func(id int) { + defer func() { done <- true }() + + // Execute with trace to get the execution ID for THIS execution + result, err := program.ForwardWithTrace(ctx, map[string]any{"id": id}) + if err != nil { + t.Errorf("Goroutine %d: ForwardWithTrace failed: %v", id, err) + return + } + + execID := result.ExecutionID + + // Read execution trace and mutate returned copy + exec := program.GetExecution() + if exec != nil && len(exec.Steps) > 0 && exec.Steps[0].Prediction != nil { + // Mutate returned trace - should NOT affect internal state + exec.Steps[0].Prediction.Outputs["result"] = "mutated" + } + + // Read by ID + _ = program.GetExecutionByID(execID) + + // List all IDs + _ = program.GetAllExecutionIDs() + + // Get step data + _, _ = program.GetStepPrediction(execID, 0) + _, _ = program.GetStepOutput(execID, 0, "result") + }(i) + } + + // Wait for all goroutines + for i := 0; i < numGoroutines; i++ { + <-done + } + + // Verify all stored execution IDs are unique + allIDs := program.GetAllExecutionIDs() + idSet := make(map[ExecutionID]bool) + for _, id := range allIDs { + if idSet[id] { + t.Errorf("Duplicate execution ID in store: %s", id) + } + idSet[id] = true + } + + // Verify no internal state corruption from mutations + for _, id := range allIDs { + exec := program.GetExecutionByID(id) + if exec != nil && len(exec.Steps) > 0 && exec.Steps[0].Prediction != nil { + if exec.Steps[0].Prediction.Outputs["result"] != "done" { + t.Errorf("Internal state corrupted for execution %s: got %v", id, exec.Steps[0].Prediction.Outputs["result"]) + } + } + } + + t.Logf("Successfully ran %d concurrent executions with %d unique IDs retained", numGoroutines, len(allIDs)) +}