Skip to content

Commit 096d70e

Browse files
committed
rebase
1 parent 36f3d0e commit 096d70e

File tree

10 files changed

+336
-1184
lines changed

10 files changed

+336
-1184
lines changed

block/internal/da/forced_inclusion_retriever.go

Lines changed: 288 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,9 @@ import (
44
"context"
55
"errors"
66
"fmt"
7+
"sync"
8+
"sync/atomic"
9+
"time"
710

811
"github.com/rs/zerolog"
912

@@ -12,15 +15,120 @@ import (
1215
"github.com/evstack/ev-node/types"
1316
)
1417

18+
const (
19+
// defaultEpochLag is the default number of blocks to lag behind DA height when fetching forced inclusion txs
20+
defaultEpochLag = 10
21+
22+
// defaultMinEpochWindow is the minimum window size for epoch lag calculation
23+
defaultMinEpochWindow = 5
24+
25+
// defaultMaxEpochWindow is the maximum window size for epoch lag calculation
26+
defaultMaxEpochWindow = 100
27+
28+
// defaultFetchInterval is the interval between async fetch attempts
29+
defaultFetchInterval = 2 * time.Second
30+
)
31+
1532
// ErrForceInclusionNotConfigured is returned when the forced inclusion namespace is not configured.
1633
var ErrForceInclusionNotConfigured = errors.New("forced inclusion namespace not configured")
1734

35+
// epochCache stores fetched forced inclusion events by epoch start height
36+
type epochCache struct {
37+
events atomic.Pointer[map[uint64]*ForcedInclusionEvent]
38+
fetchTimes atomic.Pointer[[]time.Duration]
39+
maxSamples int
40+
}
41+
42+
func newEpochCache(maxSamples int) *epochCache {
43+
c := &epochCache{
44+
maxSamples: maxSamples,
45+
}
46+
initialEvents := make(map[uint64]*ForcedInclusionEvent)
47+
c.events.Store(&initialEvents)
48+
initialTimes := make([]time.Duration, 0, maxSamples)
49+
c.fetchTimes.Store(&initialTimes)
50+
return c
51+
}
52+
53+
func (c *epochCache) get(epochStart uint64) (*ForcedInclusionEvent, bool) {
54+
events := c.events.Load()
55+
event, ok := (*events)[epochStart]
56+
return event, ok
57+
}
58+
59+
func (c *epochCache) set(epochStart uint64, event *ForcedInclusionEvent) {
60+
for {
61+
oldEventsPtr := c.events.Load()
62+
oldEvents := *oldEventsPtr
63+
newEvents := make(map[uint64]*ForcedInclusionEvent, len(oldEvents)+1)
64+
for k, v := range oldEvents {
65+
newEvents[k] = v
66+
}
67+
newEvents[epochStart] = event
68+
if c.events.CompareAndSwap(oldEventsPtr, &newEvents) {
69+
return
70+
}
71+
}
72+
}
73+
74+
func (c *epochCache) recordFetchTime(duration time.Duration) {
75+
for {
76+
oldTimesPtr := c.fetchTimes.Load()
77+
oldTimes := *oldTimesPtr
78+
newTimes := make([]time.Duration, 0, c.maxSamples)
79+
newTimes = append(newTimes, oldTimes...)
80+
newTimes = append(newTimes, duration)
81+
if len(newTimes) > c.maxSamples {
82+
newTimes = newTimes[1:]
83+
}
84+
if c.fetchTimes.CompareAndSwap(oldTimesPtr, &newTimes) {
85+
return
86+
}
87+
}
88+
}
89+
90+
func (c *epochCache) averageFetchTime() time.Duration {
91+
timesPtr := c.fetchTimes.Load()
92+
times := *timesPtr
93+
if len(times) == 0 {
94+
return 0
95+
}
96+
var sum time.Duration
97+
for _, d := range times {
98+
sum += d
99+
}
100+
return sum / time.Duration(len(times))
101+
}
102+
103+
func (c *epochCache) cleanup(beforeEpoch uint64) {
104+
for {
105+
oldEventsPtr := c.events.Load()
106+
oldEvents := *oldEventsPtr
107+
newEvents := make(map[uint64]*ForcedInclusionEvent)
108+
for epoch, event := range oldEvents {
109+
if epoch >= beforeEpoch {
110+
newEvents[epoch] = event
111+
}
112+
}
113+
if c.events.CompareAndSwap(oldEventsPtr, &newEvents) {
114+
return
115+
}
116+
}
117+
}
118+
18119
// ForcedInclusionRetriever handles retrieval of forced inclusion transactions from DA.
19120
type ForcedInclusionRetriever struct {
20121
client Client
21122
genesis genesis.Genesis
22123
logger zerolog.Logger
23124
daEpochSize uint64
125+
126+
// Async forced inclusion fetching
127+
epochCache *epochCache
128+
fetcherCtx context.Context
129+
fetcherCancel context.CancelFunc
130+
fetcherWg sync.WaitGroup
131+
currentDAHeight atomic.Uint64
24132
}
25133

26134
// ForcedInclusionEvent contains forced inclusion transactions retrieved from DA.
@@ -36,22 +144,173 @@ func NewForcedInclusionRetriever(
36144
genesis genesis.Genesis,
37145
logger zerolog.Logger,
38146
) *ForcedInclusionRetriever {
39-
return &ForcedInclusionRetriever{
40-
client: client,
41-
genesis: genesis,
42-
logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(),
43-
daEpochSize: genesis.DAEpochForcedInclusion,
147+
ctx, cancel := context.WithCancel(context.Background())
148+
149+
r := &ForcedInclusionRetriever{
150+
client: client,
151+
genesis: genesis,
152+
logger: logger.With().Str("component", "forced_inclusion_retriever").Logger(),
153+
daEpochSize: genesis.DAEpochForcedInclusion,
154+
epochCache: newEpochCache(10), // Keep last 10 fetch times for averaging
155+
fetcherCtx: ctx,
156+
fetcherCancel: cancel,
157+
}
158+
r.currentDAHeight.Store(genesis.DAStartHeight)
159+
160+
// Start background fetcher if forced inclusion is configured
161+
if client.HasForcedInclusionNamespace() {
162+
r.fetcherWg.Add(1)
163+
go r.backgroundFetcher()
164+
}
165+
166+
return r
167+
}
168+
169+
// StopBackgroundFetcher stops the background fetcher goroutine
170+
func (r *ForcedInclusionRetriever) StopBackgroundFetcher() {
171+
if r.fetcherCancel != nil {
172+
r.fetcherCancel()
173+
}
174+
r.fetcherWg.Wait()
175+
}
176+
177+
// SetDAHeight updates the current DA height for async fetching
178+
func (r *ForcedInclusionRetriever) SetDAHeight(height uint64) {
179+
for {
180+
current := r.currentDAHeight.Load()
181+
if height <= current {
182+
return
183+
}
184+
if r.currentDAHeight.CompareAndSwap(current, height) {
185+
return
186+
}
187+
}
188+
}
189+
190+
// GetDAHeight returns the current DA height
191+
func (r *ForcedInclusionRetriever) GetDAHeight() uint64 {
192+
return r.currentDAHeight.Load()
193+
}
194+
195+
// calculateAdaptiveEpochWindow calculates the epoch lag window based on average fetch time
196+
func (r *ForcedInclusionRetriever) calculateAdaptiveEpochWindow() uint64 {
197+
avgFetchTime := r.epochCache.averageFetchTime()
198+
if avgFetchTime == 0 {
199+
return defaultEpochLag
200+
}
201+
202+
// Scale window based on fetch time: faster fetches = smaller window
203+
// If fetch takes 1 second, window = 5
204+
// If fetch takes 5 seconds, window = 25
205+
// If fetch takes 10 seconds, window = 50
206+
window := uint64(avgFetchTime.Seconds() * 5)
207+
208+
if window < defaultMinEpochWindow {
209+
window = defaultMinEpochWindow
210+
}
211+
if window > defaultMaxEpochWindow {
212+
window = defaultMaxEpochWindow
213+
}
214+
215+
return window
216+
}
217+
218+
// backgroundFetcher continuously fetches forced inclusion transactions ahead of time
219+
func (r *ForcedInclusionRetriever) backgroundFetcher() {
220+
defer r.fetcherWg.Done()
221+
222+
ticker := time.NewTicker(defaultFetchInterval)
223+
defer ticker.Stop()
224+
225+
r.logger.Info().Msg("started background forced inclusion fetcher")
226+
227+
for {
228+
select {
229+
case <-r.fetcherCtx.Done():
230+
r.logger.Info().Msg("stopped background forced inclusion fetcher")
231+
return
232+
case <-ticker.C:
233+
r.fetchNextEpoch()
234+
}
235+
}
236+
}
237+
238+
// fetchNextEpoch fetches the next epoch that should be available based on current DA height and lag
239+
func (r *ForcedInclusionRetriever) fetchNextEpoch() {
240+
currentHeight := r.GetDAHeight()
241+
if currentHeight == 0 {
242+
return
243+
}
244+
245+
window := r.calculateAdaptiveEpochWindow()
246+
247+
// Calculate which epoch the sequencer will need soon (lagging behind current height)
248+
// We want to prefetch this epoch before it's actually requested
249+
laggedHeight := currentHeight
250+
if currentHeight > window {
251+
laggedHeight = currentHeight - window
252+
}
253+
254+
epochStart, _ := types.CalculateEpochBoundaries(laggedHeight, r.genesis.DAStartHeight, r.daEpochSize)
255+
256+
// Check if we already have this epoch cached
257+
if _, ok := r.epochCache.get(epochStart); ok {
258+
return
259+
}
260+
261+
// Fetch this epoch in the background
262+
r.logger.Debug().
263+
Uint64("current_height", currentHeight).
264+
Uint64("lagged_height", laggedHeight).
265+
Uint64("epoch_start", epochStart).
266+
Uint64("window", window).
267+
Msg("fetching epoch in background")
268+
269+
startTime := time.Now()
270+
ctx, cancel := context.WithTimeout(r.fetcherCtx, 30*time.Second)
271+
defer cancel()
272+
273+
event, err := r.fetchEpochSync(ctx, epochStart)
274+
if err != nil {
275+
r.logger.Debug().Err(err).Uint64("epoch_start", epochStart).Msg("failed to fetch epoch in background")
276+
return
277+
}
278+
279+
// Record fetch time for adaptive window
280+
fetchDuration := time.Since(startTime)
281+
r.epochCache.recordFetchTime(fetchDuration)
282+
283+
// Cache the event
284+
r.epochCache.set(epochStart, event)
285+
286+
r.logger.Debug().
287+
Uint64("epoch_start", epochStart).
288+
Int("tx_count", len(event.Txs)).
289+
Dur("fetch_duration", fetchDuration).
290+
Msg("cached epoch in background")
291+
292+
// Cleanup old epochs (keep last 5 epochs)
293+
if epochStart >= r.genesis.DAStartHeight+r.daEpochSize*5 {
294+
cleanupBefore := epochStart - r.daEpochSize*5
295+
if cleanupBefore < r.genesis.DAStartHeight {
296+
cleanupBefore = r.genesis.DAStartHeight
297+
}
298+
r.epochCache.cleanup(cleanupBefore)
44299
}
45300
}
46301

47302
// RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height.
48303
// It respects epoch boundaries and only fetches at epoch start.
304+
// Uses cached results from background fetcher when available.
49305
func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) {
50306
if !r.client.HasForcedInclusionNamespace() {
51307
return nil, ErrForceInclusionNotConfigured
52308
}
53309

54-
epochStart, epochEnd := types.CalculateEpochBoundaries(daHeight, r.genesis.DAStartHeight, r.daEpochSize)
310+
// Update our tracking of DA height
311+
r.SetDAHeight(daHeight)
312+
313+
epochStart, _ := types.CalculateEpochBoundaries(daHeight, r.genesis.DAStartHeight, r.daEpochSize)
55314

56315
if daHeight != epochStart {
57316
r.logger.Debug().
@@ -66,20 +325,39 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
66325
}, nil
67326
}
68327

69-
// We're at epoch start - fetch transactions from DA
70-
currentEpochNumber := types.CalculateEpochNumber(daHeight, r.genesis.DAStartHeight, r.daEpochSize)
328+
// Check if we have this epoch cached from background fetcher
329+
if cachedEvent, ok := r.epochCache.get(epochStart); ok {
330+
r.logger.Debug().
331+
Uint64("epoch_start", epochStart).
332+
Int("tx_count", len(cachedEvent.Txs)).
333+
Msg("using cached forced inclusion transactions")
334+
return cachedEvent, nil
335+
}
336+
337+
// Not cached, fetch synchronously
338+
r.logger.Debug().
339+
Uint64("da_height", daHeight).
340+
Uint64("epoch_start", epochStart).
341+
Msg("cache miss, fetching forced inclusion transactions synchronously")
342+
343+
return r.fetchEpochSync(ctx, epochStart)
344+
}
345+
346+
// fetchEpochSync synchronously fetches an entire epoch's forced inclusion transactions
347+
func (r *ForcedInclusionRetriever) fetchEpochSync(ctx context.Context, epochStart uint64) (*ForcedInclusionEvent, error) {
348+
epochEnd := epochStart + r.daEpochSize - 1
349+
currentEpochNumber := types.CalculateEpochNumber(epochStart, r.genesis.DAStartHeight, r.daEpochSize)
71350

72351
event := &ForcedInclusionEvent{
73352
StartDaHeight: epochStart,
74353
Txs: [][]byte{},
75354
}
76355

77356
r.logger.Debug().
78-
Uint64("da_height", daHeight).
79357
Uint64("epoch_start", epochStart).
80358
Uint64("epoch_end", epochEnd).
81359
Uint64("epoch_num", currentEpochNumber).
82-
Msg("retrieving forced included transactions from DA")
360+
Msg("fetching forced included transactions from DA")
83361

84362
epochStartResult := r.client.RetrieveForcedInclusion(ctx, epochStart)
85363
if epochStartResult.Code == coreda.StatusHeightFromFuture {

0 commit comments

Comments
 (0)