Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,30 @@ More example commands:

# Start the website (--dev reloads the template on every page load, for easier iteration)
./relayscan service website --dev

#
# backfill-runner: Backfill + Check Service
# - a single service to continuously run these
# - default interval: 5 minutes
#
# Test with just one relay (flashbots)
./relayscan service backfill-runner --relay fb

# Test with ultrasound relay and limited slots (last 50 slots)
./relayscan service backfill-runner --relay us --min-slot -50

# Combine flags for quick testing
./relayscan service backfill-runner --relay fb --min-slot -50 --skip-check-value

# Custom interval
./relayscan service backfill-runner --interval 10m

# Run once and exit (useful for testing)
./relayscan service backfill-runner --once

# Skip one of the steps
./relayscan service backfill-runner --skip-backfill
./relayscan service backfill-runner --skip-check-value
```

### Test & development
Expand Down
113 changes: 73 additions & 40 deletions cmd/core/check-payload-value.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,86 +48,118 @@ func init() {
var checkPayloadValueCmd = &cobra.Command{
Use: "check-payload-value",
Short: "Check payload value for delivered payloads",
Run: checkPayloadValue,
}
Run: func(cmd *cobra.Command, args []string) {
client, err := ethclient.Dial(ethNodeURI)
if err != nil {
log.Fatalf("Failed to create RPC client for '%s'", ethNodeURI)
}
log.Infof("Using eth node: %s", ethNodeURI)

func checkPayloadValue(cmd *cobra.Command, args []string) {
var err error
startTime := time.Now().UTC()
client2 := client
if ethNodeBackupURI != "" {
client2, err = ethclient.Dial(ethNodeBackupURI)
if err != nil {
log.Fatalf("Failed to create backup RPC client for '%s'", ethNodeBackupURI)
}
log.Infof("Using eth backup node: %s", ethNodeBackupURI)
}

client, err := ethclient.Dial(ethNodeURI)
if err != nil {
log.Fatalf("Failed to create RPC client for '%s'", ethNodeURI)
}
log.Infof("Using eth node: %s", ethNodeURI)
// Connect to Postgres
db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN)

opts := CheckPayloadValueOpts{
Limit: limit,
Slot: slot,
SlotMax: slotMax,
SlotMin: slotMin,
NumThreads: numThreads,
CheckIncorrectOnly: checkIncorrectOnly,
CheckMissedOnly: checkMissedOnly,
CheckTx: checkTx,
CheckAll: checkAll,
}

client2 := client
if ethNodeBackupURI != "" {
client2, err = ethclient.Dial(ethNodeBackupURI)
err = RunCheckPayloadValue(db, client, client2, opts)
if err != nil {
log.Fatalf("Failed to create backup RPC client for '%s'", ethNodeBackupURI)
log.WithError(err).Fatal("check payload value failed")
}
log.Infof("Using eth backup node: %s", ethNodeBackupURI)
}
},
}

// Connect to Postgres
db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN)
// CheckPayloadValueOpts contains options for running the payload value check
type CheckPayloadValueOpts struct {
Limit uint64
Slot uint64
SlotMax uint64
SlotMin uint64
NumThreads uint64
CheckIncorrectOnly bool
CheckMissedOnly bool
CheckTx bool
CheckAll bool
}

// RunCheckPayloadValue checks payload values for delivered payloads
func RunCheckPayloadValue(db *database.DatabaseService, client, client2 *ethclient.Client, opts CheckPayloadValueOpts) error {
startTime := time.Now().UTC()

entries := []database.DataAPIPayloadDeliveredEntry{}
query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number FROM ` + dbvars.TableDataAPIPayloadDelivered
if checkIncorrectOnly {

var err error
if opts.CheckIncorrectOnly {
query += ` WHERE value_check_ok=false ORDER BY slot DESC`
if limit > 0 {
query += fmt.Sprintf(" limit %d", limit)
if opts.Limit > 0 {
query += fmt.Sprintf(" limit %d", opts.Limit)
}
err = db.DB.Select(&entries, query)
} else if checkMissedOnly {
} else if opts.CheckMissedOnly {
query += ` WHERE slot_missed=true ORDER BY slot DESC`
if limit > 0 {
query += fmt.Sprintf(" limit %d", limit)
if opts.Limit > 0 {
query += fmt.Sprintf(" limit %d", opts.Limit)
}
err = db.DB.Select(&entries, query)
} else if checkAll {
if slotMax > 0 {
query += fmt.Sprintf(" WHERE slot<=%d", slotMax)
} else if opts.CheckAll {
if opts.SlotMax > 0 {
query += fmt.Sprintf(" WHERE slot<=%d", opts.SlotMax)
}
query += ` ORDER BY slot DESC`
if limit > 0 {
query += fmt.Sprintf(" limit %d", limit)
if opts.Limit > 0 {
query += fmt.Sprintf(" limit %d", opts.Limit)
}
err = db.DB.Select(&entries, query)
} else if slot != 0 {
} else if opts.Slot != 0 {
query += ` WHERE slot=$1`
err = db.DB.Select(&entries, query, slot)
err = db.DB.Select(&entries, query, opts.Slot)
} else {
// query += ` WHERE value_check_ok IS NULL AND slot_missed IS NULL ORDER BY slot DESC LIMIT $1`
query += ` WHERE value_check_ok IS NULL ORDER BY slot DESC LIMIT $1`
err = db.DB.Select(&entries, query, limit)
err = db.DB.Select(&entries, query, opts.Limit)
}
if err != nil {
log.WithError(err).Fatalf("couldn't get entries")
return fmt.Errorf("couldn't get entries: %w", err)
}

log.Infof("query: %s", query)
log.Infof("got %d entries", len(entries))
if len(entries) == 0 {
return
return nil
}

wg := new(sync.WaitGroup)
entryC := make(chan database.DataAPIPayloadDeliveredEntry)
if slot != 0 {
numThreads = 1
threads := opts.NumThreads
if opts.Slot != 0 {
threads = 1
}
for i := 0; i < int(numThreads); i++ { //nolint:gosec,intrange
for i := 0; i < int(threads); i++ { //nolint:gosec,intrange
log.Infof("starting worker %d", i+1)
wg.Add(1)
go startUpdateWorker(wg, db, client, client2, entryC)
}

for _, entry := range entries {
// possibly skip
if slotMin != 0 && entry.Slot < slotMin {
if opts.SlotMin != 0 && entry.Slot < opts.SlotMin {
continue
}

Expand All @@ -137,7 +169,8 @@ func checkPayloadValue(cmd *cobra.Command, args []string) {
wg.Wait()

timeNeeded := time.Since(startTime)
log.WithField("timeNeeded", timeNeeded).Info("All done!")
log.WithField("timeNeeded", timeNeeded).Info("Check payload value done!")
return nil
}

func _getBalanceDiff(ethClient *ethclient.Client, address ethcommon.Address, blockNumber *big.Int) (*big.Int, error) {
Expand Down
66 changes: 42 additions & 24 deletions cmd/core/data-api-backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ var backfillDataAPICmd = &cobra.Command{
Run: func(cmd *cobra.Command, args []string) {
var err error
var relays []common.RelayEntry
startTime := time.Now().UTC()

if cliRelay != "" {
var relayEntry common.RelayEntry
Expand All @@ -59,38 +58,57 @@ var backfillDataAPICmd = &cobra.Command{
}

log.Infof("Relayscan %s", vars.Version)
log.Infof("Using %d relays", len(relays))
for index, relay := range relays {
log.Infof("- relay #%d: %s", index+1, relay.Hostname())
}

// Connect to Postgres
db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN)

// If needed, get latest slot (i.e. if min-slot is negative)
if minSlot < 0 {
log.Infof("Getting latest slot from beaconcha.in for offset %d", minSlot)
latestSlotOnBeaconChain := common.MustGetLatestSlot()
log.Infof("Latest slot from beaconcha.in: %d", latestSlotOnBeaconChain)
minSlot = int64(latestSlotOnBeaconChain) + minSlot
// Run backfill
err = RunBackfill(db, relays, initCursor, minSlot)
if err != nil {
log.WithError(err).Fatal("backfill failed")
}
},
}

if minSlot != 0 {
log.Infof("Using min slot: %d", minSlot)
}
// BackfillOpts contains options for running the backfill
type BackfillOpts struct {
InitCursor uint64
MinSlot int64
}
Comment on lines +73 to +77
Copy link

Copilot AI Dec 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The BackfillOpts struct is defined but never used. The RunBackfill function takes individual parameters instead. Either remove this unused struct or refactor RunBackfill to accept BackfillOpts for consistency with CheckPayloadValueOpts in the other file.

Copilot uses AI. Check for mistakes.

for _, relay := range relays {
log.Infof("Starting backfilling for relay %s ...", relay.Hostname())
backfiller := newBackfiller(db, relay, initCursor, uint64(minSlot))
err = backfiller.backfillPayloadsDelivered()
if err != nil {
log.WithError(err).WithField("relay", relay).Error("backfill failed")
}
// RunBackfill runs the data API backfill for all given relays
func RunBackfill(db *database.DatabaseService, relays []common.RelayEntry, initCursor uint64, minSlot int64) error {
startTime := time.Now().UTC()

log.Infof("Using %d relays", len(relays))
for index, relay := range relays {
log.Infof("- relay #%d: %s", index+1, relay.Hostname())
}

// If needed, get latest slot (i.e. if min-slot is negative)
if minSlot < 0 {
log.Infof("Getting latest slot from beaconcha.in for offset %d", minSlot)
latestSlotOnBeaconChain := common.MustGetLatestSlot()
log.Infof("Latest slot from beaconcha.in: %d", latestSlotOnBeaconChain)
minSlot = int64(latestSlotOnBeaconChain) + minSlot
}

if minSlot != 0 {
log.Infof("Using min slot: %d", minSlot)
}

for _, relay := range relays {
log.Infof("Starting backfilling for relay %s ...", relay.Hostname())
backfiller := newBackfiller(db, relay, initCursor, uint64(minSlot))
err := backfiller.backfillPayloadsDelivered()
if err != nil {
log.WithError(err).WithField("relay", relay).Error("backfill failed")
}
}

timeNeeded := time.Since(startTime)
log.WithField("timeNeeded", timeNeeded).Info("All done!")
},
timeNeeded := time.Since(startTime)
log.WithField("timeNeeded", timeNeeded).Info("Backfill done!")
return nil
}

type backfiller struct {
Expand Down
Loading
Loading