Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 8 additions & 80 deletions cmd/stellar-rpc/internal/integrationtest/get_ledgers_test.go
Original file line number Diff line number Diff line change
@@ -1,21 +1,14 @@
package integrationtest

import (
"bytes"
"testing"
"time"

"github.com/fsouza/fake-gcs-server/fakestorage"
"github.com/stretchr/testify/require"

client "github.com/stellar/go-stellar-sdk/clients/rpcclient"
"github.com/stellar/go-stellar-sdk/ingest/ledgerbackend"
protocol "github.com/stellar/go-stellar-sdk/protocols/rpc"
"github.com/stellar/go-stellar-sdk/support/compressxdr"
"github.com/stellar/go-stellar-sdk/support/datastore"
"github.com/stellar/go-stellar-sdk/xdr"

"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/config"
"github.com/stellar/stellar-rpc/cmd/stellar-rpc/internal/integrationtest/infrastructure"
)

Expand Down Expand Up @@ -104,62 +97,22 @@ func TestGetLedgers(t *testing.T) {
}

func TestGetLedgersFromDatastore(t *testing.T) {
// setup fake GCS server
opts := fakestorage.Options{
Scheme: "http",
PublicHost: "127.0.0.1",
}
gcsServer, err := fakestorage.NewServerWithOptions(opts)
require.NoError(t, err)
defer gcsServer.Stop()
gcsSetup := infrastructure.NewGCSTestSetup(t, infrastructure.DefaultGCSTestConfig())
defer gcsSetup.Stop()

t.Setenv("STORAGE_EMULATOR_HOST", gcsServer.URL())
bucketName := "test-bucket"
gcsServer.CreateBucketWithOpts(fakestorage.CreateBucketOpts{Name: bucketName})

// datastore configuration function
schema := datastore.DataStoreSchema{
FilesPerPartition: 1,
LedgersPerFile: 1,
}
setDatastoreConfig := func(cfg *config.Config) {
cfg.ServeLedgersFromDatastore = true
cfg.BufferedStorageBackendConfig = ledgerbackend.BufferedStorageBackendConfig{
BufferSize: 15,
NumWorkers: 2,
}
cfg.DataStoreConfig = datastore.DataStoreConfig{
Type: "GCS",
Params: map[string]string{"destination_bucket_path": bucketName},
Schema: schema,
}
// reduce retention windows to force usage of datastore
cfg.HistoryRetentionWindow = 15
cfg.ClassicFeeStatsLedgerRetentionWindow = 15
cfg.SorobanFeeStatsLedgerRetentionWindow = 15
}

// add files to GCS
for seq := uint32(35); seq <= 40; seq++ {
gcsServer.CreateObject(fakestorage.Object{
ObjectAttrs: fakestorage.ObjectAttrs{
BucketName: bucketName,
Name: schema.GetObjectKeyFromSequenceNumber(seq),
},
Content: createLCMBatchBuffer(seq),
})
}
// add files to GCS for ledgers 35-40
gcsSetup.AddLedgers(35, 40)

test := infrastructure.NewTest(t, &infrastructure.TestConfig{
DatastoreConfigFunc: setDatastoreConfig,
DatastoreConfigFunc: gcsSetup.DatastoreConfigFunc(),
NoParallel: true, // can't use parallel due to env vars
})
client := test.GetRPCLient() // at this point we're at like ledger 30
cl := test.GetRPCLient() // at this point we're at like ledger 30

waitUntil := func(cond func(h protocol.GetHealthResponse) bool, timeout time.Duration) protocol.GetHealthResponse {
var last protocol.GetHealthResponse
require.Eventually(t, func() bool {
resp, err := client.GetHealth(t.Context())
resp, err := cl.GetHealth(t.Context())
require.NoError(t, err)
last = resp
return cond(resp)
Expand All @@ -183,7 +136,7 @@ func TestGetLedgersFromDatastore(t *testing.T) {
Cursor: cursor,
},
}
return client.GetLedgers(t.Context(), req)
return cl.GetLedgers(t.Context(), req)
}

// ensure oldest > 40 so datastore set ([35..40]) is below local window
Expand Down Expand Up @@ -249,28 +202,3 @@ func TestGetLedgersFromDatastore(t *testing.T) {
require.Empty(t, res.Ledgers, "expected no ledgers when requesting beyond latest")
})
}

func createLCMBatchBuffer(seq uint32) []byte {
lcm := xdr.LedgerCloseMetaBatch{
StartSequence: xdr.Uint32(seq),
EndSequence: xdr.Uint32(seq),
LedgerCloseMetas: []xdr.LedgerCloseMeta{
{
V: int32(0),
V0: &xdr.LedgerCloseMetaV0{
LedgerHeader: xdr.LedgerHeaderHistoryEntry{
Header: xdr.LedgerHeader{
LedgerSeq: xdr.Uint32(seq),
},
},
},
},
},
}

var buf bytes.Buffer
encoder := compressxdr.NewXDREncoder(compressxdr.DefaultCompressor, lcm)
_, _ = encoder.WriteTo(&buf)

return buf.Bytes()
}
218 changes: 218 additions & 0 deletions cmd/stellar-rpc/internal/integrationtest/get_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package integrationtest
import (
"context"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -112,3 +113,220 @@ func TestGetTransactionsEvents(t *testing.T) {
assert.Len(t, response.Events.TransactionEventsXDR, 2)
assert.NotEmpty(t, response.DiagnosticEventsXDR)
}

//nolint:gocognit,cyclop,funlen
func TestGetTransactionsDataStore(t *testing.T) {
gcsCfg := infrastructure.DefaultGCSTestConfig()
gcsSetup := infrastructure.NewGCSTestSetup(t, gcsCfg)
defer gcsSetup.Stop()

// Add more ledgers to datastore to create overlap with local window
// Datastore will contain ledgers [5, 50]
datastoreStart := uint32(5)
datastoreEnd := uint32(50)
gcsSetup.AddLedgers(datastoreStart, datastoreEnd)

test := infrastructure.NewTest(t, &infrastructure.TestConfig{
DatastoreConfigFunc: gcsSetup.DatastoreConfigFunc(),
NoParallel: true,
})
cl := test.GetRPCLient()

// Helper to wait for health condition
waitUntil := func(cond func(h protocol.GetHealthResponse) bool, timeout time.Duration) protocol.GetHealthResponse {
var last protocol.GetHealthResponse
require.Eventually(t, func() bool {
resp, err := cl.GetHealth(t.Context())
require.NoError(t, err)
last = resp
return cond(resp)
}, timeout, 100*time.Millisecond, "last health: %+v", last)
return last
}

// Wait for system to stabilize with local window overlapping datastore
// We want: local oldest to be within datastore range to guarantee overlap
// With 15-ledger window and datastoreEnd=50, local will be around [45-50, 60-65]
health := waitUntil(func(h protocol.GetHealthResponse) bool {
// Wait until oldest is close to datastore end (within 10 ledgers)
return h.OldestLedger >= datastoreEnd-10 && h.OldestLedger <= datastoreEnd+5
}, 30*time.Second)

oldest := health.OldestLedger
latest := health.LatestLedger

require.GreaterOrEqual(t, latest, oldest, "latest >= oldest")
require.LessOrEqual(t, oldest, datastoreEnd, "should have overlap: oldest <= datastoreEnd")

// Helper to make requests
request := func(start uint32, limit uint) (protocol.GetTransactionsResponse, error) {
req := protocol.GetTransactionsRequest{
StartLedger: start,
Pagination: &protocol.LedgerPaginationOptions{Limit: limit},
}
return cl.GetTransactions(t.Context(), req)
}

// Helper to get ledger sequences from response
getSeqs := func(resp protocol.GetTransactionsResponse) []uint32 {
out := make([]uint32, len(resp.Transactions))
for i, tx := range resp.Transactions {
out[i] = tx.Ledger
}
return out
}

// Helper to validate transaction data
assertTransactionValid := func(t *testing.T, tx protocol.TransactionInfo) {
require.NotEmpty(t, tx.TransactionHash, "missing hash")
require.NotZero(t, tx.Ledger, "zero ledger")
require.NotEmpty(t, tx.EnvelopeXDR, "missing envelope")
require.NotEmpty(t, tx.ResultXDR, "missing result")
require.NotEmpty(t, tx.ResultMetaXDR, "missing meta")
require.Contains(t, []string{"SUCCESS", "FAILED"}, tx.Status, "invalid status")
}

// ========================================================================
// Test 1: Datastore Only - fetch from below local range
// ========================================================================
t.Run("datastore_only", func(t *testing.T) {
// Request ledger well below local range (in datastore only)
startLedger := datastoreStart + 5
require.Less(t, startLedger, oldest, "start should be below local range")

res, err := request(startLedger, 5)
require.NoError(t, err, "should fetch from datastore")
require.NotEmpty(t, res.Transactions, "should return transactions")

seqs := getSeqs(res)
t.Logf(" Fetched ledgers: %v", seqs)

// Verify all returned ledgers are below local range
for _, seq := range seqs {
require.Less(t, seq, oldest, "ledger %d should be below local range %d", seq, oldest)
}

// Validate transaction data
for _, tx := range res.Transactions {
assertTransactionValid(t, tx)
}

// Verify response metadata reflects local range
require.Equal(t, oldest, res.OldestLedger, "OldestLedger should be local oldest")
require.Equal(t, latest, res.LatestLedger, "LatestLedger should be local latest")
})

// ========================================================================
// Test 2: Local Only - fetch from current local range
// ========================================================================
t.Run("local_only", func(t *testing.T) {
// Request ledger above datastore range (local only)
startLedger := datastoreEnd + 1
if startLedger < oldest {
startLedger = oldest
}
require.GreaterOrEqual(t, startLedger, oldest, "start should be in local range")

res, err := request(startLedger, 5)
require.NoError(t, err, "should fetch from local")
require.NotEmpty(t, res.Transactions, "should return transactions")

seqs := getSeqs(res)
t.Logf(" Fetched ledgers: %v", seqs)

// Verify all returned ledgers are in local range
for _, seq := range seqs {
require.GreaterOrEqual(t, seq, oldest, "ledger %d should be >= oldest %d", seq, oldest)
require.LessOrEqual(t, seq, latest, "ledger %d should be <= latest %d", seq, latest)
}

// Validate transaction data
for _, tx := range res.Transactions {
assertTransactionValid(t, tx)
}

require.Equal(t, oldest, res.OldestLedger, "OldestLedger should be local oldest")
require.Equal(t, latest, res.LatestLedger, "LatestLedger should be local latest")
})

// ========================================================================
// Test 3: Mixed - fetch across datastore and local boundary
// ========================================================================
t.Run("mixed_datastore_and_local", func(t *testing.T) {
// Start well before local range, request enough to cross into local
startLedger := oldest - 5
if startLedger < datastoreStart {
startLedger = datastoreStart
}
require.Less(t, startLedger, oldest, "start should be in datastore range")

// Request enough transactions to span both ranges
res, err := request(startLedger, 20)
require.NoError(t, err, "should fetch across boundary")
require.NotEmpty(t, res.Transactions, "should return transactions")

seqs := getSeqs(res)
t.Logf(" Fetched ledgers: %v", seqs)
t.Logf(" Boundary at ledger: %d", oldest)

// Verify we got transactions from both sources
hasDatastore := false
hasLocal := false
for _, seq := range seqs {
if seq < oldest {
hasDatastore = true
}
if seq >= oldest {
hasLocal = true
}
}

require.True(t, hasDatastore, "should have transactions from datastore (<%d)", oldest)
require.True(t, hasLocal, "should have transactions from local (>=%d)", oldest)

// Verify transactions are ordered
for i := 1; i < len(seqs); i++ {
require.LessOrEqual(t, seqs[i-1], seqs[i],
"ledgers should be ordered: %d before %d", seqs[i-1], seqs[i])
}

// Validate all transaction data
for _, tx := range res.Transactions {
assertTransactionValid(t, tx)
}

require.Equal(t, oldest, res.OldestLedger, "OldestLedger should be local oldest")
require.Equal(t, latest, res.LatestLedger, "LatestLedger should be local latest")
})

// ========================================================================
// Error Cases
// ========================================================================
t.Run("below_datastore_floor", func(t *testing.T) {
belowFloor := datastoreStart - 3

res, err := request(belowFloor, 3)

if err != nil {
t.Logf(" Below floor returned error (expected): %v", err)
} else {
require.Empty(t, res.Transactions,
"should return empty for ledger %d below datastore floor %d",
belowFloor, datastoreStart)
}
})

t.Run("beyond_latest", func(t *testing.T) {
beyondLatest := latest + 100

res, err := request(beyondLatest, 3)

if err != nil {
t.Logf(" Beyond latest returned error (expected): %v", err)
} else {
require.Empty(t, res.Transactions,
"should return empty for ledger %d beyond latest %d",
beyondLatest, latest)
}
})
}
Loading
Loading