Skip to content

Commit e7cd0af

Browse files
committed
proper implementation
1 parent 7ed1ee3 commit e7cd0af

File tree

5 files changed

+413
-252
lines changed

5 files changed

+413
-252
lines changed

docs/guides/migrating-to-ev-abci.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,9 +41,9 @@ import (
4141
)
4242
```
4343

44-
1. Add the migration manager keeper to your app struct
45-
2. Register the module in your module manager
46-
3. Configure the migration manager in your app initialization
44+
2. Add the migration manager keeper to your app struct
45+
3. Register the module in your module manager
46+
4. Configure the migration manager in your app initialization
4747

4848
### Step 2: Replace Staking Module with Wrapper
4949

sequencers/based/sequencer.go

Lines changed: 23 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ type BasedSequencer struct {
3737

3838
// Cached transactions from the current DA block being processed
3939
currentBatchTxs [][]byte
40+
// DA epoch end time for timestamp calculation
41+
currentDAEndTime time.Time
4042
}
4143

4244
// NewBasedSequencer creates a new based sequencer instance
@@ -71,10 +73,14 @@ func NewBasedSequencer(
7173
}
7274
} else {
7375
bs.checkpoint = checkpoint
74-
bs.logger.Info().
75-
Uint64("da_height", checkpoint.DAHeight).
76-
Uint64("tx_index", checkpoint.TxIndex).
77-
Msg("loaded based sequencer checkpoint from DB")
76+
// If we had a non-zero tx index, we're resuming from a crash mid-block
77+
// The transactions starting from that index are what we need
78+
if checkpoint.TxIndex > 0 {
79+
bs.logger.Debug().
80+
Uint64("tx_index", checkpoint.TxIndex).
81+
Uint64("da_height", checkpoint.DAHeight).
82+
Msg("resuming from checkpoint within DA epoch")
83+
}
7884
}
7985

8086
return bs, nil
@@ -93,7 +99,6 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
9399
// If we have no cached transactions or we've consumed all from the current DA block,
94100
// fetch the next DA epoch
95101
daHeight := s.GetDAHeight()
96-
t := time.Time{}
97102

98103
if len(s.currentBatchTxs) == 0 || s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) {
99104
daEndTime, daEndHeight, err := s.fetchNextDAEpoch(ctx, req.MaxBytes)
@@ -102,16 +107,15 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
102107
}
103108

104109
daHeight = daEndHeight
105-
t = daEndTime
110+
s.currentDAEndTime = daEndTime
106111
}
107112

108113
// Create batch from current position up to MaxBytes
109114
batch := s.createBatchFromCheckpoint(req.MaxBytes)
110115

111116
// Update checkpoint with how many transactions we consumed
112-
txCount := uint64(len(batch.Transactions))
113-
if txCount > 0 {
114-
s.checkpoint.TxIndex += txCount
117+
if daHeight > 0 || len(batch.Transactions) > 0 {
118+
s.checkpoint.TxIndex += uint64(len(batch.Transactions))
115119

116120
// If we've consumed all transactions from this DA epoch, move to next
117121
if s.checkpoint.TxIndex >= uint64(len(s.currentBatchTxs)) {
@@ -125,14 +129,19 @@ func (s *BasedSequencer) GetNextBatch(ctx context.Context, req coresequencer.Get
125129

126130
// Persist checkpoint
127131
if err := s.checkpointStore.Save(ctx, s.checkpoint); err != nil {
128-
s.logger.Error().Err(err).Msg("failed to save checkpoint")
129132
return nil, fmt.Errorf("failed to save checkpoint: %w", err)
130133
}
131134
}
132135

136+
// Calculate timestamp based on remaining transactions after this batch
137+
// timestamp correspond to the last block time of a DA epoch, based on the remaining transactions to be executed
138+
// this is done in order to handle the case where a DA epoch must fit in multiple blocks
139+
remainingTxs := uint64(len(s.currentBatchTxs)) - s.checkpoint.TxIndex
140+
timestamp := s.currentDAEndTime.Add(-time.Duration(remainingTxs) * time.Millisecond)
141+
133142
return &coresequencer.GetNextBatchResponse{
134143
Batch: batch,
135-
Timestamp: t,
144+
Timestamp: timestamp,
136145
BatchData: req.LastBatchData,
137146
}, nil
138147
}
@@ -150,17 +159,16 @@ func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64)
150159
if err != nil {
151160
// Check if forced inclusion is not configured
152161
if errors.Is(err, block.ErrForceInclusionNotConfigured) {
153-
return time.Time{}, currentDAHeight, block.ErrForceInclusionNotConfigured
162+
return time.Time{}, 0, block.ErrForceInclusionNotConfigured
154163
} else if errors.Is(err, coreda.ErrHeightFromFuture) {
155164
// If we get a height from future error, stay at current position
156165
// We'll retry the same height on the next call until DA produces that block
157166
s.logger.Debug().
158167
Uint64("da_height", currentDAHeight).
159168
Msg("DA height from future, waiting for DA to produce block")
160-
return time.Time{}, currentDAHeight, nil
169+
return time.Time{}, 0, nil
161170
}
162-
s.logger.Error().Err(err).Uint64("da_height", currentDAHeight).Msg("failed to retrieve forced inclusion transactions")
163-
return time.Time{}, currentDAHeight, err
171+
return time.Time{}, 0, fmt.Errorf("failed to retrieve forced inclusion transactions: %w", err)
164172
}
165173

166174
// Validate and filter transactions
@@ -190,14 +198,6 @@ func (s *BasedSequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64)
190198
// Cache the transactions for this DA epoch
191199
s.currentBatchTxs = validTxs
192200

193-
// If we had a non-zero tx index, we're resuming from a crash mid-block
194-
// The transactions starting from that index are what we need
195-
if s.checkpoint.TxIndex > 0 {
196-
s.logger.Info().
197-
Uint64("tx_index", s.checkpoint.TxIndex).
198-
Msg("resuming from checkpoint within DA epoch")
199-
}
200-
201201
return forcedTxsEvent.Timestamp.UTC(), forcedTxsEvent.EndDaHeight, nil
202202
}
203203

0 commit comments

Comments
 (0)