Skip to content

Commit 4cf2a07

Browse files
authored
refactor: try CN approach (#2812)
<!-- Please read and fill out this form before submitting your PR. Please make sure you have reviewed our contributors guide before submitting your first PR. NOTE: PR titles should follow semantic commits: https://www.conventionalcommits.org/en/v1.0.0/ --> ## Overview <!-- Please provide an explanation of the PR, including the appropriate context, background, goal, and rationale. If there is an issue with this information, please provide a tl;dr and link the issue. Ex: Closes #<issue number> -->
1 parent b3f0a90 commit 4cf2a07

File tree

14 files changed

+238
-1352
lines changed

14 files changed

+238
-1352
lines changed

block/internal/common/broadcaster_mock.go

Lines changed: 0 additions & 47 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

block/internal/common/expected_interfaces.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,10 @@ import (
66
pubsub "github.com/libp2p/go-libp2p-pubsub"
77

88
"github.com/celestiaorg/go-header"
9-
10-
syncnotifier "github.com/evstack/ev-node/pkg/sync/notifier"
119
)
1210

1311
// broadcaster interface for P2P broadcasting
1412
type Broadcaster[H header.Header[H]] interface {
1513
WriteToStoreAndBroadcast(ctx context.Context, payload H, opts ...pubsub.PubOpt) error
1614
Store() header.Store[H]
17-
Notifier() *syncnotifier.Notifier
1815
}

block/internal/syncing/p2p_handler.go

Lines changed: 25 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -55,74 +55,42 @@ func (h *P2PHandler) SetProcessedHeight(height uint64) {
5555
h.mu.Unlock()
5656
}
5757

58-
// ProcessHeaderRange scans the provided heights and emits events when both the
59-
// header and data are available.
60-
func (h *P2PHandler) ProcessHeaderRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) {
61-
if startHeight > endHeight {
62-
return
63-
}
64-
65-
for height := startHeight; height <= endHeight; height++ {
66-
h.mu.Lock()
67-
shouldProcess := height > h.processedHeight
68-
h.mu.Unlock()
69-
70-
if !shouldProcess {
71-
continue
72-
}
73-
h.processHeight(ctx, height, heightInCh, "header_range")
74-
}
75-
}
76-
77-
// ProcessDataRange scans the provided heights and emits events when both the
78-
// header and data are available.
79-
func (h *P2PHandler) ProcessDataRange(ctx context.Context, startHeight, endHeight uint64, heightInCh chan<- common.DAHeightEvent) {
80-
if startHeight > endHeight {
81-
return
82-
}
83-
84-
for height := startHeight; height <= endHeight; height++ {
85-
h.mu.Lock()
86-
shouldProcess := height > h.processedHeight
87-
h.mu.Unlock()
58+
// ProcessHeight waits until both header and data for the given height are available.
59+
// Once available, it validates and emits the event to the provided channel or stores it as pending.
60+
func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error {
61+
h.mu.Lock()
62+
shouldProcess := height > h.processedHeight
63+
h.mu.Unlock()
8864

89-
if !shouldProcess {
90-
continue
91-
}
92-
h.processHeight(ctx, height, heightInCh, "data_range")
65+
if !shouldProcess {
66+
return nil
9367
}
94-
}
9568

96-
func (h *P2PHandler) processHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent, source string) {
9769
header, err := h.headerStore.GetByHeight(ctx, height)
9870
if err != nil {
9971
if ctx.Err() == nil {
100-
h.logger.Debug().Uint64("height", height).Err(err).Str("source", source).Msg("header unavailable in store")
72+
h.logger.Debug().Uint64("height", height).Err(err).Msg("header unavailable in store")
10173
}
102-
return
74+
return err
10375
}
10476
if err := h.assertExpectedProposer(header.ProposerAddress); err != nil {
105-
h.logger.Debug().Uint64("height", height).Err(err).Str("source", source).Msg("invalid header from P2P")
106-
return
77+
h.logger.Debug().Uint64("height", height).Err(err).Msg("invalid header from P2P")
78+
return err
10779
}
10880

10981
data, err := h.dataStore.GetByHeight(ctx, height)
11082
if err != nil {
11183
if ctx.Err() == nil {
112-
h.logger.Debug().Uint64("height", height).Err(err).Str("source", source).Msg("data unavailable in store")
84+
h.logger.Debug().Uint64("height", height).Err(err).Msg("data unavailable in store")
11385
}
114-
return
86+
return err
11587
}
11688

11789
dataCommitment := data.DACommitment()
11890
if !bytes.Equal(header.DataHash[:], dataCommitment[:]) {
119-
h.logger.Warn().
120-
Uint64("height", height).
121-
Str("header_data_hash", fmt.Sprintf("%x", header.DataHash)).
122-
Str("actual_data_hash", fmt.Sprintf("%x", dataCommitment)).
123-
Str("source", source).
124-
Msg("DataHash mismatch: header and data do not match from P2P, discarding")
125-
return
91+
err := fmt.Errorf("data hash mismatch: header %x, data %x", header.DataHash, dataCommitment)
92+
h.logger.Warn().Uint64("height", height).Err(err).Msg("discarding inconsistent block from P2P")
93+
return err
12694
}
12795

12896
event := common.DAHeightEvent{
@@ -138,7 +106,14 @@ func (h *P2PHandler) processHeight(ctx context.Context, height uint64, heightInC
138106
h.cache.SetPendingEvent(event.Header.Height(), &event)
139107
}
140108

141-
h.logger.Debug().Uint64("height", height).Str("source", source).Msg("processed event from P2P")
109+
h.mu.Lock()
110+
if height > h.processedHeight {
111+
h.processedHeight = height
112+
}
113+
h.mu.Unlock()
114+
115+
h.logger.Debug().Uint64("height", height).Msg("processed event from P2P")
116+
return nil
142117
}
143118

144119
// assertExpectedProposer validates the proposer address.

block/internal/syncing/p2p_handler_test.go

Lines changed: 28 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,7 @@ func collectEvents(t *testing.T, ch <-chan common.DAHeightEvent, timeout time.Du
130130
}
131131
}
132132

133-
func TestP2PHandler_ProcessRange_EmitsEventWhenHeaderAndDataPresent(t *testing.T) {
133+
func TestP2PHandler_ProcessHeight_EmitsEventWhenHeaderAndDataPresent(t *testing.T) {
134134
p := setupP2P(t)
135135
ctx := context.Background()
136136

@@ -149,15 +149,16 @@ func TestP2PHandler_ProcessRange_EmitsEventWhenHeaderAndDataPresent(t *testing.T
149149
p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(5)).Return(data, nil).Once()
150150

151151
ch := make(chan common.DAHeightEvent, 1)
152-
p.Handler.ProcessHeaderRange(ctx, 5, 5, ch)
152+
err = p.Handler.ProcessHeight(ctx, 5, ch)
153+
require.NoError(t, err)
153154

154155
events := collectEvents(t, ch, 50*time.Millisecond)
155156
require.Len(t, events, 1)
156157
require.Equal(t, uint64(5), events[0].Header.Height())
157158
require.NotNil(t, events[0].Data)
158159
}
159160

160-
func TestP2PHandler_ProcessRange_SkipsWhenDataMissing(t *testing.T) {
161+
func TestP2PHandler_ProcessHeight_SkipsWhenDataMissing(t *testing.T) {
161162
p := setupP2P(t)
162163
ctx := context.Background()
163164

@@ -174,27 +175,30 @@ func TestP2PHandler_ProcessRange_SkipsWhenDataMissing(t *testing.T) {
174175
p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(7)).Return(nil, errors.New("missing")).Once()
175176

176177
ch := make(chan common.DAHeightEvent, 1)
177-
p.Handler.ProcessHeaderRange(ctx, 7, 7, ch)
178+
err = p.Handler.ProcessHeight(ctx, 7, ch)
179+
require.Error(t, err)
178180

179181
require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
180182
}
181183

182-
func TestP2PHandler_ProcessRange_SkipsWhenHeaderMissing(t *testing.T) {
184+
func TestP2PHandler_ProcessHeight_SkipsWhenHeaderMissing(t *testing.T) {
183185
p := setupP2P(t)
184186
ctx := context.Background()
185187

186188
p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(9)).Return(nil, errors.New("missing")).Once()
187189

188190
ch := make(chan common.DAHeightEvent, 1)
189-
p.Handler.ProcessHeaderRange(ctx, 9, 9, ch)
191+
err := p.Handler.ProcessHeight(ctx, 9, ch)
192+
require.Error(t, err)
190193

191194
require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
192195
p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(9))
193196
}
194197

195-
func TestP2PHandler_ProcessRange_SkipsOnProposerMismatch(t *testing.T) {
198+
func TestP2PHandler_ProcessHeight_SkipsOnProposerMismatch(t *testing.T) {
196199
p := setupP2P(t)
197200
ctx := context.Background()
201+
var err error
198202

199203
badAddr, pub, signer := buildTestSigner(t)
200204
require.NotEqual(t, string(p.Genesis.ProposerAddress), string(badAddr))
@@ -205,19 +209,29 @@ func TestP2PHandler_ProcessRange_SkipsOnProposerMismatch(t *testing.T) {
205209
p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(11)).Return(header, nil).Once()
206210

207211
ch := make(chan common.DAHeightEvent, 1)
208-
p.Handler.ProcessHeaderRange(ctx, 11, 11, ch)
212+
err = p.Handler.ProcessHeight(ctx, 11, ch)
213+
require.Error(t, err)
209214

210215
require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
211216
p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(11))
212217
}
213218

214-
func TestP2PHandler_ProcessRange_UsesProcessedHeightToSkip(t *testing.T) {
219+
func TestP2PHandler_ProcessedHeightSkipsPreviouslyHandledBlocks(t *testing.T) {
215220
p := setupP2P(t)
216221
ctx := context.Background()
217222

218223
// Mark up to height 5 as processed.
219224
p.Handler.SetProcessedHeight(5)
220225

226+
ch := make(chan common.DAHeightEvent, 1)
227+
228+
// Heights below or equal to 5 should be skipped without touching the stores.
229+
require.NoError(t, p.Handler.ProcessHeight(ctx, 4, ch))
230+
require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
231+
p.HeaderStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(4))
232+
p.DataStore.AssertNotCalled(t, "GetByHeight", mock.Anything, uint64(4))
233+
234+
// Height 6 should be fetched normally.
221235
header := p2pMakeSignedHeader(t, p.Genesis.ChainID, 6, p.ProposerAddr, p.ProposerPub, p.Signer)
222236
data := makeData(p.Genesis.ChainID, 6, 1)
223237
header.DataHash = data.DACommitment()
@@ -230,15 +244,14 @@ func TestP2PHandler_ProcessRange_UsesProcessedHeightToSkip(t *testing.T) {
230244
p.HeaderStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(header, nil).Once()
231245
p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(6)).Return(data, nil).Once()
232246

233-
ch := make(chan common.DAHeightEvent, 1)
234-
p.Handler.ProcessHeaderRange(ctx, 4, 6, ch)
247+
require.NoError(t, p.Handler.ProcessHeight(ctx, 6, ch))
235248

236249
events := collectEvents(t, ch, 50*time.Millisecond)
237250
require.Len(t, events, 1)
238251
require.Equal(t, uint64(6), events[0].Header.Height())
239252
}
240253

241-
func TestP2PHandler_OnHeightProcessedPreventsDuplicates(t *testing.T) {
254+
func TestP2PHandler_SetProcessedHeightPreventsDuplicates(t *testing.T) {
242255
p := setupP2P(t)
243256
ctx := context.Background()
244257

@@ -255,18 +268,18 @@ func TestP2PHandler_OnHeightProcessedPreventsDuplicates(t *testing.T) {
255268
p.DataStore.EXPECT().GetByHeight(mock.Anything, uint64(8)).Return(data, nil).Once()
256269

257270
ch := make(chan common.DAHeightEvent, 1)
258-
p.Handler.ProcessHeaderRange(ctx, 8, 8, ch)
271+
require.NoError(t, p.Handler.ProcessHeight(ctx, 8, ch))
259272

260273
events := collectEvents(t, ch, 50*time.Millisecond)
261274
require.Len(t, events, 1)
262275

263-
// Mark the height as processed; a subsequent range should skip lookups.
276+
// Mark the height as processed; a subsequent request should skip store access.
264277
p.Handler.SetProcessedHeight(8)
265278

266279
p.HeaderStore.AssertExpectations(t)
267280
p.DataStore.AssertExpectations(t)
268281

269282
// No additional expectations set; if the handler queried the stores again the mock would fail.
270-
p.Handler.ProcessHeaderRange(ctx, 7, 8, ch)
283+
require.NoError(t, p.Handler.ProcessHeight(ctx, 8, ch))
271284
require.Empty(t, collectEvents(t, ch, 50*time.Millisecond))
272285
}

0 commit comments

Comments
 (0)