Skip to content

Commit c40b96b

Browse files
committed
Indipendent types for p2p store
1 parent e3336ce commit c40b96b

26 files changed

+586
-196
lines changed

apps/evm/cmd/rollback.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import (
55
"errors"
66
"fmt"
77

8-
"github.com/evstack/ev-node/pkg/sync"
8+
"github.com/evstack/ev-node/types"
99
ds "github.com/ipfs/go-datastore"
1010
kt "github.com/ipfs/go-datastore/keytransform"
1111
"github.com/spf13/cobra"
@@ -70,7 +70,7 @@ func NewRollbackCmd() *cobra.Command {
7070
}
7171

7272
// rollback ev-node goheader state
73-
headerStore, err := goheaderstore.NewStore[*sync.SignedHeaderWithDAHint](
73+
headerStore, err := goheaderstore.NewStore[*types.P2PSignedHeader](
7474
evolveDB,
7575
goheaderstore.WithStorePrefix("headerSync"),
7676
goheaderstore.WithMetrics(),
@@ -79,7 +79,7 @@ func NewRollbackCmd() *cobra.Command {
7979
return err
8080
}
8181

82-
dataStore, err := goheaderstore.NewStore[*sync.DataWithDAHint](
82+
dataStore, err := goheaderstore.NewStore[*types.P2PData](
8383
evolveDB,
8484
goheaderstore.WithStorePrefix("dataSync"),
8585
goheaderstore.WithMetrics(),

apps/testapp/cmd/rollback.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import (
1010
"github.com/evstack/ev-node/node"
1111
rollcmd "github.com/evstack/ev-node/pkg/cmd"
1212
"github.com/evstack/ev-node/pkg/store"
13-
"github.com/evstack/ev-node/pkg/sync"
13+
"github.com/evstack/ev-node/types"
1414
ds "github.com/ipfs/go-datastore"
1515
kt "github.com/ipfs/go-datastore/keytransform"
1616
"github.com/spf13/cobra"
@@ -75,7 +75,7 @@ func NewRollbackCmd() *cobra.Command {
7575
}
7676

7777
// rollback ev-node goheader state
78-
headerStore, err := goheaderstore.NewStore[*sync.SignedHeaderWithDAHint](
78+
headerStore, err := goheaderstore.NewStore[*types.P2PSignedHeader](
7979
evolveDB,
8080
goheaderstore.WithStorePrefix("headerSync"),
8181
goheaderstore.WithMetrics(),
@@ -84,7 +84,7 @@ func NewRollbackCmd() *cobra.Command {
8484
return err
8585
}
8686

87-
dataStore, err := goheaderstore.NewStore[*sync.DataWithDAHint](
87+
dataStore, err := goheaderstore.NewStore[*types.P2PData](
8888
evolveDB,
8989
goheaderstore.WithStorePrefix("dataSync"),
9090
goheaderstore.WithMetrics(),

block/internal/common/expected_interfaces.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ import (
1010
)
1111

1212
type (
13-
HeaderP2PBroadcaster = Broadcaster[*types.SignedHeader]
14-
DataP2PBroadcaster = Broadcaster[*types.Data]
13+
HeaderP2PBroadcaster = Broadcaster[*types.P2PSignedHeader]
14+
DataP2PBroadcaster = Broadcaster[*types.P2PData]
1515
)
1616

1717
// Broadcaster interface for P2P broadcasting

block/internal/executing/executor.go

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -439,8 +439,12 @@ func (e *Executor) produceBlock() error {
439439

440440
// broadcast header and data to P2P network
441441
g, ctx := errgroup.WithContext(e.ctx)
442-
g.Go(func() error { return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, header) })
443-
g.Go(func() error { return e.dataBroadcaster.WriteToStoreAndBroadcast(ctx, data) })
442+
g.Go(func() error {
443+
return e.headerBroadcaster.WriteToStoreAndBroadcast(ctx, &types.P2PSignedHeader{SignedHeader: *header})
444+
})
445+
g.Go(func() error {
446+
return e.dataBroadcaster.WriteToStoreAndBroadcast(ctx, &types.P2PData{Data: *data})
447+
})
444448
if err := g.Wait(); err != nil {
445449
e.logger.Error().Err(err).Msg("failed to broadcast header and/data")
446450
// don't fail block production on broadcast error

block/internal/executing/executor_lazy_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ func TestLazyMode_ProduceBlockLogic(t *testing.T) {
4747

4848
mockExec := testmocks.NewMockExecutor(t)
4949
mockSeq := testmocks.NewMockSequencer(t)
50-
hb := common.NewMockBroadcaster[*types.SignedHeader](t)
50+
hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t)
5151
hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
52-
db := common.NewMockBroadcaster[*types.Data](t)
52+
db := common.NewMockBroadcaster[*types.P2PData](t)
5353
db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
5454

5555
exec, err := NewExecutor(
@@ -162,9 +162,9 @@ func TestRegularMode_ProduceBlockLogic(t *testing.T) {
162162

163163
mockExec := testmocks.NewMockExecutor(t)
164164
mockSeq := testmocks.NewMockSequencer(t)
165-
hb := common.NewMockBroadcaster[*types.SignedHeader](t)
165+
hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t)
166166
hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
167-
db := common.NewMockBroadcaster[*types.Data](t)
167+
db := common.NewMockBroadcaster[*types.P2PData](t)
168168
db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
169169

170170
exec, err := NewExecutor(

block/internal/executing/executor_logic_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -69,9 +69,9 @@ func TestProduceBlock_EmptyBatch_SetsEmptyDataHash(t *testing.T) {
6969
mockSeq := testmocks.NewMockSequencer(t)
7070

7171
// Broadcasters are required by produceBlock; use generated mocks
72-
hb := common.NewMockBroadcaster[*types.SignedHeader](t)
72+
hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t)
7373
hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
74-
db := common.NewMockBroadcaster[*types.Data](t)
74+
db := common.NewMockBroadcaster[*types.P2PData](t)
7575
db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
7676

7777
exec, err := NewExecutor(
@@ -159,9 +159,9 @@ func TestPendingLimit_SkipsProduction(t *testing.T) {
159159

160160
mockExec := testmocks.NewMockExecutor(t)
161161
mockSeq := testmocks.NewMockSequencer(t)
162-
hb := common.NewMockBroadcaster[*types.SignedHeader](t)
162+
hb := common.NewMockBroadcaster[*types.P2PSignedHeader](t)
163163
hb.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
164-
db := common.NewMockBroadcaster[*types.Data](t)
164+
db := common.NewMockBroadcaster[*types.P2PData](t)
165165
db.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
166166

167167
exec, err := NewExecutor(

block/internal/executing/executor_restart_test.go

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
4747
// Create first executor instance
4848
mockExec1 := testmocks.NewMockExecutor(t)
4949
mockSeq1 := testmocks.NewMockSequencer(t)
50-
hb1 := common.NewMockBroadcaster[*types.SignedHeader](t)
50+
hb1 := common.NewMockBroadcaster[*types.P2PSignedHeader](t)
5151
hb1.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
52-
db1 := common.NewMockBroadcaster[*types.Data](t)
52+
db1 := common.NewMockBroadcaster[*types.P2PData](t)
5353
db1.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
5454

5555
exec1, err := NewExecutor(
@@ -169,9 +169,9 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
169169
// Create second executor instance (restart scenario)
170170
mockExec2 := testmocks.NewMockExecutor(t)
171171
mockSeq2 := testmocks.NewMockSequencer(t)
172-
hb2 := common.NewMockBroadcaster[*types.SignedHeader](t)
172+
hb2 := common.NewMockBroadcaster[*types.P2PSignedHeader](t)
173173
hb2.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
174-
db2 := common.NewMockBroadcaster[*types.Data](t)
174+
db2 := common.NewMockBroadcaster[*types.P2PData](t)
175175
db2.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
176176

177177
exec2, err := NewExecutor(
@@ -270,9 +270,9 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
270270
// Create first executor and produce one block
271271
mockExec1 := testmocks.NewMockExecutor(t)
272272
mockSeq1 := testmocks.NewMockSequencer(t)
273-
hb1 := common.NewMockBroadcaster[*types.SignedHeader](t)
273+
hb1 := common.NewMockBroadcaster[*types.P2PSignedHeader](t)
274274
hb1.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
275-
db1 := common.NewMockBroadcaster[*types.Data](t)
275+
db1 := common.NewMockBroadcaster[*types.P2PData](t)
276276
db1.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
277277

278278
exec1, err := NewExecutor(
@@ -325,9 +325,9 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
325325
// Create second executor (restart)
326326
mockExec2 := testmocks.NewMockExecutor(t)
327327
mockSeq2 := testmocks.NewMockSequencer(t)
328-
hb2 := common.NewMockBroadcaster[*types.SignedHeader](t)
328+
hb2 := common.NewMockBroadcaster[*types.P2PSignedHeader](t)
329329
hb2.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
330-
db2 := common.NewMockBroadcaster[*types.Data](t)
330+
db2 := common.NewMockBroadcaster[*types.P2PData](t)
331331
db2.EXPECT().WriteToStoreAndBroadcast(mock.Anything, mock.Anything).Return(nil).Maybe()
332332

333333
exec2, err := NewExecutor(

block/internal/executing/executor_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,8 +39,8 @@ func TestExecutor_BroadcasterIntegration(t *testing.T) {
3939
}
4040

4141
// Create mock broadcasters
42-
headerBroadcaster := common.NewMockBroadcaster[*types.SignedHeader](t)
43-
dataBroadcaster := common.NewMockBroadcaster[*types.Data](t)
42+
headerBroadcaster := common.NewMockBroadcaster[*types.P2PSignedHeader](t)
43+
dataBroadcaster := common.NewMockBroadcaster[*types.P2PData](t)
4444

4545
// Create executor with broadcasters
4646
executor, err := NewExecutor(

block/internal/syncing/p2p_handler.go

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -32,8 +32,8 @@ type HeightStore[H header.Header[H]] interface {
3232
// The handler maintains a processedHeight to track the highest block that has been
3333
// successfully validated and sent to the syncer, preventing duplicate processing.
3434
type P2PHandler struct {
35-
headerStore HeightStore[*types.SignedHeader]
36-
dataStore HeightStore[*types.Data]
35+
headerStore HeightStore[*types.P2PSignedHeader]
36+
dataStore HeightStore[*types.P2PData]
3737
cache cache.CacheManager
3838
genesis genesis.Genesis
3939
logger zerolog.Logger
@@ -43,8 +43,8 @@ type P2PHandler struct {
4343

4444
// NewP2PHandler creates a new P2P handler.
4545
func NewP2PHandler(
46-
headerStore HeightStore[*types.SignedHeader],
47-
dataStore HeightStore[*types.Data],
46+
headerStore HeightStore[*types.P2PSignedHeader],
47+
dataStore HeightStore[*types.P2PData],
4848
cache cache.CacheManager,
4949
genesis genesis.Genesis,
5050
logger zerolog.Logger,
@@ -79,25 +79,27 @@ func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInC
7979
return nil
8080
}
8181

82-
header, headerDAHint, err := h.headerStore.GetByHeight(ctx, height)
82+
p2pHeader, headerDAHint, err := h.headerStore.GetByHeight(ctx, height)
8383
if err != nil {
8484
if ctx.Err() == nil {
8585
h.logger.Debug().Uint64("height", height).Err(err).Msg("header unavailable in store")
8686
}
8787
return err
8888
}
89+
header := &p2pHeader.SignedHeader
8990
if err := h.assertExpectedProposer(header.ProposerAddress); err != nil {
9091
h.logger.Debug().Uint64("height", height).Err(err).Msg("invalid header from P2P")
9192
return err
9293
}
9394

94-
data, dataDAHint, err := h.dataStore.GetByHeight(ctx, height)
95+
p2pData, dataDAHint, err := h.dataStore.GetByHeight(ctx, height)
9596
if err != nil {
9697
if ctx.Err() == nil {
9798
h.logger.Debug().Uint64("height", height).Err(err).Msg("data unavailable in store")
9899
}
99100
return err
100101
}
102+
data := &p2pData.Data
101103
dataCommitment := data.DACommitment()
102104
if !bytes.Equal(header.DataHash[:], dataCommitment[:]) {
103105
err := fmt.Errorf("data hash mismatch: header %x, data %x", header.DataHash, dataCommitment)

block/internal/syncing/p2p_handler_test.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@ func buildTestSigner(t *testing.T) ([]byte, crypto.PubKey, signerpkg.Signer) {
3636
}
3737

3838
// p2pMakeSignedHeader creates a minimally valid SignedHeader for P2P tests.
39-
func p2pMakeSignedHeader(t *testing.T, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer) *types.SignedHeader {
39+
func p2pMakeSignedHeader(t *testing.T, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer) *types.P2PSignedHeader {
4040
t.Helper()
4141
hdr := &types.SignedHeader{
4242
Header: types.Header{
@@ -50,14 +50,14 @@ func p2pMakeSignedHeader(t *testing.T, chainID string, height uint64, proposer [
5050
sig, err := signer.Sign(bz)
5151
require.NoError(t, err, "failed to sign header bytes")
5252
hdr.Signature = sig
53-
return hdr
53+
return &types.P2PSignedHeader{SignedHeader: *hdr}
5454
}
5555

5656
// P2PTestData aggregates dependencies used by P2P handler tests.
5757
type P2PTestData struct {
5858
Handler *P2PHandler
59-
HeaderStore *MockHeightStore[*types.SignedHeader]
60-
DataStore *MockHeightStore[*types.Data]
59+
HeaderStore *MockHeightStore[*types.P2PSignedHeader]
60+
DataStore *MockHeightStore[*types.P2PData]
6161
Cache cache.CacheManager
6262
Genesis genesis.Genesis
6363
ProposerAddr []byte
@@ -72,8 +72,8 @@ func setupP2P(t *testing.T) *P2PTestData {
7272

7373
gen := genesis.Genesis{ChainID: "p2p-test", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: proposerAddr}
7474

75-
headerStoreMock := NewMockHeightStore[*types.SignedHeader](t)
76-
dataStoreMock := NewMockHeightStore[*types.Data](t)
75+
headerStoreMock := NewMockHeightStore[*types.P2PSignedHeader](t)
76+
dataStoreMock := NewMockHeightStore[*types.P2PData](t)
7777

7878
cfg := config.Config{
7979
RootDir: t.TempDir(),
@@ -128,7 +128,7 @@ func TestP2PHandler_ProcessHeight_EmitsEventWhenHeaderAndDataPresent(t *testing.
128128
require.Equal(t, string(p.Genesis.ProposerAddress), string(p.ProposerAddr))
129129

130130
header := p2pMakeSignedHeader(t, p.Genesis.ChainID, 5, p.ProposerAddr, p.ProposerPub, p.Signer)
131-
data := makeData(p.Genesis.ChainID, 5, 1)
131+
data := &types.P2PData{Data: *makeData(p.Genesis.ChainID, 5, 1)}
132132
header.DataHash = data.DACommitment()
133133
bz, err := types.DefaultAggregatorNodeSignatureBytesProvider(&header.Header)
134134
require.NoError(t, err)
@@ -154,7 +154,7 @@ func TestP2PHandler_ProcessHeight_SkipsWhenDataMissing(t *testing.T) {
154154
ctx := context.Background()
155155

156156
header := p2pMakeSignedHeader(t, p.Genesis.ChainID, 7, p.ProposerAddr, p.ProposerPub, p.Signer)
157-
data := makeData(p.Genesis.ChainID, 7, 1)
157+
data := &types.P2PData{Data: *makeData(p.Genesis.ChainID, 7, 1)}
158158
header.DataHash = data.DACommitment()
159159
bz, err := types.DefaultAggregatorNodeSignatureBytesProvider(&header.Header)
160160
require.NoError(t, err)
@@ -224,7 +224,7 @@ func TestP2PHandler_ProcessedHeightSkipsPreviouslyHandledBlocks(t *testing.T) {
224224

225225
// Height 6 should be fetched normally.
226226
header := p2pMakeSignedHeader(t, p.Genesis.ChainID, 6, p.ProposerAddr, p.ProposerPub, p.Signer)
227-
data := makeData(p.Genesis.ChainID, 6, 1)
227+
data := &types.P2PData{Data: *makeData(p.Genesis.ChainID, 6, 1)}
228228
header.DataHash = data.DACommitment()
229229
bz, err := types.DefaultAggregatorNodeSignatureBytesProvider(&header.Header)
230230
require.NoError(t, err)
@@ -247,7 +247,7 @@ func TestP2PHandler_SetProcessedHeightPreventsDuplicates(t *testing.T) {
247247
ctx := context.Background()
248248

249249
header := p2pMakeSignedHeader(t, p.Genesis.ChainID, 8, p.ProposerAddr, p.ProposerPub, p.Signer)
250-
data := makeData(p.Genesis.ChainID, 8, 0)
250+
data := &types.P2PData{Data: *makeData(p.Genesis.ChainID, 8, 0)}
251251
header.DataHash = data.DACommitment()
252252
bz, err := types.DefaultAggregatorNodeSignatureBytesProvider(&header.Header)
253253
require.NoError(t, err)

0 commit comments

Comments
 (0)