Skip to content

Commit a1f1a90

Browse files
committed
simplficiation
1 parent 7d39e71 commit a1f1a90

File tree

1 file changed

+67
-128
lines changed

1 file changed

+67
-128
lines changed

block/internal/da/forced_inclusion_retriever.go

Lines changed: 67 additions & 128 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@ type ForcedInclusionRetriever struct {
2828
asyncFetcher AsyncBlockRetriever
2929

3030
mu sync.Mutex
31-
pendingEpochStart uint64
32-
pendingEpochEnd uint64
3331
lastProcessedEpochEnd uint64
3432
hasProcessedEpoch bool
3533
}
@@ -42,6 +40,14 @@ type ForcedInclusionEvent struct {
4240
Txs [][]byte
4341
}
4442

43+
func emptyForcedInclusionEvent(daHeight uint64) *ForcedInclusionEvent {
44+
return &ForcedInclusionEvent{
45+
StartDaHeight: daHeight,
46+
EndDaHeight: daHeight,
47+
Txs: [][]byte{},
48+
}
49+
}
50+
4551
// NewForcedInclusionRetriever creates a new forced inclusion retriever.
4652
// It internally creates and manages an AsyncBlockRetriever for background prefetching.
4753
func NewForcedInclusionRetriever(
@@ -100,7 +106,6 @@ func (r *ForcedInclusionRetriever) HandleSubscriptionResponse(resp *blobrpc.Subs
100106
// It respects epoch boundaries and only fetches at epoch end.
101107
// It tries to get blocks from the async fetcher cache first, then falls back to sync fetching.
102108
func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context, daHeight uint64) (*ForcedInclusionEvent, error) {
103-
// when daStartHeight is not set or no namespace is configured, we retrieve nothing.
104109
if !r.client.HasForcedInclusionNamespace() {
105110
return nil, ErrForceInclusionNotConfigured
106111
}
@@ -110,59 +115,28 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
110115
}
111116

112117
epochStart, epochEnd, currentEpochNumber := types.CalculateEpochBoundaries(daHeight, r.daStartHeight, r.daEpochSize)
113-
114-
// Update the async fetcher's current height so it knows what to prefetch
115118
r.asyncFetcher.UpdateCurrentHeight(daHeight)
116119

117120
r.mu.Lock()
118-
pendingStart := r.pendingEpochStart
119-
pendingEnd := r.pendingEpochEnd
120121
lastProcessed := r.lastProcessedEpochEnd
121122
hasProcessed := r.hasProcessedEpoch
122123
r.mu.Unlock()
123124

124-
if pendingEnd != 0 {
125-
if daHeight < pendingEnd {
126-
return &ForcedInclusionEvent{
127-
StartDaHeight: daHeight,
128-
EndDaHeight: daHeight,
129-
Txs: [][]byte{},
130-
}, nil
131-
}
132-
133-
event, err := r.retrieveEpoch(ctx, pendingStart, pendingEnd)
134-
if err != nil {
135-
return nil, err
125+
if hasProcessed {
126+
if r.daEpochSize == 0 {
127+
return emptyForcedInclusionEvent(daHeight), nil
136128
}
137-
138-
r.mu.Lock()
139-
r.pendingEpochStart = 0
140-
r.pendingEpochEnd = 0
141-
r.lastProcessedEpochEnd = pendingEnd
142-
r.hasProcessedEpoch = true
143-
r.mu.Unlock()
144-
145-
return event, nil
129+
epochStart = lastProcessed + 1
130+
epochEnd = epochStart + r.daEpochSize - 1
131+
currentEpochNumber = types.CalculateEpochNumber(epochEnd, r.daStartHeight, r.daEpochSize)
146132
}
147133

148-
if daHeight != epochEnd {
134+
if daHeight < epochEnd {
149135
r.logger.Debug().
150136
Uint64("da_height", daHeight).
151137
Uint64("epoch_end", epochEnd).
152138
Msg("not at epoch end - returning empty transactions")
153-
return &ForcedInclusionEvent{
154-
StartDaHeight: daHeight,
155-
EndDaHeight: daHeight,
156-
Txs: [][]byte{},
157-
}, nil
158-
}
159-
160-
if hasProcessed && epochEnd <= lastProcessed {
161-
return &ForcedInclusionEvent{
162-
StartDaHeight: daHeight,
163-
EndDaHeight: daHeight,
164-
Txs: [][]byte{},
165-
}, nil
139+
return emptyForcedInclusionEvent(daHeight), nil
166140
}
167141

168142
r.logger.Debug().
@@ -174,10 +148,6 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
174148

175149
event, err := r.retrieveEpoch(ctx, epochStart, epochEnd)
176150
if err != nil {
177-
r.mu.Lock()
178-
r.pendingEpochStart = epochStart
179-
r.pendingEpochEnd = epochEnd
180-
r.mu.Unlock()
181151
return nil, err
182152
}
183153

@@ -190,118 +160,87 @@ func (r *ForcedInclusionRetriever) RetrieveForcedIncludedTxs(ctx context.Context
190160
}
191161

192162
func (r *ForcedInclusionRetriever) retrieveEpoch(ctx context.Context, epochStart, epochEnd uint64) (*ForcedInclusionEvent, error) {
193-
event := &ForcedInclusionEvent{
194-
StartDaHeight: epochStart,
195-
EndDaHeight: epochEnd,
196-
Txs: [][]byte{},
197-
}
198-
199-
// Collect all heights in this epoch
200-
var heights []uint64
201-
for h := epochStart; h <= epochEnd; h++ {
202-
heights = append(heights, h)
203-
}
204-
205-
// Try to get blocks from cache first
206-
cachedBlocks := make(map[uint64]*BlockData)
163+
epochSize := epochEnd - epochStart + 1
164+
blocks := make(map[uint64]*BlockData, epochSize)
207165
var missingHeights []uint64
208166

209-
for _, h := range heights {
167+
// Check cache for each height in the epoch
168+
for h := epochStart; h <= epochEnd; h++ {
210169
block, err := r.asyncFetcher.GetCachedBlock(ctx, h)
211170
if err != nil {
212-
r.logger.Debug().
213-
Err(err).
214-
Uint64("height", h).
215-
Msg("error getting cached block, will fetch synchronously")
171+
r.logger.Debug().Err(err).Uint64("height", h).Msg("cache error, will fetch synchronously")
216172
missingHeights = append(missingHeights, h)
217173
continue
218174
}
219-
if block == nil { // Cache miss
175+
if block == nil {
220176
missingHeights = append(missingHeights, h)
221-
} else { // Cache hit
222-
cachedBlocks[h] = block
177+
} else {
178+
blocks[h] = block
223179
}
224180
}
225181

226-
// Fetch missing heights synchronously and store in map
227-
syncFetchedBlocks := make(map[uint64]*BlockData)
182+
// Fetch missing heights synchronously
228183
var processErrs error
184+
namespace := r.client.GetForcedInclusionNamespace()
229185
for _, h := range missingHeights {
230-
result := r.client.Retrieve(ctx, h, r.client.GetForcedInclusionNamespace())
231-
if result.Code == datypes.StatusHeightFromFuture {
232-
r.logger.Debug().
233-
Uint64("height", h).
234-
Msg("height not yet available on DA - backoff required")
186+
result := r.client.Retrieve(ctx, h, namespace)
187+
188+
switch result.Code {
189+
case datypes.StatusHeightFromFuture:
190+
r.logger.Debug().Uint64("height", h).Msg("height not yet available on DA")
235191
return nil, fmt.Errorf("%w: height %d not yet available", datypes.ErrHeightFromFuture, h)
236-
}
237192

238-
if result.Code == datypes.StatusNotFound {
193+
case datypes.StatusNotFound:
239194
r.logger.Debug().Uint64("height", h).Msg("no forced inclusion blobs at height")
240-
continue
241-
}
242-
243-
if result.Code != datypes.StatusSuccess {
244-
err := fmt.Errorf("failed to retrieve forced inclusion blobs at height %d: %s", h, result.Message)
245-
processErrs = errors.Join(processErrs, err)
246-
continue
247-
}
248-
249-
// Store the sync-fetched block data
250-
syncFetchedBlocks[h] = &BlockData{
251-
Blobs: result.Data,
252-
Timestamp: result.Timestamp,
253-
}
254-
}
255-
256-
// Process all blocks in height order
257-
for _, h := range heights {
258-
var block *BlockData
259-
var source string
260-
261-
// Check cached blocks first, then sync-fetched
262-
if cachedBlock, ok := cachedBlocks[h]; ok {
263-
block = cachedBlock
264-
source = "cache"
265-
} else if syncBlock, ok := syncFetchedBlocks[h]; ok {
266-
block = syncBlock
267-
source = "sync"
268-
}
269-
270-
if block != nil {
271-
// Add blobs from block
272-
for _, blob := range block.Blobs {
273-
if len(blob) > 0 {
274-
event.Txs = append(event.Txs, blob)
275-
}
276-
}
277195

278-
// Update timestamp if newer
279-
if block.Timestamp.After(event.Timestamp) {
280-
event.Timestamp = block.Timestamp
196+
case datypes.StatusSuccess:
197+
blocks[h] = &BlockData{
198+
Blobs: result.Data,
199+
Timestamp: result.Timestamp,
281200
}
282201

283-
r.logger.Debug().
284-
Uint64("height", h).
285-
Int("blob_count", len(block.Blobs)).
286-
Str("source", source).
287-
Msg("added blobs from block")
202+
default:
203+
processErrs = errors.Join(processErrs, fmt.Errorf("failed to retrieve at height %d: %s", h, result.Message))
288204
}
289-
290-
// Clean up maps to prevent unbounded memory growth
291-
delete(cachedBlocks, h)
292-
delete(syncFetchedBlocks, h)
293205
}
294206

295-
// any error during process, need to retry at next call
296207
if processErrs != nil {
297208
r.logger.Warn().
298209
Uint64("epoch_start", epochStart).
299210
Uint64("epoch_end", epochEnd).
300211
Err(processErrs).
301212
Msg("failed to retrieve DA epoch")
302-
303213
return nil, processErrs
304214
}
305215

216+
// Aggregate blobs in height order
217+
event := &ForcedInclusionEvent{
218+
StartDaHeight: epochStart,
219+
EndDaHeight: epochEnd,
220+
Txs: [][]byte{},
221+
}
222+
223+
for h := epochStart; h <= epochEnd; h++ {
224+
block, ok := blocks[h]
225+
if !ok {
226+
continue
227+
}
228+
229+
for _, blob := range block.Blobs {
230+
if len(blob) > 0 {
231+
event.Txs = append(event.Txs, blob)
232+
}
233+
}
234+
235+
if block.Timestamp.After(event.Timestamp) {
236+
event.Timestamp = block.Timestamp
237+
}
238+
239+
r.logger.Debug().
240+
Uint64("height", h).
241+
Int("blob_count", len(block.Blobs)).
242+
Msg("added blobs from block")
243+
}
244+
306245
return event, nil
307246
}

0 commit comments

Comments
 (0)