Skip to content

Commit 7f789ee

Browse files
committed
chore: use interfaces
1 parent d08b3bf commit 7f789ee

File tree

9 files changed

+57
-89
lines changed

9 files changed

+57
-89
lines changed

block/internal/da/async_epoch_fetcher.go

Lines changed: 20 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,16 @@ import (
1616
"github.com/evstack/ev-node/types"
1717
)
1818

19-
// AsyncEpochFetcher handles background prefetching of DA epoch data
19+
// AsyncEpochFetcher provides background prefetching of DA epoch data
20+
type AsyncEpochFetcher interface {
21+
Start()
22+
Stop()
23+
GetCachedEpoch(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error)
24+
}
25+
26+
// asyncEpochFetcher handles background prefetching of DA epoch data
2027
// to speed up processing at epoch boundaries.
21-
type AsyncEpochFetcher struct {
28+
type asyncEpochFetcher struct {
2229
client Client
2330
logger zerolog.Logger
2431
daEpochSize uint64
@@ -35,7 +42,6 @@ type AsyncEpochFetcher struct {
3542

3643
// Current DA height tracking
3744
currentDAHeight uint64
38-
heightMu sync.RWMutex
3945

4046
// Prefetch window - how many epochs ahead to prefetch
4147
prefetchWindow uint64
@@ -51,7 +57,7 @@ func NewAsyncEpochFetcher(
5157
daStartHeight, daEpochSize uint64,
5258
prefetchWindow uint64,
5359
pollInterval time.Duration,
54-
) *AsyncEpochFetcher {
60+
) AsyncEpochFetcher {
5561
if prefetchWindow == 0 {
5662
prefetchWindow = 1 // Default: prefetch next epoch
5763
}
@@ -61,7 +67,7 @@ func NewAsyncEpochFetcher(
6167

6268
ctx, cancel := context.WithCancel(context.Background())
6369

64-
return &AsyncEpochFetcher{
70+
return &asyncEpochFetcher{
6571
client: client,
6672
logger: logger.With().Str("component", "async_epoch_fetcher").Logger(),
6773
daStartHeight: daStartHeight,
@@ -76,7 +82,7 @@ func NewAsyncEpochFetcher(
7682
}
7783

7884
// Start begins the background prefetching process.
79-
func (f *AsyncEpochFetcher) Start() {
85+
func (f *asyncEpochFetcher) Start() {
8086
f.wg.Add(1)
8187
go f.backgroundFetchLoop()
8288
f.logger.Info().
@@ -88,35 +94,16 @@ func (f *AsyncEpochFetcher) Start() {
8894
}
8995

9096
// Stop gracefully stops the background prefetching process.
91-
func (f *AsyncEpochFetcher) Stop() {
97+
func (f *asyncEpochFetcher) Stop() {
9298
f.logger.Info().Msg("stopping async epoch fetcher")
9399
f.cancel()
94100
f.wg.Wait()
95101
f.logger.Info().Msg("async epoch fetcher stopped")
96102
}
97103

98-
// SetDAHeight updates the current DA height being processed.
99-
// This is called by sequencers to inform the fetcher of progress.
100-
func (f *AsyncEpochFetcher) SetDAHeight(height uint64) {
101-
f.heightMu.Lock()
102-
defer f.heightMu.Unlock()
103-
104-
if height > f.currentDAHeight {
105-
f.currentDAHeight = height
106-
f.logger.Debug().Uint64("da_height", height).Msg("updated current DA height")
107-
}
108-
}
109-
110-
// GetDAHeight returns the current DA height.
111-
func (f *AsyncEpochFetcher) GetDAHeight() uint64 {
112-
f.heightMu.RLock()
113-
defer f.heightMu.RUnlock()
114-
return f.currentDAHeight
115-
}
116-
117104
// GetCachedEpoch retrieves a cached epoch from memory.
118105
// Returns nil if the epoch is not cached.
119-
func (f *AsyncEpochFetcher) GetCachedEpoch(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) {
106+
func (f *asyncEpochFetcher) GetCachedEpoch(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) {
120107
if !f.client.HasForcedInclusionNamespace() {
121108
return nil, ErrForceInclusionNotConfigured
122109
}
@@ -167,7 +154,7 @@ func (f *AsyncEpochFetcher) GetCachedEpoch(ctx context.Context, daHeight uint64)
167154
}
168155

169156
// backgroundFetchLoop runs in the background and prefetches epochs ahead of time.
170-
func (f *AsyncEpochFetcher) backgroundFetchLoop() {
157+
func (f *asyncEpochFetcher) backgroundFetchLoop() {
171158
defer f.wg.Done()
172159

173160
ticker := time.NewTicker(f.pollInterval)
@@ -184,15 +171,13 @@ func (f *AsyncEpochFetcher) backgroundFetchLoop() {
184171
}
185172

186173
// prefetchEpochs prefetches epochs within the prefetch window.
187-
func (f *AsyncEpochFetcher) prefetchEpochs() {
174+
func (f *asyncEpochFetcher) prefetchEpochs() {
188175
if !f.client.HasForcedInclusionNamespace() {
189176
return
190177
}
191178

192-
currentHeight := f.GetDAHeight()
193-
194179
// Calculate the current epoch and epochs to prefetch
195-
_, currentEpochEnd, _ := types.CalculateEpochBoundaries(currentHeight, f.daStartHeight, f.daEpochSize)
180+
_, currentEpochEnd, _ := types.CalculateEpochBoundaries(f.currentDAHeight, f.daStartHeight, f.daEpochSize)
196181

197182
// Prefetch upcoming epochs
198183
for i := uint64(0); i < f.prefetchWindow; i++ {
@@ -218,7 +203,7 @@ func (f *AsyncEpochFetcher) prefetchEpochs() {
218203
}
219204

220205
// fetchAndCacheEpoch fetches an epoch and stores it in the cache.
221-
func (f *AsyncEpochFetcher) fetchAndCacheEpoch(epochEnd uint64) {
206+
func (f *asyncEpochFetcher) fetchAndCacheEpoch(epochEnd uint64) {
222207
epochStart := epochEnd - (f.daEpochSize - 1)
223208
if epochStart < f.daStartHeight {
224209
epochStart = f.daStartHeight
@@ -313,7 +298,7 @@ func (f *AsyncEpochFetcher) fetchAndCacheEpoch(epochEnd uint64) {
313298
}
314299

315300
// processForcedInclusionBlobs processes blobs from a single DA height for forced inclusion.
316-
func (f *AsyncEpochFetcher) processForcedInclusionBlobs(
301+
func (f *asyncEpochFetcher) processForcedInclusionBlobs(
317302
event *ForcedInclusionEvent,
318303
result datypes.ResultRetrieve,
319304
height uint64,
@@ -347,7 +332,7 @@ func (f *AsyncEpochFetcher) processForcedInclusionBlobs(
347332
}
348333

349334
// cleanupOldEpochs removes epochs older than the current epoch from cache.
350-
func (f *AsyncEpochFetcher) cleanupOldEpochs(currentEpochEnd uint64) {
335+
func (f *asyncEpochFetcher) cleanupOldEpochs(currentEpochEnd uint64) {
351336
// Remove epochs older than current - 1
352337
// Keep current and previous in case of reorgs or restarts
353338
cleanupThreshold := currentEpochEnd - f.daEpochSize

block/internal/da/async_epoch_fetcher_test.go

Lines changed: 21 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -15,34 +15,6 @@ import (
1515
mocks "github.com/evstack/ev-node/test/mocks"
1616
)
1717

18-
func TestAsyncEpochFetcher_Creation(t *testing.T) {
19-
client := &mocks.MockClient{}
20-
logger := zerolog.Nop()
21-
22-
fetcher := NewAsyncEpochFetcher(client, logger, 100, 10, 2, 100*time.Millisecond)
23-
assert.NotNil(t, fetcher)
24-
assert.Equal(t, uint64(100), fetcher.daStartHeight)
25-
assert.Equal(t, uint64(10), fetcher.daEpochSize)
26-
assert.Equal(t, uint64(2), fetcher.prefetchWindow)
27-
assert.Equal(t, 100*time.Millisecond, fetcher.pollInterval)
28-
}
29-
30-
func TestAsyncEpochFetcher_SetAndGetDAHeight(t *testing.T) {
31-
client := &mocks.MockClient{}
32-
logger := zerolog.Nop()
33-
34-
fetcher := NewAsyncEpochFetcher(client, logger, 100, 10, 1, time.Second)
35-
36-
assert.Equal(t, uint64(100), fetcher.GetDAHeight())
37-
38-
fetcher.SetDAHeight(150)
39-
assert.Equal(t, uint64(150), fetcher.GetDAHeight())
40-
41-
// Should not decrease
42-
fetcher.SetDAHeight(120)
43-
assert.Equal(t, uint64(150), fetcher.GetDAHeight())
44-
}
45-
4618
func TestAsyncEpochFetcher_GetCachedEpoch_NotAtEpochEnd(t *testing.T) {
4719
client := &mocks.MockClient{}
4820
fiNs := datypes.NamespaceFromString("test-fi-ns").Bytes()
@@ -122,16 +94,27 @@ func TestAsyncEpochFetcher_FetchAndCache(t *testing.T) {
12294
}
12395

12496
logger := zerolog.Nop()
125-
fetcher := NewAsyncEpochFetcher(client, logger, 100, 10, 1, time.Second)
126-
127-
// Manually trigger fetch
128-
fetcher.fetchAndCacheEpoch(109)
97+
// Use a short poll interval for faster test execution
98+
fetcher := NewAsyncEpochFetcher(client, logger, 100, 10, 1, 50*time.Millisecond)
99+
fetcher.Start()
100+
defer fetcher.Stop()
129101

130-
// Now try to get from cache
102+
// Wait for the background fetch to complete by polling the cache
131103
ctx := context.Background()
132-
event, err := fetcher.GetCachedEpoch(ctx, 109)
133-
require.NoError(t, err)
134-
assert.NotNil(t, event)
104+
var event *ForcedInclusionEvent
105+
var err error
106+
107+
// Poll for up to 2 seconds for the epoch to be cached
108+
for i := 0; i < 40; i++ {
109+
event, err = fetcher.GetCachedEpoch(ctx, 109)
110+
require.NoError(t, err)
111+
if event != nil {
112+
break
113+
}
114+
time.Sleep(50 * time.Millisecond)
115+
}
116+
117+
require.NotNil(t, event, "epoch should be cached after background fetch")
135118
assert.Equal(t, 3, len(event.Txs))
136119
assert.Equal(t, testBlobs[0], event.Txs[0])
137120
assert.Equal(t, uint64(100), event.StartDaHeight)
@@ -176,7 +159,6 @@ func TestAsyncEpochFetcher_BackgroundPrefetch(t *testing.T) {
176159
logger := zerolog.Nop()
177160
fetcher := NewAsyncEpochFetcher(client, logger, 100, 10, 1, 50*time.Millisecond)
178161

179-
fetcher.SetDAHeight(100)
180162
fetcher.Start()
181163
defer fetcher.Stop()
182164

@@ -250,9 +232,8 @@ func TestAsyncEpochFetcher_HeightFromFuture(t *testing.T) {
250232

251233
logger := zerolog.Nop()
252234
fetcher := NewAsyncEpochFetcher(client, logger, 100, 10, 1, time.Second)
253-
254-
// This should not panic and should handle gracefully
255-
fetcher.fetchAndCacheEpoch(109)
235+
fetcher.Start()
236+
defer fetcher.Stop()
256237

257238
// Cache should be empty
258239
ctx := context.Background()

block/internal/da/forced_inclusion_retriever.go

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ type ForcedInclusionRetriever struct {
2121
logger zerolog.Logger
2222
daEpochSize uint64
2323
daStartHeight uint64
24-
asyncFetcher *AsyncEpochFetcher // Required for async prefetching
24+
asyncFetcher AsyncEpochFetcher // Required for async prefetching
2525
}
2626

2727
// ForcedInclusionEvent contains forced inclusion transactions retrieved from DA.
@@ -33,12 +33,11 @@ type ForcedInclusionEvent struct {
3333
}
3434

3535
// NewForcedInclusionRetriever creates a new forced inclusion retriever.
36-
// The asyncFetcher parameter is required for background prefetching of DA epoch data.
3736
func NewForcedInclusionRetriever(
3837
client Client,
3938
logger zerolog.Logger,
4039
daStartHeight, daEpochSize uint64,
41-
asyncFetcher *AsyncEpochFetcher,
40+
asyncFetcher AsyncEpochFetcher,
4241
) *ForcedInclusionRetriever {
4342
return &ForcedInclusionRetriever{
4443
client: client,
@@ -51,7 +50,6 @@ func NewForcedInclusionRetriever(
5150

5251
// RetrieveForcedIncludedTxs retrieves forced inclusion transactions at the given DA height.
5352
// It respects epoch boundaries and only fetches at epoch start.
54-
// If an async fetcher is configured, it will try to use cached data first for better performance.
5553
func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) {
5654
// when daStartHeight is not set or no namespace is configured, we retrieve nothing.
5755
if !r.client.HasForcedInclusionNamespace() {

block/internal/da/forced_inclusion_retriever_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,8 @@ import (
1414
"github.com/evstack/ev-node/test/mocks"
1515
)
1616

17-
// createTestAsyncFetcher creates a minimal async fetcher for tests (without starting it)
18-
func createTestAsyncFetcher(client Client, gen genesis.Genesis) *AsyncEpochFetcher {
17+
// createTestAsyncFetcher creates a minimal async fetcher for tests
18+
func createTestAsyncFetcher(client Client, gen genesis.Genesis) AsyncEpochFetcher {
1919
return NewAsyncEpochFetcher(
2020
client,
2121
zerolog.Nop(),

block/internal/syncing/syncer.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -194,8 +194,8 @@ func (s *Syncer) Start(ctx context.Context) error {
194194
s.logger,
195195
s.genesis.DAStartHeight,
196196
s.genesis.DAEpochForcedInclusion,
197-
1, // prefetch 1 epoch ahead
198-
2*time.Second, // check every 2 seconds
197+
1, // prefetch 1 epoch ahead
198+
s.config.DA.BlockTime.Duration,
199199
)
200200
asyncFetcher.Start()
201201

block/internal/syncing/syncer_forced_inclusion_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import (
2424
)
2525

2626
// createTestAsyncFetcher creates a minimal async fetcher for tests (without starting it)
27-
func createTestAsyncFetcherForSyncer(client da.Client, gen genesis.Genesis) *da.AsyncEpochFetcher {
27+
func createTestAsyncFetcherForSyncer(client da.Client, gen genesis.Genesis) da.AsyncEpochFetcher {
2828
return da.NewAsyncEpochFetcher(
2929
client,
3030
zerolog.Nop(),

block/public.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,11 @@ var ErrForceInclusionNotConfigured = da.ErrForceInclusionNotConfigured
6767
type ForcedInclusionEvent = da.ForcedInclusionEvent
6868

6969
// AsyncEpochFetcher provides background prefetching of DA epoch data
70-
type AsyncEpochFetcher = da.AsyncEpochFetcher
70+
type AsyncEpochFetcher interface {
71+
Start()
72+
Stop()
73+
GetCachedEpoch(ctx context.Context, daHeight uint64) (*da.ForcedInclusionEvent, error)
74+
}
7175

7276
// ForcedInclusionRetriever defines the interface for retrieving forced inclusion transactions from DA
7377
type ForcedInclusionRetriever interface {
@@ -88,7 +92,7 @@ func NewAsyncEpochFetcher(
8892
daStartHeight, daEpochSize uint64,
8993
prefetchWindow uint64,
9094
pollInterval time.Duration,
91-
) *AsyncEpochFetcher {
95+
) AsyncEpochFetcher {
9296
return da.NewAsyncEpochFetcher(client, logger, daStartHeight, daEpochSize, prefetchWindow, pollInterval)
9397
}
9498

@@ -98,7 +102,7 @@ func NewForcedInclusionRetriever(
98102
client DAClient,
99103
logger zerolog.Logger,
100104
daStartHeight, daEpochSize uint64,
101-
asyncFetcher *AsyncEpochFetcher,
105+
asyncFetcher AsyncEpochFetcher,
102106
) ForcedInclusionRetriever {
103107
return da.NewForcedInclusionRetriever(client, logger, daStartHeight, daEpochSize, asyncFetcher)
104108
}

pkg/sequencers/based/sequencer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,7 @@ var _ coresequencer.Sequencer = (*BasedSequencer)(nil)
2525
type BasedSequencer struct {
2626
logger zerolog.Logger
2727

28-
asyncFetcher *block.AsyncEpochFetcher
28+
asyncFetcher block.AsyncEpochFetcher
2929
fiRetriever block.ForcedInclusionRetriever
3030
daHeight atomic.Uint64
3131
checkpointStore *seqcommon.CheckpointStore

pkg/sequencers/single/sequencer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ type Sequencer struct {
4040
queue *BatchQueue // single queue for immediate availability
4141

4242
// Forced inclusion support
43-
asyncFetcher *block.AsyncEpochFetcher
43+
asyncFetcher block.AsyncEpochFetcher
4444
fiRetriever block.ForcedInclusionRetriever
4545
daHeight atomic.Uint64
4646
daStartHeight atomic.Uint64

0 commit comments

Comments
 (0)