Skip to content

Commit 72abe51

Browse files
committed
feat: subscribe to both header and data namespaces for inline processing
When header and data use different DA namespaces, the DAFollower now subscribes to both and merges events via a fan-in goroutine. This ensures inline blob processing works correctly for split-namespace configurations. Changes: - Add DataNamespace to DAFollowerConfig and daFollower - Subscribe to both namespaces in runSubscription with mergeSubscriptions fan-in - Guard handleSubscriptionEvent to only advance localDAHeight when ProcessBlobs returns at least one complete event (header+data matched) - Pass DataNamespace from syncer.go - Implement Subscribe on DummyDA test helper with subscriber notification
1 parent 2ae32fb commit 72abe51

File tree

3 files changed

+128
-12
lines changed

3 files changed

+128
-12
lines changed

block/internal/syncing/da_follower.go

Lines changed: 69 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,10 @@
11
package syncing
22

33
import (
4+
"bytes"
45
"context"
56
"errors"
7+
"fmt"
68
"sync"
79
"sync/atomic"
810
"time"
@@ -45,6 +47,9 @@ type daFollower struct {
4547

4648
// Namespace to subscribe on (header namespace).
4749
namespace []byte
50+
// dataNamespace is the data namespace (may equal namespace when header+data
51+
// share the same namespace). When different, we subscribe to both and merge.
52+
dataNamespace []byte
4853

4954
// localDAHeight is only written by catchupLoop and read by followLoop
5055
// to determine whether a catchup is needed.
@@ -76,18 +81,24 @@ type DAFollowerConfig struct {
7681
Logger zerolog.Logger
7782
PipeEvent func(ctx context.Context, event common.DAHeightEvent) error
7883
Namespace []byte
84+
DataNamespace []byte // may be nil or equal to Namespace
7985
StartDAHeight uint64
8086
DABlockTime time.Duration
8187
}
8288

8389
// NewDAFollower creates a new daFollower.
8490
func NewDAFollower(cfg DAFollowerConfig) DAFollower {
91+
dataNs := cfg.DataNamespace
92+
if len(dataNs) == 0 {
93+
dataNs = cfg.Namespace
94+
}
8595
f := &daFollower{
8696
client: cfg.Client,
8797
retriever: cfg.Retriever,
8898
logger: cfg.Logger.With().Str("component", "da_follower").Logger(),
8999
pipeEvent: cfg.PipeEvent,
90100
namespace: cfg.Namespace,
101+
dataNamespace: dataNs,
91102
catchupSignal: make(chan struct{}, 1),
92103
daBlockTime: cfg.DABlockTime,
93104
}
@@ -154,12 +165,22 @@ func (f *daFollower) followLoop() {
154165
}
155166
}
156167

157-
// runSubscription opens a single subscription and processes events until the
158-
// channel is closed or an error occurs.
168+
// runSubscription opens subscriptions on both header and data namespaces (if
169+
// different) and processes events until a channel is closed or an error occurs.
159170
func (f *daFollower) runSubscription() error {
160-
ch, err := f.client.Subscribe(f.ctx, f.namespace)
171+
headerCh, err := f.client.Subscribe(f.ctx, f.namespace)
161172
if err != nil {
162-
return err
173+
return fmt.Errorf("subscribe header namespace: %w", err)
174+
}
175+
176+
// If namespaces differ, subscribe to the data namespace too and fan-in.
177+
ch := headerCh
178+
if !bytes.Equal(f.namespace, f.dataNamespace) {
179+
dataCh, err := f.client.Subscribe(f.ctx, f.dataNamespace)
180+
if err != nil {
181+
return fmt.Errorf("subscribe data namespace: %w", err)
182+
}
183+
ch = f.mergeSubscriptions(headerCh, dataCh)
163184
}
164185

165186
for {
@@ -175,6 +196,41 @@ func (f *daFollower) runSubscription() error {
175196
}
176197
}
177198

199+
// mergeSubscriptions fans two subscription channels into one, concatenating
200+
// blobs from both namespaces when events arrive at the same DA height.
201+
func (f *daFollower) mergeSubscriptions(
202+
headerCh, dataCh <-chan datypes.SubscriptionEvent,
203+
) <-chan datypes.SubscriptionEvent {
204+
out := make(chan datypes.SubscriptionEvent, 16)
205+
go func() {
206+
defer close(out)
207+
for headerCh != nil || dataCh != nil {
208+
var ev datypes.SubscriptionEvent
209+
var ok bool
210+
select {
211+
case <-f.ctx.Done():
212+
return
213+
case ev, ok = <-headerCh:
214+
if !ok {
215+
headerCh = nil
216+
continue
217+
}
218+
case ev, ok = <-dataCh:
219+
if !ok {
220+
dataCh = nil
221+
continue
222+
}
223+
}
224+
select {
225+
case out <- ev:
226+
case <-f.ctx.Done():
227+
return
228+
}
229+
}
230+
}()
231+
return out
232+
}
233+
178234
// handleSubscriptionEvent processes a subscription event. When the follower is
179235
// caught up (ev.Height == localDAHeight) and blobs are available, it processes
180236
// them inline — avoiding a DA re-fetch round trip. Otherwise, it just updates
@@ -194,11 +250,15 @@ func (f *daFollower) handleSubscriptionEvent(ev datypes.SubscriptionEvent) {
194250
return // catchupLoop already signaled via updateHighest
195251
}
196252
}
197-
// Advance local height — we processed this height inline.
198-
f.localDAHeight.Store(ev.Height + 1)
199-
f.headReached.Store(true)
200-
f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
201-
Msg("processed subscription blobs inline (fast path)")
253+
// Only advance if we produced at least one complete event.
254+
// With split namespaces, the first namespace's blobs may not produce
255+
// events until the second namespace's blobs arrive at the same height.
256+
if len(events) != 0 {
257+
f.localDAHeight.Store(ev.Height + 1)
258+
f.headReached.Store(true)
259+
f.logger.Debug().Uint64("da_height", ev.Height).Int("events", len(events)).
260+
Msg("processed subscription blobs inline (fast path)")
261+
}
202262
return
203263
}
204264

block/internal/syncing/syncer.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ func (s *Syncer) Start(ctx context.Context) error {
203203
Logger: s.logger,
204204
PipeEvent: s.pipeEvent,
205205
Namespace: s.daClient.GetHeaderNamespace(),
206+
DataNamespace: s.daClient.GetDataNamespace(),
206207
StartDAHeight: s.daRetrieverHeight.Load(),
207208
DABlockTime: s.config.DA.BlockTime.Duration,
208209
})

test/testda/dummy.go

Lines changed: 58 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,12 @@ func (h *Header) Time() time.Time {
2828
return h.Timestamp
2929
}
3030

31+
// subscriber holds a channel and the context for a single Subscribe caller.
32+
type subscriber struct {
33+
ch chan datypes.SubscriptionEvent
34+
ctx context.Context
35+
}
36+
3137
// DummyDA is a test implementation of the DA client interface.
3238
// It supports blob storage, height simulation, failure injection, and header retrieval.
3339
type DummyDA struct {
@@ -38,13 +44,50 @@ type DummyDA struct {
3844
headers map[uint64]*Header // height -> header (with timestamp)
3945
failSubmit atomic.Bool
4046

47+
// subscribers tracks active Subscribe callers.
48+
subscribers []*subscriber
49+
4150
tickerMu sync.Mutex
4251
tickerStop chan struct{}
4352
}
4453

45-
func (d *DummyDA) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
46-
//TODO implement me
47-
panic("implement me")
54+
// Subscribe returns a channel that emits a SubscriptionEvent for every new DA
55+
// height produced by Submit or StartHeightTicker. The channel is closed when
56+
// ctx is cancelled or Reset is called.
57+
func (d *DummyDA) Subscribe(ctx context.Context, _ []byte) (<-chan datypes.SubscriptionEvent, error) {
58+
ch := make(chan datypes.SubscriptionEvent, 64)
59+
sub := &subscriber{ch: ch, ctx: ctx}
60+
61+
d.mu.Lock()
62+
d.subscribers = append(d.subscribers, sub)
63+
d.mu.Unlock()
64+
65+
// Remove subscriber and close channel when ctx is cancelled.
66+
go func() {
67+
<-ctx.Done()
68+
d.mu.Lock()
69+
defer d.mu.Unlock()
70+
for i, s := range d.subscribers {
71+
if s == sub {
72+
d.subscribers = append(d.subscribers[:i], d.subscribers[i+1:]...)
73+
break
74+
}
75+
}
76+
close(ch)
77+
}()
78+
79+
return ch, nil
80+
}
81+
82+
// notifySubscribers sends an event to all active subscribers. Must be called
83+
// with d.mu held.
84+
func (d *DummyDA) notifySubscribers(ev datypes.SubscriptionEvent) {
85+
for _, sub := range d.subscribers {
86+
select {
87+
case sub.ch <- ev:
88+
case <-sub.ctx.Done():
89+
}
90+
}
4891
}
4992

5093
// Option configures a DummyDA instance.
@@ -116,6 +159,10 @@ func (d *DummyDA) Submit(_ context.Context, data [][]byte, _ float64, namespace
116159
Timestamp: now,
117160
}
118161
}
162+
d.notifySubscribers(datypes.SubscriptionEvent{
163+
Height: height,
164+
Blobs: data,
165+
})
119166
d.mu.Unlock()
120167

121168
return datypes.ResultSubmit{
@@ -258,6 +305,9 @@ func (d *DummyDA) StartHeightTicker(interval time.Duration) func() {
258305
Timestamp: now,
259306
}
260307
}
308+
d.notifySubscribers(datypes.SubscriptionEvent{
309+
Height: height,
310+
})
261311
d.mu.Unlock()
262312
case <-stopCh:
263313
return
@@ -282,6 +332,11 @@ func (d *DummyDA) Reset() {
282332
d.blobs = make(map[uint64]map[string][][]byte)
283333
d.headers = make(map[uint64]*Header)
284334
d.failSubmit.Store(false)
335+
// Close all subscriber channels.
336+
for _, sub := range d.subscribers {
337+
close(sub.ch)
338+
}
339+
d.subscribers = nil
285340
d.mu.Unlock()
286341

287342
d.tickerMu.Lock()

0 commit comments

Comments
 (0)