Skip to content

Commit dc05911

Browse files
committed
feat: implement multi-account support with round-robin address selection for DA submissions
1 parent c6dd42c commit dc05911

File tree

10 files changed

+445
-19
lines changed

10 files changed

+445
-19
lines changed

block/internal/submitting/da_submitter.go

Lines changed: 65 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@ package submitting
33
import (
44
"bytes"
55
"context"
6+
"encoding/json"
67
"fmt"
78
"time"
89

@@ -13,6 +14,7 @@ import (
1314
"github.com/evstack/ev-node/block/internal/common"
1415
coreda "github.com/evstack/ev-node/core/da"
1516
"github.com/evstack/ev-node/pkg/config"
17+
pkgda "github.com/evstack/ev-node/pkg/da"
1618
"github.com/evstack/ev-node/pkg/genesis"
1719
"github.com/evstack/ev-node/pkg/rpc/server"
1820
"github.com/evstack/ev-node/pkg/signer"
@@ -125,6 +127,9 @@ type DASubmitter struct {
125127
// calculate namespaces bytes once and reuse them
126128
namespaceBz []byte
127129
namespaceDataBz []byte
130+
131+
// address selector for multi-account support
132+
addressSelector pkgda.AddressSelector
128133
}
129134

130135
// NewDASubmitter creates a new DA submitter
@@ -148,6 +153,17 @@ func NewDASubmitter(
148153
metrics = common.NopMetrics()
149154
}
150155

156+
// Create address selector based on configuration
157+
var addressSelector pkgda.AddressSelector
158+
if len(config.DA.SigningAddresses) > 0 {
159+
addressSelector = pkgda.NewRoundRobinSelector(config.DA.SigningAddresses)
160+
daSubmitterLogger.Info().
161+
Int("num_addresses", len(config.DA.SigningAddresses)).
162+
Msg("initialized round-robin address selector for multi-account DA submissions")
163+
} else {
164+
addressSelector = pkgda.NewNoOpSelector()
165+
}
166+
151167
return &DASubmitter{
152168
da: da,
153169
config: config,
@@ -157,6 +173,7 @@ func NewDASubmitter(
157173
logger: daSubmitterLogger,
158174
namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(),
159175
namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(),
176+
addressSelector: addressSelector,
160177
}
161178
}
162179

@@ -236,7 +253,6 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er
236253
"header",
237254
s.namespaceBz,
238255
[]byte(s.config.DA.SubmitOptions),
239-
cache,
240256
func() uint64 { return cache.NumPendingHeaders() },
241257
)
242258
}
@@ -280,7 +296,6 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe
280296
"data",
281297
s.namespaceDataBz,
282298
[]byte(s.config.DA.SubmitOptions),
283-
cache,
284299
func() uint64 { return cache.NumPendingData() },
285300
)
286301
}
@@ -341,6 +356,41 @@ func (s *DASubmitter) createSignedData(dataList []*types.SignedData, signer sign
341356
return signedDataList, nil
342357
}
343358

359+
// mergeSubmitOptions merges the base submit options with a signing address.
360+
// If the base options are valid JSON, the signing address is added to the JSON object.
361+
// Otherwise, a new JSON object is created with just the signing address.
362+
// Returns the base options unchanged if no signing address is provided.
363+
func mergeSubmitOptions(baseOptions []byte, signingAddress string) ([]byte, error) {
364+
if signingAddress == "" {
365+
return baseOptions, nil
366+
}
367+
368+
// Create options map with signing address
369+
optionsMap := make(map[string]interface{})
370+
371+
// If base options are provided, try to parse them as JSON
372+
if len(baseOptions) > 0 {
373+
// Try to unmarshal existing options
374+
if err := json.Unmarshal(baseOptions, &optionsMap); err != nil {
375+
// If it's not valid JSON, start with empty map
376+
// Log at debug level since this is expected for non-JSON options
377+
optionsMap = make(map[string]interface{})
378+
}
379+
}
380+
381+
// Add or override the signing address
382+
// Note: Uses "signer_address" to match Celestia's TxConfig JSON schema
383+
optionsMap["signer_address"] = signingAddress
384+
385+
// Marshal back to JSON
386+
mergedOptions, err := json.Marshal(optionsMap)
387+
if err != nil {
388+
return nil, fmt.Errorf("failed to marshal submit options: %w", err)
389+
}
390+
391+
return mergedOptions, nil
392+
}
393+
344394
// submitToDA is a generic helper for submitting items to the DA layer with retry, backoff, and gas price logic.
345395
func submitToDA[T any](
346396
s *DASubmitter,
@@ -351,7 +401,6 @@ func submitToDA[T any](
351401
itemType string,
352402
namespace []byte,
353403
options []byte,
354-
cache cache.Manager,
355404
getTotalPendingFn func() uint64,
356405
) error {
357406
marshaled, err := marshalItems(ctx, items, marshalFn, itemType)
@@ -393,10 +442,22 @@ func submitToDA[T any](
393442
return err
394443
}
395444

445+
// Select signing address and merge with options
446+
signingAddress := s.addressSelector.Next()
447+
mergedOptions, err := mergeSubmitOptions(options, signingAddress)
448+
if err != nil {
449+
s.logger.Error().Err(err).Msg("failed to merge submit options with signing address")
450+
return fmt.Errorf("failed to merge submit options: %w", err)
451+
}
452+
453+
if signingAddress != "" {
454+
s.logger.Debug().Str("signingAddress", signingAddress).Msg("using signing address for DA submission")
455+
}
456+
396457
submitCtx, cancel := context.WithTimeout(ctx, submissionTimeout)
397458
defer cancel()
398459
// Perform submission
399-
res := types.SubmitWithHelpers(submitCtx, s.da, s.logger, marshaled, rs.GasPrice, namespace, options)
460+
res := types.SubmitWithHelpers(submitCtx, s.da, s.logger, marshaled, rs.GasPrice, namespace, mergedOptions)
400461

401462
// Record submission result for observability
402463
if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil {

block/internal/submitting/da_submitter_mocks_test.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) {
8686
nsBz,
8787
opts,
8888
nil,
89-
nil,
9089
)
9190
assert.NoError(t, err)
9291

@@ -138,7 +137,6 @@ func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) {
138137
nsBz,
139138
opts,
140139
nil,
141-
nil,
142140
)
143141
assert.NoError(t, err)
144142
assert.Equal(t, []float64{5.5, 5.5}, usedGas)
@@ -195,7 +193,6 @@ func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) {
195193
nsBz,
196194
opts,
197195
nil,
198-
nil,
199196
)
200197
assert.NoError(t, err)
201198
assert.Equal(t, []int{4, 2}, batchSizes)
@@ -245,7 +242,6 @@ func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) {
245242
nsBz,
246243
opts,
247244
nil,
248-
nil,
249245
)
250246
assert.NoError(t, err)
251247
assert.Equal(t, []float64{-1, -1}, usedGas)
@@ -286,7 +282,6 @@ func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) {
286282
nsBz,
287283
opts,
288284
nil,
289-
nil,
290285
)
291286
assert.NoError(t, err)
292287
assert.Equal(t, 3, totalSubmitted)
Lines changed: 124 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,124 @@
1+
package submitting
2+
3+
import (
4+
"encoding/json"
5+
"testing"
6+
7+
"github.com/stretchr/testify/assert"
8+
"github.com/stretchr/testify/require"
9+
)
10+
11+
func TestMergeSubmitOptions_NoSigningAddress(t *testing.T) {
12+
baseOptions := []byte(`{"key":"value"}`)
13+
14+
result, err := mergeSubmitOptions(baseOptions, "")
15+
require.NoError(t, err)
16+
assert.Equal(t, baseOptions, result, "should return unchanged options when no signing address")
17+
}
18+
19+
func TestMergeSubmitOptions_EmptyBaseOptions(t *testing.T) {
20+
signingAddress := "celestia1abc123"
21+
22+
result, err := mergeSubmitOptions([]byte{}, signingAddress)
23+
require.NoError(t, err)
24+
25+
var resultMap map[string]interface{}
26+
err = json.Unmarshal(result, &resultMap)
27+
require.NoError(t, err)
28+
29+
assert.Equal(t, signingAddress, resultMap["signer_address"])
30+
}
31+
32+
func TestMergeSubmitOptions_ValidJSON(t *testing.T) {
33+
baseOptions := []byte(`{"existing":"option","number":42}`)
34+
signingAddress := "celestia1def456"
35+
36+
result, err := mergeSubmitOptions(baseOptions, signingAddress)
37+
require.NoError(t, err)
38+
39+
var resultMap map[string]interface{}
40+
err = json.Unmarshal(result, &resultMap)
41+
require.NoError(t, err)
42+
43+
assert.Equal(t, "option", resultMap["existing"])
44+
assert.Equal(t, float64(42), resultMap["number"]) // JSON numbers are float64
45+
assert.Equal(t, signingAddress, resultMap["signer_address"])
46+
}
47+
48+
func TestMergeSubmitOptions_InvalidJSON(t *testing.T) {
49+
baseOptions := []byte(`not-json-content`)
50+
signingAddress := "celestia1ghi789"
51+
52+
result, err := mergeSubmitOptions(baseOptions, signingAddress)
53+
require.NoError(t, err)
54+
55+
var resultMap map[string]interface{}
56+
err = json.Unmarshal(result, &resultMap)
57+
require.NoError(t, err)
58+
59+
// Should create new JSON object with just the signing address
60+
assert.Equal(t, signingAddress, resultMap["signer_address"])
61+
assert.Len(t, resultMap, 1, "should only contain signing address when base options are invalid JSON")
62+
}
63+
64+
func TestMergeSubmitOptions_OverrideExistingAddress(t *testing.T) {
65+
baseOptions := []byte(`{"signer_address":"old-address","other":"data"}`)
66+
newAddress := "celestia1new456"
67+
68+
result, err := mergeSubmitOptions(baseOptions, newAddress)
69+
require.NoError(t, err)
70+
71+
var resultMap map[string]interface{}
72+
err = json.Unmarshal(result, &resultMap)
73+
require.NoError(t, err)
74+
75+
assert.Equal(t, newAddress, resultMap["signer_address"], "should override existing signing address")
76+
assert.Equal(t, "data", resultMap["other"])
77+
}
78+
79+
func TestMergeSubmitOptions_NilBaseOptions(t *testing.T) {
80+
signingAddress := "celestia1jkl012"
81+
82+
result, err := mergeSubmitOptions(nil, signingAddress)
83+
require.NoError(t, err)
84+
85+
var resultMap map[string]interface{}
86+
err = json.Unmarshal(result, &resultMap)
87+
require.NoError(t, err)
88+
89+
assert.Equal(t, signingAddress, resultMap["signer_address"])
90+
}
91+
92+
func TestMergeSubmitOptions_ComplexJSON(t *testing.T) {
93+
baseOptions := []byte(`{
94+
"nested": {
95+
"key": "value"
96+
},
97+
"array": [1, 2, 3],
98+
"bool": true
99+
}`)
100+
signingAddress := "celestia1complex"
101+
102+
result, err := mergeSubmitOptions(baseOptions, signingAddress)
103+
require.NoError(t, err)
104+
105+
var resultMap map[string]interface{}
106+
err = json.Unmarshal(result, &resultMap)
107+
require.NoError(t, err)
108+
109+
// Check nested structure is preserved
110+
nested, ok := resultMap["nested"].(map[string]interface{})
111+
require.True(t, ok)
112+
assert.Equal(t, "value", nested["key"])
113+
114+
// Check array is preserved
115+
array, ok := resultMap["array"].([]interface{})
116+
require.True(t, ok)
117+
assert.Len(t, array, 3)
118+
119+
// Check bool is preserved
120+
assert.Equal(t, true, resultMap["bool"])
121+
122+
// Check signing address was added
123+
assert.Equal(t, signingAddress, resultMap["signer_address"])
124+
}

core/da/dummy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -221,7 +221,7 @@ func (d *DummyDA) SubmitWithOptions(ctx context.Context, blobs []Blob, gasPrice
221221

222222
d.blobs[idStr] = blob
223223
d.commitments[idStr] = commitment
224-
d.proofs[idStr] = commitment // Simple proof
224+
d.proofs[idStr] = commitment // Simple proof
225225
d.namespaceByID[idStr] = namespace // Store namespace for this blob
226226

227227
ids = append(ids, id)

docs/learn/config.md

Lines changed: 40 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ This document provides a comprehensive reference for all configuration options a
2424
- [DA Gas Price](#da-gas-price)
2525
- [DA Gas Multiplier](#da-gas-multiplier)
2626
- [DA Submit Options](#da-submit-options)
27+
- [DA Signing Addresses](#da-signing-addresses)
2728
- [DA Namespace](#da-namespace)
2829
- [DA Header Namespace](#da-namespace)
2930
- [DA Data Namespace](#da-data-namespace)
@@ -377,13 +378,15 @@ _Constant:_ `FlagDAGasMultiplier`
377378
### DA Submit Options
378379

379380
**Description:**
380-
Additional options passed to the DA layer when submitting data. The format and meaning of these options depend on the specific DA implementation being used.
381+
Additional options passed to the DA layer when submitting data. The format and meaning of these options depend on the specific DA implementation being used. For example, with Celestia, this can include custom gas settings or other submission parameters in JSON format.
382+
383+
**Note:** If you configure multiple signing addresses (see [DA Signing Addresses](#da-signing-addresses)), the selected signing address will be automatically merged into these options as a JSON field `signer_address` (matching Celestia's TxConfig schema). If the base options are already valid JSON, the signing address is added to the existing object; otherwise, a new JSON object is created.
381384

382385
**YAML:**
383386

384387
```yaml
385388
da:
386-
submit_options: "{"key":"value"}" # Example, format depends on DA layer
389+
submit_options: '{"key":"value"}' # Example, format depends on DA layer
387390
```
388391

389392
**Command-line Flag:**
@@ -392,6 +395,41 @@ _Example:_ `--rollkit.da.submit_options '{"custom_param":true}'`
392395
_Default:_ `""` (empty)
393396
_Constant:_ `FlagDASubmitOptions`
394397

398+
### DA Signing Addresses
399+
400+
**Description:**
401+
A comma-separated list of signing addresses to use for DA blob submissions. When multiple addresses are provided, they will be used in round-robin fashion to prevent sequence mismatches that can occur with high-throughput Cosmos SDK-based DA layers. This is particularly useful for Celestia when submitting many transactions concurrently.
402+
403+
Each submission will select the next address in the list, and that address will be automatically added to the `submit_options` as `signer_address`. This ensures that the DA layer (e.g., celestia-node) uses the specified account for signing that particular blob submission.
404+
405+
**Setup Requirements:**
406+
407+
- All addresses must be loaded into the DA node's keyring and have sufficient funds for transaction fees
408+
- For Celestia, see the guide on setting up multiple accounts in the DA node documentation
409+
410+
**YAML:**
411+
412+
```yaml
413+
da:
414+
signing_addresses:
415+
- "celestia1abc123..."
416+
- "celestia1def456..."
417+
- "celestia1ghi789..."
418+
```
419+
420+
**Command-line Flag:**
421+
`--rollkit.da.signing_addresses <string>`
422+
_Example:_ `--rollkit.da.signing_addresses celestia1abc...,celestia1def...,celestia1ghi...`
423+
_Default:_ `[]` (empty, uses default DA node behavior)
424+
_Constant:_ `FlagDASigningAddresses`
425+
426+
**Behavior:**
427+
428+
- If no signing addresses are configured, submissions use the DA layer's default signing behavior
429+
- If one address is configured, all submissions use that address
430+
- If multiple addresses are configured, they are used in round-robin order to distribute the load and prevent nonce/sequence conflicts
431+
- The address selection is thread-safe for concurrent submissions
432+
395433
### DA Namespace
396434

397435
**Description:**

0 commit comments

Comments
 (0)