Skip to content

Commit a09461e

Browse files
committed
feat: add ProcessBlobs method to DARetriever interface and implement in daRetriever; update tests and syncer for subscription handling
1 parent e3a0ed3 commit a09461e

File tree

5 files changed

+116
-23
lines changed

5 files changed

+116
-23
lines changed

block/internal/syncing/da_retriever.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,9 @@ import (
2121
// DARetriever defines the interface for retrieving events from the DA layer
2222
type DARetriever interface {
2323
RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)
24+
// ProcessBlobs processes raw blobs from subscription and returns height events.
25+
// Used by follow mode to process real-time blob notifications.
26+
ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent
2427
}
2528

2629
// daRetriever handles DA retrieval operations for syncing
@@ -72,7 +75,7 @@ func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co
7275
}
7376

7477
r.logger.Debug().Int("blobs", len(blobsResp.Data)).Uint64("da_height", daHeight).Msg("retrieved blob data")
75-
return r.processBlobs(ctx, blobsResp.Data, daHeight), nil
78+
return r.ProcessBlobs(ctx, blobsResp.Data, daHeight), nil
7679
}
7780

7881
// fetchBlobs retrieves blobs from both header and data namespaces
@@ -148,8 +151,9 @@ func (r *daRetriever) validateBlobResponse(res datypes.ResultRetrieve, daHeight
148151
}
149152
}
150153

151-
// processBlobs processes retrieved blobs to extract headers and data and returns height events
152-
func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
154+
// ProcessBlobs processes retrieved blobs to extract headers and data and returns height events.
155+
// This method implements the DARetriever interface and is used by both polling and subscription modes.
156+
func (r *daRetriever) ProcessBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent {
153157
// Decode all blobs
154158
for _, bz := range blobs {
155159
if len(bz) == 0 {

block/internal/syncing/da_retriever_mock.go

Lines changed: 65 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

block/internal/syncing/da_retriever_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) {
148148
dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2)
149149
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil)
150150

151-
events := r.processBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77)
151+
events := r.ProcessBlobs(context.Background(), [][]byte{hdrBin, dataBin}, 77)
152152
require.Len(t, events, 1)
153153
assert.Equal(t, uint64(2), events[0].Header.Height())
154154
assert.Equal(t, uint64(2), events[0].Data.Height())
@@ -172,7 +172,7 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) {
172172
// Header with no data hash present should trigger empty data creation (per current logic)
173173
hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil)
174174

175-
events := r.processBlobs(context.Background(), [][]byte{hb}, 88)
175+
events := r.ProcessBlobs(context.Background(), [][]byte{hb}, 88)
176176
require.Len(t, events, 1)
177177
assert.Equal(t, uint64(3), events[0].Header.Height())
178178
assert.NotNil(t, events[0].Data)
@@ -282,14 +282,14 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) {
282282
hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data.Data, nil)
283283

284284
// Process header from DA height 100 first
285-
events1 := r.processBlobs(context.Background(), [][]byte{hdrBin}, 100)
285+
events1 := r.ProcessBlobs(context.Background(), [][]byte{hdrBin}, 100)
286286
require.Len(t, events1, 0, "should not create event yet - data is missing")
287287

288288
// Verify header is stored in pending headers
289289
require.Contains(t, r.pendingHeaders, uint64(5), "header should be stored as pending")
290290

291291
// Process data from DA height 102
292-
events2 := r.processBlobs(context.Background(), [][]byte{dataBin}, 102)
292+
events2 := r.ProcessBlobs(context.Background(), [][]byte{dataBin}, 102)
293293
require.Len(t, events2, 1, "should create event when matching data arrives")
294294

295295
event := events2[0]
@@ -319,7 +319,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
319319
hdr5Bin, _ := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, &data5.Data, nil)
320320

321321
// Process multiple headers from DA height 200 - should be stored as pending
322-
events1 := r.processBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200)
322+
events1 := r.ProcessBlobs(context.Background(), [][]byte{hdr3Bin, hdr4Bin, hdr5Bin}, 200)
323323
require.Len(t, events1, 0, "should not create events yet - all data is missing")
324324

325325
// Verify all headers are stored in pending
@@ -328,7 +328,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
328328
require.Contains(t, r.pendingHeaders, uint64(5), "header 5 should be pending")
329329

330330
// Process some data from DA height 203 - should create partial events
331-
events2 := r.processBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203)
331+
events2 := r.ProcessBlobs(context.Background(), [][]byte{data3Bin, data5Bin}, 203)
332332
require.Len(t, events2, 2, "should create events for heights 3 and 5")
333333

334334
// Sort events by height for consistent testing
@@ -352,7 +352,7 @@ func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testin
352352
require.NotContains(t, r.pendingHeaders, uint64(5), "header 5 should be removed from pending")
353353

354354
// Process remaining data from DA height 205
355-
events3 := r.processBlobs(context.Background(), [][]byte{data4Bin}, 205)
355+
events3 := r.ProcessBlobs(context.Background(), [][]byte{data4Bin}, 205)
356356
require.Len(t, events3, 1, "should create event for height 4")
357357

358358
// Verify final event for height 4

block/internal/syncing/syncer.go

Lines changed: 31 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -446,7 +446,7 @@ func (s *Syncer) runFollowMode() {
446446
}
447447

448448
// subscribeAndFollow uses the DA subscription API to receive real-time blob notifications.
449-
// It subscribes to both header and data namespaces and processes incoming blobs.
449+
// It subscribes to header, data, and forced inclusion namespaces and processes incoming blobs.
450450
// Returns when subscription fails, context is cancelled, or node falls behind.
451451
func (s *Syncer) subscribeAndFollow() error {
452452
// Get namespaces
@@ -472,6 +472,20 @@ func (s *Syncer) subscribeAndFollow() error {
472472
}
473473
}
474474

475+
// Subscribe to forced inclusion namespace if configured
476+
var forcedInclusionCh <-chan *blobrpc.SubscriptionResponse
477+
if s.daClient.HasForcedInclusionNamespace() {
478+
fiNS := s.daClient.GetForcedInclusionNamespace()
479+
// Only subscribe if it's different from both header and data namespaces
480+
if !bytes.Equal(fiNS, headerNS) && !bytes.Equal(fiNS, dataNS) {
481+
forcedInclusionCh, err = s.daClient.Subscribe(subCtx, fiNS)
482+
if err != nil {
483+
return fmt.Errorf("failed to subscribe to forced inclusion namespace: %w", err)
484+
}
485+
s.logger.Info().Msg("subscribed to forced inclusion namespace for follow mode")
486+
}
487+
}
488+
475489
s.logger.Info().Msg("subscribed to DA namespaces for follow mode")
476490

477491
// Calculate watchdog timeout
@@ -481,6 +495,7 @@ func (s *Syncer) subscribeAndFollow() error {
481495
}
482496

483497
// Process subscription events
498+
// Note: Select on a nil channel blocks forever, so nil channels are effectively disabled
484499
for {
485500
select {
486501
case <-s.ctx.Done():
@@ -495,17 +510,26 @@ func (s *Syncer) subscribeAndFollow() error {
495510
}
496511

497512
case resp, ok := <-dataCh:
498-
if dataCh == nil {
499-
// Data channel not used (same namespace), continue
500-
continue
501-
}
513+
// Note: if dataCh is nil (same namespace as header), this case never fires
502514
if !ok {
503515
return errors.New("data subscription closed")
504516
}
505517
if err := s.processSubscriptionResponse(resp); err != nil {
506518
s.logger.Error().Err(err).Uint64("height", resp.Height).Msg("failed to process data subscription")
507519
}
508520

521+
case resp, ok := <-forcedInclusionCh:
522+
// Note: if forcedInclusionCh is nil (not configured), this case never fires
523+
if !ok {
524+
return errors.New("forced inclusion subscription closed")
525+
}
526+
// Forced inclusion responses are logged but not processed through processSubscriptionResponse
527+
// They are handled separately by the forced inclusion retriever during block verification
528+
s.logger.Debug().
529+
Uint64("da_height", resp.Height).
530+
Int("blobs", len(resp.Blobs)).
531+
Msg("received forced inclusion subscription notification")
532+
509533
case <-time.After(watchdogTimeout):
510534
// Watchdog: if no events for watchdogTimeout, recheck mode
511535
// Might have fallen behind due to network issues
@@ -534,8 +558,8 @@ func (s *Syncer) processSubscriptionResponse(resp *blobrpc.SubscriptionResponse)
534558
blobs[i] = blob.Data()
535559
}
536560

537-
// Process blobs using the DA retriever's processBlobs method
538-
events := s.daRetriever.(*daRetriever).processBlobs(s.ctx, blobs, resp.Height)
561+
// Process blobs using the DA retriever's ProcessBlobs method
562+
events := s.daRetriever.ProcessBlobs(s.ctx, blobs, resp.Height)
539563

540564
// Send events to the processing channel
541565
for _, event := range events {

block/internal/syncing/syncer_test.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -859,23 +859,23 @@ func TestSyncer_modeSwitching(t *testing.T) {
859859
// mockCacheManager is a minimal implementation for testing
860860
type mockCacheManager struct{}
861861

862-
func (m *mockCacheManager) DaHeight() uint64 { return 0 }
862+
func (m *mockCacheManager) DaHeight() uint64 { return 0 }
863863
func (m *mockCacheManager) SetHeaderSeen(hash string, height uint64) {
864864
}
865865
func (m *mockCacheManager) IsHeaderSeen(hash string) bool { return false }
866866
func (m *mockCacheManager) SetDataSeen(hash string, height uint64) {
867867
}
868-
func (m *mockCacheManager) IsDataSeen(hash string) bool { return false }
868+
func (m *mockCacheManager) IsDataSeen(hash string) bool { return false }
869869
func (m *mockCacheManager) SetHeaderDAIncluded(hash string, daHeight, height uint64) {
870870
}
871871
func (m *mockCacheManager) GetHeaderDAIncluded(hash string) (uint64, bool) { return 0, false }
872872
func (m *mockCacheManager) RemoveHeaderDAIncluded(hash string) {}
873873
func (m *mockCacheManager) SetDataDAIncluded(hash string, daHeight, height uint64) {
874874
}
875-
func (m *mockCacheManager) GetDataDAIncluded(hash string) (uint64, bool) { return 0, false }
876-
func (m *mockCacheManager) IsTxSeen(hash string) bool { return false }
877-
func (m *mockCacheManager) SetTxSeen(hash string) {}
878-
func (m *mockCacheManager) CleanupOldTxs(olderThan time.Duration) int { return 0 }
875+
func (m *mockCacheManager) GetDataDAIncluded(hash string) (uint64, bool) { return 0, false }
876+
func (m *mockCacheManager) IsTxSeen(hash string) bool { return false }
877+
func (m *mockCacheManager) SetTxSeen(hash string) {}
878+
func (m *mockCacheManager) CleanupOldTxs(olderThan time.Duration) int { return 0 }
879879
func (m *mockCacheManager) SetPendingEvent(height uint64, event *common.DAHeightEvent) {
880880
}
881881
func (m *mockCacheManager) GetNextPendingEvent(height uint64) *common.DAHeightEvent { return nil }

0 commit comments

Comments
 (0)