diff --git a/system-tests/lib/cre/don/config/config.go b/system-tests/lib/cre/don/config/config.go index b3a4de77cfa..1f25d86d034 100644 --- a/system-tests/lib/cre/don/config/config.go +++ b/system-tests/lib/cre/don/config/config.go @@ -467,17 +467,20 @@ func addWorkerNodeConfig( if donMetadata.IsShardDON() { existingConfig.Sharding.ShardingEnabled = ptr.Ptr(true) existingConfig.Sharding.ShardIndex = ptr.Ptr(uint16(donMetadata.ShardIndex)) //nolint:gosec // disable G115 overflow is unrealistic + existingConfig.Sharding.ArbiterPort = ptr.Ptr(cre.DefaultArbiterPort) + existingConfig.Sharding.ShardOrchestratorPort = ptr.Ptr(cre.DefaultShardOrchestratorPort) - // all shards apart from the leader need to connect to shard orchestrators running on shard leader DON (shard0) if !donMetadata.IsShardLeader() { shard0, sErr := topology.DonsMetadata.ShardLeaderDON() if sErr != nil { return existingConfig, fmt.Errorf("failed to fetch shard leader DON: %w", sErr) } - // all shards have the same amount of nodes, we can use current node index to select - // shard0 node it should connect to. We connect corresponding nodes to spread the load. - existingConfig.Sharding.ShardOrchestratorAddress = ptr.Ptr(*commonconfig.MustParseURL(shard0.NodesMetadata[m.Index].ShardOrchestratorAddress())) + if m.Index >= len(shard0.NodesMetadata) { + return existingConfig, fmt.Errorf("shard %d node index %d exceeds shard leader node count %d", donMetadata.ShardIndex, m.Index, len(shard0.NodesMetadata)) + } + + existingConfig.Sharding.ShardOrchestratorAddress = ptr.Ptr(*commonconfig.MustParseURL(shard0.NodesMetadata[m.Index].ShardOrchestratorAddressWithPort(cre.DefaultShardOrchestratorPort))) } } diff --git a/system-tests/lib/cre/sharding/sharding.go b/system-tests/lib/cre/sharding/sharding.go index 94a53ba9847..eb5bd3f16a1 100644 --- a/system-tests/lib/cre/sharding/sharding.go +++ b/system-tests/lib/cre/sharding/sharding.go @@ -41,7 +41,10 @@ type SetupShardingInput struct { } func SetupSharding(ctx context.Context, input SetupShardingInput) error { - // Get the shard leader DON + if err := ValidateShardTopology(input.Dons); err != nil { + return fmt.Errorf("shard topology validation failed: %w", err) + } + shardLeaderDON, err := getShardLeaderDON(input.Dons) if err != nil { return fmt.Errorf("failed to get shard leader DON: %w", err) @@ -266,3 +269,35 @@ func configureRingOCR3(creEnv *cre.Environment, ringOCR3Addr common.Address, sha return nil } + +func ValidateShardTopology(dons *cre.Dons) error { + shardDONs := dons.DonsWithFlag(cre.ShardDON) + if len(shardDONs) < 2 { + return fmt.Errorf("sharding requires at least 2 shard DONs, got %d", len(shardDONs)) + } + + var leaderFound bool + leaderNodeCount := len(shardDONs[0].Nodes) + + for _, don := range shardDONs { + if don.Metadata().IsShardLeader() { + if leaderFound { + return errors.New("multiple shard DONs with shard_index=0 found") + } + leaderFound = true + leaderNodeCount = len(don.Nodes) + } + } + + if !leaderFound { + return errors.New("no shard DON with shard_index=0 found") + } + + for _, don := range shardDONs { + if len(don.Nodes) != leaderNodeCount { + return fmt.Errorf("shard %q has %d nodes but shard leader has %d; all shards must have the same node count", don.Name, len(don.Nodes), leaderNodeCount) + } + } + + return nil +} diff --git a/system-tests/lib/cre/sharding/sharding_test.go b/system-tests/lib/cre/sharding/sharding_test.go index 917afcbd7a8..39806dce9be 100644 --- a/system-tests/lib/cre/sharding/sharding_test.go +++ b/system-tests/lib/cre/sharding/sharding_test.go @@ -3,11 +3,20 @@ package sharding import ( "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/smartcontractkit/chainlink/system-tests/lib/cre" ) +func makeNodes(count int) []*cre.Node { + nodes := make([]*cre.Node, count) + for i := range nodes { + nodes[i] = &cre.Node{Name: "node-" + string(rune('0'+i))} + } + return nodes +} + func TestGetShardZeroDON(t *testing.T) { t.Run("returns shard zero when present", func(t *testing.T) { shardZero := &cre.Don{ @@ -47,3 +56,80 @@ func TestGetShardZeroDON(t *testing.T) { func TestRingContractQualifier(t *testing.T) { require.Equal(t, "ring", RingContractQualifier) } + +func TestShardOrchestratorAddress(t *testing.T) { + nm := &cre.NodeMetadata{Host: "10.0.0.1"} + + t.Run("default port matches constant", func(t *testing.T) { + addr := nm.ShardOrchestratorAddress() + assert.Equal(t, "10.0.0.1:50051", addr) + assert.Equal(t, uint16(50051), cre.DefaultShardOrchestratorPort) + }) + + t.Run("custom port", func(t *testing.T) { + addr := nm.ShardOrchestratorAddressWithPort(60051) + assert.Equal(t, "10.0.0.1:60051", addr) + }) + + t.Run("default arbiter port", func(t *testing.T) { + assert.Equal(t, uint16(9876), cre.DefaultArbiterPort) + }) +} + +func TestValidateShardTopology(t *testing.T) { + t.Run("valid topology passes", func(t *testing.T) { + dons := cre.NewDons([]*cre.Don{ + {Name: "shard0", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)}, + {Name: "shard1", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 1, Nodes: makeNodes(4)}, + }, nil) + require.NoError(t, ValidateShardTopology(dons)) + }) + + t.Run("single shard DON fails", func(t *testing.T) { + dons := cre.NewDons([]*cre.Don{ + {Name: "shard0", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)}, + }, nil) + err := ValidateShardTopology(dons) + require.Error(t, err) + assert.Contains(t, err.Error(), "at least 2 shard DONs") + }) + + t.Run("no leader fails", func(t *testing.T) { + dons := cre.NewDons([]*cre.Don{ + {Name: "shard1", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 1, Nodes: makeNodes(4)}, + {Name: "shard2", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 2, Nodes: makeNodes(4)}, + }, nil) + err := ValidateShardTopology(dons) + require.Error(t, err) + assert.Contains(t, err.Error(), "shard_index=0") + }) + + t.Run("duplicate leaders fail", func(t *testing.T) { + dons := cre.NewDons([]*cre.Don{ + {Name: "shard0a", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)}, + {Name: "shard0b", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)}, + }, nil) + err := ValidateShardTopology(dons) + require.Error(t, err) + assert.Contains(t, err.Error(), "multiple shard DONs") + }) + + t.Run("mismatched node count fails", func(t *testing.T) { + dons := cre.NewDons([]*cre.Don{ + {Name: "shard0", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)}, + {Name: "shard1", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 1, Nodes: makeNodes(3)}, + }, nil) + err := ValidateShardTopology(dons) + require.Error(t, err) + assert.Contains(t, err.Error(), "same node count") + }) + + t.Run("non-shard DONs are ignored", func(t *testing.T) { + dons := cre.NewDons([]*cre.Don{ + {Name: "workflow", Flags: []cre.CapabilityFlag{cre.WorkflowDON}, Nodes: makeNodes(2)}, + {Name: "shard0", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)}, + {Name: "shard1", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 1, Nodes: makeNodes(4)}, + }, nil) + require.NoError(t, ValidateShardTopology(dons)) + }) +} diff --git a/system-tests/lib/cre/types.go b/system-tests/lib/cre/types.go index 748567180a3..d7181e7fbfe 100644 --- a/system-tests/lib/cre/types.go +++ b/system-tests/lib/cre/types.go @@ -1135,8 +1135,17 @@ func (n *NodeMetadata) PeerID() string { return strings.TrimPrefix(n.Keys.PeerID(), "p2p_") } +const ( + DefaultShardOrchestratorPort uint16 = 50051 + DefaultArbiterPort uint16 = 9876 +) + func (n *NodeMetadata) ShardOrchestratorAddress() string { - return fmt.Sprintf("%s:%d", n.Host, 50051) + return n.ShardOrchestratorAddressWithPort(DefaultShardOrchestratorPort) +} + +func (n *NodeMetadata) ShardOrchestratorAddressWithPort(port uint16) string { + return fmt.Sprintf("%s:%d", n.Host, port) } type NodeMetadataConfig struct {