-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathcommitment_tracker.go
More file actions
276 lines (229 loc) · 7.78 KB
/
commitment_tracker.go
File metadata and controls
276 lines (229 loc) · 7.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
package dasmon
import (
"context"
"errors"
"fmt"
"sync"
"sync/atomic"
)
const (
	// MaxCustodyEpochs is the number of epochs to track (4096)
	MaxCustodyEpochs = 4096
	// SlotsPerEpoch is the number of slots per epoch (32)
	SlotsPerEpoch = 32
	// MaxCustodySlots is the total number of slots to track
	MaxCustodySlots = MaxCustodyEpochs * SlotsPerEpoch // 4096 * 32 = 131072 (128Ki) slots
)
var (
	// ErrSlotUnavailable signals that a requested slot cannot be served.
	// NOTE(review): not referenced in this file — presumably returned by
	// fetchers or callers elsewhere in the package; verify before removing.
	ErrSlotUnavailable = errors.New("slot unavailable")
	// ErrBufferEmpty signals that the commitment buffer holds no entries.
	// NOTE(review): also not referenced in this file.
	ErrBufferEmpty = errors.New("commitment buffer is empty")
)
// BlockCommitmentInfo stores KZG commitments for a single block
type BlockCommitmentInfo struct {
	// Slot is the slot number the block belongs to.
	Slot uint64
	// BlockRoot identifies the block.
	BlockRoot [32]byte
	// BlobCommitments holds the KZG commitments for blobs, one per blob.
	BlobCommitments [][]byte // KZG commitments for blobs
}
// CommitmentTracker stores a fixed-size ring buffer of block commitments.
// It is sized to the maximum number of slots possible according to the block filter.
//
// Indices: 0 1 2 3 4 5 6 7
// Slots: [ ] [ ] [O] [O] [O] [O] [O] [ ]
// Epoch: 32 33 34 35 36 37 38 (39)
//
// ^ ^ → (about to be appended)
// | |
// tail_pos (eldest) head_pos (newest)
//
// Legend:
// [O] = filled slot
// [ ] = empty slot
//
// Invariants:
// - Contiguous fill: for any i in [tail_pos, head_pos] (mod N), slot i is filled.
// - Append increments head_pos (wrapping mod N) and fills that slot.
type CommitmentTracker struct {
	// Embedded lock guarding entries, head, and tail. length and filter are
	// assigned once at construction and never mutated afterwards in this file.
	sync.RWMutex
	// entries is the ring buffer; slot s is stored at index int(s) % length.
	entries []BlockCommitmentInfo
	// Track the slot number of the newest (head) and oldest (tail) entries
	// tail = min slot (oldest/earliest slot in buffer)
	// head = max slot (newest/latest slot in buffer)
	// filled count = head - tail + 1 (when initialized)
	head uint64
	tail uint64
	// length is the ring capacity, taken from filter.RequiredBufferSize().
	length int
	// filter defines the effective slot range this tracker must cover.
	filter *BlockFilter
	// backfilling is a CAS guard ensuring at most one Backfill runs at a time.
	backfilling atomic.Bool
}
// NewCommitmentTracker creates a new CommitmentTracker with the specified BlockFilter.
// The buffer size is determined by the filter's RequiredBufferSize().
// The tracker is seeded with the given head entry, so both head and tail start
// at head.Slot. Backfill() can be called to populate historical data.
func NewCommitmentTracker(filter *BlockFilter, head BlockCommitmentInfo) *CommitmentTracker {
	size := filter.RequiredBufferSize()
	buf := make([]BlockCommitmentInfo, size)
	// Place the seed entry at its ring index so Get(head.Slot) works immediately.
	buf[int(head.Slot)%size] = head
	return &CommitmentTracker{
		entries: buf,
		head:    head.Slot,
		tail:    head.Slot,
		length:  size,
		filter:  filter,
	}
}
// Backfill populates the data structure with existing commitments obtained from the fetcher.
// It fills backwards from the current tail to the start of the effective range.
// If a slot is unavailable, backfill stops immediately to maintain the no-gaps invariant.
// Backfill checks the filter range on each iteration since concurrent Append calls can move the head forward,
// potentially invalidating older slots. This can happen on background backfill.
// Fetches are batched according to the fetcher's MaxSlotsPerFetch() limit.
// At most one Backfill may run at a time; a concurrent second call returns an error.
func (rb *CommitmentTracker) Backfill(ctx context.Context, fetcher Fetcher) error {
	// Claim the single-backfiller flag atomically.
	if !rb.backfilling.CompareAndSwap(false, true) {
		return errors.New("backfill already running")
	}
	// Ensure the flag is cleared on every exit path.
	defer rb.backfilling.Store(false)

	batchSize := fetcher.MaxSlotsPerFetch()

	rb.RLock()
	next, head := rb.tail, rb.head
	rb.RUnlock()

	// head == 0: we're starting at genesis, there is nothing older to fetch.
	// next == 0: we have already backfilled up until genesis.
	if head == 0 || next == 0 {
		return nil
	}
	next--

	for {
		// Stop promptly if the caller cancelled.
		if err := ctx.Err(); err != nil {
			return err
		}
		// We cannot use rb.filter.EffectiveRange(next) because the filter
		// could be in trailing mode (negative start). Therefore, our reference
		// point must always be head; re-read it each iteration since
		// concurrent Appends move it forward.
		rb.RLock()
		range_ := rb.filter.EffectiveRange(rb.head)
		rb.RUnlock()
		if !range_.Contains(next) {
			return nil
		}
		// Clamp to [range.Start, next] - we only want to fetch up to next (not beyond)
		range_ = RangeClosed(range_.Start, next)
		// Now intersect with the batch size range anchored at next
		// This creates [max(range.Start, next-batchSize), next]
		var ok bool
		range_, ok = range_.Intersect(range_.FromEnd(int(-batchSize)))
		if !ok {
			// This should not happen, but if it does, we stop.
			return nil
		}
		// Fetch commitment data for the batch (without holding any locks)
		infos, err := fetcher.FetchCommitments(ctx, range_)
		if err != nil {
			return fmt.Errorf("backfill errored fetching range [%d, %d]: %w", range_.Start, range_.End, err)
		}
		// Store the fetched commitments in reverse order (newest to oldest)
		// so the tail only ever extends downward contiguously.
		for i := len(infos) - 1; i >= 0; i-- {
			info := infos[i]
			slot := info.Slot
			rb.Lock()
			// BUGFIX: the previous guard (rb.tail < slot) could never fire,
			// because a backfilled slot is always below the tail. The real
			// hazard is the ring wrapping: if concurrent Appends advanced the
			// head so far that this slot no longer fits in the window of
			// rb.length slots ending at head, writing it would clobber a newer
			// entry sharing the same ring index. Detect that and stop.
			if rb.head-slot+1 > uint64(rb.length) {
				rb.Unlock()
				return nil
			}
			rb.entries[int(slot)%rb.length] = *info
			// Extend the window downward; never move the tail forward here.
			if slot < rb.tail {
				rb.tail = slot
			}
			rb.Unlock()
		}
		if range_.Start == 0 {
			// We processed up to genesis, so nothing more to do.
			return nil
		}
		next = range_.Start - 1
	}
}
// Append stores a block's commitment information in the ring buffer.
// Slots must be appended contiguously: slot must equal head+1, otherwise
// Append panics. When the buffer wraps around, the oldest entry is
// overwritten and the tail advances by one. It always returns true.
func (rb *CommitmentTracker) Append(slot uint64, root [32]byte, commitments [][]byte) bool {
	// Deep-copy before taking the lock so the caller may reuse its slices.
	commitments = deepCopyCommitments(commitments)

	rb.Lock()
	defer rb.Unlock()

	// Enforce contiguity: appends must arrive strictly in slot order.
	want := rb.head + 1
	if slot != want {
		panic(fmt.Sprintf("invalid slot append: got %d, expected %d", slot, want))
	}

	rb.entries[int(slot)%rb.length] = BlockCommitmentInfo{
		Slot:            slot,
		BlockRoot:       root,
		BlobCommitments: commitments,
	}
	rb.head = slot

	// On wrap-around the oldest entry was just overwritten; since appends are
	// sequential, advancing the tail by one restores the window invariant.
	if rb.head-rb.tail+1 > uint64(rb.length) {
		rb.tail++
	}
	return true
}
// Undo removes the most recently appended slot from the buffer.
// It returns the removed slot number and true. When only a single entry
// remains (tail == head) it returns 0 and false, since removing it would
// leave the head pointing at nothing.
func (rb *CommitmentTracker) Undo() (uint64, bool) {
	rb.Lock()
	defer rb.Unlock()

	// Refuse to remove the last remaining entry: head must stay defined.
	if rb.head == rb.tail {
		return 0, false
	}

	removed := rb.head
	// Zero the head's ring cell, then step the head back one slot.
	rb.entries[int(removed)%rb.length] = BlockCommitmentInfo{}
	rb.head = removed - 1
	return removed, true
}
// Get retrieves the commitment information for a specific slot.
// It returns false when slot lies outside the tracked window [tail, head].
func (rb *CommitmentTracker) Get(slot uint64) (info BlockCommitmentInfo, ok bool) {
	rb.RLock()
	defer rb.RUnlock()

	// Reject slots outside the currently tracked window.
	if slot < rb.tail || slot > rb.head {
		return BlockCommitmentInfo{}, false
	}

	// Within the window the ring index must hold exactly this slot; anything
	// else means a wrap-around overwrite or cleared entry broke an invariant.
	entry := rb.entries[int(slot)%rb.length]
	if entry.Slot != slot {
		panic("slot mismatch in commitment tracker")
	}
	return entry, true
}
// Range returns the current range of slots stored in the buffer [tail, head].
// The bounds are inclusive; when only one entry is stored, tail == head.
func (rb *CommitmentTracker) Range() Range[uint64] {
	rb.RLock()
	defer rb.RUnlock()
	// Positional literal: Start = tail (oldest), End = head (newest).
	return Range[uint64]{rb.tail, rb.head}
}
// Capacity returns the maximum number of slots the buffer can hold.
// length is assigned once in NewCommitmentTracker and never mutated in this
// file, so reading it without the lock is safe.
func (rb *CommitmentTracker) Capacity() int {
	return rb.length
}
// deepCopyCommitments returns a fully independent copy of src: both the outer
// slice and every inner byte slice are freshly allocated. Nil-ness is
// preserved — a nil src yields nil, and nil inner elements stay nil (while
// non-nil empty elements stay non-nil).
func deepCopyCommitments(src [][]byte) [][]byte {
	if src == nil {
		return nil
	}
	out := make([][]byte, len(src))
	for i := range src {
		if src[i] == nil {
			continue
		}
		c := make([]byte, len(src[i]))
		copy(c, src[i])
		out[i] = c
	}
	return out
}