Skip to content
19 changes: 18 additions & 1 deletion pkg/vmcp/aggregator/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,17 +29,34 @@ type BackendDiscoverer interface {
// 1. Query: Fetch capabilities from each backend
// 2. Conflict Resolution: Handle duplicate tool/resource/prompt names
// 3. Merging: Create final unified capability view and routing table
//
//go:generate mockgen -destination=mocks/mock_interfaces.go -package=mocks -source=aggregator.go BackendDiscoverer Aggregator ConflictResolver ToolFilter ToolOverride
type Aggregator interface {
// QueryCapabilities queries a backend for its MCP capabilities.
// Returns the raw capabilities (tools, resources, prompts) from the backend.
QueryCapabilities(ctx context.Context, backend vmcp.Backend) (*BackendCapabilities, error)

// QueryAllCapabilities queries all backends for their capabilities in parallel.
// Handles backend failures gracefully (logs and continues with remaining backends).
QueryAllCapabilities(ctx context.Context, backends []vmcp.Backend) (map[string]*BackendCapabilities, error)

// ResolveConflicts applies conflict resolution strategy to handle
// duplicate capability names across backends.
ResolveConflicts(ctx context.Context, capabilities map[string]*BackendCapabilities) (*ResolvedCapabilities, error)

// MergeCapabilities creates the final unified capability view and routing table.
MergeCapabilities(ctx context.Context, resolved *ResolvedCapabilities) (*AggregatedCapabilities, error)
// Uses the backend registry to populate full BackendTarget information for routing.
MergeCapabilities(
ctx context.Context,
resolved *ResolvedCapabilities,
registry vmcp.BackendRegistry,
) (*AggregatedCapabilities, error)

// AggregateCapabilities is a convenience method that performs the full aggregation pipeline:
// 1. Query all backends
// 2. Resolve conflicts
// 3. Merge into final view
AggregateCapabilities(ctx context.Context, backends []vmcp.Backend) (*AggregatedCapabilities, error)
}

// BackendCapabilities contains the raw capabilities from a single backend.
Expand Down
124 changes: 124 additions & 0 deletions pkg/vmcp/aggregator/cli_discoverer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package aggregator

import (
"context"
"fmt"

rt "github.com/stacklok/toolhive/pkg/container/runtime"
"github.com/stacklok/toolhive/pkg/groups"
"github.com/stacklok/toolhive/pkg/logger"
"github.com/stacklok/toolhive/pkg/vmcp"
"github.com/stacklok/toolhive/pkg/workloads"
)

// cliBackendDiscoverer discovers backend MCP servers from Docker/Podman workloads in a group.
// This is the CLI version of BackendDiscoverer that uses the workloads.Manager.
type cliBackendDiscoverer struct {
workloadsManager workloads.Manager
groupsManager groups.Manager
}

// NewCLIBackendDiscoverer creates a new CLI-based backend discoverer.
// It discovers workloads from Docker/Podman containers managed by ToolHive.
func NewCLIBackendDiscoverer(workloadsManager workloads.Manager, groupsManager groups.Manager) BackendDiscoverer {
return &cliBackendDiscoverer{
workloadsManager: workloadsManager,
groupsManager: groupsManager,
}
}

// Discover finds all backend workloads in the specified group.
// Returns all accessible backends with their health status marked based on workload status.
// The groupRef is the group name (e.g., "engineering-team").
func (d *cliBackendDiscoverer) Discover(ctx context.Context, groupRef string) ([]vmcp.Backend, error) {
logger.Infof("Discovering backends in group %s", groupRef)

// Verify that the group exists
exists, err := d.groupsManager.Exists(ctx, groupRef)
if err != nil {
return nil, fmt.Errorf("failed to check if group exists: %w", err)
}
if !exists {
return nil, fmt.Errorf("group %s not found", groupRef)
}

// Get all workload names in the group
workloadNames, err := d.workloadsManager.ListWorkloadsInGroup(ctx, groupRef)
if err != nil {
return nil, fmt.Errorf("failed to list workloads in group: %w", err)
}

if len(workloadNames) == 0 {
logger.Infof("No workloads found in group %s", groupRef)
return []vmcp.Backend{}, nil
}

logger.Debugf("Found %d workloads in group %s, discovering backends", len(workloadNames), groupRef)

// Query each workload and convert to backend
var backends []vmcp.Backend
for _, name := range workloadNames {
workload, err := d.workloadsManager.GetWorkload(ctx, name)
if err != nil {
logger.Warnf("Failed to get workload %s: %v, skipping", name, err)
continue
}

// Skip workloads without a URL (not accessible)
if workload.URL == "" {
logger.Debugf("Skipping workload %s without URL", name)
continue
}

// Map workload status to backend health status
healthStatus := mapWorkloadStatusToHealth(workload.Status)

// Convert core.Workload to vmcp.Backend
backend := vmcp.Backend{
ID: name,
Name: name,
BaseURL: workload.URL,
TransportType: workload.TransportType.String(),
HealthStatus: healthStatus,
Metadata: make(map[string]string),
}

// Copy user labels to metadata first
for k, v := range workload.Labels {
backend.Metadata[k] = v
}

// Set system metadata (these override user labels to prevent conflicts)
backend.Metadata["group"] = groupRef
backend.Metadata["tool_type"] = workload.ToolType
backend.Metadata["workload_status"] = string(workload.Status)

backends = append(backends, backend)
logger.Debugf("Discovered backend %s: %s (%s) with health status %s",
backend.ID, backend.BaseURL, backend.TransportType, backend.HealthStatus)
}

if len(backends) == 0 {
logger.Infof("No accessible backends found in group %s (all workloads lack URLs)", groupRef)
return []vmcp.Backend{}, nil
}

logger.Infof("Discovered %d backends in group %s", len(backends), groupRef)
return backends, nil
}

// mapWorkloadStatusToHealth converts a workload status to a backend health status.
func mapWorkloadStatusToHealth(status rt.WorkloadStatus) vmcp.BackendHealthStatus {
switch status {
case rt.WorkloadStatusRunning:
return vmcp.BackendHealthy
case rt.WorkloadStatusUnhealthy:
return vmcp.BackendUnhealthy
case rt.WorkloadStatusStopped, rt.WorkloadStatusError, rt.WorkloadStatusStopping, rt.WorkloadStatusRemoving:
return vmcp.BackendUnhealthy
case rt.WorkloadStatusStarting, rt.WorkloadStatusUnknown:
return vmcp.BackendUnknown
default:
return vmcp.BackendUnknown
}
}
Loading
Loading