Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .github/workflows/e2e.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ jobs:
run: go mod download

- name: Run E2E tests
env:
E2E_VERBOSE: "1"
run: ./scripts/e2e-test.sh all

- name: Cleanup
Expand Down
1 change: 1 addition & 0 deletions Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ WORKDIR /server

# Copy binary from builder
COPY --from=builder /build/snmcp /server/snmcp
COPY --from=builder /build/cmd/snmcp-e2e/testdata/functions/echo.py /server/e2e/functions/echo.py

# Change ownership
RUN chown -R snmcp:snmcp /server
Expand Down
231 changes: 230 additions & 1 deletion cmd/snmcp-e2e/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,9 +134,13 @@ func run(ctx context.Context, cfg config) error {

suffix := time.Now().UnixNano()
tenant := fmt.Sprintf("e2e-%d", suffix)
namespace := fmt.Sprintf("%s/ns-%d", tenant, suffix)
namespaceName := fmt.Sprintf("ns-%d", suffix)
namespace := fmt.Sprintf("%s/%s", tenant, namespaceName)
topic := fmt.Sprintf("persistent://%s/topic-%d", namespace, suffix)
concurrentTopic := fmt.Sprintf("persistent://%s/topic-concurrent-%d", namespace, suffix)
functionInputTopic := fmt.Sprintf("persistent://%s/function-input-%d", namespace, suffix)
functionOutputTopic := fmt.Sprintf("persistent://%s/function-output-%d", namespace, suffix)
functionName := fmt.Sprintf("echo-%d", suffix)

result, err := callTool(ctx, adminClient, "pulsar_admin_tenant", map[string]any{
"resource": "tenant",
Expand Down Expand Up @@ -209,6 +213,118 @@ func run(ctx context.Context, cfg config) error {
return err
}

result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{
"resource": "topic",
"operation": "create",
"topic": functionInputTopic,
"partitions": float64(0),
})
if err := requireToolOK(result, err, "pulsar_admin_topic create function input"); err != nil {
return err
}

result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{
"resource": "topic",
"operation": "create",
"topic": functionOutputTopic,
"partitions": float64(0),
})
if err := requireToolOK(result, err, "pulsar_admin_topic create function output"); err != nil {
return err
}

logf(cfg.verbose, "creating function: tenant=%s namespace=%s name=%s inputs=%v output=%s py=%s classname=%s",
tenant,
namespaceName,
functionName,
[]string{functionInputTopic},
functionOutputTopic,
"/server/e2e/functions/echo.py",
"echo.EchoFunction",
)
result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
"operation": "create",
"tenant": tenant,
"namespace": namespaceName,
"name": functionName,
"classname": "echo.EchoFunction",
"inputs": []string{functionInputTopic},
"output": functionOutputTopic,
"py": "/server/e2e/functions/echo.py",
})
if err := requireToolOK(result, err, "pulsar_admin_functions create"); err != nil {
return err
}

if err := waitForFunctionRunning(ctx, adminClient, tenant, namespaceName, functionName, 60*time.Second); err != nil {
return err
}

result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
"operation": "stats",
"tenant": tenant,
"namespace": namespaceName,
"name": functionName,
"instanceId": float64(0),
})
if err := requireToolOK(result, err, "pulsar_admin_functions stats"); err != nil {
return err
}

triggerValue := "e2e-trigger"
result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
"operation": "trigger",
"tenant": tenant,
"namespace": namespaceName,
"name": functionName,
"topic": functionInputTopic,
"triggerValue": triggerValue,
})
if err := requireToolOK(result, err, "pulsar_admin_functions trigger"); err != nil {
return err
}
triggerResult := firstText(result)
logf(cfg.verbose, "trigger result: %s", triggerResult)
if !strings.Contains(triggerResult, triggerValue) {
if err := waitForTriggerOutput(ctx, adminClient, functionOutputTopic, fmt.Sprintf("trigger-sub-%d", suffix), triggerValue, 30*time.Second, cfg.verbose); err != nil {
return fmt.Errorf("unexpected trigger result: %s; output check failed: %w", triggerResult, err)
}
}

result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
"operation": "update",
"tenant": tenant,
"namespace": namespaceName,
"name": functionName,
"userConfig": map[string]any{"updated": true},
})
if err := requireToolOK(result, err, "pulsar_admin_functions update"); err != nil {
return err
}

result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
"operation": "get",
"tenant": tenant,
"namespace": namespaceName,
"name": functionName,
})
if err := requireToolOK(result, err, "pulsar_admin_functions get"); err != nil {
return err
}
if err := assertFunctionUserConfig(firstText(result), "updated"); err != nil {
return err
}

result, err = callTool(ctx, adminClient, "pulsar_admin_functions", map[string]any{
"operation": "delete",
"tenant": tenant,
"namespace": namespaceName,
"name": functionName,
})
if err := requireToolOK(result, err, "pulsar_admin_functions delete"); err != nil {
return err
}

result, err = callTool(ctx, adminClient, "pulsar_admin_topic", map[string]any{
"resource": "topic",
"operation": "create",
Expand Down Expand Up @@ -462,6 +578,119 @@ func listClusters(ctx context.Context, c *client.Client) ([]string, error) {
return clusters, nil
}

type functionStatus struct {
NumRunning int `json:"numRunning"`
}

func waitForFunctionRunning(ctx context.Context, c *client.Client, tenant, namespace, name string, timeout time.Duration) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
status, err := getFunctionStatus(ctx, c, tenant, namespace, name)
if err == nil && status.NumRunning > 0 {
return nil
}
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(2 * time.Second):
}
}
return fmt.Errorf("function %s did not reach running state within %s", name, timeout.String())
}

func getFunctionStatus(ctx context.Context, c *client.Client, tenant, namespace, name string) (functionStatus, error) {
result, err := callTool(ctx, c, "pulsar_admin_functions", map[string]any{
"operation": "status",
"tenant": tenant,
"namespace": namespace,
"name": name,
})
if err := requireToolOK(result, err, "pulsar_admin_functions status"); err != nil {
return functionStatus{}, err
}
raw := firstText(result)
if raw == "" {
return functionStatus{}, errors.New("empty function status result")
}
var status functionStatus
if err := json.Unmarshal([]byte(raw), &status); err != nil {
return functionStatus{}, fmt.Errorf("failed to parse function status: %w", err)
}
return status, nil
}

func assertFunctionUserConfig(raw string, key string) error {
if raw == "" {
return errors.New("empty function config result")
}
var payload map[string]interface{}
if err := json.Unmarshal([]byte(raw), &payload); err != nil {
return fmt.Errorf("failed to parse function config: %w", err)
}
userConfig, ok := payload["userConfig"].(map[string]interface{})
if !ok {
return fmt.Errorf("missing userConfig in function config")
}
if _, ok := userConfig[key]; !ok {
return fmt.Errorf("userConfig missing key: %s", key)
}
return nil
}

type consumeResponse struct {
MessagesConsumed int `json:"messages_consumed"`
Messages []consumeMessage `json:"messages"`
}

type consumeMessage struct {
Data string `json:"data"`
}

func waitForTriggerOutput(ctx context.Context, c *client.Client, topic, subscription, expected string, timeout time.Duration, verbose bool) error {
deadline := time.Now().Add(timeout)
for time.Now().Before(deadline) {
result, err := callTool(ctx, c, "pulsar_client_consume", map[string]any{
"topic": topic,
"subscription-name": subscription,
"initial-position": "earliest",
"num-messages": float64(1),
"timeout": float64(5),
"subscription-type": "exclusive",
"subscription-mode": "durable",
"show-properties": false,
"hide-payload": false,
})
if err := requireToolOK(result, err, "pulsar_client_consume trigger output"); err != nil {
return err
}

raw := firstText(result)
if raw != "" {
var resp consumeResponse
if err := json.Unmarshal([]byte(raw), &resp); err != nil {
return fmt.Errorf("failed to parse trigger output: %w", err)
}
if resp.MessagesConsumed > 0 {
for _, msg := range resp.Messages {
if strings.Contains(msg.Data, expected) {
return nil
}
}
return fmt.Errorf("trigger output does not contain expected value: %s", expected)
}
}

logf(verbose, "trigger output not ready yet, retrying")
select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(2 * time.Second):
}
}

return fmt.Errorf("trigger output not available after %s", timeout.String())
}

func runConcurrent(ctx context.Context, adminClient, testClient *client.Client, topic, subscription string) error {
var wg sync.WaitGroup
errCh := make(chan error, 2)
Expand Down
3 changes: 3 additions & 0 deletions cmd/snmcp-e2e/testdata/functions/echo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
class EchoFunction(object):
def process(self, input, context):
return input
Loading