Skip to content

Commit fbd94ce

Browse files
committed
Merge branch 'main' into marko/seqeucners_pkg
2 parents 2079a1a + af5d6b1 commit fbd94ce

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

68 files changed

+2821
-1083
lines changed

.mockery.yaml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,20 @@ packages:
6363
dir: ./test/mocks
6464
pkgname: mocks
6565
filename: da.go
66-
github.com/evstack/ev-node/pkg/da/types:
67-
interfaces:
6866
Verifier:
6967
config:
7068
dir: ./test/mocks
7169
pkgname: mocks
72-
filename: da_verifier.go
70+
filename: da.go
7371
github.com/evstack/ev-node/pkg/da/jsonrpc:
7472
interfaces:
7573
BlobModule:
7674
config:
7775
dir: ./pkg/da/jsonrpc/mocks
7876
pkgname: mocks
7977
filename: blob_module_mock.go
78+
HeaderModule:
79+
config:
80+
dir: ./pkg/da/jsonrpc/mocks
81+
pkgname: mocks
82+
filename: header_module_mock.go

apps/evm/cmd/rollback.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@ import (
55
"errors"
66
"fmt"
77

8-
ds "github.com/ipfs/go-datastore"
9-
kt "github.com/ipfs/go-datastore/keytransform"
108
"github.com/spf13/cobra"
119

1210
goheaderstore "github.com/celestiaorg/go-header/store"
13-
"github.com/evstack/ev-node/node"
1411
rollcmd "github.com/evstack/ev-node/pkg/cmd"
1512
"github.com/evstack/ev-node/pkg/store"
1613
"github.com/evstack/ev-node/types"
@@ -50,10 +47,7 @@ func NewRollbackCmd() *cobra.Command {
5047
}()
5148

5249
// prefixed evolve db
53-
evolveDB := kt.Wrap(rawEvolveDB, &kt.PrefixTransform{
54-
Prefix: ds.NewKey(node.EvPrefix),
55-
})
56-
50+
evolveDB := store.NewEvNodeKVStore(rawEvolveDB)
5751
evolveStore := store.New(evolveDB)
5852
if height == 0 {
5953
currentHeight, err := evolveStore.Height(goCtx)

apps/evm/cmd/run.go

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -43,17 +43,23 @@ var RunCmd = &cobra.Command{
4343
Aliases: []string{"node", "run"},
4444
Short: "Run the evolve node with EVM execution client",
4545
RunE: func(cmd *cobra.Command, args []string) error {
46-
executor, err := createExecutionClient(cmd)
46+
nodeConfig, err := rollcmd.ParseConfig(cmd)
4747
if err != nil {
4848
return err
4949
}
5050

51-
nodeConfig, err := rollcmd.ParseConfig(cmd)
51+
logger := rollcmd.SetupLogger(nodeConfig.Log)
52+
53+
// Create datastore first - needed by execution client for ExecMeta tracking
54+
datastore, err := store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, evmDbName)
5255
if err != nil {
5356
return err
5457
}
5558

56-
logger := rollcmd.SetupLogger(nodeConfig.Log)
59+
executor, err := createExecutionClient(cmd, datastore)
60+
if err != nil {
61+
return err
62+
}
5763

5864
blobClient, err := blobrpc.NewClient(context.Background(), nodeConfig.DA.Address, nodeConfig.DA.AuthToken, "")
5965
if err != nil {
@@ -72,11 +78,6 @@ var RunCmd = &cobra.Command{
7278

7379
logger.Info().Str("headerNamespace", headerNamespace.HexString()).Str("dataNamespace", dataNamespace.HexString()).Msg("namespaces")
7480

75-
datastore, err := store.NewDefaultKVStore(nodeConfig.RootDir, nodeConfig.DBPath, evmDbName)
76-
if err != nil {
77-
return err
78-
}
79-
8081
genesisPath := filepath.Join(filepath.Dir(nodeConfig.ConfigPath()), "genesis.json")
8182
genesis, err := genesispkg.LoadGenesis(genesisPath)
8283
if err != nil {
@@ -88,7 +89,7 @@ var RunCmd = &cobra.Command{
8889
}
8990

9091
// Create sequencer based on configuration
91-
sequencer, err := createSequencer(context.Background(), logger, datastore, nodeConfig, genesis, daClient)
92+
sequencer, err := createSequencer(logger, datastore, nodeConfig, genesis, daClient)
9293
if err != nil {
9394
return err
9495
}
@@ -154,22 +155,20 @@ func init() {
154155
// If BasedSequencer is enabled, it creates a based sequencer that fetches transactions from DA.
155156
// Otherwise, it creates a single (traditional) sequencer.
156157
func createSequencer(
157-
ctx context.Context,
158158
logger zerolog.Logger,
159159
datastore datastore.Batching,
160160
nodeConfig config.Config,
161161
genesis genesis.Genesis,
162162
daClient block.FullDAClient,
163163
) (coresequencer.Sequencer, error) {
164-
fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger)
165-
166164
if nodeConfig.Node.BasedSequencer {
167165
// Based sequencer mode - fetch transactions only from DA
168166
if !nodeConfig.Node.Aggregator {
169167
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
170168
}
171169

172-
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
170+
fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion)
171+
basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger)
173172
if err != nil {
174173
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
175174
}
@@ -183,15 +182,12 @@ func createSequencer(
183182
}
184183

185184
sequencer, err := single.NewSequencer(
186-
ctx,
187185
logger,
188186
datastore,
189187
daClient,
190188
[]byte(genesis.ChainID),
191189
nodeConfig.Node.BlockTime.Duration,
192-
nodeConfig.Node.Aggregator,
193190
1000,
194-
fiRetriever,
195191
genesis,
196192
)
197193
if err != nil {
@@ -205,7 +201,7 @@ func createSequencer(
205201
return sequencer, nil
206202
}
207203

208-
func createExecutionClient(cmd *cobra.Command) (execution.Executor, error) {
204+
func createExecutionClient(cmd *cobra.Command, db datastore.Batching) (execution.Executor, error) {
209205
// Read execution client parameters from flags
210206
ethURL, err := cmd.Flags().GetString(evm.FlagEvmEthURL)
211207
if err != nil {
@@ -250,7 +246,7 @@ func createExecutionClient(cmd *cobra.Command) (execution.Executor, error) {
250246
genesisHash := common.HexToHash(genesisHashStr)
251247
feeRecipient := common.HexToAddress(feeRecipientStr)
252248

253-
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient)
249+
return evm.NewEngineExecutionClient(ethURL, engineURL, jwtSecret, genesisHash, feeRecipient, db)
254250
}
255251

256252
// addFlags adds flags related to the EVM execution client

apps/grpc/cmd/run.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,15 @@ func createSequencer(
120120
}
121121

122122
daClient := block.NewDAClient(blobClient, nodeConfig, logger)
123-
fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger)
124123

125124
if nodeConfig.Node.BasedSequencer {
126125
// Based sequencer mode - fetch transactions only from DA
127126
if !nodeConfig.Node.Aggregator {
128127
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
129128
}
130129

131-
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
130+
fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion)
131+
basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger)
132132
if err != nil {
133133
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
134134
}
@@ -142,15 +142,12 @@ func createSequencer(
142142
}
143143

144144
sequencer, err := single.NewSequencer(
145-
ctx,
146145
logger,
147146
datastore,
148147
daClient,
149148
[]byte(genesis.ChainID),
150149
nodeConfig.Node.BlockTime.Duration,
151-
nodeConfig.Node.Aggregator,
152150
1000,
153-
fiRetriever,
154151
genesis,
155152
)
156153
if err != nil {

apps/testapp/cmd/rollback.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,11 @@ import (
66
"fmt"
77

88
kvexecutor "github.com/evstack/ev-node/apps/testapp/kv"
9-
"github.com/evstack/ev-node/node"
109
rollcmd "github.com/evstack/ev-node/pkg/cmd"
1110
"github.com/evstack/ev-node/pkg/store"
1211
"github.com/evstack/ev-node/types"
1312

1413
goheaderstore "github.com/celestiaorg/go-header/store"
15-
ds "github.com/ipfs/go-datastore"
16-
kt "github.com/ipfs/go-datastore/keytransform"
1714
"github.com/spf13/cobra"
1815
)
1916

@@ -51,10 +48,7 @@ func NewRollbackCmd() *cobra.Command {
5148
}()
5249

5350
// prefixed evolve db
54-
evolveDB := kt.Wrap(rawEvolveDB, &kt.PrefixTransform{
55-
Prefix: ds.NewKey(node.EvPrefix),
56-
})
57-
51+
evolveDB := store.NewEvNodeKVStore(rawEvolveDB)
5852
evolveStore := store.New(evolveDB)
5953
if height == 0 {
6054
currentHeight, err := evolveStore.Height(goCtx)

apps/testapp/cmd/run.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,15 @@ func createSequencer(
121121
}
122122

123123
daClient := block.NewDAClient(blobClient, nodeConfig, logger)
124-
fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger)
125124

126125
if nodeConfig.Node.BasedSequencer {
127126
// Based sequencer mode - fetch transactions only from DA
128127
if !nodeConfig.Node.Aggregator {
129128
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
130129
}
131130

132-
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
131+
fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion)
132+
basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger)
133133
if err != nil {
134134
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
135135
}
@@ -143,15 +143,12 @@ func createSequencer(
143143
}
144144

145145
sequencer, err := single.NewSequencer(
146-
ctx,
147146
logger,
148147
datastore,
149148
daClient,
150149
[]byte(genesis.ChainID),
151150
nodeConfig.Node.BlockTime.Duration,
152-
nodeConfig.Node.Aggregator,
153151
1000,
154-
fiRetriever,
155152
genesis,
156153
)
157154
if err != nil {

block/components.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func NewSyncComponents(
161161
config,
162162
genesis,
163163
daSubmitter,
164+
nil, // No sequencer for sync nodes
164165
nil, // No signer for sync nodes
165166
logger,
166167
errorCh,
@@ -250,6 +251,7 @@ func NewAggregatorComponents(
250251
config,
251252
genesis,
252253
daSubmitter,
254+
sequencer,
253255
signer, // Signer for aggregator nodes to submit to DA
254256
logger,
255257
errorCh,

block/internal/cache/pending_headers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
// - DA submission of multiple headers is atomic - it's impossible to submit only part of a batch
1818
//
1919
// lastSubmittedHeaderHeight is updated only after receiving confirmation from DA.
20-
// Worst case scenario is when headers was successfully submitted to DA, but confirmation was not received (e.g. node was
20+
// Worst case scenario is when headers were successfully submitted to DA, but confirmation was not received (e.g. node was
2121
// restarted, networking issue occurred). In this case headers are re-submitted to DA (it's extra cost).
2222
// evolve is able to skip duplicate headers so this shouldn't affect full nodes.
2323
// TODO(tzdybal): we shouldn't try to push all pending headers at once; this should depend on max blob size

block/internal/da/client.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Config struct {
3030
// It is unexported; callers should use the exported Client interface.
3131
type client struct {
3232
blobAPI *blobrpc.BlobAPI
33+
headerAPI *blobrpc.HeaderAPI
3334
logger zerolog.Logger
3435
defaultTimeout time.Duration
3536
namespaceBz []byte
@@ -58,6 +59,7 @@ func NewClient(cfg Config) FullClient {
5859

5960
return &client{
6061
blobAPI: &cfg.DA.Blob,
62+
headerAPI: &cfg.DA.Header,
6163
logger: cfg.Logger.With().Str("component", "da_client").Logger(),
6264
defaultTimeout: cfg.DefaultTimeout,
6365
namespaceBz: datypes.NamespaceFromString(cfg.Namespace).Bytes(),
@@ -180,8 +182,22 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace
180182
}
181183
}
182184

185+
// getBlockTimestamp fetches the block timestamp from the DA layer header.
186+
func (c *client) getBlockTimestamp(ctx context.Context, height uint64) (time.Time, error) {
187+
headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
188+
defer cancel()
189+
190+
header, err := c.headerAPI.GetByHeight(headerCtx, height)
191+
if err != nil {
192+
return time.Time{}, fmt.Errorf("failed to get header timestamp for block %d: %w", height, err)
193+
}
194+
195+
return header.Time(), nil
196+
}
197+
183198
// Retrieve retrieves blobs from the DA layer at the specified height and namespace.
184199
// It uses GetAll to fetch all blobs at once.
200+
// The timestamp is derived from the DA block header to ensure determinism.
185201
func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
186202
ns, err := share.NewNamespaceFromBytes(namespace)
187203
if err != nil {
@@ -195,21 +211,29 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
195211
}
196212
}
197213

198-
getCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
199-
defer cancel()
214+
blobCtx, blobCancel := context.WithTimeout(ctx, c.defaultTimeout)
215+
defer blobCancel()
200216

201-
blobs, err := c.blobAPI.GetAll(getCtx, height, []share.Namespace{ns})
217+
blobs, err := c.blobAPI.GetAll(blobCtx, height, []share.Namespace{ns})
202218
if err != nil {
203219
// Handle known errors by substring because RPC may wrap them.
204220
switch {
205221
case strings.Contains(err.Error(), datypes.ErrBlobNotFound.Error()):
206222
c.logger.Debug().Uint64("height", height).Msg("No blobs found at height")
223+
// Fetch block timestamp for deterministic responses using parent context
224+
blockTime, err := c.getBlockTimestamp(ctx, height)
225+
if err != nil {
226+
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
227+
blockTime = time.Now()
228+
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
229+
}
230+
207231
return datypes.ResultRetrieve{
208232
BaseResult: datypes.BaseResult{
209233
Code: datypes.StatusNotFound,
210234
Message: datypes.ErrBlobNotFound.Error(),
211235
Height: height,
212-
Timestamp: time.Now(),
236+
Timestamp: blockTime,
213237
},
214238
}
215239
case strings.Contains(err.Error(), datypes.ErrHeightFromFuture.Error()):
@@ -234,14 +258,22 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
234258
}
235259
}
236260

261+
// Fetch block timestamp for deterministic responses using parent context
262+
blockTime, err := c.getBlockTimestamp(ctx, height)
263+
if err != nil {
264+
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
265+
blockTime = time.Now()
266+
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
267+
}
268+
237269
if len(blobs) == 0 {
238270
c.logger.Debug().Uint64("height", height).Msg("No blobs found at height")
239271
return datypes.ResultRetrieve{
240272
BaseResult: datypes.BaseResult{
241273
Code: datypes.StatusNotFound,
242274
Message: datypes.ErrBlobNotFound.Error(),
243275
Height: height,
244-
Timestamp: time.Now(),
276+
Timestamp: blockTime,
245277
},
246278
}
247279
}
@@ -261,7 +293,7 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
261293
Code: datypes.StatusSuccess,
262294
Height: height,
263295
IDs: ids,
264-
Timestamp: time.Now(),
296+
Timestamp: blockTime,
265297
},
266298
Data: data,
267299
}

0 commit comments

Comments
 (0)