Skip to content

Commit 24139bc

Browse files
committed
feat: Replace the Syncer's polling DA worker with an event-driven DAFollower and introduce DA client subscription.
1 parent e877782 commit 24139bc

File tree

13 files changed

+608
-265
lines changed

13 files changed

+608
-265
lines changed

apps/evm/server/force_inclusion_test.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,13 @@ func (m *mockDA) Get(ctx context.Context, ids []da.ID, namespace []byte) ([]da.B
5050
return nil, nil
5151
}
5252

53+
func (m *mockDA) Subscribe(_ context.Context, _ []byte) (<-chan da.SubscriptionEvent, error) {
54+
// Not needed in these tests; return a closed channel.
55+
ch := make(chan da.SubscriptionEvent)
56+
close(ch)
57+
return ch, nil
58+
}
59+
5360
func (m *mockDA) Validate(ctx context.Context, ids []da.ID, proofs []da.Proof, namespace []byte) ([]bool, error) {
5461
return nil, nil
5562
}

block/internal/da/client.go

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -350,6 +350,48 @@ func (c *client) HasForcedInclusionNamespace() bool {
350350
return c.hasForcedNamespace
351351
}
352352

353+
// Subscribe subscribes to blobs in the given namespace via the celestia-node
354+
// Subscribe API. It returns a channel that emits a SubscriptionEvent for every
355+
// DA block containing a matching blob. The channel is closed when ctx is
356+
// cancelled. The caller must drain the channel after cancellation to avoid
357+
// goroutine leaks.
358+
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
359+
ns, err := share.NewNamespaceFromBytes(namespace)
360+
if err != nil {
361+
return nil, fmt.Errorf("invalid namespace: %w", err)
362+
}
363+
364+
rawCh, err := c.blobAPI.Subscribe(ctx, ns)
365+
if err != nil {
366+
return nil, fmt.Errorf("blob subscribe: %w", err)
367+
}
368+
369+
out := make(chan datypes.SubscriptionEvent, 16)
370+
go func() {
371+
defer close(out)
372+
for {
373+
select {
374+
case <-ctx.Done():
375+
return
376+
case resp, ok := <-rawCh:
377+
if !ok {
378+
return
379+
}
380+
if resp == nil {
381+
continue
382+
}
383+
select {
384+
case out <- datypes.SubscriptionEvent{Height: resp.Height}:
385+
case <-ctx.Done():
386+
return
387+
}
388+
}
389+
}
390+
}()
391+
392+
return out, nil
393+
}
394+
353395
// Get fetches blobs by their IDs. Used for visualization and fetching specific blobs.
354396
func (c *client) Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error) {
355397
if len(ids) == 0 {

block/internal/da/interface.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,11 @@ type Client interface {
1717
// Get retrieves blobs by their IDs. Used for visualization and fetching specific blobs.
1818
Get(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)
1919

20+
// Subscribe returns a channel that emits one SubscriptionEvent per DA block
21+
// that contains a blob in the given namespace. The channel is closed when ctx
22+
// is cancelled. Callers MUST drain the channel after cancellation.
23+
Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error)
24+
2025
// GetLatestDAHeight returns the latest height available on the DA layer.
2126
GetLatestDAHeight(ctx context.Context) (uint64, error)
2227

block/internal/da/tracing.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -145,6 +145,9 @@ func (t *tracedClient) GetForcedInclusionNamespace() []byte {
145145
func (t *tracedClient) HasForcedInclusionNamespace() bool {
146146
return t.inner.HasForcedInclusionNamespace()
147147
}
148+
func (t *tracedClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
149+
return t.inner.Subscribe(ctx, namespace)
150+
}
148151

149152
type submitError struct{ msg string }
150153

block/internal/da/tracing_test.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,14 @@ type mockFullClient struct {
2222
getFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Blob, error)
2323
getProofsFn func(ctx context.Context, ids []datypes.ID, namespace []byte) ([]datypes.Proof, error)
2424
validateFn func(ctx context.Context, ids []datypes.ID, proofs []datypes.Proof, namespace []byte) ([]bool, error)
25+
subscribeFn func(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error)
26+
}
27+
28+
func (m *mockFullClient) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
29+
if m.subscribeFn == nil {
30+
panic("not expected to be called")
31+
}
32+
return m.subscribeFn(ctx, namespace)
2533
}
2634

2735
func (m *mockFullClient) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) datypes.ResultSubmit {

0 commit comments

Comments
 (0)