Skip to content

Commit d40805f

Browse files
committed
add caching
1 parent 8b0f5c9 commit d40805f

3 files changed

Lines changed: 66 additions & 16 deletions

File tree

block/manager.go

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -106,8 +106,9 @@ type Manager struct {
106106
headerStore goheader.Store[*types.SignedHeader]
107107
dataStore goheader.Store[*types.Data]
108108

109-
headerCache *cache.Cache[types.SignedHeader]
110-
dataCache *cache.Cache[types.Data]
109+
headerCache *cache.Cache[types.SignedHeader]
110+
dataCache *cache.Cache[types.Data]
111+
pendingEventsCache *cache.Cache[pendingDAEvent]
111112

112113
// headerStoreCh is used to notify sync goroutine (HeaderStoreRetrieveLoop) that it needs to retrieve headers from headerStore
113114
headerStoreCh chan struct{}
@@ -385,6 +386,7 @@ func NewManager(
385386
lastBatchData: lastBatchData,
386387
headerCache: cache.NewCache[types.SignedHeader](),
387388
dataCache: cache.NewCache[types.Data](),
389+
pendingEventsCache: cache.NewCache[pendingDAEvent](),
388390
retrieveCh: make(chan struct{}, 1),
389391
daIncluderCh: make(chan struct{}, 1),
390392
logger: logger,
@@ -1060,9 +1062,10 @@ func (m *Manager) NotifyNewTransactions() {
10601062
}
10611063

10621064
var (
1063-
cacheDir = "cache"
1064-
headerCacheDir = filepath.Join(cacheDir, "header")
1065-
dataCacheDir = filepath.Join(cacheDir, "data")
1065+
cacheDir = "cache"
1066+
headerCacheDir = filepath.Join(cacheDir, "header")
1067+
dataCacheDir = filepath.Join(cacheDir, "data")
1068+
pendingEventsCacheDir = filepath.Join(cacheDir, "pending_da_events")
10661069
)
10671070

10681071
// HeaderCache returns the headerCache used by the manager.
@@ -1079,6 +1082,7 @@ func (m *Manager) DataCache() *cache.Cache[types.Data] {
10791082
func (m *Manager) LoadCache() error {
10801083
gob.Register(&types.SignedHeader{})
10811084
gob.Register(&types.Data{})
1085+
gob.Register(&pendingDAEvent{})
10821086

10831087
cfgDir := filepath.Join(m.config.RootDir, "data")
10841088

@@ -1090,6 +1094,10 @@ func (m *Manager) LoadCache() error {
10901094
return fmt.Errorf("failed to load data cache from disk: %w", err)
10911095
}
10921096

1097+
if err := m.pendingEventsCache.LoadFromDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil {
1098+
return fmt.Errorf("failed to load pending events cache from disk")
1099+
}
1100+
10931101
return nil
10941102
}
10951103

@@ -1104,6 +1112,11 @@ func (m *Manager) SaveCache() error {
11041112
if err := m.dataCache.SaveToDisk(filepath.Join(cfgDir, dataCacheDir)); err != nil {
11051113
return fmt.Errorf("failed to save data cache to disk: %w", err)
11061114
}
1115+
1116+
if err := m.pendingEventsCache.SaveToDisk(filepath.Join(cfgDir, pendingEventsCacheDir)); err != nil {
1117+
return fmt.Errorf("failed to save pending events cache to disk: %w", err)
1118+
}
1119+
11071120
return nil
11081121
}
11091122

block/retriever_da.go

Lines changed: 31 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,11 +21,11 @@ const (
2121
dAFetcherRetries = 10
2222
)
2323

24-
// daRetriever encapsulates DA retrieval with pending events management
24+
// daRetriever encapsulates DA retrieval with pending events management.
25+
// Pending events are persisted via Manager.pendingEventsCache to avoid data loss on retries or restarts.
2526
type daRetriever struct {
26-
manager *Manager
27-
mutex sync.RWMutex // mutex for pendingEvents
28-
pendingEvents map[uint64]pendingDAEvent
27+
manager *Manager
28+
mutex sync.RWMutex // mutex for pendingEvents
2929
}
3030

3131
// pendingDAEvent represents a DA event waiting for processing
@@ -38,8 +38,7 @@ type pendingDAEvent struct {
3838
// newDARetriever creates a new DA retriever
3939
func newDARetriever(manager *Manager) *daRetriever {
4040
return &daRetriever{
41-
manager: manager,
42-
pendingEvents: make(map[uint64]pendingDAEvent),
41+
manager: manager,
4342
}
4443
}
4544

@@ -51,6 +50,9 @@ func (m *Manager) DARetrieveLoop(ctx context.Context) {
5150

5251
// run executes the main DA retrieval loop
5352
func (dr *daRetriever) run(ctx context.Context) {
53+
// attempt to process any pending events loaded from disk before starting retrieval loop.
54+
dr.processPendingEvents(ctx)
55+
5456
// blobsFoundCh is used to track when we successfully found a header so
5557
// that we can continue to try and find headers that are in the next DA height.
5658
// This enables syncing faster than the DA block time.
@@ -292,17 +294,23 @@ func (dr *daRetriever) sendHeightEventIfValid(ctx context.Context, header *types
292294
dr.processEvent(ctx, header, data, daHeight)
293295
}
294296

295-
// queuePendingEvent queues a DA event that cannot be processed immediately
297+
// queuePendingEvent queues a DA event that cannot be processed immediately.
298+
// The event is persisted via pendingEventsCache to survive restarts.
296299
func (dr *daRetriever) queuePendingEvent(header *types.SignedHeader, data *types.Data, daHeight uint64) {
297300
dr.mutex.Lock()
298301
defer dr.mutex.Unlock()
299302

303+
if dr.manager.pendingEventsCache == nil {
304+
return
305+
}
306+
300307
height := header.Height()
301-
dr.pendingEvents[height] = pendingDAEvent{
308+
event := pendingDAEvent{
302309
header: header,
303310
data: data,
304311
daHeight: daHeight,
305312
}
313+
dr.manager.pendingEventsCache.SetItem(height, &event)
306314

307315
dr.manager.logger.Debug().
308316
Uint64("height", height).
@@ -341,10 +349,12 @@ func (dr *daRetriever) processEvent(ctx context.Context, header *types.SignedHea
341349
Uint64("daHeight", daHeight).
342350
Msg("sent complete height event with header and data")
343351
default:
352+
// Channel full: keep event in pending cache for retry
353+
dr.queuePendingEvent(header, data, daHeight)
344354
dr.manager.logger.Warn().
345355
Uint64("height", header.Height()).
346356
Uint64("daHeight", daHeight).
347-
Msg("heightInCh backlog full, dropping complete event")
357+
Msg("heightInCh backlog full, re-queued event to pending cache")
348358
}
349359

350360
// Try to process any pending events that might now be ready
@@ -353,6 +363,10 @@ func (dr *daRetriever) processEvent(ctx context.Context, header *types.SignedHea
353363

354364
// processPendingEvents tries to process queued DA events that might now be ready
355365
func (dr *daRetriever) processPendingEvents(ctx context.Context) {
366+
if dr.manager.pendingEventsCache == nil {
367+
return
368+
}
369+
356370
currentHeight, err := dr.manager.GetStoreHeight(ctx)
357371
if err != nil {
358372
dr.manager.logger.Debug().Err(err).Msg("failed to get store height for pending DA events")
@@ -362,15 +376,21 @@ func (dr *daRetriever) processPendingEvents(ctx context.Context) {
362376
dr.mutex.Lock()
363377
defer dr.mutex.Unlock()
364378

365-
for height, event := range dr.pendingEvents {
379+
toDelete := make([]uint64, 0)
380+
dr.manager.pendingEventsCache.RangeByHeight(func(height uint64, event *pendingDAEvent) bool {
366381
if height <= currentHeight+1 {
367382
dr.manager.logger.Debug().
368383
Uint64("height", height).
369384
Uint64("daHeight", event.daHeight).
370385
Msg("processing previously queued DA event")
371386
go dr.processEvent(ctx, event.header, event.data, event.daHeight)
372-
delete(dr.pendingEvents, height)
387+
toDelete = append(toDelete, height)
373388
}
389+
return true
390+
})
391+
392+
for _, h := range toDelete {
393+
dr.manager.pendingEventsCache.DeleteItem(h)
374394
}
375395
}
376396

pkg/cache/cache.go

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,23 @@ func (c *Cache[T]) DeleteItem(height uint64) {
4444
c.items.Delete(height)
4545
}
4646

47+
// RangeByHeight iterates over all items keyed by uint64 height and calls fn for each.
48+
// If fn returns false, iteration stops early.
49+
// Non-uint64 keys (e.g. string hash entries) are ignored.
50+
func (c *Cache[T]) RangeByHeight(fn func(height uint64, item *T) bool) {
51+
c.items.Range(func(k, v interface{}) bool {
52+
height, ok := k.(uint64)
53+
if !ok {
54+
return true
55+
}
56+
item, ok := v.(*T)
57+
if !ok {
58+
return true
59+
}
60+
return fn(height, item)
61+
})
62+
}
63+
4764
// IsSeen returns true if the hash has been seen
4865
func (c *Cache[T]) IsSeen(hash string) bool {
4966
seen, ok := c.hashes.Load(hash)

0 commit comments

Comments
 (0)