Skip to content

Commit a5c4434

Browse files
committed
bump batch size and make it a config
1 parent ebff89b commit a5c4434

File tree

5 files changed

+33
-11
lines changed

5 files changed

+33
-11
lines changed

block/internal/da/client.go

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,29 +32,39 @@ type client struct {
3232
da coreda.DA
3333
logger zerolog.Logger
3434
defaultTimeout time.Duration
35+
batchSize int
3536
namespaceBz []byte
3637
namespaceDataBz []byte
3738
}
3839

40+
const (
41+
defaultRetrieveBatchSize = 150
42+
)
43+
3944
// Config contains configuration for the DA client.
4045
type Config struct {
41-
DA coreda.DA
42-
Logger zerolog.Logger
43-
DefaultTimeout time.Duration
44-
Namespace string
45-
DataNamespace string
46+
DA coreda.DA
47+
Logger zerolog.Logger
48+
DefaultTimeout time.Duration
49+
Namespace string
50+
DataNamespace string
51+
RetrieveBatchSize int
4652
}
4753

4854
// NewClient creates a new DA client with pre-calculated namespace bytes.
4955
func NewClient(cfg Config) *client {
5056
if cfg.DefaultTimeout == 0 {
5157
cfg.DefaultTimeout = 30 * time.Second
5258
}
59+
if cfg.RetrieveBatchSize <= 0 {
60+
cfg.RetrieveBatchSize = defaultRetrieveBatchSize
61+
}
5362

5463
return &client{
5564
da: cfg.DA,
5665
logger: cfg.Logger.With().Str("component", "da_client").Logger(),
5766
defaultTimeout: cfg.DefaultTimeout,
67+
batchSize: cfg.RetrieveBatchSize,
5868
namespaceBz: coreda.NamespaceFromString(cfg.Namespace).Bytes(),
5969
namespaceDataBz: coreda.NamespaceFromString(cfg.DataNamespace).Bytes(),
6070
}
@@ -203,7 +213,8 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
203213
}
204214
}
205215
// 2. Get Blobs using the retrieved IDs in batches
206-
batchSize := 100
216+
// Each batch has its own timeout while keeping the link to the parent context
217+
batchSize := c.batchSize
207218
blobs := make([][]byte, 0, len(idsResult.IDs))
208219
for i := 0; i < len(idsResult.IDs); i += batchSize {
209220
end := min(i+batchSize, len(idsResult.IDs))

block/public.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package block
22

33
import (
4+
"time"
5+
46
"github.com/evstack/ev-node/block/internal/common"
57
"github.com/evstack/ev-node/block/internal/da"
68
coreda "github.com/evstack/ev-node/core/da"
@@ -39,9 +41,11 @@ func NewDAClient(
3941
logger zerolog.Logger,
4042
) DAClient {
4143
return da.NewClient(da.Config{
42-
DA: daLayer,
43-
Logger: logger,
44-
Namespace: config.DA.GetNamespace(),
45-
DataNamespace: config.DA.GetDataNamespace(),
44+
DA: daLayer,
45+
Logger: logger,
46+
Namespace: config.DA.GetNamespace(),
47+
DefaultTimeout: 15 * time.Second,
48+
DataNamespace: config.DA.GetDataNamespace(),
49+
RetrieveBatchSize: config.DA.RetrieveBatchSize,
4650
})
4751
}

pkg/config/config.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ const (
7070
FlagDAMempoolTTL = FlagPrefixEvnode + "da.mempool_ttl"
7171
// FlagDAMaxSubmitAttempts is a flag for specifying the maximum DA submit attempts
7272
FlagDAMaxSubmitAttempts = FlagPrefixEvnode + "da.max_submit_attempts"
73+
// FlagDARetrieveBatchSize configures how many IDs are fetched per DA Get request
74+
FlagDARetrieveBatchSize = FlagPrefixEvnode + "da.retrieve_batch_size"
7375

7476
// P2P configuration flags
7577

@@ -162,6 +164,7 @@ type DAConfig struct {
162164
BlockTime DurationWrapper `mapstructure:"block_time" yaml:"block_time" comment:"Average block time of the DA chain (duration). Determines frequency of DA layer syncing, maximum backoff time for retries, and is multiplied by MempoolTTL to calculate transaction expiration. Examples: \"15s\", \"30s\", \"1m\", \"2m30s\", \"10m\"."`
163165
MempoolTTL uint64 `mapstructure:"mempool_ttl" yaml:"mempool_ttl" comment:"Number of DA blocks after which a transaction is considered expired and dropped from the mempool. Controls retry backoff timing."`
164166
MaxSubmitAttempts int `mapstructure:"max_submit_attempts" yaml:"max_submit_attempts" comment:"Maximum number of attempts to submit data to the DA layer before giving up. Higher values provide more resilience but can delay error reporting."`
167+
RetrieveBatchSize int `mapstructure:"retrieve_batch_size" yaml:"retrieve_batch_size" comment:"Number of IDs to request per DA Get call when retrieving blobs. Smaller batches lower per-request latency; larger batches reduce the number of RPC round trips. Default: 100."`
165168
}
166169

167170
// GetNamespace returns the namespace for header submissions.
@@ -320,6 +323,7 @@ func AddFlags(cmd *cobra.Command) {
320323
cmd.Flags().StringSlice(FlagDASigningAddresses, def.DA.SigningAddresses, "Comma-separated list of addresses for DA submissions (used in round-robin)")
321324
cmd.Flags().Uint64(FlagDAMempoolTTL, def.DA.MempoolTTL, "number of DA blocks until transaction is dropped from the mempool")
322325
cmd.Flags().Int(FlagDAMaxSubmitAttempts, def.DA.MaxSubmitAttempts, "maximum number of attempts to submit data to the DA layer before giving up")
326+
cmd.Flags().Int(FlagDARetrieveBatchSize, def.DA.RetrieveBatchSize, "number of IDs to request per DA Get call when retrieving blobs")
323327

324328
// P2P configuration flags
325329
cmd.Flags().String(FlagP2PListenAddress, def.P2P.ListenAddress, "P2P listen address (host:port)")

pkg/config/config_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ func TestDefaultConfig(t *testing.T) {
2626
assert.Equal(t, "", def.DA.AuthToken)
2727
assert.Equal(t, "", def.DA.SubmitOptions)
2828
assert.NotEmpty(t, def.DA.Namespace)
29+
assert.Equal(t, 150, def.DA.RetrieveBatchSize)
2930
assert.Equal(t, 1*time.Second, def.Node.BlockTime.Duration)
3031
assert.Equal(t, 6*time.Second, def.DA.BlockTime.Duration)
3132
assert.Equal(t, uint64(0), def.DA.MempoolTTL)
@@ -70,6 +71,7 @@ func TestAddFlags(t *testing.T) {
7071
assertFlagValue(t, flags, FlagDASigningAddresses, DefaultConfig().DA.SigningAddresses)
7172
assertFlagValue(t, flags, FlagDAMempoolTTL, DefaultConfig().DA.MempoolTTL)
7273
assertFlagValue(t, flags, FlagDAMaxSubmitAttempts, DefaultConfig().DA.MaxSubmitAttempts)
74+
assertFlagValue(t, flags, FlagDARetrieveBatchSize, DefaultConfig().DA.RetrieveBatchSize)
7375

7476
// P2P flags
7577
assertFlagValue(t, flags, FlagP2PListenAddress, DefaultConfig().P2P.ListenAddress)
@@ -99,7 +101,7 @@ func TestAddFlags(t *testing.T) {
99101
assertFlagValue(t, flags, FlagRPCAddress, DefaultConfig().RPC.Address)
100102

101103
// Count the number of flags we're explicitly checking
102-
expectedFlagCount := 37 // Update this number if you add more flag checks above
104+
expectedFlagCount := 38 // Update this number if you add more flag checks above
103105

104106
// Get the actual number of flags (both regular and persistent)
105107
actualFlagCount := 0

pkg/config/defaults.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,7 @@ func DefaultConfig() Config {
7575
MaxSubmitAttempts: 30,
7676
Namespace: randString(10),
7777
DataNamespace: "",
78+
RetrieveBatchSize: 150,
7879
},
7980
Instrumentation: DefaultInstrumentationConfig(),
8081
Log: LogConfig{

0 commit comments

Comments
 (0)