diff --git a/.github/workflows/e2e.yaml b/.github/workflows/e2e.yaml index 34bf4de..2b4749f 100644 --- a/.github/workflows/e2e.yaml +++ b/.github/workflows/e2e.yaml @@ -42,6 +42,8 @@ jobs: run: go mod download - name: Run E2E tests + env: + E2E_VERBOSE: "1" run: ./scripts/e2e-test.sh all - name: Cleanup diff --git a/Dockerfile b/Dockerfile index 4754728..94233da 100644 --- a/Dockerfile +++ b/Dockerfile @@ -61,6 +61,7 @@ WORKDIR /server # Copy binary from builder COPY --from=builder /build/snmcp /server/snmcp +COPY --from=builder /build/cmd/snmcp-e2e/testdata/functions/echo.py /server/e2e/functions/echo.py # Change ownership RUN chown -R snmcp:snmcp /server diff --git a/cmd/snmcp-e2e/main.go b/cmd/snmcp-e2e/main.go index 43d2f08..7d0138f 100644 --- a/cmd/snmcp-e2e/main.go +++ b/cmd/snmcp-e2e/main.go @@ -134,9 +134,13 @@ func run(ctx context.Context, cfg config) error { suffix := time.Now().UnixNano() tenant := fmt.Sprintf("e2e-%d", suffix) - namespace := fmt.Sprintf("%s/ns-%d", tenant, suffix) + namespaceName := fmt.Sprintf("ns-%d", suffix) + namespace := fmt.Sprintf("%s/%s", tenant, namespaceName) topic := fmt.Sprintf("persistent://%s/topic-%d", namespace, suffix) concurrentTopic := fmt.Sprintf("persistent://%s/topic-concurrent-%d", namespace, suffix) + functionInputTopic := fmt.Sprintf("persistent://%s/function-input-%d", namespace, suffix) + functionOutputTopic := fmt.Sprintf("persistent://%s/function-output-%d", namespace, suffix) + functionName := fmt.Sprintf("echo-%d", suffix) result, err := callTool(ctx, adminClient, "pulsar_admin_tenant", map[string]any{ "resource": "tenant", @@ -209,6 +213,118 @@ func run(ctx context.Context, cfg config) error { return err } + result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{ + "resource": "topic", + "operation": "create", + "topic": functionInputTopic, + "partitions": float64(0), + }) + if err := requireToolOK(result, err, "pulsar_admin_topic create function input"); err != nil { + return err + } + + result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{ + "resource": "topic", + "operation": "create", + "topic": functionOutputTopic, + "partitions": float64(0), + }) + if err := requireToolOK(result, err, "pulsar_admin_topic create function output"); err != nil { + return err + } + + logf(cfg.verbose, "creating function: tenant=%s namespace=%s name=%s inputs=%v output=%s py=%s classname=%s", + tenant, + namespaceName, + functionName, + []string{functionInputTopic}, + functionOutputTopic, + "/server/e2e/functions/echo.py", + "echo.EchoFunction", + ) + result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{ + "operation": "create", + "tenant": tenant, + "namespace": namespaceName, + "name": functionName, + "classname": "echo.EchoFunction", + "inputs": []string{functionInputTopic}, + "output": functionOutputTopic, + "py": "/server/e2e/functions/echo.py", + }) + if err := requireToolOK(result, err, "pulsar_admin_functions create"); err != nil { + return err + } + + if err := waitForFunctionRunning(ctx, adminClient, tenant, namespaceName, functionName, 60*time.Second); err != nil { + return err + } + + result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{ + "operation": "stats", + "tenant": tenant, + "namespace": namespaceName, + "name": functionName, + "instanceId": float64(0), + }) + if err := requireToolOK(result, err, "pulsar_admin_functions stats"); err != nil { + return err + } + + triggerValue := "e2e-trigger" + result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{ + "operation": "trigger", + "tenant": tenant, + "namespace": namespaceName, + "name": functionName, + "topic": functionInputTopic, + "triggerValue": triggerValue, + }) + if err := requireToolOK(result, err, "pulsar_admin_functions trigger"); err != nil { + return err + } + triggerResult := firstText(result) + logf(cfg.verbose, "trigger result: %s", triggerResult) + if !strings.Contains(triggerResult, triggerValue) { + if err := waitForTriggerOutput(ctx, adminClient, functionOutputTopic, fmt.Sprintf("trigger-sub-%d", suffix), triggerValue, 30*time.Second, cfg.verbose); err != nil { + return fmt.Errorf("unexpected trigger result: %s; output check failed: %w", triggerResult, err) + } + } + + result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{ + "operation": "update", + "tenant": tenant, + "namespace": namespaceName, + "name": functionName, + "userConfig": map[string]any{"updated": true}, + }) + if err := requireToolOK(result, err, "pulsar_admin_functions update"); err != nil { + return err + } + + result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{ + "operation": "get", + "tenant": tenant, + "namespace": namespaceName, + "name": functionName, + }) + if err := requireToolOK(result, err, "pulsar_admin_functions get"); err != nil { + return err + } + if err := assertFunctionUserConfig(firstText(result), "updated"); err != nil { + return err + } + + result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{ + "operation": "delete", + "tenant": tenant, + "namespace": namespaceName, + "name": functionName, + }) + if err := requireToolOK(result, err, "pulsar_admin_functions delete"); err != nil { + return err + } + result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{ "resource": "topic", "operation": "create", @@ -462,6 +578,119 @@ func listClusters(ctx context.Context, c *client.Client) ([]string, error) { return clusters, nil } +type functionStatus struct { + NumRunning int `json:"numRunning"` +} + +func waitForFunctionRunning(ctx context.Context, c *client.Client, tenant, namespace, name string, timeout time.Duration) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + status, err := getFunctionStatus(ctx, c, tenant, namespace, name) + if err == nil && status.NumRunning > 0 { + return nil + } + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(2 * time.Second): + } + } + return fmt.Errorf("function %s did not reach running state within %s", name, timeout.String()) +} + +func getFunctionStatus(ctx context.Context, c *client.Client, tenant, namespace, name string) (functionStatus, error) { + result, err := callTool(ctx, c, "pulsar_admin_functions", map[string]any{ + "operation": "status", + "tenant": tenant, + "namespace": namespace, + "name": name, + }) + if err := requireToolOK(result, err, "pulsar_admin_functions status"); err != nil { + return functionStatus{}, err + } + raw := firstText(result) + if raw == "" { + return functionStatus{}, errors.New("empty function status result") + } + var status functionStatus + if err := json.Unmarshal([]byte(raw), &status); err != nil { + return functionStatus{}, fmt.Errorf("failed to parse function status: %w", err) + } + return status, nil +} + +func assertFunctionUserConfig(raw string, key string) error { + if raw == "" { + return errors.New("empty function config result") + } + var payload map[string]interface{} + if err := json.Unmarshal([]byte(raw), &payload); err != nil { + return fmt.Errorf("failed to parse function config: %w", err) + } + userConfig, ok := payload["userConfig"].(map[string]interface{}) + if !ok { + return fmt.Errorf("missing userConfig in function config") + } + if _, ok := userConfig[key]; !ok { + return fmt.Errorf("userConfig missing key: %s", key) + } + return nil +} + +type consumeResponse struct { + MessagesConsumed int `json:"messages_consumed"` + Messages []consumeMessage `json:"messages"` +} + +type consumeMessage struct { + Data string `json:"data"` +} + +func waitForTriggerOutput(ctx context.Context, c *client.Client, topic, subscription, expected string, timeout time.Duration, verbose bool) error { + deadline := time.Now().Add(timeout) + for time.Now().Before(deadline) { + result, err := callTool(ctx, c, "pulsar_client_consume", map[string]any{ + "topic": topic, + "subscription-name": subscription, + "initial-position": "earliest", + "num-messages": float64(1), + "timeout": float64(5), + "subscription-type": "exclusive", + "subscription-mode": "durable", + "show-properties": false, + "hide-payload": false, + }) + if err := requireToolOK(result, err, "pulsar_client_consume trigger output"); err != nil { + return err + } + + raw := firstText(result) + if raw != "" { + var resp consumeResponse + if err := json.Unmarshal([]byte(raw), &resp); err != nil { + return fmt.Errorf("failed to parse trigger output: %w", err) + } + if resp.MessagesConsumed > 0 { + for _, msg := range resp.Messages { + if strings.Contains(msg.Data, expected) { + return nil + } + } + return fmt.Errorf("trigger output does not contain expected value: %s", expected) + } + } + + logf(verbose, "trigger output not ready yet, retrying") + select { + case <-ctx.Done(): + return ctx.Err() + case <-time.After(2 * time.Second): + } + } + + return fmt.Errorf("trigger output not available after %s", timeout.String()) +} + func runConcurrent(ctx context.Context, adminClient, testClient *client.Client, topic, subscription string) error { var wg sync.WaitGroup errCh := make(chan error, 2) diff --git a/cmd/snmcp-e2e/testdata/functions/echo.py b/cmd/snmcp-e2e/testdata/functions/echo.py new file mode 100644 index 0000000..cfe7b9b --- /dev/null +++ b/cmd/snmcp-e2e/testdata/functions/echo.py @@ -0,0 +1,3 @@ +class EchoFunction(object): + def process(self, input, context): + return input diff --git a/docs/tools/pulsar_admin_functions.md b/docs/tools/pulsar_admin_functions.md index e6d0963..bab60e7 100644 --- a/docs/tools/pulsar_admin_functions.md +++ b/docs/tools/pulsar_admin_functions.md @@ -5,79 +5,130 @@ Manage Apache Pulsar Functions for stream processing. Pulsar Functions are light This tool provides a comprehensive set of operations to manage the entire function lifecycle: - **list**: List all functions in a namespace - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) - **get**: Get function configuration - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) - **status**: Get runtime status of a function (instances, metrics) - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) + - `instanceId` (string, optional): Instance ID for per-instance status - **stats**: Get detailed statistics of a function (throughput, processing latency) - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) + - `instanceId` (string, optional): Instance ID for per-instance stats - **create**: Deploy a new function with specified parameters - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name - - `classname` (string, required): The fully qualified class name implementing the function - - `inputs` (array, required): The input topics for the function + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, optional): The function name (can be inferred from `classname`) + - `classname` (string, optional): The fully qualified class name implementing the function + - `functionType` (string, optional): Built-in function type (translated to `builtin://`) + - `inputs` (array, optional): The input topics for the function + - `topicsPattern` (string, optional): Topic pattern to consume from + - `inputSpecs` (object, optional): Map of input topics to consumer config - `output` (string, optional): The output topic for function results - `jar` (string, optional): Path to the JAR file for Java functions - `py` (string, optional): Path to the Python file for Python functions - `go` (string, optional): Path to the Go binary for Go functions - `parallelism` (number, optional): The parallelism factor of the function (default: 1) + - `cpu` (number, optional): CPU cores per instance + - `ram` (number, optional): RAM bytes per instance + - `disk` (number, optional): Disk bytes per instance - `userConfig` (object, optional): User-defined config key/values + - `producerConfig` (object, optional): Custom producer configuration + - `logTopic` (string, optional): Topic for function logs + - `schemaType` (string, optional): Output schema type or class + - `outputSerdeClassName` (string, optional): Output SerDe class + - `customSerdeInputs` (object, optional): Map of input topics to SerDe class + - `customSchemaInputs` (object, optional): Map of input topics to Schema class + - `customSchemaOutputs` (object, optional): Map of output topics to schema properties + - `inputTypeClassName` (string, optional): Input type class name + - `outputTypeClassName` (string, optional): Output type class name + - `processingGuarantees` (string, optional): Delivery semantics + - `retainOrdering` (boolean, optional): Process messages in order + - `retainKeyOrdering` (boolean, optional): Process messages in key order + - `batchBuilder` (string, optional): Batch builder type + - `forwardSourceMessageProperty` (boolean, optional): Forward properties to output + - `autoAck` (boolean, optional): Automatically acknowledge messages + - `subsName` (string, optional): Subscription name for inputs + - `subsPosition` (string, optional): Subscription position + - `skipToLatest` (boolean, optional): Skip to latest on restart + - `timeoutMs` (number, optional): Message timeout in ms + - `maxMessageRetries` (number, optional): Max retries + - `deadLetterTopic` (string, optional): Dead letter topic + - `customRuntimeOptions` (string, optional): Custom runtime options + - `secrets` (object, optional): Secrets map + - `cleanupSubscription` (boolean, optional): Clean up subscription on delete + - `windowLengthCount` (number, optional): Window length count + - `windowLengthDurationMs` (number, optional): Window length duration in ms + - `slidingIntervalCount` (number, optional): Sliding interval count + - `slidingIntervalDurationMs` (number, optional): Sliding interval duration in ms + - `functionConfigFile` (string, optional): YAML config file path - **update**: Update an existing function - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) - Parameters similar to `create` operation + - `updateAuthData` (boolean, optional): Whether to update auth data - **delete**: Delete a function - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) - **start**: Start a stopped function - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) - **stop**: Stop a running function - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) - **restart**: Restart a function - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) - **querystate**: Query state stored by a stateful function for a specific key - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) - `key` (string, required): The state key to query - **putstate**: Store state in a function's state store - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) - `key` (string, required): The state key - `value` (string, required): The state value - **trigger**: Manually trigger a function with a specific value - - `tenant` (string, required): The tenant name - - `namespace` (string, required): The namespace name - - `name` (string, required): The function name + - `fqfn` (string, optional): Fully qualified function name `tenant/namespace/name` + - `tenant` (string, optional): The tenant name (default: `public`) + - `namespace` (string, optional): The namespace name (default: `default`) + - `name` (string, required): The function name (unless `fqfn` is provided) - `topic` (string, optional): The specific topic to trigger on - - `triggerValue` (string, optional): The value to trigger the function with \ No newline at end of file + - `triggerValue` (string, optional): The value to trigger the function with + - `triggerFile` (string, optional): File path containing the trigger value diff --git a/pkg/mcp/builders/pulsar/functions.go b/pkg/mcp/builders/pulsar/functions.go index 6bf8789..a14e457 100644 --- a/pkg/mcp/builders/pulsar/functions.go +++ b/pkg/mcp/builders/pulsar/functions.go @@ -18,6 +18,9 @@ import ( "context" "encoding/json" "fmt" + "os" + "strconv" + "strings" "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" "github.com/mark3labs/mcp-go/mcp" @@ -25,6 +28,7 @@ import ( "github.com/streamnative/pulsarctl/pkg/cmdutils" "github.com/streamnative/streamnative-mcp-server/pkg/mcp/builders" mcpCtx "github.com/streamnative/streamnative-mcp-server/pkg/mcp/internal/context" + "gopkg.in/yaml.v2" ) // PulsarAdminFunctionsToolBuilder implements the ToolBuilder interface for Pulsar admin functions operations @@ -113,18 +117,26 @@ func (b *PulsarAdminFunctionsToolBuilder) buildPulsarAdminFunctionsTool() mcp.To mcp.WithDescription(toolDesc), mcp.WithString("operation", mcp.Required(), mcp.Description(operationDesc)), - mcp.WithString("tenant", mcp.Required(), + mcp.WithString("fqfn", + mcp.Description("The Fully Qualified Function Name in the form tenant/namespace/name. "+ + "Mutually exclusive with tenant, namespace, and name parameters.")), + mcp.WithString("tenant", mcp.Description("The tenant name. Tenants are the primary organizational unit in Pulsar, "+ "providing multi-tenancy and resource isolation. Functions deployed within a tenant "+ - "inherit its permissions and resource quotas.")), - mcp.WithString("namespace", mcp.Required(), + "inherit its permissions and resource quotas. Defaults to 'public' if not provided.")), + mcp.WithString("namespace", mcp.Description("The namespace name. Namespaces are logical groupings of topics and functions "+ "within a tenant. They encapsulate configuration policies and access control. "+ - "Functions in a namespace typically process topics within the same namespace.")), + "Functions in a namespace typically process topics within the same namespace. "+ + "Defaults to 'default' if not provided.")), mcp.WithString("name", - mcp.Description("The function name. Required for all operations except 'list'. "+ + mcp.Description("The function name. Required for all operations except 'list', "+ + "unless it can be inferred from classname during create/update. "+ "Names should be descriptive of the function's purpose and must be unique within a namespace. "+ "Function names are used in metrics, logs, and when addressing the function via APIs.")), + mcp.WithString("instanceId", + mcp.Description("Function instance ID for status/stats operations. "+ + "If not set, the aggregated status/stats of all instances will be returned.")), // Additional parameters for specific operations mcp.WithString("classname", mcp.Description("The fully qualified class name implementing the function. Required for 'create' operation, optional for 'update'. "+ @@ -134,6 +146,9 @@ func (b *PulsarAdminFunctionsToolBuilder) buildPulsarAdminFunctionsTool() mcp.To "if file is '/path/to/double_number.py' with class 'DoubleNumber', classname must be 'double_number.DoubleNumber'. "+ "Common error: using just the class name 'DoubleNumber' (without filename prefix) will cause function creation to fail. "+ "Go functions should specify the 'main' function of the binary.")), + mcp.WithString("functionType", + mcp.Description("The built-in Pulsar Function type. When set, it is translated to a builtin:// package URL. "+ + "Mutually exclusive with jar/py/go.")), mcp.WithArray("inputs", mcp.Description("The input topics for the function (array of strings). Optional for 'create' and 'update' operations. "+ "Topics must be specified in the format 'persistent://tenant/namespace/topic'. "+ @@ -146,6 +161,11 @@ func (b *PulsarAdminFunctionsToolBuilder) buildPulsarAdminFunctionsTool() mcp.To }, ), ), + mcp.WithString("topicsPattern", + mcp.Description("Topic pattern to consume from. Mutually exclusive with inputs for typical use cases. "+ + "Example: persistent://tenant/namespace/topicPattern*")), + mcp.WithObject("inputSpecs", + mcp.Description("Map of input topics to consumer configuration (JSON object).")), mcp.WithString("output", mcp.Description("The output topic for the function results. Optional for 'create' and 'update' operations. "+ "Specified in the format 'persistent://tenant/namespace/topic'. "+ @@ -175,12 +195,78 @@ func (b *PulsarAdminFunctionsToolBuilder) buildPulsarAdminFunctionsTool() mcp.To "Higher values improve throughput but require more resources. "+ "For stateful functions, consider how parallelism affects state consistency. "+ "Default is 1 (single instance).")), + mcp.WithNumber("cpu", + mcp.Description("CPU cores allocated per function instance (docker runtime only).")), + mcp.WithNumber("ram", + mcp.Description("RAM bytes allocated per function instance (process/docker runtime only).")), + mcp.WithNumber("disk", + mcp.Description("Disk bytes allocated per function instance (docker runtime only).")), mcp.WithObject("userConfig", mcp.Description("User-defined config key/values. Optional for 'create' and 'update' operations. "+ "Provides configuration parameters accessible to the function at runtime. "+ "Specify as a JSON object with string, number, or boolean values. "+ "Common configs include connection parameters, batch sizes, or feature toggles. "+ "Example: {\"maxBatchSize\": 100, \"connectionString\": \"host:port\", \"debugMode\": true}")), + mcp.WithObject("producerConfig", + mcp.Description("Custom producer configuration as JSON object.")), + mcp.WithString("logTopic", + mcp.Description("Topic where function logs are written.")), + mcp.WithString("schemaType", + mcp.Description("Output schema type or schema class name.")), + mcp.WithString("outputSerdeClassName", + mcp.Description("SerDe class for output messages.")), + mcp.WithObject("customSerdeInputs", + mcp.Description("Map of input topics to SerDe class names (JSON object).")), + mcp.WithObject("customSchemaInputs", + mcp.Description("Map of input topics to Schema class names (JSON object).")), + mcp.WithObject("customSchemaOutputs", + mcp.Description("Map of output topics to Schema properties (JSON object).")), + mcp.WithString("inputTypeClassName", + mcp.Description("Input type class name.")), + mcp.WithString("outputTypeClassName", + mcp.Description("Output type class name.")), + mcp.WithString("processingGuarantees", + mcp.Description("Processing guarantees (delivery semantics).")), + mcp.WithBoolean("retainOrdering", + mcp.Description("Process messages in order.")), + mcp.WithBoolean("retainKeyOrdering", + mcp.Description("Process messages in key order.")), + mcp.WithString("batchBuilder", + mcp.Description("Batch builder type (DEFAULT or KEY_BASED).")), + mcp.WithBoolean("forwardSourceMessageProperty", + mcp.Description("Forward input message properties to output topic.")), + mcp.WithString("subsName", + mcp.Description("Subscription name for input-topic consumer.")), + mcp.WithString("subsPosition", + mcp.Description("Subscription position for input-topic consumer.")), + mcp.WithBoolean("skipToLatest", + mcp.Description("Skip to latest message on function instance restart.")), + mcp.WithBoolean("autoAck", + mcp.Description("Whether the framework acknowledges messages automatically.")), + mcp.WithNumber("timeoutMs", + mcp.Description("Message timeout in milliseconds.")), + mcp.WithNumber("maxMessageRetries", + mcp.Description("Number of message retries before giving up.")), + mcp.WithString("deadLetterTopic", + mcp.Description("Topic for messages that are not processed successfully.")), + mcp.WithString("customRuntimeOptions", + mcp.Description("Custom runtime options for the function.")), + mcp.WithObject("secrets", + mcp.Description("Secrets map for the function (JSON object).")), + mcp.WithBoolean("cleanupSubscription", + mcp.Description("Whether to delete the subscription when the function is deleted.")), + mcp.WithNumber("windowLengthCount", + mcp.Description("Number of messages per window.")), + mcp.WithNumber("windowLengthDurationMs", + mcp.Description("Window length in milliseconds.")), + mcp.WithNumber("slidingIntervalCount", + mcp.Description("Number of messages after which the window slides.")), + mcp.WithNumber("slidingIntervalDurationMs", + mcp.Description("Window sliding interval in milliseconds.")), + mcp.WithString("functionConfigFile", + mcp.Description("Path to a YAML config file that specifies the function configuration.")), + mcp.WithBoolean("updateAuthData", + mcp.Description("Whether to update auth data on update operations.")), mcp.WithString("key", mcp.Description("The state key. Required for 'querystate' and 'putstate' operations. "+ "Keys are used to identify values in the function's state store. "+ @@ -197,10 +283,12 @@ func (b *PulsarAdminFunctionsToolBuilder) buildPulsarAdminFunctionsTool() mcp.To "Used when triggering a function that consumes from multiple topics. "+ "If not provided, the first input topic will be used.")), mcp.WithString("triggerValue", - mcp.Description("The value with which to trigger the function. Required for 'trigger' operation. "+ + mcp.Description("The value with which to trigger the function. Required for 'trigger' operation unless triggerFile is set. "+ "This value will be passed to the function as if it were a message from the input topic. "+ "String values are sent as is; for typed values, ensure proper formatting based on function expectations. "+ "The function processes this value just like a normal message.")), + mcp.WithString("triggerFile", + mcp.Description("Path to a file containing the trigger value. Required for 'trigger' operation unless triggerValue is set.")), ) } @@ -246,54 +334,53 @@ func (b *PulsarAdminFunctionsToolBuilder) buildPulsarAdminFunctionsHandler(readO return b.handleError("check permissions", fmt.Errorf("operation '%s' not allowed in read-only mode. Read-only mode restricts modifications to Pulsar Functions", operation)), nil } - // Extract common parameters - tenant, err := request.RequireString("tenant") + identity, err := b.parseFunctionIdentity(request, operation) if err != nil { - return b.handleError("get tenant", fmt.Errorf("missing required parameter 'tenant': %v. A tenant is required for all Pulsar Functions operations", err)), nil - } - - namespace, err := request.RequireString("namespace") - if err != nil { - return b.handleError("get namespace", fmt.Errorf("missing required parameter 'namespace': %v. A namespace is required for all Pulsar Functions operations", err)), nil - } - - // For all operations except 'list', name is required - var name string - if operation != "list" { - name, err = request.RequireString("name") - if err != nil { - return b.handleError("get name", fmt.Errorf("missing required parameter 'name' for operation '%s': %v. The function name must be specified for this operation", operation, err)), nil - } + return b.handleError("get function identity", err), nil } // Handle operation using delegated handlers switch operation { case "list": - return b.handleFunctionList(ctx, client, tenant, namespace) + return b.handleFunctionList(ctx, client, identity.Tenant, identity.Namespace) case "get": - return b.handleFunctionGet(ctx, client, tenant, namespace, name) + return b.handleFunctionGet(ctx, client, identity.Tenant, identity.Namespace, identity.Name) case "status": - return b.handleFunctionStatus(ctx, client, tenant, namespace, name) + instanceID, ok, err := parseOptionalIntArg(request.GetArguments(), "instanceId") + if err != nil { + return b.handleError("get instanceId", err), nil + } + if ok { + return b.handleFunctionStatusWithInstance(ctx, client, identity.Tenant, identity.Namespace, identity.Name, instanceID) + } + return b.handleFunctionStatus(ctx, client, identity.Tenant, identity.Namespace, identity.Name) case "stats": - return b.handleFunctionStats(ctx, client, tenant, namespace, name) + instanceID, ok, err := parseOptionalIntArg(request.GetArguments(), "instanceId") + if err != nil { + return b.handleError("get instanceId", err), nil + } + if ok { + return b.handleFunctionStatsWithInstance(ctx, client, identity.Tenant, identity.Namespace, identity.Name, instanceID) + } + return b.handleFunctionStats(ctx, client, identity.Tenant, identity.Namespace, identity.Name) case "querystate": key, err := request.RequireString("key") if err != nil { return b.handleError("get key", fmt.Errorf("missing required parameter 'key' for operation 'querystate': %v. A key is required to look up state in the function's state store", err)), nil } - return b.handleFunctionQuerystate(ctx, client, tenant, namespace, name, key) + return b.handleFunctionQuerystate(ctx, client, identity.Tenant, identity.Namespace, identity.Name, key) case "create": - return b.handleFunctionCreate(ctx, client, tenant, namespace, name, request) + return b.handleFunctionCreate(ctx, client, identity.Tenant, identity.Namespace, identity.Name, request) case "update": - return b.handleFunctionUpdate(ctx, client, tenant, namespace, name, request) + return b.handleFunctionUpdate(ctx, client, identity.Tenant, identity.Namespace, identity.Name, request) case "delete": - return b.handleFunctionDelete(ctx, client, tenant, namespace, name) + return b.handleFunctionDelete(ctx, client, identity.Tenant, identity.Namespace, identity.Name) case "start": - return b.handleFunctionStart(ctx, client, tenant, namespace, name) + return b.handleFunctionStart(ctx, client, identity.Tenant, identity.Namespace, identity.Name) case "stop": - return b.handleFunctionStop(ctx, client, tenant, namespace, name) + return b.handleFunctionStop(ctx, client, identity.Tenant, identity.Namespace, identity.Name) case "restart": - return b.handleFunctionRestart(ctx, client, tenant, namespace, name) + return b.handleFunctionRestart(ctx, client, identity.Tenant, identity.Namespace, identity.Name) case "putstate": key, err := request.RequireString("key") if err != nil { @@ -303,14 +390,22 @@ func (b *PulsarAdminFunctionsToolBuilder) buildPulsarAdminFunctionsHandler(readO if err != nil { return b.handleError("get value", fmt.Errorf("missing required parameter 'value' for operation 'putstate': %v. A value is required to store state in the function's state store", err)), nil } - return b.handleFunctionPutstate(ctx, client, tenant, namespace, name, key, value) + return b.handleFunctionPutstate(ctx, client, identity.Tenant, identity.Namespace, identity.Name, key, value) case "trigger": - triggerValue, err := request.RequireString("triggerValue") - if err != nil { - return b.handleError("get triggerValue", fmt.Errorf("missing required parameter 'triggerValue' for operation 'trigger': %v. A trigger value is required to manually trigger the function", err)), nil + args := request.GetArguments() + triggerValue, hasValue := getStringArg(args, "triggerValue") + triggerFile, hasFile := getStringArg(args, "triggerFile") + if hasValue && triggerValue == "" { + return b.handleError("get triggerValue", fmt.Errorf("triggerValue cannot be empty")), nil + } + if hasFile && triggerFile == "" { + return b.handleError("get triggerFile", fmt.Errorf("triggerFile cannot be empty")), nil + } + if hasValue == hasFile { + return b.handleError("validate trigger arguments", fmt.Errorf("exactly one of triggerValue or triggerFile must be provided")), nil } topic := request.GetString("topic", "") - return b.handleFunctionTrigger(ctx, client, tenant, namespace, name, triggerValue, topic) + return b.handleFunctionTrigger(ctx, client, identity.Tenant, identity.Namespace, identity.Name, triggerValue, triggerFile, topic) default: return b.handleError("handle operation", fmt.Errorf("unsupported operation: %s", operation)), nil } @@ -359,6 +454,18 @@ func (b *PulsarAdminFunctionsToolBuilder) handleFunctionStatus(_ context.Context return b.marshalResponse(status) } +// handleFunctionStatusWithInstance handles the status operation for a single instance +func (b *PulsarAdminFunctionsToolBuilder) handleFunctionStatusWithInstance(_ context.Context, client cmdutils.Client, tenant, namespace, name string, instanceID int) (*mcp.CallToolResult, error) { + admin := client.Functions() + + status, err := admin.GetFunctionStatusWithInstanceID(tenant, namespace, name, instanceID) + if err != nil { + return b.handleError("get function status", err), nil + } + + return b.marshalResponse(status) +} + // handleFunctionStats handles the stats operation func (b *PulsarAdminFunctionsToolBuilder) handleFunctionStats(_ context.Context, client cmdutils.Client, tenant, namespace, name string) (*mcp.CallToolResult, error) { admin := client.Functions() @@ -372,6 +479,19 @@ func (b *PulsarAdminFunctionsToolBuilder) handleFunctionStats(_ context.Context, return b.marshalResponse(stats) } +// handleFunctionStatsWithInstance handles the stats operation for a single instance +func (b *PulsarAdminFunctionsToolBuilder) handleFunctionStatsWithInstance(_ context.Context, client cmdutils.Client, tenant, namespace, name string, instanceID int) (*mcp.CallToolResult, error) { + admin := client.Functions() + + stats, err := admin.GetFunctionStatsWithInstanceID(tenant, namespace, name, instanceID) + if err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to get stats for function '%s' instance '%d' in tenant '%s' namespace '%s': %v. Verify the function exists and is running.", + name, instanceID, tenant, namespace, err)), nil + } + + return b.marshalResponse(stats) +} + // handleFunctionQuerystate handles the querystate operation func (b *PulsarAdminFunctionsToolBuilder) handleFunctionQuerystate(_ context.Context, client cmdutils.Client, tenant, namespace, name, key string) (*mcp.CallToolResult, error) { admin := client.Functions() @@ -396,31 +516,34 @@ func (b *PulsarAdminFunctionsToolBuilder) handleFunctionQuerystate(_ context.Con // handleFunctionCreate handles the create operation func (b *PulsarAdminFunctionsToolBuilder) handleFunctionCreate(_ context.Context, client cmdutils.Client, tenant, namespace, name string, request mcp.CallToolRequest) (*mcp.CallToolResult, error) { // Build function configuration from request parameters to validate - functionConfig, err := b.buildFunctionConfig(tenant, namespace, name, request, false) + functionConfig, err := b.buildFunctionConfig(tenant, namespace, name, request) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to build function configuration for '%s' in tenant '%s' namespace '%s': %v. Please verify all required parameters are provided correctly.", name, tenant, namespace, err)), nil } + if err := validateFunctionConfigs(functionConfig); err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to validate function configuration for '%s' in tenant '%s' namespace '%s': %v.", + functionConfig.Name, functionConfig.Tenant, functionConfig.Namespace, err)), nil + } admin := client.Functions() - packagePath := "" - //nolint:gocritic - if functionConfig.Jar != nil { - packagePath = *functionConfig.Jar - } else if functionConfig.Py != nil { - packagePath = *functionConfig.Py - } else if functionConfig.Go != nil { - packagePath = *functionConfig.Go + packagePath := resolvePackagePath(functionConfig) + if packagePath == "" { + return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s': missing function package path", functionConfig.Name)), nil } - err = admin.CreateFuncWithURL(functionConfig, packagePath) + if isPackageURLSupported(packagePath) { + err = admin.CreateFuncWithURL(functionConfig, packagePath) + } else { + err = admin.CreateFunc(functionConfig, packagePath) + } if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to create function '%s' in tenant '%s' namespace '%s': %v. Verify the function configuration is valid.", - name, tenant, namespace, err)), nil + functionConfig.Name, functionConfig.Tenant, functionConfig.Namespace, err)), nil } return mcp.NewToolResultText(fmt.Sprintf("Created function '%s' successfully in tenant '%s' namespace '%s'. The function configuration has been created.", - name, tenant, namespace)), nil + functionConfig.Name, functionConfig.Tenant, functionConfig.Namespace)), nil } // handleFunctionUpdate handles the update operation @@ -428,24 +551,35 @@ func (b *PulsarAdminFunctionsToolBuilder) handleFunctionUpdate(_ context.Context admin := client.Functions() // Build function configuration from request parameters - config, err := b.buildFunctionConfig(tenant, namespace, name, request, true) + config, err := b.buildFunctionConfig(tenant, namespace, name, request) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to build function configuration for '%s' in tenant '%s' namespace '%s': %v. Please verify all parameters are provided correctly.", name, tenant, namespace, err)), nil } + if err := checkArgsForUpdate(config); err != nil { + return mcp.NewToolResultError(fmt.Sprintf("Failed to validate update configuration for '%s' in tenant '%s' namespace '%s': %v.", + config.Name, config.Tenant, config.Namespace, err)), nil + } // Update the function - updateOptions := &utils.UpdateOptions{ - UpdateAuthData: true, + updateOptions := utils.NewUpdateOptions() + if updateAuthData, ok := getBoolArg(request.GetArguments(), "updateAuthData"); ok { + updateOptions.UpdateAuthData = updateAuthData + } + + packagePath := resolvePackagePath(config) + if packagePath != "" && isPackageURLSupported(packagePath) { + err = admin.UpdateFunctionWithURL(config, packagePath, updateOptions) + } else { + err = admin.UpdateFunction(config, packagePath, updateOptions) } - err = admin.UpdateFunction(config, "", updateOptions) if err != nil { return mcp.NewToolResultError(fmt.Sprintf("Failed to update function '%s' in tenant '%s' namespace '%s': %v. Verify the function exists and the configuration is valid.", - name, tenant, namespace, err)), nil + config.Name, config.Tenant, config.Namespace, err)), nil } return mcp.NewToolResultText(fmt.Sprintf("Updated function '%s' successfully in tenant '%s' namespace '%s'. The function configuration has been modified.", - name, tenant, namespace)), nil + config.Name, config.Tenant, config.Namespace)), nil } // handleFunctionDelete handles the delete operation @@ -522,17 +656,17 @@ func (b *PulsarAdminFunctionsToolBuilder) handleFunctionPutstate(_ context.Conte } // handleFunctionTrigger handles the trigger operation -func (b *PulsarAdminFunctionsToolBuilder) handleFunctionTrigger(_ context.Context, client cmdutils.Client, tenant, namespace, name, triggerValue, topic string) (*mcp.CallToolResult, error) { +func (b *PulsarAdminFunctionsToolBuilder) handleFunctionTrigger(_ context.Context, client cmdutils.Client, tenant, namespace, name, triggerValue, triggerFile, topic string) (*mcp.CallToolResult, error) { admin := client.Functions() var err error var result string if topic != "" { // Trigger with specific topic - result, err = admin.TriggerFunction(tenant, namespace, name, topic, triggerValue, "") + result, err = admin.TriggerFunction(tenant, namespace, name, topic, triggerValue, triggerFile) } else { // Trigger without specific topic (uses first input topic) - result, err = admin.TriggerFunction(tenant, namespace, name, "", triggerValue, "") + result, err = admin.TriggerFunction(tenant, namespace, name, "", triggerValue, triggerFile) } if err != nil { @@ -554,87 +688,768 @@ func (b *PulsarAdminFunctionsToolBuilder) handleFunctionTrigger(_ context.Contex // Helper functions -// buildFunctionConfig builds a Pulsar Function configuration from MCP request parameters -func (b *PulsarAdminFunctionsToolBuilder) buildFunctionConfig(tenant, namespace, name string, request mcp.CallToolRequest, isUpdate bool) (*utils.FunctionConfig, error) { - config := &utils.FunctionConfig{ +type functionIdentity struct { + Tenant string + Namespace string + Name string +} + +const ( + defaultTenant = "public" + defaultNamespace = "default" +) + +func (b *PulsarAdminFunctionsToolBuilder) parseFunctionIdentity(request mcp.CallToolRequest, operation string) (functionIdentity, error) { + args := request.GetArguments() + fqfn, _ := getStringArg(args, "fqfn") + tenant, _ := getStringArg(args, "tenant") + namespace, _ := getStringArg(args, "namespace") + name, _ := getStringArg(args, "name") + + if fqfn != "" { + if operation == "list" { + return functionIdentity{}, fmt.Errorf("fqfn is not supported for operation 'list'") + } + if tenant != "" || namespace != "" || name != "" { + return functionIdentity{}, fmt.Errorf("fqfn cannot be combined with tenant, namespace, or name") + } + parsed, err := parseFullyQualifiedFunctionName(fqfn) + if err != nil { + return functionIdentity{}, err + } + tenant, namespace, name = parsed.Tenant, parsed.Namespace, parsed.Name + } + + if tenant == "" { + tenant = defaultTenant + } + if namespace == "" { + namespace = defaultNamespace + } + + requiresName := operation != "list" && operation != "create" && operation != "update" + if requiresName && name == "" { + return functionIdentity{}, fmt.Errorf("you must specify a name for the function or a fully qualified function name (fqfn)") + } + + return functionIdentity{ Tenant: tenant, Namespace: namespace, Name: name, + }, nil +} + +// buildFunctionConfig builds a Pulsar Function configuration from MCP request parameters +func (b *PulsarAdminFunctionsToolBuilder) buildFunctionConfig(tenant, namespace, name string, request mcp.CallToolRequest) (*utils.FunctionConfig, error) { + config := &utils.FunctionConfig{} + args := request.GetArguments() + + if configFile, ok := getStringArg(args, "functionConfigFile"); ok && configFile != "" { + //nolint:gosec + data, err := os.ReadFile(configFile) + if err == nil { + if err := yaml.Unmarshal(data, config); err != nil { + return nil, fmt.Errorf("unmarshal function config file error: %w", err) + } + } else if !os.IsNotExist(err) { + return nil, fmt.Errorf("load function config file failed: %w", err) + } + } + + if tenant != "" { + config.Tenant = tenant + } + if namespace != "" { + config.Namespace = namespace + } + if name != "" { + config.Name = name } - // Get required classname parameter (for create operations) - if !isUpdate { - classname, err := request.RequireString("classname") + if fqfn, ok := getStringArg(args, "fqfn"); ok && fqfn != "" { + parsed, err := parseFullyQualifiedFunctionName(fqfn) if err != nil { - return nil, fmt.Errorf("missing required parameter 'classname': %v", err) + return nil, err } + config.Tenant = parsed.Tenant + config.Namespace = parsed.Namespace + config.Name = parsed.Name + } + + if classname, ok := getStringArg(args, "classname"); ok && classname != "" { config.ClassName = classname - } else { - // For update, classname is optional - if classname := request.GetString("classname", ""); classname != "" { - config.ClassName = classname + } + + if inputsValue, exists := args["inputs"]; exists && inputsValue != nil { + inputs, err := parseStringSlice(inputsValue) + if err != nil { + return nil, fmt.Errorf("invalid inputs: %w", err) } + config.Inputs = inputs } - // Get inputs parameter (array of strings) - args := request.GetArguments() - if inputsInterface, exists := args["inputs"]; exists && inputsInterface != nil { - if inputsArray, ok := inputsInterface.([]interface{}); ok { - inputSpecs := make(map[string]utils.ConsumerConfig) - for _, input := range inputsArray { - if inputStr, ok := input.(string); ok { - inputSpecs[inputStr] = utils.ConsumerConfig{ - SerdeClassName: "", - SchemaType: "", - } - } - } - if len(inputSpecs) > 0 { - config.InputSpecs = inputSpecs - } + if topicsPattern, ok := getStringArg(args, "topicsPattern"); ok && topicsPattern != "" { + config.TopicsPattern = &topicsPattern + } + + if inputSpecsValue, exists := args["inputSpecs"]; exists && inputSpecsValue != nil { + inputSpecs, err := decodeConsumerConfigMap(inputSpecsValue) + if err != nil { + return nil, fmt.Errorf("invalid inputSpecs: %w", err) } + config.InputSpecs = inputSpecs } - // Get optional output parameter - if output := request.GetString("output", ""); output != "" { + if output, ok := getStringArg(args, "output"); ok && output != "" { config.Output = output } - // Get optional parallelism parameter - if parallelismInterface, exists := args["parallelism"]; exists && parallelismInterface != nil { - if parallelismFloat, ok := parallelismInterface.(float64); ok { - config.Parallelism = int(parallelismFloat) + if parallelism, ok, err := parseOptionalIntArg(args, "parallelism"); err != nil { + return nil, err + } else if ok { + config.Parallelism = parallelism + } + + if cpu, ok, err := parseOptionalFloatArg(args, "cpu"); err != nil { + return nil, err + } else if ok { + config.Resources = ensureResources(config.Resources) + config.Resources.CPU = cpu + } + + if ram, ok, err := parseOptionalInt64Arg(args, "ram"); err != nil { + return nil, err + } else if ok { + config.Resources = ensureResources(config.Resources) + config.Resources.RAM = ram + } + + if disk, ok, err := parseOptionalInt64Arg(args, "disk"); err != nil { + return nil, err + } else if ok { + config.Resources = ensureResources(config.Resources) + config.Resources.Disk = disk + } + + if userConfigValue, exists := args["userConfig"]; exists && userConfigValue != nil { + userConfig, err := decodeInterfaceMap(userConfigValue) + if err != nil { + return nil, fmt.Errorf("invalid userConfig: %w", err) + } + config.UserConfig = userConfig + } + + if producerConfigValue, exists := args["producerConfig"]; exists && producerConfigValue != nil { + producerConfig := &utils.ProducerConfig{} + if err := decodeInto(producerConfigValue, producerConfig); err != nil { + return nil, fmt.Errorf("invalid producerConfig: %w", err) + } + config.ProducerConfig = producerConfig + } + + if logTopic, ok := getStringArg(args, "logTopic"); ok && logTopic != "" { + config.LogTopic = logTopic + } + + if schemaType, ok := getStringArg(args, "schemaType"); ok && schemaType != "" { + config.OutputSchemaType = schemaType + } + + if outputSerdeClassName, ok := getStringArg(args, "outputSerdeClassName"); ok && outputSerdeClassName != "" { + config.OutputSerdeClassName = outputSerdeClassName + } + + if customSerdeInputsValue, exists := args["customSerdeInputs"]; exists && customSerdeInputsValue != nil { + customSerdeInputs, err := decodeStringMap(customSerdeInputsValue) + if err != nil { + return nil, fmt.Errorf("invalid customSerdeInputs: %w", err) + } + config.CustomSerdeInputs = customSerdeInputs + } + + if customSchemaInputsValue, exists := args["customSchemaInputs"]; exists && customSchemaInputsValue != nil { + customSchemaInputs, err := decodeStringMap(customSchemaInputsValue) + if err != nil { + return nil, fmt.Errorf("invalid customSchemaInputs: %w", err) + } + config.CustomSchemaInputs = customSchemaInputs + } + + if customSchemaOutputsValue, exists := args["customSchemaOutputs"]; exists && customSchemaOutputsValue != nil { + customSchemaOutputs, err := decodeStringMap(customSchemaOutputsValue) + if err != nil { + return nil, fmt.Errorf("invalid customSchemaOutputs: %w", err) } + config.CustomSchemaOutputs = customSchemaOutputs + } + + if inputTypeClassName, ok := getStringArg(args, "inputTypeClassName"); ok && inputTypeClassName != "" { + config.InputTypeClassName = inputTypeClassName + } + + if outputTypeClassName, ok := getStringArg(args, "outputTypeClassName"); ok && outputTypeClassName != "" { + config.OutputTypeClassName = outputTypeClassName + } + + if processingGuarantees, ok := getStringArg(args, "processingGuarantees"); ok && processingGuarantees != "" { + config.ProcessingGuarantees = processingGuarantees + } + + if retainOrdering, ok := getBoolArg(args, "retainOrdering"); ok { + config.RetainOrdering = retainOrdering + } + + if retainKeyOrdering, ok := getBoolArg(args, "retainKeyOrdering"); ok { + config.RetainKeyOrdering = retainKeyOrdering + } + + if batchBuilder, ok := getStringArg(args, "batchBuilder"); ok && batchBuilder != "" { + config.BatchBuilder = batchBuilder + } + + if forwardSourceMessageProperty, ok := getBoolArg(args, "forwardSourceMessageProperty"); ok { + config.ForwardSourceMessageProperty = forwardSourceMessageProperty + } else { + config.ForwardSourceMessageProperty = true + } + + if subsName, ok := getStringArg(args, "subsName"); ok && subsName != "" { + config.SubName = subsName + } + + if subsPosition, ok := getStringArg(args, "subsPosition"); ok && subsPosition != "" { + config.SubscriptionPosition = subsPosition + } + + if skipToLatest, ok := getBoolArg(args, "skipToLatest"); ok { + config.SkipToLatest = skipToLatest + } + + if timeoutMs, ok, err := parseOptionalInt64Arg(args, "timeoutMs"); err != nil { + return nil, err + } else if ok { + config.TimeoutMs = &timeoutMs + } + + if maxMessageRetries, ok, err := parseOptionalIntArg(args, "maxMessageRetries"); err != nil { + return nil, err + } else if ok { + config.MaxMessageRetries = &maxMessageRetries + } + + if deadLetterTopic, ok := getStringArg(args, "deadLetterTopic"); ok && deadLetterTopic != "" { + config.DeadLetterTopic = deadLetterTopic + } + + if customRuntimeOptions, ok := getStringArg(args, "customRuntimeOptions"); ok && customRuntimeOptions != "" { + config.CustomRuntimeOptions = customRuntimeOptions + } + + if secretsValue, exists := args["secrets"]; exists && secretsValue != nil { + secrets, err := decodeInterfaceMap(secretsValue) + if err != nil { + return nil, fmt.Errorf("invalid secrets: %w", err) + } + config.Secrets = secrets + } + + if cleanupSubscription, ok := getBoolArg(args, "cleanupSubscription"); ok { + config.CleanupSubscription = cleanupSubscription + } else { + config.CleanupSubscription = true + } + + if autoAck, ok := getBoolArg(args, "autoAck"); ok { + config.AutoAck = autoAck + } else { + config.AutoAck = true + } + + if windowLengthCount, ok, err := parseOptionalIntArg(args, "windowLengthCount"); err != nil { + return nil, err + } else if ok { + config.WindowConfig = ensureWindowConfig(config.WindowConfig) + value := windowLengthCount + config.WindowConfig.WindowLengthCount = &value + } + + if windowLengthDurationMs, ok, err := parseOptionalInt64Arg(args, "windowLengthDurationMs"); err != nil { + return nil, err + } else if ok { + config.WindowConfig = ensureWindowConfig(config.WindowConfig) + value := windowLengthDurationMs + config.WindowConfig.WindowLengthDurationMs = &value + } + + if slidingIntervalCount, ok, err := parseOptionalIntArg(args, "slidingIntervalCount"); err != nil { + return nil, err + } else if ok { + config.WindowConfig = ensureWindowConfig(config.WindowConfig) + value := slidingIntervalCount + config.WindowConfig.SlidingIntervalCount = &value + } + + if slidingIntervalDurationMs, ok, err := parseOptionalInt64Arg(args, "slidingIntervalDurationMs"); err != nil { + return nil, err + } else if ok { + config.WindowConfig = ensureWindowConfig(config.WindowConfig) + value := slidingIntervalDurationMs + config.WindowConfig.SlidingIntervalDurationMs = &value + } + + functionTypeArg, _ := getStringArg(args, "functionType") + jarArg, _ := getStringArg(args, "jar") + pyArg, _ := getStringArg(args, "py") + goArg, _ := getStringArg(args, "go") + + functionTypeValue := functionTypeArg + if functionTypeValue == "" && config.FunctionType != nil { + functionTypeValue = *config.FunctionType + } + jarValue := jarArg + if jarValue == "" && config.Jar != nil { + jarValue = *config.Jar + } + pyValue := pyArg + if pyValue == "" && config.Py != nil { + pyValue = *config.Py + } + goValue := goArg + if goValue == "" && config.Go != nil { + goValue = *config.Go + } + + if functionTypeValue != "" && (jarValue != "" || pyValue != "" || goValue != "") { + return nil, fmt.Errorf("functionType is mutually exclusive with jar, py, and go") + } + providedPackages := 0 + if jarValue != "" { + providedPackages++ + } + if pyValue != "" { + providedPackages++ + } + if goValue != "" { + providedPackages++ + } + if providedPackages > 1 { + return nil, fmt.Errorf("jar, py, and go are mutually exclusive") + } + + if functionTypeValue != "" { + jar := fmt.Sprintf("builtin://%s", functionTypeValue) + config.Jar = &jar + } + + if jarValue != "" { + config.Jar = &jarValue + } + + if pyValue != "" { + config.Py = &pyValue + } + + if goValue != "" { + config.Go = &goValue + } + + if config.Go != nil { + config.Runtime = utils.GoRuntime + } + if config.Py != nil { + config.Runtime = utils.PythonRuntime + } + if config.Jar != nil { + config.Runtime = utils.JavaRuntime } - // Set default parallelism if not specified if config.Parallelism <= 0 { config.Parallelism = 1 } - // Get optional jar parameter - if jar := request.GetString("jar", ""); jar != "" { - config.Jar = &jar + if config.UserConfig == nil { + config.UserConfig = make(map[string]interface{}) + } + if config.Secrets == nil { + config.Secrets = make(map[string]interface{}) + } + + formatFunctionConfig(config) + + return config, nil +} + +func parseFullyQualifiedFunctionName(fqfn string) (functionIdentity, error) { + parts := strings.Split(fqfn, "/") + if len(parts) != 3 { + return functionIdentity{}, fmt.Errorf("fully qualified function names must be of the form tenant/namespace/name") + } + if parts[0] == "" || parts[1] == "" || parts[2] == "" { + return functionIdentity{}, fmt.Errorf("fully qualified function names must be of the form tenant/namespace/name") + } + return functionIdentity{ + Tenant: parts[0], + Namespace: parts[1], + Name: parts[2], + }, nil +} + +func validateFunctionConfigs(functionConfig *utils.FunctionConfig) error { + if functionConfig.Name == "" { + inferMissingFunctionName(functionConfig) + } + if functionConfig.Tenant == "" { + inferMissingTenant(functionConfig) + } + if functionConfig.Namespace == "" { + inferMissingNamespace(functionConfig) } - // Get optional py parameter - if py := request.GetString("py", ""); py != "" { - config.Py = &py + switch numProvidedStrings(functionConfig.Jar, functionConfig.Py, functionConfig.Go) { + case 0: + return fmt.Errorf("either a Java jar or a Python file or a Go executable binary needs to be specified for the function") + case 1: + default: + return fmt.Errorf("either a Java jar or a Python file or a Go executable binary needs to be specified for the function, cannot specify more than one") } - // Get optional go parameter - if goFile := request.GetString("go", ""); goFile != "" { - config.Go = &goFile + if functionConfig.Jar != nil && !strings.HasPrefix(*functionConfig.Jar, "builtin://") && + !isPackageURLSupported(*functionConfig.Jar) && + !fileExists(*functionConfig.Jar) { + return fmt.Errorf("the specified jar file does not exist") + } + if functionConfig.Py != nil && !isPackageURLSupported(*functionConfig.Py) && !fileExists(*functionConfig.Py) { + return fmt.Errorf("the specified py file does not exist") + } + if functionConfig.Go != nil && !isPackageURLSupported(*functionConfig.Go) && !fileExists(*functionConfig.Go) { + return fmt.Errorf("the specified go file does not exist") } - // Get optional userConfig parameter (JSON object) - if userConfigInterface, exists := args["userConfig"]; exists && userConfigInterface != nil { - if userConfigMap, ok := userConfigInterface.(map[string]interface{}); ok { - config.UserConfig = userConfigMap + if functionConfig.Go != nil { + functionConfig.Runtime = utils.GoRuntime + } + if functionConfig.Py != nil { + functionConfig.Runtime = utils.PythonRuntime + } + if functionConfig.Jar != nil { + functionConfig.Runtime = utils.JavaRuntime + } + + if functionConfig.Runtime == utils.JavaRuntime || functionConfig.Runtime == utils.PythonRuntime { + if functionConfig.ClassName == "" { + return fmt.Errorf("no function classname specified") } } + return nil +} - return config, nil +func checkArgsForUpdate(functionConfig *utils.FunctionConfig) error { + if functionConfig.ClassName == "" { + if functionConfig.Name == "" { + return fmt.Errorf("function name not provided") + } + } else if functionConfig.Name == "" { + inferMissingFunctionName(functionConfig) + } + + if functionConfig.Tenant == "" { + inferMissingTenant(functionConfig) + } + if functionConfig.Namespace == "" { + inferMissingNamespace(functionConfig) + } + return nil +} + +func inferMissingFunctionName(funcConf *utils.FunctionConfig) { + className := funcConf.ClassName + domains := strings.Split(className, ".") + if len(domains) == 0 { + funcConf.Name = funcConf.ClassName + return + } + funcConf.Name = domains[len(domains)-1] +} + +func inferMissingTenant(funcConf *utils.FunctionConfig) { + funcConf.Tenant = defaultTenant +} + +func inferMissingNamespace(funcConf *utils.FunctionConfig) { + funcConf.Namespace = defaultNamespace +} + +func formatFunctionConfig(funcConf *utils.FunctionConfig) { + if funcConf == nil { + return + } + for key, value := range funcConf.UserConfig { + funcConf.UserConfig[key] = convertMap(value) + } + for key, value := range funcConf.Secrets { + funcConf.Secrets[key] = convertMap(value) + } +} + +func resolvePackagePath(config *utils.FunctionConfig) string { + if config.Jar != nil { + return *config.Jar + } + if config.Py != nil { + return *config.Py + } + if config.Go != nil { + return *config.Go + } + return "" +} + +func ensureResources(resources *utils.Resources) *utils.Resources { + if resources == nil { + return utils.NewDefaultResources() + } + return resources +} + +func ensureWindowConfig(conf *utils.WindowConfig) *utils.WindowConfig { + if conf == nil { + return utils.NewDefaultWindowConfing() + } + return conf +} + +func parseStringSlice(value interface{}) ([]string, error) { + switch v := value.(type) { + case []string: + return v, nil + case []interface{}: + result := make([]string, 0, len(v)) + for _, item := range v { + str, ok := item.(string) + if !ok { + return nil, fmt.Errorf("inputs must be strings") + } + if strings.TrimSpace(str) != "" { + result = append(result, str) + } + } + return result, nil + case string: + parts := strings.Split(v, ",") + result := make([]string, 0, len(parts)) + for _, part := range parts { + item := strings.TrimSpace(part) + if item != "" { + result = append(result, item) + } + } + return result, nil + default: + return nil, fmt.Errorf("unsupported inputs type") + } +} + +func decodeStringMap(value interface{}) (map[string]string, error) { + result := map[string]string{} + if err := decodeInto(value, &result); err != nil { + return nil, err + } + return result, nil +} + +func decodeInterfaceMap(value interface{}) (map[string]interface{}, error) { + result := map[string]interface{}{} + if err := decodeInto(value, &result); err != nil { + return nil, err + } + return result, nil +} + +func decodeConsumerConfigMap(value interface{}) (map[string]utils.ConsumerConfig, error) { + result := map[string]utils.ConsumerConfig{} + if err := decodeInto(value, &result); err != nil { + return nil, err + } + return result, nil +} + +func decodeInto(value interface{}, out interface{}) error { + switch v := value.(type) { + case string: + if err := json.Unmarshal([]byte(v), out); err != nil { + return err + } + return nil + default: + raw, err := json.Marshal(v) + if err != nil { + return err + } + return json.Unmarshal(raw, out) + } +} + +func parseOptionalIntArg(args map[string]interface{}, key string) (int, bool, error) { + value, ok := args[key] + if !ok || value == nil { + return 0, false, nil + } + switch v := value.(type) { + case int: + return v, true, nil + case int32: + return int(v), true, nil + case int64: + return int(v), true, nil + case float64: + return int(v), true, nil + case float32: + return int(v), true, nil + case json.Number: + parsed, err := v.Int64() + return int(parsed), true, err + case string: + parsed, err := strconv.Atoi(v) + return parsed, true, err + default: + return 0, true, fmt.Errorf("invalid value for %s", key) + } +} + +func parseOptionalInt64Arg(args map[string]interface{}, key string) (int64, bool, error) { + value, ok := args[key] + if !ok || value == nil { + return 0, false, nil + } + switch v := value.(type) { + case int: + return int64(v), true, nil + case int32: + return int64(v), true, nil + case int64: + return v, true, nil + case float64: + return int64(v), true, nil + case float32: + return int64(v), true, nil + case json.Number: + parsed, err := v.Int64() + return parsed, true, err + case string: + parsed, err := strconv.ParseInt(v, 10, 64) + return parsed, true, err + default: + return 0, true, fmt.Errorf("invalid value for %s", key) + } +} + +func parseOptionalFloatArg(args map[string]interface{}, key string) (float64, bool, error) { + value, ok := args[key] + if !ok || value == nil { + return 0, false, nil + } + switch v := value.(type) { + case float64: + return v, true, nil + case float32: + return float64(v), true, nil + case int: + return float64(v), true, nil + case int64: + return float64(v), true, nil + case json.Number: + parsed, err := v.Float64() + return parsed, true, err + case string: + parsed, err := strconv.ParseFloat(v, 64) + return parsed, true, err + default: + return 0, true, fmt.Errorf("invalid value for %s", key) + } +} + +func getStringArg(args map[string]interface{}, key string) (string, bool) { + value, ok := args[key] + if !ok || value == nil { + return "", false + } + switch v := value.(type) { + case string: + return v, true + default: + return fmt.Sprintf("%v", v), true + } +} + +func getBoolArg(args map[string]interface{}, key string) (bool, bool) { + value, ok := args[key] + if !ok || value == nil { + return false, false + } + switch v := value.(type) { + case bool: + return v, true + case string: + parsed, err := strconv.ParseBool(v) + if err != nil { + return false, true + } + return parsed, true + case float64: + return v != 0, true + case int: + return v != 0, true + default: + return false, true + } +} + +func numProvidedStrings(values ...*string) int { + count := 0 + for _, value := range values { + if value != nil && *value != "" { + count++ + } + } + return count +} + +func convertMap(value interface{}) interface{} { + switch v := value.(type) { + case map[interface{}]interface{}: + converted := make(map[string]interface{}, len(v)) + for key, item := range v { + converted[fmt.Sprintf("%v", key)] = convertMap(item) + } + return converted + case map[string]interface{}: + converted := make(map[string]interface{}, len(v)) + for key, item := range v { + converted[key] = convertMap(item) + } + return converted + case []interface{}: + converted := make([]interface{}, 0, len(v)) + for _, item := range v { + converted = append(converted, convertMap(item)) + } + return converted + default: + return v + } +} + +func isPackageURLSupported(functionPkgURL string) bool { + return functionPkgURL != "" && (strings.HasPrefix(functionPkgURL, "http") || + strings.HasPrefix(functionPkgURL, "file") || + strings.HasPrefix(functionPkgURL, "function") || + strings.HasPrefix(functionPkgURL, "sink") || + strings.HasPrefix(functionPkgURL, "source")) +} + +func fileExists(filename string) bool { + _, err := os.Stat(filename) + return !os.IsNotExist(err) } // handleError provides unified error handling diff --git a/pkg/mcp/builders/pulsar/functions_parity_test.go b/pkg/mcp/builders/pulsar/functions_parity_test.go new file mode 100644 index 0000000..5fee9af --- /dev/null +++ b/pkg/mcp/builders/pulsar/functions_parity_test.go @@ -0,0 +1,139 @@ +// Copyright 2025 StreamNative +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package pulsar + +import ( + oslib "os" + path "path/filepath" + "testing" + + "github.com/apache/pulsar-client-go/pulsaradmin/pkg/utils" + "github.com/mark3labs/mcp-go/mcp" + "github.com/stretchr/testify/require" +) + +func TestParseFunctionIdentity(t *testing.T) { + builder := NewPulsarAdminFunctionsToolBuilder() + + req := mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{}}} + identity, err := builder.parseFunctionIdentity(req, "list") + require.NoError(t, err) + require.Equal(t, defaultTenant, identity.Tenant) + require.Equal(t, defaultNamespace, identity.Namespace) + + req = mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "fqfn": "t/ns/name", + "tenant": "t", + "namespace": "ns", + }}} + _, err = builder.parseFunctionIdentity(req, "get") + require.Error(t, err) + + req = mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "fqfn": "tenant//name", + }}} + _, err = builder.parseFunctionIdentity(req, "get") + require.Error(t, err) + + req = mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "fqfn": "//name", + }}} + _, err = builder.parseFunctionIdentity(req, "get") + require.Error(t, err) + + req = mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "tenant": "t", + }}} + _, err = builder.parseFunctionIdentity(req, "get") + require.Error(t, err) + + req = mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "tenant": "t", + "namespace": "ns", + }}} + identity, err = builder.parseFunctionIdentity(req, "update") + require.NoError(t, err) + require.Equal(t, "t", identity.Tenant) + require.Equal(t, "ns", identity.Namespace) +} + +func TestValidateFunctionConfigs(t *testing.T) { + jar := "builtin://identity" + config := &utils.FunctionConfig{ + Jar: &jar, + ClassName: "org.example.Identity", + } + require.NoError(t, validateFunctionConfigs(config)) + + require.Error(t, validateFunctionConfigs(&utils.FunctionConfig{})) + + pyDir := t.TempDir() + pyFile := path.Join(pyDir, "echo.py") + require.NoError(t, oslib.WriteFile(pyFile, []byte("print('ok')"), 0o600)) + config = &utils.FunctionConfig{ + Py: &pyFile, + ClassName: "echo.EchoFunction", + } + require.NoError(t, validateFunctionConfigs(config)) + + py := "a.py" + config = &utils.FunctionConfig{ + Jar: &jar, + Py: &py, + } + require.Error(t, validateFunctionConfigs(config)) +} + +func TestCheckArgsForUpdate(t *testing.T) { + config := &utils.FunctionConfig{ + ClassName: "org.example.Example", + } + require.NoError(t, checkArgsForUpdate(config)) + require.NotEmpty(t, config.Name) + require.Equal(t, defaultTenant, config.Tenant) + require.Equal(t, defaultNamespace, config.Namespace) +} + +func TestBuildFunctionConfigMutualExclusion(t *testing.T) { + builder := NewPulsarAdminFunctionsToolBuilder() + + req := mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "functionType": "identity", + "jar": "builtin://identity", + }}} + _, err := builder.buildFunctionConfig("public", "default", "name", req) + require.Error(t, err) + + req = mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "functionType": "identity", + "py": "echo.py", + }}} + _, err = builder.buildFunctionConfig("public", "default", "name", req) + require.Error(t, err) + + req = mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "functionType": "identity", + "go": "echo", + }}} + _, err = builder.buildFunctionConfig("public", "default", "name", req) + require.Error(t, err) + + req = mcp.CallToolRequest{Params: mcp.CallToolParams{Arguments: map[string]any{ + "jar": "builtin://identity", + "py": "echo.py", + }}} + _, err = builder.buildFunctionConfig("public", "default", "name", req) + require.Error(t, err) +} diff --git a/scripts/e2e-test.sh b/scripts/e2e-test.sh index f008c28..de0eb21 100755 --- a/scripts/e2e-test.sh +++ b/scripts/e2e-test.sh @@ -39,6 +39,24 @@ die() { exit 1 } +collect_logs() { + log "collecting debug logs" + if command -v kubectl >/dev/null 2>&1; then + log "snmcp logs (last 200 lines)" + kubectl logs "deployment/${SNMCP_RELEASE}" \ + --namespace "$SNMCP_NAMESPACE" \ + --tail=200 \ + || true + fi + + if command -v docker >/dev/null 2>&1; then + if docker ps -a --format '{{.Names}}' | grep -qx "$PULSAR_CONTAINER"; then + log "pulsar container logs (last 200 lines)" + docker logs --tail 200 "$PULSAR_CONTAINER" || true + fi + fi +} + require_cmd() { command -v "$1" >/dev/null 2>&1 || die "missing command: $1" } @@ -194,7 +212,10 @@ run_tests() { local http_base="http://127.0.0.1:${SNMCP_LOCAL_PORT}${SNMCP_HTTP_PATH}" log "running snmcp-e2e against ${http_base}" - E2E_HTTP_BASE="$http_base" "$SNMCP_E2E_BIN" + if ! E2E_HTTP_BASE="$http_base" "$SNMCP_E2E_BIN"; then + collect_logs + return 1 + fi } cleanup() {