From 0610b3ac95f8eb94396c45e4e08090cc4883f318 Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 8 Jan 2026 10:12:04 -0800 Subject: [PATCH 1/4] Add datalake proxy support for getTransactions --- .../integrationtest/get_ledgers_test.go | 88 +-- .../integrationtest/get_transactions_test.go | 696 ++++++++++++++++++ .../integrationtest/infrastructure/gcs.go | 229 ++++++ cmd/stellar-rpc/internal/jsonrpc.go | 2 +- .../internal/methods/get_transactions.go | 86 ++- .../internal/methods/get_transactions_test.go | 42 +- cmd/stellar-rpc/internal/methods/mocks.go | 5 + .../internal/rpcdatastore/ledger_reader.go | 164 ++++- .../rpcdatastore/ledger_reader_test.go | 326 +++++++- go.mod | 11 +- go.sum | 26 +- 11 files changed, 1512 insertions(+), 163 deletions(-) create mode 100644 cmd/stellar-rpc/internal/integrationtest/infrastructure/gcs.go diff --git a/cmd/stellar-rpc/internal/integrationtest/get_ledgers_test.go b/cmd/stellar-rpc/internal/integrationtest/get_ledgers_test.go index 9258fab2..cc55f93d 100644 --- a/cmd/stellar-rpc/internal/integrationtest/get_ledgers_test.go +++ b/cmd/stellar-rpc/internal/integrationtest/get_ledgers_test.go @@ -1,21 +1,14 @@ package integrationtest import ( - "bytes" "testing" "time" - "github.com/fsouza/fake-gcs-server/fakestorage" "github.com/stretchr/testify/require" client "github.com/stellar/go-stellar-sdk/clients/rpcclient" - "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" - "github.com/stellar/go-stellar-sdk/support/compressxdr" - "github.com/stellar/go-stellar-sdk/support/datastore" - "github.com/stellar/go-stellar-sdk/xdr" - "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/config" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure" ) @@ -104,62 +97,22 @@ func TestGetLedgers(t *testing.T) { } func TestGetLedgersFromDatastore(t *testing.T) { - // setup fake GCS server - opts := fakestorage.Options{ - Scheme: "http", - PublicHost: "127.0.0.1", - } - gcsServer, err := fakestorage.NewServerWithOptions(opts) - require.NoError(t, err) - defer gcsServer.Stop() + gcsSetup := infrastructure.NewGCSTestSetup(t, infrastructure.DefaultGCSTestConfig()) + defer gcsSetup.Stop() - t.Setenv("STORAGE_EMULATOR_HOST", gcsServer.URL()) - bucketName := "test-bucket" - gcsServer.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName}) - - // datastore configuration function - schema := datastore.DataStoreSchema{ - FilesPerPartition: 1, - LedgersPerFile: 1, - } - setDatastoreConfig := func(cfg *config.Config) { - cfg.ServeLedgersFromDatastore = true - cfg.BufferedStorageBackendConfig = ledgerbackend.BufferedStorageBackendConfig{ - BufferSize: 15, - NumWorkers: 2, - } - cfg.DataStoreConfig = datastore.DataStoreConfig{ - Type: "GCS", - Params: map[string]string{"destination_bucket_path": bucketName}, - Schema: schema, - } - // reduce retention windows to force usage of datastore - cfg.HistoryRetentionWindow = 15 - cfg.ClassicFeeStatsLedgerRetentionWindow = 15 - cfg.SorobanFeeStatsLedgerRetentionWindow = 15 - } - - // add files to GCS - for seq := uint32(35); seq <= 40; seq++ { - gcsServer.CreateObject(fakestorage.Object{ - ObjectAttrs: fakestorage.ObjectAttrs{ - BucketName: bucketName, - Name: schema.GetObjectKeyFromSequenceNumber(seq), - }, - Content: createLCMBatchBuffer(seq), - }) - } + // add files to GCS for ledgers 35-40 + gcsSetup.AddLedgers(35, 40) test := infrastructure.NewTest(t, &infrastructure.TestConfig{ - DatastoreConfigFunc: setDatastoreConfig, + 
DatastoreConfigFunc: gcsSetup.DatastoreConfigFunc(),
 		NoParallel:          true, // can't use parallel due to env vars
 	})
-	client := test.GetRPCLient() // at this point we're at like ledger 30
+	cl := test.GetRPCLient() // the network is at roughly ledger 30 at this point

 	waitUntil := func(cond func(h protocol.GetHealthResponse) bool, timeout time.Duration) protocol.GetHealthResponse {
 		var last protocol.GetHealthResponse
 		require.Eventually(t, func() bool {
-			resp, err := client.GetHealth(t.Context())
+			resp, err := cl.GetHealth(t.Context())
 			require.NoError(t, err)
 			last = resp
 			return cond(resp)
 		}, timeout, 100*time.Millisecond, "last health: %+v", last)
 		return last
 	}
@@ -183,7 +136,7 @@ func TestGetLedgersFromDatastore(t *testing.T) {
 				Cursor: cursor,
 			},
 		}
-		return client.GetLedgers(t.Context(), req)
+		return cl.GetLedgers(t.Context(), req)
 	}

 	// ensure oldest > 40 so datastore set ([35..40]) is below local window
@@ -249,28 +202,3 @@ func TestGetLedgersFromDatastore(t *testing.T) {
 		require.Empty(t, res.Ledgers, "expected no ledgers when requesting beyond latest")
 	})
 }
-
-func createLCMBatchBuffer(seq uint32) []byte {
-	lcm := xdr.LedgerCloseMetaBatch{
-		StartSequence: xdr.Uint32(seq),
-		EndSequence:   xdr.Uint32(seq),
-		LedgerCloseMetas: []xdr.LedgerCloseMeta{
-			{
-				V: int32(0),
-				V0: &xdr.LedgerCloseMetaV0{
-					LedgerHeader: xdr.LedgerHeaderHistoryEntry{
-						Header: xdr.LedgerHeader{
-							LedgerSeq: xdr.Uint32(seq),
-						},
-					},
-				},
-			},
-		},
-	}
-
-	var buf bytes.Buffer
-	encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, lcm)
-	_, _ = encoder.WriteTo(&buf)
-
-	return buf.Bytes()
-}
diff --git a/cmd/stellar-rpc/internal/integrationtest/get_transactions_test.go b/cmd/stellar-rpc/internal/integrationtest/get_transactions_test.go
index 52df6e75..adf1a86e 100644
--- a/cmd/stellar-rpc/internal/integrationtest/get_transactions_test.go
+++ b/cmd/stellar-rpc/internal/integrationtest/get_transactions_test.go
@@ -3,6 +3,7 @@ package integrationtest
 import (
 	"context"
 	"testing"
+	"time"

 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
@@ -112,3 +113,698 @@ func TestGetTransactionsEvents(t *testing.T) {
 	assert.Len(t, response.Events.TransactionEventsXDR, 2)
 	assert.NotEmpty(t, response.DiagnosticEventsXDR)
 }
+
+func TestGetTransactionsDataStore1(t *testing.T) {
+	gcsCfg := infrastructure.DefaultGCSTestConfig()
+	gcsSetup := infrastructure.NewGCSTestSetup(t, gcsCfg)
+	defer gcsSetup.Stop()
+
+	// add ledgers 5 through 40 to the datastore
+	gcsSetup.AddLedgers(5, 40)
+
+	test := infrastructure.NewTest(t, &infrastructure.TestConfig{
+		DatastoreConfigFunc: gcsSetup.DatastoreConfigFunc(),
+		NoParallel:          true,
+	})
+	client := test.GetRPCLient() // the network is at roughly ledger 30 at this point
+
+	waitUntil := func(cond func(h protocol.GetHealthResponse) bool, timeout time.Duration) protocol.GetHealthResponse {
+		var last protocol.GetHealthResponse
+		require.Eventually(t, func() bool {
+			resp, err := client.GetHealth(t.Context())
+			require.NoError(t, err)
+			last = resp
+			return cond(resp)
+		}, timeout, 100*time.Millisecond, "last health: %+v", last)
+		return last
+	}
+	ledgers := sendTransactions(t, client)
+
+	request := func(start uint32, limit uint, cursor string) (protocol.GetTransactionsResponse, error) {
+		req := protocol.GetTransactionsRequest{
+			StartLedger: start,
+			Pagination: &protocol.LedgerPaginationOptions{
+				Limit:  limit,
+				Cursor: cursor,
+			},
+		}
+		return client.GetTransactions(t.Context(), req)
+	}
+
+	// ensure oldest > 40 so the datastore set ([5..40]) is below the local window
+	health := waitUntil(func(h protocol.GetHealthResponse) bool {
+		return uint(h.OldestLedger) > 40
+	},
30*time.Second)
+
+	oldest := health.OldestLedger
+	latest := health.LatestLedger
+	require.GreaterOrEqual(t, latest, oldest)
+
+	getSeqs := func(resp protocol.GetTransactionsResponse) []uint32 {
+		out := make([]uint32, len(resp.Transactions))
+		for i, l := range resp.Transactions {
+			out[i] = l.TransactionDetails.Ledger
+		}
+		return out
+	}
+	// --- 1) datastore-only: entirely below oldest ---
+	t.Run("datastore_only", func(t *testing.T) {
+		res, err := request(35, 3, "")
+		require.NoError(t, err)
+		require.Len(t, res.Transactions, 3)
+		// each datastore test ledger holds two transactions
+		require.Equal(t, []uint32{35, 35, 36}, getSeqs(res))
+	})
+
+	// --- 2) local-only: entirely at/above oldest ---
+	t.Run("local_only", func(t *testing.T) {
+		limit := 3
+		res, err := request(ledgers[0], uint(limit), "")
+		require.NoError(t, err)
+		require.Len(t, res.Transactions, 3)
+	})
+
+	// --- 3) mixed: cross boundary (datastore then local) ---
+	t.Run("mixed_datastore_and_local", func(t *testing.T) {
+		// ledgers 39,40 (two transactions each) come from the datastore; the
+		// page then borders local history
+		require.GreaterOrEqual(t, latest, uint32(42), "need latest >= 42")
+		res, err := request(39, 4, "")
+		require.NoError(t, err)
+		require.Len(t, res.Transactions, 4)
+
+		// verify cursor continuity across the boundary: the four transactions
+		// above cover ledgers 39-40, so the next page must start at 41 or later
+		next, err := request(0, 2, res.Cursor)
+		require.NoError(t, err)
+		if len(next.Transactions) > 0 {
+			require.GreaterOrEqual(t, next.Transactions[0].TransactionDetails.Ledger, uint32(41))
+		}
+	})
+
+	// --- 4) negative: below datastore floor (not available anywhere) ---
+	t.Run("negative_below_datastore_floor", func(t *testing.T) {
+		res, err := request(2, 3, "")
+		// accept either an error or an empty page, but never data
+		if err != nil {
+			return
+		}
+		require.Empty(t, res.Transactions, "expected no transactions when requesting below datastore floor")
+	})
+
+	// --- 5) negative: beyond latest ---
+	t.Run("negative_beyond_latest", func(t *testing.T) {
+		res, err := request(latest+1, 1, "")
+		if err != nil {
+			return
+		}
+		require.Empty(t, res.Transactions, "expected no transactions when requesting beyond latest")
+	})
+}
+
+// TestGetTransactionsDataStore tests fetching transactions from datastore and local storage.
+// Setup creates an overlap to guarantee no gaps: datastore=[5,50], local=[~45,~60]
+func TestGetTransactionsDataStore(t *testing.T) {
+	gcsCfg := infrastructure.DefaultGCSTestConfig()
+	gcsSetup := infrastructure.NewGCSTestSetup(t, gcsCfg)
+	defer gcsSetup.Stop()
+
+	// Add enough ledgers to the datastore to create overlap with the local window.
+	// Datastore will contain ledgers [5, 50].
+	datastoreStart := uint32(5)
+	datastoreEnd := uint32(50)
+	gcsSetup.AddLedgers(datastoreStart, datastoreEnd)
+
+	test := infrastructure.NewTest(t, &infrastructure.TestConfig{
+		DatastoreConfigFunc: gcsSetup.DatastoreConfigFunc(),
+		NoParallel:          true,
+	})
+	client := test.GetRPCLient()
+
+	// Helper to wait for a health condition
+	waitUntil := func(cond func(h protocol.GetHealthResponse) bool, timeout time.Duration) protocol.GetHealthResponse {
+		var last protocol.GetHealthResponse
+		require.Eventually(t, func() bool {
+			resp, err := client.GetHealth(t.Context())
+			require.NoError(t, err)
+			last = resp
+			return cond(resp)
+		}, timeout, 100*time.Millisecond, "last health: %+v", last)
+		return last
+	}
+
+	// Wait for the system to stabilize with the local window overlapping the datastore.
+	// We want the local oldest ledger to fall within the datastore range to guarantee overlap.
+	// With a 15-ledger window and datastoreEnd=50, local will be around [45-50, 60-65].
+	health := waitUntil(func(h protocol.GetHealthResponse) bool {
+		// Wait until oldest is close to the datastore end (within 10 ledgers)
+		return h.OldestLedger >= datastoreEnd-10 && h.OldestLedger <= datastoreEnd+5
+	}, 30*time.Second)
+
+	oldest := health.OldestLedger
+	latest := health.LatestLedger
+
+	require.GreaterOrEqual(t, latest, oldest, "latest >= oldest")
+	require.LessOrEqual(t, oldest, datastoreEnd, "should have overlap: oldest <= datastoreEnd")
+
+	// Helper to make requests
+	request := func(start uint32, limit uint) (protocol.GetTransactionsResponse, error) {
+		req := protocol.GetTransactionsRequest{
+			StartLedger: start,
+			Pagination:
&protocol.LedgerPaginationOptions{Limit: limit}, + } + return client.GetTransactions(t.Context(), req) + } + + // Helper to get ledger sequences from response + getSeqs := func(resp protocol.GetTransactionsResponse) []uint32 { + out := make([]uint32, len(resp.Transactions)) + for i, tx := range resp.Transactions { + out[i] = tx.TransactionDetails.Ledger + } + return out + } + + // Helper to validate transaction data + assertTransactionValid := func(t *testing.T, tx protocol.TransactionInfo) { + require.NotEmpty(t, tx.TransactionHash, "missing hash") + require.NotZero(t, tx.TransactionDetails.Ledger, "zero ledger") + require.NotEmpty(t, tx.TransactionDetails.EnvelopeXDR, "missing envelope") + require.NotEmpty(t, tx.TransactionDetails.ResultXDR, "missing result") + require.NotEmpty(t, tx.TransactionDetails.ResultMetaXDR, "missing meta") + require.Contains(t, []string{"SUCCESS", "FAILED"}, tx.Status, "invalid status") + } + + // ======================================================================== + // Test 1: Datastore Only - fetch from below local range + // ======================================================================== + t.Run("datastore_only", func(t *testing.T) { + // Request ledger well below local range (in datastore only) + startLedger := datastoreStart + 5 + require.Less(t, startLedger, oldest, "start should be below local range") + + res, err := request(startLedger, 5) + require.NoError(t, err, "should fetch from datastore") + require.NotEmpty(t, res.Transactions, "should return transactions") + + seqs := getSeqs(res) + t.Logf(" Fetched ledgers: %v", seqs) + + // Verify all returned ledgers are below local range + for _, seq := range seqs { + require.Less(t, seq, oldest, "ledger %d should be below local range %d", seq, oldest) + } + + // Validate transaction data + for _, tx := range res.Transactions { + assertTransactionValid(t, tx) + } + + // Verify response metadata reflects local range + require.Equal(t, oldest, res.OldestLedger, "OldestLedger should be local oldest") + require.Equal(t, latest, res.LatestLedger, "LatestLedger should be local latest") + }) + + // ======================================================================== + // Test 2: Local Only - fetch from current local range + // ======================================================================== + t.Run("local_only", func(t *testing.T) { + // Request ledger above datastore range (local only) + startLedger := datastoreEnd + 1 + if startLedger < oldest { + startLedger = oldest + } + require.GreaterOrEqual(t, startLedger, oldest, "start should be in local range") + + res, err := request(startLedger, 5) + require.NoError(t, err, "should fetch from local") + require.NotEmpty(t, res.Transactions, "should return transactions") + + seqs := getSeqs(res) + t.Logf(" Fetched ledgers: %v", seqs) + + // Verify all returned ledgers are in local range + for _, seq := range seqs { + require.GreaterOrEqual(t, seq, oldest, "ledger %d should be >= oldest %d", seq, oldest) + require.LessOrEqual(t, seq, latest, "ledger %d should be <= latest %d", seq, latest) + } + + // Validate transaction data + for _, tx := range res.Transactions { + assertTransactionValid(t, tx) + } + + require.Equal(t, oldest, res.OldestLedger, "OldestLedger should be local oldest") + require.Equal(t, latest, res.LatestLedger, "LatestLedger should be local latest") + }) + + // ======================================================================== + // Test 3: Mixed - fetch across datastore and local boundary + // 
======================================================================== + t.Run("mixed_datastore_and_local", func(t *testing.T) { + // Start well before local range, request enough to cross into local + startLedger := oldest - 5 + if startLedger < datastoreStart { + startLedger = datastoreStart + } + require.Less(t, startLedger, oldest, "start should be in datastore range") + + // Request enough transactions to span both ranges + res, err := request(startLedger, 20) + require.NoError(t, err, "should fetch across boundary") + require.NotEmpty(t, res.Transactions, "should return transactions") + + seqs := getSeqs(res) + t.Logf(" Fetched ledgers: %v", seqs) + t.Logf(" Boundary at ledger: %d", oldest) + + // Verify we got transactions from both sources + hasDatastore := false + hasLocal := false + for _, seq := range seqs { + if seq < oldest { + hasDatastore = true + } + if seq >= oldest { + hasLocal = true + } + } + + require.True(t, hasDatastore, "should have transactions from datastore (<%d)", oldest) + require.True(t, hasLocal, "should have transactions from local (>=%d)", oldest) + + // Verify transactions are ordered + for i := 1; i < len(seqs); i++ { + require.LessOrEqual(t, seqs[i-1], seqs[i], + "ledgers should be ordered: %d before %d", seqs[i-1], seqs[i]) + } + + // Validate all transaction data + for _, tx := range res.Transactions { + assertTransactionValid(t, tx) + } + + require.Equal(t, oldest, res.OldestLedger, "OldestLedger should be local oldest") + require.Equal(t, latest, res.LatestLedger, "LatestLedger should be local latest") + }) + + // ======================================================================== + // Error Cases + // ======================================================================== + t.Run("below_datastore_floor", func(t *testing.T) { + belowFloor := datastoreStart - 3 + + res, err := request(belowFloor, 3) + + if err != nil { + t.Logf(" Below floor returned error (expected): %v", err) + } else { + require.Empty(t, res.Transactions, + "should return empty for ledger %d below datastore floor %d", + belowFloor, datastoreStart) + } + }) + + t.Run("beyond_latest", func(t *testing.T) { + beyondLatest := latest + 100 + + res, err := request(beyondLatest, 3) + + if err != nil { + t.Logf(" Beyond latest returned error (expected): %v", err) + } else { + require.Empty(t, res.Transactions, + "should return empty for ledger %d beyond latest %d", + beyondLatest, latest) + } + }) +} diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/gcs.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/gcs.go new file mode 100644 index 00000000..81995c71 --- /dev/null +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/gcs.go @@ -0,0 +1,229 @@ +package infrastructure + +import ( + "bytes" + "path" + "testing" + + "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + "github.com/stellar/go-stellar-sdk/network" + "github.com/stellar/go-stellar-sdk/support/compressxdr" + "github.com/stellar/go-stellar-sdk/support/datastore" + "github.com/stellar/go-stellar-sdk/xdr" + "github.com/stretchr/testify/require" + + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/config" +) + +// GCSTestSetup holds the resources for a fake GCS server used in tests. +type GCSTestSetup struct { + Server *fakestorage.Server + BucketName string + Prefix string + Schema datastore.DataStoreSchema +} + +// GCSTestConfig provides options for setting up the fake GCS server. 
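+// BucketName and Prefix control where objects are written in the fake bucket,
+// while FilesPerPartition and LedgersPerFile define the datastore schema; the
+// defaults below store one ledger per file so tests can add individual ledgers.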
+type GCSTestConfig struct { + BucketName string + Prefix string + FilesPerPartition uint32 + LedgersPerFile uint32 +} + +// DefaultGCSTestConfig returns a default GCS test configuration. +func DefaultGCSTestConfig() GCSTestConfig { + return GCSTestConfig{ + BucketName: "test-bucket", + Prefix: "ledgers", + FilesPerPartition: 1, + LedgersPerFile: 1, + } +} + +// NewGCSTestSetup creates a fake GCS server and bucket for testing. +// It sets the STORAGE_EMULATOR_HOST environment variable. +// The caller is responsible for calling Stop() on the returned GCSTestSetup.Server. +func NewGCSTestSetup(t *testing.T, cfg GCSTestConfig) *GCSTestSetup { + t.Helper() + + opts := fakestorage.Options{ + Scheme: "http", + Host: "127.0.0.1", + PublicHost: "127.0.0.1", + } + gcsServer, err := fakestorage.NewServerWithOptions(opts) + require.NoError(t, err) + + t.Setenv("STORAGE_EMULATOR_HOST", gcsServer.URL()) + gcsServer.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: cfg.BucketName}) + + schema := datastore.DataStoreSchema{ + FilesPerPartition: cfg.FilesPerPartition, + LedgersPerFile: cfg.LedgersPerFile, + } + + return &GCSTestSetup{ + Server: gcsServer, + BucketName: cfg.BucketName, + Prefix: cfg.Prefix, + Schema: schema, + } +} + +// Stop stops the fake GCS server. +func (g *GCSTestSetup) Stop() { + g.Server.Stop() +} + +// AddLedgers adds ledger objects to the fake GCS bucket for the given sequence range. +func (g *GCSTestSetup) AddLedgers(startSeq, endSeq uint32) { + for seq := startSeq; seq <= endSeq; seq++ { + g.Server.CreateObject(fakestorage.Object{ + ObjectAttrs: fakestorage.ObjectAttrs{ + BucketName: g.BucketName, + Name: path.Join(g.Prefix, g.Schema.GetObjectKeyFromSequenceNumber(seq)), + }, + Content: CreateLCMBatchBuffer(seq), + }) + } +} + +// DatastoreConfigFunc returns a function that configures the RPC config to use the fake GCS datastore. +func (g *GCSTestSetup) DatastoreConfigFunc() func(cfg *config.Config) { + return func(cfg *config.Config) { + cfg.ServeLedgersFromDatastore = true + cfg.BufferedStorageBackendConfig = ledgerbackend.BufferedStorageBackendConfig{ + BufferSize: 15, + NumWorkers: 2, + } + cfg.DataStoreConfig = datastore.DataStoreConfig{ + Type: "GCS", + Params: map[string]string{"destination_bucket_path": path.Join(g.BucketName, g.Prefix)}, + Schema: g.Schema, + } + // reduce retention windows to force usage of datastore + cfg.HistoryRetentionWindow = 15 + cfg.ClassicFeeStatsLedgerRetentionWindow = 15 + cfg.SorobanFeeStatsLedgerRetentionWindow = 15 + } +} + +// CreateLCMBatchBuffer creates a compressed XDR LedgerCloseMetaBatch for the given sequence. 
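+// As a sketch (assuming compressxdr's decoder mirrors the encoder used here),
+// a test could round-trip the buffer for verification:
+//
+//	var batch xdr.LedgerCloseMetaBatch
+//	decoder := compressxdr.NewXDRDecoder(compressxdr.DefaultCompressor, &batch)
+//	_, err := decoder.ReadFrom(bytes.NewReader(CreateLCMBatchBuffer(7)))
+//	// on success, batch.StartSequence == 7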
+func CreateLCMBatchBuffer(seq uint32) []byte { + lcm := xdr.LedgerCloseMetaBatch{ + StartSequence: xdr.Uint32(seq), + EndSequence: xdr.Uint32(seq), + LedgerCloseMetas: []xdr.LedgerCloseMeta{createTestLedger(seq)}, + } + + var buf bytes.Buffer + encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, lcm) + _, _ = encoder.WriteTo(&buf) + + return buf.Bytes() +} + +func createTestLedger(sequence uint32) xdr.LedgerCloseMeta { + // Build the transaction envelope (inlined txEnvelope) + envelope, err := xdr.NewTransactionEnvelope( + xdr.EnvelopeTypeEnvelopeTypeTx, + xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + Fee: 1, + SeqNum: xdr.SequenceNumber(sequence), + SourceAccount: xdr.MustMuxedAddress( + "MA7QYNF7SOWQ3GLR2BGMZEHXAVIRZA4KVWLTJJFC7MGXUA74P7UJVAAAAAAAAAAAAAJLK", + ), + Ext: xdr.TransactionExt{ + V: 1, + SorobanData: &xdr.SorobanTransactionData{}, + }, + }, + }, + ) + if err != nil { + panic(err) + } + + // compute tx hash + hash, err := network.HashTransactionInEnvelope(envelope, StandaloneNetworkPassphrase) + if err != nil { + panic(err) + } + + // build transaction result + makeResult := func(successful bool) xdr.TransactionResult { + code := xdr.TransactionResultCodeTxBadSeq + if successful { + code = xdr.TransactionResultCodeTxSuccess + } + opResults := []xdr.OperationResult{} + return xdr.TransactionResult{ + FeeCharged: 100, + Result: xdr.TransactionResultResult{ + Code: code, + Results: &opResults, + }, + } + } + + // build TxProcessing entry + makeTxMeta := func(res xdr.TransactionResult) xdr.TransactionResultMetaV1 { + return xdr.TransactionResultMetaV1{ + TxApplyProcessing: xdr.TransactionMeta{ + V: 3, + Operations: &[]xdr.OperationMeta{}, + V3: &xdr.TransactionMetaV3{}, + }, + Result: xdr.TransactionResultPair{ + TransactionHash: hash, + Result: res, + }, + } + } + + // Two transactions: one successful, one failed + txProcessing := []xdr.TransactionResultMetaV1{ + makeTxMeta(makeResult(true)), + makeTxMeta(makeResult(false)), + } + + return xdr.LedgerCloseMeta{ + V: 2, + V2: &xdr.LedgerCloseMetaV2{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: xdr.TimePoint(int64(sequence+100)*25 + 100), + }, + LedgerSeq: xdr.Uint32(sequence), + }, + }, + TxProcessing: txProcessing, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{ + PreviousLedgerHash: xdr.Hash{1}, + Phases: []xdr.TransactionPhase{ + { + V: 0, + V0Components: &[]xdr.TxSetComponent{ + { + Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + BaseFee: nil, + Txs: []xdr.TransactionEnvelope{ + envelope, + }, + }, + }, + }, + }, + }, + }, + }, + }, + } +} diff --git a/cmd/stellar-rpc/internal/jsonrpc.go b/cmd/stellar-rpc/internal/jsonrpc.go index d5b05981..1aa7bd0b 100644 --- a/cmd/stellar-rpc/internal/jsonrpc.go +++ b/cmd/stellar-rpc/internal/jsonrpc.go @@ -238,7 +238,7 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { { methodName: protocol.GetTransactionsMethodName, underlyingHandler: methods.NewGetTransactionsHandler(params.Logger, params.LedgerReader, - cfg.MaxTransactionsLimit, cfg.DefaultTransactionsLimit, cfg.NetworkPassphrase), + cfg.MaxTransactionsLimit, cfg.DefaultTransactionsLimit, cfg.NetworkPassphrase, params.DataStoreLedgerReader), longName: toSnakeCase(protocol.GetTransactionsMethodName), queueLimit: cfg.RequestBacklogGetTransactionsQueueLimit, requestDurationLimit: 
cfg.MaxGetTransactionsExecutionDuration, diff --git a/cmd/stellar-rpc/internal/methods/get_transactions.go b/cmd/stellar-rpc/internal/methods/get_transactions.go index 9ab02ae3..62bed595 100644 --- a/cmd/stellar-rpc/internal/methods/get_transactions.go +++ b/cmd/stellar-rpc/internal/methods/get_transactions.go @@ -18,14 +18,16 @@ import ( "github.com/stellar/go-stellar-sdk/xdr" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/db" + "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/rpcdatastore" ) type transactionsRPCHandler struct { - ledgerReader db.LedgerReader - maxLimit uint - defaultLimit uint - logger *log.Entry - networkPassphrase string + ledgerReader db.LedgerReader + maxLimit uint + defaultLimit uint + logger *log.Entry + networkPassphrase string + datastoreLedgerReader rpcdatastore.LedgerReader } // initializePagination sets the pagination limit and cursor @@ -55,21 +57,36 @@ func (h transactionsRPCHandler) initializePagination(request protocol.GetTransac // fetchLedgerData calls the meta table to fetch the corresponding ledger data. func (h transactionsRPCHandler) fetchLedgerData(ctx context.Context, ledgerSeq uint32, - readTx db.LedgerReaderTx, + readTx db.LedgerReaderTx, localRange protocol.LedgerSeqRange, ) (xdr.LedgerCloseMeta, error) { - ledger, found, err := readTx.GetLedger(ctx, ledgerSeq) - if err != nil { - return ledger, &jrpc2.Error{ - Code: jrpc2.InternalError, - Message: err.Error(), + if protocol.IsLedgerWithinRange(ledgerSeq, localRange) { + ledger, found, err := readTx.GetLedger(ctx, ledgerSeq) + if err != nil { + return xdr.LedgerCloseMeta{}, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: err.Error(), + } + } else if !found { + return xdr.LedgerCloseMeta{}, &jrpc2.Error{ + Code: jrpc2.InvalidParams, + Message: fmt.Sprintf("database does not contain metadata for ledger: %d", ledgerSeq), + } } - } else if !found { - return ledger, &jrpc2.Error{ - Code: jrpc2.InvalidParams, - Message: fmt.Sprintf("database does not contain metadata for ledger: %d", ledgerSeq), + return ledger, nil + } else if h.datastoreLedgerReader != nil { + ledger, err := h.datastoreLedgerReader.GetLedgerCached(ctx, ledgerSeq) + if err != nil { + return xdr.LedgerCloseMeta{}, &jrpc2.Error{ + Code: jrpc2.InternalError, + Message: fmt.Sprintf("error fetching ledgers from datastore: %v", err), + } } + return ledger, nil + } + return xdr.LedgerCloseMeta{}, &jrpc2.Error{ + Code: jrpc2.InvalidParams, + Message: fmt.Sprintf("database does not contain metadata for ledger: %d", ledgerSeq), } - return ledger, nil } // processTransactionsInLedger cycles through all the transactions in a ledger, extracts the transaction info @@ -211,8 +228,26 @@ func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Cont } } - err = request.IsValid(h.maxLimit, ledgerRange.ToLedgerSeqRange()) - if err != nil { + localRange := ledgerRange.ToLedgerSeqRange() + availableLedgerRange := localRange + + if h.datastoreLedgerReader != nil { + var dsRange protocol.LedgerSeqRange + + dsRange, err = h.datastoreLedgerReader.GetAvailableLedgerRange(ctx) + if err != nil { + // log error but continue using local ledger range + h.logger.WithError(err).Error("failed to get available ledger range from datastore") + } else if dsRange.FirstLedger != 0 { + // extend available range to include datastore history (older ledgers) + if dsRange.FirstLedger < availableLedgerRange.FirstLedger { + availableLedgerRange.FirstLedger = dsRange.FirstLedger + } + } + } + + // Validate request against combined 
available range. + if err := request.IsValid(h.maxLimit, availableLedgerRange); err != nil { return protocol.GetTransactionsResponse{}, &jrpc2.Error{ Code: jrpc2.InvalidRequest, Message: err.Error(), @@ -230,7 +265,7 @@ func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Cont var done bool cursor := toid.New(0, 0, 0) for ledgerSeq := start.LedgerSequence; ledgerSeq <= int32(ledgerRange.LastLedger.Sequence); ledgerSeq++ { - ledger, err := h.fetchLedgerData(ctx, uint32(ledgerSeq), readTx) + ledger, err := h.fetchLedgerData(ctx, uint32(ledgerSeq), readTx, localRange) if err != nil { return protocol.GetTransactionsResponse{}, err } @@ -255,14 +290,15 @@ func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Cont } func NewGetTransactionsHandler(logger *log.Entry, ledgerReader db.LedgerReader, maxLimit, - defaultLimit uint, networkPassphrase string, + defaultLimit uint, networkPassphrase string, datastoreLedgerReader rpcdatastore.LedgerReader, ) jrpc2.Handler { transactionsHandler := transactionsRPCHandler{ - ledgerReader: ledgerReader, - maxLimit: maxLimit, - defaultLimit: defaultLimit, - logger: logger, - networkPassphrase: networkPassphrase, + ledgerReader: ledgerReader, + maxLimit: maxLimit, + defaultLimit: defaultLimit, + logger: logger, + networkPassphrase: networkPassphrase, + datastoreLedgerReader: datastoreLedgerReader, } return handler.New(transactionsHandler.getTransactionsByLedgerSequence) diff --git a/cmd/stellar-rpc/internal/methods/get_transactions_test.go b/cmd/stellar-rpc/internal/methods/get_transactions_test.go index fbca299c..b0ee22b8 100644 --- a/cmd/stellar-rpc/internal/methods/get_transactions_test.go +++ b/cmd/stellar-rpc/internal/methods/get_transactions_test.go @@ -8,6 +8,7 @@ import ( "github.com/creachadair/jrpc2" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" @@ -311,7 +312,7 @@ func createEmptyTestLedger(sequence uint32) xdr.LedgerCloseMeta { func setupDB(t *testing.T, numLedgers int, skipLedger int) *db.DB { testDB := NewTestDB(t) daemon := interfaces.MakeNoOpDeamon() - for sequence := 1; sequence <= numLedgers; sequence++ { + for sequence := 2; sequence <= numLedgers; sequence++ { if sequence == skipLedger { continue } @@ -337,3 +338,42 @@ func setupDBNoTxs(t *testing.T, numLedgers int) *db.DB { } return testDB } + +func TestGetTransactions_UsesDatastoreForOlderHistory(t *testing.T) { + ctx := context.TODO() + + // DB has ledgers 3..5 (skip ledger 2). 
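+	// Ledger 2 therefore exists only in the datastore, so the handler must
+	// stitch datastore history onto the local range to serve this request.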
+ testDB := setupDB(t, 5, 2) + + ds := &MockDatastoreReader{} + + dsRange := protocol.LedgerSeqRange{ + FirstLedger: 2, + LastLedger: 2, + } + ds.On("GetAvailableLedgerRange", mock.Anything).Return(dsRange, nil).Once() + + ledger1 := createTestLedger(2) + ds.On("GetLedgerCached", mock.Anything, uint32(2)).Return(ledger1, nil).Once() + handler := transactionsRPCHandler{ + ledgerReader: db.NewLedgerReader(testDB), + datastoreLedgerReader: ds, + maxLimit: 100, + defaultLimit: 6, + networkPassphrase: NetworkPassphrase, + } + + request := protocol.GetTransactionsRequest{ + StartLedger: 2, + } + + resp, err := handler.getTransactionsByLedgerSequence(ctx, request) + require.NoError(t, err) + + assert.Equal(t, uint32(3), resp.OldestLedger) + assert.Equal(t, uint32(5), resp.LatestLedger) + assert.Equal(t, 6, len(resp.Transactions)) + assert.Equal(t, uint32(2), resp.Transactions[0].Ledger) + + ds.AssertExpectations(t) +} diff --git a/cmd/stellar-rpc/internal/methods/mocks.go b/cmd/stellar-rpc/internal/methods/mocks.go index 39dd0e1d..e8f132b2 100644 --- a/cmd/stellar-rpc/internal/methods/mocks.go +++ b/cmd/stellar-rpc/internal/methods/mocks.go @@ -83,6 +83,11 @@ type MockDatastoreReader struct { mock.Mock } +func (m *MockDatastoreReader) GetLedgerCached(ctx context.Context, seq uint32) (xdr.LedgerCloseMeta, error) { + args := m.Called(ctx, seq) + return args.Get(0).(xdr.LedgerCloseMeta), args.Error(1) //nolint:forcetypeassert +} + func (m *MockDatastoreReader) GetAvailableLedgerRange(ctx context.Context) (protocol.LedgerSeqRange, error) { args := m.Called(ctx) return args.Get(0).(protocol.LedgerSeqRange), args.Error(1) //nolint:forcetypeassert diff --git a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader.go b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader.go index 1916b45e..4f3e403f 100644 --- a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader.go +++ b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader.go @@ -3,7 +3,10 @@ package rpcdatastore import ( "context" "fmt" + "sync" + "time" + lru "github.com/hashicorp/golang-lru/v2" "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" "github.com/stellar/go-stellar-sdk/support/datastore" @@ -30,11 +33,14 @@ func (f *bufferedBackendFactory) NewBufferedBackend( return ledgerbackend.NewBufferedStorageBackend(config, store, schema) } +const defaultLedgerCacheSize = 1000 + // LedgerReader provides access to historical ledger data // stored in a remote object store (e.g., S3 or GCS) via buffered storage backend. type LedgerReader interface { GetLedgers(ctx context.Context, start, end uint32) ([]xdr.LedgerCloseMeta, error) GetAvailableLedgerRange(ctx context.Context) (protocol.LedgerSeqRange, error) + GetLedgerCached(ctx context.Context, u uint32) (xdr.LedgerCloseMeta, error) } // ledgerReader is the default implementation of LedgerReader. @@ -43,6 +49,16 @@ type ledgerReader struct { dataStore datastore.DataStore schema datastore.DataStoreSchema ledgerBackendFactory LedgerBackendFactory + // ledgerCache caches ledgers to avoid repeated remote fetches + // when callers logically access one ledger at a time. 
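+	// The ledger cache and the range cache are guarded by separate locks so a
+	// slow range refresh does not block concurrent single-ledger lookups.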
+	cacheMu     sync.Mutex
+	ledgerCache *lru.Cache[uint32, xdr.LedgerCloseMeta]
+
+	// range cache with TTL
+	rangeMu       sync.RWMutex
+	cachedRange   protocol.LedgerSeqRange
+	cachedRangeAt time.Time
+	rangeTTL      time.Duration
 }

 // NewLedgerReader constructs a new LedgerReader using the provided
@@ -51,11 +67,15 @@ func NewLedgerReader(storageBackendConfig ledgerbackend.BufferedStorageBackendCo
 	dataStore datastore.DataStore,
 	schema datastore.DataStoreSchema,
 ) LedgerReader {
+	// lru.New only fails for a non-positive size, so the error can be ignored here.
+	cache, _ := lru.New[uint32, xdr.LedgerCloseMeta](defaultLedgerCacheSize)
 	return &ledgerReader{
 		storageBackendConfig: storageBackendConfig,
 		dataStore:            dataStore,
 		schema:               schema,
 		ledgerBackendFactory: &bufferedBackendFactory{},
+		ledgerCache:          cache,
+		rangeTTL:             time.Minute * 10, // refresh the range from the datastore at most every ten minutes
 	}
 }

@@ -89,9 +109,143 @@ func (r *ledgerReader) GetLedgers(ctx context.Context, start, end uint32) ([]xdr
 }

-// GetAvailableLedgerRange returns the assumed available ledger range.
-// TODO: Support fetching the actual range from the datastore.
-func (r *ledgerReader) GetAvailableLedgerRange(_ context.Context) (protocol.LedgerSeqRange, error) {
-	return protocol.LedgerSeqRange{
-		FirstLedger: 2, // Assume datastore holds all ledgers from genesis.
-	}, nil
+// GetAvailableLedgerRange returns the ledger range available in the datastore,
+// caching the result for rangeTTL to limit remote listing calls.
+func (r *ledgerReader) GetAvailableLedgerRange(ctx context.Context) (protocol.LedgerSeqRange, error) {
+	// Fast path: use cached value if still fresh.
+	r.rangeMu.RLock()
+	rng := r.cachedRange
+	age := time.Since(r.cachedRangeAt)
+	ttl := r.rangeTTL
+	r.rangeMu.RUnlock()
+
+	if rng.FirstLedger != 0 && age < ttl {
+		return rng, nil
+	}
+
+	// Slow path: refresh and update cache.
+	return r.refreshAvailableRange(ctx)
+}
+
+// windowSize is the number of ledgers fetched in one batch on a cache miss, so
+// that sequential lookups are served from the cache rather than remote reads.
+const windowSize uint32 = 20
+
+// GetLedgerCached returns a single ledger by sequence using an internal LRU cache.
+// On cache miss it fetches a batch via GetLedgers and populates the cache.
+func (r *ledgerReader) GetLedgerCached(ctx context.Context, seq uint32) (xdr.LedgerCloseMeta, error) {
+	// Fast path: check cache first.
+	if lcm, ok := r.getFromCache(seq); ok {
+		return lcm, nil
+	}
+
+	// Look at the current datastore range so we don't request beyond what's available.
+	dsRange, err := r.GetAvailableLedgerRange(ctx)
+	if err != nil {
+		return xdr.LedgerCloseMeta{}, fmt.Errorf("getting available ledger range: %w", err)
+	}
+
+	if (dsRange.FirstLedger != 0 && seq < dsRange.FirstLedger) ||
+		(dsRange.LastLedger != 0 && seq > dsRange.LastLedger) {
+		return xdr.LedgerCloseMeta{}, fmt.Errorf(
+			"ledger %d is outside datastore range [%d,%d]",
+			seq, dsRange.FirstLedger, dsRange.LastLedger,
+		)
+	}
+
+	start := seq
+	end := seq + windowSize - 1
+
+	// Clamp end to LastLedger if it's known.
+	if dsRange.LastLedger != 0 && end > dsRange.LastLedger {
+		end = dsRange.LastLedger
+	}
+
+	// If clamping makes the range empty, seq is beyond what's available.
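+	// (Defensive: the range check above already rejects sequences past
+	// LastLedger, so this should be unreachable in practice.)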
+ if end < start { + return xdr.LedgerCloseMeta{}, fmt.Errorf( + "ledger %d is beyond datastore last ledger %d", + seq, dsRange.LastLedger, + ) + } + + ledgers, err := r.GetLedgers(ctx, start, end) + if err != nil { + return xdr.LedgerCloseMeta{}, fmt.Errorf("fetching ledgers [%d,%d]: %w", start, end, err) + } + if len(ledgers) == 0 { + return xdr.LedgerCloseMeta{}, fmt.Errorf("ledger %d not found in datastore", seq) + } + + r.storeBatchInCache(ledgers) + return ledgers[0], nil +} + +func (r *ledgerReader) refreshAvailableRange(ctx context.Context) (protocol.LedgerSeqRange, error) { + // Fast path: serve valid cached range + r.rangeMu.Lock() + cached := r.cachedRange + cachedAt := r.cachedRangeAt + ttl := r.rangeTTL + r.rangeMu.Unlock() + + if cached.FirstLedger > 0 && + cached.LastLedger > 0 && + cached.FirstLedger <= cached.LastLedger && + time.Since(cachedAt) < ttl { + return cached, nil + } + + // Slow path: query datastore + oldest, err := datastore.FindOldestLedgerSequence(ctx, r.dataStore, r.schema) + if err != nil { + return protocol.LedgerSeqRange{}, fmt.Errorf("find oldest ledger: %w", err) + } + + latest, err := datastore.FindLatestLedgerSequence(ctx, r.dataStore) + if err != nil { + return protocol.LedgerSeqRange{}, fmt.Errorf("find latest ledger: %w", err) + } + + rng := protocol.LedgerSeqRange{ + FirstLedger: oldest, + LastLedger: latest, + } + + // Validate result + if rng.FirstLedger == 0 || rng.LastLedger == 0 || rng.FirstLedger > rng.LastLedger { + return protocol.LedgerSeqRange{}, fmt.Errorf("invalid ledger range: %+v", rng) + } + + // Update cache + r.rangeMu.Lock() + r.cachedRange = rng + r.cachedRangeAt = time.Now() + r.rangeMu.Unlock() + + return rng, nil +} + +// getFromCache returns a cached ledger if present. +func (r *ledgerReader) getFromCache(seq uint32) (xdr.LedgerCloseMeta, bool) { + if r.ledgerCache == nil { + return xdr.LedgerCloseMeta{}, false + } + + r.cacheMu.Lock() + defer r.cacheMu.Unlock() + + lcm, ok := r.ledgerCache.Get(seq) + return lcm, ok +} + +// storeBatchInCache adds a contiguous batch of ledgers to the cache. 
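+// Entries are keyed by ledger sequence; once defaultLedgerCacheSize entries are
+// held, the least recently used ledgers are evicted.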
+func (r *ledgerReader) storeBatchInCache(ledgers []xdr.LedgerCloseMeta) { + if r.ledgerCache == nil || len(ledgers) == 0 { + return + } + + r.cacheMu.Lock() + defer r.cacheMu.Unlock() + + for _, lcm := range ledgers { + r.ledgerCache.Add(lcm.LedgerSequence(), lcm) + } } diff --git a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go index 3cf965de..7fd272f1 100644 --- a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go +++ b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go @@ -2,8 +2,12 @@ package rpcdatastore import ( "context" + "fmt" "testing" + "time" + lru "github.com/hashicorp/golang-lru/v2" + protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" @@ -12,9 +16,18 @@ import ( "github.com/stellar/go-stellar-sdk/xdr" ) -type mockLedgerBackend struct { - mock.Mock -} +var ( + testSchema = datastore.DataStoreSchema{ + LedgersPerFile: 1, + FilesPerPartition: 1, + } + testBSBConfig = ledgerbackend.BufferedStorageBackendConfig{ + BufferSize: 10, + NumWorkers: 1, + } +) + +type mockLedgerBackend struct{ mock.Mock } var _ ledgerbackend.LedgerBackend = (*mockLedgerBackend)(nil) @@ -43,63 +56,308 @@ func (m *mockLedgerBackend) Close() error { return args.Error(0) } -func createLedgerCloseMeta(ledgerSeq uint32) xdr.LedgerCloseMeta { +type mockBackendFactory struct{ mock.Mock } + +func (m *mockBackendFactory) NewBufferedBackend( + cfg ledgerbackend.BufferedStorageBackendConfig, + ds datastore.DataStore, + schema datastore.DataStoreSchema, +) (ledgerbackend.LedgerBackend, error) { + args := m.Called(cfg, ds, schema) + return args.Get(0).(ledgerbackend.LedgerBackend), args.Error(1) //nolint:forcetypeassert +} + +func lcm(seq uint32) xdr.LedgerCloseMeta { return xdr.LedgerCloseMeta{ V0: &xdr.LedgerCloseMetaV0{ LedgerHeader: xdr.LedgerHeaderHistoryEntry{ Header: xdr.LedgerHeader{ - LedgerSeq: xdr.Uint32(ledgerSeq), + LedgerSeq: xdr.Uint32(seq), }, }, }, - V1: nil, } } -type mockBackendFactory struct { - mock.Mock +func newReader(t *testing.T, ds datastore.DataStore, factory *mockBackendFactory, cacheSize int) *ledgerReader { + t.Helper() + + var cache *lru.Cache[uint32, xdr.LedgerCloseMeta] + if cacheSize > 0 { + var err error + cache, err = lru.New[uint32, xdr.LedgerCloseMeta](cacheSize) + require.NoError(t, err) + } + + return &ledgerReader{ + storageBackendConfig: testBSBConfig, + dataStore: ds, + ledgerBackendFactory: factory, + ledgerCache: cache, + schema: testSchema, + } +} + +func seedRange(r *ledgerReader, first, last uint32) { + r.cachedRange = protocol.LedgerSeqRange{FirstLedger: first, LastLedger: last} + r.cachedRangeAt = time.Now() + r.rangeTTL = 24 * time.Hour } -func (m *mockBackendFactory) NewBufferedBackend(cfg ledgerbackend.BufferedStorageBackendConfig, - ds datastore.DataStore, - schema datastore.DataStoreSchema, -) (ledgerbackend.LedgerBackend, error) { - args := m.Called(cfg, ds, schema) - return args.Get(0).(ledgerbackend.LedgerBackend), args.Error(1) //nolint:forcetypeassert +func expectBatchFetch( + t *testing.T, + ctx context.Context, + backend *mockLedgerBackend, + start, end uint32, +) []xdr.LedgerCloseMeta { + t.Helper() + + backend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(start, end)). 
+ Return(nil).Once() + + expected := make([]xdr.LedgerCloseMeta, 0, end-start+1) + for seq := start; seq <= end; seq++ { + meta := lcm(seq) + backend.On("GetLedger", ctx, seq).Return(meta, nil).Once() + expected = append(expected, meta) + } + return expected } func TestLedgerReaderGetLedgers(t *testing.T) { ctx := t.Context() - mockBackend := new(mockLedgerBackend) - mockDatastore := new(datastore.MockDataStore) - mockFactory := mockBackendFactory{} + ds := new(datastore.MockDataStore) + backend := new(mockLedgerBackend) + factory := new(mockBackendFactory) + start := uint32(100) end := uint32(102) - var expected []xdr.LedgerCloseMeta - for seq := start; seq <= end; seq++ { - meta := createLedgerCloseMeta(seq) - mockBackend.On("GetLedger", ctx, seq).Return(meta, nil) - expected = append(expected, meta) + expected := expectBatchFetch(t, ctx, backend, start, end) + backend.On("Close").Return(nil).Once() + factory.On("NewBufferedBackend", testBSBConfig, ds, datastore.DataStoreSchema{}).Return(backend, nil).Once() + + reader := &ledgerReader{ + storageBackendConfig: testBSBConfig, + dataStore: ds, + ledgerBackendFactory: factory, } - bsbConfig := ledgerbackend.BufferedStorageBackendConfig{ - BufferSize: 10, - NumWorkers: 1, + + got, err := reader.GetLedgers(ctx, start, end) + require.NoError(t, err) + require.Equal(t, expected, got) + + backend.AssertExpectations(t) + factory.AssertExpectations(t) +} + +func TestLedgerReaderGetLedgerCached_CacheHitNoBackendCalls(t *testing.T) { + ctx := t.Context() + + ds := new(datastore.MockDataStore) + factory := new(mockBackendFactory) + + cache, err := lru.New[uint32, xdr.LedgerCloseMeta](100) + require.NoError(t, err) + + const seq = uint32(123) + expected := lcm(seq) + cache.Add(seq, expected) + + reader := &ledgerReader{ + storageBackendConfig: testBSBConfig, + dataStore: ds, + ledgerBackendFactory: factory, + ledgerCache: cache, } - mockBackend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(start, end)).Return(nil) - mockBackend.On("Close").Return(nil) - mockFactory.On("NewBufferedBackend", bsbConfig, mockDatastore, datastore.DataStoreSchema{}).Return(mockBackend, nil) + + got, err := reader.GetLedgerCached(ctx, seq) + require.NoError(t, err) + require.Equal(t, expected, got) + + factory.AssertNotCalled(t, "NewBufferedBackend", mock.Anything, mock.Anything, mock.Anything) +} + +func TestLedgerReaderGetLedgers_FactoryError(t *testing.T) { + ctx := t.Context() + + ds := new(datastore.MockDataStore) + factory := new(mockBackendFactory) + + expectedErr := fmt.Errorf("factory failed") + factory.On("NewBufferedBackend", testBSBConfig, ds, datastore.DataStoreSchema{}). 
+ Return((*mockLedgerBackend)(nil), expectedErr).Once() reader := &ledgerReader{ - storageBackendConfig: bsbConfig, - dataStore: mockDatastore, - ledgerBackendFactory: &mockFactory, + storageBackendConfig: testBSBConfig, + dataStore: ds, + ledgerBackendFactory: factory, } - ledgers, err := reader.GetLedgers(ctx, start, end) + _, err := reader.GetLedgers(ctx, 100, 102) + require.Error(t, err) + require.ErrorIs(t, err, expectedErr) + + factory.AssertExpectations(t) +} + +func TestLedgerReaderGetLedgers_PrepareRangeError_ClosesBackend(t *testing.T) { + ctx := t.Context() + + ds := new(datastore.MockDataStore) + backend := new(mockLedgerBackend) + factory := new(mockBackendFactory) + + start := uint32(100) + end := uint32(102) + + expectedErr := fmt.Errorf("prepare failed") + backend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(start, end)).Return(expectedErr).Once() + backend.On("Close").Return(nil).Once() + + factory.On("NewBufferedBackend", testBSBConfig, ds, datastore.DataStoreSchema{}). + Return(backend, nil).Once() + + reader := &ledgerReader{ + storageBackendConfig: testBSBConfig, + dataStore: ds, + ledgerBackendFactory: factory, + } + + _, err := reader.GetLedgers(ctx, start, end) + require.Error(t, err) + require.ErrorIs(t, err, expectedErr) + + backend.AssertExpectations(t) + factory.AssertExpectations(t) +} + +func TestLedgerReaderGetLedgers_GetLedgerError_ClosesBackend(t *testing.T) { + ctx := t.Context() + + ds := new(datastore.MockDataStore) + backend := new(mockLedgerBackend) + factory := new(mockBackendFactory) + + start := uint32(100) + end := uint32(102) + + expectedErr := fmt.Errorf("get ledger failed") + backend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(start, end)).Return(nil).Once() + backend.On("GetLedger", ctx, uint32(100)).Return(lcm(100), nil).Once() + backend.On("GetLedger", ctx, uint32(101)).Return(xdr.LedgerCloseMeta{}, expectedErr).Once() + backend.On("Close").Return(nil).Once() + + factory.On("NewBufferedBackend", testBSBConfig, ds, datastore.DataStoreSchema{}). + Return(backend, nil).Once() + + reader := &ledgerReader{ + storageBackendConfig: testBSBConfig, + dataStore: ds, + ledgerBackendFactory: factory, + } + + _, err := reader.GetLedgers(ctx, start, end) + require.Error(t, err) + require.ErrorIs(t, err, expectedErr) + + backend.AssertExpectations(t) + factory.AssertExpectations(t) +} + +func TestLedgerReaderGetLedgerCached_BatchAndCache(t *testing.T) { + ctx := t.Context() + + ds := new(datastore.MockDataStore) + backend := new(mockLedgerBackend) + factory := new(mockBackendFactory) + + start := uint32(100) + end := uint32(102) + + // Batch fetch expectation + expectedBatch := expectBatchFetch(t, ctx, backend, start, end) + backend.On("Close").Return(nil).Once() + factory.On("NewBufferedBackend", testBSBConfig, ds, testSchema).Return(backend, nil).Once() + + reader := newReader(t, ds, factory, 100) + + // Seed range so GetAvailableLedgerRange doesn't hit datastore internals. + seedRange(reader, start, end) + + // First call triggers batch fetch. + got, err := reader.GetLedgerCached(ctx, start) + require.NoError(t, err) + require.Equal(t, expectedBatch[0], got) + + // Subsequent calls should hit cache (backend GetLedger already constrained by .Once()). 
+ for seq := start; seq <= end; seq++ { + got, err := reader.GetLedgerCached(ctx, seq) + require.NoError(t, err) + require.Equal(t, expectedBatch[seq-start], got) + } + + backend.AssertExpectations(t) + factory.AssertExpectations(t) +} + +func TestLedgerReaderGetLedgerCached_GetLedgerError(t *testing.T) { + ctx := t.Context() + + ds := new(datastore.MockDataStore) + backend := new(mockLedgerBackend) + factory := new(mockBackendFactory) + + start := uint32(100) + end := uint32(119) // window + + expectedErr := fmt.Errorf("ledger fetch failed") + + backend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(start, end)).Return(nil).Once() + backend.On("GetLedger", ctx, start).Return(xdr.LedgerCloseMeta{}, expectedErr).Once() + backend.On("Close").Return(nil).Once() + + factory.On("NewBufferedBackend", testBSBConfig, ds, testSchema).Return(backend, nil).Once() + + reader := newReader(t, ds, factory, 0) + seedRange(reader, start, end) + + _, err := reader.GetLedgerCached(ctx, start) + require.Error(t, err) + require.ErrorIs(t, err, expectedErr) + + backend.AssertExpectations(t) + factory.AssertExpectations(t) +} + +func TestLedgerReaderGetLedgerCached_NewWindowTriggersSecondBatch(t *testing.T) { + ctx := t.Context() + + ds := new(datastore.MockDataStore) + backend := new(mockLedgerBackend) + factory := new(mockBackendFactory) + + start := uint32(100) + nextWindowStart := start + windowSize + + // Expect full-window fetch for each window. + expectBatchFetch(t, ctx, backend, start, start+windowSize-1) + expectBatchFetch(t, ctx, backend, nextWindowStart, nextWindowStart+windowSize-1) + + // One backend per batch (adjust if your implementation reuses). + backend.On("Close").Return(nil).Twice() + factory.On("NewBufferedBackend", testBSBConfig, ds, testSchema).Return(backend, nil).Twice() + + reader := newReader(t, ds, factory, 100) + seedRange(reader, start, nextWindowStart+windowSize-1) + + _, err := reader.GetLedgerCached(ctx, start) + require.NoError(t, err) + + _, err = reader.GetLedgerCached(ctx, nextWindowStart) require.NoError(t, err) - require.Equal(t, expected, ledgers) - mockBackend.AssertExpectations(t) + backend.AssertExpectations(t) + factory.AssertExpectations(t) } diff --git a/go.mod b/go.mod index 1e45d072..70adcda1 100644 --- a/go.mod +++ b/go.mod @@ -10,6 +10,7 @@ require ( github.com/creachadair/jrpc2 v1.3.3 github.com/fsouza/fake-gcs-server v1.49.2 github.com/go-chi/chi v4.1.2+incompatible + github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/mattn/go-sqlite3 v1.14.17 github.com/montanaflynn/stats v0.7.1 github.com/pelletier/go-toml v1.9.5 @@ -129,14 +130,14 @@ require ( go.opentelemetry.io/otel/metric v1.38.0 // indirect go.opentelemetry.io/otel/trace v1.38.0 // indirect go.uber.org/multierr v1.11.0 // indirect - golang.org/x/crypto v0.43.0 // indirect + golang.org/x/crypto v0.45.0 // indirect golang.org/x/exp v0.0.0-20251023183803-a4bb9ffd2546 // indirect golang.org/x/mod v0.29.0 - golang.org/x/net v0.46.0 // indirect + golang.org/x/net v0.47.0 // indirect golang.org/x/oauth2 v0.32.0 // indirect - golang.org/x/sync v0.17.0 // indirect - golang.org/x/sys v0.37.0 // indirect - golang.org/x/text v0.30.0 // indirect + golang.org/x/sync v0.18.0 // indirect + golang.org/x/sys v0.38.0 // indirect + golang.org/x/text v0.31.0 // indirect golang.org/x/time v0.14.0 // indirect google.golang.org/api v0.254.0 // indirect google.golang.org/genproto v0.0.0-20251029180050-ab9386a59fda // indirect diff --git a/go.sum b/go.sum index 4162620f..b0f2b498 100644 --- a/go.sum +++ b/go.sum @@ -297,6 
+297,8 @@ github.com/hashicorp/golang-lru v0.5.0/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ github.com/hashicorp/golang-lru v0.5.1/go.mod h1:/m3WP610KZHVQ1SGc6re/UDhFvYD7pJ4Ao+sR/qLZy8= github.com/hashicorp/golang-lru v1.0.2 h1:dV3g9Z/unq5DpblPpw+Oqcv4dU/1omnb4Ok8iPY6p1c= github.com/hashicorp/golang-lru v1.0.2/go.mod h1:iADmTwqILo4mZ8BN3D2Q6+9jd8WM5uGBxy+E8yxSoD4= +github.com/hashicorp/golang-lru/v2 v2.0.7 h1:a+bsQ5rvGLjzHuww6tVxozPZFVghXaHOwFs4luLUK2k= +github.com/hashicorp/golang-lru/v2 v2.0.7/go.mod h1:QeFd9opnmA6QUJc5vARoKUSoFhyfM2/ZepoAG6RGpeM= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= @@ -515,8 +517,8 @@ golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8U golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20220722155217-630584e8d5aa/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= -golang.org/x/crypto v0.43.0 h1:dduJYIi3A3KOfdGOHX8AVZ/jGiyPa3IbBozJ5kNuE04= -golang.org/x/crypto v0.43.0/go.mod h1:BFbav4mRNlXJL4wNeejLpWxB7wMbc79PdRGhWKncxR0= +golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q= +golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4= golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8= @@ -587,8 +589,8 @@ golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= -golang.org/x/net v0.46.0 h1:giFlY12I07fugqwPuWJi68oOnpfqFnJIJzaIIm2JVV4= -golang.org/x/net v0.46.0/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY= +golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw= @@ -610,8 +612,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.17.0 h1:l60nONMj9l5drqw6jlhIELNv9I0A4OFgRsG9k2oT9Ug= -golang.org/x/sync v0.17.0/go.mod 
h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= +golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I= +golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -650,11 +652,11 @@ golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220408201424-a24fb2fb8a0f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.37.0 h1:fdNQudmxPjkdUTPnLn5mdQv7Zwvbvpaxqs831goi9kQ= -golang.org/x/sys v0.37.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= +golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc= +golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= -golang.org/x/term v0.36.0 h1:zMPR+aF8gfksFprF/Nc/rd1wRS1EI6nDBGyWAvDzx2Q= -golang.org/x/term v0.36.0/go.mod h1:Qu394IJq6V6dCBRgwqshf3mPF85AqzYEzofzRdZkWss= +golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU= +golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.1-0.20180807135948-17ff2d5776d2/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -663,8 +665,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.4/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= -golang.org/x/text v0.30.0 h1:yznKA/E9zq54KzlzBEAWn1NXSQ8DIp/NYMy88xJjl4k= -golang.org/x/text v0.30.0/go.mod h1:yDdHFIX9t+tORqspjENWgzaCVXgk0yYnYuSZ8UzzBVM= +golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM= +golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= From af7c64c82e12fa1c2908caf7c0b9b753e175d4da Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 8 Jan 2026 11:00:38 -0800 Subject: [PATCH 2/4] fix UT --- .../internal/methods/get_transactions_test.go | 46 +++++++++---------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/cmd/stellar-rpc/internal/methods/get_transactions_test.go b/cmd/stellar-rpc/internal/methods/get_transactions_test.go index b0ee22b8..ccbc2d32 100644 --- a/cmd/stellar-rpc/internal/methods/get_transactions_test.go +++ b/cmd/stellar-rpc/internal/methods/get_transactions_test.go @@ -27,11 +27,11 @@ const ( var 
expectedTransactionInfo = protocol.TransactionInfo{ TransactionDetails: protocol.TransactionDetails{ Status: "SUCCESS", - TransactionHash: "04ce64806f4c2566e67bbc4472c6469c6f06c44524bf20cf3611885e98b29d50", + TransactionHash: "d68ad0eb1626ccd8c6c9f9231d170a1409289c86e291547beb5e4df3f91692a4", ApplicationOrder: 1, FeeBump: false, - Ledger: 1, - EnvelopeXDR: "AAAAAgAAAQCAAAAAAAAAAD8MNL+TrQ2ZcdBMzJD3BVEcg4qtlzSkovsNegP8f+iaAAAAAQAAAAD///+dAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==", //nolint:lll + Ledger: 2, + EnvelopeXDR: "AAAAAgAAAQCAAAAAAAAAAD8MNL+TrQ2ZcdBMzJD3BVEcg4qtlzSkovsNegP8f+iaAAAAAQAAAAD///+eAAAAAAAAAAAAAAAAAAAAAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==", //nolint:lll ResultMetaXDR: "AAAAAwAAAAAAAAAAAAAAAAAAAAAAAAAA", ResultXDR: "AAAAAAAAAGQAAAAAAAAAAAAAAAA=", DiagnosticEventsXDR: []string{}, @@ -40,7 +40,7 @@ var expectedTransactionInfo = protocol.TransactionInfo{ TransactionEventsXDR: []string{}, }, }, - LedgerCloseTime: 125, + LedgerCloseTime: 150, } func TestGetTransactions_DefaultLimit(t *testing.T) { //nolint:dupl @@ -53,7 +53,7 @@ func TestGetTransactions_DefaultLimit(t *testing.T) { //nolint:dupl } request := protocol.GetTransactionsRequest{ - StartLedger: 1, + StartLedger: 2, } response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) @@ -64,7 +64,7 @@ func TestGetTransactions_DefaultLimit(t *testing.T) { //nolint:dupl assert.Equal(t, int64(350), response.LatestLedgerCloseTime) // assert pagination - assert.Equal(t, toid.New(5, 2, 1).String(), response.Cursor) + assert.Equal(t, toid.New(6, 2, 1).String(), response.Cursor) // assert transactions result assert.Len(t, response.Transactions, 10) @@ -83,7 +83,7 @@ func TestGetTransactions_DefaultLimitExceedsLatestLedger(t *testing.T) { //nolin } request := protocol.GetTransactionsRequest{ - StartLedger: 1, + StartLedger: 2, } response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) @@ -91,7 +91,7 @@ func TestGetTransactions_DefaultLimitExceedsLatestLedger(t *testing.T) { //nolin assert.Equal(t, uint32(3), response.LatestLedger) assert.Equal(t, int64(175), response.LatestLedgerCloseTime) assert.Equal(t, toid.New(3, 2, 1).String(), response.Cursor) - assert.Len(t, response.Transactions, 6) + assert.Len(t, response.Transactions, 4) assert.Equal(t, expectedTransactionInfo, response.Transactions[0]) } @@ -105,7 +105,7 @@ func TestGetTransactions_CustomLimit(t *testing.T) { } request := protocol.GetTransactionsRequest{ - StartLedger: 1, + StartLedger: 2, Pagination: &protocol.LedgerPaginationOptions{ Limit: 2, }, @@ -115,10 +115,10 @@ func TestGetTransactions_CustomLimit(t *testing.T) { require.NoError(t, err) assert.Equal(t, uint32(10), response.LatestLedger) assert.Equal(t, int64(350), response.LatestLedgerCloseTime) - assert.Equal(t, toid.New(1, 2, 1).String(), response.Cursor) + assert.Equal(t, toid.New(2, 2, 1).String(), response.Cursor) assert.Len(t, response.Transactions, 2) - assert.Equal(t, uint32(1), response.Transactions[0].Ledger) - assert.Equal(t, uint32(1), response.Transactions[1].Ledger) + assert.Equal(t, uint32(2), response.Transactions[0].Ledger) + assert.Equal(t, uint32(2), response.Transactions[1].Ledger) assert.Equal(t, expectedTransactionInfo, response.Transactions[0]) } @@ -133,7 +133,7 @@ func TestGetTransactions_CustomLimitAndCursor(t *testing.T) { request := protocol.GetTransactionsRequest{ Pagination: &protocol.LedgerPaginationOptions{ - Cursor: toid.New(1, 2, 1).String(), + Cursor: toid.New(2, 2, 1).String(), Limit: 3, 
}, } @@ -142,11 +142,11 @@ func TestGetTransactions_CustomLimitAndCursor(t *testing.T) { require.NoError(t, err) assert.Equal(t, uint32(10), response.LatestLedger) assert.Equal(t, int64(350), response.LatestLedgerCloseTime) - assert.Equal(t, toid.New(3, 1, 1).String(), response.Cursor) + assert.Equal(t, toid.New(4, 1, 1).String(), response.Cursor) assert.Len(t, response.Transactions, 3) - assert.Equal(t, uint32(2), response.Transactions[0].Ledger) - assert.Equal(t, uint32(2), response.Transactions[1].Ledger) - assert.Equal(t, uint32(3), response.Transactions[2].Ledger) + assert.Equal(t, uint32(3), response.Transactions[0].Ledger) + assert.Equal(t, uint32(3), response.Transactions[1].Ledger) + assert.Equal(t, uint32(4), response.Transactions[2].Ledger) } func TestGetTransactions_InvalidStartLedger(t *testing.T) { @@ -165,7 +165,7 @@ func TestGetTransactions_InvalidStartLedger(t *testing.T) { response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) expectedErr := fmt.Errorf( - "[%d] start ledger (4) must be between the oldest ledger: 1 and the latest ledger: 3 for this rpc instance", + "[%d] start ledger (4) must be between the oldest ledger: 2 and the latest ledger: 3 for this rpc instance", jrpc2.InvalidRequest, ) assert.Equal(t, expectedErr.Error(), err.Error()) @@ -173,7 +173,7 @@ func TestGetTransactions_InvalidStartLedger(t *testing.T) { } func TestGetTransactions_LedgerNotFound(t *testing.T) { - testDB := setupDB(t, 3, 2) + testDB := setupDB(t, 4, 3) handler := transactionsRPCHandler{ ledgerReader: db.NewLedgerReader(testDB), maxLimit: 100, @@ -182,11 +182,11 @@ func TestGetTransactions_LedgerNotFound(t *testing.T) { } request := protocol.GetTransactionsRequest{ - StartLedger: 1, + StartLedger: 3, } response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) - expectedErr := fmt.Errorf("[%d] database does not contain metadata for ledger: 2", jrpc2.InvalidParams) + expectedErr := fmt.Errorf("[%d] database does not contain metadata for ledger: 3", jrpc2.InvalidParams) assert.Equal(t, expectedErr.Error(), err.Error()) assert.Nil(t, response.Transactions) } @@ -201,7 +201,7 @@ func TestGetTransactions_LimitGreaterThanMaxLimit(t *testing.T) { } request := protocol.GetTransactionsRequest{ - StartLedger: 1, + StartLedger: 2, Pagination: &protocol.LedgerPaginationOptions{ Limit: 200, }, @@ -243,7 +243,7 @@ func TestGetTransactions_JSONFormat(t *testing.T) { request := protocol.GetTransactionsRequest{ Format: protocol.FormatJSON, - StartLedger: 1, + StartLedger: 2, } js, err := handler.getTransactionsByLedgerSequence(context.TODO(), request) From 3c8751514199cdcb3f859f1e675434680cc42672 Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 8 Jan 2026 13:22:32 -0800 Subject: [PATCH 3/4] fix linter --- cmd/stellar-rpc/internal/rpcdatastore/ledger_reader.go | 9 ++++++--- .../internal/rpcdatastore/ledger_reader_test.go | 2 +- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader.go b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader.go index 4f3e403f..22c9487d 100644 --- a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader.go +++ b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader.go @@ -7,6 +7,7 @@ import ( "time" lru "github.com/hashicorp/golang-lru/v2" + "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" "github.com/stellar/go-stellar-sdk/support/datastore" @@ -63,11 +64,11 @@ type ledgerReader struct { // NewLedgerReader 
constructs a new LedgerReader using the provided // buffered storage backend configuration and datastore configuration. -func NewLedgerReader(storageBackendConfig ledgerbackend.BufferedStorageBackendConfig, +func NewLedgerReader( + storageBackendConfig ledgerbackend.BufferedStorageBackendConfig, dataStore datastore.DataStore, schema datastore.DataStoreSchema, ) LedgerReader { - cache, _ := lru.New[uint32, xdr.LedgerCloseMeta](defaultLedgerCacheSize) return &ledgerReader{ storageBackendConfig: storageBackendConfig, @@ -87,7 +88,9 @@ func (r *ledgerReader) GetLedgers(ctx context.Context, start, end uint32) ([]xdr if err != nil { return nil, fmt.Errorf("failed to create buffered storage backend: %w", err) } - defer bufferedBackend.Close() + defer func(bufferedBackend ledgerbackend.LedgerBackend) { + _ = bufferedBackend.Close() + }(bufferedBackend) // Prepare the requested ledger range in the backend ledgerRange := ledgerbackend.BoundedRange(start, end) diff --git a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go index 7fd272f1..5ceb8234 100644 --- a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go +++ b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go @@ -7,11 +7,11 @@ import ( "time" lru "github.com/hashicorp/golang-lru/v2" - protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" + protocol "github.com/stellar/go-stellar-sdk/protocols/rpc" "github.com/stellar/go-stellar-sdk/support/datastore" "github.com/stellar/go-stellar-sdk/xdr" ) From 65c2d15186fe0a047615a213209b80f8f8c07811 Mon Sep 17 00:00:00 2001 From: Urvi Date: Thu, 8 Jan 2026 13:51:19 -0800 Subject: [PATCH 4/4] moar linter fixes --- .../integrationtest/get_transactions_test.go | 496 +----------------- .../integrationtest/infrastructure/gcs.go | 3 +- .../internal/methods/get_transactions.go | 2 +- .../internal/methods/get_transactions_test.go | 4 +- .../rpcdatastore/ledger_reader_test.go | 20 +- 5 files changed, 24 insertions(+), 501 deletions(-) diff --git a/cmd/stellar-rpc/internal/integrationtest/get_transactions_test.go b/cmd/stellar-rpc/internal/integrationtest/get_transactions_test.go index adf1a86e..ee7fa79f 100644 --- a/cmd/stellar-rpc/internal/integrationtest/get_transactions_test.go +++ b/cmd/stellar-rpc/internal/integrationtest/get_transactions_test.go @@ -114,485 +114,7 @@ func TestGetTransactionsEvents(t *testing.T) { assert.NotEmpty(t, response.DiagnosticEventsXDR) } -func TestGetTransactionsDataStore1(t *testing.T) { - gcsCfg := infrastructure.DefaultGCSTestConfig() - gcsSetup := infrastructure.NewGCSTestSetup(t, gcsCfg) - defer gcsSetup.Stop() - - // add files to GCS - gcsSetup.AddLedgers(5, 40) - - test := infrastructure.NewTest(t, &infrastructure.TestConfig{ - DatastoreConfigFunc: gcsSetup.DatastoreConfigFunc(), - NoParallel: true, - }) - client := test.GetRPCLient() // at this point we're at like ledger 30 - - waitUntil := func(cond func(h protocol.GetHealthResponse) bool, timeout time.Duration) protocol.GetHealthResponse { - var last protocol.GetHealthResponse - require.Eventually(t, func() bool { - resp, err := client.GetHealth(t.Context()) - require.NoError(t, err) - last = resp - return cond(resp) - }, timeout, 100*time.Millisecond, "last health: %+v", last) - return last - } - ledgers := sendTransactions(t, client) - - request := func(start uint32, limit uint, cursor string) 
(protocol.GetTransactionsResponse, error) { - req := protocol.GetTransactionsRequest{ - StartLedger: start, - Pagination: &protocol.LedgerPaginationOptions{ - Limit: limit, - Cursor: cursor, - }, - } - return client.GetTransactions(t.Context(), req) - } - - // ensure oldest > 40 so datastore set ([35..40]) is below local window - health := waitUntil(func(h protocol.GetHealthResponse) bool { - return uint(h.OldestLedger) > 40 - }, 30*time.Second) - - oldest := health.OldestLedger - latest := health.LatestLedger - require.GreaterOrEqual(t, latest, oldest) - - getSeqs := func(resp protocol.GetTransactionsResponse) []uint32 { - out := make([]uint32, len(resp.Transactions)) - for i, l := range resp.Transactions { - out[i] = l.TransactionDetails.Ledger - } - return out - } - // --- 1) datastore-only: entirely below oldest --- - t.Run("datastore_only", func(t *testing.T) { - res, err := request(35, 3, "") - require.NoError(t, err) - require.Len(t, res.Transactions, 3) - require.Equal(t, []uint32{35, 35, 36}, getSeqs(res)) - }) - - // --- 2) local-only: entirely at/above oldest --- - t.Run("local_only", func(t *testing.T) { - limit := 3 - res, err := request(ledgers[0], uint(limit), "") - require.NoError(t, err) - require.Len(t, res.Transactions, 3) - }) - - // --- 3) mixed: cross boundary (datastore then local) --- - t.Run("mixed_datastore_and_local", func(t *testing.T) { - // 39,40 from datastore; 41,42 from local - require.GreaterOrEqual(t, latest, uint32(42), "need latest >= 42") - res, err := request(39, 4, "") - require.NoError(t, err) - require.Len(t, res.Transactions, 4) - - // verify cursor continuity across boundary - next, err := request(0, 2, res.Cursor) - require.NoError(t, err) - if len(next.Transactions) > 0 { - require.EqualValues(t, 43, next.Transactions[0].TransactionHash) - } - }) - - // --- 4) negative: below datastore floor (not available anywhere) --- - t.Run("negative_below_datastore_floor", func(t *testing.T) { - res, err := request(2, 3, "") - // accept either an error or an empty page; but never data - if err != nil { - return - } - require.Empty(t, res.Transactions, "expected no ledgers when requesting below datastore floor") - }) - - // --- 5) negative: beyond latest --- - t.Run("negative_beyond_latest", func(t *testing.T) { - res, err := request(latest+1, 1, "") - if err != nil { - return - } - require.Empty(t, res.Transactions, "expected no ledgers when requesting beyond latest") - }) -} - -/* -func TestGetTransactionsDataStore1(t *testing.T) { - gcsCfg := infrastructure.DefaultGCSTestConfig() - gcsSetup := infrastructure.NewGCSTestSetup(t, gcsCfg) - defer gcsSetup.Stop() - - // add files to GCS - gcsSetup.AddLedgers(5, 42) - - test := infrastructure.NewTest(t, &infrastructure.TestConfig{ - DatastoreConfigFunc: gcsSetup.DatastoreConfigFunc(), - NoParallel: true, - }) - client := test.GetRPCLient() - - waitUntil := func(cond func(h protocol.GetHealthResponse) bool, timeout time.Duration) protocol.GetHealthResponse { - var last protocol.GetHealthResponse - require.Eventually(t, func() bool { - resp, err := client.GetHealth(t.Context()) - require.NoError(t, err) - last = resp - return cond(resp) - }, timeout, 100*time.Millisecond, "last health: %+v", last) - return last - } - - - request := func(start uint32, limit uint, cursor string) (protocol.GetTransactionsResponse, error) { - req := protocol.GetTransactionsRequest{ - StartLedger: start, - Pagination: &protocol.LedgerPaginationOptions{ - Limit: limit, - Cursor: cursor, - }, - } - return 
client.GetTransactions(t.Context(), req) - } - - // Helper to extract ledger sequences from response - getSeqs := func(resp protocol.GetTransactionsResponse) []uint32 { - out := make([]uint32, len(resp.Transactions)) - for i, tx := range resp.Transactions { - out[i] = tx.TransactionDetails.Ledger - } - return out - } - - // Helper to verify transactions are properly ordered - assertTransactionsOrdered := func(t *testing.T, txs []protocol.TransactionInfo, desc string) { - for i := 1; i < len(txs); i++ { - prev := txs[i-1].TransactionDetails - curr := txs[i].TransactionDetails - // Ledger should be non-decreasing - require.LessOrEqual(t, prev.Ledger, curr.Ledger, - "%s: transactions not ordered - ledger %d after %d", desc, curr.Ledger, prev.Ledger) - // If same ledger, verify we have proper transaction indices/ordering - if prev.Ledger == curr.Ledger { - // Transactions within same ledger should maintain order - require.NotEmpty(t, prev.TransactionHash, "%s: missing transaction hash", desc) - require.NotEmpty(t, curr.TransactionHash, "%s: missing transaction hash", desc) - } - } - } - - // Helper to verify transaction data integrity - assertTransactionValid := func(t *testing.T, tx protocol.TransactionInfo, desc string) { - require.NotEmpty(t, tx.TransactionHash, "%s: missing transaction hash", desc) - require.NotZero(t, tx.TransactionDetails.Ledger, "%s: zero ledger number", desc) - require.NotEmpty(t, tx.TransactionDetails.EnvelopeXDR, "%s: missing envelope XDR", desc) - require.NotEmpty(t, tx.TransactionDetails.ResultXDR, "%s: missing result XDR", desc) - require.NotEmpty(t, tx.TransactionDetails.ResultMetaXDR, "%s: missing result meta XDR", desc) - } - - // Ensure oldest > 40 so datastore set ([5..40]) is below local window - health := waitUntil(func(h protocol.GetHealthResponse) bool { - return uint(h.OldestLedger) > 40 - }, 30*time.Second) - - oldest := health.OldestLedger - latest := health.LatestLedger - require.GreaterOrEqual(t, latest, oldest, "latest should be >= oldest") - require.Greater(t, oldest, uint32(40), "oldest should be > 40 for this test") - - - // --- 1) Datastore-only: entirely below oldest --- - t.Run("datastore_only", func(t *testing.T) { - res, err := request(35, 3, "") - require.NoError(t, err, "should successfully fetch from datastore") - require.Len(t, res.Transactions, 3, "should return exactly 3 transactions") - - seqs := getSeqs(res) - require.Equal(t, []uint32{35, 35, 36}, seqs, "should get transactions from ledgers 35-36") - - // Verify transaction data integrity - for i, tx := range res.Transactions { - assertTransactionValid(t, tx, fmt.Sprintf("datastore tx[%d]", i)) - } - assertTransactionsOrdered(t, res.Transactions, "datastore-only") - - // Verify cursor is present and non-empty for pagination - require.NotEmpty(t, res.Cursor, "should have cursor for pagination") - - // Verify ledger range in response - require.GreaterOrEqual(t, res.OldestLedger, uint32(40), "oldest ledger should match request") - require.GreaterOrEqual(t, res.LatestLedger, uint32(40), "latest should include returned ledgers") - }) - - // --- 2) Local-only: entirely at/above oldest --- - t.Run("local_only", func(t *testing.T) { - ledgers := sendTransactions(t, client) - require.NotEmpty(t, ledgers, "should have sent some transactions") - limit := 3 - startLedger := ledgers[0] - require.GreaterOrEqual(t, startLedger, oldest, "test ledger should be in local range") - - res, err := request(startLedger, uint(limit), "") - require.NoError(t, err, "should successfully fetch from local 
storage") - require.Len(t, res.Transactions, 3, "should return exactly 3 transactions") - - // Verify ledger numbers are in the local range - seqs := getSeqs(res) - for _, seq := range seqs { - require.GreaterOrEqual(t, seq, oldest, "local tx should be >= oldest ledger") - } - - // Verify all start at or after the requested start - require.GreaterOrEqual(t, seqs[0], startLedger, "first tx should be >= start ledger") - - // Verify transaction data and ordering - for i, tx := range res.Transactions { - assertTransactionValid(t, tx, fmt.Sprintf("local tx[%d]", i)) - } - assertTransactionsOrdered(t, res.Transactions, "local-only") - - require.NotEmpty(t, res.Cursor, "should have cursor for pagination") - }) - - // --- 3) Mixed: cross boundary (datastore then local) --- - t.Run("mixed_datastore_and_local", func(t *testing.T) { - // Request starting from the datastore range, crossing into the local range - ledgers := sendTransactions(t, client) - require.NotEmpty(t, ledgers, "should have sent some transactions") - startLedger := uint32(39) - //require.Less(t, startLedger, oldest, "start should be in datastore range") - //require.GreaterOrEqual(t, latest, uint32(42), "need latest >= 42 for this test") - - res, err := request(startLedger, 15, "") - require.NoError(t, err, "should successfully fetch across boundary") - // require.Len(t, res.Transactions, 15, "should return exactly 15 transactions") - - seqs := getSeqs(res) - require.Equal(t, startLedger, seqs[0], "should start at requested ledger") - - // Verify we got data from both sources - hasDatastore := false - hasLocal := false - for _, seq := range seqs { - if seq < oldest { - hasDatastore = true - } - if seq >= oldest { - hasLocal = true - } - } - require.True(t, hasDatastore, "should have transactions from datastore") - require.True(t, hasLocal, "should have transactions from local storage") - - // Verify transaction validity and ordering across boundary - for i, tx := range res.Transactions { - assertTransactionValid(t, tx, fmt.Sprintf("mixed tx[%d]", i)) - } - assertTransactionsOrdered(t, res.Transactions, "mixed datastore+local") - - // Verify cursor continuity across boundary - require.NotEmpty(t, res.Cursor, "should have cursor") - next, err := request(0, 2, res.Cursor) - require.NoError(t, err, "cursor should work for next page") - - if len(next.Transactions) > 0 { - // Next page should continue from where we left off - lastSeq := seqs[len(seqs)-1] - nextSeqs := getSeqs(next) - require.GreaterOrEqual(t, nextSeqs[0], lastSeq, - "next page should continue from last ledger %d, got %d", lastSeq, nextSeqs[0]) - } - }) - - // --- 4) Pagination across boundary --- - t.Run("pagination_across_boundary", func(t *testing.T) { - // Start well before boundary, paginate through it - startLedger := uint32(38) - require.Less(t, startLedger, oldest, "start in datastore range") - - var allSeqs []uint32 - cursor := "" - pageCount := 0 - - for pageCount < 5 { // limit iterations to prevent infinite loop - res, err := request(startLedger, 2, cursor) - require.NoError(t, err, "pagination request should succeed") - - if len(res.Transactions) == 0 { - break - } - - seqs := getSeqs(res) - allSeqs = append(allSeqs, seqs...) 
- assertTransactionsOrdered(t, res.Transactions, fmt.Sprintf("page %d", pageCount)) - - if res.Cursor == "" { - break - } - cursor = res.Cursor - startLedger = 0 // Use cursor for subsequent requests - pageCount++ - } - - require.NotEmpty(t, allSeqs, "should have retrieved some transactions") - - // Verify overall ordering across all pages - for i := 1; i < len(allSeqs); i++ { - require.LessOrEqual(t, allSeqs[i-1], allSeqs[i], - "sequences across pages should be ordered: page had %d after %d", - allSeqs[i], allSeqs[i-1]) - } - - // Verify we crossed the boundary - require.Contains(t, allSeqs, uint32(39), "should include ledger before boundary") - require.True(t, func() bool { - for _, seq := range allSeqs { - if seq >= oldest { - return true - } - } - return false - }(), "should include ledgers after boundary (>= %d)", oldest) - }) - - // --- 5) Edge case: request exactly at boundary --- - t.Run("exactly_at_boundary", func(t *testing.T) { - res, err := request(oldest, 3, "") - require.NoError(t, err, "requesting at oldest should succeed") - require.NotEmpty(t, res.Transactions, "should return transactions at oldest ledger") - - seqs := getSeqs(res) - require.GreaterOrEqual(t, seqs[0], oldest, "should start at or after oldest") - assertTransactionsOrdered(t, res.Transactions, "at-boundary") - }) - - // --- 6) Large limit spanning multiple sources --- - t.Run("large_limit_spanning_sources", func(t *testing.T) { - res, err := request(37, 20, "") - require.NoError(t, err, "large limit should succeed") - - if len(res.Transactions) > 0 { - seqs := getSeqs(res) - require.LessOrEqual(t, seqs[0], uint32(37), "should start at or after requested") - assertTransactionsOrdered(t, res.Transactions, "large-limit") - - // Should span both sources - minSeq := seqs[0] - maxSeq := seqs[len(seqs)-1] - if minSeq < oldest && maxSeq >= oldest { - t.Logf("Successfully retrieved data spanning boundary: [%d..%d] across boundary at %d", - minSeq, maxSeq, oldest) - } - } - }) - - // --- 7) Invalid cursor --- - t.Run("invalid_cursor", func(t *testing.T) { - _, err := request(oldest, 1, "invalid-cursor-12345") - // Should either return error or handle gracefully - if err == nil { - t.Log("Invalid cursor handled gracefully (no error)") - } else { - t.Logf("Invalid cursor rejected with error: %v", err) - } - }) - - // --- 8) Zero/invalid limits --- - t.Run("zero_limit", func(t *testing.T) { - res, err := request(oldest, 0, "") - if err == nil { - // If no error, should return empty or use default limit - t.Logf("Zero limit returned %d transactions", len(res.Transactions)) - } - }) - - // --- 9) Negative: below datastore floor --- - t.Run("negative_below_datastore_floor", func(t *testing.T) { - // Ledger 2 is before our datastore range starts (5-40) - res, err := request(2, 3, "") - if err != nil { - t.Logf("Request below datastore floor returned expected error: %v", err) - return - } - - // If no error, should return empty - require.Empty(t, res.Transactions, - "expected no ledgers when requesting below datastore floor (ledger 2 < 5)") - - // Cursor should indicate no more data - if res.Cursor != "" { - next, err := request(0, 1, res.Cursor) - if err == nil { - require.Empty(t, next.Transactions, "cursor from below-floor should not return data") - } - } - }) - - // --- 10) Negative: beyond latest --- - t.Run("negative_beyond_latest", func(t *testing.T) { - beyondLatest := latest + 100 - res, err := request(beyondLatest, 1, "") - if err != nil { - t.Logf("Request beyond latest returned expected error: %v", err) - return - 
}
-
-		require.Empty(t, res.Transactions,
-			"expected no ledgers when requesting beyond latest (requested %d, latest %d)",
-			beyondLatest, latest)
-	})
-
-	// --- 11) Multiple full pages in datastore-only range ---
-	t.Run("multiple_pages_datastore_only", func(t *testing.T) {
-		startLedger := uint32(10)
-		require.Less(t, startLedger, oldest, "should be in datastore range")
-
-		// Get first page
-		page1, err := request(startLedger, 2, "")
-		require.NoError(t, err)
-		if len(page1.Transactions) == 0 {
-			t.Skip("No transactions in datastore range")
-		}
-
-		require.NotEmpty(t, page1.Cursor, "should have cursor for next page")
-
-		// Get second page using cursor
-		page2, err := request(0, 2, page1.Cursor)
-		require.NoError(t, err)
-
-		if len(page2.Transactions) > 0 {
-			seqs1 := getSeqs(page1)
-			seqs2 := getSeqs(page2)
-
-			// Verify pages don't overlap
-			lastOfPage1 := seqs1[len(seqs1)-1]
-			firstOfPage2 := seqs2[0]
-			require.LessOrEqual(t, lastOfPage1, firstOfPage2,
-				"pages should not overlap: page1 ends at %d, page2 starts at %d",
-				lastOfPage1, firstOfPage2)
-		}
-	})
-
-	// --- 12) Request with start=0 (should use oldest?) ---
-	t.Run("start_zero", func(t *testing.T) {
-		res, err := request(0, 3, "")
-		if err != nil {
-			t.Logf("start=0 returned error: %v", err)
-			return
-		}
-
-		if len(res.Transactions) > 0 {
-			seqs := getSeqs(res)
-			t.Logf("start=0 returned ledgers starting at %d (oldest=%d)", seqs[0], oldest)
-		}
-	})
-}
-*/
-
-// TestGetTransactionsDataStore tests fetching transactions from datastore and local storage.
-// Setup creates an overlap to guarantee no gaps: datastore=[5,50], local=[~45,~60]
+// TestGetTransactionsDataStore tests fetching transactions from the datastore
+// and from local storage. Setup creates an overlap to guarantee no gaps:
+// datastore=[5,50], local=[~45,~60].
+//
+//nolint:gocognit,cyclop,funlen
 func TestGetTransactionsDataStore(t *testing.T) {
 	gcsCfg := infrastructure.DefaultGCSTestConfig()
 	gcsSetup := infrastructure.NewGCSTestSetup(t, gcsCfg)
@@ -608,13 +130,13 @@ func TestGetTransactionsDataStore(t *testing.T) {
 		DatastoreConfigFunc: gcsSetup.DatastoreConfigFunc(),
 		NoParallel:          true,
 	})
-	client := test.GetRPCLient()
+	cl := test.GetRPCLient()
 
 	// Helper to wait for health condition
 	waitUntil := func(cond func(h protocol.GetHealthResponse) bool, timeout time.Duration) protocol.GetHealthResponse {
 		var last protocol.GetHealthResponse
 		require.Eventually(t, func() bool {
-			resp, err := client.GetHealth(t.Context())
+			resp, err := cl.GetHealth(t.Context())
 			require.NoError(t, err)
 			last = resp
 			return cond(resp)
@@ -642,14 +164,14 @@ func TestGetTransactionsDataStore(t *testing.T) {
 			StartLedger: start,
 			Pagination:  &protocol.LedgerPaginationOptions{Limit: limit},
 		}
-		return client.GetTransactions(t.Context(), req)
+		return cl.GetTransactions(t.Context(), req)
 	}
 
 	// Helper to get ledger sequences from response
 	getSeqs := func(resp protocol.GetTransactionsResponse) []uint32 {
 		out := make([]uint32, len(resp.Transactions))
 		for i, tx := range resp.Transactions {
-			out[i] = tx.TransactionDetails.Ledger
+			out[i] = tx.Ledger
 		}
 		return out
 	}
@@ -657,10 +179,10 @@ func TestGetTransactionsDataStore(t *testing.T) {
 	// Helper to validate transaction data
 	assertTransactionValid := func(t *testing.T, tx protocol.TransactionInfo) {
 		require.NotEmpty(t, tx.TransactionHash, "missing hash")
-		require.NotZero(t, tx.TransactionDetails.Ledger, "zero ledger")
-		require.NotEmpty(t, tx.TransactionDetails.EnvelopeXDR, "missing envelope")
-		require.NotEmpty(t, tx.TransactionDetails.ResultXDR, "missing result")
-		require.NotEmpty(t, tx.TransactionDetails.ResultMetaXDR, "missing meta")
+		require.NotZero(t, tx.Ledger, "zero ledger")
+		require.NotEmpty(t, tx.EnvelopeXDR, "missing envelope")
+		
require.NotEmpty(t, tx.ResultXDR, "missing result") + require.NotEmpty(t, tx.ResultMetaXDR, "missing meta") require.Contains(t, []string{"SUCCESS", "FAILED"}, tx.Status, "invalid status") } diff --git a/cmd/stellar-rpc/internal/integrationtest/infrastructure/gcs.go b/cmd/stellar-rpc/internal/integrationtest/infrastructure/gcs.go index 81995c71..c422fe41 100644 --- a/cmd/stellar-rpc/internal/integrationtest/infrastructure/gcs.go +++ b/cmd/stellar-rpc/internal/integrationtest/infrastructure/gcs.go @@ -6,12 +6,13 @@ import ( "testing" "github.com/fsouza/fake-gcs-server/fakestorage" + "github.com/stretchr/testify/require" + "github.com/stellar/go-stellar-sdk/ingest/ledgerbackend" "github.com/stellar/go-stellar-sdk/network" "github.com/stellar/go-stellar-sdk/support/compressxdr" "github.com/stellar/go-stellar-sdk/support/datastore" "github.com/stellar/go-stellar-sdk/xdr" - "github.com/stretchr/testify/require" "github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/config" ) diff --git a/cmd/stellar-rpc/internal/methods/get_transactions.go b/cmd/stellar-rpc/internal/methods/get_transactions.go index 62bed595..93fd8ec9 100644 --- a/cmd/stellar-rpc/internal/methods/get_transactions.go +++ b/cmd/stellar-rpc/internal/methods/get_transactions.go @@ -265,7 +265,7 @@ func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Cont var done bool cursor := toid.New(0, 0, 0) for ledgerSeq := start.LedgerSequence; ledgerSeq <= int32(ledgerRange.LastLedger.Sequence); ledgerSeq++ { - ledger, err := h.fetchLedgerData(ctx, uint32(ledgerSeq), readTx, localRange) + ledger, err := h.fetchLedgerData(ctx, uint32(ledgerSeq), readTx, localRange) //nolint:gosec if err != nil { return protocol.GetTransactionsResponse{}, err } diff --git a/cmd/stellar-rpc/internal/methods/get_transactions_test.go b/cmd/stellar-rpc/internal/methods/get_transactions_test.go index ccbc2d32..9b84f50d 100644 --- a/cmd/stellar-rpc/internal/methods/get_transactions_test.go +++ b/cmd/stellar-rpc/internal/methods/get_transactions_test.go @@ -340,7 +340,7 @@ func setupDBNoTxs(t *testing.T, numLedgers int) *db.DB { } func TestGetTransactions_UsesDatastoreForOlderHistory(t *testing.T) { - ctx := context.TODO() + ctx := t.Context() // DB has ledgers 3..5 (skip ledger 2). 
testDB := setupDB(t, 5, 2) @@ -372,7 +372,7 @@ func TestGetTransactions_UsesDatastoreForOlderHistory(t *testing.T) { assert.Equal(t, uint32(3), resp.OldestLedger) assert.Equal(t, uint32(5), resp.LatestLedger) - assert.Equal(t, 6, len(resp.Transactions)) + assert.Len(t, resp.Transactions, 6) assert.Equal(t, uint32(2), resp.Transactions[0].Ledger) ds.AssertExpectations(t) diff --git a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go index 5ceb8234..09ba169a 100644 --- a/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go +++ b/cmd/stellar-rpc/internal/rpcdatastore/ledger_reader_test.go @@ -2,7 +2,7 @@ package rpcdatastore import ( "context" - "fmt" + "errors" "testing" "time" @@ -105,8 +105,8 @@ func seedRange(r *ledgerReader, first, last uint32) { } func expectBatchFetch( - t *testing.T, ctx context.Context, + t *testing.T, backend *mockLedgerBackend, start, end uint32, ) []xdr.LedgerCloseMeta { @@ -134,7 +134,7 @@ func TestLedgerReaderGetLedgers(t *testing.T) { start := uint32(100) end := uint32(102) - expected := expectBatchFetch(t, ctx, backend, start, end) + expected := expectBatchFetch(ctx, t, backend, start, end) backend.On("Close").Return(nil).Once() factory.On("NewBufferedBackend", testBSBConfig, ds, datastore.DataStoreSchema{}).Return(backend, nil).Once() @@ -185,7 +185,7 @@ func TestLedgerReaderGetLedgers_FactoryError(t *testing.T) { ds := new(datastore.MockDataStore) factory := new(mockBackendFactory) - expectedErr := fmt.Errorf("factory failed") + expectedErr := errors.New("factory failed") factory.On("NewBufferedBackend", testBSBConfig, ds, datastore.DataStoreSchema{}). Return((*mockLedgerBackend)(nil), expectedErr).Once() @@ -212,7 +212,7 @@ func TestLedgerReaderGetLedgers_PrepareRangeError_ClosesBackend(t *testing.T) { start := uint32(100) end := uint32(102) - expectedErr := fmt.Errorf("prepare failed") + expectedErr := errors.New("prepare failed") backend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(start, end)).Return(expectedErr).Once() backend.On("Close").Return(nil).Once() @@ -243,7 +243,7 @@ func TestLedgerReaderGetLedgers_GetLedgerError_ClosesBackend(t *testing.T) { start := uint32(100) end := uint32(102) - expectedErr := fmt.Errorf("get ledger failed") + expectedErr := errors.New("get ledger failed") backend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(start, end)).Return(nil).Once() backend.On("GetLedger", ctx, uint32(100)).Return(lcm(100), nil).Once() backend.On("GetLedger", ctx, uint32(101)).Return(xdr.LedgerCloseMeta{}, expectedErr).Once() @@ -277,7 +277,7 @@ func TestLedgerReaderGetLedgerCached_BatchAndCache(t *testing.T) { end := uint32(102) // Batch fetch expectation - expectedBatch := expectBatchFetch(t, ctx, backend, start, end) + expectedBatch := expectBatchFetch(ctx, t, backend, start, end) backend.On("Close").Return(nil).Once() factory.On("NewBufferedBackend", testBSBConfig, ds, testSchema).Return(backend, nil).Once() @@ -312,7 +312,7 @@ func TestLedgerReaderGetLedgerCached_GetLedgerError(t *testing.T) { start := uint32(100) end := uint32(119) // window - expectedErr := fmt.Errorf("ledger fetch failed") + expectedErr := errors.New("ledger fetch failed") backend.On("PrepareRange", ctx, ledgerbackend.BoundedRange(start, end)).Return(nil).Once() backend.On("GetLedger", ctx, start).Return(xdr.LedgerCloseMeta{}, expectedErr).Once() @@ -342,8 +342,8 @@ func TestLedgerReaderGetLedgerCached_NewWindowTriggersSecondBatch(t *testing.T) nextWindowStart := start + windowSize // 
Expect full-window fetch for each window. - expectBatchFetch(t, ctx, backend, start, start+windowSize-1) - expectBatchFetch(t, ctx, backend, nextWindowStart, nextWindowStart+windowSize-1) + expectBatchFetch(ctx, t, backend, start, start+windowSize-1) + expectBatchFetch(ctx, t, backend, nextWindowStart, nextWindowStart+windowSize-1) // One backend per batch (adjust if your implementation reuses). backend.On("Close").Return(nil).Twice()
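
Note for reviewers: the GetLedgerCached tests above pin down a windowed, read-through LRU cache. As a standalone illustration of that pattern, here is a minimal runnable sketch. Only the hashicorp/golang-lru/v2 calls are the real API; windowSize=20 (inferred from the [100..119] window in the GetLedgerError test), fetchWindow, the string payloads, and the modulo-based window alignment are stand-ins for the buffered-storage-backend batch fetch of xdr.LedgerCloseMeta in ledger_reader.go.

package main

import (
	"fmt"

	lru "github.com/hashicorp/golang-lru/v2"
)

// windowSize is a stand-in for the reader's batch window; the actual value
// and alignment live in ledger_reader.go.
const windowSize = 20

// fetchWindow stands in for NewBufferedBackend + PrepareRange + the GetLedger
// loop; it fabricates placeholder payloads so the sketch runs without a
// datastore.
func fetchWindow(start, end uint32) map[uint32]string {
	out := make(map[uint32]string, end-start+1)
	for seq := start; seq <= end; seq++ {
		out[seq] = fmt.Sprintf("ledger-%d", seq)
	}
	return out
}

// getLedgerCached mirrors the shape of ledgerReader.GetLedgerCached: serve
// from the LRU when possible, otherwise fetch the whole window containing
// seq and cache every ledger in it, so neighbouring lookups become hits.
func getLedgerCached(cache *lru.Cache[uint32, string], seq uint32) string {
	if v, ok := cache.Get(seq); ok {
		return v // cache hit: no backend round-trip, as the CacheHit test asserts
	}
	start := seq - seq%windowSize // assumed alignment; the real reader may differ
	for s, v := range fetchWindow(start, start+windowSize-1) {
		cache.Add(s, v)
	}
	v, _ := cache.Get(seq)
	return v
}

func main() {
	cache, err := lru.New[uint32, string](100)
	if err != nil {
		panic(err)
	}
	fmt.Println(getLedgerCached(cache, 105)) // miss: one batch fetch of [100..119]
	fmt.Println(getLedgerCached(cache, 110)) // hit: served from the LRU
}

The per-window backend lifecycle (create a buffered backend, fetch the window, then Close, which is what the paired .Twice() expectations in the second-batch test assert) is elided here; the sketch only shows why one miss turns the neighbouring lookups into cache hits.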