Skip to content

Commit 5c48ddb

Browse files
committed
Merge branch 'main' into julien/app
2 parents 0ad1ef2 + ce18484 commit 5c48ddb

File tree

17 files changed

+507
-343
lines changed

17 files changed

+507
-343
lines changed

block/internal/executing/block_producer.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,4 @@ type BlockProducer interface {
2121

2222
// ApplyBlock executes the block transactions and returns the new state.
2323
ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error)
24-
25-
// ValidateBlock validates block structure and state transitions.
26-
ValidateBlock(ctx context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error
2724
}

block/internal/executing/executor.go

Lines changed: 150 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -10,25 +10,31 @@ import (
1010
"sync/atomic"
1111
"time"
1212

13-
"github.com/evstack/ev-node/pkg/raft"
14-
"github.com/ipfs/go-datastore"
15-
"github.com/libp2p/go-libp2p/core/crypto"
16-
"github.com/rs/zerolog"
17-
"golang.org/x/sync/errgroup"
18-
1913
"github.com/evstack/ev-node/block/internal/cache"
2014
"github.com/evstack/ev-node/block/internal/common"
2115
coreexecutor "github.com/evstack/ev-node/core/execution"
2216
coresequencer "github.com/evstack/ev-node/core/sequencer"
2317
"github.com/evstack/ev-node/pkg/config"
2418
"github.com/evstack/ev-node/pkg/genesis"
19+
"github.com/evstack/ev-node/pkg/raft"
2520
"github.com/evstack/ev-node/pkg/signer"
2621
"github.com/evstack/ev-node/pkg/store"
2722
"github.com/evstack/ev-node/types"
23+
"github.com/ipfs/go-datastore"
24+
"github.com/libp2p/go-libp2p/core/crypto"
25+
"github.com/rs/zerolog"
2826
)
2927

3028
var _ BlockProducer = (*Executor)(nil)
3129

30+
// lastBlockInfo contains cached per-block data to avoid store reads + protobuf
31+
// deserialization in CreateBlock.
32+
type lastBlockInfo struct {
33+
headerHash types.Hash
34+
dataHash types.Hash
35+
signature types.Signature
36+
}
37+
3238
// Executor handles block production, transaction processing, and state management
3339
type Executor struct {
3440
// Core components
@@ -56,6 +62,16 @@ type Executor struct {
5662
// State management
5763
lastState *atomic.Pointer[types.State]
5864

65+
// hasPendingBlock tracks whether a pending block exists in the store,
66+
// avoiding a store lookup on every ProduceBlock call.
67+
hasPendingBlock atomic.Bool
68+
69+
// Cached per-block data
70+
lastBlockInfo atomic.Pointer[lastBlockInfo]
71+
72+
// pendingCheckCounter amortizes the expensive NumPendingHeaders/NumPendingData
73+
pendingCheckCounter uint64
74+
5975
// Channels for coordination
6076
txNotifyCh chan struct{}
6177
errorCh chan<- error // Channel to report critical execution client failures
@@ -168,6 +184,9 @@ func (e *Executor) Stop() error {
168184
e.wg.Wait()
169185

170186
e.logger.Info().Msg("executor stopped")
187+
if !e.hasPendingBlock.Load() {
188+
_ = e.deletePendingBlock(context.Background()) // nolint: gocritic // not critical
189+
}
171190
return nil
172191
}
173192

@@ -277,6 +296,26 @@ func (e *Executor) initializeState() error {
277296
return fmt.Errorf("failed to migrate legacy pending block: %w", err)
278297
}
279298

299+
if _, err := e.store.GetMetadata(e.ctx, headerKey); err == nil {
300+
e.hasPendingBlock.Store(true)
301+
}
302+
303+
// Warm the last-block cache
304+
if state.LastBlockHeight > 0 {
305+
h, d, err := e.store.GetBlockData(e.ctx, state.LastBlockHeight)
306+
if err == nil {
307+
info := &lastBlockInfo{
308+
headerHash: h.Hash(),
309+
dataHash: d.Hash(),
310+
}
311+
sig, err := e.store.GetSignature(e.ctx, state.LastBlockHeight)
312+
if err == nil {
313+
info.signature = *sig
314+
}
315+
e.lastBlockInfo.Store(info)
316+
}
317+
}
318+
280319
// Determine sync target: use Raft height if node is behind Raft consensus
281320
syncTargetHeight := state.LastBlockHeight
282321
if e.raftNode != nil {
@@ -413,18 +452,26 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
413452

414453
e.logger.Debug().Uint64("height", newHeight).Msg("producing block")
415454

416-
// check pending limits
455+
// Amortized pending limit check — NumPendingHeaders/NumPendingData call
456+
// advancePastEmptyData which scans the store. Only amortize when the limit
457+
// is large enough that checking every N blocks won't overshoot.
458+
const pendingCheckInterval uint64 = 64 // arbitrary but good value
417459
if e.config.Node.MaxPendingHeadersAndData > 0 {
418-
pendingHeaders := e.cache.NumPendingHeaders()
419-
pendingData := e.cache.NumPendingData()
420-
if pendingHeaders >= e.config.Node.MaxPendingHeadersAndData ||
421-
pendingData >= e.config.Node.MaxPendingHeadersAndData {
422-
e.logger.Warn().
423-
Uint64("pending_headers", pendingHeaders).
424-
Uint64("pending_data", pendingData).
425-
Uint64("limit", e.config.Node.MaxPendingHeadersAndData).
426-
Msg("pending limit reached, skipping block production")
427-
return nil
460+
e.pendingCheckCounter++
461+
shouldCheck := e.config.Node.MaxPendingHeadersAndData <= pendingCheckInterval ||
462+
e.pendingCheckCounter%pendingCheckInterval == 0
463+
if shouldCheck {
464+
pendingHeaders := e.cache.NumPendingHeaders()
465+
pendingData := e.cache.NumPendingData()
466+
if pendingHeaders >= e.config.Node.MaxPendingHeadersAndData ||
467+
pendingData >= e.config.Node.MaxPendingHeadersAndData {
468+
e.logger.Warn().
469+
Uint64("pending_headers", pendingHeaders).
470+
Uint64("pending_data", pendingData).
471+
Uint64("limit", e.config.Node.MaxPendingHeadersAndData).
472+
Msg("pending limit reached, skipping block production")
473+
return nil
474+
}
428475
}
429476
}
430477

@@ -434,17 +481,22 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
434481
batchData *BatchData
435482
)
436483

437-
// Check if there's an already stored block at the newHeight
438-
// If there is use that instead of creating a new block
439-
pendingHeader, pendingData, err := e.getPendingBlock(ctx)
440-
if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight {
441-
e.logger.Info().Uint64("height", newHeight).Msg("using pending block")
442-
header = pendingHeader
443-
data = pendingData
444-
} else if err != nil && !errors.Is(err, datastore.ErrNotFound) {
445-
return fmt.Errorf("failed to get block data: %w", err)
446-
} else {
484+
// Check if there's an already stored block at the newHeight.
485+
// Only hit the store if the in-memory flag indicates a pending block exists.
486+
if e.hasPendingBlock.Load() {
487+
pendingHeader, pendingData, err := e.getPendingBlock(ctx)
488+
if err == nil && pendingHeader != nil && pendingHeader.Height() == newHeight {
489+
e.logger.Info().Uint64("height", newHeight).Msg("using pending block")
490+
header = pendingHeader
491+
data = pendingData
492+
} else if err != nil && !errors.Is(err, datastore.ErrNotFound) {
493+
return fmt.Errorf("failed to get block data: %w", err)
494+
}
495+
}
496+
497+
if header == nil {
447498
// get batch from sequencer
499+
var err error
448500
batchData, err = e.blockProducer.RetrieveBatch(ctx)
449501
if errors.Is(err, common.ErrNoBatch) {
450502
e.logger.Debug().Msg("no batch available")
@@ -478,14 +530,16 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
478530

479531
// signing the header is done after applying the block
480532
// as for signing, the state of the block may be required by the signature payload provider.
481-
// For based sequencer, this will return an empty signature
482-
signature, err := e.signHeader(header.Header)
533+
// For based sequencer, this will return an empty signature.
534+
signature, _, err := e.signHeader(&header.Header)
483535
if err != nil {
484536
return fmt.Errorf("failed to sign header: %w", err)
485537
}
486538
header.Signature = signature
487539

488-
if err := e.blockProducer.ValidateBlock(ctx, currentState, header, data); err != nil {
540+
// Structural validation only — skip the expensive Validate() / DACommitment()
541+
// re-computation since we just produced this block ourselves.
542+
if err := currentState.AssertValidSequence(header); err != nil {
489543
e.sendCriticalError(fmt.Errorf("failed to validate block: %w", err))
490544
e.logger.Error().Err(err).Msg("CRITICAL: Permanent block validation error - halting block production")
491545
return fmt.Errorf("failed to validate block: %w", err)
@@ -533,28 +587,36 @@ func (e *Executor) ProduceBlock(ctx context.Context) error {
533587
}
534588
e.logger.Debug().Uint64("height", newHeight).Msg("proposed block to raft")
535589
}
536-
if err := e.deletePendingBlock(batch); err != nil {
537-
e.logger.Warn().Err(err).Uint64("height", newHeight).Msg("failed to delete pending block metadata")
538-
}
539590

540591
if err := batch.Commit(); err != nil {
541592
return fmt.Errorf("failed to commit batch: %w", err)
542593
}
543594

595+
e.hasPendingBlock.Store(false)
596+
544597
// Update in-memory state after successful commit
545598
e.setLastState(newState)
546599

547-
// broadcast header and data to P2P network
548-
g, broadcastCtx := errgroup.WithContext(e.ctx)
549-
g.Go(func() error {
550-
return e.headerBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PSignedHeader{SignedHeader: header})
600+
// Update last-block cache so the next CreateBlock avoids a store read.
601+
e.lastBlockInfo.Store(&lastBlockInfo{
602+
headerHash: newState.LastHeaderHash,
603+
dataHash: data.Hash(),
604+
signature: signature,
551605
})
552-
g.Go(func() error {
553-
return e.dataBroadcaster.WriteToStoreAndBroadcast(broadcastCtx, &types.P2PData{Data: data})
554-
})
555-
if err := g.Wait(); err != nil {
556-
e.logger.Error().Err(err).Msg("failed to broadcast header and/data")
557-
// don't fail block production on broadcast error
606+
607+
// Broadcast header and data to P2P network sequentially.
608+
// IMPORTANT: Header MUST be broadcast before data — the P2P layer validates
609+
// incoming data against the current and previous header, so out-of-order
610+
// delivery would cause validation failures on peers.
611+
if err := e.headerBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PSignedHeader{
612+
SignedHeader: header,
613+
}); err != nil {
614+
e.logger.Error().Err(err).Msg("failed to broadcast header")
615+
}
616+
if err := e.dataBroadcaster.WriteToStoreAndBroadcast(e.ctx, &types.P2PData{
617+
Data: data,
618+
}); err != nil {
619+
e.logger.Error().Err(err).Msg("failed to broadcast data")
558620
}
559621

560622
e.recordBlockMetrics(newState, data)
@@ -604,26 +666,33 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
604666
currentState := e.getLastState()
605667
headerTime := uint64(e.genesis.StartTime.UnixNano())
606668

607-
// Get last block info
608669
var lastHeaderHash types.Hash
609670
var lastDataHash types.Hash
610671
var lastSignature types.Signature
611672

612673
if height > e.genesis.InitialHeight {
613674
headerTime = uint64(batchData.UnixNano())
614675

615-
lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1)
616-
if err != nil {
617-
return nil, nil, fmt.Errorf("failed to get last block: %w", err)
618-
}
619-
lastHeaderHash = lastHeader.Hash()
620-
lastDataHash = lastData.Hash()
676+
if info := e.lastBlockInfo.Load(); info != nil {
677+
// Fast path: use in-memory cache
678+
lastHeaderHash = info.headerHash
679+
lastDataHash = info.dataHash
680+
lastSignature = info.signature
681+
} else {
682+
// Cold start fallback: read from store
683+
lastHeader, lastData, err := e.store.GetBlockData(ctx, height-1)
684+
if err != nil {
685+
return nil, nil, fmt.Errorf("failed to get last block: %w", err)
686+
}
687+
lastHeaderHash = lastHeader.Hash()
688+
lastDataHash = lastData.Hash()
621689

622-
lastSignaturePtr, err := e.store.GetSignature(ctx, height-1)
623-
if err != nil {
624-
return nil, nil, fmt.Errorf("failed to get last signature: %w", err)
690+
lastSignaturePtr, err := e.store.GetSignature(ctx, height-1)
691+
if err != nil {
692+
return nil, nil, fmt.Errorf("failed to get last signature: %w", err)
693+
}
694+
lastSignature = *lastSignaturePtr
625695
}
626-
lastSignature = *lastSignaturePtr
627696
}
628697

629698
// Get signer info and validator hash
@@ -642,7 +711,6 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
642711
return nil, nil, fmt.Errorf("failed to get validator hash: %w", err)
643712
}
644713
} else {
645-
// For based sequencer without signer, use nil pubkey and compute validator hash
646714
var err error
647715
validatorHash, err = e.options.ValidatorHasherProvider(e.genesis.ProposerAddress, nil)
648716
if err != nil {
@@ -703,16 +771,20 @@ func (e *Executor) CreateBlock(ctx context.Context, height uint64, batchData *Ba
703771
func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *types.Data) (types.State, error) {
704772
currentState := e.getLastState()
705773

706-
// Prepare transactions
707-
rawTxs := make([][]byte, len(data.Txs))
708-
for i, tx := range data.Txs {
709-
rawTxs[i] = []byte(tx)
774+
// Convert Txs to [][]byte for the execution client.
775+
// types.Tx is []byte, so this is a type conversion, not a copy.
776+
var rawTxs [][]byte
777+
if n := len(data.Txs); n > 0 {
778+
rawTxs = make([][]byte, n)
779+
for i, tx := range data.Txs {
780+
rawTxs[i] = []byte(tx)
781+
}
710782
}
711783

712784
// Execute transactions
713-
ctx = context.WithValue(ctx, types.HeaderContextKey, header)
785+
execCtx := context.WithValue(ctx, types.HeaderContextKey, header)
714786

715-
newAppHash, err := e.executeTxsWithRetry(ctx, rawTxs, header, currentState)
787+
newAppHash, err := e.executeTxsWithRetry(execCtx, rawTxs, header, currentState)
716788
if err != nil {
717789
e.sendCriticalError(fmt.Errorf("failed to execute transactions: %w", err))
718790
return types.State{}, fmt.Errorf("failed to execute transactions: %w", err)
@@ -727,19 +799,25 @@ func (e *Executor) ApplyBlock(ctx context.Context, header types.Header, data *ty
727799
return newState, nil
728800
}
729801

730-
// signHeader signs the block header
731-
func (e *Executor) signHeader(header types.Header) (types.Signature, error) {
802+
// signHeader signs the block header and returns both the signature and the
803+
// serialized header bytes (signing payload). The caller can reuse headerBytes
804+
// in SaveBlockDataFromBytes to avoid a redundant MarshalBinary call.
805+
func (e *Executor) signHeader(header *types.Header) (types.Signature, []byte, error) {
732806
// For based sequencer, return empty signature as there is no signer
733807
if e.signer == nil {
734-
return types.Signature{}, nil
808+
return types.Signature{}, nil, nil
735809
}
736810

737-
bz, err := e.options.AggregatorNodeSignatureBytesProvider(&header)
811+
bz, err := e.options.AggregatorNodeSignatureBytesProvider(header)
738812
if err != nil {
739-
return nil, fmt.Errorf("failed to get signature payload: %w", err)
813+
return nil, nil, fmt.Errorf("failed to get signature payload: %w", err)
740814
}
741815

742-
return e.signer.Sign(bz)
816+
sig, err := e.signer.Sign(bz)
817+
if err != nil {
818+
return nil, nil, err
819+
}
820+
return sig, bz, nil
743821
}
744822

745823
// executeTxsWithRetry executes transactions with retry logic.
@@ -772,19 +850,6 @@ func (e *Executor) executeTxsWithRetry(ctx context.Context, rawTxs [][]byte, hea
772850
return nil, nil
773851
}
774852

775-
// ValidateBlock validates the created block.
776-
func (e *Executor) ValidateBlock(_ context.Context, lastState types.State, header *types.SignedHeader, data *types.Data) error {
777-
// Set custom verifier for aggregator node signature
778-
header.SetCustomVerifierForAggregator(e.options.AggregatorNodeSignatureBytesProvider)
779-
780-
// Basic header validation
781-
if err := header.ValidateBasic(); err != nil {
782-
return fmt.Errorf("invalid header: %w", err)
783-
}
784-
785-
return lastState.AssertValidForNextState(header, data)
786-
}
787-
788853
// sendCriticalError sends a critical error to the error channel without blocking
789854
func (e *Executor) sendCriticalError(err error) {
790855
if e.errorCh != nil {
@@ -804,10 +869,11 @@ func (e *Executor) recordBlockMetrics(newState types.State, data *types.Data) {
804869
return
805870
}
806871

807-
e.metrics.NumTxs.Set(float64(len(data.Txs)))
808-
e.metrics.TotalTxs.Add(float64(len(data.Txs)))
809-
e.metrics.TxsPerBlock.Observe(float64(len(data.Txs)))
810-
e.metrics.BlockSizeBytes.Set(float64(data.Size()))
872+
nTxs := float64(len(data.Txs))
873+
e.metrics.NumTxs.Set(nTxs)
874+
e.metrics.TotalTxs.Add(nTxs)
875+
e.metrics.TxsPerBlock.Observe(nTxs)
876+
e.metrics.BlockSizeBytes.Set(float64(data.TxsByteSize()))
811877
e.metrics.CommittedHeight.Set(float64(data.Metadata.Height))
812878
}
813879

0 commit comments

Comments (0)