Skip to content

Commit c31af08

Browse files
committed
Merge branch 'main' into alex/2803_best_2worlds_fi_rebased2
* main: fix(syncer): include in-flight and pending-cache work in PendingCount (#3162) chore: re-add replaces for development (#3161)
2 parents a56fa63 + e4e35bf commit c31af08

File tree

6 files changed

+145
-58
lines changed

6 files changed

+145
-58
lines changed

CHANGELOG.md

Lines changed: 130 additions & 54 deletions
Large diffs are not rendered by default.

apps/testapp/go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ module github.com/evstack/ev-node/apps/testapp
22

33
go 1.25.6
44

5-
//replace github.com/evstack/ev-node => ../../.
5+
replace github.com/evstack/ev-node => ../../.
66

77
require (
88
github.com/evstack/ev-node v1.0.0

apps/testapp/go.sum

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,8 +367,6 @@ github.com/envoyproxy/protoc-gen-validate v0.10.0/go.mod h1:DRjgyB0I43LtJapqN6Ni
367367
github.com/envoyproxy/protoc-gen-validate v0.10.1/go.mod h1:DRjgyB0I43LtJapqN6NiRwroiAU2PaFuvk/vjgh61ss=
368368
github.com/envoyproxy/protoc-gen-validate v1.0.1/go.mod h1:0vj8bNkYbSTNS2PIyH87KZaeN4x9zpL9Qt8fQC7d+vs=
369369
github.com/envoyproxy/protoc-gen-validate v1.0.2/go.mod h1:GpiZQP3dDbg4JouG/NNS7QWXpgx6x8QiMKdmN72jogE=
370-
github.com/evstack/ev-node v1.0.0 h1:m3e51fo4Dk9Z32XRV56GJKEeAiqvjiJ9n3SRjG7C5n8=
371-
github.com/evstack/ev-node v1.0.0/go.mod h1:85H7BPvvRoA+uPfCiIcyWMBN728Wv1uLNhOsjLifJgw=
372370
github.com/evstack/ev-node/core v1.0.0 h1:s0Tx0uWHme7SJn/ZNEtee4qNM8UO6PIxXnHhPbbKTz8=
373371
github.com/evstack/ev-node/core v1.0.0/go.mod h1:n2w/LhYQTPsi48m6lMj16YiIqsaQw6gxwjyJvR+B3sY=
374372
github.com/fatih/color v1.10.0/go.mod h1:ELkj/draVOlAH/xkhN6mQ50Qd0MPOk5AAr3maGEBuJM=

block/internal/cache/generic_cache.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,11 @@ func (c *Cache[T]) getNextItem(height uint64) *T {
113113
return item
114114
}
115115

116+
// itemCount returns the number of items currently stored by height.
117+
func (c *Cache[T]) itemCount() int {
118+
return c.itemsByHeight.Len()
119+
}
120+
116121
// isSeen returns true if the hash has been seen.
117122
func (c *Cache[T]) isSeen(hash string) bool {
118123
seen, ok := c.hashes.Get(hash)

block/internal/cache/manager.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ type CacheManager interface {
5858
// Pending events syncing coordination
5959
GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent
6060
SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent)
61+
PendingEventsCount() int
6162

6263
// Store operations
6364
SaveToStore() error
@@ -321,6 +322,10 @@ func (m *implementation) SetPendingEvent(height uint64, event *common.DAHeightEv
321322
m.pendingEventsCache.setItem(height, event)
322323
}
323324

325+
func (m *implementation) PendingEventsCount() int {
326+
return m.pendingEventsCache.itemCount()
327+
}
328+
324329
// GetNextPendingEvent efficiently retrieves and removes the event at the specified height.
325330
// Returns nil if no event exists at that height.
326331
func (m *implementation) GetNextPendingEvent(height uint64) *common.DAHeightEvent {

block/internal/syncing/syncer.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@ type Syncer struct {
7272
// Channels for coordination
7373
heightInCh chan common.DAHeightEvent
7474
errorCh chan<- error // Channel to report critical execution client failures
75+
inFlight atomic.Int64
7576

7677
// Handlers
7778
daRetriever DARetriever
@@ -380,7 +381,9 @@ func (s *Syncer) processLoop(ctx context.Context) {
380381
return
381382
case heightEvent, ok := <-s.heightInCh:
382383
if ok {
384+
s.inFlight.Add(1)
383385
s.processHeightEvent(ctx, &heightEvent)
386+
s.inFlight.Add(-1)
384387
}
385388
}
386389
}
@@ -404,7 +407,7 @@ func (s *Syncer) HasReachedDAHead() bool {
404407

405408
// PendingCount returns the number of unprocessed height events in the pipeline.
406409
func (s *Syncer) PendingCount() int {
407-
return len(s.heightInCh)
410+
return len(s.heightInCh) + int(s.inFlight.Load()) + s.cache.PendingEventsCount()
408411
}
409412

410413
func (s *Syncer) pendingWorkerLoop(ctx context.Context) {

0 commit comments

Comments
 (0)