Skip to content

Commit 239157f

Browse files
committed
Apply review feedback
1 parent de64d82 commit 239157f

File tree

6 files changed

+103
-63
lines changed

6 files changed

+103
-63
lines changed

apps/evm/server/force_inclusion_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (m *mockDA) Get(ctx context.Context, ids []da.ID, namespace []byte) ([]da.B
5050
return nil, nil
5151
}
5252

53-
func (m *mockDA) Subscribe(_ context.Context, _ []byte, _ bool) (<-chan da.SubscriptionEvent, error) {
53+
func (m *mockDA) Subscribe(_ context.Context, _ []byte) (<-chan da.SubscriptionEvent, error) {
5454
// Not needed in these tests; return a closed channel.
5555
ch := make(chan da.SubscriptionEvent)
5656
close(ch)

block/internal/da/async_block_retriever.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync/atomic"
78
"time"
89

910
ds "github.com/ipfs/go-datastore"
@@ -44,9 +45,11 @@ type asyncBlockRetriever struct {
4445
cache ds.Batching
4546

4647
// Current DA height tracking (accessed atomically via subscriber).
47-
// Updated externally via UpdateCurrentHeight.
4848
daStartHeight uint64
4949

50+
// Tracks DA height consumed by the sequencer to trigger cache cleanups.
51+
consumedHeight atomic.Uint64
52+
5053
// Prefetch window - how many blocks ahead to speculatively fetch.
5154
prefetchWindow uint64
5255
}
@@ -108,17 +111,17 @@ func (f *asyncBlockRetriever) Stop() {
108111
f.subscriber.Stop()
109112
}
110113

111-
// UpdateCurrentHeight updates the current DA height.
114+
// UpdateCurrentHeight updates the consumed DA height and triggers cache cleanup.
112115
func (f *asyncBlockRetriever) UpdateCurrentHeight(height uint64) {
113116
for {
114-
current := f.subscriber.LocalDAHeight()
117+
current := f.consumedHeight.Load()
115118
if height <= current {
116119
return
117120
}
118-
if f.subscriber.CompareAndSwapLocalHeight(current, height) {
121+
if f.consumedHeight.CompareAndSwap(current, height) {
119122
f.logger.Debug().
120123
Uint64("new_height", height).
121-
Msg("updated current DA height")
124+
Msg("updated consumed DA height for cleanup")
122125
f.cleanupOldBlocks(context.Background(), height)
123126
return
124127
}
@@ -168,11 +171,10 @@ func (f *asyncBlockRetriever) GetCachedBlock(ctx context.Context, daHeight uint6
168171
return block, nil
169172
}
170173

171-
// HandleEvent caches blobs from the subscription inline.
174+
// HandleEvent caches blobs from the subscription inline, even empty ones,
175+
// to record that the DA height was seen and has 0 blobs.
172176
func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent) {
173-
if len(ev.Blobs) > 0 {
174-
f.cacheBlock(ctx, ev.Height, ev.Timestamp, ev.Blobs)
175-
}
177+
f.cacheBlock(ctx, ev.Height, ev.Timestamp, ev.Blobs)
176178
}
177179

178180
// HandleCatchup fetches a single height via Retrieve and caches it.
@@ -181,8 +183,7 @@ func (f *asyncBlockRetriever) HandleCatchup(ctx context.Context, height uint64)
181183
f.fetchAndCacheBlock(ctx, height)
182184

183185
// Speculatively prefetch ahead.
184-
highest := f.subscriber.HighestSeenDAHeight()
185-
target := highest + f.prefetchWindow
186+
target := height + f.prefetchWindow
186187
for h := height + 1; h <= target; h++ {
187188
if err := ctx.Err(); err != nil {
188189
return err

block/internal/da/async_block_retriever_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -156,9 +156,7 @@ func TestAsyncBlockRetriever_HeightFromFuture(t *testing.T) {
156156
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
157157

158158
// Subscription delivers height 100 with no blobs.
159-
subCh := make(chan datypes.SubscriptionEvent, 1)
160-
subCh <- datypes.SubscriptionEvent{Height: 100}
161-
159+
subCh := make(chan datypes.SubscriptionEvent)
162160
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once()
163161
blockCh := make(chan datypes.SubscriptionEvent)
164162
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
@@ -169,15 +167,17 @@ func TestAsyncBlockRetriever_HeightFromFuture(t *testing.T) {
169167
}).Maybe()
170168

171169
logger := zerolog.Nop()
172-
fetcher := NewAsyncBlockRetriever(client, logger, fiNs, 100*time.Millisecond, 100, 10)
170+
fetcher := NewAsyncBlockRetriever(client, logger, fiNs, time.Millisecond, 100, 10)
173171

174-
ctx, cancel := context.WithCancel(context.Background())
172+
ctx, cancel := context.WithCancel(t.Context())
175173
defer cancel()
176174
fetcher.Start(ctx)
177175
defer fetcher.Stop()
178176

179177
// Wait a bit for catchup to attempt fetches.
180-
time.Sleep(250 * time.Millisecond)
178+
require.Eventually(t, func() bool {
179+
return fetcher.(*asyncBlockRetriever).subscriber.HasReachedHead()
180+
}, 1250*time.Millisecond, time.Millisecond)
181181

182182
// Cache should be empty since all heights are from the future.
183183
block, err := fetcher.GetCachedBlock(ctx, 100)

block/internal/da/subscriber.go

Lines changed: 11 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,9 @@ func NewSubscriber(cfg SubscriberConfig) *Subscriber {
8787
daBlockTime: cfg.DABlockTime,
8888
fetchBlockTimestamp: cfg.FetchBlockTimestamp,
8989
}
90+
if len(s.namespaces) == 0 {
91+
s.logger.Warn().Msg("no namespaces configured, subscriber will stay idle")
92+
}
9093
return s
9194
}
9295

@@ -98,7 +101,7 @@ func (s *Subscriber) SetStartHeight(height uint64) {
98101
// Start begins the follow and catchup goroutines.
99102
func (s *Subscriber) Start(ctx context.Context) error {
100103
if len(s.namespaces) == 0 {
101-
return errors.New("no namespaces configured")
104+
return nil
102105
}
103106

104107
ctx, s.cancel = context.WithCancel(ctx)
@@ -131,11 +134,6 @@ func (s *Subscriber) HasReachedHead() bool {
131134
return s.headReached.Load()
132135
}
133136

134-
// SetHeadReached marks the subscriber as having reached DA head.
135-
func (s *Subscriber) SetHeadReached() {
136-
s.headReached.Store(true)
137-
}
138-
139137
// CompareAndSwapLocalHeight attempts a CAS on localDAHeight.
140138
// Used by handlers that want to claim exclusive processing of a height.
141139
func (s *Subscriber) CompareAndSwapLocalHeight(old, new uint64) bool {
@@ -343,23 +341,19 @@ func (s *Subscriber) runCatchup(ctx context.Context) {
343341
if err := s.handler.HandleCatchup(ctx, local); err != nil {
344342
// Roll back so we can retry after backoff.
345343
s.localDAHeight.Store(local)
346-
if !s.waitOnCatchupError(ctx, err, local) {
344+
if errors.Is(err, datypes.ErrHeightFromFuture) {
345+
s.headReached.Store(true)
346+
return
347+
}
348+
if !s.shouldContinueCatchup(ctx, err, local) {
347349
return
348350
}
349-
continue
350351
}
351352
}
352353
}
353354

354-
// ErrCaughtUp is a sentinel used to signal that the catchup loop has reached DA head.
355-
var ErrCaughtUp = errors.New("caught up with DA head")
356-
357-
// waitOnCatchupError logs the error and backs off before retrying.
358-
func (s *Subscriber) waitOnCatchupError(ctx context.Context, err error, daHeight uint64) bool {
359-
if errors.Is(err, ErrCaughtUp) {
360-
s.logger.Debug().Uint64("da_height", daHeight).Msg("DA catchup reached head, waiting for subscription signal")
361-
return false
362-
}
355+
// shouldContinueCatchup logs the error and backs off before retrying.
356+
func (s *Subscriber) shouldContinueCatchup(ctx context.Context, err error, daHeight uint64) bool {
363357
if ctx.Err() != nil {
364358
return false
365359
}

block/internal/syncing/da_follower.go

Lines changed: 45 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -116,7 +116,6 @@ func (f *daFollower) HandleEvent(ctx context.Context, ev datypes.SubscriptionEve
116116
}
117117
}
118118
if len(events) != 0 {
119-
f.subscriber.SetHeadReached()
120119
f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
121120
Msg("processed subscription blobs inline (fast path)")
122121
} else {
@@ -131,38 +130,63 @@ func (f *daFollower) HandleEvent(ctx context.Context, ev datypes.SubscriptionEve
131130

132131
// HandleCatchup retrieves events at a single DA height and pipes them
133132
// to the event sink. Checks priority heights first.
133+
//
134+
// ErrHeightFromFuture is handled here rather than in fetchAndPipeHeight because
135+
// the two call sites need different behaviour:
136+
// - Normal catchup: mark head reached and return da.ErrCaughtUp to stop the loop.
137+
// - Priority hint: the hint points to a future height — ignore it without
138+
// skipping the current daHeight or stopping catchup.
134139
func (f *daFollower) HandleCatchup(ctx context.Context, daHeight uint64) error {
135-
// Check for priority heights from P2P hints first.
136-
if priorityHeight := f.popPriorityHeight(); priorityHeight > 0 {
137-
if priorityHeight >= daHeight {
138-
f.logger.Debug().
139-
Uint64("da_height", priorityHeight).
140-
Msg("fetching priority DA height from P2P hint")
141-
if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
142-
return err
140+
// 1. Drain stale or future priority heights from P2P hints
141+
for {
142+
priorityHeight := f.popPriorityHeight()
143+
if priorityHeight == 0 {
144+
break
145+
}
146+
if priorityHeight < daHeight {
147+
continue // skip stale hints without yielding back to the catchup loop
148+
}
149+
150+
f.logger.Debug().
151+
Uint64("da_height", priorityHeight).
152+
Msg("fetching priority DA height from P2P hint")
153+
154+
if err := f.fetchAndPipeHeight(ctx, priorityHeight); err != nil {
155+
if errors.Is(err, datypes.ErrHeightFromFuture) {
156+
// Priority hint points to a future height — silently ignore.
157+
f.logger.Debug().Uint64("priority_da_height", priorityHeight).
158+
Msg("priority hint is from future, ignoring")
159+
continue
143160
}
161+
// Roll back so daHeight is attempted again next cycle after backoff.
162+
f.subscriber.SetLocalHeight(daHeight)
163+
return err
144164
}
145-
// Re-queue the current height by rolling back (the subscriber already advanced).
165+
166+
// We successfully handled a priority height (we didn't actually process `daHeight`)
167+
// Roll back so daHeight is attempted again next cycle.
146168
f.subscriber.SetLocalHeight(daHeight)
147169
return nil
148170
}
149171

150-
return f.fetchAndPipeHeight(ctx, daHeight)
172+
// 2. Normal sequential fetch
173+
if err := f.fetchAndPipeHeight(ctx, daHeight); err != nil {
174+
return err
175+
}
176+
return nil
151177
}
152178

153179
// fetchAndPipeHeight retrieves events at a single DA height and pipes them.
180+
// It does NOT handle ErrHeightFromFuture — callers must decide how to react
181+
// because the correct response depends on whether this is a normal sequential
182+
// catchup or a priority-hint fetch.
154183
func (f *daFollower) fetchAndPipeHeight(ctx context.Context, daHeight uint64) error {
155184
events, err := f.retriever.RetrieveFromDA(ctx, daHeight)
156185
if err != nil {
157-
switch {
158-
case errors.Is(err, datypes.ErrBlobNotFound):
186+
if errors.Is(err, datypes.ErrBlobNotFound) {
159187
return nil
160-
case errors.Is(err, datypes.ErrHeightFromFuture):
161-
f.subscriber.SetHeadReached()
162-
return err
163-
default:
164-
return err
165188
}
189+
return err
166190
}
167191

168192
for _, event := range events {
@@ -200,5 +224,8 @@ func (f *daFollower) popPriorityHeight() uint64 {
200224
}
201225
height := f.priorityHeights[0]
202226
f.priorityHeights = f.priorityHeights[1:]
227+
if len(f.priorityHeights) == 0 {
228+
f.priorityHeights = nil
229+
}
203230
return height
204231
}

block/internal/syncing/syncer_backoff_test.go

Lines changed: 28 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ func TestDAFollower_BackoffOnCatchupError(t *testing.T) {
2525
daBlockTime time.Duration
2626
error error
2727
expectsBackoff bool
28+
exitsCatchup bool // ErrCaughtUp → clean exit, no backoff, no advance
2829
description string
2930
}{
3031
"generic_error_triggers_backoff": {
@@ -33,11 +34,11 @@ func TestDAFollower_BackoffOnCatchupError(t *testing.T) {
3334
expectsBackoff: true,
3435
description: "Generic DA errors should trigger backoff",
3536
},
36-
"height_from_future_triggers_backoff": {
37-
daBlockTime: 500 * time.Millisecond,
38-
error: datypes.ErrHeightFromFuture,
39-
expectsBackoff: true,
40-
description: "Height from future should trigger backoff",
37+
"height_from_future_stops_catchup": {
38+
daBlockTime: 500 * time.Millisecond,
39+
error: datypes.ErrHeightFromFuture,
40+
exitsCatchup: true,
41+
description: "Height from future should stop catchup (da.ErrCaughtUp), not backoff",
4142
},
4243
"blob_not_found_no_backoff": {
4344
daBlockTime: 1 * time.Second,
@@ -92,7 +93,10 @@ func TestDAFollower_BackoffOnCatchupError(t *testing.T) {
9293
}).
9394
Return(nil, tc.error).Once()
9495

95-
if tc.expectsBackoff {
96+
if tc.exitsCatchup {
97+
// ErrCaughtUp causes runCatchup to exit cleanly after 1 call.
98+
// No second mock needed.
99+
} else if tc.expectsBackoff {
96100
daRetriever.On("RetrieveFromDA", mock.Anything, uint64(100)).
97101
Run(func(args mock.Arguments) {
98102
callTimes = append(callTimes, time.Now())
@@ -110,10 +114,20 @@ func TestDAFollower_BackoffOnCatchupError(t *testing.T) {
110114
Return(nil, datypes.ErrBlobNotFound).Once()
111115
}
112116

113-
go sub.RunCatchupForTest(ctx)
114-
<-ctx.Done()
117+
done := make(chan struct{})
118+
go func() {
119+
sub.RunCatchupForTest(ctx)
120+
close(done)
121+
}()
122+
select {
123+
case <-ctx.Done():
124+
case <-done:
125+
}
115126

116-
if tc.expectsBackoff {
127+
if tc.exitsCatchup {
128+
assert.Equal(t, 1, callCount, "should exit after single call (ErrCaughtUp)")
129+
assert.True(t, follower.HasReachedHead(), "should mark head as reached")
130+
} else if tc.expectsBackoff {
117131
require.Len(t, callTimes, 2, "should make exactly 2 calls with backoff")
118132

119133
timeBetweenCalls := callTimes[1].Sub(callTimes[0])
@@ -291,12 +305,16 @@ func TestDAFollower_InlineProcessing(t *testing.T) {
291305
daRetriever.On("ProcessBlobs", mock.Anything, blobs, uint64(10)).
292306
Return(expectedEvents).Once()
293307

294-
// Simulate subscription event at the current localDAHeight
308+
// Simulate subscription event: update highest seen, then handle event
309+
follower.subscriber.UpdateHighestForTest(10)
295310
follower.HandleEvent(t.Context(), datypes.SubscriptionEvent{
296311
Height: 10,
297312
Blobs: blobs,
298313
})
299314

315+
// Simulate catchup loop waking up to see if local > highest
316+
follower.subscriber.RunCatchupForTest(t.Context())
317+
300318
// Verify: ProcessBlobs was called, events were piped, height advanced
301319
require.Len(t, pipedEvents, 1, "should pipe 1 event from inline processing")
302320
assert.Equal(t, uint64(10), pipedEvents[0].DaHeight)

0 commit comments

Comments
 (0)