Skip to content

Commit ccf46e8

Browse files
committed
signing optimization
1 parent 351004c commit ccf46e8

File tree

3 files changed

+214
-20
lines changed

3 files changed

+214
-20
lines changed

block/internal/submitting/da_submitter.go

Lines changed: 207 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,11 @@ import (
55
"context"
66
"encoding/json"
77
"fmt"
8+
"runtime"
9+
"sync"
810
"time"
911

12+
lru "github.com/hashicorp/golang-lru/v2"
1013
"github.com/rs/zerolog"
1114

1215
"github.com/evstack/ev-node/block/internal/cache"
@@ -21,6 +24,16 @@ import (
2124
"github.com/evstack/ev-node/types"
2225
)
2326

27+
const (
28+
// DefaultEnvelopeCacheSize is the default size for caching signed DA envelopes.
29+
// This avoids re-signing headers on retry scenarios.
30+
DefaultEnvelopeCacheSize = 10_000
31+
32+
// signingWorkerPoolSize determines how many parallel signing goroutines to use.
33+
// Ed25519 signing is CPU-bound, so we use GOMAXPROCS workers.
34+
signingWorkerPoolSize = 0 // 0 means use runtime.GOMAXPROCS(0)
35+
)
36+
2437
const initialBackoff = 100 * time.Millisecond
2538

2639
// retryPolicy defines clamped bounds for retries and backoff.
@@ -100,6 +113,13 @@ type DASubmitter struct {
100113

101114
// address selector for multi-account support
102115
addressSelector pkgda.AddressSelector
116+
117+
// envelopeCache caches fully signed DA envelopes by height to avoid re-signing on retries
118+
envelopeCache *lru.Cache[uint64, []byte]
119+
envelopeCacheMu sync.RWMutex
120+
121+
// signingWorkers is the number of parallel workers for signing
122+
signingWorkers int
103123
}
104124

105125
// NewDASubmitter creates a new DA submitter
@@ -134,6 +154,18 @@ func NewDASubmitter(
134154
addressSelector = pkgda.NewNoOpSelector()
135155
}
136156

157+
// Create envelope cache for avoiding re-signing on retries
158+
envelopeCache, err := lru.New[uint64, []byte](DefaultEnvelopeCacheSize)
159+
if err != nil {
160+
daSubmitterLogger.Warn().Err(err).Msg("failed to create envelope cache, continuing without caching")
161+
}
162+
163+
// Determine number of signing workers
164+
workers := signingWorkerPoolSize
165+
if workers <= 0 || workers > runtime.GOMAXPROCS(0) {
166+
workers = runtime.GOMAXPROCS(0)
167+
}
168+
137169
return &DASubmitter{
138170
client: client,
139171
config: config,
@@ -142,6 +174,8 @@ func NewDASubmitter(
142174
metrics: metrics,
143175
logger: daSubmitterLogger,
144176
addressSelector: addressSelector,
177+
envelopeCache: envelopeCache,
178+
signingWorkers: workers,
145179
}
146180
}
147181

@@ -175,21 +209,10 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed
175209

176210
s.logger.Info().Int("count", len(headers)).Msg("submitting headers to DA")
177211

178-
// Create DA envelopes from pre-marshalled headers
179-
envelopes := make([][]byte, len(headers))
180-
for i, header := range headers {
181-
// Sign the pre-marshalled header content
182-
envelopeSignature, err := signer.Sign(marshalledHeaders[i])
183-
if err != nil {
184-
return fmt.Errorf("failed to sign envelope for header %d: %w", i, err)
185-
}
186-
187-
// Create the envelope and marshal it
188-
envelope, err := header.MarshalDAEnvelope(envelopeSignature)
189-
if err != nil {
190-
return fmt.Errorf("failed to marshal DA envelope for header %d: %w", i, err)
191-
}
192-
envelopes[i] = envelope
212+
// Create DA envelopes with parallel signing and caching
213+
envelopes, err := s.createDAEnvelopes(headers, marshalledHeaders, signer)
214+
if err != nil {
215+
return err
193216
}
194217

195218
return submitToDA(s, ctx, headers, envelopes,
@@ -200,6 +223,8 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed
200223
if l := len(submitted); l > 0 {
201224
lastHeight := submitted[l-1].Height()
202225
cache.SetLastSubmittedHeaderHeight(ctx, lastHeight)
226+
// Clear envelope cache for successfully submitted heights
227+
s.clearEnvelopeCacheUpTo(lastHeight)
203228
}
204229
},
205230
"header",
@@ -209,6 +234,173 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, headers []*types.Signed
209234
)
210235
}
211236

237+
// createDAEnvelopes creates signed DA envelopes for the given headers.
238+
// It uses caching to avoid re-signing on retries and parallel signing for new envelopes.
239+
func (s *DASubmitter) createDAEnvelopes(headers []*types.SignedHeader, marshalledHeaders [][]byte, signer signer.Signer) ([][]byte, error) {
240+
envelopes := make([][]byte, len(headers))
241+
242+
// First pass: check cache for already-signed envelopes
243+
var needSigning []int // indices that need signing
244+
for i, header := range headers {
245+
height := header.Height()
246+
if cached := s.getCachedEnvelope(height); cached != nil {
247+
envelopes[i] = cached
248+
} else {
249+
needSigning = append(needSigning, i)
250+
}
251+
}
252+
253+
// If all envelopes were cached, we're done
254+
if len(needSigning) == 0 {
255+
s.logger.Debug().Int("cached", len(headers)).Msg("all envelopes retrieved from cache")
256+
return envelopes, nil
257+
}
258+
259+
s.logger.Debug().
260+
Int("cached", len(headers)-len(needSigning)).
261+
Int("to_sign", len(needSigning)).
262+
Msg("signing DA envelopes")
263+
264+
// For small batches, sign sequentially to avoid goroutine overhead
265+
if len(needSigning) <= 2 || s.signingWorkers <= 1 {
266+
for _, i := range needSigning {
267+
envelope, err := s.signAndCacheEnvelope(headers[i], marshalledHeaders[i], signer)
268+
if err != nil {
269+
return nil, fmt.Errorf("failed to create envelope for header %d: %w", i, err)
270+
}
271+
envelopes[i] = envelope
272+
}
273+
return envelopes, nil
274+
}
275+
276+
// Parallel signing for larger batches
277+
return s.signEnvelopesParallel(headers, marshalledHeaders, envelopes, needSigning, signer)
278+
}
279+
280+
// signEnvelopesParallel signs envelopes in parallel using a worker pool.
281+
func (s *DASubmitter) signEnvelopesParallel(
282+
headers []*types.SignedHeader,
283+
marshalledHeaders [][]byte,
284+
envelopes [][]byte,
285+
needSigning []int,
286+
signer signer.Signer,
287+
) ([][]byte, error) {
288+
type signJob struct {
289+
index int
290+
}
291+
type signResult struct {
292+
index int
293+
envelope []byte
294+
err error
295+
}
296+
297+
jobs := make(chan signJob, len(needSigning))
298+
results := make(chan signResult, len(needSigning))
299+
300+
// Start workers
301+
numWorkers := min(s.signingWorkers, len(needSigning))
302+
var wg sync.WaitGroup
303+
for range numWorkers {
304+
wg.Go(func() {
305+
for job := range jobs {
306+
envelope, err := s.signAndCacheEnvelope(headers[job.index], marshalledHeaders[job.index], signer)
307+
results <- signResult{index: job.index, envelope: envelope, err: err}
308+
}
309+
})
310+
}
311+
312+
// Send jobs
313+
for _, i := range needSigning {
314+
jobs <- signJob{index: i}
315+
}
316+
close(jobs)
317+
318+
// Wait for workers to finish and close results
319+
go func() {
320+
wg.Wait()
321+
close(results)
322+
}()
323+
324+
// Collect results
325+
var firstErr error
326+
for result := range results {
327+
if result.err != nil && firstErr == nil {
328+
firstErr = fmt.Errorf("failed to create envelope for header %d: %w", result.index, result.err)
329+
continue
330+
}
331+
if result.err == nil {
332+
envelopes[result.index] = result.envelope
333+
}
334+
}
335+
336+
if firstErr != nil {
337+
return nil, firstErr
338+
}
339+
340+
return envelopes, nil
341+
}
342+
343+
// signAndCacheEnvelope signs a single header and caches the result.
344+
func (s *DASubmitter) signAndCacheEnvelope(header *types.SignedHeader, marshalledHeader []byte, signer signer.Signer) ([]byte, error) {
345+
// Sign the pre-marshalled header content
346+
envelopeSignature, err := signer.Sign(marshalledHeader)
347+
if err != nil {
348+
return nil, fmt.Errorf("failed to sign envelope: %w", err)
349+
}
350+
351+
// Create the envelope and marshal it
352+
envelope, err := header.MarshalDAEnvelope(envelopeSignature)
353+
if err != nil {
354+
return nil, fmt.Errorf("failed to marshal DA envelope: %w", err)
355+
}
356+
357+
// Cache for potential retries
358+
s.setCachedEnvelope(header.Height(), envelope)
359+
360+
return envelope, nil
361+
}
362+
363+
// getCachedEnvelope retrieves a cached envelope for the given height.
364+
func (s *DASubmitter) getCachedEnvelope(height uint64) []byte {
365+
if s.envelopeCache == nil {
366+
return nil
367+
}
368+
s.envelopeCacheMu.RLock()
369+
defer s.envelopeCacheMu.RUnlock()
370+
371+
if envelope, ok := s.envelopeCache.Get(height); ok {
372+
return envelope
373+
}
374+
return nil
375+
}
376+
377+
// setCachedEnvelope stores an envelope in the cache.
378+
func (s *DASubmitter) setCachedEnvelope(height uint64, envelope []byte) {
379+
if s.envelopeCache == nil {
380+
return
381+
}
382+
s.envelopeCacheMu.Lock()
383+
defer s.envelopeCacheMu.Unlock()
384+
385+
s.envelopeCache.Add(height, envelope)
386+
}
387+
388+
// clearEnvelopeCacheUpTo removes cached envelopes up to and including the given height.
389+
func (s *DASubmitter) clearEnvelopeCacheUpTo(height uint64) {
390+
if s.envelopeCache == nil {
391+
return
392+
}
393+
s.envelopeCacheMu.Lock()
394+
defer s.envelopeCacheMu.Unlock()
395+
396+
keys := s.envelopeCache.Keys()
397+
for _, h := range keys {
398+
if h <= height {
399+
s.envelopeCache.Remove(h)
400+
}
401+
}
402+
}
403+
212404
// SubmitData submits pending data to DA layer
213405
func (s *DASubmitter) SubmitData(ctx context.Context, unsignedDataList []*types.SignedData, marshalledData [][]byte, cache cache.Manager, signer signer.Signer, genesis genesis.Genesis) error {
214406
if len(unsignedDataList) == 0 {

pkg/store/cached_store.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -171,7 +171,7 @@ func (cs *CachedStore) ClearCache() {
171171

172172
// Rollback wraps the underlying store's Rollback and invalidates affected cache entries.
173173
func (cs *CachedStore) Rollback(ctx context.Context, height uint64, aggregator bool) error {
174-
currentHeight, err := cs.Store.Height(ctx)
174+
currentHeight, err := cs.Height(ctx)
175175
if err != nil {
176176
return err
177177
}

pkg/store/cached_store_test.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,19 +74,21 @@ func TestCachedStore_GetHeader_MultipleHeights(t *testing.T) {
7474

7575
// Load multiple heights
7676
headers := make([]*types.SignedHeader, 5)
77-
for h := uint64(1); h <= 5; h++ {
77+
for i := 0; i < 5; i++ {
78+
h := uint64(i + 1)
7879
header, err := cachedStore.GetHeader(ctx, h)
7980
require.NoError(t, err)
8081
require.NotNil(t, header)
8182
assert.Equal(t, h, header.Height())
82-
headers[h-1] = header
83+
headers[i] = header
8384
}
8485

8586
// Re-access all heights - should return same cached references
86-
for h := uint64(1); h <= 5; h++ {
87+
for i := 0; i < 5; i++ {
88+
h := uint64(i + 1)
8789
header, err := cachedStore.GetHeader(ctx, h)
8890
require.NoError(t, err)
89-
assert.Same(t, headers[h-1], header, "should return cached header for height %d", h)
91+
assert.Same(t, headers[i], header, "should return cached header for height %d", h)
9092
}
9193
}
9294

0 commit comments

Comments
 (0)