Skip to content

Commit 4ff727d

Browse files
committed
marshal in cache
1 parent fa683c2 commit 4ff727d

File tree

12 files changed

+157
-103
lines changed

12 files changed

+157
-103
lines changed

block/internal/cache/bench_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ func BenchmarkManager_GetPendingHeaders(b *testing.B) {
7272
b.ReportAllocs()
7373
b.ResetTimer()
7474
for b.Loop() {
75-
hs, err := m.GetPendingHeaders(ctx)
75+
hs, _, err := m.GetPendingHeaders(ctx)
7676
if err != nil {
7777
b.Fatal(err)
7878
}
@@ -93,7 +93,7 @@ func BenchmarkManager_GetPendingData(b *testing.B) {
9393
b.ReportAllocs()
9494
b.ResetTimer()
9595
for b.Loop() {
96-
ds, err := m.GetPendingData(ctx)
96+
ds, _, err := m.GetPendingData(ctx)
9797
if err != nil {
9898
b.Fatal(err)
9999
}

block/internal/cache/manager.go

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ type CacheManager interface {
7878

7979
// PendingManager provides operations for managing pending headers and data
8080
type PendingManager interface {
81-
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error)
82-
GetPendingData(ctx context.Context) ([]*types.SignedData, error)
81+
GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error)
82+
GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error)
8383
SetLastSubmittedHeaderHeight(ctx context.Context, height uint64)
8484
SetLastSubmittedDataHeight(ctx context.Context, height uint64)
8585
NumPendingHeaders() uint64
@@ -318,20 +318,21 @@ func (m *implementation) DeleteHeight(blockHeight uint64) {
318318
}
319319

320320
// Pending operations
321-
func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) {
321+
func (m *implementation) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) {
322322
return m.pendingHeaders.GetPendingHeaders(ctx)
323323
}
324324

325-
func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, error) {
326-
// Get pending raw data
327-
dataList, err := m.pendingData.GetPendingData(ctx)
325+
func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedData, [][]byte, error) {
326+
// Get pending raw data with marshalled bytes
327+
dataList, marshalledData, err := m.pendingData.GetPendingData(ctx)
328328
if err != nil {
329-
return nil, err
329+
return nil, nil, err
330330
}
331331

332332
// Convert to SignedData (this logic was in manager.go)
333333
signedDataList := make([]*types.SignedData, 0, len(dataList))
334-
for _, data := range dataList {
334+
marshalledSignedData := make([][]byte, 0, len(dataList))
335+
for i, data := range dataList {
335336
if len(data.Txs) == 0 {
336337
continue // Skip empty data
337338
}
@@ -342,9 +343,10 @@ func (m *implementation) GetPendingData(ctx context.Context) ([]*types.SignedDat
342343
Data: *data,
343344
// Signature and Signer will be set by executing component
344345
})
346+
marshalledSignedData = append(marshalledSignedData, marshalledData[i])
345347
}
346348

347-
return signedDataList, nil
349+
return signedDataList, marshalledSignedData, nil
348350
}
349351

350352
func (m *implementation) SetLastSubmittedHeaderHeight(ctx context.Context, height uint64) {

block/internal/cache/manager_test.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -183,14 +183,14 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
183183
require.NoError(t, err)
184184

185185
// headers: all 3 should be pending initially
186-
headers, err := cm.GetPendingHeaders(ctx)
186+
headers, _, err := cm.GetPendingHeaders(ctx)
187187
require.NoError(t, err)
188188
require.Len(t, headers, 3)
189189
assert.Equal(t, uint64(1), headers[0].Height())
190190
assert.Equal(t, uint64(3), headers[2].Height())
191191

192192
// data: empty one filtered, so 2 and 3 only
193-
signedData, err := cm.GetPendingData(ctx)
193+
signedData, _, err := cm.GetPendingData(ctx)
194194
require.NoError(t, err)
195195
require.Len(t, signedData, 2)
196196
assert.Equal(t, uint64(2), signedData[0].Height())
@@ -200,12 +200,12 @@ func TestPendingHeadersAndData_Flow(t *testing.T) {
200200
cm.SetLastSubmittedHeaderHeight(ctx, 1)
201201
cm.SetLastSubmittedDataHeight(ctx, 2)
202202

203-
headers, err = cm.GetPendingHeaders(ctx)
203+
headers, _, err = cm.GetPendingHeaders(ctx)
204204
require.NoError(t, err)
205205
require.Len(t, headers, 2)
206206
assert.Equal(t, uint64(2), headers[0].Height())
207207

208-
signedData, err = cm.GetPendingData(ctx)
208+
signedData, _, err = cm.GetPendingData(ctx)
209209
require.NoError(t, err)
210210
require.Len(t, signedData, 1)
211211
assert.Equal(t, uint64(3), signedData[0].Height())

block/internal/cache/pending_base.go

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ import (
55
"encoding/binary"
66
"errors"
77
"fmt"
8+
"sync"
89
"sync/atomic"
910

1011
ds "github.com/ipfs/go-datastore"
@@ -22,15 +23,20 @@ type pendingBase[T any] struct {
2223
metaKey string
2324
fetch func(ctx context.Context, store store.Store, height uint64) (T, error)
2425
lastHeight atomic.Uint64
26+
27+
// Marshalling cache to avoid redundant marshalling
28+
marshalledMtx sync.RWMutex
29+
marshalledCache map[uint64][]byte // key: height
2530
}
2631

2732
// newPendingBase constructs a new pendingBase for a given type.
2833
func newPendingBase[T any](store store.Store, logger zerolog.Logger, metaKey string, fetch func(ctx context.Context, store store.Store, height uint64) (T, error)) (*pendingBase[T], error) {
2934
pb := &pendingBase[T]{
30-
store: store,
31-
logger: logger,
32-
metaKey: metaKey,
33-
fetch: fetch,
35+
store: store,
36+
logger: logger,
37+
metaKey: metaKey,
38+
fetch: fetch,
39+
marshalledCache: make(map[uint64][]byte),
3440
}
3541
if err := pb.init(); err != nil {
3642
return nil, err
@@ -80,6 +86,9 @@ func (pb *pendingBase[T]) setLastSubmittedHeight(ctx context.Context, newLastSub
8086
if err != nil {
8187
pb.logger.Error().Err(err).Msg("failed to store height of latest item submitted to DA")
8288
}
89+
90+
// Clear marshalled cache for submitted heights
91+
pb.clearMarshalledCacheUpTo(newLastSubmittedHeight)
8392
}
8493
}
8594

@@ -101,3 +110,28 @@ func (pb *pendingBase[T]) init() error {
101110
pb.lastHeight.CompareAndSwap(0, lsh)
102111
return nil
103112
}
113+
114+
// getMarshalledForHeight returns cached marshalled bytes for a height, or nil if not cached
115+
func (pb *pendingBase[T]) getMarshalledForHeight(height uint64) []byte {
116+
pb.marshalledMtx.RLock()
117+
defer pb.marshalledMtx.RUnlock()
118+
return pb.marshalledCache[height]
119+
}
120+
121+
// setMarshalledForHeight caches marshalled bytes for a height
122+
func (pb *pendingBase[T]) setMarshalledForHeight(height uint64, marshalled []byte) {
123+
pb.marshalledMtx.Lock()
124+
defer pb.marshalledMtx.Unlock()
125+
pb.marshalledCache[height] = marshalled
126+
}
127+
128+
// clearMarshalledCacheUpTo removes cached marshalled bytes up to and including the given height
129+
func (pb *pendingBase[T]) clearMarshalledCacheUpTo(height uint64) {
130+
pb.marshalledMtx.Lock()
131+
defer pb.marshalledMtx.Unlock()
132+
for h := range pb.marshalledCache {
133+
if h <= height {
134+
delete(pb.marshalledCache, h)
135+
}
136+
}
137+
}

block/internal/cache/pending_base_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestPendingBase_ErrorConditions(t *testing.T) {
3535
// ensure store height stays lower (0)
3636
ph, err := NewPendingHeaders(st, logger)
3737
require.NoError(t, err)
38-
pending, err := ph.GetPendingHeaders(ctx)
38+
pending, _, err := ph.GetPendingHeaders(ctx)
3939
assert.Error(t, err)
4040
assert.Len(t, pending, 0)
4141

block/internal/cache/pending_data.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/rs/zerolog"
78

@@ -46,9 +47,42 @@ func (pd *PendingData) init() error {
4647
return pd.base.init()
4748
}
4849

49-
// GetPendingData returns a sorted slice of pending Data.
50-
func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, error) {
51-
return pd.base.getPending(ctx)
50+
// GetPendingData returns a sorted slice of pending Data along with their marshalled bytes.
51+
// It uses an internal cache to avoid re-marshalling data on subsequent calls.
52+
func (pd *PendingData) GetPendingData(ctx context.Context) ([]*types.Data, [][]byte, error) {
53+
dataList, err := pd.base.getPending(ctx)
54+
if err != nil {
55+
return nil, nil, err
56+
}
57+
58+
if len(dataList) == 0 {
59+
return nil, nil, nil
60+
}
61+
62+
marshalled := make([][]byte, len(dataList))
63+
lastSubmitted := pd.base.lastHeight.Load()
64+
65+
for i, data := range dataList {
66+
height := lastSubmitted + uint64(i) + 1
67+
68+
// Try to get from cache first
69+
if cached := pd.base.getMarshalledForHeight(height); cached != nil {
70+
marshalled[i] = cached
71+
continue
72+
}
73+
74+
// Marshal if not in cache
75+
dataBytes, err := data.MarshalBinary()
76+
if err != nil {
77+
return nil, nil, fmt.Errorf("failed to marshal data at height %d: %w", height, err)
78+
}
79+
marshalled[i] = dataBytes
80+
81+
// Store in cache
82+
pd.base.setMarshalledForHeight(height, dataBytes)
83+
}
84+
85+
return dataList, marshalled, nil
5286
}
5387

5488
func (pd *PendingData) NumPendingData() uint64 {

block/internal/cache/pending_data_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestPendingData_BasicFlow(t *testing.T) {
3939

4040
// initially all 3 data items are pending, incl. empty
4141
require.Equal(t, uint64(3), pendingData.NumPendingData())
42-
pendingDataList, err := pendingData.GetPendingData(ctx)
42+
pendingDataList, _, err := pendingData.GetPendingData(ctx)
4343
require.NoError(t, err)
4444
require.Len(t, pendingDataList, 3)
4545
require.Equal(t, uint64(1), pendingDataList[0].Height())
@@ -53,7 +53,7 @@ func TestPendingData_BasicFlow(t *testing.T) {
5353
require.Equal(t, uint64(1), binary.LittleEndian.Uint64(metadataRaw))
5454

5555
require.Equal(t, uint64(2), pendingData.NumPendingData())
56-
pendingDataList, err = pendingData.GetPendingData(ctx)
56+
pendingDataList, _, err = pendingData.GetPendingData(ctx)
5757
require.NoError(t, err)
5858
require.Len(t, pendingDataList, 2)
5959
require.Equal(t, uint64(2), pendingDataList[0].Height())
@@ -97,7 +97,7 @@ func TestPendingData_GetPending_PropagatesFetchError(t *testing.T) {
9797
require.NoError(t, err)
9898

9999
// fetching pending should propagate the not-found error from store
100-
pending, err := pendingData.GetPendingData(ctx)
100+
pending, _, err := pendingData.GetPendingData(ctx)
101101
require.Error(t, err)
102102
require.Empty(t, pending)
103103
}

block/internal/cache/pending_headers.go

Lines changed: 37 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package cache
22

33
import (
44
"context"
5+
"fmt"
56

67
"github.com/rs/zerolog"
78

@@ -39,9 +40,42 @@ func NewPendingHeaders(store storepkg.Store, logger zerolog.Logger) (*PendingHea
3940
return &PendingHeaders{base: base}, nil
4041
}
4142

42-
// GetPendingHeaders returns a sorted slice of pending headers.
43-
func (ph *PendingHeaders) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) {
44-
return ph.base.getPending(ctx)
43+
// GetPendingHeaders returns a sorted slice of pending headers along with their marshalled bytes.
44+
// It uses an internal cache to avoid re-marshalling headers on subsequent calls.
45+
func (ph *PendingHeaders) GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, [][]byte, error) {
46+
headers, err := ph.base.getPending(ctx)
47+
if err != nil {
48+
return nil, nil, err
49+
}
50+
51+
if len(headers) == 0 {
52+
return nil, nil, nil
53+
}
54+
55+
marshalled := make([][]byte, len(headers))
56+
lastSubmitted := ph.base.lastHeight.Load()
57+
58+
for i, header := range headers {
59+
height := lastSubmitted + uint64(i) + 1
60+
61+
// Try to get from cache first
62+
if cached := ph.base.getMarshalledForHeight(height); cached != nil {
63+
marshalled[i] = cached
64+
continue
65+
}
66+
67+
// Marshal if not in cache
68+
data, err := header.MarshalBinary()
69+
if err != nil {
70+
return nil, nil, fmt.Errorf("failed to marshal header at height %d: %w", height, err)
71+
}
72+
marshalled[i] = data
73+
74+
// Store in cache
75+
ph.base.setMarshalledForHeight(height, data)
76+
}
77+
78+
return headers, marshalled, nil
4579
}
4680

4781
func (ph *PendingHeaders) NumPendingHeaders() uint64 {

block/internal/cache/pending_headers_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) {
3939

4040
// initially all three are pending
4141
require.Equal(t, uint64(3), pendingHeaders.NumPendingHeaders())
42-
headers, err := pendingHeaders.GetPendingHeaders(ctx)
42+
headers, _, err := pendingHeaders.GetPendingHeaders(ctx)
4343
require.NoError(t, err)
4444
require.Len(t, headers, 3)
4545
require.Equal(t, uint64(1), headers[0].Height())
@@ -53,7 +53,7 @@ func TestPendingHeaders_BasicFlow(t *testing.T) {
5353
require.Equal(t, uint64(2), binary.LittleEndian.Uint64(metadataRaw))
5454

5555
require.Equal(t, uint64(1), pendingHeaders.NumPendingHeaders())
56-
headers, err = pendingHeaders.GetPendingHeaders(ctx)
56+
headers, _, err = pendingHeaders.GetPendingHeaders(ctx)
5757
require.NoError(t, err)
5858
require.Len(t, headers, 1)
5959
require.Equal(t, uint64(3), headers[0].Height())
@@ -82,7 +82,7 @@ func TestPendingHeaders_EmptyWhenUpToDate(t *testing.T) {
8282
// set last submitted to the current height, so nothing pending
8383
pendingHeaders.SetLastSubmittedHeaderHeight(ctx, 1)
8484
require.Equal(t, uint64(0), pendingHeaders.NumPendingHeaders())
85-
headers, err := pendingHeaders.GetPendingHeaders(ctx)
85+
headers, _, err := pendingHeaders.GetPendingHeaders(ctx)
8686
require.NoError(t, err)
8787
require.Empty(t, headers)
8888
}

block/internal/submitting/da_submitter_integration_test.go

Lines changed: 3 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -98,29 +98,13 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted(
9898
}).Twice()
9999
daSubmitter := NewDASubmitter(client, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop())
100100

101-
// Submit headers and data
102-
headers, err := cm.GetPendingHeaders(context.Background())
101+
// Submit headers and data - cache returns both items and marshalled bytes
102+
headers, marshalledHeaders, err := cm.GetPendingHeaders(context.Background())
103103
require.NoError(t, err)
104-
105-
// Marshal headers
106-
marshalledHeaders := make([][]byte, len(headers))
107-
for i, h := range headers {
108-
data, err := h.MarshalBinary()
109-
require.NoError(t, err)
110-
marshalledHeaders[i] = data
111-
}
112104
require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), headers, marshalledHeaders, cm, n))
113105

114-
dataList, err := cm.GetPendingData(context.Background())
106+
dataList, marshalledData, err := cm.GetPendingData(context.Background())
115107
require.NoError(t, err)
116-
117-
// Marshal data
118-
marshalledData := make([][]byte, len(dataList))
119-
for i, d := range dataList {
120-
data, err := d.MarshalBinary()
121-
require.NoError(t, err)
122-
marshalledData[i] = data
123-
}
124108
require.NoError(t, daSubmitter.SubmitData(context.Background(), dataList, marshalledData, cm, n, gen))
125109

126110
// After submission, inclusion markers should be set

0 commit comments

Comments
 (0)