Skip to content

Commit d08236b

Browse files
committed
Include timestamp for subscription events
1 parent 4b234de commit d08236b

File tree

17 files changed

+184
-161
lines changed

17 files changed

+184
-161
lines changed

apps/evm/server/force_inclusion_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ func (m *mockDA) Get(ctx context.Context, ids []da.ID, namespace []byte) ([]da.B
5050
return nil, nil
5151
}
5252

53-
func (m *mockDA) Subscribe(_ context.Context, _ []byte) (<-chan da.SubscriptionEvent, error) {
53+
func (m *mockDA) Subscribe(_ context.Context, _ []byte, _ bool) (<-chan da.SubscriptionEvent, error) {
5454
// Not needed in these tests; return a closed channel.
5555
ch := make(chan da.SubscriptionEvent)
5656
close(ch)

block/internal/da/async_block_retriever.go

Lines changed: 20 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -79,11 +79,12 @@ func NewAsyncBlockRetriever(
7979
}
8080

8181
f.subscriber = NewSubscriber(SubscriberConfig{
82-
Client: client,
83-
Logger: logger,
84-
Namespaces: namespaces,
85-
DABlockTime: daBlockTime,
86-
Handler: f,
82+
Client: client,
83+
Logger: logger,
84+
Namespaces: namespaces,
85+
DABlockTime: daBlockTime,
86+
Handler: f,
87+
FetchBlockTimestamp: true,
8788
})
8889
f.subscriber.SetStartHeight(daStartHeight)
8990

@@ -118,7 +119,7 @@ func (f *asyncBlockRetriever) UpdateCurrentHeight(height uint64) {
118119
f.logger.Debug().
119120
Uint64("new_height", height).
120121
Msg("updated current DA height")
121-
f.cleanupOldBlocks(height)
122+
f.cleanupOldBlocks(context.Background(), height)
122123
return
123124
}
124125
}
@@ -167,14 +168,10 @@ func (f *asyncBlockRetriever) GetCachedBlock(ctx context.Context, daHeight uint6
167168
return block, nil
168169
}
169170

170-
// ---------------------------------------------------------------------------
171-
// SubscriberHandler implementation
172-
// ---------------------------------------------------------------------------
173-
174171
// HandleEvent caches blobs from the subscription inline.
175172
func (f *asyncBlockRetriever) HandleEvent(ctx context.Context, ev datypes.SubscriptionEvent) {
176173
if len(ev.Blobs) > 0 {
177-
f.cacheBlock(ctx, ev.Height, ev.Blobs)
174+
f.cacheBlock(ctx, ev.Height, ev.Timestamp, ev.Blobs)
178175
}
179176
}
180177

@@ -215,15 +212,15 @@ func (f *asyncBlockRetriever) fetchAndCacheBlock(ctx context.Context, height uin
215212
f.logger.Debug().Uint64("height", height).Msg("block height not yet available - will retry")
216213
return
217214
case datypes.StatusNotFound:
218-
f.cacheBlock(ctx, height, nil)
215+
f.cacheBlock(ctx, height, result.Timestamp, nil)
219216
case datypes.StatusSuccess:
220217
blobs := make([][]byte, 0, len(result.Data))
221218
for _, blob := range result.Data {
222219
if len(blob) > 0 {
223220
blobs = append(blobs, blob)
224221
}
225222
}
226-
f.cacheBlock(ctx, height, blobs)
223+
f.cacheBlock(ctx, height, result.Timestamp, blobs)
227224
default:
228225
f.logger.Debug().
229226
Uint64("height", height).
@@ -233,41 +230,41 @@ func (f *asyncBlockRetriever) fetchAndCacheBlock(ctx context.Context, height uin
233230
}
234231

235232
// cacheBlock serializes and stores a block in the in-memory cache.
236-
func (f *asyncBlockRetriever) cacheBlock(ctx context.Context, height uint64, blobs [][]byte) {
233+
func (f *asyncBlockRetriever) cacheBlock(ctx context.Context, daHeight uint64, daTimestamp time.Time, blobs [][]byte) {
237234
if blobs == nil {
238235
blobs = [][]byte{}
239236
}
240237

241238
pbBlock := &pb.BlockData{
242-
Height: height,
243-
Timestamp: time.Now().UnixNano(),
239+
Height: daHeight,
240+
Timestamp: daTimestamp.UnixNano(),
244241
Blobs: blobs,
245242
}
246243
data, err := proto.Marshal(pbBlock)
247244
if err != nil {
248-
f.logger.Error().Err(err).Uint64("height", height).Msg("failed to marshal block for caching")
245+
f.logger.Error().Err(err).Uint64("height", daHeight).Msg("failed to marshal block for caching")
249246
return
250247
}
251248

252-
key := newBlockDataKey(height)
249+
key := newBlockDataKey(daHeight)
253250
if err := f.cache.Put(ctx, key, data); err != nil {
254-
f.logger.Error().Err(err).Uint64("height", height).Msg("failed to cache block")
251+
f.logger.Error().Err(err).Uint64("height", daHeight).Msg("failed to cache block")
255252
return
256253
}
257254

258-
f.logger.Debug().Uint64("height", height).Int("blob_count", len(blobs)).Msg("cached block")
255+
f.logger.Debug().Uint64("height", daHeight).Int("blob_count", len(blobs)).Msg("cached block")
259256
}
260257

261258
// cleanupOldBlocks removes blocks older than currentHeight − prefetchWindow.
262-
func (f *asyncBlockRetriever) cleanupOldBlocks(currentHeight uint64) {
259+
func (f *asyncBlockRetriever) cleanupOldBlocks(ctx context.Context, currentHeight uint64) {
263260
if currentHeight < f.prefetchWindow {
264261
return
265262
}
266263

267264
cleanupThreshold := currentHeight - f.prefetchWindow
268265

269266
query := dsq.Query{Prefix: "/block/"}
270-
results, err := f.cache.Query(context.Background(), query)
267+
results, err := f.cache.Query(ctx, query)
271268
if err != nil {
272269
f.logger.Debug().Err(err).Msg("failed to query cache for cleanup")
273270
return
@@ -287,7 +284,7 @@ func (f *asyncBlockRetriever) cleanupOldBlocks(currentHeight uint64) {
287284
}
288285

289286
if height < cleanupThreshold {
290-
if err := f.cache.Delete(context.Background(), key); err != nil {
287+
if err := f.cache.Delete(ctx, key); err != nil {
291288
f.logger.Debug().Err(err).Uint64("height", height).Msg("failed to delete old block from cache")
292289
}
293290
}

block/internal/da/async_block_retriever_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -60,15 +60,15 @@ func TestAsyncBlockRetriever_SubscriptionDrivenCaching(t *testing.T) {
6060
Blobs: testBlobs,
6161
}
6262

63-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once()
63+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once()
6464
// Catchup loop may call Retrieve for heights beyond 100 — stub those.
6565
client.On("Retrieve", mock.Anything, mock.Anything, fiNs).Return(datypes.ResultRetrieve{
6666
BaseResult: datypes.BaseResult{Code: datypes.StatusHeightFromFuture},
6767
}).Maybe()
6868

6969
// On second subscribe (after watchdog timeout) just block forever.
7070
blockCh := make(chan datypes.SubscriptionEvent)
71-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
71+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
7272

7373
logger := zerolog.Nop()
7474
fetcher := NewAsyncBlockRetriever(client, logger, fiNs, 200*time.Millisecond, 100, 5)
@@ -109,9 +109,9 @@ func TestAsyncBlockRetriever_CatchupFillsGaps(t *testing.T) {
109109
subCh := make(chan datypes.SubscriptionEvent, 1)
110110
subCh <- datypes.SubscriptionEvent{Height: 105}
111111

112-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once()
112+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once()
113113
blockCh := make(chan datypes.SubscriptionEvent)
114-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
114+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
115115

116116
// Height 102 has blobs; rest return not found or future.
117117
client.On("Retrieve", mock.Anything, uint64(102), fiNs).Return(datypes.ResultRetrieve{
@@ -159,9 +159,9 @@ func TestAsyncBlockRetriever_HeightFromFuture(t *testing.T) {
159159
subCh := make(chan datypes.SubscriptionEvent, 1)
160160
subCh <- datypes.SubscriptionEvent{Height: 100}
161161

162-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once()
162+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once()
163163
blockCh := make(chan datypes.SubscriptionEvent)
164-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
164+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
165165

166166
// All Retrieve calls return HeightFromFuture.
167167
client.On("Retrieve", mock.Anything, mock.Anything, fiNs).Return(datypes.ResultRetrieve{
@@ -190,7 +190,7 @@ func TestAsyncBlockRetriever_StopGracefully(t *testing.T) {
190190
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
191191

192192
blockCh := make(chan datypes.SubscriptionEvent)
193-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
193+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
194194

195195
logger := zerolog.Nop()
196196
fetcher := NewAsyncBlockRetriever(client, logger, fiNs, 100*time.Millisecond, 100, 10)
@@ -214,19 +214,19 @@ func TestAsyncBlockRetriever_ReconnectOnSubscriptionError(t *testing.T) {
214214
// First subscription closes immediately (simulating error).
215215
closedCh := make(chan datypes.SubscriptionEvent)
216216
close(closedCh)
217-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(closedCh), nil).Once()
217+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(closedCh), nil).Once()
218218

219219
// Second subscription delivers a blob.
220220
subCh := make(chan datypes.SubscriptionEvent, 1)
221221
subCh <- datypes.SubscriptionEvent{
222222
Height: 100,
223223
Blobs: testBlobs,
224224
}
225-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once()
225+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(subCh), nil).Once()
226226

227227
// Third+ subscribe returns a blocking channel so it doesn't loop forever.
228228
blockCh := make(chan datypes.SubscriptionEvent)
229-
client.On("Subscribe", mock.Anything, fiNs).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
229+
client.On("Subscribe", mock.Anything, fiNs, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(blockCh), nil).Maybe()
230230

231231
// Stub Retrieve for catchup.
232232
client.On("Retrieve", mock.Anything, mock.Anything, fiNs).Return(datypes.ResultRetrieve{

block/internal/da/client.go

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -355,7 +355,10 @@ func (c *client) HasForcedInclusionNamespace() bool {
355355
// DA block containing a matching blob. The channel is closed when ctx is
356356
// cancelled. The caller must drain the channel after cancellation to avoid
357357
// goroutine leaks.
358-
func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error) {
358+
// Timestamps are included from the header if available (celestia-node v0.29.1+§), otherwise
359+
// fetched via a separate call when includeTimestamp is true. Be aware that fetching timestamps
360+
// separately is an additional call to the celestia node for each event.
361+
func (c *client) Subscribe(ctx context.Context, namespace []byte, includeTimestamp bool) (<-chan datypes.SubscriptionEvent, error) {
359362
ns, err := share.NewNamespaceFromBytes(namespace)
360363
if err != nil {
361364
return nil, fmt.Errorf("invalid namespace: %w", err)
@@ -380,10 +383,24 @@ func (c *client) Subscribe(ctx context.Context, namespace []byte) (<-chan datype
380383
if resp == nil {
381384
continue
382385
}
386+
var blockTime time.Time
387+
// Use header time if available (celestia-node v0.21.0+)
388+
if resp.Header != nil && !resp.Header.Time.IsZero() {
389+
blockTime = resp.Header.Time
390+
} else if includeTimestamp {
391+
// Fallback to fetching timestamp for older nodes
392+
blockTime, err = c.getBlockTimestamp(ctx, resp.Height)
393+
if err != nil {
394+
c.logger.Error().Uint64("height", resp.Height).Err(err).Msg("failed to get DA block timestamp for subscription event")
395+
blockTime = time.Now()
396+
// TODO: we should retry fetching the timestamp. Current time may mess block time consistency for based sequencers.
397+
}
398+
}
383399
select {
384400
case out <- datypes.SubscriptionEvent{
385-
Height: resp.Height,
386-
Blobs: extractBlobData(resp),
401+
Height: resp.Height,
402+
Timestamp: blockTime,
403+
Blobs: extractBlobData(resp),
387404
}:
388405
case <-ctx.Done():
389406
return

block/internal/da/forced_inclusion_retriever_test.go

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ import (
1717
func TestNewForcedInclusionRetriever(t *testing.T) {
1818
client := mocks.NewMockClient(t)
1919
client.On("GetForcedInclusionNamespace").Return(datypes.NamespaceFromString("test-fi-ns").Bytes()).Maybe()
20-
client.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
20+
client.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
2121

2222
gen := genesis.Genesis{
2323
DAStartHeight: 100,
@@ -53,7 +53,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NotAtEpochStart(t *t
5353
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
5454
client.On("HasForcedInclusionNamespace").Return(true).Maybe()
5555
client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe()
56-
client.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
56+
client.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
5757

5858
gen := genesis.Genesis{
5959
DAStartHeight: 100,
@@ -84,7 +84,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartSuccess(t
8484
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
8585
client.On("HasForcedInclusionNamespace").Return(true).Maybe()
8686
client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe()
87-
client.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
87+
client.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
8888
client.On("Retrieve", mock.Anything, mock.Anything, fiNs).Return(datypes.ResultRetrieve{
8989
BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: []datypes.ID{[]byte("id1"), []byte("id2"), []byte("id3")}, Timestamp: time.Now()},
9090
Data: testBlobs,
@@ -114,7 +114,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EpochStartNotAvailab
114114
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
115115
client.On("HasForcedInclusionNamespace").Return(true).Maybe()
116116
client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe()
117-
client.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
117+
client.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
118118

119119
// Mock the first height in epoch as not available
120120
client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{
@@ -141,7 +141,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_NoBlobsAtHeight(t *t
141141
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
142142
client.On("HasForcedInclusionNamespace").Return(true).Maybe()
143143
client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe()
144-
client.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
144+
client.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
145145
client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{
146146
BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound},
147147
}).Once()
@@ -172,7 +172,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_MultiHeightEpoch(t *
172172
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
173173
client.On("HasForcedInclusionNamespace").Return(true).Maybe()
174174
client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe()
175-
client.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
175+
client.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
176176
client.On("Retrieve", mock.Anything, uint64(102), fiNs).Return(datypes.ResultRetrieve{
177177
BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()},
178178
Data: testBlobsByHeight[102],
@@ -213,7 +213,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_ErrorHandling(t *tes
213213
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
214214
client.On("HasForcedInclusionNamespace").Return(true).Maybe()
215215
client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe()
216-
client.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
216+
client.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
217217
client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{
218218
BaseResult: datypes.BaseResult{
219219
Code: datypes.StatusError,
@@ -242,7 +242,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EmptyBlobsSkipped(t
242242
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
243243
client.On("HasForcedInclusionNamespace").Return(true).Maybe()
244244
client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe()
245-
client.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
245+
client.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
246246
client.On("Retrieve", mock.Anything, uint64(100), fiNs).Return(datypes.ResultRetrieve{
247247
BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()},
248248
Data: [][]byte{[]byte("tx1"), {}, []byte("tx2"), nil, []byte("tx3")},
@@ -279,7 +279,7 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_OrderPreserved(t *te
279279
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
280280
client.On("HasForcedInclusionNamespace").Return(true).Maybe()
281281
client.On("GetForcedInclusionNamespace").Return(fiNs).Maybe()
282-
client.On("Subscribe", mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
282+
client.On("Subscribe", mock.Anything, mock.Anything, mock.Anything).Return((<-chan datypes.SubscriptionEvent)(make(chan datypes.SubscriptionEvent)), nil).Maybe()
283283
// Return heights out of order to test ordering is preserved
284284
client.On("Retrieve", mock.Anything, uint64(102), fiNs).Return(datypes.ResultRetrieve{
285285
BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, Timestamp: time.Now()},

block/internal/da/interface.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,8 @@ type Client interface {
2020
// Subscribe returns a channel that emits one SubscriptionEvent per DA block
2121
// that contains a blob in the given namespace. The channel is closed when ctx
2222
// is cancelled. Callers MUST drain the channel after cancellation.
23-
Subscribe(ctx context.Context, namespace []byte) (<-chan datypes.SubscriptionEvent, error)
23+
// The fetchTimestamp param is going to be removed with https://github.com/evstack/ev-node/issues/3142 as the timestamp is going to be included by default
24+
Subscribe(ctx context.Context, namespace []byte, fetchTimestamp bool) (<-chan datypes.SubscriptionEvent, error)
2425

2526
// GetLatestDAHeight returns the latest height available on the DA layer.
2627
GetLatestDAHeight(ctx context.Context) (uint64, error)

0 commit comments

Comments
 (0)