Skip to content

Commit 2101f3e

Browse files
authored
feat: p2p exchange wrapper (#2855)
* add exchange wrapper to first check da store for height before requesting from p2p * add tests and remove type switching * fix tests * lint * move exchange to a single place * lint * fix build * address comments * address comment
1 parent 140b24a commit 2101f3e

File tree

9 files changed

+1172
-26
lines changed

9 files changed

+1172
-26
lines changed

.mockery.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,23 @@ packages:
3737
filename: batch.go
3838
github.com/celestiaorg/go-header:
3939
interfaces:
40+
Exchange:
41+
config:
42+
dir: ./test/mocks
43+
pkgname: mocks
44+
filename: external/hexchange.go
4045
Store:
4146
config:
4247
dir: ./test/mocks
4348
pkgname: mocks
4449
filename: external/hstore.go
50+
github.com/evstack/ev-node/pkg/sync:
51+
interfaces:
52+
P2PExchange:
53+
config:
54+
dir: ./test/mocks
55+
pkgname: mocks
56+
filename: external/p2pexchange.go
4557
github.com/evstack/ev-node/block/internal/syncing:
4658
interfaces:
4759
DARetriever:

node/full.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -82,12 +82,12 @@ func newFullNode(
8282
mainKV := store.NewEvNodeKVStore(database)
8383
evstore := store.New(mainKV)
8484

85-
headerSyncService, err := initHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, logger)
85+
headerSyncService, err := initHeaderSyncService(mainKV, evstore, nodeConfig, genesis, p2pClient, logger)
8686
if err != nil {
8787
return nil, err
8888
}
8989

90-
dataSyncService, err := initDataSyncService(mainKV, nodeConfig, genesis, p2pClient, logger)
90+
dataSyncService, err := initDataSyncService(mainKV, evstore, nodeConfig, genesis, p2pClient, logger)
9191
if err != nil {
9292
return nil, err
9393
}
@@ -144,14 +144,15 @@ func newFullNode(
144144

145145
func initHeaderSyncService(
146146
mainKV ds.Batching,
147+
daStore store.Store,
147148
nodeConfig config.Config,
148149
genesis genesispkg.Genesis,
149150
p2pClient *p2p.Client,
150151
logger zerolog.Logger,
151152
) (*evsync.HeaderSyncService, error) {
152153
componentLogger := logger.With().Str("component", "HeaderSyncService").Logger()
153154

154-
headerSyncService, err := evsync.NewHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, componentLogger)
155+
headerSyncService, err := evsync.NewHeaderSyncService(mainKV, daStore, nodeConfig, genesis, p2pClient, componentLogger)
155156
if err != nil {
156157
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
157158
}
@@ -160,14 +161,15 @@ func initHeaderSyncService(
160161

161162
func initDataSyncService(
162163
mainKV ds.Batching,
164+
daStore store.Store,
163165
nodeConfig config.Config,
164166
genesis genesispkg.Genesis,
165167
p2pClient *p2p.Client,
166168
logger zerolog.Logger,
167169
) (*evsync.DataSyncService, error) {
168170
componentLogger := logger.With().Str("component", "DataSyncService").Logger()
169171

170-
dataSyncService, err := evsync.NewDataSyncService(mainKV, nodeConfig, genesis, p2pClient, componentLogger)
172+
dataSyncService, err := evsync.NewDataSyncService(mainKV, daStore, nodeConfig, genesis, p2pClient, componentLogger)
171173
if err != nil {
172174
return nil, fmt.Errorf("error while initializing DataSyncService: %w", err)
173175
}

node/light.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ func newLightNode(
4343
logger zerolog.Logger,
4444
) (ln *LightNode, err error) {
4545
componentLogger := logger.With().Str("component", "HeaderSyncService").Logger()
46-
headerSyncService, err := sync.NewHeaderSyncService(database, conf, genesis, p2pClient, componentLogger)
46+
store := store.New(database)
47+
48+
headerSyncService, err := sync.NewHeaderSyncService(database, store, conf, genesis, p2pClient, componentLogger)
4749
if err != nil {
4850
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
4951
}
5052

51-
store := store.New(database)
52-
5353
node := &LightNode{
5454
P2P: p2pClient,
5555
hSyncService: headerSyncService,

pkg/sync/exchange_wrapper.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
6+
"github.com/celestiaorg/go-header"
7+
)
8+
9+
// GetterFunc retrieves a header by hash from a backing store.
10+
type GetterFunc[H header.Header[H]] func(context.Context, header.Hash) (H, error)
11+
12+
// GetterByHeightFunc retrieves a header by height from a backing store.
13+
type GetterByHeightFunc[H header.Header[H]] func(context.Context, uint64) (H, error)
14+
15+
// RangeGetterFunc retrieves headers in range [from, to) from a backing store.
16+
// Returns the contiguous headers found starting from 'from', and the next height needed.
17+
type RangeGetterFunc[H header.Header[H]] func(ctx context.Context, from, to uint64) ([]H, uint64, error)
18+
19+
// P2PExchange defines the interface for the underlying P2P exchange.
20+
type P2PExchange[H header.Header[H]] interface {
21+
header.Exchange[H]
22+
Start(context.Context) error
23+
Stop(context.Context) error
24+
}
25+
26+
type exchangeWrapper[H header.Header[H]] struct {
27+
p2pExchange P2PExchange[H]
28+
getter GetterFunc[H]
29+
getterByHeight GetterByHeightFunc[H]
30+
rangeGetter RangeGetterFunc[H]
31+
}
32+
33+
func (ew *exchangeWrapper[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
34+
// Check DA store first
35+
if ew.getter != nil {
36+
if h, err := ew.getter(ctx, hash); err == nil && !h.IsZero() {
37+
return h, nil
38+
}
39+
}
40+
41+
// Fallback to network exchange
42+
return ew.p2pExchange.Get(ctx, hash)
43+
}
44+
45+
func (ew *exchangeWrapper[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
46+
// Check DA store first
47+
if ew.getterByHeight != nil {
48+
if h, err := ew.getterByHeight(ctx, height); err == nil && !h.IsZero() {
49+
return h, nil
50+
}
51+
}
52+
53+
// Fallback to network exchange
54+
return ew.p2pExchange.GetByHeight(ctx, height)
55+
}
56+
57+
func (ew *exchangeWrapper[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) {
58+
return ew.p2pExchange.Head(ctx, opts...)
59+
}
60+
61+
func (ew *exchangeWrapper[H]) GetRangeByHeight(ctx context.Context, from H, to uint64) ([]H, error) {
62+
fromHeight := from.Height() + 1
63+
64+
// If no range getter, fallback entirely to P2P
65+
if ew.rangeGetter == nil {
66+
return ew.p2pExchange.GetRangeByHeight(ctx, from, to)
67+
}
68+
69+
// Try DA store first for contiguous range
70+
daHeaders, nextHeight, err := ew.rangeGetter(ctx, fromHeight, to)
71+
if err != nil {
72+
// DA store failed, fallback to P2P for entire range
73+
return ew.p2pExchange.GetRangeByHeight(ctx, from, to)
74+
}
75+
76+
// Got everything from DA
77+
if nextHeight >= to {
78+
return daHeaders, nil
79+
}
80+
81+
// Need remainder from P2P
82+
if len(daHeaders) == 0 {
83+
// Nothing from DA, get entire range from P2P
84+
return ew.p2pExchange.GetRangeByHeight(ctx, from, to)
85+
}
86+
87+
// Get remainder from P2P starting after last DA header
88+
lastDAHeader := daHeaders[len(daHeaders)-1]
89+
p2pHeaders, err := ew.p2pExchange.GetRangeByHeight(ctx, lastDAHeader, to)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
return append(daHeaders, p2pHeaders...), nil
95+
}
96+
97+
func (ew *exchangeWrapper[H]) Start(ctx context.Context) error {
98+
return ew.p2pExchange.Start(ctx)
99+
}
100+
101+
func (ew *exchangeWrapper[H]) Stop(ctx context.Context) error {
102+
return ew.p2pExchange.Stop(ctx)
103+
}

pkg/sync/exchange_wrapper_test.go

Lines changed: 181 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,181 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
"errors"
6+
"testing"
7+
8+
"github.com/celestiaorg/go-header"
9+
extmocks "github.com/evstack/ev-node/test/mocks/external"
10+
"github.com/evstack/ev-node/types"
11+
"github.com/stretchr/testify/assert"
12+
)
13+
14+
func TestExchangeWrapper_Get(t *testing.T) {
15+
ctx := context.Background()
16+
hash := header.Hash([]byte("test-hash"))
17+
expectedHeader := &types.SignedHeader{} // Just a dummy
18+
19+
t.Run("Hit in Store", func(t *testing.T) {
20+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
21+
// Exchange should NOT be called
22+
23+
getter := func(ctx context.Context, h header.Hash) (*types.SignedHeader, error) {
24+
return expectedHeader, nil
25+
}
26+
27+
ew := &exchangeWrapper[*types.SignedHeader]{
28+
p2pExchange: mockEx,
29+
getter: getter,
30+
}
31+
32+
h, err := ew.Get(ctx, hash)
33+
assert.NoError(t, err)
34+
assert.Equal(t, expectedHeader, h)
35+
})
36+
37+
t.Run("Miss in Store", func(t *testing.T) {
38+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
39+
mockEx.EXPECT().Get(ctx, hash).Return(expectedHeader, nil)
40+
41+
getter := func(ctx context.Context, h header.Hash) (*types.SignedHeader, error) {
42+
return nil, errors.New("not found")
43+
}
44+
45+
ew := &exchangeWrapper[*types.SignedHeader]{
46+
p2pExchange: mockEx,
47+
getter: getter,
48+
}
49+
50+
h, err := ew.Get(ctx, hash)
51+
assert.NoError(t, err)
52+
assert.Equal(t, expectedHeader, h)
53+
})
54+
55+
t.Run("Getter Not Configured", func(t *testing.T) {
56+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
57+
mockEx.EXPECT().Get(ctx, hash).Return(expectedHeader, nil)
58+
59+
ew := &exchangeWrapper[*types.SignedHeader]{
60+
p2pExchange: mockEx,
61+
getter: nil,
62+
}
63+
64+
h, err := ew.Get(ctx, hash)
65+
assert.NoError(t, err)
66+
assert.Equal(t, expectedHeader, h)
67+
})
68+
}
69+
70+
func TestExchangeWrapper_GetByHeight(t *testing.T) {
71+
ctx := context.Background()
72+
height := uint64(10)
73+
expectedHeader := &types.SignedHeader{}
74+
75+
t.Run("Hit in Store", func(t *testing.T) {
76+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
77+
78+
getterByHeight := func(ctx context.Context, h uint64) (*types.SignedHeader, error) {
79+
return expectedHeader, nil
80+
}
81+
82+
ew := &exchangeWrapper[*types.SignedHeader]{
83+
p2pExchange: mockEx,
84+
getterByHeight: getterByHeight,
85+
}
86+
87+
h, err := ew.GetByHeight(ctx, height)
88+
assert.NoError(t, err)
89+
assert.Equal(t, expectedHeader, h)
90+
})
91+
92+
t.Run("Miss in Store", func(t *testing.T) {
93+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
94+
mockEx.EXPECT().GetByHeight(ctx, height).Return(expectedHeader, nil)
95+
96+
getterByHeight := func(ctx context.Context, h uint64) (*types.SignedHeader, error) {
97+
return nil, errors.New("not found")
98+
}
99+
100+
ew := &exchangeWrapper[*types.SignedHeader]{
101+
p2pExchange: mockEx,
102+
getterByHeight: getterByHeight,
103+
}
104+
105+
h, err := ew.GetByHeight(ctx, height)
106+
assert.NoError(t, err)
107+
assert.Equal(t, expectedHeader, h)
108+
})
109+
}
110+
111+
func TestExchangeWrapper_GetRangeByHeight(t *testing.T) {
112+
ctx := context.Background()
113+
114+
t.Run("All from DA", func(t *testing.T) {
115+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
116+
117+
headers := []*types.SignedHeader{
118+
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 2}}},
119+
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 3}}},
120+
}
121+
from := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 1}}}
122+
123+
rangeGetter := func(ctx context.Context, fromH, toH uint64) ([]*types.SignedHeader, uint64, error) {
124+
return headers, 4, nil
125+
}
126+
127+
ew := &exchangeWrapper[*types.SignedHeader]{
128+
p2pExchange: mockEx,
129+
rangeGetter: rangeGetter,
130+
}
131+
132+
result, err := ew.GetRangeByHeight(ctx, from, 4)
133+
assert.NoError(t, err)
134+
assert.Equal(t, headers, result)
135+
})
136+
137+
t.Run("Partial from DA then P2P", func(t *testing.T) {
138+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
139+
140+
daHeaders := []*types.SignedHeader{
141+
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 2}}},
142+
}
143+
p2pHeaders := []*types.SignedHeader{
144+
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 3}}},
145+
}
146+
from := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 1}}}
147+
148+
rangeGetter := func(ctx context.Context, fromH, toH uint64) ([]*types.SignedHeader, uint64, error) {
149+
return daHeaders, 3, nil // only got height 2, need 3+
150+
}
151+
mockEx.EXPECT().GetRangeByHeight(ctx, daHeaders[0], uint64(4)).Return(p2pHeaders, nil)
152+
153+
ew := &exchangeWrapper[*types.SignedHeader]{
154+
p2pExchange: mockEx,
155+
rangeGetter: rangeGetter,
156+
}
157+
158+
result, err := ew.GetRangeByHeight(ctx, from, 4)
159+
assert.NoError(t, err)
160+
assert.Equal(t, append(daHeaders, p2pHeaders...), result)
161+
})
162+
163+
t.Run("No range getter fallback to P2P", func(t *testing.T) {
164+
mockEx := extmocks.NewMockP2PExchange[*types.SignedHeader](t)
165+
166+
from := &types.SignedHeader{Header: types.Header{BaseHeader: types.BaseHeader{Height: 1}}}
167+
expected := []*types.SignedHeader{
168+
{Header: types.Header{BaseHeader: types.BaseHeader{Height: 2}}},
169+
}
170+
mockEx.EXPECT().GetRangeByHeight(ctx, from, uint64(3)).Return(expected, nil)
171+
172+
ew := &exchangeWrapper[*types.SignedHeader]{
173+
p2pExchange: mockEx,
174+
rangeGetter: nil,
175+
}
176+
177+
result, err := ew.GetRangeByHeight(ctx, from, 3)
178+
assert.NoError(t, err)
179+
assert.Equal(t, expected, result)
180+
})
181+
}

0 commit comments

Comments
 (0)