Skip to content

Commit a30ea68

Browse files
committed
updates
1 parent 98e546c commit a30ea68

File tree

2 files changed

+29
-25
lines changed

2 files changed

+29
-25
lines changed

pkg/sequencers/single/sequencer.go

Lines changed: 20 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,8 @@ type Sequencer struct {
6363
// It is set when we detect (via GetLatestDAHeight) that the DA layer is more
6464
// than one epoch ahead of our checkpoint, and cleared when we hit
6565
// ErrHeightFromFuture (meaning we've reached the DA head).
66-
catchingUp bool
66+
67+
catchingUp atomic.Bool
6768
// currentDAEndTime is the DA epoch end timestamp from the last fetched epoch.
6869
// Used as the block timestamp during catch-up to match based sequencing behavior.
6970
currentDAEndTime time.Time
@@ -235,7 +236,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB
235236
// During catch-up, the sequencer must produce blocks identical to what base
236237
// sequencing would produce (forced inclusion txs only, no mempool).
237238
var mempoolBatch *coresequencer.Batch
238-
if !c.catchingUp {
239+
if !c.catchingUp.Load() {
239240
var err error
240241
mempoolBatch, err = c.queue.Next(ctx)
241242
if err != nil {
@@ -353,7 +354,7 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB
353354
Uint64("consumed_count", forcedTxConsumedCount).
354355
Uint64("checkpoint_tx_index", c.checkpoint.TxIndex).
355356
Uint64("checkpoint_da_height", c.checkpoint.DAHeight).
356-
Bool("catching_up", c.catchingUp).
357+
Bool("catching_up", c.catchingUp.Load()).
357358
Msg("updated checkpoint after processing forced inclusion transactions")
358359
}
359360

@@ -365,12 +366,15 @@ func (c *Sequencer) GetNextBatch(ctx context.Context, req coresequencer.GetNextB
365366
// During catch-up, use the DA epoch end timestamp to match based sequencing behavior.
366367
// Replicates based sequencing nodes' behavior of timestamping blocks during catchingUp.
367368
timestamp := time.Now()
368-
if c.catchingUp && !c.currentDAEndTime.IsZero() {
369-
var remainingForcedTxs uint64
370-
if len(c.cachedForcedInclusionTxs) > 0 {
371-
remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex
369+
if c.catchingUp.Load() {
370+
daEndTime := c.currentDAEndTime
371+
if !daEndTime.IsZero() {
372+
var remainingForcedTxs uint64
373+
if len(c.cachedForcedInclusionTxs) > 0 {
374+
remainingForcedTxs = uint64(len(c.cachedForcedInclusionTxs)) - c.checkpoint.TxIndex
375+
}
376+
timestamp = daEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond)
372377
}
373-
timestamp = c.currentDAEndTime.Add(-time.Duration(remainingForcedTxs) * time.Millisecond)
374378
}
375379

376380
return &coresequencer.GetNextBatchResponse{
@@ -426,7 +430,7 @@ func (c *Sequencer) GetDAHeight() uint64 {
426430
// with only forced inclusion transactions (no mempool), matching the blocks
427431
// that base sequencing nodes would have produced during sequencer downtime.
428432
func (c *Sequencer) IsCatchingUp() bool {
429-
return c.catchingUp
433+
return c.catchingUp.Load()
430434
}
431435

432436
// fetchNextDAEpoch fetches transactions from the next DA epoch using checkpoint.
@@ -447,7 +451,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint
447451
c.logger.Debug().
448452
Uint64("da_height", currentDAHeight).
449453
Uint64("tx_index", c.checkpoint.TxIndex).
450-
Bool("catching_up", c.catchingUp).
454+
Bool("catching_up", c.catchingUp.Load()).
451455
Msg("fetching forced inclusion transactions from DA")
452456

453457
forcedTxsEvent, err := c.fiRetriever.RetrieveForcedIncludedTxs(ctx, currentDAHeight)
@@ -458,18 +462,18 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint
458462
Msg("DA height from future, waiting for DA to produce block")
459463

460464
// We've reached the DA head — exit catch-up mode
461-
if c.catchingUp {
465+
if c.catchingUp.Load() {
462466
c.logger.Info().
463467
Uint64("da_height", currentDAHeight).
464468
Msg("catch-up complete: reached DA head, resuming normal sequencing")
465-
c.catchingUp = false
469+
c.catchingUp.Store(false)
466470
}
467471

468472
return 0, nil
469473
} else if errors.Is(err, block.ErrForceInclusionNotConfigured) {
470474
// Forced inclusion not configured, continue without forced txs
471475
c.cachedForcedInclusionTxs = [][]byte{}
472-
c.catchingUp = false
476+
c.catchingUp.Store(false)
473477
return 0, nil
474478
}
475479

@@ -502,7 +506,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint
502506
Int("skipped_tx_count", skippedTxs).
503507
Uint64("da_height_start", forcedTxsEvent.StartDaHeight).
504508
Uint64("da_height_end", forcedTxsEvent.EndDaHeight).
505-
Bool("catching_up", c.catchingUp).
509+
Bool("catching_up", c.catchingUp.Load()).
506510
Msg("fetched forced inclusion transactions from DA")
507511

508512
// Cache the transactions
@@ -528,7 +532,7 @@ func (c *Sequencer) fetchNextDAEpoch(ctx context.Context, maxBytes uint64) (uint
528532
// overhead.
529533
func (c *Sequencer) updateCatchUpState(ctx context.Context) {
530534
// Already catching up — nothing to do. We'll exit via ErrHeightFromFuture.
531-
if c.catchingUp {
535+
if c.catchingUp.Load() {
532536
return
533537
}
534538

@@ -574,7 +578,7 @@ func (c *Sequencer) updateCatchUpState(ctx context.Context) {
574578
}
575579

576580
// The DA layer is more than one epoch ahead. Enter catch-up mode.
577-
c.catchingUp = true
581+
c.catchingUp.Store(true)
578582
c.logger.Warn().
579583
Uint64("checkpoint_da_height", currentDAHeight).
580584
Uint64("latest_da_height", latestDAHeight).

pkg/sequencers/single/sequencer_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -365,7 +365,7 @@ func TestSequencer_GetNextBatch_BeforeDASubmission(t *testing.T) {
365365

366366
func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) {
367367
ctx := context.Background()
368-
logger := zerolog.New(zerolog.NewConsoleWriter())
368+
logger := zerolog.New(zerolog.NewTestWriter(t))
369369

370370
// Create in-memory datastore
371371
db := ds.NewMapDatastore()
@@ -458,7 +458,7 @@ func TestSequencer_GetNextBatch_ForcedInclusionAndBatch_MaxBytes(t *testing.T) {
458458

459459
func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) {
460460
ctx := context.Background()
461-
logger := zerolog.New(zerolog.NewConsoleWriter())
461+
logger := zerolog.New(zerolog.NewTestWriter(t))
462462

463463
db := ds.NewMapDatastore()
464464
defer db.Close()
@@ -541,7 +541,7 @@ func TestSequencer_GetNextBatch_ForcedInclusion_ExceedsMaxBytes(t *testing.T) {
541541

542542
func TestSequencer_GetNextBatch_AlwaysCheckPendingForcedInclusion(t *testing.T) {
543543
ctx := context.Background()
544-
logger := zerolog.New(zerolog.NewConsoleWriter())
544+
logger := zerolog.New(zerolog.NewTestWriter(t))
545545

546546
db := ds.NewMapDatastore()
547547
defer db.Close()
@@ -882,7 +882,7 @@ func TestSequencer_DAFailureAndQueueThrottling_Integration(t *testing.T) {
882882

883883
func TestSequencer_CheckpointPersistence_CrashRecovery(t *testing.T) {
884884
ctx := context.Background()
885-
logger := zerolog.New(zerolog.NewConsoleWriter())
885+
logger := zerolog.New(zerolog.NewTestWriter(t))
886886

887887
db := ds.NewMapDatastore()
888888
defer db.Close()
@@ -1242,7 +1242,7 @@ func TestSequencer_GetNextBatch_GasFilterError(t *testing.T) {
12421242
// This test uses maxBytes to limit how many txs are fetched, triggering the unprocessed txs scenario.
12431243
func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) {
12441244
ctx := context.Background()
1245-
logger := zerolog.New(zerolog.NewConsoleWriter())
1245+
logger := zerolog.New(zerolog.NewTestWriter(t))
12461246

12471247
db := ds.NewMapDatastore()
12481248
defer db.Close()
@@ -1312,7 +1312,7 @@ func TestSequencer_CatchUp_DetectsOldEpoch(t *testing.T) {
13121312

13131313
func TestSequencer_CatchUp_SkipsMempoolDuringCatchUp(t *testing.T) {
13141314
ctx := context.Background()
1315-
logger := zerolog.New(zerolog.NewConsoleWriter())
1315+
logger := zerolog.New(zerolog.NewTestWriter(t))
13161316

13171317
db := ds.NewMapDatastore()
13181318
defer db.Close()
@@ -1463,7 +1463,7 @@ func TestSequencer_CatchUp_UsesDATimestamp(t *testing.T) {
14631463

14641464
func TestSequencer_CatchUp_ExitsCatchUpAtDAHead(t *testing.T) {
14651465
ctx := context.Background()
1466-
logger := zerolog.New(zerolog.NewConsoleWriter())
1466+
logger := zerolog.New(zerolog.NewTestWriter(t))
14671467

14681468
db := ds.NewMapDatastore()
14691469
defer db.Close()
@@ -1676,7 +1676,7 @@ func TestSequencer_CatchUp_MultiEpochReplay(t *testing.T) {
16761676
// Simulates a sequencer that missed 3 DA epochs and must replay them all
16771677
// before resuming normal operation.
16781678
ctx := context.Background()
1679-
logger := zerolog.New(zerolog.NewConsoleWriter())
1679+
logger := zerolog.New(zerolog.NewTestWriter(t))
16801680

16811681
db := ds.NewMapDatastore()
16821682
defer db.Close()
@@ -1915,7 +1915,7 @@ func TestSequencer_CatchUp_MonotonicTimestamps(t *testing.T) {
19151915
// resulting blocks. This uses the same jitter scheme as the based
19161916
// sequencer: timestamp = DAEndTime - (remainingForcedTxs * 1ms).
19171917
ctx := context.Background()
1918-
logger := zerolog.New(zerolog.NewConsoleWriter())
1918+
logger := zerolog.New(zerolog.NewTestWriter(t))
19191919

19201920
db := ds.NewMapDatastore()
19211921
defer db.Close()

0 commit comments

Comments
 (0)