Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 0 additions & 7 deletions core/services/gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,6 @@ func setupFromNewConfig(
donNameToConfig[don.DonName] = don
}

assignedDONs := make(map[string]struct{})
// For each service, create a MultiHandler with its handlers and attached DONs
for _, svc := range cfg.Services {
var shardedDONs []config.ShardedDONConfig
Expand All @@ -131,12 +130,6 @@ func setupFromNewConfig(
if !ok {
return nil, fmt.Errorf("service %q references unknown DON: %s", svc.ServiceName, donName)
}
if _, assigned := assignedDONs[donName]; assigned {
// NOTE: this check can be relaxed in the future once we clean up all "service.method" strings
// and split them correctly in Multihandler
return nil, fmt.Errorf("DON %q is assigned to multiple services", donName)
}
assignedDONs[donName] = struct{}{}
shardedDONs = append(shardedDONs, donCfg)

var shardConnMgrs []handlers.DON
Expand Down
208 changes: 115 additions & 93 deletions deployment/cre/jobs/operations/propose_gateway_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ const defaultGatewayRequestTimeoutSec = 12
type ProposeGatewayJobInput struct {
Domain string
DONFilters []offchain.TargetDONFilter
DONs []DON `yaml:"dons"`
Services []GatewayService `yaml:"services"`
GatewayRequestTimeoutSec int `yaml:"gatewayRequestTimeoutSec"`
AllowedPorts []int `yaml:"allowedPorts"`
AllowedSchemes []string `yaml:"allowedSchemes"`
Expand All @@ -33,10 +33,10 @@ type ProposeGatewayJobInput struct {
JobLabels map[string]string
}

type DON struct {
Name string
F int
Handlers []string
type GatewayService struct {
ServiceName string `yaml:"servicename"`
Handlers []string `yaml:"handlers"`
DONs []string `yaml:"dons"`
}

type ProposeGatewayJobDeps struct {
Expand All @@ -55,94 +55,31 @@ var ProposeGatewayJob = operations.NewOperation[ProposeGatewayJobInput, ProposeG
)

// proposeGatewayJob builds a gateway job spec and then proposes it to the nodes of a DON.
// It first fetches node information and chain configurations about the target DONs given in input.DONs to build the job spec.
// Target DONs are the DONs that the Gateway allows communication with.
// It then proposes this job spec to each node of the specific DON based on input filters and a chain selector.
// All nodes must be connected to job distributor and have the proper chain declared.
// It derives the set of unique DON names from input.Services, fetches node information and
// chain configurations for each DON from JD, then builds TargetDON entries.
// Each TargetDON contains all handlers from the services that reference it.
func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input ProposeGatewayJobInput) (ProposeGatewayJobOutput, error) {
targetDONs := make([]pkg.TargetDON, 0)

for _, ad := range input.DONs {
// Use filters from input, except the DON name which needs to be the target DON
filters := &nodev1.ListNodesRequest_Filter{}
for _, f := range input.DONFilters {
if f.Key == offchain.FilterKeyDONName {
continue
}
filters = offchain.TargetDONFilter{
Key: f.Key,
Value: f.Value,
}.AddToFilter(filters)
}
filtersWithTargetDONName := offchain.TargetDONFilter{
Key: offchain.FilterKeyDONName,
Value: ad.Name,
}.AddToFilter(filters)

ns, err := pkg.FetchNodesFromJD(deps.Env.GetContext(), deps.Env, pkg.FetchNodesRequest{
Domain: input.Domain,
Filters: filtersWithTargetDONName,
})
if err != nil {
return ProposeGatewayJobOutput{}, err
}
if len(ns) == 0 {
return ProposeGatewayJobOutput{}, fmt.Errorf("no nodes with filters %s", input.DONFilters)
}

nodes, err := pkg.FetchNodeChainConfigsFromJD(deps.Env.GetContext(), deps.Env, pkg.FetchNodesRequest{
Domain: input.Domain,
Filters: filtersWithTargetDONName,
})
if err != nil {
return ProposeGatewayJobOutput{}, err
}
if len(nodes) == 0 {
return ProposeGatewayJobOutput{}, fmt.Errorf("no chain configs with filters %s", input.DONFilters)
// Collect unique DON names and build DON→handlers mapping from services
donHandlers := make(map[string][]string) // DON name → handler types
for _, svc := range input.Services {
for _, donName := range svc.DONs {
donHandlers[donName] = append(donHandlers[donName], svc.Handlers...)
}
}

fam, chainID, err := parseSelector(uint64(input.GatewayKeyChainSelector))
targetDONs := make([]pkg.TargetDON, 0, len(donHandlers))
for donName, handlers := range donHandlers {
members, f, err := resolveDONMembers(deps, input, donName)
if err != nil {
return ProposeGatewayJobOutput{}, err
}

// make map of node id to node
m := make(map[string]*nodev1.Node, len(ns))
for _, n := range ns {
m[n.Id] = n
}

var members []pkg.TargetDONMember
for _, n := range nodes {
var found bool
for _, cc := range n.ChainConfigs {
if cc.Chain.Id == chainID && cc.Chain.Type == fam {
nodeName := n.NodeID
if matched, ok := m[n.NodeID]; ok {
nodeName = matched.Name
}
members = append(members, pkg.TargetDONMember{
Address: cc.AccountAddress,
Name: fmt.Sprintf("%s (DON %s)", nodeName, ad.Name),
})
found = true

break
}
}

if !found {
return ProposeGatewayJobOutput{}, fmt.Errorf("could not find key belonging to chain id %s on node %s", chainID, n.NodeID)
}
}

td := pkg.TargetDON{
ID: ad.Name,
F: ad.F,
targetDONs = append(targetDONs, pkg.TargetDON{
ID: donName,
F: f,
Members: members,
Handlers: ad.Handlers,
}
targetDONs = append(targetDONs, td)
Handlers: dedup(handlers),
})
}

requestTimeoutSec := input.GatewayRequestTimeoutSec
Expand All @@ -160,8 +97,7 @@ func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input Pr
AuthGatewayID: input.AuthGatewayID,
}

err := gj.Validate()
if err != nil {
if err := gj.Validate(); err != nil {
return ProposeGatewayJobOutput{}, err
}

Expand Down Expand Up @@ -197,18 +133,18 @@ func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input Pr
Specs: make(map[string][]string),
}
for nodeIdx, n := range nodes {
spec, err := gj.Resolve(nodeIdx)
if err != nil {
return ProposeGatewayJobOutput{}, err
spec, specErr := gj.Resolve(nodeIdx)
if specErr != nil {
return ProposeGatewayJobOutput{}, specErr
}

_, err = deps.Env.Offchain.ProposeJob(b.GetContext(), &jobv1.ProposeJobRequest{
_, propErr := deps.Env.Offchain.ProposeJob(b.GetContext(), &jobv1.ProposeJobRequest{
NodeId: n.GetId(),
Spec: spec,
Labels: labels,
})
if err != nil {
return ProposeGatewayJobOutput{}, fmt.Errorf("error proposing job to node %s spec %s : %w", n.GetId(), spec, err)
if propErr != nil {
return ProposeGatewayJobOutput{}, fmt.Errorf("error proposing job to node %s spec %s : %w", n.GetId(), spec, propErr)
}

output.Specs[n.GetId()] = append(output.Specs[n.GetId()], spec)
Expand All @@ -220,6 +156,92 @@ func proposeGatewayJob(b operations.Bundle, deps ProposeGatewayJobDeps, input Pr
return output, nil
}

func resolveDONMembers(deps ProposeGatewayJobDeps, input ProposeGatewayJobInput, donName string) ([]pkg.TargetDONMember, int, error) {
filters := &nodev1.ListNodesRequest_Filter{}
for _, f := range input.DONFilters {
if f.Key == offchain.FilterKeyDONName {
continue
}
filters = offchain.TargetDONFilter{
Key: f.Key,
Value: f.Value,
}.AddToFilter(filters)
}
filtersWithTargetDONName := offchain.TargetDONFilter{
Key: offchain.FilterKeyDONName,
Value: donName,
}.AddToFilter(filters)

ns, err := pkg.FetchNodesFromJD(deps.Env.GetContext(), deps.Env, pkg.FetchNodesRequest{
Domain: input.Domain,
Filters: filtersWithTargetDONName,
})
if err != nil {
return nil, 0, err
}
if len(ns) == 0 {
return nil, 0, fmt.Errorf("no nodes with filters %s", input.DONFilters)
}

nodeChainConfigs, err := pkg.FetchNodeChainConfigsFromJD(deps.Env.GetContext(), deps.Env, pkg.FetchNodesRequest{
Domain: input.Domain,
Filters: filtersWithTargetDONName,
})
if err != nil {
return nil, 0, err
}
if len(nodeChainConfigs) == 0 {
return nil, 0, fmt.Errorf("no chain configs with filters %s", input.DONFilters)
}

fam, chainID, err := parseSelector(uint64(input.GatewayKeyChainSelector))
if err != nil {
return nil, 0, err
}

m := make(map[string]*nodev1.Node, len(ns))
for _, n := range ns {
m[n.Id] = n
}

var members []pkg.TargetDONMember
for _, n := range nodeChainConfigs {
var found bool
for _, cc := range n.ChainConfigs {
if cc.Chain.Id == chainID && cc.Chain.Type == fam {
nodeName := n.NodeID
if matched, ok := m[n.NodeID]; ok {
nodeName = matched.Name
}
members = append(members, pkg.TargetDONMember{
Address: cc.AccountAddress,
Name: fmt.Sprintf("%s (DON %s)", nodeName, donName),
})
found = true
break
}
}
if !found {
return nil, 0, fmt.Errorf("could not find key belonging to chain id %s on node %s", chainID, n.NodeID)
}
}

f := (len(members) - 1) / 3
return members, f, nil
}

func dedup(ss []string) []string {
seen := make(map[string]struct{}, len(ss))
out := make([]string, 0, len(ss))
for _, s := range ss {
if _, ok := seen[s]; !ok {
seen[s] = struct{}{}
out = append(out, s)
}
}
return out
}

func parseSelector(sel uint64) (nodev1.ChainType, string, error) {
fam, err := chainsel.GetSelectorFamily(sel)
if err != nil {
Expand Down
10 changes: 5 additions & 5 deletions deployment/cre/jobs/operations/propose_gateway_job_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,14 +112,14 @@ var commonInput = ProposeGatewayJobInput{
Value: "zone-b",
},
},
DONs: []DON{
Services: []GatewayService{
{
Name: "workflow_1_zone-b",
ServiceName: "workflows",
Handlers: []string{
"http-capabilities",
"web-api-capabilities",
},
F: 1,
DONs: []string{"workflow_1_zone-b"},
},
},
GatewayKeyChainSelector: 10344971235874465080,
Expand Down Expand Up @@ -149,7 +149,8 @@ func TestProposeGatewayJob(t *testing.T) {
output: ProposeGatewayJobOutput{
Specs: map[string][]string{
"node_5": {
"type = 'gateway'\nschemaVersion = 1\nname = 'CRE Gateway'\nexternalJobID = 'cf8aa339-6349-5e5b-9289-5c2907711200'\nforwardingAllowed = false\n\n[gatewayConfig]\n[gatewayConfig.ConnectionManagerConfig]\nAuthChallengeLen = 10\nAuthGatewayId = 'gateway-node-0'\nAuthTimestampToleranceSec = 5\nHeartbeatIntervalSec = 20\n\n[[gatewayConfig.Dons]]\nDonId = 'workflow_1_zone-b'\nF = 1\n\n[[gatewayConfig.Dons.Handlers]]\nName = 'http-capabilities'\nServiceName = 'workflows'\n\n[gatewayConfig.Dons.Handlers.Config]\nCleanUpPeriodMs = 600000\n\n[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter]\nglobalBurst = 100\nglobalRPS = 500\nperSenderBurst = 100\nperSenderRPS = 100\n\n[[gatewayConfig.Dons.Handlers]]\nName = 'web-api-capabilities'\n\n[gatewayConfig.Dons.Handlers.Config]\nmaxAllowedMessageAgeSec = 1000\n\n[gatewayConfig.Dons.Handlers.Config.NodeRateLimiter]\nglobalBurst = 10\nglobalRPS = 50\nperSenderBurst = 10\nperSenderRPS = 10\n\n[[gatewayConfig.Dons.Members]]\nAddress = '0x04'\nName = 'cl-cre-one-zone-b-0 (DON workflow_1_zone-b)'\n\n[gatewayConfig.HTTPClientConfig]\nMaxResponseBytes = 50000000\nAllowedPorts = [443]\nAllowedSchemes = ['https']\nAllowedIPsCIDR = []\n\n[gatewayConfig.NodeServerConfig]\nHandshakeTimeoutMillis = 1000\nMaxRequestBytes = 100000\nPath = '/'\nPort = 5003\nReadTimeoutMillis = 1000\nRequestTimeoutMillis = 5000\nWriteTimeoutMillis = 1000\n\n[gatewayConfig.UserServerConfig]\nContentTypeHeader = 'application/jsonrpc'\nMaxRequestBytes = 100000\nPath = '/'\nPort = 5002\nReadTimeoutMillis = 5000\nRequestTimeoutMillis = 5000\nWriteTimeoutMillis = 6000\n"},
"type = 'gateway'\nschemaVersion = 1\nname = 'CRE Gateway'\nexternalJobID = 'cf8aa339-6349-5e5b-9289-5c2907711200'\nforwardingAllowed = false\n\n[gatewayConfig]\n[gatewayConfig.ConnectionManagerConfig]\nAuthChallengeLen = 10\nAuthGatewayId = 'gateway-node-0'\nAuthTimestampToleranceSec = 5\nHeartbeatIntervalSec = 20\n\n[[gatewayConfig.ShardedDONs]]\nDonName = 'workflow_1_zone-b'\nF = 0\n\n[[gatewayConfig.ShardedDONs.Shards]]\n[[gatewayConfig.ShardedDONs.Shards.Nodes]]\nAddress = '0x04'\nName = 'cl-cre-one-zone-b-0 (DON workflow_1_zone-b)'\n\n[[gatewayConfig.Services]]\nServiceName = 'workflows'\nDONs = ['workflow_1_zone-b']\n\n[[gatewayConfig.Services.Handlers]]\nName = 'http-capabilities'\nServiceName = 'workflows'\n\n[gatewayConfig.Services.Handlers.Config]\nCleanUpPeriodMs = 600000\n\n[gatewayConfig.Services.Handlers.Config.NodeRateLimiter]\nglobalBurst = 100\nglobalRPS = 500\nperSenderBurst = 100\nperSenderRPS = 100\n\n[[gatewayConfig.Services.Handlers]]\nName = 'web-api-capabilities'\n\n[gatewayConfig.Services.Handlers.Config]\nmaxAllowedMessageAgeSec = 1000\n\n[gatewayConfig.Services.Handlers.Config.NodeRateLimiter]\nglobalBurst = 10\nglobalRPS = 50\nperSenderBurst = 10\nperSenderRPS = 10\n\n[gatewayConfig.HTTPClientConfig]\nMaxResponseBytes = 50000000\nAllowedPorts = [443]\nAllowedSchemes = ['https']\nAllowedIPsCIDR = []\n\n[gatewayConfig.NodeServerConfig]\nHandshakeTimeoutMillis = 1000\nMaxRequestBytes = 100000\nPath = '/'\nPort = 5003\nReadTimeoutMillis = 1000\nRequestTimeoutMillis = 5000\nWriteTimeoutMillis = 1000\n\n[gatewayConfig.UserServerConfig]\nContentTypeHeader = 'application/jsonrpc'\nMaxRequestBytes = 100000\nPath = '/'\nPort = 5002\nReadTimeoutMillis = 5000\nRequestTimeoutMillis = 5000\nWriteTimeoutMillis = 6000\n",
},
},
},
},
Expand Down Expand Up @@ -193,7 +194,6 @@ func TestProposeGatewayJob(t *testing.T) {
assert.Contains(t, err.Error(), tc.errorMsg)
} else {
require.NoError(t, err)
require.NotNil(t, output)
}

require.Equal(t, tc.output, output)
Expand Down
Loading
Loading