diff --git a/pkg/solana/logpoller/log_poller.go b/pkg/solana/logpoller/log_poller.go index 5d7b848d5..d09b22a2c 100644 --- a/pkg/solana/logpoller/log_poller.go +++ b/pkg/solana/logpoller/log_poller.go @@ -441,6 +441,13 @@ consumedAllBlocks: if err != nil { return fmt.Errorf("error processing blocks: %w", err) } + + // nolint:gosec + // G115: integer overflow conversion uint64 -> int64 + highestInBatch := int64(batch[len(batch)-1].SlotNumber) + if highestInBatch > lp.lastProcessedSlot { + lp.lastProcessedSlot = highestInBatch + } } } diff --git a/pkg/solana/logpoller/log_poller_test.go b/pkg/solana/logpoller/log_poller_test.go index d8d6f0eea..e3128698f 100644 --- a/pkg/solana/logpoller/log_poller_test.go +++ b/pkg/solana/logpoller/log_poller_test.go @@ -168,6 +168,119 @@ func TestLogPoller_run(t *testing.T) { require.NoError(t, err) require.Equal(t, int64(130), lp.LogPoller.lastProcessedSlot) }) + // These two sub-tests demonstrate the difference between single-batch and multi-batch + // failure scenarios, showing how the batch-level cursor update prevents redundant RPC calls. + // + // Both use the same range [101..105] with a failure at block 103. The difference is + // whether blocks land in one batch or multiple batches before the failure. + t.Run("All blocks in one batch: failure re-fetches entire range", func(t *testing.T) { + // When all blocks arrive in a single batch and processing fails, + // the cursor update never executes — lastProcessedSlot stays at its + // original value, so the retry re-fetches everything from the start. + lp := newMockedLP(t) + lp.LogPoller.lastProcessedSlot = 100 + addresses := []types.PublicKey{{1}} + + var backfillFromSlots []uint64 + + lp.LogPoller.processBlocks = func(_ context.Context, _ []types.Block) error { + return errors.New("simulated error processing batch") + } + + lp.Filters.EXPECT().LoadFilters(mock.Anything).Return(nil) + lp.Filters.EXPECT().GetFiltersToBackfill().Return(nil) + lp.Filters.EXPECT().GetDistinctAddresses(mock.Anything).Return(addresses, nil) + lp.Client.EXPECT().SlotHeightWithCommitment(mock.Anything, rpc.CommitmentFinalized).Return(uint64(105), nil) + + lp.Loader.EXPECT().BackfillForAddresses(mock.Anything, addresses, mock.AnythingOfType("uint64"), uint64(105)). + RunAndReturn(func(_ context.Context, _ []types.PublicKey, from, _ uint64) (<-chan types.Block, func(), error) { + backfillFromSlots = append(backfillFromSlots, from) + ch := make(chan types.Block, 3) + ch <- types.Block{SlotNumber: 101} + ch <- types.Block{SlotNumber: 102} + ch <- types.Block{SlotNumber: 103} + close(ch) + return ch, func() {}, nil + }) + + // --- Run 1: single batch [101,102,103] fails --- + err := lp.LogPoller.run(t.Context()) + require.Error(t, err) + assert.Equal(t, int64(100), lp.LogPoller.lastProcessedSlot, + "cursor stays at 100 — batch failed before cursor could update") + + // --- Run 2: starts from 101 again, re-fetching all blocks --- + err = lp.LogPoller.run(t.Context()) + require.Error(t, err) + + require.Len(t, backfillFromSlots, 2) + assert.Equal(t, uint64(101), backfillFromSlots[0], "run 1: starts from 101") + assert.Equal(t, uint64(101), backfillFromSlots[1], + "run 2: starts from 101 AGAIN — all blocks re-fetched because no batch succeeded") + }) + t.Run("Split batches: failure only re-fetches unprocessed blocks", func(t *testing.T) { + // When blocks arrive in separate batches and the first batch succeeds, + // the cursor advances. The retry only fetches blocks after the cursor. + lp := newMockedLP(t) + lp.LogPoller.lastProcessedSlot = 100 + addresses := []types.PublicKey{{1}} + + var backfillFromSlots []uint64 + + callCount := 0 + blocks1 := make(chan types.Block, 2) + blocks1 <- types.Block{SlotNumber: 101} + blocks1 <- types.Block{SlotNumber: 102} + + lp.LogPoller.processBlocks = func(_ context.Context, batch []types.Block) error { + callCount++ + if callCount == 1 { + // First batch [101, 102] succeeds. Inject the failing block for the next batch. + blocks1 <- types.Block{SlotNumber: 103} + close(blocks1) + return nil + } + if callCount == 2 { + return errors.New("simulated error processing block 103") + } + return nil + } + + lp.Filters.EXPECT().LoadFilters(mock.Anything).Return(nil) + lp.Filters.EXPECT().GetFiltersToBackfill().Return(nil) + lp.Filters.EXPECT().GetDistinctAddresses(mock.Anything).Return(addresses, nil) + lp.Client.EXPECT().SlotHeightWithCommitment(mock.Anything, rpc.CommitmentFinalized).Return(uint64(105), nil) + + lp.Loader.EXPECT().BackfillForAddresses(mock.Anything, addresses, mock.AnythingOfType("uint64"), uint64(105)). + RunAndReturn(func(_ context.Context, _ []types.PublicKey, from, _ uint64) (<-chan types.Block, func(), error) { + backfillFromSlots = append(backfillFromSlots, from) + if from == 101 { + return blocks1, func() {}, nil + } + ch := make(chan types.Block, 3) + ch <- types.Block{SlotNumber: 103} + ch <- types.Block{SlotNumber: 104} + ch <- types.Block{SlotNumber: 105} + close(ch) + return ch, func() {}, nil + }) + + // --- Run 1: batch [101,102] succeeds, batch [103] fails --- + err := lp.LogPoller.run(t.Context()) + require.Error(t, err) + assert.Equal(t, int64(102), lp.LogPoller.lastProcessedSlot, + "cursor advances to 102 — first batch succeeded before failure") + + // --- Run 2: starts from 103, blocks 101 and 102 are NOT re-fetched --- + err = lp.LogPoller.run(t.Context()) + require.NoError(t, err) + assert.Equal(t, int64(105), lp.LogPoller.lastProcessedSlot) + + require.Len(t, backfillFromSlots, 2) + assert.Equal(t, uint64(101), backfillFromSlots[0], "run 1: starts from 101") + assert.Equal(t, uint64(103), backfillFromSlots[1], + "run 2: starts from 103 — blocks 101,102 are NOT re-fetched") + }) } func Test_GetLastProcessedSlot(t *testing.T) { @@ -317,6 +430,30 @@ func TestLogPoller_processBlocksRange(t *testing.T) { lp.Loader.EXPECT().BackfillForAddresses(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(blocks, funcWithCallExpectation(t), nil).Once() err := lp.LogPoller.processBlocksRange(t.Context(), nil, 10, 20) require.NoError(t, err) + assert.Equal(t, int64(12), lp.LogPoller.lastProcessedSlot) + }) + t.Run("Updates lastProcessedSlot incrementally on partial failure", func(t *testing.T) { + lp := newMockedLP(t) + blocks := make(chan types.Block, 3) + blocks <- types.Block{SlotNumber: 11} + blocks <- types.Block{SlotNumber: 12} + + expectedErr := errors.New("simulated processing error") + callCount := 0 + lp.LogPoller.processBlocks = func(_ context.Context, _ []types.Block) error { + callCount++ + if callCount == 1 { + blocks <- types.Block{SlotNumber: 13} + close(blocks) + return nil + } + return expectedErr + } + + lp.Loader.EXPECT().BackfillForAddresses(mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(blocks, funcWithCallExpectation(t), nil).Once() + err := lp.LogPoller.processBlocksRange(t.Context(), nil, 10, 20) + require.ErrorIs(t, err, expectedErr) + assert.Equal(t, int64(12), lp.LogPoller.lastProcessedSlot) }) }