From 577256a01b9f99d8a25f97429888e028d674fae7 Mon Sep 17 00:00:00 2001 From: Bolek Kulbabinski <1416262+bolekk@users.noreply.github.com> Date: Fri, 27 Feb 2026 14:10:28 -0800 Subject: [PATCH 1/3] [CRE][Gateway] Support for multi-handlers: part 2 --- .../jobs/operations/propose_gateway_job.go | 208 ++++++++-------- .../operations/propose_gateway_job_test.go | 10 +- deployment/cre/jobs/pkg/gateway_job.go | 159 +++++++++---- deployment/cre/jobs/pkg/gateway_job_test.go | 224 +++++++++--------- system-tests/lib/cre/don/gateway/gateway.go | 4 +- .../lib/cre/environment/environment.go | 2 +- system-tests/lib/cre/topology.go | 76 +++--- system-tests/lib/cre/types.go | 9 +- 8 files changed, 401 insertions(+), 291 deletions(-) diff --git a/deployment/cre/jobs/operations/propose_gateway_job.go b/deployment/cre/jobs/operations/propose_gateway_job.go index 1ebc3bb065e..78c28d63aa0 100644 --- a/deployment/cre/jobs/operations/propose_gateway_job.go +++ b/deployment/cre/jobs/operations/propose_gateway_job.go @@ -23,7 +23,7 @@ const defaultGatewayRequestTimeoutSec = 12 type ProposeGatewayJobInput struct { Domain string DONFilters []offchain.TargetDONFilter - DONs []DON `yaml:"dons"` + Services []GatewayService `yaml:"services"` GatewayRequestTimeoutSec int `yaml:"gatewayRequestTimeoutSec"` AllowedPorts []int `yaml:"allowedPorts"` AllowedSchemes []string `yaml:"allowedSchemes"` @@ -33,10 +33,10 @@ type ProposeGatewayJobInput struct { JobLabels map[string]string } -type DON struct { - Name string - F int - Handlers []string +type GatewayService struct { + ServiceName string `yaml:"servicename"` + Handlers []string `yaml:"handlers"` + DONs []string `yaml:"dons"` } type ProposeGatewayJobDeps struct { @@ -55,94 +55,31 @@ var ProposeGatewayJob = operations.NewOperation[ProposeGatewayJobInput, ProposeG ) // proposeGatewayJob builds a gateway job spec and then proposes it to the nodes of a DON. -// It first fetches node information and chain configurations about the target DONs given in input.DONs to build the job spec. -// Target DONs are the DONs that the Gateway allows communication with. -// It then proposes this job spec to each node of the specific DON based on input filters and a chain selector. -// All nodes must be connected to job distributor and have the proper chain declared. +// It derives the set of unique DON names from input.Services, fetches node information and +// chain configurations for each DON from JD, then builds TargetDON entries. +// Each TargetDON contains all handlers from the services that reference it. func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input ProposeGatewayJobInput) (ProposeGatewayJobOutput, error) { - targetDONs := make([]pkg.TargetDON, 0) - - for _, ad := range input.DONs { - // Use filters from input, except the DON name which needs to be the target DON - filters := &nodev1.ListNodesRequest_Filter{} - for _, f := range input.DONFilters { - if f.Key == offchain.FilterKeyDONName { - continue - } - filters = offchain.TargetDONFilter{ - Key: f.Key, - Value: f.Value, - }.AddToFilter(filters) - } - filtersWithTargetDONName := offchain.TargetDONFilter{ - Key: offchain.FilterKeyDONName, - Value: ad.Name, - }.AddToFilter(filters) - - ns, err := pkg.FetchNodesFromJD(deps.Env.GetContext(), deps.Env, pkg.FetchNodesRequest{ - Domain: input.Domain, - Filters: filtersWithTargetDONName, - }) - if err != nil { - return ProposeGatewayJobOutput{}, err - } - if len(ns) == 0 { - return ProposeGatewayJobOutput{}, fmt.Errorf("no nodes with filters %s", input.DONFilters) - } - - nodes, err := pkg.FetchNodeChainConfigsFromJD(deps.Env.GetContext(), deps.Env, pkg.FetchNodesRequest{ - Domain: input.Domain, - Filters: filtersWithTargetDONName, - }) - if err != nil { - return ProposeGatewayJobOutput{}, err - } - if len(nodes) == 0 { - return ProposeGatewayJobOutput{}, fmt.Errorf("no chain configs with filters %s", input.DONFilters) + // Collect unique DON names and build DON→handlers mapping from services + donHandlers := make(map[string][]string) // DON name → handler types + for _, svc := range input.Services { + for _, donName := range svc.DONs { + donHandlers[donName] = append(donHandlers[donName], svc.Handlers...) } + } - fam, chainID, err := parseSelector(uint64(input.GatewayKeyChainSelector)) + targetDONs := make([]pkg.TargetDON, 0, len(donHandlers)) + for donName, handlers := range donHandlers { + members, f, err := resolveDONMembers(deps, input, donName) if err != nil { return ProposeGatewayJobOutput{}, err } - // make map of node id to node - m := make(map[string]*nodev1.Node, len(ns)) - for _, n := range ns { - m[n.Id] = n - } - - var members []pkg.TargetDONMember - for _, n := range nodes { - var found bool - for _, cc := range n.ChainConfigs { - if cc.Chain.Id == chainID && cc.Chain.Type == fam { - nodeName := n.NodeID - if matched, ok := m[n.NodeID]; ok { - nodeName = matched.Name - } - members = append(members, pkg.TargetDONMember{ - Address: cc.AccountAddress, - Name: fmt.Sprintf("%s (DON %s)", nodeName, ad.Name), - }) - found = true - - break - } - } - - if !found { - return ProposeGatewayJobOutput{}, fmt.Errorf("could not find key belonging to chain id %s on node %s", chainID, n.NodeID) - } - } - - td := pkg.TargetDON{ - ID: ad.Name, - F: ad.F, + targetDONs = append(targetDONs, pkg.TargetDON{ + ID: donName, + F: f, Members: members, - Handlers: ad.Handlers, - } - targetDONs = append(targetDONs, td) + Handlers: dedup(handlers), + }) } requestTimeoutSec := input.GatewayRequestTimeoutSec @@ -160,8 +97,7 @@ func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input Pr AuthGatewayID: input.AuthGatewayID, } - err := gj.Validate() - if err != nil { + if err := gj.Validate(); err != nil { return ProposeGatewayJobOutput{}, err } @@ -197,18 +133,18 @@ func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input Pr Specs: make(map[string][]string), } for nodeIdx, n := range nodes { - spec, err := gj.Resolve(nodeIdx) - if err != nil { - return ProposeGatewayJobOutput{}, err + spec, specErr := gj.Resolve(nodeIdx) + if specErr != nil { + return ProposeGatewayJobOutput{}, specErr } - _, err = deps.Env.Offchain.ProposeJob(b.GetContext(), &jobv1.ProposeJobRequest{ + _, propErr := deps.Env.Offchain.ProposeJob(b.GetContext(), &jobv1.ProposeJobRequest{ NodeId: n.GetId(), Spec: spec, Labels: labels, }) - if err != nil { - return ProposeGatewayJobOutput{}, fmt.Errorf("error proposing job to node %s spec %s : %w", n.GetId(), spec, err) + if propErr != nil { + return ProposeGatewayJobOutput{}, fmt.Errorf("error proposing job to node %s spec %s : %w", n.GetId(), spec, propErr) } output.Specs[n.GetId()] = append(output.Specs[n.GetId()], spec) @@ -220,6 +156,92 @@ func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input Pr return output, nil } +func resolveDONMembers(deps ProposeGatewayJobDeps, input ProposeGatewayJobInput, donName string) ([]pkg.TargetDONMember, int, error) { + filters := &nodev1.ListNodesRequest_Filter{} + for _, f := range input.DONFilters { + if f.Key == offchain.FilterKeyDONName { + continue + } + filters = offchain.TargetDONFilter{ + Key: f.Key, + Value: f.Value, + }.AddToFilter(filters) + } + filtersWithTargetDONName := offchain.TargetDONFilter{ + Key: offchain.FilterKeyDONName, + Value: donName, + }.AddToFilter(filters) + + ns, err := pkg.FetchNodesFromJD(deps.Env.GetContext(), deps.Env, pkg.FetchNodesRequest{ + Domain: input.Domain, + Filters: filtersWithTargetDONName, + }) + if err != nil { + return nil, 0, err + } + if len(ns) == 0 { + return nil, 0, fmt.Errorf("no nodes with filters %s", input.DONFilters) + } + + nodeChainConfigs, err := pkg.FetchNodeChainConfigsFromJD(deps.Env.GetContext(), deps.Env, pkg.FetchNodesRequest{ + Domain: input.Domain, + Filters: filtersWithTargetDONName, + }) + if err != nil { + return nil, 0, err + } + if len(nodeChainConfigs) == 0 { + return nil, 0, fmt.Errorf("no chain configs with filters %s", input.DONFilters) + } + + fam, chainID, err := parseSelector(uint64(input.GatewayKeyChainSelector)) + if err != nil { + return nil, 0, err + } + + m := make(map[string]*nodev1.Node, len(ns)) + for _, n := range ns { + m[n.Id] = n + } + + var members []pkg.TargetDONMember + for _, n := range nodeChainConfigs { + var found bool + for _, cc := range n.ChainConfigs { + if cc.Chain.Id == chainID && cc.Chain.Type == fam { + nodeName := n.NodeID + if matched, ok := m[n.NodeID]; ok { + nodeName = matched.Name + } + members = append(members, pkg.TargetDONMember{ + Address: cc.AccountAddress, + Name: fmt.Sprintf("%s (DON %s)", nodeName, donName), + }) + found = true + break + } + } + if !found { + return nil, 0, fmt.Errorf("could not find key belonging to chain id %s on node %s", chainID, n.NodeID) + } + } + + f := (len(members) - 1) / 3 + return members, f, nil +} + +func dedup(ss []string) []string { + seen := make(map[string]struct{}, len(ss)) + out := make([]string, 0, len(ss)) + for _, s := range ss { + if _, ok := seen[s]; !ok { + seen[s] = struct{}{} + out = append(out, s) + } + } + return out +} + func parseSelector(sel uint64) (nodev1.ChainType, string, error) { fam, err := chainsel.GetSelectorFamily(sel) if err != nil { diff --git a/deployment/cre/jobs/operations/propose_gateway_job_test.go b/deployment/cre/jobs/operations/propose_gateway_job_test.go index 08a478ac4ea..959a07a2aa7 100644 --- a/deployment/cre/jobs/operations/propose_gateway_job_test.go +++ b/deployment/cre/jobs/operations/propose_gateway_job_test.go @@ -112,14 +112,14 @@ var commonInput = ProposeGatewayJobInput{ Value: "zone-b", }, }, - DONs: []DON{ + Services: []GatewayService{ { - Name: "workflow_1_zone-b", + ServiceName: "workflows", Handlers: []string{ "http-capabilities", "web-api-capabilities", }, - F: 1, + DONs: []string{"workflow_1_zone-b"}, }, }, GatewayKeyChainSelector: 10344971235874465080, @@ -149,7 +149,8 @@ func TestProposeGatewayJob(t *testing.T) { output: ProposeGatewayJobOutput{ Specs: map[string][]string{ "node_5": { - "type = 'gateway'\nschemaVersion = 1\nname = 'CRE Gateway'\nexternalJobID = 'cf8aa339-6349-5e5b-9289-5c2907711200'\nforwardingAllowed = false\n\n[gatewayConfig]\n[gatewayConfig.ConnectionManagerConfig]\nAuthChallengeLen = 10\nAuthGatewayId = 'gateway-node-0'\nAuthTimestampToleranceSec = 5\nHeartbeatIntervalSec = 20\n\n[[gatewayConfig.Dons]]\nDonId = 'workflow_1_zone-b'\nF = 1\n\n[[gatewayConfig.Dons.Handlers]]\nName = 'http-capabilities'\nServiceName = 'workflows'\n\n[gatewayConfig.Dons.Handlers.Config]\nCleanUpPeriodMs = 600000\n\n[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter]\nglobalBurst = 100\nglobalRPS = 500\nperSenderBurst = 100\nperSenderRPS = 100\n\n[[gatewayConfig.Dons.Handlers]]\nName = 'web-api-capabilities'\n\n[gatewayConfig.Dons.Handlers.Config]\nmaxAllowedMessageAgeSec = 1000\n\n[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter]\nglobalBurst = 10\nglobalRPS = 50\nperSenderBurst = 10\nperSenderRPS = 10\n\n[[gatewayConfig.Dons.Members]]\nAddress = '0x04'\nName = 'cl-cre-one-zone-b-0 (DON workflow_1_zone-b)'\n\n[gatewayConfig.HTTPClientConfig]\nMaxResponseBytes = 50000000\nAllowedPorts = [443]\nAllowedSchemes = ['https']\nAllowedIPsCIDR = []\n\n[gatewayConfig.NodeServerConfig]\nHandshakeTimeoutMillis = 1000\nMaxRequestBytes = 100000\nPath = '/'\nPort = 5003\nReadTimeoutMillis = 1000\nRequestTimeoutMillis = 5000\nWriteTimeoutMillis = 1000\n\n[gatewayConfig.UserServerConfig]\nContentTypeHeader = 'application/jsonrpc'\nMaxRequestBytes = 100000\nPath = '/'\nPort = 5002\nReadTimeoutMillis = 5000\nRequestTimeoutMillis = 5000\nWriteTimeoutMillis = 6000\n"}, + "type = 'gateway'\nschemaVersion = 1\nname = 'CRE Gateway'\nexternalJobID = 'cf8aa339-6349-5e5b-9289-5c2907711200'\nforwardingAllowed = false\n\n[gatewayConfig]\n[gatewayConfig.ConnectionManagerConfig]\nAuthChallengeLen = 10\nAuthGatewayId = 'gateway-node-0'\nAuthTimestampToleranceSec = 5\nHeartbeatIntervalSec = 20\n\n[[gatewayConfig.ShardedDONs]]\nDonName = 'workflow_1_zone-b'\nF = 0\n\n[[gatewayConfig.ShardedDONs.Shards]]\n[[gatewayConfig.ShardedDONs.Shards.Nodes]]\nAddress = '0x04'\nName = 'cl-cre-one-zone-b-0 (DON workflow_1_zone-b)'\n\n[[gatewayConfig.Services]]\nServiceName = 'workflows'\nDONs = ['workflow_1_zone-b']\n\n[[gatewayConfig.Services.Handlers]]\nName = 'http-capabilities'\nServiceName = 'workflows'\n\n[gatewayConfig.Services.Handlers.Config]\nCleanUpPeriodMs = 600000\n\n[gatewayConfig.Services.Handlers.Config.NodeRateLimiter]\nglobalBurst = 100\nglobalRPS = 500\nperSenderBurst = 100\nperSenderRPS = 100\n\n[[gatewayConfig.Services.Handlers]]\nName = 'web-api-capabilities'\n\n[gatewayConfig.Services.Handlers.Config]\nmaxAllowedMessageAgeSec = 1000\n\n[gatewayConfig.Services.Handlers.Config.NodeRateLimiter]\nglobalBurst = 10\nglobalRPS = 50\nperSenderBurst = 10\nperSenderRPS = 10\n\n[gatewayConfig.HTTPClientConfig]\nMaxResponseBytes = 50000000\nAllowedPorts = [443]\nAllowedSchemes = ['https']\nAllowedIPsCIDR = []\n\n[gatewayConfig.NodeServerConfig]\nHandshakeTimeoutMillis = 1000\nMaxRequestBytes = 100000\nPath = '/'\nPort = 5003\nReadTimeoutMillis = 1000\nRequestTimeoutMillis = 5000\nWriteTimeoutMillis = 1000\n\n[gatewayConfig.UserServerConfig]\nContentTypeHeader = 'application/jsonrpc'\nMaxRequestBytes = 100000\nPath = '/'\nPort = 5002\nReadTimeoutMillis = 5000\nRequestTimeoutMillis = 5000\nWriteTimeoutMillis = 6000\n", + }, }, }, }, @@ -193,7 +194,6 @@ func TestProposeGatewayJob(t *testing.T) { assert.Contains(t, err.Error(), tc.errorMsg) } else { require.NoError(t, err) - require.NotNil(t, output) } require.Equal(t, tc.output, output) diff --git a/deployment/cre/jobs/pkg/gateway_job.go b/deployment/cre/jobs/pkg/gateway_job.go index a2b6317ee51..e90fffb73c4 100644 --- a/deployment/cre/jobs/pkg/gateway_job.go +++ b/deployment/cre/jobs/pkg/gateway_job.go @@ -15,9 +15,24 @@ const ( GatewayHandlerTypeHTTPCapabilities = "http-capabilities" GatewayHandlerTypeVault = "vault" + ServiceNameWorkflows = "workflows" + ServiceNameVault = "vault" + minimumRequestTimeoutSec = 5 ) +// HandlerServiceName returns the service name for a given handler type. +func HandlerServiceName(handlerType string) string { + switch handlerType { + case GatewayHandlerTypeVault: + return ServiceNameVault + case GatewayHandlerTypeHTTPCapabilities, GatewayHandlerTypeWebAPICapabilities: + return ServiceNameWorkflows + default: + return handlerType + } +} + type TargetDONMember struct { Address string Name string @@ -85,34 +100,9 @@ func (g GatewayJob) Resolve(gatewayNodeIdx int) (string, error) { externalJobID = uuid.NewSHA1(uuid.Nil, []byte(g.JobName)).String() } - dons := []don{} - for _, targetDON := range g.TargetDONs { - ms := []member{} - for _, mem := range targetDON.Members { - ms = append(ms, member(mem)) - } - - hs := []handler{} - for _, ht := range targetDON.Handlers { - switch ht { - case GatewayHandlerTypeWebAPICapabilities: - hs = append(hs, newDefaultWebAPICapabilitiesHandler()) - case GatewayHandlerTypeVault: - hs = append(hs, newDefaultVaultHandler(g.RequestTimeoutSec)) - case GatewayHandlerTypeHTTPCapabilities: - hs = append(hs, newDefaultHTTPCapabilitiesHandler()) - default: - return "", errors.New("unknown handler type: " + ht) - } - } - - d := don{ - DonID: targetDON.ID, - F: targetDON.F, - Members: ms, - Handlers: hs, - } - dons = append(dons, d) + shardedDONs, services, err := g.buildServicesAndShardedDONs() + if err != nil { + return "", err } requestTimeout := time.Duration(g.RequestTimeoutSec) * time.Second @@ -146,7 +136,8 @@ func (g GatewayJob) Resolve(gatewayNodeIdx int) (string, error) { AllowedPorts: []int{443}, AllowedSchemes: []string{"https"}, }, - Dons: dons, + ShardedDONs: shardedDONs, + Services: services, } if len(g.AllowedPorts) > 0 { @@ -173,14 +164,92 @@ func (g GatewayJob) Resolve(gatewayNodeIdx int) (string, error) { ForwardingAllowed: false, GatewayConfig: config, } - b, err := toml.Marshal(spec) - if err != nil { - return "", err + b, marshalErr := toml.Marshal(spec) + if marshalErr != nil { + return "", marshalErr } return string(b), nil } +func (g GatewayJob) buildServicesAndShardedDONs() ([]shardedDON, []service, error) { + var shardedDONs []shardedDON + + type serviceEntry struct { + handlers []handler + donNames []string + // track handler names to avoid duplicates + handlerNames map[string]struct{} + } + serviceMap := make(map[string]*serviceEntry) + // preserve insertion order + var serviceOrder []string + + for _, targetDON := range g.TargetDONs { + nodes := make([]member, len(targetDON.Members)) + for i, mem := range targetDON.Members { + nodes[i] = member(mem) + } + + shardedDONs = append(shardedDONs, shardedDON{ + DonName: targetDON.ID, + F: targetDON.F, + Shards: []shard{{Nodes: nodes}}, + }) + + for _, ht := range targetDON.Handlers { + svcName := HandlerServiceName(ht) + + var h handler + switch ht { + case GatewayHandlerTypeWebAPICapabilities: + h = newDefaultWebAPICapabilitiesHandler() + case GatewayHandlerTypeVault: + h = newDefaultVaultHandler(g.RequestTimeoutSec) + case GatewayHandlerTypeHTTPCapabilities: + h = newDefaultHTTPCapabilitiesHandler() + default: + return nil, nil, errors.New("unknown handler type: " + ht) + } + + entry, exists := serviceMap[svcName] + if !exists { + entry = &serviceEntry{handlerNames: make(map[string]struct{})} + serviceMap[svcName] = entry + serviceOrder = append(serviceOrder, svcName) + } + + if _, dup := entry.handlerNames[ht]; !dup { + entry.handlers = append(entry.handlers, h) + entry.handlerNames[ht] = struct{}{} + } + + donAlreadyReferenced := false + for _, d := range entry.donNames { + if d == targetDON.ID { + donAlreadyReferenced = true + break + } + } + if !donAlreadyReferenced { + entry.donNames = append(entry.donNames, targetDON.ID) + } + } + } + + services := make([]service, 0, len(serviceMap)) + for _, svcName := range serviceOrder { + entry := serviceMap[svcName] + services = append(services, service{ + ServiceName: svcName, + Handlers: entry.handlers, + DONs: entry.donNames, + }) + } + + return shardedDONs, services, nil +} + type webAPICapabilitiesHandlerConfig struct { MaxAllowedMessageAgeSec int `toml:"maxAllowedMessageAgeSec"` NodeRateLimiter nodeRateLimiterConfig `toml:"NodeRateLimiter"` @@ -235,12 +304,29 @@ type gatewaySpec struct { type gatewayConfig struct { ConnectionManagerConfig connectionManagerConfig `toml:"ConnectionManagerConfig"` - Dons []don `toml:"Dons"` + ShardedDONs []shardedDON `toml:"ShardedDONs"` + Services []service `toml:"Services"` HTTPClientConfig httpClientConfig `toml:"HTTPClientConfig"` NodeServerConfig nodeServerConfig `toml:"NodeServerConfig"` UserServerConfig userServerConfig `toml:"UserServerConfig"` } +type service struct { + ServiceName string `toml:"ServiceName"` + Handlers []handler `toml:"Handlers"` + DONs []string `toml:"DONs"` +} + +type shardedDON struct { + DonName string `toml:"DonName"` + F int `toml:"F"` + Shards []shard `toml:"Shards"` +} + +type shard struct { + Nodes []member `toml:"Nodes"` +} + type connectionManagerConfig struct { AuthChallengeLen int `toml:"AuthChallengeLen"` AuthGatewayID string `toml:"AuthGatewayId"` @@ -248,13 +334,6 @@ type connectionManagerConfig struct { HeartbeatIntervalSec int `toml:"HeartbeatIntervalSec"` } -type don struct { - DonID string `toml:"DonId"` - F int `toml:"F"` - Handlers []handler `toml:"Handlers"` - Members []member `toml:"Members"` -} - type handler struct { Name string `toml:"Name"` ServiceName string `toml:"ServiceName,omitempty"` diff --git a/deployment/cre/jobs/pkg/gateway_job_test.go b/deployment/cre/jobs/pkg/gateway_job_test.go index e30e81ddf66..6804474f9c5 100644 --- a/deployment/cre/jobs/pkg/gateway_job_test.go +++ b/deployment/cre/jobs/pkg/gateway_job_test.go @@ -32,70 +32,64 @@ AuthGatewayId = 'gateway-node-1' AuthTimestampToleranceSec = 5 HeartbeatIntervalSec = 20 -[[gatewayConfig.Dons]] -DonId = 'workflow_1' +[[gatewayConfig.ShardedDONs]] +DonName = 'workflow_1' F = 1 -[[gatewayConfig.Dons.Handlers]] -Name = 'web-api-capabilities' - -[gatewayConfig.Dons.Handlers.Config] -maxAllowedMessageAgeSec = 1000 - -[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter] -globalBurst = 10 -globalRPS = 50 -perSenderBurst = 10 -perSenderRPS = 10 - -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0xabc' Name = 'Node 1' -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0xdef' Name = 'Node 2' -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0xghi' Name = 'Node 3' -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0xjkl' Name = 'Node 4' -[[gatewayConfig.Dons]] -DonId = 'workflow_2' +[[gatewayConfig.ShardedDONs]] +DonName = 'workflow_2' F = 0 -[[gatewayConfig.Dons.Handlers]] -Name = 'web-api-capabilities' - -[gatewayConfig.Dons.Handlers.Config] -maxAllowedMessageAgeSec = 1000 - -[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter] -globalBurst = 10 -globalRPS = 50 -perSenderBurst = 10 -perSenderRPS = 10 - -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0x2abc' Name = 'Node 1' -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0x2def' Name = 'Node 2' -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0x2ghi' Name = 'Node 3' -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0x2jkl' Name = 'Node 4' +[[gatewayConfig.Services]] +ServiceName = 'workflows' +DONs = ['workflow_1', 'workflow_2'] + +[[gatewayConfig.Services.Handlers]] +Name = 'web-api-capabilities' + +[gatewayConfig.Services.Handlers.Config] +maxAllowedMessageAgeSec = 1000 + +[gatewayConfig.Services.Handlers.Config.NodeRateLimiter] +globalBurst = 10 +globalRPS = 50 +perSenderBurst = 10 +perSenderRPS = 10 + [gatewayConfig.HTTPClientConfig] MaxResponseBytes = 50000000 AllowedPorts = [443] @@ -134,82 +128,80 @@ AuthGatewayId = 'gateway-node-1' AuthTimestampToleranceSec = 5 HeartbeatIntervalSec = 20 -[[gatewayConfig.Dons]] -DonId = 'workflow_1' +[[gatewayConfig.ShardedDONs]] +DonName = 'workflow_1' F = 1 -[[gatewayConfig.Dons.Handlers]] -Name = 'web-api-capabilities' - -[gatewayConfig.Dons.Handlers.Config] -maxAllowedMessageAgeSec = 1000 - -[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter] -globalBurst = 10 -globalRPS = 50 -perSenderBurst = 10 -perSenderRPS = 10 - -[[gatewayConfig.Dons.Handlers]] -Name = 'vault' -ServiceName = 'vault' - -[gatewayConfig.Dons.Handlers.Config] -requestTimeoutSec = 14 - -[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter] -globalBurst = 10 -globalRPS = 50 -perSenderBurst = 10 -perSenderRPS = 10 - -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0xabc' Name = 'Node 1' -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0xdef' Name = 'Node 2' -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0xghi' Name = 'Node 3' -[[gatewayConfig.Dons.Members]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] Address = '0xjkl' Name = 'Node 4' -[[gatewayConfig.Dons]] -DonId = 'workflow_2' +[[gatewayConfig.ShardedDONs]] +DonName = 'workflow_2' F = 0 -[[gatewayConfig.Dons.Handlers]] +[[gatewayConfig.ShardedDONs.Shards]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] +Address = '0x2abc' +Name = 'Node 1' + +[[gatewayConfig.ShardedDONs.Shards.Nodes]] +Address = '0x2def' +Name = 'Node 2' + +[[gatewayConfig.ShardedDONs.Shards.Nodes]] +Address = '0x2ghi' +Name = 'Node 3' + +[[gatewayConfig.ShardedDONs.Shards.Nodes]] +Address = '0x2jkl' +Name = 'Node 4' + +[[gatewayConfig.Services]] +ServiceName = 'workflows' +DONs = ['workflow_1', 'workflow_2'] + +[[gatewayConfig.Services.Handlers]] Name = 'web-api-capabilities' -[gatewayConfig.Dons.Handlers.Config] +[gatewayConfig.Services.Handlers.Config] maxAllowedMessageAgeSec = 1000 -[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter] +[gatewayConfig.Services.Handlers.Config.NodeRateLimiter] globalBurst = 10 globalRPS = 50 perSenderBurst = 10 perSenderRPS = 10 -[[gatewayConfig.Dons.Members]] -Address = '0x2abc' -Name = 'Node 1' +[[gatewayConfig.Services]] +ServiceName = 'vault' +DONs = ['workflow_1'] -[[gatewayConfig.Dons.Members]] -Address = '0x2def' -Name = 'Node 2' +[[gatewayConfig.Services.Handlers]] +Name = 'vault' +ServiceName = 'vault' -[[gatewayConfig.Dons.Members]] -Address = '0x2ghi' -Name = 'Node 3' +[gatewayConfig.Services.Handlers.Config] +requestTimeoutSec = 14 -[[gatewayConfig.Dons.Members]] -Address = '0x2jkl' -Name = 'Node 4' +[gatewayConfig.Services.Handlers.Config.NodeRateLimiter] +globalBurst = 10 +globalRPS = 50 +perSenderBurst = 10 +perSenderRPS = 10 [gatewayConfig.HTTPClientConfig] MaxResponseBytes = 50000000 @@ -249,56 +241,66 @@ AuthGatewayId = 'gateway-node-1' AuthTimestampToleranceSec = 5 HeartbeatIntervalSec = 20 -[[gatewayConfig.Dons]] -DonId = 'workflow_1' +[[gatewayConfig.ShardedDONs]] +DonName = 'workflow_1' F = 3 -[[gatewayConfig.Dons.Handlers]] +[[gatewayConfig.ShardedDONs.Shards]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] +Address = '0xabc' +Name = 'Node 1' + +[[gatewayConfig.ShardedDONs.Shards.Nodes]] +Address = '0xdef' +Name = 'Node 2' + +[[gatewayConfig.ShardedDONs]] +DonName = 'workflow_2' +F = 0 + +[[gatewayConfig.ShardedDONs.Shards]] +[[gatewayConfig.ShardedDONs.Shards.Nodes]] +Address = '0xghi' +Name = 'Node 3' + +[[gatewayConfig.ShardedDONs.Shards.Nodes]] +Address = '0xjkl' +Name = 'Node 4' + +[[gatewayConfig.Services]] +ServiceName = 'workflows' +DONs = ['workflow_1'] + +[[gatewayConfig.Services.Handlers]] Name = 'http-capabilities' ServiceName = 'workflows' -[gatewayConfig.Dons.Handlers.Config] +[gatewayConfig.Services.Handlers.Config] CleanUpPeriodMs = 600000 -[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter] +[gatewayConfig.Services.Handlers.Config.NodeRateLimiter] globalBurst = 100 globalRPS = 500 perSenderBurst = 100 perSenderRPS = 100 -[[gatewayConfig.Dons.Members]] -Address = '0xabc' -Name = 'Node 1' - -[[gatewayConfig.Dons.Members]] -Address = '0xdef' -Name = 'Node 2' - -[[gatewayConfig.Dons]] -DonId = 'workflow_2' -F = 0 +[[gatewayConfig.Services]] +ServiceName = 'vault' +DONs = ['workflow_2'] -[[gatewayConfig.Dons.Handlers]] +[[gatewayConfig.Services.Handlers]] Name = 'vault' ServiceName = 'vault' -[gatewayConfig.Dons.Handlers.Config] +[gatewayConfig.Services.Handlers.Config] requestTimeoutSec = 14 -[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter] +[gatewayConfig.Services.Handlers.Config.NodeRateLimiter] globalBurst = 10 globalRPS = 50 perSenderBurst = 10 perSenderRPS = 10 -[[gatewayConfig.Dons.Members]] -Address = '0xghi' -Name = 'Node 3' - -[[gatewayConfig.Dons.Members]] -Address = '0xjkl' -Name = 'Node 4' - [gatewayConfig.HTTPClientConfig] MaxResponseBytes = 50000000 AllowedPorts = [443] diff --git a/system-tests/lib/cre/don/gateway/gateway.go b/system-tests/lib/cre/don/gateway/gateway.go index 67f6aeffcc9..ce93960c72c 100644 --- a/system-tests/lib/cre/don/gateway/gateway.go +++ b/system-tests/lib/cre/don/gateway/gateway.go @@ -23,7 +23,7 @@ type WhitelistConfig struct { ExtraAllowedIPsCIDR []string } -func CreateJobs(ctx context.Context, creEnv *cre.Environment, dons *cre.Dons, gatewayConfigs []cre.GatewayConfig, whitelistConfig WhitelistConfig) error { +func CreateJobs(ctx context.Context, creEnv *cre.Environment, dons *cre.Dons, gatewayServiceConfigs []cre.GatewayServiceConfig, whitelistConfig WhitelistConfig) error { specs := make(map[string][]string) if !dons.RequiresGateway() { @@ -46,7 +46,7 @@ func CreateJobs(ctx context.Context, creEnv *cre.Environment, dons *cre.Dons, ga }, Template: job_types.Gateway, Inputs: job_types.JobSpecInput{ - "dons": gatewayConfigs, + "services": gatewayServiceConfigs, "allowedPorts": append(whitelistConfig.ExtraAllowedPorts, DefaultAllowedPorts...), "allowedSchemes": []string{"http", "https"}, "allowedIPsCIDR": whitelistConfig.ExtraAllowedIPsCIDR, diff --git a/system-tests/lib/cre/environment/environment.go b/system-tests/lib/cre/environment/environment.go index ed5d77694a8..4a516a5b6ef 100644 --- a/system-tests/lib/cre/environment/environment.go +++ b/system-tests/lib/cre/environment/environment.go @@ -278,7 +278,7 @@ func SetupTestEnvironment( fmt.Print(libformat.PurpleText("%s", input.StageGen.WrapAndNext("DONs and Job Distributor started and linked in %.2f seconds", input.StageGen.Elapsed().Seconds()))) fmt.Print(libformat.PurpleText("%s", input.StageGen.Wrap("Creating Jobs with Job Distributor"))) - gJobErr := gateway.CreateJobs(ctx, creEnvironment, dons, topology.GatewayConfigs, input.GatewayWhitelistConfig) + gJobErr := gateway.CreateJobs(ctx, creEnvironment, dons, topology.GatewayServiceConfigs, input.GatewayWhitelistConfig) if gJobErr != nil { return nil, pkgerrors.Wrap(gJobErr, "failed to create gateway jobs with Job Distributor") } diff --git a/system-tests/lib/cre/topology.go b/system-tests/lib/cre/topology.go index 1440d26f9d3..6cd5f210918 100644 --- a/system-tests/lib/cre/topology.go +++ b/system-tests/lib/cre/topology.go @@ -2,13 +2,13 @@ package cre import ( "fmt" + "slices" "strings" "github.com/pkg/errors" "github.com/smartcontractkit/chainlink/deployment/cre/jobs/pkg" libc "github.com/smartcontractkit/chainlink/system-tests/lib/conversions" - "github.com/smartcontractkit/chainlink/system-tests/lib/infra" ) @@ -18,10 +18,10 @@ const ( ) type Topology struct { - WorkflowDONIDs []uint64 `toml:"workflow_don_ids" json:"workflow_don_ids"` - DonsMetadata *DonsMetadata `toml:"dons_metadata" json:"dons_metadata"` - GatewayConfigs []GatewayConfig `toml:"gateway_configs" json:"gateway_configs"` - GatewayConnectors *GatewayConnectors `toml:"gateway_connectors" json:"gateway_connectors"` + WorkflowDONIDs []uint64 `toml:"workflow_don_ids" json:"workflow_don_ids"` + DonsMetadata *DonsMetadata `toml:"dons_metadata" json:"dons_metadata"` + GatewayServiceConfigs []GatewayServiceConfig `toml:"gateway_service_configs" json:"gateway_service_configs"` + GatewayConnectors *GatewayConnectors `toml:"gateway_connectors" json:"gateway_connectors"` } func NewTopology(nodeSet []*NodeSet, provider infra.Provider, capabilityConfigs map[CapabilityFlag]CapabilityConfig) (*Topology, error) { @@ -51,14 +51,18 @@ func NewTopology(nodeSet []*NodeSet, provider infra.Provider, capabilityConfigs DonsMetadata: donsMetadata, } + donNames := make([]string, 0, len(wfDONs)) for _, wfDON := range wfDONs { - topology.GatewayConfigs = append(topology.GatewayConfigs, GatewayConfig{ - Name: wfDON.Name, - Handlers: []string{pkg.GatewayHandlerTypeWebAPICapabilities}, - }) + donNames = append(donNames, wfDON.Name) topology.WorkflowDONIDs = append(topology.WorkflowDONIDs, wfDON.ID) } + topology.GatewayServiceConfigs = append(topology.GatewayServiceConfigs, GatewayServiceConfig{ + ServiceName: pkg.ServiceNameWorkflows, + Handlers: []string{pkg.GatewayHandlerTypeWebAPICapabilities}, + DONs: donNames, + }) + if donsMetadata.RequiresGateway() { topology.GatewayConnectors = NewGatewayConnectorOutput() gatewayCount := 0 @@ -119,39 +123,39 @@ func (t *Topology) Bootstrap() (*NodeMetadata, bool) { return t.DonsMetadata.Bootstrap() } -// AddGatewayHandlers adds the given handler names to the gateway config of the given DON. It only adds handlers, if they are not already present. -// Actual configuration for each handler is generated later during deployment. +// AddGatewayHandlers adds the given handler names to the appropriate service configs for the given DON. +// Handlers are grouped by service name (derived from handler type). If a service doesn't exist yet, it is created. +// The DON is added to each service's DON list if not already present. func (t *Topology) AddGatewayHandlers(donMetadata DonMetadata, handlers []string) error { - donFound := false + for _, handlerName := range handlers { + svcName := pkg.HandlerServiceName(handlerName) + + svcIdx := -1 + for i, svc := range t.GatewayServiceConfigs { + if strings.EqualFold(svc.ServiceName, svcName) { + svcIdx = i + break + } + } - for idx, gc := range t.GatewayConfigs { - if gc.Name == donMetadata.Name { - donFound = true + if svcIdx == -1 { + t.GatewayServiceConfigs = append(t.GatewayServiceConfigs, GatewayServiceConfig{ + ServiceName: svcName, + Handlers: []string{handlerName}, + DONs: []string{donMetadata.Name}, + }) + continue } - if donFound { - for _, handlerName := range handlers { - alreadyPresent := false - for _, existingHandler := range gc.Handlers { - if strings.EqualFold(existingHandler, handlerName) { - alreadyPresent = true - break - } - } - if !alreadyPresent { - t.GatewayConfigs[idx].Handlers = append(t.GatewayConfigs[idx].Handlers, handlerName) - } - } - break + if !slices.ContainsFunc(t.GatewayServiceConfigs[svcIdx].Handlers, func(h string) bool { + return strings.EqualFold(h, handlerName) + }) { + t.GatewayServiceConfigs[svcIdx].Handlers = append(t.GatewayServiceConfigs[svcIdx].Handlers, handlerName) } - } - // if we did not find the DON in the gateway config, we need to add it - if !donFound { - t.GatewayConfigs = append(t.GatewayConfigs, GatewayConfig{ - Name: donMetadata.Name, - Handlers: handlers, - }) + if !slices.Contains(t.GatewayServiceConfigs[svcIdx].DONs, donMetadata.Name) { + t.GatewayServiceConfigs[svcIdx].DONs = append(t.GatewayServiceConfigs[svcIdx].DONs, donMetadata.Name) + } } return nil diff --git a/system-tests/lib/cre/types.go b/system-tests/lib/cre/types.go index 748567180a3..1a076a6b082 100644 --- a/system-tests/lib/cre/types.go +++ b/system-tests/lib/cre/types.go @@ -416,9 +416,12 @@ func (c *ConfigureCapabilityRegistryInput) Validate() error { return nil } -type GatewayConfig struct { - Name string // DON name - Handlers []string +// GatewayServiceConfig represents a service in the new multi-DON gateway format. +// Each service groups handlers and references the DON names it operates on. +type GatewayServiceConfig struct { + ServiceName string `yaml:"servicename"` + Handlers []string `yaml:"handlers"` + DONs []string `yaml:"dons"` } type GatewayConnectors struct { From a1a803507d02adab63e4e7f3bb11409e813b33b2 Mon Sep 17 00:00:00 2001 From: Bolek Kulbabinski <1416262+bolekk@users.noreply.github.com> Date: Mon, 2 Mar 2026 19:33:20 -0800 Subject: [PATCH 2/3] remove check for multi-handlers --- core/services/gateway/gateway.go | 7 ------- 1 file changed, 7 deletions(-) diff --git a/core/services/gateway/gateway.go b/core/services/gateway/gateway.go index d39ab616d58..b5a2ef719ee 100644 --- a/core/services/gateway/gateway.go +++ b/core/services/gateway/gateway.go @@ -120,7 +120,6 @@ func setupFromNewConfig( donNameToConfig[don.DonName] = don } - assignedDONs := make(map[string]struct{}) // For each service, create a MultiHandler with its handlers and attached DONs for _, svc := range cfg.Services { var shardedDONs []config.ShardedDONConfig @@ -131,12 +130,6 @@ func setupFromNewConfig( if !ok { return nil, fmt.Errorf("service %q references unknown DON: %s", svc.ServiceName, donName) } - if _, assigned := assignedDONs[donName]; assigned { - // NOTE: this check can be relaxed in the future once we clean up all "service.method" strings - // and split them correctly in Multihandler - return nil, fmt.Errorf("DON %q is assigned to multiple services", donName) - } - assignedDONs[donName] = struct{}{} shardedDONs = append(shardedDONs, donCfg) var shardConnMgrs []handlers.DON From a9c4d4295d89839bd933528cf25fc4a9ac278dc7 Mon Sep 17 00:00:00 2001 From: Bolek Kulbabinski <1416262+bolekk@users.noreply.github.com> Date: Mon, 2 Mar 2026 21:18:58 -0800 Subject: [PATCH 3/3] simplify types --- .../jobs/operations/propose_gateway_job.go | 46 +++-- deployment/cre/jobs/pkg/gateway_job.go | 103 ++++------- deployment/cre/jobs/pkg/gateway_job_test.go | 165 +++++++----------- 3 files changed, 120 insertions(+), 194 deletions(-) diff --git a/deployment/cre/jobs/operations/propose_gateway_job.go b/deployment/cre/jobs/operations/propose_gateway_job.go index 78c28d63aa0..21c8879b695 100644 --- a/deployment/cre/jobs/operations/propose_gateway_job.go +++ b/deployment/cre/jobs/operations/propose_gateway_job.go @@ -56,32 +56,37 @@ var ProposeGatewayJob = operations.NewOperation[ProposeGatewayJobInput, ProposeG // proposeGatewayJob builds a gateway job spec and then proposes it to the nodes of a DON. // It derives the set of unique DON names from input.Services, fetches node information and -// chain configurations for each DON from JD, then builds TargetDON entries. -// Each TargetDON contains all handlers from the services that reference it. +// chain configurations for each DON from JD, then builds the gateway job. func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input ProposeGatewayJobInput) (ProposeGatewayJobOutput, error) { - // Collect unique DON names and build DON→handlers mapping from services - donHandlers := make(map[string][]string) // DON name → handler types + donNameSet := make(map[string]struct{}) for _, svc := range input.Services { for _, donName := range svc.DONs { - donHandlers[donName] = append(donHandlers[donName], svc.Handlers...) + donNameSet[donName] = struct{}{} } } - targetDONs := make([]pkg.TargetDON, 0, len(donHandlers)) - for donName, handlers := range donHandlers { + dons := make([]pkg.TargetDON, 0, len(donNameSet)) + for donName := range donNameSet { members, f, err := resolveDONMembers(deps, input, donName) if err != nil { return ProposeGatewayJobOutput{}, err } - - targetDONs = append(targetDONs, pkg.TargetDON{ - ID: donName, - F: f, - Members: members, - Handlers: dedup(handlers), + dons = append(dons, pkg.TargetDON{ + ID: donName, + F: f, + Members: members, }) } + services := make([]pkg.GatewayServiceConfig, len(input.Services)) + for i, svc := range input.Services { + services[i] = pkg.GatewayServiceConfig{ + ServiceName: svc.ServiceName, + Handlers: svc.Handlers, + DONs: svc.DONs, + } + } + requestTimeoutSec := input.GatewayRequestTimeoutSec if requestTimeoutSec == 0 { requestTimeoutSec = defaultGatewayRequestTimeoutSec @@ -89,7 +94,8 @@ func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input Pr gj := pkg.GatewayJob{ JobName: "CRE Gateway", - TargetDONs: targetDONs, + DONs: dons, + Services: services, RequestTimeoutSec: requestTimeoutSec, AllowedPorts: input.AllowedPorts, AllowedSchemes: input.AllowedSchemes, @@ -230,18 +236,6 @@ func resolveDONMembers(deps ProposeGatewayJobDeps, input ProposeGatewayJobInput, return members, f, nil } -func dedup(ss []string) []string { - seen := make(map[string]struct{}, len(ss)) - out := make([]string, 0, len(ss)) - for _, s := range ss { - if _, ok := seen[s]; !ok { - seen[s] = struct{}{} - out = append(out, s) - } - } - return out -} - func parseSelector(sel uint64) (nodev1.ChainType, string, error) { fam, err := chainsel.GetSelectorFamily(sel) if err != nil { diff --git a/deployment/cre/jobs/pkg/gateway_job.go b/deployment/cre/jobs/pkg/gateway_job.go index e90fffb73c4..6f9357e74e6 100644 --- a/deployment/cre/jobs/pkg/gateway_job.go +++ b/deployment/cre/jobs/pkg/gateway_job.go @@ -39,14 +39,20 @@ type TargetDONMember struct { } type TargetDON struct { - ID string - F int - Members []TargetDONMember - Handlers []string + ID string + F int + Members []TargetDONMember +} + +type GatewayServiceConfig struct { + ServiceName string + Handlers []string + DONs []string } type GatewayJob struct { - TargetDONs []TargetDON + DONs []TargetDON + Services []GatewayServiceConfig JobName string RequestTimeoutSec int AllowedPorts []int @@ -61,8 +67,12 @@ func (g GatewayJob) Validate() error { return errors.New("must provide job name") } - if len(g.TargetDONs) == 0 { - return errors.New("must provide at least one target DON") + if len(g.DONs) == 0 { + return errors.New("must provide at least one DON") + } + + if len(g.Services) == 0 { + return errors.New("must provide at least one service") } // We impose a lower bound to account for other timeouts which are hardcoded, @@ -173,77 +183,38 @@ func (g GatewayJob) Resolve(gatewayNodeIdx int) (string, error) { } func (g GatewayJob) buildServicesAndShardedDONs() ([]shardedDON, []service, error) { - var shardedDONs []shardedDON - - type serviceEntry struct { - handlers []handler - donNames []string - // track handler names to avoid duplicates - handlerNames map[string]struct{} - } - serviceMap := make(map[string]*serviceEntry) - // preserve insertion order - var serviceOrder []string - - for _, targetDON := range g.TargetDONs { - nodes := make([]member, len(targetDON.Members)) - for i, mem := range targetDON.Members { - nodes[i] = member(mem) + shardedDONs := make([]shardedDON, len(g.DONs)) + for i, don := range g.DONs { + nodes := make([]member, len(don.Members)) + for j, mem := range don.Members { + nodes[j] = member(mem) } - - shardedDONs = append(shardedDONs, shardedDON{ - DonName: targetDON.ID, - F: targetDON.F, + shardedDONs[i] = shardedDON{ + DonName: don.ID, + F: don.F, Shards: []shard{{Nodes: nodes}}, - }) - - for _, ht := range targetDON.Handlers { - svcName := HandlerServiceName(ht) + } + } - var h handler + services := make([]service, 0, len(g.Services)) + for _, svcCfg := range g.Services { + var handlers []handler + for _, ht := range svcCfg.Handlers { switch ht { case GatewayHandlerTypeWebAPICapabilities: - h = newDefaultWebAPICapabilitiesHandler() + handlers = append(handlers, newDefaultWebAPICapabilitiesHandler()) case GatewayHandlerTypeVault: - h = newDefaultVaultHandler(g.RequestTimeoutSec) + handlers = append(handlers, newDefaultVaultHandler(g.RequestTimeoutSec)) case GatewayHandlerTypeHTTPCapabilities: - h = newDefaultHTTPCapabilitiesHandler() + handlers = append(handlers, newDefaultHTTPCapabilitiesHandler()) default: return nil, nil, errors.New("unknown handler type: " + ht) } - - entry, exists := serviceMap[svcName] - if !exists { - entry = &serviceEntry{handlerNames: make(map[string]struct{})} - serviceMap[svcName] = entry - serviceOrder = append(serviceOrder, svcName) - } - - if _, dup := entry.handlerNames[ht]; !dup { - entry.handlers = append(entry.handlers, h) - entry.handlerNames[ht] = struct{}{} - } - - donAlreadyReferenced := false - for _, d := range entry.donNames { - if d == targetDON.ID { - donAlreadyReferenced = true - break - } - } - if !donAlreadyReferenced { - entry.donNames = append(entry.donNames, targetDON.ID) - } } - } - - services := make([]service, 0, len(serviceMap)) - for _, svcName := range serviceOrder { - entry := serviceMap[svcName] services = append(services, service{ - ServiceName: svcName, - Handlers: entry.handlers, - DONs: entry.donNames, + ServiceName: svcCfg.ServiceName, + Handlers: handlers, + DONs: svcCfg.DONs, }) } diff --git a/deployment/cre/jobs/pkg/gateway_job_test.go b/deployment/cre/jobs/pkg/gateway_job_test.go index 6804474f9c5..ce405fb48c6 100644 --- a/deployment/cre/jobs/pkg/gateway_job_test.go +++ b/deployment/cre/jobs/pkg/gateway_job_test.go @@ -1,7 +1,6 @@ package pkg import ( - "fmt" "testing" "github.com/stretchr/testify/assert" @@ -15,7 +14,7 @@ func TestGateway_Validate(t *testing.T) { require.ErrorContains(t, g.Validate(), "must provide job name") g.JobName = "AGatewayJob" - require.ErrorContains(t, g.Validate(), "must provide at least one target DON") + require.ErrorContains(t, g.Validate(), "must provide at least one DON") } const ( @@ -333,53 +332,34 @@ func TestGateway_Resolve(t *testing.T) { g := GatewayJob{ JobName: "Gateway1", RequestTimeoutSec: 15, - TargetDONs: []TargetDON{ + DONs: []TargetDON{ { - ID: "workflow_1", - F: 1, - Handlers: []string{GatewayHandlerTypeWebAPICapabilities}, + ID: "workflow_1", + F: 1, Members: []TargetDONMember{ - { - Address: "0xabc", - Name: "Node 1", - }, - { - Address: "0xdef", - Name: "Node 2", - }, - { - Address: "0xghi", - Name: "Node 3", - }, - { - Address: "0xjkl", - Name: "Node 4", - }, + {Address: "0xabc", Name: "Node 1"}, + {Address: "0xdef", Name: "Node 2"}, + {Address: "0xghi", Name: "Node 3"}, + {Address: "0xjkl", Name: "Node 4"}, }, }, { - ID: "workflow_2", - Handlers: []string{GatewayHandlerTypeWebAPICapabilities}, + ID: "workflow_2", Members: []TargetDONMember{ - { - Address: "0x2abc", - Name: "Node 1", - }, - { - Address: "0x2def", - Name: "Node 2", - }, - { - Address: "0x2ghi", - Name: "Node 3", - }, - { - Address: "0x2jkl", - Name: "Node 4", - }, + {Address: "0x2abc", Name: "Node 1"}, + {Address: "0x2def", Name: "Node 2"}, + {Address: "0x2ghi", Name: "Node 3"}, + {Address: "0x2jkl", Name: "Node 4"}, }, }, }, + Services: []GatewayServiceConfig{ + { + ServiceName: ServiceNameWorkflows, + Handlers: []string{GatewayHandlerTypeWebAPICapabilities}, + DONs: []string{"workflow_1", "workflow_2"}, + }, + }, } spec, err := g.Resolve(1) @@ -393,58 +373,42 @@ func TestGateway_Resolve_WithVaultHandler(t *testing.T) { g := GatewayJob{ JobName: "Gateway1", RequestTimeoutSec: 15, - TargetDONs: []TargetDON{ + DONs: []TargetDON{ { - ID: "workflow_1", - F: 1, - Handlers: []string{GatewayHandlerTypeWebAPICapabilities, GatewayHandlerTypeVault}, + ID: "workflow_1", + F: 1, Members: []TargetDONMember{ - { - Address: "0xabc", - Name: "Node 1", - }, - { - Address: "0xdef", - Name: "Node 2", - }, - { - Address: "0xghi", - Name: "Node 3", - }, - { - Address: "0xjkl", - Name: "Node 4", - }, + {Address: "0xabc", Name: "Node 1"}, + {Address: "0xdef", Name: "Node 2"}, + {Address: "0xghi", Name: "Node 3"}, + {Address: "0xjkl", Name: "Node 4"}, }, }, { ID: "workflow_2", - - Handlers: []string{GatewayHandlerTypeWebAPICapabilities}, Members: []TargetDONMember{ - { - Address: "0x2abc", - Name: "Node 1", - }, - { - Address: "0x2def", - Name: "Node 2", - }, - { - Address: "0x2ghi", - Name: "Node 3", - }, - { - Address: "0x2jkl", - Name: "Node 4", - }, + {Address: "0x2abc", Name: "Node 1"}, + {Address: "0x2def", Name: "Node 2"}, + {Address: "0x2ghi", Name: "Node 3"}, + {Address: "0x2jkl", Name: "Node 4"}, }, }, }, + Services: []GatewayServiceConfig{ + { + ServiceName: ServiceNameWorkflows, + Handlers: []string{GatewayHandlerTypeWebAPICapabilities}, + DONs: []string{"workflow_1", "workflow_2"}, + }, + { + ServiceName: ServiceNameVault, + Handlers: []string{GatewayHandlerTypeVault}, + DONs: []string{"workflow_1"}, + }, + }, } spec, err := g.Resolve(1) - fmt.Println(spec) require.NoError(t, err) assert.Equal(t, expectedWithVault, spec) } @@ -455,38 +419,35 @@ func TestGateway_Resolve_WithHTTPCapabilitiesHandler(t *testing.T) { g := GatewayJob{ JobName: "Gateway1", RequestTimeoutSec: 15, - TargetDONs: []TargetDON{ + DONs: []TargetDON{ { - ID: "workflow_1", - F: 3, - Handlers: []string{GatewayHandlerTypeHTTPCapabilities}, + ID: "workflow_1", + F: 3, Members: []TargetDONMember{ - { - Address: "0xabc", - Name: "Node 1", - }, - { - Address: "0xdef", - Name: "Node 2", - }, + {Address: "0xabc", Name: "Node 1"}, + {Address: "0xdef", Name: "Node 2"}, }, }, { - ID: "workflow_2", - F: 0, - Handlers: []string{GatewayHandlerTypeVault}, + ID: "workflow_2", Members: []TargetDONMember{ - { - Address: "0xghi", - Name: "Node 3", - }, - { - Address: "0xjkl", - Name: "Node 4", - }, + {Address: "0xghi", Name: "Node 3"}, + {Address: "0xjkl", Name: "Node 4"}, }, }, }, + Services: []GatewayServiceConfig{ + { + ServiceName: ServiceNameWorkflows, + Handlers: []string{GatewayHandlerTypeHTTPCapabilities}, + DONs: []string{"workflow_1"}, + }, + { + ServiceName: ServiceNameVault, + Handlers: []string{GatewayHandlerTypeVault}, + DONs: []string{"workflow_2"}, + }, + }, } spec, err := g.Resolve(1)