Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
name: Devstack Integration Tests
name: Integration Tests

on:
workflow_dispatch:
Expand All @@ -9,9 +9,9 @@ on:

jobs:
devstack-integration:
name: devstack sync test
name: devstack chaos test
runs-on: ubuntu-latest
timeout-minutes: 10
timeout-minutes: 5

steps:
- uses: actions/checkout@v4
Expand All @@ -25,5 +25,5 @@ jobs:
- name: Install just
uses: extractions/setup-just@v2

- name: Run devstack integration test
run: go test -v -timeout 5m -tags integration -run TestDevstackIntegration ./cmd/devstack/
- name: Run chaos test (1 minute)
run: CHAOS_DURATION=1m just sbdev-test-chaos
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ dist/

# Go-specific Files
*.test
.gomodcache/

# Go coverage tool output
*.out
Expand Down Expand Up @@ -55,3 +56,5 @@ server.dev.yaml
sandbox/*
cmd/devstack/profiles/*
cmd/sandbox/*
cmd/devstack/.gocache/*
.test-sandbox/*
420 changes: 419 additions & 1 deletion CHANGELOG.md

Large diffs are not rendered by default.

107 changes: 107 additions & 0 deletions cmd/devstack/ack_nack_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
//go:build integration
// +build integration

package main

import (
"fmt"
"testing"
"time"
)

// TestACKNACKMechanism checks that RPC uploads between two devstack clients
// complete promptly and that the uploaded files reach the peer.
//
// Only the success (ACK) path is exercised by the subtests below:
//   - SuccessfulACK uploads one request from Alice and expects Bob to see it.
//   - MultipleFilesWithACK uploads ten 1KB requests back to back.
//
// Both subtests also bound the elapsed time, on the premise that an upload
// which waits for an ACK is much faster than a fixed one-second sleep per file.
func TestACKNACKMechanism(t *testing.T) {
	if testing.Short() {
		t.Skip("skipping ACK/NACK test in short mode")
	}

	const (
		app         = "acktest"
		rpcEndpoint = "msg"
		// Upper bound for Bob to observe a file that Alice uploaded.
		receiveTimeout = 3 * time.Second
	)

	stack := NewDevstackHarness(t)
	alice, bob := stack.alice, stack.bob

	// Bootstrap: default ACLs first, then the RPC endpoint (same setup as the
	// WebSocket latency test), always Alice before Bob.
	if err := alice.CreateDefaultACLs(); err != nil {
		t.Fatalf("create alice default ACLs: %v", err)
	}
	if err := bob.CreateDefaultACLs(); err != nil {
		t.Fatalf("create bob default ACLs: %v", err)
	}
	if err := alice.SetupRPCEndpoint(app, rpcEndpoint); err != nil {
		t.Fatalf("setup alice RPC: %v", err)
	}
	if err := bob.SetupRPCEndpoint(app, rpcEndpoint); err != nil {
		t.Fatalf("setup bob RPC: %v", err)
	}

	// With a persistent sandbox the peers are already discovered, so a short
	// pause is only needed to let the WebSocket connections come up.
	time.Sleep(500 * time.Millisecond)

	t.Run("SuccessfulACK", func(t *testing.T) {
		const name = "test-ack.request"
		payload := []byte("test message for ACK")
		wantHash := CalculateMD5(payload)

		// The upload is expected to block until the ACK arrives.
		began := time.Now()
		if err := alice.UploadRPCRequest(app, rpcEndpoint, name, payload); err != nil {
			t.Fatalf("alice upload failed: %v", err)
		}
		elapsed := time.Since(began)
		t.Logf("✅ Upload with ACK completed in %v", elapsed)

		// The file must show up on Bob's side via WebSocket sync.
		if err := bob.WaitForRPCRequest(alice.email, app, rpcEndpoint, name, wantHash, receiveTimeout); err != nil {
			t.Fatalf("bob didn't receive file: %v", err)
		}

		// An ACK-driven upload should beat the old 1-second sleep hack.
		if elapsed > 2*time.Second {
			t.Errorf("Upload took too long (%v), ACK mechanism may not be working", elapsed)
		}
	})

	t.Run("MultipleFilesWithACK", func(t *testing.T) {
		// Several uploads in quick succession, each verified on Bob's side
		// before the next one is sent.
		const fileCount = 10
		began := time.Now()

		for n := 0; n < fileCount; n++ {
			name := fmt.Sprintf("multi-ack-%d.request", n)
			payload := GenerateRandomFile(1024) // 1KB files
			wantHash := CalculateMD5(payload)

			if err := alice.UploadRPCRequest(app, rpcEndpoint, name, payload); err != nil {
				t.Fatalf("alice upload %d failed: %v", n, err)
			}
			if err := bob.WaitForRPCRequest(alice.email, app, rpcEndpoint, name, wantHash, receiveTimeout); err != nil {
				t.Fatalf("bob didn't receive file %d: %v", n, err)
			}
		}

		elapsed := time.Since(began)
		perFile := elapsed / time.Duration(fileCount)
		t.Logf("✅ %d files with ACK completed in %v (avg %v per file)", fileCount, elapsed, perFile)

		// The old 1-second sleep per file would push this past 10 seconds;
		// with ACKs it should finish well inside that budget.
		if elapsed > 10*time.Second {
			t.Errorf("Multiple files took too long (%v), ACK mechanism may not be working", elapsed)
		}
	})

	// Emit the metrics collected by the harness during the run.
	summary := stack.metrics.GenerateReport()
	summary.Log(t)
}
194 changes: 194 additions & 0 deletions cmd/devstack/acl_propagation_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
//go:build integration
// +build integration

package main

import (
"crypto/md5"
"fmt"
"os"
"path/filepath"
"testing"
"time"
)

// TestACLPropagationUpdates stresses propagation of syft.pub.yaml changes (public and RPC)
// to catch cases where ACL updates fail to reach peers.
//
// It runs three clients (the harness's alice and bob plus a locally started
// charlie) and then:
//  1. for every client, rewrites its public ACL three times (permissive,
//     shared with one peer, owner-only) and waits after each write until both
//     other clients hold a byte-identical copy;
//  2. rewrites bob's RPC endpoint ACL twice and waits for each version, as
//     well as the RPC root ACL, to reach the other clients.
//
// Every ACL body carries a distinct "# version" comment or distinct rules so
// that each write yields a new MD5, which is what the propagation check keys on.
func TestACLPropagationUpdates(t *testing.T) {
	h := NewDevstackHarness(t)

	// Start a third client to mirror the chaos test topology.
	serverURL := fmt.Sprintf("http://127.0.0.1:%d", h.state.Server.Port)
	charliePort, err := getFreePort()
	if err != nil {
		// Fail here rather than letting startClient run with an unusable port.
		t.Fatalf("get free port for charlie: %v", err)
	}
	charlieState, err := startClient(
		h.state.Clients[0].BinPath,
		h.root,
		"charlie@example.com",
		serverURL,
		charliePort,
	)
	if err != nil {
		t.Fatalf("start charlie: %v", err)
	}
	// Best-effort teardown: the kill error is deliberately ignored because
	// there is nothing useful the test can do about it at exit.
	defer func() { _ = killProcess(charlieState.PID) }()

	charlie := &ClientHelper{
		t:         t,
		email:     charlieState.Email,
		state:     charlieState,
		dataDir:   charlieState.DataPath,
		publicDir: filepath.Join(charlieState.DataPath, "datasites", charlieState.Email, "public"),
		metrics:   &ClientMetrics{},
	}

	clients := []*ClientHelper{h.alice, h.bob, charlie}

	// Ensure defaults exist
	for _, c := range clients {
		if err := c.CreateDefaultACLs(); err != nil {
			t.Fatalf("default ACLs for %s: %v", c.email, err)
		}
	}

	// md5Hex returns the lowercase hex MD5 of data; used only as a cheap
	// content fingerprint, not for security.
	md5Hex := func(data []byte) string {
		return fmt.Sprintf("%x", md5.Sum(data))
	}

	// waitForPath polls c's local copy of sender's relPath every 100ms until
	// its MD5 equals wantMD5 or timeout elapses. The timeout error carries the
	// last observed problem (read error or hash mismatch) to ease debugging.
	waitForPath := func(c *ClientHelper, sender, relPath, wantMD5 string, timeout time.Duration) error {
		path := filepath.Join(c.dataDir, "datasites", sender, relPath)
		deadline := time.Now().Add(timeout)
		var lastErr error
		for time.Now().Before(deadline) {
			data, err := os.ReadFile(path)
			if err != nil {
				lastErr = err
			} else if got := md5Hex(data); got == wantMD5 {
				return nil
			} else {
				lastErr = fmt.Errorf("md5 mismatch: got %s, want %s", got, wantMD5)
			}
			time.Sleep(100 * time.Millisecond)
		}
		if lastErr != nil {
			return fmt.Errorf("timeout waiting for %s: %w", relPath, lastErr)
		}
		return fmt.Errorf("timeout waiting for %s", relPath)
	}

	// writeACL writes content to relPath inside c's own datasite, creating
	// parent directories as needed, and returns the MD5 hex of what was written.
	writeACL := func(c *ClientHelper, relPath, content string) (string, error) {
		aclPath := filepath.Join(c.dataDir, "datasites", c.email, relPath)
		if err := os.MkdirAll(filepath.Dir(aclPath), 0o755); err != nil {
			return "", err
		}
		if err := os.WriteFile(aclPath, []byte(content), 0o644); err != nil {
			return "", err
		}
		return md5Hex([]byte(content)), nil
	}

	// expectPropagation fails the test unless every client other than owner
	// ends up with owner's relPath at wantMD5 within timeout (per peer).
	expectPropagation := func(owner *ClientHelper, relPath, wantMD5 string, timeout time.Duration) {
		t.Helper()
		for _, peer := range clients {
			if peer.email == owner.email {
				continue
			}
			if err := waitForPath(peer, owner.email, relPath, wantMD5, timeout); err != nil {
				t.Fatalf("propagation of %s from %s to %s failed: %v", relPath, owner.email, peer.email, err)
			}
		}
	}

	// Public ACL templates. The trailing "# version" comment makes each
	// rendered file unique so its MD5 changes on every write.
	const (
		permissive = `terminal: false
rules:
- pattern: '**'
access:
admin: []
write: ['%s']
read: ['*']
# version: %d
`
		shared = `terminal: false
rules:
- pattern: '**'
access:
admin: []
write: ['%s']
read: ['%s','%s']
# version: %d
`
		ownerOnly = `terminal: false
rules:
- pattern: '**'
access:
admin: []
write: ['%s']
read: ['%s']
# version: %d
`
	)

	// Public ACL: flip between permissive, single-peer, and owner-only for each participant.
	publicRel := filepath.Join("public", "syft.pub.yaml")
	version := 0
	for idx, owner := range clients {
		version++
		md5Perm, err := writeACL(owner, publicRel, fmt.Sprintf(permissive, owner.email, version))
		if err != nil {
			t.Fatalf("write public ACL permissive for %s: %v", owner.email, err)
		}
		expectPropagation(owner, publicRel, md5Perm, 10*time.Second)

		version++
		// The "one peer" is the next client in the list, wrapping around.
		onePeer := clients[(idx+1)%len(clients)]
		md5Shared, err := writeACL(owner, publicRel, fmt.Sprintf(shared, owner.email, owner.email, onePeer.email, version))
		if err != nil {
			t.Fatalf("write public ACL shared for %s: %v", owner.email, err)
		}
		expectPropagation(owner, publicRel, md5Shared, 10*time.Second)

		version++
		md5Restrict, err := writeACL(owner, publicRel, fmt.Sprintf(ownerOnly, owner.email, owner.email, version))
		if err != nil {
			t.Fatalf("write public ACL restrictive for %s: %v", owner.email, err)
		}
		expectPropagation(owner, publicRel, md5Restrict, 10*time.Second)
	}

	// RPC ACL: update and verify propagation
	app := "aclprop"
	endpoint := "rpc1"
	if err := h.bob.SetupRPCEndpoint(app, endpoint); err != nil {
		t.Fatalf("setup bob RPC: %v", err)
	}
	rpcRootRel := filepath.Join("app_data", app, "rpc", "syft.pub.yaml")
	rpcRel := filepath.Join("app_data", app, "rpc", endpoint, "syft.pub.yaml")
	rpcACL := `rules:
- pattern: '**.request'
access:
admin: []
read: ['*']
write: ['alice@example.com','bob@example.com']
- pattern: '**.response'
access:
admin: []
read: ['alice@example.com','bob@example.com']
write: ['alice@example.com','bob@example.com']
`
	rpcACL2 := `rules:
- pattern: '**.request'
access:
admin: []
read: ['alice@example.com']
write: ['alice@example.com']
- pattern: '**.response'
access:
admin: []
read: ['alice@example.com']
write: ['alice@example.com']
`

	md5RPC1, err := writeACL(h.bob, rpcRel, rpcACL)
	if err != nil {
		t.Fatalf("write rpc acl1: %v", err)
	}
	// The RPC root ACL is not written by this test; it is read back from
	// bob's datasite (presumably created by SetupRPCEndpoint - verify) so
	// that its current content defines the expected hash.
	rootACLData, err := os.ReadFile(filepath.Join(h.bob.dataDir, "datasites", h.bob.email, rpcRootRel))
	if err != nil {
		t.Fatalf("read rpc root acl: %v", err)
	}
	md5RPCRoot := md5Hex(rootACLData)
	expectPropagation(h.bob, rpcRootRel, md5RPCRoot, 8*time.Second)
	expectPropagation(h.bob, rpcRel, md5RPC1, 8*time.Second)

	md5RPC2, err := writeACL(h.bob, rpcRel, rpcACL2)
	if err != nil {
		t.Fatalf("write rpc acl2: %v", err)
	}
	expectPropagation(h.bob, rpcRel, md5RPC2, 8*time.Second)
}
12 changes: 11 additions & 1 deletion cmd/devstack/acl_race_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,17 @@ func TestACLRaceCondition(t *testing.T) {

t.Log("=== TESTING ACL PRIORITY SYNC FIX ===")
t.Log("")
t.Log("Step 1: Create fresh RPC endpoint with ACL")
t.Log("Step 1: Create default root ACLs (bootstrap)")

// Create default root and public ACLs (like real client bootstrap)
if err := h.alice.CreateDefaultACLs(); err != nil {
t.Fatalf("create alice default ACLs: %v", err)
}
if err := h.bob.CreateDefaultACLs(); err != nil {
t.Fatalf("create bob default ACLs: %v", err)
}

t.Log("Step 2: Create fresh RPC endpoint with ACL")

// Create RPC endpoints
if err := h.alice.SetupRPCEndpoint(appName, endpoint); err != nil {
Expand Down
Loading
Loading