Skip to content

Commit 7d39e71

Browse files
committed
align subscribe with forced inclusion
1 parent 127440b commit 7d39e71

File tree

6 files changed

+192
-36
lines changed

6 files changed

+192
-36
lines changed
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
package common
2+
3+
import blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
4+
5+
// BlobsFromSubscription returns non-empty blob data from a subscription response.
6+
func BlobsFromSubscription(resp *blobrpc.SubscriptionResponse) [][]byte {
7+
if resp == nil || len(resp.Blobs) == 0 {
8+
return nil
9+
}
10+
11+
blobs := make([][]byte, 0, len(resp.Blobs))
12+
for _, blob := range resp.Blobs {
13+
if blob == nil {
14+
continue
15+
}
16+
data := blob.Data()
17+
if len(data) == 0 {
18+
continue
19+
}
20+
blobs = append(blobs, data)
21+
}
22+
23+
return blobs
24+
}

block/internal/da/async_block_retriever.go

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ type AsyncBlockRetriever interface {
2525
Stop()
2626
GetCachedBlock(ctx context.Context, daHeight uint64) (*BlockData, error)
2727
UpdateCurrentHeight(height uint64)
28+
StoreBlock(ctx context.Context, height uint64, blobs [][]byte, timestamp time.Time)
2829
}
2930

3031
// BlockData contains data retrieved from a single DA height
@@ -125,6 +126,68 @@ func (f *asyncBlockRetriever) UpdateCurrentHeight(height uint64) {
125126
}
126127
}
127128

129+
// StoreBlock caches a block's blobs, favoring existing data to avoid churn.
130+
func (f *asyncBlockRetriever) StoreBlock(ctx context.Context, height uint64, blobs [][]byte, timestamp time.Time) {
131+
if len(f.namespace) == 0 {
132+
return
133+
}
134+
if height < f.daStartHeight {
135+
return
136+
}
137+
if len(blobs) == 0 {
138+
return
139+
}
140+
141+
filtered := make([][]byte, 0, len(blobs))
142+
for _, blob := range blobs {
143+
if len(blob) > 0 {
144+
filtered = append(filtered, blob)
145+
}
146+
}
147+
if len(filtered) == 0 {
148+
return
149+
}
150+
151+
if timestamp.IsZero() {
152+
timestamp = time.Now().UTC()
153+
}
154+
155+
key := newBlockDataKey(height)
156+
if existing, err := f.cache.Get(ctx, key); err == nil {
157+
var pbBlock pb.BlockData
158+
if err := proto.Unmarshal(existing, &pbBlock); err == nil && len(pbBlock.Blobs) > 0 {
159+
return
160+
}
161+
}
162+
163+
pbBlock := &pb.BlockData{
164+
Height: height,
165+
Timestamp: timestamp.Unix(),
166+
Blobs: filtered,
167+
}
168+
data, err := proto.Marshal(pbBlock)
169+
if err != nil {
170+
f.logger.Error().
171+
Err(err).
172+
Uint64("height", height).
173+
Msg("failed to marshal block for caching")
174+
return
175+
}
176+
177+
if err := f.cache.Put(ctx, key, data); err != nil {
178+
f.logger.Error().
179+
Err(err).
180+
Uint64("height", height).
181+
Msg("failed to cache block")
182+
return
183+
}
184+
185+
f.logger.Debug().
186+
Uint64("height", height).
187+
Int("blob_count", len(filtered)).
188+
Msg("cached block from subscription")
189+
}
190+
128191
func newBlockDataKey(height uint64) ds.Key {
129192
return ds.NewKey(fmt.Sprintf("/block/%d", height))
130193
}

block/internal/da/forced_inclusion_retriever.go

Lines changed: 86 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,14 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
78
"time"
89

910
"github.com/rs/zerolog"
1011

12+
"github.com/evstack/ev-node/block/internal/common"
1113
"github.com/evstack/ev-node/pkg/config"
14+
blobrpc "github.com/evstack/ev-node/pkg/da/jsonrpc"
1215
datypes "github.com/evstack/ev-node/pkg/da/types"
1316
"github.com/evstack/ev-node/types"
1417
)
@@ -23,6 +26,12 @@ type ForcedInclusionRetriever struct {
2326
daEpochSize uint64
2427
daStartHeight uint64
2528
asyncFetcher AsyncBlockRetriever
29+
30+
mu sync.Mutex
31+
pendingEpochStart uint64
32+
pendingEpochEnd uint64
33+
lastProcessedEpochEnd uint64
34+
hasProcessedEpoch bool
2635
}
2736

2837
// ForcedInclusionEvent contains forced inclusion transactions retrieved from DA.
@@ -68,6 +77,25 @@ func (r *ForcedInclusionRetriever) Stop() {
6877
r.asyncFetcher.Stop()
6978
}
7079

80+
// HandleSubscriptionResponse caches forced inclusion blobs from subscription updates.
81+
func (r *ForcedInclusionRetriever) HandleSubscriptionResponse(resp *blobrpc.SubscriptionResponse) {
82+
if resp == nil {
83+
return
84+
}
85+
if !r.client.HasForcedInclusionNamespace() {
86+
return
87+
}
88+
89+
r.asyncFetcher.UpdateCurrentHeight(resp.Height)
90+
91+
blobs := common.BlobsFromSubscription(resp)
92+
if len(blobs) == 0 {
93+
return
94+
}
95+
96+
r.asyncFetcher.StoreBlock(context.Background(), resp.Height, blobs, time.Now().UTC())
97+
}
98+
7199
// RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height.
72100
// It respects epoch boundaries and only fetches at epoch end.
73101
// It tries to get blocks from the async fetcher cache first, then falls back to sync fetching.
@@ -86,12 +114,50 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
86114
// Update the async fetcher's current height so it knows what to prefetch
87115
r.asyncFetcher.UpdateCurrentHeight(daHeight)
88116

117+
r.mu.Lock()
118+
pendingStart := r.pendingEpochStart
119+
pendingEnd := r.pendingEpochEnd
120+
lastProcessed := r.lastProcessedEpochEnd
121+
hasProcessed := r.hasProcessedEpoch
122+
r.mu.Unlock()
123+
124+
if pendingEnd != 0 {
125+
if daHeight < pendingEnd {
126+
return &ForcedInclusionEvent{
127+
StartDaHeight: daHeight,
128+
EndDaHeight: daHeight,
129+
Txs: [][]byte{},
130+
}, nil
131+
}
132+
133+
event, err := r.retrieveEpoch(ctx, pendingStart, pendingEnd)
134+
if err != nil {
135+
return nil, err
136+
}
137+
138+
r.mu.Lock()
139+
r.pendingEpochStart = 0
140+
r.pendingEpochEnd = 0
141+
r.lastProcessedEpochEnd = pendingEnd
142+
r.hasProcessedEpoch = true
143+
r.mu.Unlock()
144+
145+
return event, nil
146+
}
147+
89148
if daHeight != epochEnd {
90149
r.logger.Debug().
91150
Uint64("da_height", daHeight).
92151
Uint64("epoch_end", epochEnd).
93152
Msg("not at epoch end - returning empty transactions")
153+
return &ForcedInclusionEvent{
154+
StartDaHeight: daHeight,
155+
EndDaHeight: daHeight,
156+
Txs: [][]byte{},
157+
}, nil
158+
}
94159

160+
if hasProcessed && epochEnd <= lastProcessed {
95161
return &ForcedInclusionEvent{
96162
StartDaHeight: daHeight,
97163
EndDaHeight: daHeight,
@@ -106,6 +172,24 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
106172
Uint64("epoch_num", currentEpochNumber).
107173
Msg("retrieving forced included transactions from DA epoch")
108174

175+
event, err := r.retrieveEpoch(ctx, epochStart, epochEnd)
176+
if err != nil {
177+
r.mu.Lock()
178+
r.pendingEpochStart = epochStart
179+
r.pendingEpochEnd = epochEnd
180+
r.mu.Unlock()
181+
return nil, err
182+
}
183+
184+
r.mu.Lock()
185+
r.lastProcessedEpochEnd = epochEnd
186+
r.hasProcessedEpoch = true
187+
r.mu.Unlock()
188+
189+
return event, nil
190+
}
191+
192+
func (r *ForcedInclusionRetriever) retrieveEpoch(ctx context.Context, epochStart, epochEnd uint64) (*ForcedInclusionEvent, error) {
109193
event := &ForcedInclusionEvent{
110194
StartDaHeight: epochStart,
111195
EndDaHeight: epochEnd,
@@ -211,18 +295,12 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
211295
// any error during process, need to retry at next call
212296
if processErrs != nil {
213297
r.logger.Warn().
214-
Uint64("da_height", daHeight).
215298
Uint64("epoch_start", epochStart).
216299
Uint64("epoch_end", epochEnd).
217-
Uint64("epoch_num", currentEpochNumber).
218300
Err(processErrs).
219-
Msg("Failed to retrieve DA epoch.. retrying next iteration")
301+
Msg("failed to retrieve DA epoch")
220302

221-
return &ForcedInclusionEvent{
222-
StartDaHeight: daHeight,
223-
EndDaHeight: daHeight,
224-
Txs: [][]byte{},
225-
}, nil
303+
return nil, processErrs
226304
}
227305

228306
return event, nil

block/internal/da/forced_inclusion_retriever_test.go

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -224,11 +224,9 @@ func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_ErrorHandling(t *tes
224224
defer retriever.Stop()
225225
ctx := context.Background()
226226

227-
// Should return empty event with no error (errors are logged and retried later)
228-
event, err := retriever.RetrieveForcedIncludedTxs(ctx, 100)
229-
assert.NilError(t, err)
230-
assert.Assert(t, event != nil)
231-
assert.Equal(t, len(event.Txs), 0)
227+
// Should return error so caller can retry without skipping the epoch
228+
_, err := retriever.RetrieveForcedIncludedTxs(ctx, 100)
229+
assert.Assert(t, err != nil)
232230
}
233231

234232
func TestForcedInclusionRetriever_RetrieveForcedIncludedTxs_EmptyBlobsSkipped(t *testing.T) {

block/internal/syncing/syncer.go

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -254,6 +254,9 @@ func (s *Syncer) Stop() error {
254254
if s.cancel != nil {
255255
s.cancel()
256256
}
257+
if s.fiRetriever != nil {
258+
s.fiRetriever.Stop()
259+
}
257260
s.cancelP2PWait(0)
258261
s.wg.Wait()
259262
s.logger.Info().Msg("syncer stopped")
@@ -524,12 +527,10 @@ func (s *Syncer) subscribeAndFollow() error {
524527
if !ok {
525528
return errors.New("forced inclusion subscription closed")
526529
}
527-
// Forced inclusion responses are logged but not processed through processSubscriptionResponse
528-
// They are handled separately by the forced inclusion retriever during block verification
529-
s.logger.Debug().
530-
Uint64("da_height", resp.Height).
531-
Int("blobs", len(resp.Blobs)).
532-
Msg("received forced inclusion subscription notification")
530+
// Cache forced inclusion blobs for epoch retrieval.
531+
if s.fiRetriever != nil {
532+
s.fiRetriever.HandleSubscriptionResponse(resp)
533+
}
533534

534535
case <-time.After(watchdogTimeout):
535536
// Watchdog: if no events for watchdogTimeout, recheck mode
@@ -544,7 +545,7 @@ func (s *Syncer) subscribeAndFollow() error {
544545

545546
// processSubscriptionResponse processes a subscription response and sends events to the processing channel.
546547
func (s *Syncer) processSubscriptionResponse(resp *blobrpc.SubscriptionResponse) error {
547-
if resp == nil || len(resp.Blobs) == 0 {
548+
if resp == nil {
548549
return nil
549550
}
550551

@@ -554,9 +555,9 @@ func (s *Syncer) processSubscriptionResponse(resp *blobrpc.SubscriptionResponse)
554555
Msg("processing subscription response")
555556

556557
// Convert blobs to raw byte slices for processing
557-
blobs := make([][]byte, len(resp.Blobs))
558-
for i, blob := range resp.Blobs {
559-
blobs[i] = blob.Data()
558+
blobs := common.BlobsFromSubscription(resp)
559+
if len(blobs) == 0 {
560+
return nil
560561
}
561562

562563
// Process blobs using the DA retriever's ProcessBlobs method
@@ -836,7 +837,11 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
836837

837838
// Verify forced inclusion transactions if configured
838839
if event.Source == common.SourceDA {
839-
if err := s.verifyForcedInclusionTxs(currentState, data); err != nil {
840+
verifyState := currentState
841+
if event.DaHeight > verifyState.DAHeight {
842+
verifyState.DAHeight = event.DaHeight
843+
}
844+
if err := s.verifyForcedInclusionTxs(verifyState, data); err != nil {
840845
s.logger.Error().Err(err).Uint64("height", nextHeight).Msg("forced inclusion verification failed")
841846
if errors.Is(err, errMaliciousProposer) {
842847
s.cache.RemoveHeaderDAIncluded(headerHash)

block/internal/syncing/syncer_forced_inclusion_test.go

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -851,18 +851,6 @@ func TestVerifyForcedInclusionTxs_DeferralWithinEpoch(t *testing.T) {
851851
return true
852852
})
853853

854-
// Mock DA for second verification at same epoch (height 104 - epoch end)
855-
for height := uint64(101); height <= 104; height++ {
856-
client.On("Retrieve", mock.Anything, height, []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{
857-
BaseResult: datypes.BaseResult{Code: datypes.StatusNotFound, Timestamp: time.Now()},
858-
}).Once()
859-
}
860-
861-
client.On("Retrieve", mock.Anything, uint64(100), []byte("nsForcedInclusion")).Return(datypes.ResultRetrieve{
862-
BaseResult: datypes.BaseResult{Code: datypes.StatusSuccess, IDs: [][]byte{[]byte("fi1"), []byte("fi2")}, Timestamp: time.Now()},
863-
Data: [][]byte{dataBin1, dataBin2},
864-
}).Once()
865-
866854
// Second block includes BOTH the previously included dataBin1 AND the deferred dataBin2
867855
// This simulates the block containing both forced inclusion txs
868856
data2 := makeData(gen.ChainID, 2, 2)

0 commit comments

Comments
 (0)