Skip to content

Commit dbc8eb1

Browse files
Copilotjulienrbrt
andcommitted
Refactor recovery pruning into pruner component
Co-authored-by: julienrbrt <29894366+julienrbrt@users.noreply.github.com>
1 parent 11d98d1 commit dbc8eb1

File tree

17 files changed

+330
-96
lines changed

17 files changed

+330
-96
lines changed

apps/evm/cmd/rollback.go

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
"github.com/ethereum/go-ethereum/common"
1010
ds "github.com/ipfs/go-datastore"
11+
"github.com/rs/zerolog"
1112
"github.com/spf13/cobra"
1213

1314
"github.com/evstack/ev-node/execution/evm"
@@ -30,6 +31,7 @@ func NewRollbackCmd() *cobra.Command {
3031
if err != nil {
3132
return err
3233
}
34+
logger := rollcmd.SetupLogger(nodeConfig.Log)
3335

3436
goCtx := cmd.Context()
3537
if goCtx == nil {
@@ -69,11 +71,10 @@ func NewRollbackCmd() *cobra.Command {
6971
}
7072

7173
// rollback execution layer via EngineClient
72-
engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB)
74+
engineClient, err := createRollbackEngineClient(cmd, rawEvolveDB, logger.With().Str("module", "engine_client").Logger())
7375
if err != nil {
7476
cmd.Printf("Warning: failed to create engine client, skipping EL rollback: %v\n", err)
7577
} else {
76-
engineClient.SetExecMetaRetention(nodeConfig.Node.StateHistoryRetention)
7778
if err := engineClient.Rollback(goCtx, height); err != nil {
7879
return fmt.Errorf("failed to rollback execution layer: %w", err)
7980
}
@@ -100,7 +101,7 @@ func NewRollbackCmd() *cobra.Command {
100101
return cmd
101102
}
102103

103-
func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.EngineClient, error) {
104+
func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching, logger zerolog.Logger) (*evm.EngineClient, error) {
104105
ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL)
105106
if err != nil {
106107
return nil, fmt.Errorf("failed to get '%s' flag: %w", evm.FlagEvmEthURL, err)
@@ -129,5 +130,5 @@ func createRollbackEngineClient(cmd *cobra.Command, db ds.Batching) (*evm.Engine
129130
return nil, fmt.Errorf("JWT secret file '%s' is empty", jwtSecretFile)
130131
}
131132

132-
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, common.Hash{}, common.Address{}, db, false)
133+
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, common.Hash{}, common.Address{}, db, false, logger)
133134
}

apps/evm/cmd/run.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ var RunCmd = &cobra.Command{
5555
}
5656

5757
tracingEnabled := nodeConfig.Instrumentation.IsTracingEnabled()
58-
executor, err := createExecutionClient(cmd, datastore, tracingEnabled)
58+
executor, err := createExecutionClient(cmd, datastore, tracingEnabled, logger.With().Str("module", "engine_client").Logger())
5959
if err != nil {
6060
return err
6161
}
@@ -67,12 +67,6 @@ var RunCmd = &cobra.Command{
6767

6868
daClient := block.NewDAClient(blobClient, nodeConfig, logger)
6969

70-
// Attach logger to the EVM engine client if available
71-
if ec, ok := executor.(*evm.EngineClient); ok {
72-
ec.SetExecMetaRetention(nodeConfig.Node.StateHistoryRetention)
73-
ec.SetLogger(logger.With().Str("module", "engine_client").Logger())
74-
}
75-
7670
headerNamespace := da.NamespaceFromString(nodeConfig.DA.GetNamespace())
7771
dataNamespace := da.NamespaceFromString(nodeConfig.DA.GetDataNamespace())
7872

@@ -193,7 +187,7 @@ func createSequencer(
193187
return sequencer, nil
194188
}
195189

196-
func createExecutionClient(cmd *cobra.Command, db datastore.Batching, tracingEnabled bool) (execution.Executor, error) {
190+
func createExecutionClient(cmd *cobra.Command, db datastore.Batching, tracingEnabled bool, logger zerolog.Logger) (execution.Executor, error) {
197191
// Read execution client parameters from flags
198192
ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL)
199193
if err != nil {
@@ -238,7 +232,7 @@ func createExecutionClient(cmd *cobra.Command, db datastore.Batching, tracingEna
238232
genesisHash := common.HexToHash(genesisHashStr)
239233
feeRecipient := common.HexToAddress(feeRecipientStr)
240234

241-
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient, db, tracingEnabled)
235+
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient, db, tracingEnabled, logger)
242236
}
243237

244238
// addFlags adds flags related to the EVM execution client

block/components.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ import (
1212
"github.com/evstack/ev-node/block/internal/common"
1313
da "github.com/evstack/ev-node/block/internal/da"
1414
"github.com/evstack/ev-node/block/internal/executing"
15+
"github.com/evstack/ev-node/block/internal/pruner"
1516
"github.com/evstack/ev-node/block/internal/reaping"
1617
"github.com/evstack/ev-node/block/internal/submitting"
1718
"github.com/evstack/ev-node/block/internal/syncing"
@@ -29,6 +30,7 @@ import (
2930
// Components represents the block-related components
3031
type Components struct {
3132
Executor *executing.Executor
33+
Pruner *pruner.Pruner
3234
Reaper *reaping.Reaper
3335
Syncer *syncing.Syncer
3436
Submitter *submitting.Submitter
@@ -60,6 +62,11 @@ func (bc *Components) Start(ctx context.Context) error {
6062
return fmt.Errorf("failed to start executor: %w", err)
6163
}
6264
}
65+
if bc.Pruner != nil {
66+
if err := bc.Pruner.Start(ctxWithCancel); err != nil {
67+
return fmt.Errorf("failed to start pruner: %w", err)
68+
}
69+
}
6370
if bc.Reaper != nil {
6471
if err := bc.Reaper.Start(ctxWithCancel); err != nil {
6572
return fmt.Errorf("failed to start reaper: %w", err)
@@ -96,6 +103,11 @@ func (bc *Components) Stop() error {
96103
errs = errors.Join(errs, fmt.Errorf("failed to stop executor: %w", err))
97104
}
98105
}
106+
if bc.Pruner != nil {
107+
if err := bc.Pruner.Stop(); err != nil {
108+
errs = errors.Join(errs, fmt.Errorf("failed to stop pruner: %w", err))
109+
}
110+
}
99111
if bc.Reaper != nil {
100112
if err := bc.Reaper.Stop(); err != nil {
101113
errs = errors.Join(errs, fmt.Errorf("failed to stop reaper: %w", err))
@@ -166,6 +178,14 @@ func NewSyncComponents(
166178
syncer.SetBlockSyncer(syncing.WithTracingBlockSyncer(syncer))
167179
}
168180

181+
var execPruner pruner.ExecMetaPruner
182+
if exec != nil {
183+
if candidate, ok := exec.(pruner.ExecMetaPruner); ok {
184+
execPruner = candidate
185+
}
186+
}
187+
recoveryPruner := pruner.New(store, execPruner, config.Node.StateHistoryRetention, pruner.DefaultPruneInterval, logger.With().Str("component", "Pruner").Logger())
188+
169189
// Create submitter for sync nodes (no signer, only DA inclusion processing)
170190
var daSubmitter submitting.DASubmitterAPI = submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger, headerDAHintAppender, dataDAHintAppender)
171191
if config.Instrumentation.IsTracingEnabled() {
@@ -189,6 +209,7 @@ func NewSyncComponents(
189209
Syncer: syncer,
190210
Submitter: submitter,
191211
Cache: cacheManager,
212+
Pruner: recoveryPruner,
192213
errorCh: errorCh,
193214
}, nil
194215
}
@@ -248,6 +269,14 @@ func NewAggregatorComponents(
248269
executor.SetBlockProducer(executing.WithTracingBlockProducer(executor))
249270
}
250271

272+
var execPruner pruner.ExecMetaPruner
273+
if exec != nil {
274+
if candidate, ok := exec.(pruner.ExecMetaPruner); ok {
275+
execPruner = candidate
276+
}
277+
}
278+
recoveryPruner := pruner.New(store, execPruner, config.Node.StateHistoryRetention, pruner.DefaultPruneInterval, logger.With().Str("component", "Pruner").Logger())
279+
251280
reaper, err := reaping.NewReaper(
252281
exec,
253282
sequencer,
@@ -264,6 +293,7 @@ func NewAggregatorComponents(
264293
if config.Node.BasedSequencer { // no submissions needed for bases sequencer
265294
return &Components{
266295
Executor: executor,
296+
Pruner: recoveryPruner,
267297
Reaper: reaper,
268298
Cache: cacheManager,
269299
errorCh: errorCh,
@@ -290,6 +320,7 @@ func NewAggregatorComponents(
290320

291321
return &Components{
292322
Executor: executor,
323+
Pruner: recoveryPruner,
293324
Reaper: reaper,
294325
Submitter: submitter,
295326
Cache: cacheManager,

block/components_test.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,7 @@ func TestNewSyncComponents_Creation(t *testing.T) {
127127
assert.NotNil(t, components.Syncer)
128128
assert.NotNil(t, components.Submitter)
129129
assert.NotNil(t, components.Cache)
130+
assert.NotNil(t, components.Pruner)
130131
assert.NotNil(t, components.errorCh)
131132
assert.Nil(t, components.Executor) // Sync nodes don't have executors
132133
}
@@ -183,6 +184,7 @@ func TestNewAggregatorComponents_Creation(t *testing.T) {
183184
assert.NotNil(t, components.Executor)
184185
assert.NotNil(t, components.Submitter)
185186
assert.NotNil(t, components.Cache)
187+
assert.NotNil(t, components.Pruner)
186188
assert.NotNil(t, components.errorCh)
187189
assert.Nil(t, components.Syncer) // Aggregator nodes currently don't create syncers in this constructor
188190
}

block/internal/pruner/pruner.go

Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
package pruner
2+
3+
import (
4+
"context"
5+
"errors"
6+
"sync"
7+
"time"
8+
9+
ds "github.com/ipfs/go-datastore"
10+
"github.com/rs/zerolog"
11+
12+
"github.com/evstack/ev-node/pkg/store"
13+
)
14+
15+
const (
16+
DefaultPruneInterval = time.Minute
17+
maxPruneBatch = uint64(1000)
18+
)
19+
20+
// ExecMetaPruner removes execution metadata at a given height.
21+
type ExecMetaPruner interface {
22+
PruneExecMeta(ctx context.Context, height uint64) error
23+
}
24+
25+
type stateDeleter interface {
26+
DeleteStateAtHeight(ctx context.Context, height uint64) error
27+
}
28+
29+
// Pruner periodically removes old state and execution metadata entries.
30+
type Pruner struct {
31+
store store.Store
32+
stateDeleter stateDeleter
33+
execPruner ExecMetaPruner
34+
retention uint64
35+
interval time.Duration
36+
logger zerolog.Logger
37+
lastPruned uint64
38+
39+
wg sync.WaitGroup
40+
cancel context.CancelFunc
41+
}
42+
43+
// New creates a new Pruner instance.
44+
func New(store store.Store, execPruner ExecMetaPruner, retention uint64, interval time.Duration, logger zerolog.Logger) *Pruner {
45+
if interval <= 0 {
46+
interval = DefaultPruneInterval
47+
}
48+
49+
var deleter stateDeleter
50+
if store != nil {
51+
if sd, ok := store.(stateDeleter); ok {
52+
deleter = sd
53+
}
54+
}
55+
56+
return &Pruner{
57+
store: store,
58+
stateDeleter: deleter,
59+
execPruner: execPruner,
60+
retention: retention,
61+
interval: interval,
62+
logger: logger,
63+
}
64+
}
65+
66+
// Start begins the pruning loop.
67+
func (p *Pruner) Start(ctx context.Context) error {
68+
if p == nil || p.retention == 0 || (p.stateDeleter == nil && p.execPruner == nil) {
69+
return nil
70+
}
71+
72+
loopCtx, cancel := context.WithCancel(ctx)
73+
p.cancel = cancel
74+
75+
p.wg.Add(1)
76+
go p.pruneLoop(loopCtx)
77+
78+
return nil
79+
}
80+
81+
// Stop stops the pruning loop.
82+
func (p *Pruner) Stop() error {
83+
if p == nil || p.cancel == nil {
84+
return nil
85+
}
86+
87+
p.cancel()
88+
p.wg.Wait()
89+
return nil
90+
}
91+
92+
func (p *Pruner) pruneLoop(ctx context.Context) {
93+
defer p.wg.Done()
94+
ticker := time.NewTicker(p.interval)
95+
defer ticker.Stop()
96+
97+
if err := p.pruneOnce(ctx); err != nil {
98+
p.logger.Error().Err(err).Msg("failed to prune recovery history")
99+
}
100+
101+
for {
102+
select {
103+
case <-ticker.C:
104+
if err := p.pruneOnce(ctx); err != nil {
105+
p.logger.Error().Err(err).Msg("failed to prune recovery history")
106+
}
107+
case <-ctx.Done():
108+
return
109+
}
110+
}
111+
}
112+
113+
func (p *Pruner) pruneOnce(ctx context.Context) error {
114+
if p.retention == 0 || p.store == nil {
115+
return nil
116+
}
117+
118+
height, err := p.store.Height(ctx)
119+
if err != nil {
120+
return err
121+
}
122+
123+
if height <= p.retention {
124+
return nil
125+
}
126+
127+
target := height - p.retention
128+
if target < p.lastPruned {
129+
p.lastPruned = target
130+
return nil
131+
}
132+
if target == p.lastPruned {
133+
return nil
134+
}
135+
136+
start := p.lastPruned + 1
137+
end := target
138+
if end-start+1 > maxPruneBatch {
139+
end = start + maxPruneBatch - 1
140+
}
141+
142+
for h := start; h <= end; h++ {
143+
if p.stateDeleter != nil {
144+
if err := p.stateDeleter.DeleteStateAtHeight(ctx, h); err != nil && !errors.Is(err, ds.ErrNotFound) {
145+
return err
146+
}
147+
}
148+
if p.execPruner != nil {
149+
if err := p.execPruner.PruneExecMeta(ctx, h); err != nil && !errors.Is(err, ds.ErrNotFound) {
150+
return err
151+
}
152+
}
153+
}
154+
155+
p.lastPruned = end
156+
return nil
157+
}

0 commit comments

Comments
 (0)