Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions pkg/solana/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,13 @@ consumedAllBlocks:
if err != nil {
return fmt.Errorf("error processing blocks: %w", err)
}

// nolint:gosec
// G115: integer overflow conversion uint64 -> int64
highestInBatch := int64(batch[len(batch)-1].SlotNumber)
if highestInBatch > lp.lastProcessedSlot {
lp.lastProcessedSlot = highestInBatch
}
}
}

Expand Down
137 changes: 137 additions & 0 deletions pkg/solana/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,119 @@ func TestLogPoller_run(t *testing.T) {
require.NoError(t, err)
require.Equal(t, int64(130), lp.LogPoller.lastProcessedSlot)
})
// These two sub-tests demonstrate the difference between single-batch and multi-batch
// failure scenarios, showing how the batch-level cursor update prevents redundant RPC calls.
//
// Both use the same range [101..105] with a failure at block 103. The difference is
// whether blocks land in one batch or multiple batches before the failure.
t.Run("All blocks in one batch: failure re-fetches entire range", func(t *testing.T) {
// When all blocks arrive in a single batch and processing fails,
// the cursor update never executes — lastProcessedSlot stays at its
// original value, so the retry re-fetches everything from the start.
lp := newMockedLP(t)
lp.LogPoller.lastProcessedSlot = 100
addresses := []types.PublicKey{{1}}

var backfillFromSlots []uint64

lp.LogPoller.processBlocks = func(_ context.Context, _ []types.Block) error {
return errors.New("simulated error processing batch")
}

lp.Filters.EXPECT().LoadFilters(mock.Anything).Return(nil)
lp.Filters.EXPECT().GetFiltersToBackfill().Return(nil)
lp.Filters.EXPECT().GetDistinctAddresses(mock.Anything).Return(addresses, nil)
lp.Client.EXPECT().SlotHeightWithCommitment(mock.Anything, rpc.CommitmentFinalized).Return(uint64(105), nil)

lp.Loader.EXPECT().BackfillForAddresses(mock.Anything, addresses, mock.AnythingOfType("uint64"), uint64(105)).
RunAndReturn(func(_ context.Context, _ []types.PublicKey, from, _ uint64) (<-chan types.Block, func(), error) {
backfillFromSlots = append(backfillFromSlots, from)
ch := make(chan types.Block, 3)
ch <- types.Block{SlotNumber: 101}
ch <- types.Block{SlotNumber: 102}
ch <- types.Block{SlotNumber: 103}
close(ch)
return ch, func() {}, nil
})

// --- Run 1: single batch [101,102,103] fails ---
err := lp.LogPoller.run(t.Context())
require.Error(t, err)
assert.Equal(t, int64(100), lp.LogPoller.lastProcessedSlot,
"cursor stays at 100 — batch failed before cursor could update")

// --- Run 2: starts from 101 again, re-fetching all blocks ---
err = lp.LogPoller.run(t.Context())
require.Error(t, err)

require.Len(t, backfillFromSlots, 2)
assert.Equal(t, uint64(101), backfillFromSlots[0], "run 1: starts from 101")
assert.Equal(t, uint64(101), backfillFromSlots[1],
"run 2: starts from 101 AGAIN — all blocks re-fetched because no batch succeeded")
})
t.Run("Split batches: failure only re-fetches unprocessed blocks", func(t *testing.T) {
// When blocks arrive in separate batches and the first batch succeeds,
// the cursor advances. The retry only fetches blocks after the cursor.
lp := newMockedLP(t)
lp.LogPoller.lastProcessedSlot = 100
addresses := []types.PublicKey{{1}}

var backfillFromSlots []uint64

callCount := 0
blocks1 := make(chan types.Block, 2)
blocks1 <- types.Block{SlotNumber: 101}
blocks1 <- types.Block{SlotNumber: 102}

lp.LogPoller.processBlocks = func(_ context.Context, batch []types.Block) error {
callCount++
if callCount == 1 {
// First batch [101, 102] succeeds. Inject the failing block for the next batch.
blocks1 <- types.Block{SlotNumber: 103}
close(blocks1)
return nil
}
if callCount == 2 {
return errors.New("simulated error processing block 103")
}
return nil
}

lp.Filters.EXPECT().LoadFilters(mock.Anything).Return(nil)
lp.Filters.EXPECT().GetFiltersToBackfill().Return(nil)
lp.Filters.EXPECT().GetDistinctAddresses(mock.Anything).Return(addresses, nil)
lp.Client.EXPECT().SlotHeightWithCommitment(mock.Anything, rpc.CommitmentFinalized).Return(uint64(105), nil)

lp.Loader.EXPECT().BackfillForAddresses(mock.Anything, addresses, mock.AnythingOfType("uint64"), uint64(105)).
RunAndReturn(func(_ context.Context, _ []types.PublicKey, from, _ uint64) (<-chan types.Block, func(), error) {
backfillFromSlots = append(backfillFromSlots, from)
if from == 101 {
return blocks1, func() {}, nil
}
ch := make(chan types.Block, 3)
ch <- types.Block{SlotNumber: 103}
ch <- types.Block{SlotNumber: 104}
ch <- types.Block{SlotNumber: 105}
close(ch)
return ch, func() {}, nil
})

// --- Run 1: batch [101,102] succeeds, batch [103] fails ---
err := lp.LogPoller.run(t.Context())
require.Error(t, err)
assert.Equal(t, int64(102), lp.LogPoller.lastProcessedSlot,
"cursor advances to 102 — first batch succeeded before failure")

// --- Run 2: starts from 103, blocks 101 and 102 are NOT re-fetched ---
err = lp.LogPoller.run(t.Context())
require.NoError(t, err)
assert.Equal(t, int64(105), lp.LogPoller.lastProcessedSlot)

require.Len(t, backfillFromSlots, 2)
assert.Equal(t, uint64(101), backfillFromSlots[0], "run 1: starts from 101")
assert.Equal(t, uint64(103), backfillFromSlots[1],
"run 2: starts from 103 — blocks 101,102 are NOT re-fetched")
})
}

func Test_GetLastProcessedSlot(t *testing.T) {
Expand Down Expand Up @@ -317,6 +430,30 @@ func TestLogPoller_processBlocksRange(t *testing.T) {
lp.Loader.EXPECT().BackfillForAddresses(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(blocks, funcWithCallExpectation(t), nil).Once()
err := lp.LogPoller.processBlocksRange(t.Context(), nil, 10, 20)
require.NoError(t, err)
assert.Equal(t, int64(12), lp.LogPoller.lastProcessedSlot)
})
t.Run("Updates lastProcessedSlot incrementally on partial failure", func(t *testing.T) {
lp := newMockedLP(t)
blocks := make(chan types.Block, 3)
blocks <- types.Block{SlotNumber: 11}
blocks <- types.Block{SlotNumber: 12}

expectedErr := errors.New("simulated processing error")
callCount := 0
lp.LogPoller.processBlocks = func(_ context.Context, _ []types.Block) error {
callCount++
if callCount == 1 {
blocks <- types.Block{SlotNumber: 13}
close(blocks)
return nil
}
return expectedErr
}

lp.Loader.EXPECT().BackfillForAddresses(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(blocks, funcWithCallExpectation(t), nil).Once()
err := lp.LogPoller.processBlocksRange(t.Context(), nil, 10, 20)
require.ErrorIs(t, err, expectedErr)
assert.Equal(t, int64(12), lp.LogPoller.lastProcessedSlot)
})
}

Expand Down
Loading