Skip to content

Commit 9673004

Browse files
committed
change da sync flow
1 parent 0c9ac5a commit 9673004

File tree

1 file changed

+60
-64
lines changed

1 file changed

+60
-64
lines changed

block/internal/syncing/syncer.go

Lines changed: 60 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -258,7 +258,6 @@ func (s *Syncer) startSyncWorkers() {
258258
}
259259

260260
const (
261-
fastDAPollInterval = 10 * time.Millisecond
262261
futureHeightBackoff = 6 * time.Second // current celestia block time
263262
)
264263

@@ -272,16 +271,71 @@ func (s *Syncer) daWorkerLoop() {
272271
s.logger.Info().Msg("starting DA worker")
273272
defer s.logger.Info().Msg("DA worker stopped")
274273

275-
nextDARequestAt := &time.Time{}
276-
pollInterval := fastDAPollInterval
277-
278274
for {
279-
s.tryFetchFromDA(nextDARequestAt)
275+
err := s.fetchDAUntilCaughtUp()
276+
277+
var backoff time.Duration
278+
if err == nil {
279+
// No error, means we are caught up.
280+
backoff = futureHeightBackoff
281+
} else {
282+
// Error, back off for a shorter duration.
283+
backoff = s.config.DA.BlockTime.Duration
284+
if backoff <= 0 {
285+
backoff = 2 * time.Second
286+
}
287+
}
288+
280289
select {
281290
case <-s.ctx.Done():
282291
return
283-
case <-time.After(pollInterval):
292+
case <-time.After(backoff):
293+
}
294+
}
295+
}
296+
297+
func (s *Syncer) fetchDAUntilCaughtUp() error {
298+
for {
299+
select {
300+
case <-s.ctx.Done():
301+
return s.ctx.Err()
302+
default:
284303
}
304+
305+
daHeight := s.GetDAHeight()
306+
events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
307+
if err != nil {
308+
switch {
309+
case errors.Is(err, coreda.ErrBlobNotFound):
310+
s.SetDAHeight(daHeight + 1)
311+
continue // Fetch next height immediately
312+
case errors.Is(err, coreda.ErrHeightFromFuture):
313+
s.logger.Debug().Err(err).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests")
314+
return nil // Caught up
315+
default:
316+
s.logger.Error().Err(err).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests")
317+
return err // Other errors
318+
}
319+
}
320+
321+
if len(events) == 0 {
322+
// This can happen if RetrieveFromDA returns no events and no error.
323+
// This is unexpected, but we should handle it to avoid busy-looping.
324+
s.logger.Warn().Uint64("da_height", daHeight).Msg("no events returned from DA, but no error either. Backing off.")
325+
return fmt.Errorf("no events returned from DA for height %d", daHeight)
326+
}
327+
328+
// Process DA events
329+
for _, event := range events {
330+
select {
331+
case s.heightInCh <- event:
332+
default:
333+
s.cache.SetPendingEvent(event.Header.Height(), &event)
334+
}
335+
}
336+
337+
// increment DA height on successful retrieval
338+
s.SetDAHeight(daHeight + 1)
285339
}
286340
}
287341

@@ -379,64 +433,6 @@ func (s *Syncer) waitForGenesis() bool {
379433
return true
380434
}
381435

382-
// tryFetchFromDA attempts to fetch events from the DA layer.
383-
// It handles backoff timing, DA height management, and error classification.
384-
// Returns true if any events were successfully processed.
385-
func (s *Syncer) tryFetchFromDA(nextDARequestAt *time.Time) {
386-
now := time.Now()
387-
daHeight := s.GetDAHeight()
388-
389-
// Respect backoff window if set
390-
if !nextDARequestAt.IsZero() && now.Before(*nextDARequestAt) {
391-
return
392-
}
393-
394-
// Retrieve from DA as fast as possible (unless throttled by HFF)
395-
// DaHeight is only increased on successful retrieval, it will retry on failure at the next iteration
396-
events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
397-
if err != nil {
398-
switch {
399-
case errors.Is(err, coreda.ErrBlobNotFound):
400-
// no data at this height, increase DA height
401-
s.SetDAHeight(daHeight + 1)
402-
// Reset backoff on success
403-
*nextDARequestAt = time.Time{}
404-
return
405-
case errors.Is(err, coreda.ErrHeightFromFuture):
406-
delay := futureHeightBackoff
407-
*nextDARequestAt = now.Add(delay)
408-
s.logger.Debug().Err(err).Dur("delay", delay).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests")
409-
return
410-
default:
411-
// Back off exactly by DA block time to avoid overloading
412-
backoffDelay := s.config.DA.BlockTime.Duration
413-
if backoffDelay <= 0 {
414-
backoffDelay = 2 * time.Second
415-
}
416-
*nextDARequestAt = now.Add(backoffDelay)
417-
418-
s.logger.Error().Err(err).Dur("delay", backoffDelay).Uint64("da_height", daHeight).Msg("failed to retrieve from DA; backing off DA requests")
419-
420-
return
421-
}
422-
}
423-
424-
// Reset backoff on success
425-
*nextDARequestAt = time.Time{}
426-
427-
// Process DA events
428-
for _, event := range events {
429-
select {
430-
case s.heightInCh <- event:
431-
default:
432-
s.cache.SetPendingEvent(event.Header.Height(), &event)
433-
}
434-
}
435-
436-
// increment DA height on successful retrieval
437-
s.SetDAHeight(daHeight + 1)
438-
}
439-
440436
func (s *Syncer) processHeightEvent(event *common.DAHeightEvent) {
441437
height := event.Header.Height()
442438
headerHash := event.Header.Hash().String()

0 commit comments

Comments
 (0)