Skip to content

Commit 2ae32fb

Browse files
committed
feat: add inline blob processing to DAFollower for zero-latency follow mode
When the DA subscription delivers blobs at the current local DA height, the followLoop now processes them inline via ProcessBlobs — avoiding a round-trip re-fetch from the DA layer. Architecture: - followLoop: processes subscription blobs inline when caught up (fast path), falls through to catchupLoop when behind (slow path). - catchupLoop: unchanged — sequential RetrieveFromDA() for bulk sync. Changes: - Add Blobs field to SubscriptionEvent for carrying raw blob data - Add extractBlobData() to DA client Subscribe adapter - Export ProcessBlobs on DARetriever interface - Add handleSubscriptionEvent() to DAFollower with inline fast path - Add TestDAFollower_InlineProcessing with 3 sub-tests
1 parent 24139bc commit 2ae32fb

File tree

8 files changed

+249
-4
lines changed

8 files changed

+249
-4
lines changed

block/internal/da/client.go

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,10 @@ func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datype
381381
continue
382382
}
383383
select {
384-
case out <- datypes.SubscriptionEvent{Height: resp.Height}:
384+
case out <- datypes.SubscriptionEvent{
385+
Height: resp.Height,
386+
Blobs: extractBlobData(resp),
387+
}:
385388
case <-ctx.Done():
386389
return
387390
}
@@ -392,6 +395,26 @@ func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datype
392395
return out, nil
393396
}
394397

398+
// extractBlobData extracts raw byte slices from a subscription response,
399+
// filtering out nil blobs and empty data.
400+
func extractBlobData(resp *blobrpc.SubscriptionResponse) [][]byte {
401+
if resp == nil || len(resp.Blobs) == 0 {
402+
return nil
403+
}
404+
blobs := make([][]byte, 0, len(resp.Blobs))
405+
for _, blob := range resp.Blobs {
406+
if blob == nil {
407+
continue
408+
}
409+
data := blob.Data()
410+
if len(data) == 0 {
411+
continue
412+
}
413+
blobs = append(blobs, data)
414+
}
415+
return blobs
416+
}
417+
395418
// Get fetches blobs by their IDs. Used for visualization and fetching specific blobs.
396419
func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
397420
if len(ids) == 0 {

block/internal/syncing/da_follower.go

Lines changed: 34 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,9 @@ import (
1717
// DAFollower subscribes to DA blob events and drives sequential catchup.
1818
//
1919
// Architecture:
20-
// - followLoop listens on the subscription channel and atomically updates
21-
// highestSeenDAHeight.
20+
// - followLoop listens on the subscription channel. When caught up, it processes
21+
// subscription blobs inline (fast path, no DA re-fetch). Otherwise, it updates
22+
// highestSeenDAHeight and signals the catchup loop.
2223
// - catchupLoop sequentially retrieves from localDAHeight → highestSeenDAHeight,
2324
// piping events to the Syncer's heightInCh.
2425
//
@@ -169,11 +170,41 @@ func (f *daFollower) runSubscription() error {
169170
if !ok {
170171
return errors.New("subscription channel closed")
171172
}
172-
f.updateHighest(ev.Height)
173+
f.handleSubscriptionEvent(ev)
173174
}
174175
}
175176
}
176177

178+
// handleSubscriptionEvent processes a subscription event. When the follower is
179+
// caught up (ev.Height == localDAHeight) and blobs are available, it processes
180+
// them inline — avoiding a DA re-fetch round trip. Otherwise, it just updates
181+
// highestSeenDAHeight and lets catchupLoop handle retrieval.
182+
func (f *daFollower) handleSubscriptionEvent(ev datypes.SubscriptionEvent) {
183+
// Always record the highest height we've seen from the subscription.
184+
f.updateHighest(ev.Height)
185+
186+
// Fast path: process blobs inline when caught up.
187+
// Only fire when ev.Height == localDAHeight to avoid out-of-order processing.
188+
if len(ev.Blobs) > 0 && ev.Height == f.localDAHeight.Load() {
189+
events := f.retriever.ProcessBlobs(f.ctx, ev.Blobs, ev.Height)
190+
for _, event := range events {
191+
if err := f.pipeEvent(f.ctx, event); err != nil {
192+
f.logger.Warn().Err(err).Uint64("da_height", ev.Height).
193+
Msg("failed to pipe inline event, catchup will retry")
194+
return // catchupLoop already signaled via updateHighest
195+
}
196+
}
197+
// Advance local height — we processed this height inline.
198+
f.localDAHeight.Store(ev.Height + 1)
199+
f.headReached.Store(true)
200+
f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
201+
Msg("processed subscription blobs inline (fast path)")
202+
return
203+
}
204+
205+
// Slow path: behind or no blobs — catchupLoop will handle via signal from updateHighest.
206+
}
207+
177208
// updateHighest atomically bumps highestSeenDAHeight and signals catchup if needed.
178209
func (f *daFollower) updateHighest(height uint64) {
179210
for {

block/internal/syncing/da_retriever.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@ import (
2424
type DARetriever interface {
2525
// RetrieveFromDA retrieves blocks from the specified DA height and returns height events
2626
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
27+
// ProcessBlobs parses raw blob bytes at a given DA height into height events.
28+
// Used by the DAFollower to process subscription blobs inline without re-fetching.
29+
ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent
2730
// QueuePriorityHeight queues a DA height for priority retrieval (from P2P hints).
2831
// These heights take precedence over sequential fetching.
2932
QueuePriorityHeight(daHeight uint64)
@@ -191,6 +194,12 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight
191194
}
192195
}
193196

197+
// ProcessBlobs processes raw blob bytes to extract headers and data and returns height events.
198+
// This is the public interface used by the DAFollower for inline subscription processing.
199+
func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
200+
return r.processBlobs(ctx, blobs, daHeight)
201+
}
202+
194203
// processBlobs processes retrieved blobs to extract headers and data and returns height events
195204
func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
196205
// Decode all blobs

block/internal/syncing/da_retriever_mock.go

Lines changed: 65 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

block/internal/syncing/da_retriever_tracing.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,3 +63,7 @@ func (t *tracedDARetriever) QueuePriorityHeight(daHeight uint64) {
6363
func (t *tracedDARetriever) PopPriorityHeight() uint64 {
6464
return t.inner.PopPriorityHeight()
6565
}
66+
67+
func (t *tracedDARetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
68+
return t.inner.ProcessBlobs(ctx, blobs, daHeight)
69+
}

block/internal/syncing/da_retriever_tracing_test.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,10 @@ func (m *mockDARetriever) QueuePriorityHeight(daHeight uint64) {}
3131

3232
func (m *mockDARetriever) PopPriorityHeight() uint64 { return 0 }
3333

34+
func (m *mockDARetriever) ProcessBlobs(_ context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
35+
return nil
36+
}
37+
3438
func setupDARetrieverTrace(t *testing.T, inner DARetriever) (DARetriever, *tracetest.SpanRecorder) {
3539
t.Helper()
3640
sr := tracetest.NewSpanRecorder()

block/internal/syncing/syncer_backoff_test.go

Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,112 @@ func TestDAFollower_CatchupThenReachHead(t *testing.T) {
256256
})
257257
}
258258

259+
// TestDAFollower_InlineProcessing verifies the fast path: when the subscription
260+
// delivers blobs at the current localDAHeight, handleSubscriptionEvent processes
261+
// them inline via ProcessBlobs (not RetrieveFromDA).
262+
func TestDAFollower_InlineProcessing(t *testing.T) {
263+
t.Run("processes_blobs_inline_when_caught_up", func(t *testing.T) {
264+
daRetriever := NewMockDARetriever(t)
265+
266+
var pipedEvents []common.DAHeightEvent
267+
pipeEvent := func(_ context.Context, ev common.DAHeightEvent) error {
268+
pipedEvents = append(pipedEvents, ev)
269+
return nil
270+
}
271+
272+
follower := NewDAFollower(DAFollowerConfig{
273+
Retriever: daRetriever,
274+
Logger: zerolog.Nop(),
275+
PipeEvent: pipeEvent,
276+
Namespace: []byte("ns"),
277+
StartDAHeight: 10,
278+
DABlockTime: 500 * time.Millisecond,
279+
}).(*daFollower)
280+
281+
follower.ctx, follower.cancel = context.WithCancel(t.Context())
282+
defer follower.cancel()
283+
284+
blobs := [][]byte{[]byte("header-blob"), []byte("data-blob")}
285+
expectedEvents := []common.DAHeightEvent{
286+
{DaHeight: 10, Source: common.SourceDA},
287+
}
288+
289+
// ProcessBlobs should be called (not RetrieveFromDA)
290+
daRetriever.On("ProcessBlobs", mock.Anything, blobs, uint64(10)).
291+
Return(expectedEvents).Once()
292+
293+
// Simulate subscription event at the current localDAHeight
294+
follower.handleSubscriptionEvent(datypes.SubscriptionEvent{
295+
Height: 10,
296+
Blobs: blobs,
297+
})
298+
299+
// Verify: ProcessBlobs was called, events were piped, height advanced
300+
require.Len(t, pipedEvents, 1, "should pipe 1 event from inline processing")
301+
assert.Equal(t, uint64(10), pipedEvents[0].DaHeight)
302+
assert.Equal(t, uint64(11), follower.localDAHeight.Load(), "localDAHeight should advance past processed height")
303+
assert.True(t, follower.HasReachedHead(), "should mark head as reached after inline processing")
304+
})
305+
306+
t.Run("falls_through_to_catchup_when_behind", func(t *testing.T) {
307+
daRetriever := NewMockDARetriever(t)
308+
309+
pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil }
310+
311+
follower := NewDAFollower(DAFollowerConfig{
312+
Retriever: daRetriever,
313+
Logger: zerolog.Nop(),
314+
PipeEvent: pipeEvent,
315+
Namespace: []byte("ns"),
316+
StartDAHeight: 10,
317+
DABlockTime: 500 * time.Millisecond,
318+
}).(*daFollower)
319+
320+
follower.ctx, follower.cancel = context.WithCancel(t.Context())
321+
defer follower.cancel()
322+
323+
// Subscription reports height 15 but local is at 10 — should NOT process inline
324+
follower.handleSubscriptionEvent(datypes.SubscriptionEvent{
325+
Height: 15,
326+
Blobs: [][]byte{[]byte("blob")},
327+
})
328+
329+
// ProcessBlobs should NOT have been called
330+
daRetriever.AssertNotCalled(t, "ProcessBlobs", mock.Anything, mock.Anything, mock.Anything)
331+
assert.Equal(t, uint64(10), follower.localDAHeight.Load(), "localDAHeight should not change")
332+
assert.Equal(t, uint64(15), follower.highestSeenDAHeight.Load(), "highestSeen should be updated")
333+
})
334+
335+
t.Run("falls_through_when_no_blobs", func(t *testing.T) {
336+
daRetriever := NewMockDARetriever(t)
337+
338+
pipeEvent := func(_ context.Context, _ common.DAHeightEvent) error { return nil }
339+
340+
follower := NewDAFollower(DAFollowerConfig{
341+
Retriever: daRetriever,
342+
Logger: zerolog.Nop(),
343+
PipeEvent: pipeEvent,
344+
Namespace: []byte("ns"),
345+
StartDAHeight: 10,
346+
DABlockTime: 500 * time.Millisecond,
347+
}).(*daFollower)
348+
349+
follower.ctx, follower.cancel = context.WithCancel(t.Context())
350+
defer follower.cancel()
351+
352+
// Subscription at current height but no blobs — should fall through
353+
follower.handleSubscriptionEvent(datypes.SubscriptionEvent{
354+
Height: 10,
355+
Blobs: nil,
356+
})
357+
358+
// ProcessBlobs should NOT have been called
359+
daRetriever.AssertNotCalled(t, "ProcessBlobs", mock.Anything, mock.Anything, mock.Anything)
360+
assert.Equal(t, uint64(10), follower.localDAHeight.Load(), "localDAHeight should not change")
361+
assert.Equal(t, uint64(10), follower.highestSeenDAHeight.Load(), "highestSeen should be updated")
362+
})
363+
}
364+
259365
// backoffTestGenesis creates a test genesis for the backoff tests.
260366
func backoffTestGenesis(addr []byte) genesis.Genesis {
261367
return genesis.Genesis{

pkg/da/types/types.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,4 +88,7 @@ func SplitID(id []byte) (uint64, []byte, error) {
8888
type SubscriptionEvent struct {
8989
// Height is the DA layer height at which the blob was finalized.
9090
Height uint64
91+
// Blobs contains the raw blob data from the subscription response.
92+
// When non-nil, followers can process blobs inline without re-fetching from DA.
93+
Blobs [][]byte
9194
}

0 commit comments

Comments
 (0)