Skip to content

Commit fcd0fca

Browse files
committed
Merge branch 'main' into alex/deterministic_queue
2 parents 06db707 + 742b8bb commit fcd0fca

38 files changed

+978
-708
lines changed

.mockery.yaml

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,17 +63,20 @@ packages:
6363
dir: ./test/mocks
6464
pkgname: mocks
6565
filename: da.go
66-
github.com/evstack/ev-node/pkg/da/types:
67-
interfaces:
6866
Verifier:
6967
config:
7068
dir: ./test/mocks
7169
pkgname: mocks
72-
filename: da_verifier.go
70+
filename: da.go
7371
github.com/evstack/ev-node/pkg/da/jsonrpc:
7472
interfaces:
7573
BlobModule:
7674
config:
7775
dir: ./pkg/da/jsonrpc/mocks
7876
pkgname: mocks
7977
filename: blob_module_mock.go
78+
HeaderModule:
79+
config:
80+
dir: ./pkg/da/jsonrpc/mocks
81+
pkgname: mocks
82+
filename: header_module_mock.go

apps/evm/cmd/rollback.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,9 @@ import (
55
"errors"
66
"fmt"
77

8-
ds "github.com/ipfs/go-datastore"
9-
kt "github.com/ipfs/go-datastore/keytransform"
108
"github.com/spf13/cobra"
119

1210
goheaderstore "github.com/celestiaorg/go-header/store"
13-
"github.com/evstack/ev-node/node"
1411
rollcmd "github.com/evstack/ev-node/pkg/cmd"
1512
"github.com/evstack/ev-node/pkg/store"
1613
"github.com/evstack/ev-node/types"
@@ -50,10 +47,7 @@ func NewRollbackCmd() *cobra.Command {
5047
}()
5148

5249
// prefixed evolve db
53-
evolveDB := kt.Wrap(rawEvolveDB, &kt.PrefixTransform{
54-
Prefix: ds.NewKey(node.EvPrefix),
55-
})
56-
50+
evolveDB := store.NewEvNodeKVStore(rawEvolveDB)
5751
evolveStore := store.New(evolveDB)
5852
if height == 0 {
5953
currentHeight, err := evolveStore.Height(goCtx)

apps/evm/cmd/run.go

Lines changed: 3 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -88,7 +88,7 @@ var RunCmd = &cobra.Command{
8888
}
8989

9090
// Create sequencer based on configuration
91-
sequencer, err := createSequencer(context.Background(), logger, datastore, nodeConfig, genesis, daClient)
91+
sequencer, err := createSequencer(logger, datastore, nodeConfig, genesis, daClient)
9292
if err != nil {
9393
return err
9494
}
@@ -154,22 +154,20 @@ func init() {
154154
// If BasedSequencer is enabled, it creates a based sequencer that fetches transactions from DA.
155155
// Otherwise, it creates a single (traditional) sequencer.
156156
func createSequencer(
157-
ctx context.Context,
158157
logger zerolog.Logger,
159158
datastore datastore.Batching,
160159
nodeConfig config.Config,
161160
genesis genesis.Genesis,
162161
daClient block.FullDAClient,
163162
) (coresequencer.Sequencer, error) {
164-
fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger)
165-
166163
if nodeConfig.Node.BasedSequencer {
167164
// Based sequencer mode - fetch transactions only from DA
168165
if !nodeConfig.Node.Aggregator {
169166
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
170167
}
171168

172-
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
169+
fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion)
170+
basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger)
173171
if err != nil {
174172
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
175173
}
@@ -183,15 +181,12 @@ func createSequencer(
183181
}
184182

185183
sequencer, err := single.NewSequencer(
186-
ctx,
187184
logger,
188185
datastore,
189186
daClient,
190187
[]byte(genesis.ChainID),
191188
nodeConfig.Node.BlockTime.Duration,
192-
nodeConfig.Node.Aggregator,
193189
1000,
194-
fiRetriever,
195190
genesis,
196191
)
197192
if err != nil {

apps/grpc/cmd/run.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -120,15 +120,15 @@ func createSequencer(
120120
}
121121

122122
daClient := block.NewDAClient(blobClient, nodeConfig, logger)
123-
fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger)
124123

125124
if nodeConfig.Node.BasedSequencer {
126125
// Based sequencer mode - fetch transactions only from DA
127126
if !nodeConfig.Node.Aggregator {
128127
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
129128
}
130129

131-
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
130+
fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion)
131+
basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger)
132132
if err != nil {
133133
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
134134
}
@@ -142,15 +142,12 @@ func createSequencer(
142142
}
143143

144144
sequencer, err := single.NewSequencer(
145-
ctx,
146145
logger,
147146
datastore,
148147
daClient,
149148
[]byte(genesis.ChainID),
150149
nodeConfig.Node.BlockTime.Duration,
151-
nodeConfig.Node.Aggregator,
152150
1000,
153-
fiRetriever,
154151
genesis,
155152
)
156153
if err != nil {

apps/testapp/cmd/rollback.go

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,11 @@ import (
66
"fmt"
77

88
kvexecutor "github.com/evstack/ev-node/apps/testapp/kv"
9-
"github.com/evstack/ev-node/node"
109
rollcmd "github.com/evstack/ev-node/pkg/cmd"
1110
"github.com/evstack/ev-node/pkg/store"
1211
"github.com/evstack/ev-node/types"
1312

1413
goheaderstore "github.com/celestiaorg/go-header/store"
15-
ds "github.com/ipfs/go-datastore"
16-
kt "github.com/ipfs/go-datastore/keytransform"
1714
"github.com/spf13/cobra"
1815
)
1916

@@ -51,10 +48,7 @@ func NewRollbackCmd() *cobra.Command {
5148
}()
5249

5350
// prefixed evolve db
54-
evolveDB := kt.Wrap(rawEvolveDB, &kt.PrefixTransform{
55-
Prefix: ds.NewKey(node.EvPrefix),
56-
})
57-
51+
evolveDB := store.NewEvNodeKVStore(rawEvolveDB)
5852
evolveStore := store.New(evolveDB)
5953
if height == 0 {
6054
currentHeight, err := evolveStore.Height(goCtx)

apps/testapp/cmd/run.go

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,15 @@ func createSequencer(
121121
}
122122

123123
daClient := block.NewDAClient(blobClient, nodeConfig, logger)
124-
fiRetriever := block.NewForcedInclusionRetriever(daClient, genesis, logger)
125124

126125
if nodeConfig.Node.BasedSequencer {
127126
// Based sequencer mode - fetch transactions only from DA
128127
if !nodeConfig.Node.Aggregator {
129128
return nil, fmt.Errorf("based sequencer mode requires aggregator mode to be enabled")
130129
}
131130

132-
basedSeq, err := based.NewBasedSequencer(ctx, fiRetriever, datastore, genesis, logger)
131+
fiRetriever := block.NewForcedInclusionRetriever(daClient, logger, genesis.DAStartHeight, genesis.DAEpochForcedInclusion)
132+
basedSeq, err := based.NewBasedSequencer(fiRetriever, datastore, genesis, logger)
133133
if err != nil {
134134
return nil, fmt.Errorf("failed to create based sequencer: %w", err)
135135
}
@@ -143,15 +143,12 @@ func createSequencer(
143143
}
144144

145145
sequencer, err := single.NewSequencer(
146-
ctx,
147146
logger,
148147
datastore,
149148
daClient,
150149
[]byte(genesis.ChainID),
151150
nodeConfig.Node.BlockTime.Duration,
152-
nodeConfig.Node.Aggregator,
153151
1000,
154-
fiRetriever,
155152
genesis,
156153
)
157154
if err != nil {

block/components.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -161,6 +161,7 @@ func NewSyncComponents(
161161
config,
162162
genesis,
163163
daSubmitter,
164+
nil, // No sequencer for sync nodes
164165
nil, // No signer for sync nodes
165166
logger,
166167
errorCh,
@@ -250,6 +251,7 @@ func NewAggregatorComponents(
250251
config,
251252
genesis,
252253
daSubmitter,
254+
sequencer,
253255
signer, // Signer for aggregator nodes to submit to DA
254256
logger,
255257
errorCh,

block/internal/cache/pending_headers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
// - DA submission of multiple headers is atomic - it's impossible to submit only part of a batch
1818
//
1919
// lastSubmittedHeaderHeight is updated only after receiving confirmation from DA.
20-
// Worst case scenario is when headers was successfully submitted to DA, but confirmation was not received (e.g. node was
20+
// Worst case scenario is when headers were successfully submitted to DA, but confirmation was not received (e.g. node was
2121
// restarted, networking issue occurred). In this case headers are re-submitted to DA (it's extra cost).
2222
// evolve is able to skip duplicate headers so this shouldn't affect full nodes.
2323
// TODO(tzdybal): we shouldn't try to push all pending headers at once; this should depend on max blob size

block/internal/da/client.go

Lines changed: 38 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ type Config struct {
3030
// It is unexported; callers should use the exported Client interface.
3131
type client struct {
3232
blobAPI *blobrpc.BlobAPI
33+
headerAPI *blobrpc.HeaderAPI
3334
logger zerolog.Logger
3435
defaultTimeout time.Duration
3536
namespaceBz []byte
@@ -58,6 +59,7 @@ func NewClient(cfg Config) FullClient {
5859

5960
return &client{
6061
blobAPI: &cfg.DA.Blob,
62+
headerAPI: &cfg.DA.Header,
6163
logger: cfg.Logger.With().Str("component", "da_client").Logger(),
6264
defaultTimeout: cfg.DefaultTimeout,
6365
namespaceBz: datypes.NamespaceFromString(cfg.Namespace).Bytes(),
@@ -180,8 +182,22 @@ func (c *client) Submit(ctx context.Context, data [][]byte, _ float64, namespace
180182
}
181183
}
182184

185+
// getBlockTimestamp fetches the block timestamp from the DA layer header.
186+
func (c *client) getBlockTimestamp(ctx context.Context, height uint64) (time.Time, error) {
187+
headerCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
188+
defer cancel()
189+
190+
header, err := c.headerAPI.GetByHeight(headerCtx, height)
191+
if err != nil {
192+
return time.Time{}, fmt.Errorf("failed to get header timestamp for block %d: %w", height, err)
193+
}
194+
195+
return header.Time(), nil
196+
}
197+
183198
// Retrieve retrieves blobs from the DA layer at the specified height and namespace.
184199
// It uses GetAll to fetch all blobs at once.
200+
// The timestamp is derived from the DA block header to ensure determinism.
185201
func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) datypes.ResultRetrieve {
186202
ns, err := share.NewNamespaceFromBytes(namespace)
187203
if err != nil {
@@ -195,21 +211,29 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
195211
}
196212
}
197213

198-
getCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout)
199-
defer cancel()
214+
blobCtx, blobCancel := context.WithTimeout(ctx, c.defaultTimeout)
215+
defer blobCancel()
200216

201-
blobs, err := c.blobAPI.GetAll(getCtx, height, []share.Namespace{ns})
217+
blobs, err := c.blobAPI.GetAll(blobCtx, height, []share.Namespace{ns})
202218
if err != nil {
203219
// Handle known errors by substring because RPC may wrap them.
204220
switch {
205221
case strings.Contains(err.Error(), datypes.ErrBlobNotFound.Error()):
206222
c.logger.Debug().Uint64("height", height).Msg("No blobs found at height")
223+
// Fetch block timestamp for deterministic responses using parent context
224+
blockTime, err := c.getBlockTimestamp(ctx, height)
225+
if err != nil {
226+
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
227+
blockTime = time.Now()
228+
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
229+
}
230+
207231
return datypes.ResultRetrieve{
208232
BaseResult: datypes.BaseResult{
209233
Code: datypes.StatusNotFound,
210234
Message: datypes.ErrBlobNotFound.Error(),
211235
Height: height,
212-
Timestamp: time.Now(),
236+
Timestamp: blockTime,
213237
},
214238
}
215239
case strings.Contains(err.Error(), datypes.ErrHeightFromFuture.Error()):
@@ -234,14 +258,22 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
234258
}
235259
}
236260

261+
// Fetch block timestamp for deterministic responses using parent context
262+
blockTime, err := c.getBlockTimestamp(ctx, height)
263+
if err != nil {
264+
c.logger.Error().Uint64("height", height).Err(err).Msg("failed to get block timestamp")
265+
blockTime = time.Now()
266+
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
267+
}
268+
237269
if len(blobs) == 0 {
238270
c.logger.Debug().Uint64("height", height).Msg("No blobs found at height")
239271
return datypes.ResultRetrieve{
240272
BaseResult: datypes.BaseResult{
241273
Code: datypes.StatusNotFound,
242274
Message: datypes.ErrBlobNotFound.Error(),
243275
Height: height,
244-
Timestamp: time.Now(),
276+
Timestamp: blockTime,
245277
},
246278
}
247279
}
@@ -261,7 +293,7 @@ func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte)
261293
Code: datypes.StatusSuccess,
262294
Height: height,
263295
IDs: ids,
264-
Timestamp: time.Now(),
296+
Timestamp: blockTime,
265297
},
266298
Data: data,
267299
}

0 commit comments

Comments
 (0)