Skip to content

Commit c21c211

Browse files
committed
feat: Implement blob subscription for local DA and update JSON-RPC client to use WebSockets, along with E2E test updates for new evnode flags and P2P address retrieval.
1 parent f4b9f2f commit c21c211

File tree

5 files changed

+143
-16
lines changed

5 files changed

+143
-16
lines changed

pkg/da/jsonrpc/client.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ import (
44
"context"
55
"fmt"
66
"net/http"
7+
"strings"
78

89
libshare "github.com/celestiaorg/go-square/v3/share"
910
"github.com/filecoin-project/go-jsonrpc"
@@ -23,6 +24,15 @@ func (c *Client) Close() {
2324
}
2425
}
2526

27+
// httpToWS converts an HTTP(S) URL to a WebSocket URL.
28+
// go-jsonrpc requires WebSocket for channel-based subscriptions (e.g. Subscribe).
29+
// WebSocket connections also support regular RPC calls, so this is backward-compatible.
30+
func httpToWS(addr string) string {
31+
addr = strings.Replace(addr, "https://", "wss://", 1)
32+
addr = strings.Replace(addr, "http://", "ws://", 1)
33+
return addr
34+
}
35+
2636
// NewClient connects to the celestia-node RPC endpoint
2737
func NewClient(ctx context.Context, addr, token string, authHeaderName string) (*Client, error) {
2838
var httpHeader http.Header
@@ -33,16 +43,19 @@ func NewClient(ctx context.Context, addr, token string, authHeaderName string) (
3343
httpHeader = http.Header{authHeaderName: []string{fmt.Sprintf("Bearer %s", token)}}
3444
}
3545

46+
// Use WebSocket so that channel-based subscriptions (blob.Subscribe) work.
47+
wsAddr := httpToWS(addr)
48+
3649
var cl Client
3750

3851
// Connect to the blob namespace
39-
blobCloser, err := jsonrpc.NewClient(ctx, addr, "blob", &cl.Blob.Internal, httpHeader)
52+
blobCloser, err := jsonrpc.NewClient(ctx, wsAddr, "blob", &cl.Blob.Internal, httpHeader)
4053
if err != nil {
4154
return nil, fmt.Errorf("failed to connect to blob namespace: %w", err)
4255
}
4356

4457
// Connect to the header namespace
45-
headerCloser, err := jsonrpc.NewClient(ctx, addr, "header", &cl.Header.Internal, httpHeader)
58+
headerCloser, err := jsonrpc.NewClient(ctx, wsAddr, "header", &cl.Header.Internal, httpHeader)
4659
if err != nil {
4760
blobCloser()
4861
return nil, fmt.Errorf("failed to connect to header namespace: %w", err)

test/e2e/evm_force_inclusion_e2e_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,10 @@ func TestEvmFullNodeForceInclusionE2E(t *testing.T) {
238238
// --- End Sequencer Setup ---
239239

240240
// --- Start Full Node Setup ---
241+
// Get sequencer's full P2P address (including peer ID) for the full node to connect to
242+
sequencerP2PAddress := getNodeP2PAddress(t, sut, sequencerHome, env.Endpoints.RollkitRPCPort)
241243
// Reuse setupFullNode helper which handles genesis copying and node startup
242-
setupFullNode(t, sut, fullNodeHome, sequencerHome, env.FullNodeJWT, env.GenesisHash, env.Endpoints.GetRollkitP2PAddress(), env.Endpoints)
244+
setupFullNode(t, sut, fullNodeHome, sequencerHome, env.FullNodeJWT, env.GenesisHash, sequencerP2PAddress, env.Endpoints)
243245
t.Log("Full node is up")
244246
// --- End Full Node Setup ---
245247

test/e2e/evm_test_common.go

Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -449,15 +449,15 @@ func setupFullNode(t testing.TB, sut *SystemUnderTest, fullNodeHome, sequencerHo
449449
"--home", fullNodeHome,
450450
"--evm.jwt-secret-file", fullNodeJwtSecretFile,
451451
"--evm.genesis-hash", genesisHash,
452-
"--rollkit.p2p.peers", sequencerP2PAddress,
452+
"--evnode.p2p.peers", sequencerP2PAddress,
453453
"--evm.engine-url", endpoints.GetFullNodeEngineURL(),
454454
"--evm.eth-url", endpoints.GetFullNodeEthURL(),
455-
"--rollkit.da.block_time", DefaultDABlockTime,
456-
"--rollkit.da.address", endpoints.GetDAAddress(),
457-
"--rollkit.da.namespace", DefaultDANamespace,
458-
"--rollkit.da.batching_strategy", "immediate",
459-
"--rollkit.rpc.address", endpoints.GetFullNodeRPCListen(),
460-
"--rollkit.p2p.listen_address", endpoints.GetFullNodeP2PAddress(),
455+
"--evnode.da.block_time", DefaultDABlockTime,
456+
"--evnode.da.address", endpoints.GetDAAddress(),
457+
"--evnode.da.namespace", DefaultDANamespace,
458+
"--evnode.da.batching_strategy", "immediate",
459+
"--evnode.rpc.address", endpoints.GetFullNodeRPCListen(),
460+
"--evnode.p2p.listen_address", endpoints.GetFullNodeP2PAddress(),
461461
}
462462
sut.ExecCmd(evmSingleBinaryPath, args...)
463463
// Use AwaitNodeLive instead of AwaitNodeUp because in lazy mode scenarios,
@@ -932,4 +932,3 @@ func PrintTraceReport(t testing.TB, label string, spans []TraceSpan) {
932932
t.Logf("%-40s %5.1f%% %s", name, pct, bar)
933933
}
934934
}
935-

tools/local-da/local.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313
"sync"
1414
"time"
1515

16+
libshare "github.com/celestiaorg/go-square/v3/share"
1617
"github.com/rs/zerolog"
1718

1819
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
@@ -27,6 +28,18 @@ const (
2728
DefaultBlockTime = 1 * time.Second
2829
)
2930

31+
// subscriber holds a registered subscription's channel and namespace filter.
32+
type subscriber struct {
33+
ch chan subscriptionEvent
34+
ns libshare.Namespace
35+
}
36+
37+
// subscriptionEvent is sent to subscribers when a new DA block is produced.
38+
type subscriptionEvent struct {
39+
height uint64
40+
blobs []*blobrpc.Blob
41+
}
42+
3043
// LocalDA is a simple implementation of in-memory DA. Not production ready! Intended only for testing!
3144
//
3245
// Data is stored in a map, where key is a serialized sequence number. This key is returned as ID.
@@ -43,6 +56,10 @@ type LocalDA struct {
4356
blockTime time.Duration
4457
lastTime time.Time // tracks last timestamp to ensure monotonicity
4558

59+
// Subscriber registry (protected by mu)
60+
subscribers map[int]*subscriber
61+
nextSubID int
62+
4663
logger zerolog.Logger
4764
}
4865

@@ -57,6 +74,7 @@ func NewLocalDA(logger zerolog.Logger, opts ...func(*LocalDA) *LocalDA) *LocalDA
5774
data: make(map[uint64][]kvp),
5875
timestamps: make(map[uint64]time.Time),
5976
blobData: make(map[uint64][]*blobrpc.Blob),
77+
subscribers: make(map[int]*subscriber),
6078
maxBlobSize: DefaultMaxBlobSize,
6179
blockTime: DefaultBlockTime,
6280
lastTime: time.Now(),
@@ -209,6 +227,7 @@ func (d *LocalDA) SubmitWithOptions(ctx context.Context, blobs []datypes.Blob, g
209227

210228
d.data[d.height] = append(d.data[d.height], kvp{ids[i], blob})
211229
}
230+
d.notifySubscribers(d.height)
212231
d.logger.Info().Uint64("newHeight", d.height).Int("count", len(ids)).Msg("SubmitWithOptions successful")
213232
return ids, nil
214233
}
@@ -239,6 +258,7 @@ func (d *LocalDA) Submit(ctx context.Context, blobs []datypes.Blob, gasPrice flo
239258

240259
d.data[d.height] = append(d.data[d.height], kvp{ids[i], blob})
241260
}
261+
d.notifySubscribers(d.height)
242262
d.logger.Info().Uint64("newHeight", d.height).Int("count", len(ids)).Msg("Submit successful")
243263
return ids, nil
244264
}
@@ -335,5 +355,68 @@ func (d *LocalDA) produceEmptyBlock() {
335355
defer d.mu.Unlock()
336356
d.height++
337357
d.timestamps[d.height] = d.monotonicTime()
358+
d.notifySubscribers(d.height)
338359
d.logger.Debug().Uint64("height", d.height).Msg("produced empty block")
339360
}
361+
362+
// subscribe registers a new subscriber for blobs matching the given namespace.
363+
// Returns a read-only channel and a subscription ID for later unsubscription.
364+
// Must NOT be called with d.mu held.
365+
func (d *LocalDA) subscribe(ns libshare.Namespace) (<-chan subscriptionEvent, int) {
366+
d.mu.Lock()
367+
defer d.mu.Unlock()
368+
369+
id := d.nextSubID
370+
d.nextSubID++
371+
ch := make(chan subscriptionEvent, 64)
372+
d.subscribers[id] = &subscriber{ch: ch, ns: ns}
373+
d.logger.Info().Int("subID", id).Str("namespace", hex.EncodeToString(ns.Bytes())).Msg("subscriber registered")
374+
return ch, id
375+
}
376+
377+
// unsubscribe removes a subscriber and closes its channel.
378+
// Must NOT be called with d.mu held.
379+
func (d *LocalDA) unsubscribe(id int) {
380+
d.mu.Lock()
381+
defer d.mu.Unlock()
382+
383+
if sub, ok := d.subscribers[id]; ok {
384+
close(sub.ch)
385+
delete(d.subscribers, id)
386+
d.logger.Info().Int("subID", id).Msg("subscriber unregistered")
387+
}
388+
}
389+
390+
// notifySubscribers sends a subscriptionEvent to all registered subscribers.
391+
// For each subscriber, only blobs matching the subscriber's namespace are included.
392+
// Slow consumers (full channel) are dropped to avoid blocking block production.
393+
// MUST be called with d.mu held.
394+
func (d *LocalDA) notifySubscribers(height uint64) {
395+
if len(d.subscribers) == 0 {
396+
return
397+
}
398+
399+
allBlobs := d.blobData[height] // may be nil for empty blocks
400+
401+
for id, sub := range d.subscribers {
402+
// Filter blobs matching subscriber namespace
403+
var matched []*blobrpc.Blob
404+
for _, b := range allBlobs {
405+
if b != nil && b.Namespace().Equals(sub.ns) {
406+
matched = append(matched, b)
407+
}
408+
}
409+
410+
evt := subscriptionEvent{
411+
height: height,
412+
blobs: matched,
413+
}
414+
415+
select {
416+
case sub.ch <- evt:
417+
default:
418+
// Slow consumer — drop to avoid blocking block production
419+
d.logger.Warn().Int("subID", id).Uint64("height", height).Msg("dropping event for slow subscriber")
420+
}
421+
}
422+
}

tools/local-da/rpc.go

Lines changed: 35 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ func (s *blobServer) Submit(_ context.Context, blobs []*jsonrpc.Blob, _ *jsonrpc
3131

3232
if len(blobs) == 0 {
3333
s.da.timestamps[height] = time.Now()
34+
s.da.notifySubscribers(height)
3435
return height, nil
3536
}
3637

@@ -48,6 +49,7 @@ func (s *blobServer) Submit(_ context.Context, blobs []*jsonrpc.Blob, _ *jsonrpc
4849
s.da.blobData[height] = append(s.da.blobData[height], b)
4950
}
5051
s.da.timestamps[height] = time.Now()
52+
s.da.notifySubscribers(height)
5153

5254
return height, nil
5355
}
@@ -127,11 +129,39 @@ func (s *blobServer) GetCommitmentProof(_ context.Context, _ uint64, _ libshare.
127129
return &jsonrpc.CommitmentProof{}, nil
128130
}
129131

130-
// Subscribe returns a closed channel; LocalDA does not push live updates.
131-
func (s *blobServer) Subscribe(_ context.Context, _ libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) {
132-
ch := make(chan *jsonrpc.SubscriptionResponse)
133-
close(ch)
134-
return ch, nil
132+
// Subscribe streams blobs as they are included for the given namespace.
133+
// The returned channel emits a SubscriptionResponse for every new DA block.
134+
// The channel is closed when ctx is cancelled.
135+
func (s *blobServer) Subscribe(ctx context.Context, namespace libshare.Namespace) (<-chan *jsonrpc.SubscriptionResponse, error) {
136+
eventCh, subID := s.da.subscribe(namespace)
137+
138+
out := make(chan *jsonrpc.SubscriptionResponse, 64)
139+
go func() {
140+
defer close(out)
141+
defer s.da.unsubscribe(subID)
142+
143+
for {
144+
select {
145+
case <-ctx.Done():
146+
return
147+
case evt, ok := <-eventCh:
148+
if !ok {
149+
return
150+
}
151+
resp := &jsonrpc.SubscriptionResponse{
152+
Height: evt.height,
153+
Blobs: evt.blobs,
154+
}
155+
select {
156+
case out <- resp:
157+
case <-ctx.Done():
158+
return
159+
}
160+
}
161+
}
162+
}()
163+
164+
return out, nil
135165
}
136166

137167
// startBlobServer starts an HTTP JSON-RPC server on addr serving the blob namespace.

0 commit comments

Comments
 (0)