Skip to content

Commit 8d12323

Browse files
committed
fix errors
1 parent 699cadf commit 8d12323

2 files changed

Lines changed: 26 additions & 35 deletions

File tree

block/internal/syncing/p2p_handler.go

Lines changed: 12 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ import (
44
"bytes"
55
"context"
66
"fmt"
7-
"sync"
7+
"sync/atomic"
88

99
goheader "github.com/celestiaorg/go-header"
1010
"github.com/rs/zerolog"
@@ -28,8 +28,7 @@ type P2PHandler struct {
2828
genesis genesis.Genesis
2929
logger zerolog.Logger
3030

31-
mu sync.Mutex
32-
processedHeight uint64 // highest block fully applied by the syncer
31+
processedHeight atomic.Uint64
3332
}
3433

3534
// NewP2PHandler creates a new P2P handler.
@@ -51,22 +50,22 @@ func NewP2PHandler(
5150

5251
// SetProcessedHeight updates the highest processed block height.
5352
func (h *P2PHandler) SetProcessedHeight(height uint64) {
54-
h.mu.Lock()
55-
if height > h.processedHeight {
56-
h.processedHeight = height
53+
for {
54+
current := h.processedHeight.Load()
55+
if height <= current {
56+
return
57+
}
58+
if h.processedHeight.CompareAndSwap(current, height) {
59+
return
60+
}
5761
}
58-
h.mu.Unlock()
5962
}
6063

6164
// ProcessHeight retrieves and validates both header and data for the given height from P2P stores.
6265
// It blocks until both are available, validates consistency (proposer address and data hash match),
6366
// then emits the event to heightInCh or stores it as pending. Updates processedHeight on success.
6467
func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error {
65-
h.mu.Lock()
66-
shouldProcess := height > h.processedHeight
67-
h.mu.Unlock()
68-
69-
if !shouldProcess {
68+
if height <= h.processedHeight.Load() {
7069
return nil
7170
}
7271

@@ -110,11 +109,7 @@ func (h *P2PHandler) ProcessHeight(ctx context.Context, height uint64, heightInC
110109
h.cache.SetPendingEvent(event.Header.Height(), &event)
111110
}
112111

113-
h.mu.Lock()
114-
if height > h.processedHeight {
115-
h.processedHeight = height
116-
}
117-
h.mu.Unlock()
112+
h.SetProcessedHeight(height)
118113

119114
h.logger.Debug().Uint64("height", height).Msg("processed event from P2P")
120115
return nil

block/internal/syncing/syncer.go

Lines changed: 14 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,7 @@ type Syncer struct {
7676
wg sync.WaitGroup
7777

7878
// P2P wait coordination
79-
p2pWaitMu sync.Mutex
80-
p2pWaitState p2pWaitState
79+
p2pWaitState atomic.Value // stores p2pWaitState
8180
}
8281

8382
// NewSyncer creates a new block syncer
@@ -257,10 +256,6 @@ func (s *Syncer) startSyncWorkers() {
257256
go s.p2pWorkerLoop()
258257
}
259258

260-
const (
261-
futureHeightBackoff = 6 * time.Second // current celestia block time
262-
)
263-
264259
func (s *Syncer) daWorkerLoop() {
265260
defer s.wg.Done()
266261

@@ -277,7 +272,7 @@ func (s *Syncer) daWorkerLoop() {
277272
var backoff time.Duration
278273
if err == nil {
279274
// No error, means we are caught up.
280-
backoff = futureHeightBackoff
275+
backoff = s.config.DA.BlockTime.Duration
281276
} else {
282277
// Error, back off for a shorter duration.
283278
backoff = s.config.DA.BlockTime.Duration
@@ -772,20 +767,21 @@ type p2pWaitState struct {
772767
}
773768

774769
func (s *Syncer) setP2PWaitState(height uint64, cancel context.CancelFunc) {
775-
s.p2pWaitMu.Lock()
776-
defer s.p2pWaitMu.Unlock()
777-
s.p2pWaitState = p2pWaitState{
778-
height: height,
779-
cancel: cancel,
780-
}
770+
s.p2pWaitState.Store(p2pWaitState{height: height, cancel: cancel})
781771
}
782772

783773
func (s *Syncer) cancelP2PWait(height uint64) {
784-
s.p2pWaitMu.Lock()
785-
defer s.p2pWaitMu.Unlock()
774+
val := s.p2pWaitState.Load()
775+
if val == nil {
776+
return
777+
}
778+
state, ok := val.(p2pWaitState)
779+
if !ok || state.cancel == nil {
780+
return
781+
}
786782

787-
if s.p2pWaitState.cancel != nil && (height == 0 || s.p2pWaitState.height <= height) {
788-
s.p2pWaitState.cancel()
789-
s.p2pWaitState = p2pWaitState{}
783+
if height == 0 || state.height <= height {
784+
s.p2pWaitState.Store(p2pWaitState{})
785+
state.cancel()
790786
}
791787
}

0 commit comments

Comments
 (0)