Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions system-tests/lib/cre/don/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,17 +467,20 @@ func addWorkerNodeConfig(
if donMetadata.IsShardDON() {
existingConfig.Sharding.ShardingEnabled = ptr.Ptr(true)
existingConfig.Sharding.ShardIndex = ptr.Ptr(uint16(donMetadata.ShardIndex)) //nolint:gosec // disable G115 overflow is unrealistic
existingConfig.Sharding.ArbiterPort = ptr.Ptr(cre.DefaultArbiterPort)
existingConfig.Sharding.ShardOrchestratorPort = ptr.Ptr(cre.DefaultShardOrchestratorPort)

// all shards apart from the leader need to connect to shard orchestrators running on shard leader DON (shard0)
if !donMetadata.IsShardLeader() {
shard0, sErr := topology.DonsMetadata.ShardLeaderDON()
if sErr != nil {
return existingConfig, fmt.Errorf("failed to fetch shard leader DON: %w", sErr)
}

// all shards have the same amount of nodes, we can use current node index to select
// shard0 node it should connect to. We connect corresponding nodes to spread the load.
existingConfig.Sharding.ShardOrchestratorAddress = ptr.Ptr(*commonconfig.MustParseURL(shard0.NodesMetadata[m.Index].ShardOrchestratorAddress()))
if m.Index >= len(shard0.NodesMetadata) {
return existingConfig, fmt.Errorf("shard %d node index %d exceeds shard leader node count %d", donMetadata.ShardIndex, m.Index, len(shard0.NodesMetadata))
}

existingConfig.Sharding.ShardOrchestratorAddress = ptr.Ptr(*commonconfig.MustParseURL(shard0.NodesMetadata[m.Index].ShardOrchestratorAddressWithPort(cre.DefaultShardOrchestratorPort)))
}
}

Expand Down
37 changes: 36 additions & 1 deletion system-tests/lib/cre/sharding/sharding.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@ type SetupShardingInput struct {
}

func SetupSharding(ctx context.Context, input SetupShardingInput) error {
// Get the shard leader DON
if err := ValidateShardTopology(input.Dons); err != nil {
return fmt.Errorf("shard topology validation failed: %w", err)
}

shardLeaderDON, err := getShardLeaderDON(input.Dons)
if err != nil {
return fmt.Errorf("failed to get shard leader DON: %w", err)
Expand Down Expand Up @@ -266,3 +269,35 @@ func configureRingOCR3(creEnv *cre.Environment, ringOCR3Addr common.Address, sha

return nil
}

func ValidateShardTopology(dons *cre.Dons) error {
shardDONs := dons.DonsWithFlag(cre.ShardDON)
if len(shardDONs) < 2 {
return fmt.Errorf("sharding requires at least 2 shard DONs, got %d", len(shardDONs))
}

var leaderFound bool
leaderNodeCount := len(shardDONs[0].Nodes)

for _, don := range shardDONs {
if don.Metadata().IsShardLeader() {
if leaderFound {
return errors.New("multiple shard DONs with shard_index=0 found")
}
leaderFound = true
leaderNodeCount = len(don.Nodes)
}
}

if !leaderFound {
return errors.New("no shard DON with shard_index=0 found")
}

for _, don := range shardDONs {
if len(don.Nodes) != leaderNodeCount {
return fmt.Errorf("shard %q has %d nodes but shard leader has %d; all shards must have the same node count", don.Name, len(don.Nodes), leaderNodeCount)
}
}

return nil
}
86 changes: 86 additions & 0 deletions system-tests/lib/cre/sharding/sharding_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,20 @@ package sharding
import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink/system-tests/lib/cre"
)

func makeNodes(count int) []*cre.Node {
nodes := make([]*cre.Node, count)
for i := range nodes {
nodes[i] = &cre.Node{Name: "node-" + string(rune('0'+i))}
}
return nodes
}

func TestGetShardZeroDON(t *testing.T) {
t.Run("returns shard zero when present", func(t *testing.T) {
shardZero := &cre.Don{
Expand Down Expand Up @@ -47,3 +56,80 @@ func TestGetShardZeroDON(t *testing.T) {
func TestRingContractQualifier(t *testing.T) {
require.Equal(t, "ring", RingContractQualifier)
}

func TestShardOrchestratorAddress(t *testing.T) {
nm := &cre.NodeMetadata{Host: "10.0.0.1"}

t.Run("default port matches constant", func(t *testing.T) {
addr := nm.ShardOrchestratorAddress()
assert.Equal(t, "10.0.0.1:50051", addr)
assert.Equal(t, uint16(50051), cre.DefaultShardOrchestratorPort)
})

t.Run("custom port", func(t *testing.T) {
addr := nm.ShardOrchestratorAddressWithPort(60051)
assert.Equal(t, "10.0.0.1:60051", addr)
})

t.Run("default arbiter port", func(t *testing.T) {
assert.Equal(t, uint16(9876), cre.DefaultArbiterPort)
})
}

func TestValidateShardTopology(t *testing.T) {
t.Run("valid topology passes", func(t *testing.T) {
dons := cre.NewDons([]*cre.Don{
{Name: "shard0", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)},
{Name: "shard1", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 1, Nodes: makeNodes(4)},
}, nil)
require.NoError(t, ValidateShardTopology(dons))
})

t.Run("single shard DON fails", func(t *testing.T) {
dons := cre.NewDons([]*cre.Don{
{Name: "shard0", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)},
}, nil)
err := ValidateShardTopology(dons)
require.Error(t, err)
assert.Contains(t, err.Error(), "at least 2 shard DONs")
})

t.Run("no leader fails", func(t *testing.T) {
dons := cre.NewDons([]*cre.Don{
{Name: "shard1", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 1, Nodes: makeNodes(4)},
{Name: "shard2", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 2, Nodes: makeNodes(4)},
}, nil)
err := ValidateShardTopology(dons)
require.Error(t, err)
assert.Contains(t, err.Error(), "shard_index=0")
})

t.Run("duplicate leaders fail", func(t *testing.T) {
dons := cre.NewDons([]*cre.Don{
{Name: "shard0a", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)},
{Name: "shard0b", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)},
}, nil)
err := ValidateShardTopology(dons)
require.Error(t, err)
assert.Contains(t, err.Error(), "multiple shard DONs")
})

t.Run("mismatched node count fails", func(t *testing.T) {
dons := cre.NewDons([]*cre.Don{
{Name: "shard0", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)},
{Name: "shard1", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 1, Nodes: makeNodes(3)},
}, nil)
err := ValidateShardTopology(dons)
require.Error(t, err)
assert.Contains(t, err.Error(), "same node count")
})

t.Run("non-shard DONs are ignored", func(t *testing.T) {
dons := cre.NewDons([]*cre.Don{
{Name: "workflow", Flags: []cre.CapabilityFlag{cre.WorkflowDON}, Nodes: makeNodes(2)},
{Name: "shard0", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 0, Nodes: makeNodes(4)},
{Name: "shard1", Flags: []cre.CapabilityFlag{cre.ShardDON}, ShardIndex: 1, Nodes: makeNodes(4)},
}, nil)
require.NoError(t, ValidateShardTopology(dons))
})
}
11 changes: 10 additions & 1 deletion system-tests/lib/cre/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -1135,8 +1135,17 @@ func (n *NodeMetadata) PeerID() string {
return strings.TrimPrefix(n.Keys.PeerID(), "p2p_")
}

const (
DefaultShardOrchestratorPort uint16 = 50051
DefaultArbiterPort uint16 = 9876
)

func (n *NodeMetadata) ShardOrchestratorAddress() string {
return fmt.Sprintf("%s:%d", n.Host, 50051)
return n.ShardOrchestratorAddressWithPort(DefaultShardOrchestratorPort)
}

func (n *NodeMetadata) ShardOrchestratorAddressWithPort(port uint16) string {
return fmt.Sprintf("%s:%d", n.Host, port)
}

type NodeMetadataConfig struct {
Expand Down
Loading