Skip to content

Commit fabc2de

Browse files
committed
updates
1 parent de67ecf commit fabc2de

File tree

2 files changed

+4
-62
lines changed

2 files changed

+4
-62
lines changed

block/internal/submitting/batching_strategy.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,11 @@ import (
88
)
99

1010
// BatchingStrategy defines the interface for different batching strategies
11+
// Batching strategies always go through the da submitter which does extra size checks and possible further splitting for batches above the DA layer blob size.
1112
type BatchingStrategy interface {
1213
// ShouldSubmit determines if a batch should be submitted based on the strategy
1314
// Returns true if submission should happen now
14-
ShouldSubmit(pendingCount uint64, totalSize int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool
15+
ShouldSubmit(pendingCount uint64, totalSizeBeforeSig int, maxBlobSize int, timeSinceLastSubmit time.Duration) bool
1516
}
1617

1718
// NewBatchingStrategy creates a batching strategy based on configuration

block/internal/submitting/submitter.go

Lines changed: 2 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -190,7 +190,7 @@ func (s *Submitter) daSubmissionLoop() {
190190
return
191191
}
192192

193-
// Calculate total size
193+
// Calculate total size (excluding signature)
194194
totalSize := 0
195195
for _, marshalled := range marshalledHeaders {
196196
totalSize += len(marshalled)
@@ -245,7 +245,7 @@ func (s *Submitter) daSubmissionLoop() {
245245
return
246246
}
247247

248-
// Calculate total size
248+
// Calculate total size (excluding signature)
249249
totalSize := 0
250250
for _, marshalled := range marshalledData {
251251
totalSize += len(marshalled)
@@ -286,65 +286,6 @@ func (s *Submitter) daSubmissionLoop() {
286286
}
287287
}
288288

289-
// marshalItems marshals items concurrently with a worker pool
290-
func marshalItems[T any](
291-
ctx context.Context,
292-
items []T,
293-
marshalFn func(T) ([]byte, error),
294-
) ([][]byte, int, error) {
295-
if len(items) == 0 {
296-
return nil, 0, nil
297-
}
298-
299-
marshaled := make([][]byte, len(items))
300-
workerCtx, cancel := context.WithCancel(ctx)
301-
defer cancel()
302-
303-
// Semaphore to limit concurrency to 32 workers
304-
sem := make(chan struct{}, 32)
305-
306-
// Use a channel to collect results from goroutines
307-
type result struct {
308-
idx int
309-
err error
310-
size int
311-
}
312-
resultCh := make(chan result, len(items))
313-
314-
// Marshal items concurrently
315-
for i, item := range items {
316-
go func(idx int, itm T) {
317-
sem <- struct{}{}
318-
defer func() { <-sem }()
319-
320-
select {
321-
case <-workerCtx.Done():
322-
resultCh <- result{idx: idx, err: workerCtx.Err()}
323-
default:
324-
bz, err := marshalFn(itm)
325-
if err != nil {
326-
resultCh <- result{idx: idx, err: fmt.Errorf("failed to marshal item at index %d: %w", idx, err)}
327-
return
328-
}
329-
marshaled[idx] = bz
330-
resultCh <- result{idx: idx, size: len(bz)}
331-
}
332-
}(i, item)
333-
}
334-
335-
// Wait for all goroutines to complete and accumulate total size
336-
totalSize := 0
337-
for range items {
338-
res := <-resultCh
339-
if res.err != nil {
340-
return nil, 0, res.err
341-
}
342-
totalSize += res.size
343-
}
344-
345-
return marshaled, totalSize, nil
346-
}
347-
348289
// processDAInclusionLoop handles DA inclusion processing (both sync and aggregator nodes)
349290
func (s *Submitter) processDAInclusionLoop() {
350291
s.logger.Info().Msg("starting DA inclusion processing loop")

0 commit comments

Comments
 (0)