@@ -22,6 +22,14 @@ import (
2222 "github.com/evstack/ev-node/types"
2323)
2424
25+ type daRetriever interface {
26+ RetrieveFromDA (ctx context.Context , daHeight uint64 ) ([]common.DAHeightEvent , error )
27+ }
28+ type p2pHandler interface {
29+ ProcessHeaderRange (ctx context.Context , fromHeight , toHeight uint64 ) []common.DAHeightEvent
30+ ProcessDataRange (ctx context.Context , fromHeight , toHeight uint64 ) []common.DAHeightEvent
31+ }
32+
2533// Syncer handles block synchronization from DA and P2P sources.
2634type Syncer struct {
2735 // Core components
@@ -51,14 +59,12 @@ type Syncer struct {
5159 dataStore goheader.Store [* types.Data ]
5260
5361 // Channels for coordination
54- heightInCh chan common.DAHeightEvent
55- headerStoreCh chan struct {}
56- dataStoreCh chan struct {}
57- errorCh chan <- error // Channel to report critical execution client failures
62+ heightInCh chan common.DAHeightEvent
63+ errorCh chan <- error // Channel to report critical execution client failures
5864
5965 // Handlers
60- daRetriever * DARetriever
61- p2pHandler * P2PHandler
66+ daRetriever daRetriever
67+ p2pHandler p2pHandler
6268
6369 // Logging
6470 logger zerolog.Logger
@@ -85,23 +91,21 @@ func NewSyncer(
8591 errorCh chan <- error ,
8692) * Syncer {
8793 return & Syncer {
88- store : store ,
89- exec : exec ,
90- da : da ,
91- cache : cache ,
92- metrics : metrics ,
93- config : config ,
94- genesis : genesis ,
95- options : options ,
96- headerStore : headerStore ,
97- dataStore : dataStore ,
98- lastStateMtx : & sync.RWMutex {},
99- daStateMtx : & sync.RWMutex {},
100- heightInCh : make (chan common.DAHeightEvent , 10000 ),
101- headerStoreCh : make (chan struct {}, 1 ),
102- dataStoreCh : make (chan struct {}, 1 ),
103- errorCh : errorCh ,
104- logger : logger .With ().Str ("component" , "syncer" ).Logger (),
94+ store : store ,
95+ exec : exec ,
96+ da : da ,
97+ cache : cache ,
98+ metrics : metrics ,
99+ config : config ,
100+ genesis : genesis ,
101+ options : options ,
102+ headerStore : headerStore ,
103+ dataStore : dataStore ,
104+ lastStateMtx : & sync.RWMutex {},
105+ daStateMtx : & sync.RWMutex {},
106+ heightInCh : make (chan common.DAHeightEvent , 10_000 ),
107+ errorCh : errorCh ,
108+ logger : logger .With ().Str ("component" , "syncer" ).Logger (),
105109 }
106110}
107111
@@ -212,20 +216,10 @@ func (s *Syncer) processLoop() {
212216 s .logger .Info ().Msg ("starting process loop" )
213217 defer s .logger .Info ().Msg ("process loop stopped" )
214218
215- blockTicker := time .NewTicker (s .config .Node .BlockTime .Duration )
216- defer blockTicker .Stop ()
217-
218219 for {
219- // Process pending events from cache on every iteration
220- s .processPendingEvents ()
221-
222220 select {
223221 case <- s .ctx .Done ():
224222 return
225- case <- blockTicker .C :
226- // Signal P2P stores to check for new data
227- s .sendNonBlockingSignal (s .headerStoreCh , "header_store" )
228- s .sendNonBlockingSignal (s .dataStoreCh , "data_store" )
229223 case heightEvent := <- s .heightInCh :
230224 s .processHeightEvent (& heightEvent )
231225 }
@@ -250,15 +244,19 @@ func (s *Syncer) syncLoop() {
250244 var hffDelay time.Duration
251245 var nextDARequestAt time.Time
252246
247+ blockTicker := time .NewTicker (s .config .Node .BlockTime .Duration )
248+ defer blockTicker .Stop ()
249+
253250 // TODO: we should request to see what the head of the chain is at
254251 // then we know if we are falling behind or in sync mode
255- syncLoop:
256252 for {
257253 select {
258254 case <- s .ctx .Done ():
259255 return
260256 default :
261257 }
258+ // Process pending events from cache on every iteration
259+ s .processPendingEvents ()
262260
263261 now := time .Now ()
264262 // Respect backoff window if set
@@ -291,9 +289,7 @@ syncLoop:
291289 select {
292290 case s .heightInCh <- event :
293291 default :
294- s .logger .Warn ().Msg ("height channel full, dropping DA event" )
295- time .Sleep (10 * time .Millisecond )
296- continue syncLoop
292+ s .cache .SetPendingEvent (event .Header .Height (), & event )
297293 }
298294 }
299295
@@ -303,34 +299,34 @@ syncLoop:
303299 }
304300 }
305301
302+ // Opportunistically process any P2P signals
306303 select {
307- case <- s . headerStoreCh :
304+ case <- blockTicker . C :
308305 newHeaderHeight := s .headerStore .Height ()
309306 if newHeaderHeight > lastHeaderHeight {
310307 events := s .p2pHandler .ProcessHeaderRange (s .ctx , lastHeaderHeight + 1 , newHeaderHeight )
311308 for _ , event := range events {
312309 select {
313310 case s .heightInCh <- event :
314311 default :
315- s .logger .Warn ().Msg ("height channel full, dropping P2P header event" )
316- time .Sleep (10 * time .Millisecond )
317- continue syncLoop
312+ s .cache .SetPendingEvent (event .Header .Height (), & event )
318313 }
319314 }
320-
321315 lastHeaderHeight = newHeaderHeight
322316 }
323- case <- s . dataStoreCh :
317+
324318 newDataHeight := s .dataStore .Height ()
319+ if newDataHeight == newHeaderHeight {
320+ lastDataHeight = newDataHeight
321+ continue
322+ }
325323 if newDataHeight > lastDataHeight {
326324 events := s .p2pHandler .ProcessDataRange (s .ctx , lastDataHeight + 1 , newDataHeight )
327325 for _ , event := range events {
328326 select {
329327 case s .heightInCh <- event :
330328 default :
331- s .logger .Warn ().Msg ("height channel full, dropping P2P data event" )
332- time .Sleep (10 * time .Millisecond )
333- continue syncLoop
329+ s .cache .SetPendingEvent (event .Header .Height (), & event )
334330 }
335331 }
336332 lastDataHeight = newDataHeight
0 commit comments