Skip to content

Commit 57e77fc

Browse files
committed
feat: add background P2P init retries to SyncService
1 parent 9ad4016 commit 57e77fc

File tree

1 file changed

+82
-36
lines changed

1 file changed

+82
-36
lines changed

pkg/sync/sync_service.go

Lines changed: 82 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,10 @@ type SyncService[H header.Header[H]] struct {
5656
syncerStatus *SyncerStatus
5757
topicSubscription header.Subscription[H]
5858
storeInitialized atomic.Bool
59+
60+
// context for background operations
61+
bgCtx context.Context
62+
bgCancel context.CancelFunc
5963
}
6064

6165
// DataSyncService is the P2P Sync Service for blocks.
@@ -107,6 +111,8 @@ func newSyncService[H header.Header[H]](
107111
return nil, fmt.Errorf("failed to initialize the %s store: %w", syncType, err)
108112
}
109113

114+
bgCtx, bgCancel := context.WithCancel(context.Background())
115+
110116
svc := &SyncService[H]{
111117
conf: conf,
112118
genesis: genesis,
@@ -115,6 +121,8 @@ func newSyncService[H header.Header[H]](
115121
syncType: syncType,
116122
logger: logger,
117123
syncerStatus: new(SyncerStatus),
124+
bgCtx: bgCtx,
125+
bgCancel: bgCancel,
118126
}
119127

120128
return svc, nil
@@ -307,6 +315,42 @@ func (syncService *SyncService[H]) startSubscriber(ctx context.Context) error {
307315
return nil
308316
}
309317

318+
// tryInit attempts to initialize the syncer from P2P once.
319+
// Returns true if successful, false otherwise with an error.
320+
func (syncService *SyncService[H]) tryInit(ctx context.Context) (bool, error) {
321+
var (
322+
trusted H
323+
err error
324+
heightToQuery uint64
325+
)
326+
327+
head, headErr := syncService.store.Head(ctx)
328+
switch {
329+
case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
330+
heightToQuery = syncService.genesis.InitialHeight
331+
case headErr != nil:
332+
return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
333+
default:
334+
heightToQuery = head.Height()
335+
}
336+
337+
if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
338+
return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
339+
}
340+
341+
if syncService.storeInitialized.CompareAndSwap(false, true) {
342+
if _, err := syncService.initStore(ctx, trusted); err != nil {
343+
syncService.storeInitialized.Store(false)
344+
return false, fmt.Errorf("failed to initialize the store: %w", err)
345+
}
346+
}
347+
if err := syncService.startSyncer(ctx); err != nil {
348+
return false, err
349+
}
350+
351+
return true, nil
352+
}
353+
310354
// initFromP2PWithRetry initializes the syncer from P2P with a retry mechanism.
311355
// It inspects the local store to determine the first height to request:
312356
// - when the store already contains items, it reuses the latest height as the starting point;
@@ -316,48 +360,15 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
316360
return nil
317361
}
318362

319-
tryInit := func(ctx context.Context) (bool, error) {
320-
var (
321-
trusted H
322-
err error
323-
heightToQuery uint64
324-
)
325-
326-
head, headErr := syncService.store.Head(ctx)
327-
switch {
328-
case errors.Is(headErr, header.ErrNotFound), errors.Is(headErr, header.ErrEmptyStore):
329-
heightToQuery = syncService.genesis.InitialHeight
330-
case headErr != nil:
331-
return false, fmt.Errorf("failed to inspect local store head: %w", headErr)
332-
default:
333-
heightToQuery = head.Height()
334-
}
335-
336-
if trusted, err = syncService.ex.GetByHeight(ctx, heightToQuery); err != nil {
337-
return false, fmt.Errorf("failed to fetch height %d from peers: %w", heightToQuery, err)
338-
}
339-
340-
if syncService.storeInitialized.CompareAndSwap(false, true) {
341-
if _, err := syncService.initStore(ctx, trusted); err != nil {
342-
syncService.storeInitialized.Store(false)
343-
return false, fmt.Errorf("failed to initialize the store: %w", err)
344-
}
345-
}
346-
if err := syncService.startSyncer(ctx); err != nil {
347-
return false, err
348-
}
349-
return true, nil
350-
}
351-
352363
// block with exponential backoff until initialization succeeds or context is canceled.
353364
backoff := 1 * time.Second
354365
maxBackoff := 10 * time.Second
355366

356-
timeoutTimer := time.NewTimer(time.Minute * 10)
367+
timeoutTimer := time.NewTimer(time.Minute * 5)
357368
defer timeoutTimer.Stop()
358369

359370
for {
360-
ok, err := tryInit(ctx)
371+
ok, err := syncService.tryInit(ctx)
361372
if ok {
362373
return nil
363374
}
@@ -368,7 +379,39 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
368379
case <-ctx.Done():
369380
return ctx.Err()
370381
case <-timeoutTimer.C:
371-
return fmt.Errorf("timeout reached while trying to initialize the store after 10 minutes: %w", err)
382+
syncService.logger.Warn().Err(err).Msg("timeout reached while trying to initialize the store, scheduling background retry")
383+
384+
go syncService.retryInitInBackground()
385+
386+
return nil
387+
case <-time.After(backoff):
388+
}
389+
390+
backoff *= 2
391+
if backoff > maxBackoff {
392+
backoff = maxBackoff
393+
}
394+
}
395+
}
396+
397+
// retryInitInBackground continues attempting to initialize the syncer in the background.
398+
func (syncService *SyncService[H]) retryInitInBackground() {
399+
backoff := 15 * time.Second
400+
maxBackoff := 5 * time.Minute
401+
402+
for {
403+
ok, err := syncService.tryInit(syncService.bgCtx)
404+
if ok {
405+
syncService.logger.Info().Msg("successfully initialized store from P2P in background")
406+
return
407+
}
408+
409+
syncService.logger.Info().Err(err).Dur("retry_in", backoff).Msg("background retry: headers not yet available from peers")
410+
411+
select {
412+
case <-syncService.bgCtx.Done():
413+
syncService.logger.Info().Msg("background retry cancelled")
414+
return
372415
case <-time.After(backoff):
373416
}
374417

@@ -383,6 +426,9 @@ func (syncService *SyncService[H]) initFromP2PWithRetry(ctx context.Context, pee
383426
//
384427
// `store` is closed last because it's used by other services.
385428
func (syncService *SyncService[H]) Stop(ctx context.Context) error {
429+
// cancel background operations
430+
syncService.bgCancel()
431+
386432
// unsubscribe from topic first so that sub.Stop() does not fail
387433
syncService.topicSubscription.Cancel()
388434
err := errors.Join(

0 commit comments

Comments
 (0)