From dc059114f775c083099faa529da7c1a15a4153b8 Mon Sep 17 00:00:00 2001 From: Randy Grok Date: Thu, 23 Oct 2025 16:46:33 +0200 Subject: [PATCH 1/9] feat: implement multi-account support with round-robin address selection for DA submissions --- block/internal/submitting/da_submitter.go | 69 ++++++++- .../submitting/da_submitter_mocks_test.go | 5 - .../submitting/da_submitter_options_test.go | 124 +++++++++++++++ core/da/dummy.go | 2 +- docs/learn/config.md | 42 ++++- pkg/config/config.go | 4 + pkg/config/config_test.go | 3 +- pkg/da/selector.go | 57 +++++++ pkg/da/selector_test.go | 146 ++++++++++++++++++ sequencers/single/queue_test.go | 12 +- 10 files changed, 445 insertions(+), 19 deletions(-) create mode 100644 block/internal/submitting/da_submitter_options_test.go create mode 100644 pkg/da/selector.go create mode 100644 pkg/da/selector_test.go diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index baccd069b9..7428496b0f 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -3,6 +3,7 @@ package submitting import ( "bytes" "context" + "encoding/json" "fmt" "time" @@ -13,6 +14,7 @@ import ( "github.com/evstack/ev-node/block/internal/common" coreda "github.com/evstack/ev-node/core/da" "github.com/evstack/ev-node/pkg/config" + pkgda "github.com/evstack/ev-node/pkg/da" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/rpc/server" "github.com/evstack/ev-node/pkg/signer" @@ -125,6 +127,9 @@ type DASubmitter struct { // calculate namespaces bytes once and reuse them namespaceBz []byte namespaceDataBz []byte + + // address selector for multi-account support + addressSelector pkgda.AddressSelector } // NewDASubmitter creates a new DA submitter @@ -148,6 +153,17 @@ func NewDASubmitter( metrics = common.NopMetrics() } + // Create address selector based on configuration + var addressSelector pkgda.AddressSelector + if len(config.DA.SigningAddresses) > 0 { + addressSelector = pkgda.NewRoundRobinSelector(config.DA.SigningAddresses) + daSubmitterLogger.Info(). + Int("num_addresses", len(config.DA.SigningAddresses)). + Msg("initialized round-robin address selector for multi-account DA submissions") + } else { + addressSelector = pkgda.NewNoOpSelector() + } + return &DASubmitter{ da: da, config: config, @@ -157,6 +173,7 @@ func NewDASubmitter( logger: daSubmitterLogger, namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(), namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(), + addressSelector: addressSelector, } } @@ -236,7 +253,6 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er "header", s.namespaceBz, []byte(s.config.DA.SubmitOptions), - cache, func() uint64 { return cache.NumPendingHeaders() }, ) } @@ -280,7 +296,6 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe "data", s.namespaceDataBz, []byte(s.config.DA.SubmitOptions), - cache, func() uint64 { return cache.NumPendingData() }, ) } @@ -341,6 +356,41 @@ func (s *DASubmitter) createSignedData(dataList []*types.SignedData, signer sign return signedDataList, nil } +// mergeSubmitOptions merges the base submit options with a signing address. +// If the base options are valid JSON, the signing address is added to the JSON object. +// Otherwise, a new JSON object is created with just the signing address. +// Returns the base options unchanged if no signing address is provided. +func mergeSubmitOptions(baseOptions []byte, signingAddress string) ([]byte, error) { + if signingAddress == "" { + return baseOptions, nil + } + + // Create options map with signing address + optionsMap := make(map[string]interface{}) + + // If base options are provided, try to parse them as JSON + if len(baseOptions) > 0 { + // Try to unmarshal existing options + if err := json.Unmarshal(baseOptions, &optionsMap); err != nil { + // If it's not valid JSON, start with empty map + // Log at debug level since this is expected for non-JSON options + optionsMap = make(map[string]interface{}) + } + } + + // Add or override the signing address + // Note: Uses "signer_address" to match Celestia's TxConfig JSON schema + optionsMap["signer_address"] = signingAddress + + // Marshal back to JSON + mergedOptions, err := json.Marshal(optionsMap) + if err != nil { + return nil, fmt.Errorf("failed to marshal submit options: %w", err) + } + + return mergedOptions, nil +} + // submitToDA is a generic helper for submitting items to the DA layer with retry, backoff, and gas price logic. func submitToDA[T any]( s *DASubmitter, @@ -351,7 +401,6 @@ func submitToDA[T any]( itemType string, namespace []byte, options []byte, - cache cache.Manager, getTotalPendingFn func() uint64, ) error { marshaled, err := marshalItems(ctx, items, marshalFn, itemType) @@ -393,10 +442,22 @@ func submitToDA[T any]( return err } + // Select signing address and merge with options + signingAddress := s.addressSelector.Next() + mergedOptions, err := mergeSubmitOptions(options, signingAddress) + if err != nil { + s.logger.Error().Err(err).Msg("failed to merge submit options with signing address") + return fmt.Errorf("failed to merge submit options: %w", err) + } + + if signingAddress != "" { + s.logger.Debug().Str("signingAddress", signingAddress).Msg("using signing address for DA submission") + } + submitCtx, cancel := context.WithTimeout(ctx, submissionTimeout) defer cancel() // Perform submission - res := types.SubmitWithHelpers(submitCtx, s.da, s.logger, marshaled, rs.GasPrice, namespace, options) + res := types.SubmitWithHelpers(submitCtx, s.da, s.logger, marshaled, rs.GasPrice, namespace, mergedOptions) // Record submission result for observability if daVisualizationServer := server.GetDAVisualizationServer(); daVisualizationServer != nil { diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index fc309e0638..9ef14fba36 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -86,7 +86,6 @@ func TestSubmitToDA_MempoolRetry_IncreasesGasAndSucceeds(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) @@ -138,7 +137,6 @@ func TestSubmitToDA_UnknownError_RetriesSameGasThenSucceeds(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) assert.Equal(t, []float64{5.5, 5.5}, usedGas) @@ -195,7 +193,6 @@ func TestSubmitToDA_TooBig_HalvesBatch(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) assert.Equal(t, []int{4, 2}, batchSizes) @@ -245,7 +242,6 @@ func TestSubmitToDA_SentinelNoGas_PreservesGasAcrossRetries(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) assert.Equal(t, []float64{-1, -1}, usedGas) @@ -286,7 +282,6 @@ func TestSubmitToDA_PartialSuccess_AdvancesWindow(t *testing.T) { nsBz, opts, nil, - nil, ) assert.NoError(t, err) assert.Equal(t, 3, totalSubmitted) diff --git a/block/internal/submitting/da_submitter_options_test.go b/block/internal/submitting/da_submitter_options_test.go new file mode 100644 index 0000000000..b14d3d9f52 --- /dev/null +++ b/block/internal/submitting/da_submitter_options_test.go @@ -0,0 +1,124 @@ +package submitting + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestMergeSubmitOptions_NoSigningAddress(t *testing.T) { + baseOptions := []byte(`{"key":"value"}`) + + result, err := mergeSubmitOptions(baseOptions, "") + require.NoError(t, err) + assert.Equal(t, baseOptions, result, "should return unchanged options when no signing address") +} + +func TestMergeSubmitOptions_EmptyBaseOptions(t *testing.T) { + signingAddress := "celestia1abc123" + + result, err := mergeSubmitOptions([]byte{}, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + assert.Equal(t, signingAddress, resultMap["signer_address"]) +} + +func TestMergeSubmitOptions_ValidJSON(t *testing.T) { + baseOptions := []byte(`{"existing":"option","number":42}`) + signingAddress := "celestia1def456" + + result, err := mergeSubmitOptions(baseOptions, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + assert.Equal(t, "option", resultMap["existing"]) + assert.Equal(t, float64(42), resultMap["number"]) // JSON numbers are float64 + assert.Equal(t, signingAddress, resultMap["signer_address"]) +} + +func TestMergeSubmitOptions_InvalidJSON(t *testing.T) { + baseOptions := []byte(`not-json-content`) + signingAddress := "celestia1ghi789" + + result, err := mergeSubmitOptions(baseOptions, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + // Should create new JSON object with just the signing address + assert.Equal(t, signingAddress, resultMap["signer_address"]) + assert.Len(t, resultMap, 1, "should only contain signing address when base options are invalid JSON") +} + +func TestMergeSubmitOptions_OverrideExistingAddress(t *testing.T) { + baseOptions := []byte(`{"signer_address":"old-address","other":"data"}`) + newAddress := "celestia1new456" + + result, err := mergeSubmitOptions(baseOptions, newAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + assert.Equal(t, newAddress, resultMap["signer_address"], "should override existing signing address") + assert.Equal(t, "data", resultMap["other"]) +} + +func TestMergeSubmitOptions_NilBaseOptions(t *testing.T) { + signingAddress := "celestia1jkl012" + + result, err := mergeSubmitOptions(nil, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + assert.Equal(t, signingAddress, resultMap["signer_address"]) +} + +func TestMergeSubmitOptions_ComplexJSON(t *testing.T) { + baseOptions := []byte(`{ + "nested": { + "key": "value" + }, + "array": [1, 2, 3], + "bool": true + }`) + signingAddress := "celestia1complex" + + result, err := mergeSubmitOptions(baseOptions, signingAddress) + require.NoError(t, err) + + var resultMap map[string]interface{} + err = json.Unmarshal(result, &resultMap) + require.NoError(t, err) + + // Check nested structure is preserved + nested, ok := resultMap["nested"].(map[string]interface{}) + require.True(t, ok) + assert.Equal(t, "value", nested["key"]) + + // Check array is preserved + array, ok := resultMap["array"].([]interface{}) + require.True(t, ok) + assert.Len(t, array, 3) + + // Check bool is preserved + assert.Equal(t, true, resultMap["bool"]) + + // Check signing address was added + assert.Equal(t, signingAddress, resultMap["signer_address"]) +} diff --git a/core/da/dummy.go b/core/da/dummy.go index a1b9ca2e90..0f9fd38244 100644 --- a/core/da/dummy.go +++ b/core/da/dummy.go @@ -221,7 +221,7 @@ func (d *DummyDA) SubmitWithOptions(ctx context.Context, blobs []Blob, gasPrice d.blobs[idStr] = blob d.commitments[idStr] = commitment - d.proofs[idStr] = commitment // Simple proof + d.proofs[idStr] = commitment // Simple proof d.namespaceByID[idStr] = namespace // Store namespace for this blob ids = append(ids, id) diff --git a/docs/learn/config.md b/docs/learn/config.md index 6fa6befe0c..c4af5ad4b6 100644 --- a/docs/learn/config.md +++ b/docs/learn/config.md @@ -24,6 +24,7 @@ This document provides a comprehensive reference for all configuration options a - [DA Gas Price](#da-gas-price) - [DA Gas Multiplier](#da-gas-multiplier) - [DA Submit Options](#da-submit-options) + - [DA Signing Addresses](#da-signing-addresses) - [DA Namespace](#da-namespace) - [DA Header Namespace](#da-namespace) - [DA Data Namespace](#da-data-namespace) @@ -377,13 +378,15 @@ _Constant:_ `FlagDAGasMultiplier` ### DA Submit Options **Description:** -Additional options passed to the DA layer when submitting data. The format and meaning of these options depend on the specific DA implementation being used. +Additional options passed to the DA layer when submitting data. The format and meaning of these options depend on the specific DA implementation being used. For example, with Celestia, this can include custom gas settings or other submission parameters in JSON format. + +**Note:** If you configure multiple signing addresses (see [DA Signing Addresses](#da-signing-addresses)), the selected signing address will be automatically merged into these options as a JSON field `signer_address` (matching Celestia's TxConfig schema). If the base options are already valid JSON, the signing address is added to the existing object; otherwise, a new JSON object is created. **YAML:** ```yaml da: - submit_options: "{"key":"value"}" # Example, format depends on DA layer + submit_options: '{"key":"value"}' # Example, format depends on DA layer ``` **Command-line Flag:** @@ -392,6 +395,41 @@ _Example:_ `--rollkit.da.submit_options '{"custom_param":true}'` _Default:_ `""` (empty) _Constant:_ `FlagDASubmitOptions` +### DA Signing Addresses + +**Description:** +A comma-separated list of signing addresses to use for DA blob submissions. When multiple addresses are provided, they will be used in round-robin fashion to prevent sequence mismatches that can occur with high-throughput Cosmos SDK-based DA layers. This is particularly useful for Celestia when submitting many transactions concurrently. + +Each submission will select the next address in the list, and that address will be automatically added to the `submit_options` as `signer_address`. This ensures that the DA layer (e.g., celestia-node) uses the specified account for signing that particular blob submission. + +**Setup Requirements:** + +- All addresses must be loaded into the DA node's keyring and have sufficient funds for transaction fees +- For Celestia, see the guide on setting up multiple accounts in the DA node documentation + +**YAML:** + +```yaml +da: + signing_addresses: + - "celestia1abc123..." + - "celestia1def456..." + - "celestia1ghi789..." +``` + +**Command-line Flag:** +`--rollkit.da.signing_addresses ` +_Example:_ `--rollkit.da.signing_addresses celestia1abc...,celestia1def...,celestia1ghi...` +_Default:_ `[]` (empty, uses default DA node behavior) +_Constant:_ `FlagDASigningAddresses` + +**Behavior:** + +- If no signing addresses are configured, submissions use the DA layer's default signing behavior +- If one address is configured, all submissions use that address +- If multiple addresses are configured, they are used in round-robin order to distribute the load and prevent nonce/sequence conflicts +- The address selection is thread-safe for concurrent submissions + ### DA Namespace **Description:** diff --git a/pkg/config/config.go b/pkg/config/config.go index d7309d7c3a..ae10f7589c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -68,6 +68,8 @@ const ( FlagDADataNamespace = FlagPrefixEvnode + "da.data_namespace" // FlagDASubmitOptions is a flag for data availability submit options FlagDASubmitOptions = FlagPrefixEvnode + "da.submit_options" + // FlagDASigningAddresses is a flag for specifying multiple DA signing addresses + FlagDASigningAddresses = FlagPrefixEvnode + "da.signing_addresses" // FlagDAMempoolTTL is a flag for specifying the DA mempool TTL FlagDAMempoolTTL = FlagPrefixEvnode + "da.mempool_ttl" // FlagDAMaxSubmitAttempts is a flag for specifying the maximum DA submit attempts @@ -161,6 +163,7 @@ type DAConfig struct { GasPrice float64 `mapstructure:"gas_price" yaml:"gas_price" comment:"Gas price for data availability transactions. Use -1 for automatic gas price determination. Higher values may result in faster inclusion."` GasMultiplier float64 `mapstructure:"gas_multiplier" yaml:"gas_multiplier" comment:"Multiplier applied to gas price when retrying failed DA submissions. Values > 1 increase gas price on retries to improve chances of inclusion."` SubmitOptions string `mapstructure:"submit_options" yaml:"submit_options" comment:"Additional options passed to the DA layer when submitting data. Format depends on the specific DA implementation being used."` + SigningAddresses []string `mapstructure:"signing_addresses" yaml:"signing_addresses" comment:"List of signing addresses to use for DA submissions. When multiple addresses are provided, they will be used in round-robin fashion to prevent sequence mismatches. Useful for high-throughput chains using Cosmos SDK."` Namespace string `mapstructure:"namespace" yaml:"namespace" comment:"Namespace ID used when submitting blobs to the DA layer. When a DataNamespace is provided, only the header is sent to this namespace."` DataNamespace string `mapstructure:"data_namespace" yaml:"data_namespace" comment:"Namespace ID for submitting data to DA layer. Use this to speed-up light clients."` BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Average block time of the DA chain (duration). Determines frequency of DA layer syncing, maximum backoff time for retries, and is multiplied by MempoolTTL to calculate transaction expiration. Examples: \"15s\", \"30s\", \"1m\", \"2m30s\", \"10m\"."` @@ -325,6 +328,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().String(FlagDANamespace, def.DA.Namespace, "DA namespace for header (or blob) submissions") cmd.Flags().String(FlagDADataNamespace, def.DA.DataNamespace, "DA namespace for data submissions") cmd.Flags().String(FlagDASubmitOptions, def.DA.SubmitOptions, "DA submit options") + cmd.Flags().StringSlice(FlagDASigningAddresses, def.DA.SigningAddresses, "Comma-separated list of signing addresses for DA submissions (used in round-robin)") cmd.Flags().Uint64(FlagDAMempoolTTL, def.DA.MempoolTTL, "number of DA blocks until transaction is dropped from the mempool") cmd.Flags().Int(FlagDAMaxSubmitAttempts, def.DA.MaxSubmitAttempts, "maximum number of attempts to submit data to the DA layer before giving up") diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 3f9b85913a..599e7bc49b 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -72,6 +72,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagDAGasMultiplier, DefaultConfig().DA.GasMultiplier) assertFlagValue(t, flags, FlagDANamespace, DefaultConfig().DA.Namespace) assertFlagValue(t, flags, FlagDASubmitOptions, DefaultConfig().DA.SubmitOptions) + assertFlagValue(t, flags, FlagDASigningAddresses, DefaultConfig().DA.SigningAddresses) assertFlagValue(t, flags, FlagDAMempoolTTL, DefaultConfig().DA.MempoolTTL) assertFlagValue(t, flags, FlagDAMaxSubmitAttempts, DefaultConfig().DA.MaxSubmitAttempts) @@ -103,7 +104,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig().RPC.Address) // Count the number of flags we're explicitly checking - expectedFlagCount := 38 // Update this number if you add more flag checks above + expectedFlagCount := 39 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 diff --git a/pkg/da/selector.go b/pkg/da/selector.go new file mode 100644 index 0000000000..81618e65ed --- /dev/null +++ b/pkg/da/selector.go @@ -0,0 +1,57 @@ +package da + +import ( + "sync/atomic" +) + +// AddressSelector defines the interface for selecting a signing address from a list. +type AddressSelector interface { + // Next returns the next address to use for signing. + // Returns empty string if no addresses are configured. + Next() string +} + +// RoundRobinSelector implements round-robin selection of signing addresses. +// This helps prevent sequence mismatches in Cosmos SDK when submitting +// multiple transactions concurrently. +type RoundRobinSelector struct { + addresses []string + counter atomic.Uint64 +} + +// NewRoundRobinSelector creates a new round-robin address selector. +func NewRoundRobinSelector(addresses []string) *RoundRobinSelector { + return &RoundRobinSelector{ + addresses: addresses, + } +} + +// Next returns the next address in round-robin fashion. +// Thread-safe for concurrent access. +func (s *RoundRobinSelector) Next() string { + if len(s.addresses) == 0 { + return "" + } + + if len(s.addresses) == 1 { + return s.addresses[0] + } + + // Atomically increment and get the previous value for this call + index := s.counter.Add(1) - 1 + return s.addresses[index%uint64(len(s.addresses))] +} + +// NoOpSelector always returns an empty string. +// Used when no signing addresses are configured. +type NoOpSelector struct{} + +// NewNoOpSelector creates a selector that returns no address. +func NewNoOpSelector() *NoOpSelector { + return &NoOpSelector{} +} + +// Next returns an empty string. +func (s *NoOpSelector) Next() string { + return "" +} diff --git a/pkg/da/selector_test.go b/pkg/da/selector_test.go new file mode 100644 index 0000000000..cd975a94f1 --- /dev/null +++ b/pkg/da/selector_test.go @@ -0,0 +1,146 @@ +package da + +import ( + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestRoundRobinSelector_EmptyList(t *testing.T) { + selector := NewRoundRobinSelector([]string{}) + + addr := selector.Next() + assert.Empty(t, addr, "should return empty string for empty address list") + + // Multiple calls should still return empty + addr = selector.Next() + assert.Empty(t, addr) +} + +func TestRoundRobinSelector_SingleAddress(t *testing.T) { + addresses := []string{"celestia1abc123"} + selector := NewRoundRobinSelector(addresses) + + // All calls should return the same address + for i := 0; i < 10; i++ { + addr := selector.Next() + assert.Equal(t, "celestia1abc123", addr, "should always return the single address") + } +} + +func TestRoundRobinSelector_MultipleAddresses(t *testing.T) { + addresses := []string{ + "celestia1abc123", + "celestia1def456", + "celestia1ghi789", + } + selector := NewRoundRobinSelector(addresses) + + // First round + assert.Equal(t, "celestia1abc123", selector.Next()) + assert.Equal(t, "celestia1def456", selector.Next()) + assert.Equal(t, "celestia1ghi789", selector.Next()) + + // Second round - should cycle back + assert.Equal(t, "celestia1abc123", selector.Next()) + assert.Equal(t, "celestia1def456", selector.Next()) + assert.Equal(t, "celestia1ghi789", selector.Next()) +} + +func TestRoundRobinSelector_Concurrent(t *testing.T) { + addresses := []string{ + "celestia1abc123", + "celestia1def456", + "celestia1ghi789", + } + selector := NewRoundRobinSelector(addresses) + + const numGoroutines = 100 + const numCallsPerGoroutine = 100 + + results := make([]string, numGoroutines*numCallsPerGoroutine) + var wg sync.WaitGroup + + // Launch concurrent goroutines + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func(start int) { + defer wg.Done() + for j := 0; j < numCallsPerGoroutine; j++ { + addr := selector.Next() + results[start+j] = addr + } + }(i * numCallsPerGoroutine) + } + + wg.Wait() + + // Verify all results are valid addresses + for _, addr := range results { + require.Contains(t, addresses, addr, "all returned addresses should be from the configured list") + } + + // Count occurrences of each address + counts := make(map[string]int) + for _, addr := range results { + counts[addr]++ + } + + // Each address should be used approximately equally (within 10% tolerance) + expectedCount := len(results) / len(addresses) + tolerance := expectedCount / 10 + + for _, addr := range addresses { + count := counts[addr] + assert.InDelta(t, expectedCount, count, float64(tolerance), + "address %s should be used approximately %d times, got %d", addr, expectedCount, count) + } +} + +func TestRoundRobinSelector_WrapAround(t *testing.T) { + addresses := []string{"addr1", "addr2"} + selector := NewRoundRobinSelector(addresses) + + // Test wrap around behavior with large number of calls + seen := make(map[string]int) + for i := 0; i < 1000; i++ { + addr := selector.Next() + seen[addr]++ + } + + // Both addresses should be used 500 times each + assert.Equal(t, 500, seen["addr1"]) + assert.Equal(t, 500, seen["addr2"]) +} + +func TestNoOpSelector(t *testing.T) { + selector := NewNoOpSelector() + + // Should always return empty string + for i := 0; i < 10; i++ { + addr := selector.Next() + assert.Empty(t, addr, "NoOpSelector should always return empty string") + } +} + +func TestNoOpSelector_Concurrent(t *testing.T) { + selector := NewNoOpSelector() + + const numGoroutines = 50 + var wg sync.WaitGroup + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + go func() { + defer wg.Done() + for j := 0; j < 100; j++ { + addr := selector.Next() + assert.Empty(t, addr) + } + }() + } + + wg.Wait() +} diff --git a/sequencers/single/queue_test.go b/sequencers/single/queue_test.go index e2be3eaee3..0ede59a90e 100644 --- a/sequencers/single/queue_test.go +++ b/sequencers/single/queue_test.go @@ -283,12 +283,12 @@ func TestLoad_WithMixedData(t *testing.T) { require.Equal(2, bq.Size(), "Queue should contain only the 2 valid batches") // Check hashes to be sure (order might vary depending on datastore query) loadedHashes := make(map[string]bool) -bq.mu.Lock() -for i := bq.head; i < len(bq.queue); i++ { - h, _ := bq.queue[i].Hash() - loadedHashes[hex.EncodeToString(h)] = true -} -bq.mu.Unlock() + bq.mu.Lock() + for i := bq.head; i < len(bq.queue); i++ { + h, _ := bq.queue[i].Hash() + loadedHashes[hex.EncodeToString(h)] = true + } + bq.mu.Unlock() require.True(loadedHashes[hexHash1], "Valid batch 1 not found in queue") require.True(loadedHashes[hexHash2], "Valid batch 2 not found in queue") From 9d34697b9be943c69fa4af4c405c565e6e1f8401 Mon Sep 17 00:00:00 2001 From: Randy Grok <98407738+randygrok@users.noreply.github.com> Date: Wed, 29 Oct 2025 12:02:34 +0100 Subject: [PATCH 2/9] Update pkg/config/config.go Co-authored-by: Marko --- pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 041f5de177..108efebf4c 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -164,7 +164,7 @@ type DAConfig struct { GasPrice float64 `mapstructure:"gas_price" yaml:"gas_price" comment:"Gas price for data availability transactions. Use -1 for automatic gas price determination. Higher values may result in faster inclusion."` GasMultiplier float64 `mapstructure:"gas_multiplier" yaml:"gas_multiplier" comment:"Multiplier applied to gas price when retrying failed DA submissions. Values > 1 increase gas price on retries to improve chances of inclusion."` SubmitOptions string `mapstructure:"submit_options" yaml:"submit_options" comment:"Additional options passed to the DA layer when submitting data. Format depends on the specific DA implementation being used."` - SigningAddresses []string `mapstructure:"signing_addresses" yaml:"signing_addresses" comment:"List of signing addresses to use for DA submissions. When multiple addresses are provided, they will be used in round-robin fashion to prevent sequence mismatches. Useful for high-throughput chains using Cosmos SDK."` + SigningAddresses []string `mapstructure:"signing_addresses" yaml:"signing_addresses" comment:"List of addresses to use for DA submissions. When multiple addresses are provided, they will be used in round-robin fashion to prevent sequence mismatches. Useful for high-throughput chains."` Namespace string `mapstructure:"namespace" yaml:"namespace" comment:"Namespace ID used when submitting blobs to the DA layer. When a DataNamespace is provided, only the header is sent to this namespace."` DataNamespace string `mapstructure:"data_namespace" yaml:"data_namespace" comment:"Namespace ID for submitting data to DA layer. Use this to speed-up light clients."` BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Average block time of the DA chain (duration). Determines frequency of DA layer syncing, maximum backoff time for retries, and is multiplied by MempoolTTL to calculate transaction expiration. Examples: \"15s\", \"30s\", \"1m\", \"2m30s\", \"10m\"."` From 6ac2c9003d30c0e335494f7808912b9696df72e0 Mon Sep 17 00:00:00 2001 From: Randy Grok <98407738+randygrok@users.noreply.github.com> Date: Wed, 29 Oct 2025 12:03:10 +0100 Subject: [PATCH 3/9] Update docs/learn/config.md Co-authored-by: Marko --- docs/learn/config.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/learn/config.md b/docs/learn/config.md index c4af5ad4b6..6e7b99391a 100644 --- a/docs/learn/config.md +++ b/docs/learn/config.md @@ -418,7 +418,7 @@ da: ``` **Command-line Flag:** -`--rollkit.da.signing_addresses ` +`--evnode.da.signing_addresses ` _Example:_ `--rollkit.da.signing_addresses celestia1abc...,celestia1def...,celestia1ghi...` _Default:_ `[]` (empty, uses default DA node behavior) _Constant:_ `FlagDASigningAddresses` From 9c4ffcde70f551f120d2c3ee5aa3ca1f743d63be Mon Sep 17 00:00:00 2001 From: Randy Grok Date: Wed, 29 Oct 2025 12:42:08 +0100 Subject: [PATCH 4/9] fix: ensure options map is initialized correctly in mergeSubmitOptions function test: add case for merging with null JSON base options --- block/internal/submitting/da_submitter.go | 13 ++++++++----- .../submitting/da_submitter_options_test.go | 8 ++++++++ 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 8f425a00c5..4de10fd79a 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -364,19 +364,22 @@ func mergeSubmitOptions(baseOptions []byte, signingAddress string) ([]byte, erro return baseOptions, nil } - // Create options map with signing address - optionsMap := make(map[string]interface{}) + var optionsMap map[string]interface{} // If base options are provided, try to parse them as JSON if len(baseOptions) > 0 { - // Try to unmarshal existing options + // Try to unmarshal existing options, ignoring errors for non-JSON input if err := json.Unmarshal(baseOptions, &optionsMap); err != nil { - // If it's not valid JSON, start with empty map - // Log at debug level since this is expected for non-JSON options + // Not valid JSON - start with empty map optionsMap = make(map[string]interface{}) } } + // Ensure map is initialized even if unmarshal returned nil + if optionsMap == nil { + optionsMap = make(map[string]interface{}) + } + // Add or override the signing address // Note: Uses "signer_address" to match Celestia's TxConfig JSON schema optionsMap["signer_address"] = signingAddress diff --git a/block/internal/submitting/da_submitter_options_test.go b/block/internal/submitting/da_submitter_options_test.go index b14d3d9f52..2f6d17a17d 100644 --- a/block/internal/submitting/da_submitter_options_test.go +++ b/block/internal/submitting/da_submitter_options_test.go @@ -122,3 +122,11 @@ func TestMergeSubmitOptions_ComplexJSON(t *testing.T) { // Check signing address was added assert.Equal(t, signingAddress, resultMap["signer_address"]) } + +func TestMergeSubmitOptions_NullJSON(t *testing.T) { + base := []byte("null") + merged, err := mergeSubmitOptions(base, `{"signer_address": "abc"}`) + require.NoError(t, err) + require.NotNil(t, merged) + require.Contains(t, string(merged), "signer_address") +} From 63fc1f9000abcaedcd44004da71cf34fc584e0f4 Mon Sep 17 00:00:00 2001 From: Randy Grok Date: Wed, 29 Oct 2025 12:44:16 +0100 Subject: [PATCH 5/9] feat: add support for DA signing addresses in entrypoint script --- apps/evm/single/entrypoint.sh | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/apps/evm/single/entrypoint.sh b/apps/evm/single/entrypoint.sh index a91c70d4a9..e23caa76dd 100755 --- a/apps/evm/single/entrypoint.sh +++ b/apps/evm/single/entrypoint.sh @@ -88,6 +88,10 @@ if [ -n "$DA_NAMESPACE" ]; then default_flags="$default_flags --rollkit.da.namespace $DA_NAMESPACE" fi +if [ -n "$DA_SIGNING_ADDRESSES" ]; then + default_flags="$default_flags --rollkit.da.signing_addresses $DA_SIGNING_ADDRESSES" +fi + # If no arguments passed, show help if [ $# -eq 0 ]; then exec evm-single From d87adf580774172cf2f33390f9aff38aab2cae6c Mon Sep 17 00:00:00 2001 From: Randy Grok <98407738+randygrok@users.noreply.github.com> Date: Wed, 29 Oct 2025 13:05:07 +0100 Subject: [PATCH 6/9] Update pkg/config/config.go Co-authored-by: Marko --- pkg/config/config.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config.go b/pkg/config/config.go index 108efebf4c..8e7f8cce26 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -331,7 +331,7 @@ func AddFlags(cmd *cobra.Command) { cmd.Flags().String(FlagDANamespace, def.DA.Namespace, "DA namespace for header (or blob) submissions") cmd.Flags().String(FlagDADataNamespace, def.DA.DataNamespace, "DA namespace for data submissions") cmd.Flags().String(FlagDASubmitOptions, def.DA.SubmitOptions, "DA submit options") - cmd.Flags().StringSlice(FlagDASigningAddresses, def.DA.SigningAddresses, "Comma-separated list of signing addresses for DA submissions (used in round-robin)") + cmd.Flags().StringSlice(FlagDASigningAddresses, def.DA.SigningAddresses, "Comma-separated list of addresses for DA submissions (used in round-robin)") cmd.Flags().Uint64(FlagDAMempoolTTL, def.DA.MempoolTTL, "number of DA blocks until transaction is dropped from the mempool") cmd.Flags().Int(FlagDAMaxSubmitAttempts, def.DA.MaxSubmitAttempts, "maximum number of attempts to submit data to the DA layer before giving up") From 8933b1717898387f84d7c3bd83a9c4777b8e8bcd Mon Sep 17 00:00:00 2001 From: Randy Grok Date: Wed, 29 Oct 2025 14:26:49 +0100 Subject: [PATCH 7/9] update flag --- pkg/config/config_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 26068269da..c9284076b7 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -105,7 +105,7 @@ func TestAddFlags(t *testing.T) { assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig().RPC.Address) // Count the number of flags we're explicitly checking - expectedFlagCount := 39 // Update this number if you add more flag checks above + expectedFlagCount := 40 // Update this number if you add more flag checks above // Get the actual number of flags (both regular and persistent) actualFlagCount := 0 From c6ef5318e56ee6ecaa00036b73ebc8c196c5a881 Mon Sep 17 00:00:00 2001 From: Randy Grok Date: Thu, 30 Oct 2025 08:08:44 +0100 Subject: [PATCH 8/9] fix: improve error handling in RoundRobinSelector for empty address cases --- pkg/da/selector.go | 9 +++++++-- pkg/da/selector_test.go | 19 ++++++++++++------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/pkg/da/selector.go b/pkg/da/selector.go index 81618e65ed..d5bb996fd0 100644 --- a/pkg/da/selector.go +++ b/pkg/da/selector.go @@ -7,7 +7,7 @@ import ( // AddressSelector defines the interface for selecting a signing address from a list. type AddressSelector interface { // Next returns the next address to use for signing. - // Returns empty string if no addresses are configured. + // Implementations may return empty string (NoOpSelector) or panic (RoundRobinSelector with no addresses). Next() string } @@ -20,7 +20,11 @@ type RoundRobinSelector struct { } // NewRoundRobinSelector creates a new round-robin address selector. +// Panics if addresses is empty - use NewNoOpSelector instead. func NewRoundRobinSelector(addresses []string) *RoundRobinSelector { + if len(addresses) == 0 { + panic("NewRoundRobinSelector: addresses slice is empty; use NewNoOpSelector instead") + } return &RoundRobinSelector{ addresses: addresses, } @@ -28,9 +32,10 @@ func NewRoundRobinSelector(addresses []string) *RoundRobinSelector { // Next returns the next address in round-robin fashion. // Thread-safe for concurrent access. +// Panics if no addresses are configured - this indicates a programming error. func (s *RoundRobinSelector) Next() string { if len(s.addresses) == 0 { - return "" + panic("RoundRobinSelector.Next: no addresses configured; use NewNoOpSelector instead") } if len(s.addresses) == 1 { diff --git a/pkg/da/selector_test.go b/pkg/da/selector_test.go index cd975a94f1..c25a35e682 100644 --- a/pkg/da/selector_test.go +++ b/pkg/da/selector_test.go @@ -9,14 +9,19 @@ import ( ) func TestRoundRobinSelector_EmptyList(t *testing.T) { - selector := NewRoundRobinSelector([]string{}) - - addr := selector.Next() - assert.Empty(t, addr, "should return empty string for empty address list") + // Should panic when creating selector with empty address list + assert.Panics(t, func() { + NewRoundRobinSelector([]string{}) + }, "should panic when creating RoundRobinSelector with empty address list") +} - // Multiple calls should still return empty - addr = selector.Next() - assert.Empty(t, addr) +func TestRoundRobinSelector_NextWithoutAddresses(t *testing.T) { + // Should panic if Next is called on a selector with no addresses + // (e.g., if someone creates the struct directly without using the constructor) + selector := &RoundRobinSelector{addresses: []string{}} + assert.Panics(t, func() { + selector.Next() + }, "should panic when calling Next with no addresses configured") } func TestRoundRobinSelector_SingleAddress(t *testing.T) { From 82a1303d9506382cea975af5d0dd1eeb29007546 Mon Sep 17 00:00:00 2001 From: tac0turtle Date: Mon, 10 Nov 2025 11:41:23 +0100 Subject: [PATCH 9/9] fix build after conflict resolution --- block/internal/submitting/da_submitter.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 724aec0600..39c7b1615e 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -466,7 +466,7 @@ func submitToDA[T any]( // Perform submission start := time.Now() - res := types.SubmitWithHelpers(submitCtx, s.da, s.logger, marshaled, rs.GasPrice, namespace, options) + res := types.SubmitWithHelpers(submitCtx, s.da, s.logger, marshaled, rs.GasPrice, namespace, mergedOptions) s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got SubmitWithHelpers response from celestia") // Record submission result for observability