Skip to content

Commit 3ca3a4f

Browse files
alpejulienrbrt
andauthored
refactor: polish submitter package (#2661)
## Overview Better test coverage and minor refactoring. <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> --> --------- Co-authored-by: julienrbrt <julien@rbrt.fr>
1 parent 9bd3abf commit 3ca3a4f

File tree

4 files changed

+437
-102
lines changed

4 files changed

+437
-102
lines changed

block/internal/submitting/da_retry_test.go

Lines changed: 36 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,12 @@ package submitting
33
import (
44
"testing"
55
"time"
6+
7+
"github.com/stretchr/testify/assert"
68
)
79

810
func TestRetryStateNext_Table(t *testing.T) {
9-
pol := RetryPolicy{
11+
pol := retryPolicy{
1012
MaxAttempts: 10,
1113
MinBackoff: 100 * time.Millisecond,
1214
MaxBackoff: 1 * time.Second,
@@ -16,105 +18,97 @@ func TestRetryStateNext_Table(t *testing.T) {
1618
MaxGasMultiplier: 3.0,
1719
}
1820

19-
tests := []struct {
20-
name string
21+
tests := map[string]struct {
2122
startGas float64
2223
startBackoff time.Duration
2324
reason retryReason
24-
gm float64
25+
gasMult float64
2526
sentinelNoGas bool
2627
wantGas float64
2728
wantBackoff time.Duration
2829
}{
29-
{
30-
name: "success reduces gas and resets backoff",
30+
"success reduces gas and resets backoff": {
3131
startGas: 9.0,
3232
startBackoff: 500 * time.Millisecond,
3333
reason: reasonSuccess,
34-
gm: 3.0,
34+
gasMult: 3.0,
3535
wantGas: 3.0, // 9 / 3
3636
wantBackoff: pol.MinBackoff,
3737
},
38-
{
39-
name: "success clamps very small gm to 1/Max, possibly increasing gas",
38+
"success clamps very small gasMult to 1/Max, possibly increasing gas": {
4039
startGas: 3.0,
4140
startBackoff: 250 * time.Millisecond,
4241
reason: reasonSuccess,
43-
gm: 0.01, // clamped to 1/MaxGasMultiplier = 1/3
42+
gasMult: 0.01, // clamped to 1/MaxGasMultiplier = 1/3
4443
wantGas: 9.0, // 3 / (1/3)
4544
wantBackoff: pol.MinBackoff,
4645
},
47-
{
48-
name: "mempool increases gas and sets max backoff",
46+
"mempool increases gas and sets max backoff": {
4947
startGas: 2.0,
5048
startBackoff: 0,
5149
reason: reasonMempool,
52-
gm: 2.0,
50+
gasMult: 2.0,
5351
wantGas: 4.0, // 2 * 2
5452
wantBackoff: pol.MaxBackoff,
5553
},
56-
{
57-
name: "mempool clamps gas to max",
54+
"mempool clamps gas to max": {
5855
startGas: 9.5,
5956
startBackoff: 0,
6057
reason: reasonMempool,
61-
gm: 3.0,
58+
gasMult: 3.0,
6259
wantGas: 10.0, // 9.5 * 3 = 28.5 -> clamp 10
6360
wantBackoff: pol.MaxBackoff,
6461
},
65-
{
66-
name: "failure sets initial backoff",
62+
"failure sets initial backoff": {
6763
startGas: 1.0,
6864
startBackoff: 0,
6965
reason: reasonFailure,
70-
gm: 2.0,
66+
gasMult: 2.0,
7167
wantGas: 1.0, // unchanged
7268
wantBackoff: pol.MinBackoff,
7369
},
74-
{
75-
name: "failure doubles backoff capped at max",
70+
"failure doubles backoff capped at max": {
7671
startGas: 1.0,
7772
startBackoff: 700 * time.Millisecond,
7873
reason: reasonFailure,
79-
gm: 2.0,
74+
gasMult: 2.0,
8075
wantGas: 1.0, // unchanged
8176
wantBackoff: 1 * time.Second, // 700ms*2=1400ms -> clamp 1s
8277
},
83-
{
84-
name: "tooBig doubles backoff like failure",
78+
"tooBig doubles backoff like failure": {
8579
startGas: 1.0,
8680
startBackoff: 100 * time.Millisecond,
8781
reason: reasonTooBig,
88-
gm: 2.0,
82+
gasMult: 2.0,
8983
wantGas: 1.0,
9084
wantBackoff: 200 * time.Millisecond,
9185
},
92-
{
93-
name: "sentinel no gas keeps gas unchanged on success",
86+
"sentinel no gas keeps gas unchanged on success": {
9487
startGas: 5.0,
9588
startBackoff: 0,
9689
reason: reasonSuccess,
97-
gm: 2.0,
90+
gasMult: 2.0,
9891
sentinelNoGas: true,
9992
wantGas: 5.0,
10093
wantBackoff: pol.MinBackoff,
10194
},
95+
"undefined reason keeps gas unchanged and uses min backoff": {
96+
startGas: 3.0,
97+
startBackoff: 500 * time.Millisecond,
98+
reason: reasonUndefined,
99+
gasMult: 2.0,
100+
wantGas: 3.0,
101+
wantBackoff: 0,
102+
},
102103
}
104+
for name, tc := range tests {
105+
t.Run(name, func(t *testing.T) {
106+
rs := retryState{Attempt: 0, Backoff: tc.startBackoff, GasPrice: tc.startGas}
107+
rs.Next(tc.reason, pol, tc.gasMult, tc.sentinelNoGas)
103108

104-
for _, tc := range tests {
105-
t.Run(tc.name, func(t *testing.T) {
106-
rs := RetryState{Attempt: 0, Backoff: tc.startBackoff, GasPrice: tc.startGas}
107-
rs.Next(tc.reason, pol, tc.gm, tc.sentinelNoGas)
108-
109-
if rs.GasPrice != tc.wantGas {
110-
t.Fatalf("gas price: got %v, want %v", rs.GasPrice, tc.wantGas)
111-
}
112-
if rs.Backoff != tc.wantBackoff {
113-
t.Fatalf("backoff: got %v, want %v", rs.Backoff, tc.wantBackoff)
114-
}
115-
if rs.Attempt != 1 {
116-
t.Fatalf("attempt: got %d, want %d", rs.Attempt, 1)
117-
}
109+
assert.Equal(t, tc.wantGas, rs.GasPrice, "gas price")
110+
assert.Equal(t, tc.wantBackoff, rs.Backoff, "backoff")
111+
assert.Equal(t, 1, rs.Attempt, "attempt")
118112
})
119113
}
120114
}

block/internal/submitting/da_submitter.go

Lines changed: 81 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ const (
2727
defaultGasMultiplier = 1.0
2828
defaultMaxBlobSize = 2 * 1024 * 1024 // 2MB fallback blob size limit
2929
defaultMaxGasPriceClamp = 1000.0
30-
defaultMaxGasMultiplierClamp = 3.0
30+
defaultMaxGasMultiplierClamp = 3.0 // must always > 0 to avoid division by zero
3131
)
3232

33-
// RetryPolicy defines clamped bounds for retries, backoff, and gas pricing.
34-
type RetryPolicy struct {
33+
// retryPolicy defines clamped bounds for retries, backoff, and gas pricing.
34+
type retryPolicy struct {
3535
MaxAttempts int
3636
MinBackoff time.Duration
3737
MaxBackoff time.Duration
@@ -41,8 +41,20 @@ type RetryPolicy struct {
4141
MaxGasMultiplier float64
4242
}
4343

44-
// RetryState holds the current retry attempt, backoff, and gas price.
45-
type RetryState struct {
44+
func defaultRetryPolicy(maxAttempts int, maxDuration time.Duration) retryPolicy {
45+
return retryPolicy{
46+
MaxAttempts: maxAttempts,
47+
MinBackoff: initialBackoff,
48+
MaxBackoff: maxDuration,
49+
MinGasPrice: defaultGasPrice,
50+
MaxGasPrice: defaultMaxGasPriceClamp,
51+
MaxBlobBytes: defaultMaxBlobSize,
52+
MaxGasMultiplier: defaultMaxGasMultiplierClamp,
53+
}
54+
}
55+
56+
// retryState holds the current retry attempt, backoff, and gas price.
57+
type retryState struct {
4658
Attempt int
4759
Backoff time.Duration
4860
GasPrice float64
@@ -51,14 +63,14 @@ type RetryState struct {
5163
type retryReason int
5264

5365
const (
54-
reasonInitial retryReason = iota
66+
reasonUndefined retryReason = iota
5567
reasonFailure
5668
reasonMempool
5769
reasonSuccess
5870
reasonTooBig
5971
)
6072

61-
func (rs *RetryState) Next(reason retryReason, pol RetryPolicy, gasMultiplier float64, sentinelNoGas bool) {
73+
func (rs *retryState) Next(reason retryReason, pol retryPolicy, gasMultiplier float64, sentinelNoGas bool) {
6274
switch reason {
6375
case reasonSuccess:
6476
// Reduce gas price towards initial on success, if multiplier is available
@@ -73,22 +85,25 @@ func (rs *RetryState) Next(reason retryReason, pol RetryPolicy, gasMultiplier fl
7385
rs.GasPrice = clamp(rs.GasPrice*m, pol.MinGasPrice, pol.MaxGasPrice)
7486
}
7587
// Honor mempool stalling by using max backoff window
76-
rs.Backoff = clamp(pol.MaxBackoff, pol.MinBackoff, pol.MaxBackoff)
88+
rs.Backoff = pol.MaxBackoff
7789
case reasonFailure, reasonTooBig:
7890
if rs.Backoff == 0 {
7991
rs.Backoff = pol.MinBackoff
8092
} else {
8193
rs.Backoff *= 2
8294
}
8395
rs.Backoff = clamp(rs.Backoff, pol.MinBackoff, pol.MaxBackoff)
84-
case reasonInitial:
96+
default:
8597
rs.Backoff = 0
8698
}
8799
rs.Attempt++
88100
}
89101

90102
// clamp constrains a value between min and max bounds for any comparable type
91103
func clamp[T ~float64 | time.Duration](v, min, max T) T {
104+
if min > max {
105+
min, max = max, min
106+
}
92107
if v < min {
93108
return min
94109
}
@@ -98,34 +113,6 @@ func clamp[T ~float64 | time.Duration](v, min, max T) T {
98113
return v
99114
}
100115

101-
// getGasMultiplier fetches the gas multiplier from DA layer with fallback and clamping
102-
func (s *DASubmitter) getGasMultiplier(ctx context.Context, pol RetryPolicy) float64 {
103-
gasMultiplier, err := s.da.GasMultiplier(ctx)
104-
if err != nil || gasMultiplier <= 0 {
105-
if s.config.DA.GasMultiplier > 0 {
106-
return clamp(s.config.DA.GasMultiplier, 0.1, pol.MaxGasMultiplier)
107-
}
108-
s.logger.Warn().Err(err).Msg("failed to get gas multiplier from DA layer, using default 1.0")
109-
return defaultGasMultiplier
110-
}
111-
return clamp(gasMultiplier, 0.1, pol.MaxGasMultiplier)
112-
}
113-
114-
// initialGasPrice determines the starting gas price with clamping and sentinel handling
115-
func (s *DASubmitter) initialGasPrice(ctx context.Context, pol RetryPolicy) (price float64, sentinelNoGas bool) {
116-
if s.config.DA.GasPrice == noGasPrice {
117-
return noGasPrice, true
118-
}
119-
if s.config.DA.GasPrice > 0 {
120-
return clamp(s.config.DA.GasPrice, pol.MinGasPrice, pol.MaxGasPrice), false
121-
}
122-
if gp, err := s.da.GasPrice(ctx); err == nil {
123-
return clamp(gp, pol.MinGasPrice, pol.MaxGasPrice), false
124-
}
125-
s.logger.Warn().Msg("DA gas price unavailable; using default 0.0")
126-
return pol.MinGasPrice, false
127-
}
128-
129116
// DASubmitter handles DA submission operations
130117
type DASubmitter struct {
131118
da coreda.DA
@@ -152,6 +139,34 @@ func NewDASubmitter(
152139
}
153140
}
154141

142+
// getGasMultiplier fetches the gas multiplier from DA layer with fallback and clamping
143+
func (s *DASubmitter) getGasMultiplier(ctx context.Context, pol retryPolicy) float64 {
144+
gasMultiplier, err := s.da.GasMultiplier(ctx)
145+
if err != nil || gasMultiplier <= 0 {
146+
if s.config.DA.GasMultiplier > 0 {
147+
return clamp(s.config.DA.GasMultiplier, 0.1, pol.MaxGasMultiplier)
148+
}
149+
s.logger.Warn().Err(err).Msg("failed to get gas multiplier from DA layer, using default 1.0")
150+
return defaultGasMultiplier
151+
}
152+
return clamp(gasMultiplier, 0.1, pol.MaxGasMultiplier)
153+
}
154+
155+
// initialGasPrice determines the starting gas price with clamping and sentinel handling
156+
func (s *DASubmitter) initialGasPrice(ctx context.Context, pol retryPolicy) (price float64, sentinelNoGas bool) {
157+
if s.config.DA.GasPrice == noGasPrice {
158+
return noGasPrice, true
159+
}
160+
if s.config.DA.GasPrice > 0 {
161+
return clamp(s.config.DA.GasPrice, pol.MinGasPrice, pol.MaxGasPrice), false
162+
}
163+
if gp, err := s.da.GasPrice(ctx); err == nil {
164+
return clamp(gp, pol.MinGasPrice, pol.MaxGasPrice), false
165+
}
166+
s.logger.Warn().Msg("DA gas price unavailable; using default 0.0")
167+
return pol.MinGasPrice, false
168+
}
169+
155170
// SubmitHeaders submits pending headers to DA layer
156171
func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) error {
157172
headers, err := cache.GetPendingHeaders(ctx)
@@ -301,25 +316,17 @@ func submitToDA[T any](
301316
options []byte,
302317
cache cache.Manager,
303318
) error {
304-
marshaled, err := marshalItems(items, marshalFn, itemType)
319+
marshaled, err := marshalItems(ctx, items, marshalFn, itemType)
305320
if err != nil {
306321
return err
307322
}
308323

309324
// Build retry policy from config with sane defaults
310-
pol := RetryPolicy{
311-
MaxAttempts: s.config.DA.MaxSubmitAttempts,
312-
MinBackoff: initialBackoff,
313-
MaxBackoff: s.config.DA.BlockTime.Duration,
314-
MinGasPrice: defaultGasPrice,
315-
MaxGasPrice: defaultMaxGasPriceClamp,
316-
MaxBlobBytes: defaultMaxBlobSize,
317-
MaxGasMultiplier: defaultMaxGasMultiplierClamp,
318-
}
325+
pol := defaultRetryPolicy(s.config.DA.MaxSubmitAttempts, s.config.DA.BlockTime.Duration)
319326

320327
// Choose initial gas price with clamp
321328
gasPrice, sentinelNoGas := s.initialGasPrice(ctx, pol)
322-
rs := RetryState{Attempt: 0, Backoff: 0, GasPrice: gasPrice}
329+
rs := retryState{Attempt: 0, Backoff: 0, GasPrice: gasPrice}
323330
gm := s.getGasMultiplier(ctx, pol)
324331

325332
// Limit this submission to a single size-capped batch
@@ -421,35 +428,51 @@ func limitBatchBySize[T any](items []T, marshaled [][]byte, maxBytes int) ([]T,
421428
}
422429

423430
func marshalItems[T any](
431+
parentCtx context.Context,
424432
items []T,
425433
marshalFn func(T) ([]byte, error),
426434
itemType string,
427435
) ([][]byte, error) {
436+
if len(items) == 0 {
437+
return nil, nil
438+
}
428439
marshaled := make([][]byte, len(items))
440+
ctx, cancel := context.WithCancel(parentCtx)
441+
defer cancel()
442+
443+
// Semaphore to limit concurrency to 32 workers
444+
sem := make(chan struct{}, 32)
429445

430-
// Use a channel to collect errors from goroutines
431-
errCh := make(chan error, len(items))
446+
// Use a channel to collect results from goroutines
447+
resultCh := make(chan error, len(items))
432448

433449
// Marshal items concurrently
434450
for i, item := range items {
435451
go func(idx int, itm T) {
436-
bz, err := marshalFn(itm)
437-
if err != nil {
438-
errCh <- fmt.Errorf("failed to marshal %s item at index %d: %w", itemType, idx, err)
439-
return
452+
sem <- struct{}{}
453+
defer func() { <-sem }()
454+
455+
select {
456+
case <-ctx.Done():
457+
resultCh <- ctx.Err()
458+
default:
459+
bz, err := marshalFn(itm)
460+
if err != nil {
461+
resultCh <- fmt.Errorf("failed to marshal %s item at index %d: %w", itemType, idx, err)
462+
return
463+
}
464+
marshaled[idx] = bz
465+
resultCh <- nil
440466
}
441-
marshaled[idx] = bz
442-
errCh <- nil
443467
}(i, item)
444468
}
445469

446470
// Wait for all goroutines to complete and check for errors
447471
for i := 0; i < len(items); i++ {
448-
if err := <-errCh; err != nil {
472+
if err := <-resultCh; err != nil {
449473
return nil, err
450474
}
451475
}
452-
453476
return marshaled, nil
454477
}
455478

0 commit comments

Comments
 (0)