diff --git a/README.md b/README.md index 867b042..c358641 100644 --- a/README.md +++ b/README.md @@ -88,6 +88,30 @@ More example commands: # Start the website (--dev reloads the template on every page load, for easier iteration) ./relayscan service website --dev + +# +# backfill-runner: Backfill + Check Service +# - a single service to continuously run these +# - default interval: 5 minutes +# +# Test with just one relay (flashbots) +./relayscan service backfill-runner --relay fb + +# Test with ultrasound relay and limited slots (last 50 slots) +./relayscan service backfill-runner --relay us --min-slot -50 + +# Combine flags for quick testing +./relayscan service backfill-runner --relay fb --min-slot -50 --skip-check-value + +# Custom interval +./relayscan service backfill-runner --interval 10m + +# Run once and exit (useful for testing) +./relayscan service backfill-runner --once + +# Skip one of the steps +./relayscan service backfill-runner --skip-backfill +./relayscan service backfill-runner --skip-check-value ``` ### Test & development diff --git a/cmd/core/check-payload-value.go b/cmd/core/check-payload-value.go index bb4bcc3..68097fa 100644 --- a/cmd/core/check-payload-value.go +++ b/cmd/core/check-payload-value.go @@ -48,78 +48,110 @@ func init() { var checkPayloadValueCmd = &cobra.Command{ Use: "check-payload-value", Short: "Check payload value for delivered payloads", - Run: checkPayloadValue, -} + Run: func(cmd *cobra.Command, args []string) { + client, err := ethclient.Dial(ethNodeURI) + if err != nil { + log.Fatalf("Failed to create RPC client for '%s'", ethNodeURI) + } + log.Infof("Using eth node: %s", ethNodeURI) -func checkPayloadValue(cmd *cobra.Command, args []string) { - var err error - startTime := time.Now().UTC() + client2 := client + if ethNodeBackupURI != "" { + client2, err = ethclient.Dial(ethNodeBackupURI) + if err != nil { + log.Fatalf("Failed to create backup RPC client for '%s'", ethNodeBackupURI) + } + log.Infof("Using eth backup node: %s", ethNodeBackupURI) + } - client, err := ethclient.Dial(ethNodeURI) - if err != nil { - log.Fatalf("Failed to create RPC client for '%s'", ethNodeURI) - } - log.Infof("Using eth node: %s", ethNodeURI) + // Connect to Postgres + db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) + + opts := CheckPayloadValueOpts{ + Limit: limit, + Slot: slot, + SlotMax: slotMax, + SlotMin: slotMin, + NumThreads: numThreads, + CheckIncorrectOnly: checkIncorrectOnly, + CheckMissedOnly: checkMissedOnly, + CheckTx: checkTx, + CheckAll: checkAll, + } - client2 := client - if ethNodeBackupURI != "" { - client2, err = ethclient.Dial(ethNodeBackupURI) + err = RunCheckPayloadValue(db, client, client2, opts) if err != nil { - log.Fatalf("Failed to create backup RPC client for '%s'", ethNodeBackupURI) + log.WithError(err).Fatal("check payload value failed") } - log.Infof("Using eth backup node: %s", ethNodeBackupURI) - } + }, +} - // Connect to Postgres - db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) +// CheckPayloadValueOpts contains options for running the payload value check +type CheckPayloadValueOpts struct { + Limit uint64 + Slot uint64 + SlotMax uint64 + SlotMin uint64 + NumThreads uint64 + CheckIncorrectOnly bool + CheckMissedOnly bool + CheckTx bool + CheckAll bool +} + +// RunCheckPayloadValue checks payload values for delivered payloads +func RunCheckPayloadValue(db *database.DatabaseService, client, client2 *ethclient.Client, opts CheckPayloadValueOpts) error { + startTime := time.Now().UTC() entries := []database.DataAPIPayloadDeliveredEntry{} query := `SELECT id, inserted_at, relay, epoch, slot, parent_hash, block_hash, builder_pubkey, proposer_pubkey, proposer_fee_recipient, gas_limit, gas_used, value_claimed_wei, value_claimed_eth, num_tx, block_number FROM ` + dbvars.TableDataAPIPayloadDelivered - if checkIncorrectOnly { + + var err error + if opts.CheckIncorrectOnly { query += ` WHERE value_check_ok=false ORDER BY slot DESC` - if limit > 0 { - query += fmt.Sprintf(" limit %d", limit) + if opts.Limit > 0 { + query += fmt.Sprintf(" limit %d", opts.Limit) } err = db.DB.Select(&entries, query) - } else if checkMissedOnly { + } else if opts.CheckMissedOnly { query += ` WHERE slot_missed=true ORDER BY slot DESC` - if limit > 0 { - query += fmt.Sprintf(" limit %d", limit) + if opts.Limit > 0 { + query += fmt.Sprintf(" limit %d", opts.Limit) } err = db.DB.Select(&entries, query) - } else if checkAll { - if slotMax > 0 { - query += fmt.Sprintf(" WHERE slot<=%d", slotMax) + } else if opts.CheckAll { + if opts.SlotMax > 0 { + query += fmt.Sprintf(" WHERE slot<=%d", opts.SlotMax) } query += ` ORDER BY slot DESC` - if limit > 0 { - query += fmt.Sprintf(" limit %d", limit) + if opts.Limit > 0 { + query += fmt.Sprintf(" limit %d", opts.Limit) } err = db.DB.Select(&entries, query) - } else if slot != 0 { + } else if opts.Slot != 0 { query += ` WHERE slot=$1` - err = db.DB.Select(&entries, query, slot) + err = db.DB.Select(&entries, query, opts.Slot) } else { - // query += ` WHERE value_check_ok IS NULL AND slot_missed IS NULL ORDER BY slot DESC LIMIT $1` query += ` WHERE value_check_ok IS NULL ORDER BY slot DESC LIMIT $1` - err = db.DB.Select(&entries, query, limit) + err = db.DB.Select(&entries, query, opts.Limit) } if err != nil { - log.WithError(err).Fatalf("couldn't get entries") + return fmt.Errorf("couldn't get entries: %w", err) } log.Infof("query: %s", query) log.Infof("got %d entries", len(entries)) if len(entries) == 0 { - return + return nil } wg := new(sync.WaitGroup) entryC := make(chan database.DataAPIPayloadDeliveredEntry) - if slot != 0 { - numThreads = 1 + threads := opts.NumThreads + if opts.Slot != 0 { + threads = 1 } - for i := 0; i < int(numThreads); i++ { //nolint:gosec,intrange + for i := 0; i < int(threads); i++ { //nolint:gosec,intrange log.Infof("starting worker %d", i+1) wg.Add(1) go startUpdateWorker(wg, db, client, client2, entryC) @@ -127,7 +159,7 @@ func checkPayloadValue(cmd *cobra.Command, args []string) { for _, entry := range entries { // possibly skip - if slotMin != 0 && entry.Slot < slotMin { + if opts.SlotMin != 0 && entry.Slot < opts.SlotMin { continue } @@ -137,7 +169,8 @@ func checkPayloadValue(cmd *cobra.Command, args []string) { wg.Wait() timeNeeded := time.Since(startTime) - log.WithField("timeNeeded", timeNeeded).Info("All done!") + log.WithField("timeNeeded", timeNeeded).Info("Check payload value done!") + return nil } func _getBalanceDiff(ethClient *ethclient.Client, address ethcommon.Address, blockNumber *big.Int) (*big.Int, error) { diff --git a/cmd/core/data-api-backfill.go b/cmd/core/data-api-backfill.go index 431eaa1..9a8c0b6 100644 --- a/cmd/core/data-api-backfill.go +++ b/cmd/core/data-api-backfill.go @@ -35,7 +35,6 @@ var backfillDataAPICmd = &cobra.Command{ Run: func(cmd *cobra.Command, args []string) { var err error var relays []common.RelayEntry - startTime := time.Now().UTC() if cliRelay != "" { var relayEntry common.RelayEntry @@ -59,38 +58,51 @@ var backfillDataAPICmd = &cobra.Command{ } log.Infof("Relayscan %s", vars.Version) - log.Infof("Using %d relays", len(relays)) - for index, relay := range relays { - log.Infof("- relay #%d: %s", index+1, relay.Hostname()) - } // Connect to Postgres db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) - // If needed, get latest slot (i.e. if min-slot is negative) - if minSlot < 0 { - log.Infof("Getting latest slot from beaconcha.in for offset %d", minSlot) - latestSlotOnBeaconChain := common.MustGetLatestSlot() - log.Infof("Latest slot from beaconcha.in: %d", latestSlotOnBeaconChain) - minSlot = int64(latestSlotOnBeaconChain) + minSlot + // Run backfill + err = RunBackfill(db, relays, initCursor, minSlot) + if err != nil { + log.WithError(err).Fatal("backfill failed") } + }, +} - if minSlot != 0 { - log.Infof("Using min slot: %d", minSlot) - } +// RunBackfill runs the data API backfill for all given relays +func RunBackfill(db *database.DatabaseService, relays []common.RelayEntry, initCursor uint64, minSlot int64) error { + startTime := time.Now().UTC() - for _, relay := range relays { - log.Infof("Starting backfilling for relay %s ...", relay.Hostname()) - backfiller := newBackfiller(db, relay, initCursor, uint64(minSlot)) - err = backfiller.backfillPayloadsDelivered() - if err != nil { - log.WithError(err).WithField("relay", relay).Error("backfill failed") - } + log.Infof("Using %d relays", len(relays)) + for index, relay := range relays { + log.Infof("- relay #%d: %s", index+1, relay.Hostname()) + } + + // If needed, get latest slot (i.e. if min-slot is negative) + if minSlot < 0 { + log.Infof("Getting latest slot from beaconcha.in for offset %d", minSlot) + latestSlotOnBeaconChain := common.MustGetLatestSlot() + log.Infof("Latest slot from beaconcha.in: %d", latestSlotOnBeaconChain) + minSlot = int64(latestSlotOnBeaconChain) + minSlot + } + + if minSlot != 0 { + log.Infof("Using min slot: %d", minSlot) + } + + for _, relay := range relays { + log.Infof("Starting backfilling for relay %s ...", relay.Hostname()) + backfiller := newBackfiller(db, relay, initCursor, uint64(minSlot)) + err := backfiller.backfillPayloadsDelivered() + if err != nil { + log.WithError(err).WithField("relay", relay).Error("backfill failed") } + } - timeNeeded := time.Since(startTime) - log.WithField("timeNeeded", timeNeeded).Info("All done!") - }, + timeNeeded := time.Since(startTime) + log.WithField("timeNeeded", timeNeeded).Info("Backfill done!") + return nil } type backfiller struct { diff --git a/cmd/service/backfill_runner.go b/cmd/service/backfill_runner.go new file mode 100644 index 0000000..27f2d40 --- /dev/null +++ b/cmd/service/backfill_runner.go @@ -0,0 +1,168 @@ +package service + +import ( + "os" + "os/signal" + "syscall" + "time" + + "github.com/ethereum/go-ethereum/ethclient" + "github.com/flashbots/relayscan/cmd/core" + "github.com/flashbots/relayscan/common" + "github.com/flashbots/relayscan/database" + "github.com/flashbots/relayscan/vars" + "github.com/spf13/cobra" +) + +var ( + runnerInterval time.Duration + runnerEthNodeURI string + runnerEthBackupURI string + runnerLimit uint64 + runnerNumThreads uint64 + runnerRunOnce bool + runnerSkipBackfill bool + runnerSkipCheckValue bool + runnerRelay string + runnerMinSlot int64 +) + +func init() { + backfillRunnerCmd.Flags().DurationVar(&runnerInterval, "interval", time.Duration(vars.DefaultBackfillRunnerInterval)*time.Minute, "interval between runs") + backfillRunnerCmd.Flags().StringVar(&runnerEthNodeURI, "eth-node", vars.DefaultEthNodeURI, "eth node URI") + backfillRunnerCmd.Flags().StringVar(&runnerEthBackupURI, "eth-node-backup", vars.DefaultEthBackupNodeURI, "eth backup node URI") + backfillRunnerCmd.Flags().Uint64Var(&runnerLimit, "limit", 1000, "limit for check-payload-value") + backfillRunnerCmd.Flags().Uint64Var(&runnerNumThreads, "threads", uint64(vars.DefaultBackfillRunnerNumThreads), "number of threads for check-payload-value") + backfillRunnerCmd.Flags().BoolVar(&runnerRunOnce, "once", false, "run once and exit") + backfillRunnerCmd.Flags().BoolVar(&runnerSkipBackfill, "skip-backfill", false, "skip data-api-backfill step") + backfillRunnerCmd.Flags().BoolVar(&runnerSkipCheckValue, "skip-check-value", false, "skip check-payload-value step") + backfillRunnerCmd.Flags().StringVar(&runnerRelay, "relay", "", "specific relay only (e.g. 'fb', 'us', or full URL)") + backfillRunnerCmd.Flags().Int64Var(&runnerMinSlot, "min-slot", 0, "minimum slot (negative for offset from latest)") +} + +var backfillRunnerCmd = &cobra.Command{ + Use: "backfill-runner", + Short: "Continuously run data-api-backfill and check-payload-value", + Run: func(cmd *cobra.Command, args []string) { + var err error + var relays []common.RelayEntry + + log.Infof("Relayscan backfill-runner %s starting...", vars.Version) + log.Infof("Interval: %s", runnerInterval) + + // Get relays + if runnerRelay != "" { + var relayEntry common.RelayEntry + switch runnerRelay { + case "fb": + relayEntry, err = common.NewRelayEntry(vars.RelayURLs[0], false) + case "us": + relayEntry, err = common.NewRelayEntry(vars.RelayURLs[1], false) + default: + relayEntry, err = common.NewRelayEntry(runnerRelay, false) + } + if err != nil { + log.WithField("relay", runnerRelay).WithError(err).Fatal("failed to decode relay") + } + relays = []common.RelayEntry{relayEntry} + } else { + relays, err = common.GetRelays() + if err != nil { + log.WithError(err).Fatal("failed to get relays") + } + } + log.Infof("Using %d relays", len(relays)) + for i, relay := range relays { + log.Infof("- relay #%d: %s", i+1, relay.Hostname()) + } + + if runnerMinSlot != 0 { + log.Infof("Using min-slot: %d", runnerMinSlot) + } + + // Connect to Postgres + db := database.MustConnectPostgres(log, vars.DefaultPostgresDSN) + + // Connect to eth nodes + var ethClient, ethClient2 *ethclient.Client + if !runnerSkipCheckValue { + if runnerEthNodeURI == "" { + log.Fatal("eth-node is required for check-payload-value") + } + ethClient, err = ethclient.Dial(runnerEthNodeURI) + if err != nil { + log.WithError(err).Fatalf("failed to connect to eth node: %s", runnerEthNodeURI) + } + log.Infof("Connected to eth node: %s", runnerEthNodeURI) + + ethClient2 = ethClient + if runnerEthBackupURI != "" { + ethClient2, err = ethclient.Dial(runnerEthBackupURI) + if err != nil { + log.WithError(err).Fatalf("failed to connect to backup eth node: %s", runnerEthBackupURI) + } + log.Infof("Connected to backup eth node: %s", runnerEthBackupURI) + } + } + + // Prepare check-payload-value options + checkOpts := core.CheckPayloadValueOpts{ + Limit: runnerLimit, + NumThreads: runnerNumThreads, + } + + // Run function + runBackfillCycle := func() { + log.Info("Starting backfill cycle...") + + // Step 1: data-api-backfill + if !runnerSkipBackfill { + log.Info("Running data-api-backfill...") + err := core.RunBackfill(db, relays, 0, runnerMinSlot) + if err != nil { + log.WithError(err).Error("data-api-backfill failed") + } + } + + // Step 2: check-payload-value + if !runnerSkipCheckValue { + log.Info("Running check-payload-value...") + err := core.RunCheckPayloadValue(db, ethClient, ethClient2, checkOpts) + if err != nil { + log.WithError(err).Error("check-payload-value failed") + } + } + + log.Info("Backfill cycle complete") + } + + // Run once immediately + runBackfillCycle() + + if runnerRunOnce { + log.Info("Run once mode, exiting") + return + } + + // Set up signal handling + sigC := make(chan os.Signal, 1) + signal.Notify(sigC, syscall.SIGINT, syscall.SIGTERM) + + // Run on interval + ticker := time.NewTicker(runnerInterval) + defer ticker.Stop() + + log.Infof("Waiting for next run in %s...", runnerInterval) + + for { + select { + case <-ticker.C: + runBackfillCycle() + log.Infof("Waiting for next run in %s...", runnerInterval) + case sig := <-sigC: + log.Infof("Received signal %s, shutting down...", sig) + return + } + } + }, +} diff --git a/cmd/service/service.go b/cmd/service/service.go index e1c6328..278da8f 100644 --- a/cmd/service/service.go +++ b/cmd/service/service.go @@ -22,4 +22,5 @@ var ServiceCmd = &cobra.Command{ func init() { ServiceCmd.AddCommand(websiteCmd) ServiceCmd.AddCommand(bidCollectCmd) + ServiceCmd.AddCommand(backfillRunnerCmd) } diff --git a/vars/vars.go b/vars/vars.go index ef36362..7d4c742 100644 --- a/vars/vars.go +++ b/vars/vars.go @@ -19,4 +19,7 @@ var ( DefaultLogLevel = relaycommon.GetEnv("LOG_LEVEL", "info") DefaultEthNodeURI = relaycommon.GetEnv("ETH_NODE_URI", "") DefaultEthBackupNodeURI = relaycommon.GetEnv("ETH_NODE_BACKUP_URI", "") + + DefaultBackfillRunnerInterval = cli.GetEnvInt("BACKFILL_RUNNER_INTERVAL_MIN", 5) + DefaultBackfillRunnerNumThreads = cli.GetEnvInt("BACKFILL_RUNNER_NUM_THREADS", 10) )