Skip to content

Commit 808976d

Browse files
committed
chore: merge main
2 parents ee0b53d + f70e6da commit 808976d

File tree

10 files changed

+1183
-34
lines changed

10 files changed

+1183
-34
lines changed

.mockery.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,23 @@ packages:
3737
filename: batch.go
3838
github.com/celestiaorg/go-header:
3939
interfaces:
40+
Exchange:
41+
config:
42+
dir: ./test/mocks
43+
pkgname: mocks
44+
filename: external/hexchange.go
4045
Store:
4146
config:
4247
dir: ./test/mocks
4348
pkgname: mocks
4449
filename: external/hstore.go
50+
github.com/evstack/ev-node/pkg/sync:
51+
interfaces:
52+
P2PExchange:
53+
config:
54+
dir: ./test/mocks
55+
pkgname: mocks
56+
filename: external/p2pexchange.go
4557
github.com/evstack/ev-node/block/internal/syncing:
4658
interfaces:
4759
DARetriever:

node/full.go

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -85,12 +85,12 @@ func newFullNode(
8585
evstore = store.WithTracingStore(evstore)
8686
}
8787

88-
headerSyncService, err := initHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, logger)
88+
headerSyncService, err := initHeaderSyncService(mainKV, evstore, nodeConfig, genesis, p2pClient, logger)
8989
if err != nil {
9090
return nil, err
9191
}
9292

93-
dataSyncService, err := initDataSyncService(mainKV, nodeConfig, genesis, p2pClient, logger)
93+
dataSyncService, err := initDataSyncService(mainKV, evstore, nodeConfig, genesis, p2pClient, logger)
9494
if err != nil {
9595
return nil, err
9696
}
@@ -147,14 +147,15 @@ func newFullNode(
147147

148148
func initHeaderSyncService(
149149
mainKV ds.Batching,
150+
daStore store.Store,
150151
nodeConfig config.Config,
151152
genesis genesispkg.Genesis,
152153
p2pClient *p2p.Client,
153154
logger zerolog.Logger,
154155
) (*evsync.HeaderSyncService, error) {
155156
componentLogger := logger.With().Str("component", "HeaderSyncService").Logger()
156157

157-
headerSyncService, err := evsync.NewHeaderSyncService(mainKV, nodeConfig, genesis, p2pClient, componentLogger)
158+
headerSyncService, err := evsync.NewHeaderSyncService(mainKV, daStore, nodeConfig, genesis, p2pClient, componentLogger)
158159
if err != nil {
159160
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
160161
}
@@ -163,14 +164,15 @@ func initHeaderSyncService(
163164

164165
func initDataSyncService(
165166
mainKV ds.Batching,
167+
daStore store.Store,
166168
nodeConfig config.Config,
167169
genesis genesispkg.Genesis,
168170
p2pClient *p2p.Client,
169171
logger zerolog.Logger,
170172
) (*evsync.DataSyncService, error) {
171173
componentLogger := logger.With().Str("component", "DataSyncService").Logger()
172174

173-
dataSyncService, err := evsync.NewDataSyncService(mainKV, nodeConfig, genesis, p2pClient, componentLogger)
175+
dataSyncService, err := evsync.NewDataSyncService(mainKV, daStore, nodeConfig, genesis, p2pClient, componentLogger)
174176
if err != nil {
175177
return nil, fmt.Errorf("error while initializing DataSyncService: %w", err)
176178
}

node/light.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,13 @@ func newLightNode(
4343
logger zerolog.Logger,
4444
) (ln *LightNode, err error) {
4545
componentLogger := logger.With().Str("component", "HeaderSyncService").Logger()
46-
headerSyncService, err := sync.NewHeaderSyncService(database, conf, genesis, p2pClient, componentLogger)
46+
store := store.New(database)
47+
48+
headerSyncService, err := sync.NewHeaderSyncService(database, store, conf, genesis, p2pClient, componentLogger)
4749
if err != nil {
4850
return nil, fmt.Errorf("error while initializing HeaderSyncService: %w", err)
4951
}
5052

51-
store := store.New(database)
52-
5353
node := &LightNode{
5454
P2P: p2pClient,
5555
hSyncService: headerSyncService,

pkg/store/tracing.go

Lines changed: 11 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package store
22

33
import (
44
"context"
5+
"encoding/hex"
56

67
ds "github.com/ipfs/go-datastore"
78
"go.opentelemetry.io/otel"
@@ -60,7 +61,7 @@ func (t *tracedStore) GetBlockData(ctx context.Context, height uint64) (*types.S
6061

6162
func (t *tracedStore) GetBlockByHash(ctx context.Context, hash []byte) (*types.SignedHeader, *types.Data, error) {
6263
ctx, span := t.tracer.Start(ctx, "Store.GetBlockByHash",
63-
trace.WithAttributes(attribute.String("hash", string(hash))),
64+
trace.WithAttributes(attribute.String("hash", hex.EncodeToString(hash))),
6465
)
6566
defer span.End()
6667

@@ -95,7 +96,7 @@ func (t *tracedStore) GetSignature(ctx context.Context, height uint64) (*types.S
9596

9697
func (t *tracedStore) GetSignatureByHash(ctx context.Context, hash []byte) (*types.Signature, error) {
9798
ctx, span := t.tracer.Start(ctx, "Store.GetSignatureByHash",
98-
trace.WithAttributes(attribute.String("hash", string(hash))),
99+
trace.WithAttributes(attribute.String("hash", hex.EncodeToString(hash))),
99100
)
100101
defer span.End()
101102

@@ -229,6 +230,7 @@ func (t *tracedStore) NewBatch(ctx context.Context) (Batch, error) {
229230
return &tracedBatch{
230231
inner: batch,
231232
tracer: t.tracer,
233+
ctx: ctx,
232234
}, nil
233235
}
234236

@@ -237,10 +239,11 @@ var _ Batch = (*tracedBatch)(nil)
237239
type tracedBatch struct {
238240
inner Batch
239241
tracer trace.Tracer
242+
ctx context.Context
240243
}
241244

242245
func (b *tracedBatch) SaveBlockData(header *types.SignedHeader, data *types.Data, signature *types.Signature) error {
243-
_, span := b.tracer.Start(context.Background(), "Batch.SaveBlockData",
246+
_, span := b.tracer.Start(b.ctx, "Batch.SaveBlockData",
244247
trace.WithAttributes(attribute.Int64("height", int64(header.Height()))),
245248
)
246249
defer span.End()
@@ -256,7 +259,7 @@ func (b *tracedBatch) SaveBlockData(header *types.SignedHeader, data *types.Data
256259
}
257260

258261
func (b *tracedBatch) SetHeight(height uint64) error {
259-
_, span := b.tracer.Start(context.Background(), "Batch.SetHeight",
262+
_, span := b.tracer.Start(b.ctx, "Batch.SetHeight",
260263
trace.WithAttributes(attribute.Int64("height", int64(height))),
261264
)
262265
defer span.End()
@@ -272,7 +275,7 @@ func (b *tracedBatch) SetHeight(height uint64) error {
272275
}
273276

274277
func (b *tracedBatch) UpdateState(state types.State) error {
275-
_, span := b.tracer.Start(context.Background(), "Batch.UpdateState",
278+
_, span := b.tracer.Start(b.ctx, "Batch.UpdateState",
276279
trace.WithAttributes(attribute.Int64("state.height", int64(state.LastBlockHeight))),
277280
)
278281
defer span.End()
@@ -288,7 +291,7 @@ func (b *tracedBatch) UpdateState(state types.State) error {
288291
}
289292

290293
func (b *tracedBatch) Commit() error {
291-
_, span := b.tracer.Start(context.Background(), "Batch.Commit")
294+
_, span := b.tracer.Start(b.ctx, "Batch.Commit")
292295
defer span.End()
293296

294297
err := b.inner.Commit()
@@ -302,7 +305,7 @@ func (b *tracedBatch) Commit() error {
302305
}
303306

304307
func (b *tracedBatch) Put(key ds.Key, value []byte) error {
305-
_, span := b.tracer.Start(context.Background(), "Batch.Put",
308+
_, span := b.tracer.Start(b.ctx, "Batch.Put",
306309
trace.WithAttributes(
307310
attribute.String("key", key.String()),
308311
attribute.Int("value.size", len(value)),
@@ -321,7 +324,7 @@ func (b *tracedBatch) Put(key ds.Key, value []byte) error {
321324
}
322325

323326
func (b *tracedBatch) Delete(key ds.Key) error {
324-
_, span := b.tracer.Start(context.Background(), "Batch.Delete",
327+
_, span := b.tracer.Start(b.ctx, "Batch.Delete",
325328
trace.WithAttributes(attribute.String("key", key.String())),
326329
)
327330
defer span.End()

pkg/sync/exchange_wrapper.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package sync
2+
3+
import (
4+
"context"
5+
6+
"github.com/celestiaorg/go-header"
7+
)
8+
9+
// GetterFunc retrieves a header by hash from a backing store.
10+
type GetterFunc[H header.Header[H]] func(context.Context, header.Hash) (H, error)
11+
12+
// GetterByHeightFunc retrieves a header by height from a backing store.
13+
type GetterByHeightFunc[H header.Header[H]] func(context.Context, uint64) (H, error)
14+
15+
// RangeGetterFunc retrieves headers in range [from, to) from a backing store.
16+
// Returns the contiguous headers found starting from 'from', and the next height needed.
17+
type RangeGetterFunc[H header.Header[H]] func(ctx context.Context, from, to uint64) ([]H, uint64, error)
18+
19+
// P2PExchange defines the interface for the underlying P2P exchange.
20+
type P2PExchange[H header.Header[H]] interface {
21+
header.Exchange[H]
22+
Start(context.Context) error
23+
Stop(context.Context) error
24+
}
25+
26+
type exchangeWrapper[H header.Header[H]] struct {
27+
p2pExchange P2PExchange[H]
28+
getter GetterFunc[H]
29+
getterByHeight GetterByHeightFunc[H]
30+
rangeGetter RangeGetterFunc[H]
31+
}
32+
33+
func (ew *exchangeWrapper[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
34+
// Check DA store first
35+
if ew.getter != nil {
36+
if h, err := ew.getter(ctx, hash); err == nil && !h.IsZero() {
37+
return h, nil
38+
}
39+
}
40+
41+
// Fallback to network exchange
42+
return ew.p2pExchange.Get(ctx, hash)
43+
}
44+
45+
func (ew *exchangeWrapper[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
46+
// Check DA store first
47+
if ew.getterByHeight != nil {
48+
if h, err := ew.getterByHeight(ctx, height); err == nil && !h.IsZero() {
49+
return h, nil
50+
}
51+
}
52+
53+
// Fallback to network exchange
54+
return ew.p2pExchange.GetByHeight(ctx, height)
55+
}
56+
57+
func (ew *exchangeWrapper[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) {
58+
return ew.p2pExchange.Head(ctx, opts...)
59+
}
60+
61+
func (ew *exchangeWrapper[H]) GetRangeByHeight(ctx context.Context, from H, to uint64) ([]H, error) {
62+
fromHeight := from.Height() + 1
63+
64+
// If no range getter, fallback entirely to P2P
65+
if ew.rangeGetter == nil {
66+
return ew.p2pExchange.GetRangeByHeight(ctx, from, to)
67+
}
68+
69+
// Try DA store first for contiguous range
70+
daHeaders, nextHeight, err := ew.rangeGetter(ctx, fromHeight, to)
71+
if err != nil {
72+
// DA store failed, fallback to P2P for entire range
73+
return ew.p2pExchange.GetRangeByHeight(ctx, from, to)
74+
}
75+
76+
// Got everything from DA
77+
if nextHeight >= to {
78+
return daHeaders, nil
79+
}
80+
81+
// Need remainder from P2P
82+
if len(daHeaders) == 0 {
83+
// Nothing from DA, get entire range from P2P
84+
return ew.p2pExchange.GetRangeByHeight(ctx, from, to)
85+
}
86+
87+
// Get remainder from P2P starting after last DA header
88+
lastDAHeader := daHeaders[len(daHeaders)-1]
89+
p2pHeaders, err := ew.p2pExchange.GetRangeByHeight(ctx, lastDAHeader, to)
90+
if err != nil {
91+
return nil, err
92+
}
93+
94+
return append(daHeaders, p2pHeaders...), nil
95+
}
96+
97+
func (ew *exchangeWrapper[H]) Start(ctx context.Context) error {
98+
return ew.p2pExchange.Start(ctx)
99+
}
100+
101+
func (ew *exchangeWrapper[H]) Stop(ctx context.Context) error {
102+
return ew.p2pExchange.Stop(ctx)
103+
}

0 commit comments

Comments
 (0)