Skip to content

Commit 387c3fb

Browse files
committed
Minor updates
1 parent bdca5fb commit 387c3fb

File tree

7 files changed

+17
-57
lines changed

7 files changed

+17
-57
lines changed

block/internal/executing/executor_restart_test.go

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
7979
require.NoError(t, exec1.initializeState())
8080

8181
// Set up context for first executor
82-
exec1.ctx, exec1.cancel = context.WithCancel(context.Background())
82+
exec1.ctx, exec1.cancel = context.WithCancel(t.Context())
8383

8484
// First executor produces a block normally
8585
mockSeq1.EXPECT().GetNextBatch(mock.Anything, mock.AnythingOfType("sequencer.GetNextBatchRequest")).
@@ -101,12 +101,12 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
101101
require.NoError(t, err)
102102

103103
// Verify first block was produced
104-
h1, err := memStore.Height(context.Background())
104+
h1, err := memStore.Height(t.Context())
105105
require.NoError(t, err)
106106
assert.Equal(t, uint64(1), h1)
107107

108108
// Store the produced block data for later verification
109-
originalHeader, originalData, err := memStore.GetBlockData(context.Background(), 1)
109+
originalHeader, originalData, err := memStore.GetBlockData(t.Context(), 1)
110110
require.NoError(t, err)
111111
assert.Equal(t, 2, len(originalData.Txs), "first block should have 2 transactions")
112112

@@ -158,8 +158,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
158158
pendingHeader.DataHash = pendingData.DACommitment()
159159

160160
// Save pending block data (this is what would happen during a crash)
161-
// We use savePendingBlock directly which writes to the metadata keys expected by the executor on restart
162-
err = exec1.savePendingBlock(context.Background(), pendingHeader, pendingData)
161+
err = exec1.savePendingBlock(t.Context(), pendingHeader, pendingData)
163162
require.NoError(t, err)
164163

165164
// Stop first executor (simulating crash/restart)
@@ -196,7 +195,7 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
196195
require.NoError(t, exec2.initializeState())
197196

198197
// Set up context for second executor
199-
exec2.ctx, exec2.cancel = context.WithCancel(context.Background())
198+
exec2.ctx, exec2.cancel = context.WithCancel(t.Context())
200199
defer exec2.cancel()
201200

202201
// Verify that the state is at height 1 (pending block at height 2 wasn't committed)
@@ -218,12 +217,12 @@ func TestExecutor_RestartUsesPendingHeader(t *testing.T) {
218217
require.NoError(t, err)
219218

220219
// Verify height advanced to 2
221-
h2, err := memStore.Height(context.Background())
220+
h2, err := memStore.Height(t.Context())
222221
require.NoError(t, err)
223222
assert.Equal(t, uint64(2), h2, "height should advance to 2 using pending block")
224223

225224
// Verify the block at height 2 matches the pending block data
226-
finalHeader, finalData, err := memStore.GetBlockData(context.Background(), 2)
225+
finalHeader, finalData, err := memStore.GetBlockData(t.Context(), 2)
227226
require.NoError(t, err)
228227
assert.Equal(t, 3, len(finalData.Txs), "should use pending block with 3 transactions")
229228
assert.Equal(t, []byte("pending_tx1"), []byte(finalData.Txs[0]))
@@ -385,7 +384,7 @@ func TestExecutor_RestartNoPendingHeader(t *testing.T) {
385384
require.NoError(t, err)
386385

387386
// Verify normal operation
388-
h, err := memStore.Height(context.Background())
387+
h, err := memStore.Height(t.Context())
389388
require.NoError(t, err)
390389
assert.Equal(t, uint64(numBlocks+1), h)
391390

block/internal/syncing/syncer.go

Lines changed: 2 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -722,7 +722,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
722722
currentState := s.getLastState()
723723
headerHash := header.Hash().String()
724724

725-
s.logger.Info().Uint64("height", nextHeight).Msg("syncing block started")
725+
s.logger.Info().Uint64("height", nextHeight).Msg("syncing block")
726726

727727
// Compared to the executor logic where the current block needs to be applied first,
728728
// here only the previous block needs to be applied to proceed to the verification.
@@ -732,17 +732,6 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
732732
s.cache.RemoveHeaderDAIncluded(headerHash)
733733
s.cache.RemoveDataDAIncluded(data.DACommitment().String())
734734

735-
s.logger.Warn().
736-
Err(err).
737-
Uint64("height", header.Height()).
738-
Uint64("time", uint64(header.Time().Unix())).
739-
Hex("proposer", header.ProposerAddress).
740-
Str("data_hash", hex.EncodeToString(header.DataHash)).
741-
Str("app_hash", hex.EncodeToString(header.AppHash)).
742-
Hex("last_header_hash", header.LastHeaderHash).
743-
Int("len signature", len(header.Signature)).
744-
Msg("block validation failed")
745-
746735
if !errors.Is(err, errInvalidState) && !errors.Is(err, errInvalidBlock) {
747736
return errors.Join(errInvalidBlock, err)
748737
}
@@ -810,7 +799,7 @@ func (s *Syncer) TrySyncNextBlock(ctx context.Context, event *common.DAHeightEve
810799
if s.p2pHandler != nil {
811800
s.p2pHandler.SetProcessedHeight(newState.LastBlockHeight)
812801
}
813-
s.logger.Info().Uint64("height", nextHeight).Msg("syncing block completed")
802+
814803
return nil
815804
}
816805

node/failover.go

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -189,23 +189,17 @@ func (f *failoverState) Run(pCtx context.Context) (multiErr error) {
189189
return nil
190190
})
191191

192-
// P2P client persists across mode switches (started/closed by FullNode.Run).
193-
// Reconfigure() was already called in setupFailoverState to re-bootstrap DHT.
194-
195-
// Start header and data sync services concurrently. Each service's
196-
// initFromP2PWithRetry can block up to 30s when peers have no blocks
197-
// (e.g. lazy mode sequencer at height 0). Running them in parallel
198-
// avoids a 60s cumulative startup delay.
192+
// start header and data sync services concurrently to avoid cumulative startup delay.
199193
syncWg, syncCtx := errgroup.WithContext(ctx)
200194
syncWg.Go(func() error {
201195
if err := f.headerSyncService.Start(syncCtx); err != nil {
202-
return fmt.Errorf("error while starting header sync service: %w", err)
196+
return fmt.Errorf("header sync service: %w", err)
203197
}
204198
return nil
205199
})
206200
syncWg.Go(func() error {
207201
if err := f.dataSyncService.Start(syncCtx); err != nil {
208-
return fmt.Errorf("error while starting data sync service: %w", err)
202+
return fmt.Errorf("data sync service: %w", err)
209203
}
210204
return nil
211205
})

node/full.go

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -104,8 +104,6 @@ func newFullNode(
104104
}
105105
}
106106

107-
// The p2p client is fully configured and started before leader election.
108-
// SyncService.getPeerIDs() gates peer usage on conf.Node.Aggregator.
109107
leaderFactory := func() (raftpkg.Runnable, error) {
110108
logger.Info().Msg("Starting aggregator-MODE")
111109
nodeConfig.Node.Aggregator = true
@@ -283,8 +281,7 @@ func (n *FullNode) Run(parentCtx context.Context) error {
283281
n.prometheusSrv, n.pprofSrv = n.startInstrumentationServer()
284282
}
285283

286-
// Start the P2P client once. It persists across mode switches so that
287-
// the host and PubSub (including externally registered topics) survive.
284+
// Start the P2P client once. It persists across mode switches
288285
if err := n.p2pClient.Start(ctx); err != nil {
289286
return fmt.Errorf("start p2p: %w", err)
290287
}

node/node.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,6 @@ type NodeOptions struct {
2828
// NewNode returns a new Full or Light Node based on the config.
2929
// This is the entry point for composing a node, when compiling a node, you need to provide an executor.
3030
// Example executors can be found in apps/
31-
//
32-
// The p2pClient owns the node identity (private key) and is shared across
33-
// mode switches. It supports in-place reconfiguration via Reconfigure().
3431
func NewNode(
3532
conf config.Config,
3633
exec coreexecutor.Executor,

pkg/cmd/run_node.go

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -164,8 +164,6 @@ func StartNode(
164164
executor = telemetry.WithTracingExecutor(executor)
165165
}
166166

167-
// Create the P2P client. It is long-lived and reconfigured in-place
168-
// on mode switches, avoiding costly teardown of the libp2p stack.
169167
p2pClient, err := p2p.NewClient(nodeConfig.P2P, nodeKey.PrivKey, datastore, genesis.ChainID, logger, nil)
170168
if err != nil {
171169
return fmt.Errorf("create p2p client: %w", err)

pkg/p2p/client.go

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -49,8 +49,7 @@ type Client struct {
4949
chainID string
5050
privKey crypto.PrivKey
5151

52-
rawHost host.Host // unwrapped libp2p host, stored to avoid double-wrapping via routedhost
53-
host host.Host // may be wrapped with routedhost after DHT setup
52+
host host.Host
5453
dht *dht.IpfsDHT
5554
disc *discovery.RoutingDiscovery
5655
gater *conngater.BasicConnectionGater
@@ -124,20 +123,18 @@ func NewClientWithHost(
124123
// 4. Use active peer discovery to look for peers from same ORU network.
125124
func (c *Client) Start(ctx context.Context) error {
126125
if c.started {
127-
return nil // already started — called from FullNode.Run()
126+
return nil
128127
}
129128
c.logger.Debug().Msg("starting P2P client")
130129

131130
if c.host != nil {
132-
c.rawHost = c.host
133131
return c.startWithHost(ctx, c.host)
134132
}
135133

136134
h, err := c.listen()
137135
if err != nil {
138136
return err
139137
}
140-
c.rawHost = h
141138
return c.startWithHost(ctx, h)
142139
}
143140

@@ -185,21 +182,10 @@ func (c *Client) Close() error {
185182
if c.host != nil {
186183
err = errors.Join(err, c.host.Close())
187184
}
185+
c.started = false
188186
return err
189187
}
190188

191-
// PrivKey returns the node's private key.
192-
func (c *Client) PrivKey() crypto.PrivKey {
193-
return c.privKey
194-
}
195-
196-
// Reconfigure updates the mutable P2P configuration without tearing down
197-
// the libp2p host, PubSub, or DHT. Currently this only updates the
198-
// stored config; the sync service gates peer usage on conf.Node.Aggregator.
199-
func (c *Client) Reconfigure(conf config.P2PConfig) {
200-
c.conf = conf
201-
}
202-
203189
// Addrs returns listen addresses of Client.
204190
func (c *Client) Addrs() []multiaddr.Multiaddr {
205191
return c.host.Addrs()

0 commit comments

Comments (0)