diff --git a/cmd/evm-gateway/main.go b/cmd/evm-gateway/main.go index d5cb369..22a6b4c 100644 --- a/cmd/evm-gateway/main.go +++ b/cmd/evm-gateway/main.go @@ -3,14 +3,17 @@ package main import ( "fmt" "os" + "path/filepath" "github.com/spf13/pflag" "github.com/InjectiveLabs/sdk-go/chain/types" sdk "github.com/cosmos/cosmos-sdk/types" + dbm "github.com/cosmos/cosmos-db" "github.com/InjectiveLabs/evm-gateway/internal/app" "github.com/InjectiveLabs/evm-gateway/internal/config" + txindexer "github.com/InjectiveLabs/evm-gateway/internal/indexer" "github.com/InjectiveLabs/evm-gateway/internal/logging" "github.com/InjectiveLabs/evm-gateway/internal/telemetry" "github.com/InjectiveLabs/evm-gateway/version" @@ -42,6 +45,13 @@ type flagOverrides struct { } func main() { + // Dispatch subcommands before the normal flag parse so that "reindex" + // gets its own isolated flag set. + if len(os.Args) > 1 && os.Args[1] == "reindex" { + runReindex(os.Args[2:]) + return + } + flags := parseFlags() if flags.printVersion { fmt.Println(version.Version()) @@ -163,3 +173,77 @@ func fail(err error) { _, _ = os.Stderr.WriteString(err.Error() + "\n") os.Exit(1) } + +// runReindex implements the "reindex" subcommand. It deletes all indexed data +// for the given block range so that the syncer re-processes those blocks on +// the next startup. The gateway must NOT be running while this command executes. +// +// Usage: +// +// evm-gateway reindex --from --to [--data-dir ] [--db-backend ] +func runReindex(args []string) { + fs := pflag.NewFlagSet("reindex", pflag.ContinueOnError) + + var ( + from int64 + to int64 + envFile string + dataDir string + dbBackend string + logFormat string + ) + fs.Int64Var(&from, "from", 0, "First block height to clear (inclusive, required)") + fs.Int64Var(&to, "to", 0, "Last block height to clear (inclusive, required)") + fs.StringVar(&envFile, "env-file", "", "Path to .env file with WEB3INJ_ variables") + fs.StringVar(&dataDir, "data-dir", "", "Data directory for indexer DB (overrides env/config)") + fs.StringVar(&dbBackend, "db-backend", "", "DB backend type (overrides env/config)") + fs.StringVar(&logFormat, "log-format", "", "Log format: json or text") + + if err := fs.Parse(args); err != nil { + _, _ = fmt.Fprintf(os.Stderr, "reindex: %v\n\nUsage:\n", err) + fs.PrintDefaults() + os.Exit(1) + } + + if from <= 0 || to <= 0 { + _, _ = fmt.Fprintln(os.Stderr, "reindex: --from and --to are required and must be > 0") + fs.PrintDefaults() + os.Exit(1) + } + if from > to { + _, _ = fmt.Fprintf(os.Stderr, "reindex: --from (%d) must be <= --to (%d)\n", from, to) + os.Exit(1) + } + + cfg, err := config.Load(envFile) + if err != nil { + fail(err) + } + if dataDir != "" { + cfg.DataDir = dataDir + } + if dbBackend != "" { + cfg.DBBackend = dbBackend + } + if logFormat != "" { + cfg.LogFormat = logFormat + } + cfg.Expand() + + logger := logging.New(logging.Config{ + Format: cfg.LogFormat, + Verbose: cfg.LogVerbose, + Output: os.Stdout, + }) + + dbPath := filepath.Join(cfg.DataDir, "data") + db, err := dbm.NewDB("evmindexer", dbm.BackendType(cfg.DBBackend), dbPath) + if err != nil { + fail(fmt.Errorf("open indexer DB at %s: %w", dbPath, err)) + } + defer db.Close() + + if err := txindexer.ClearBlockRange(db, logger, from, to); err != nil { + fail(err) + } +} diff --git a/internal/evm/rpc/apis.go b/internal/evm/rpc/apis.go index ff657a2..119d39c 100644 --- a/internal/evm/rpc/apis.go +++ b/internal/evm/rpc/apis.go @@ -65,7 +65,10 @@ func init() { indexer txindexer.TxIndexer, status *syncstatus.Tracker, ) []rpc.API { - evmBackend := backend.NewBackend(logger, cfg, clientCtx, allowUnprotectedTxs, indexer, status) + evmBackend, err := backend.NewBackend(logger, cfg, clientCtx, allowUnprotectedTxs, indexer, status) + if err != nil { + panic(err) + } return []rpc.API{ { Namespace: EthNamespace, @@ -114,7 +117,10 @@ func init() { indexer txindexer.TxIndexer, status *syncstatus.Tracker, ) []rpc.API { - evmBackend := backend.NewBackend(logger, cfg, clientCtx, allowUnprotectedTxs, indexer, status) + evmBackend, err := backend.NewBackend(logger, cfg, clientCtx, allowUnprotectedTxs, indexer, status) + if err != nil { + panic(err) + } return []rpc.API{ { Namespace: DebugNamespace, @@ -131,7 +137,10 @@ func init() { indexer txindexer.TxIndexer, status *syncstatus.Tracker, ) []rpc.API { - evmBackend := backend.NewBackend(logger, cfg, clientCtx, allowUnprotectedTxs, indexer, status) + evmBackend, err := backend.NewBackend(logger, cfg, clientCtx, allowUnprotectedTxs, indexer, status) + if err != nil { + panic(err) + } return []rpc.API{ { Namespace: MinerNamespace, @@ -148,7 +157,10 @@ func init() { indexer txindexer.TxIndexer, status *syncstatus.Tracker, ) []rpc.API { - evmBackend := backend.NewBackend(logger, cfg, clientCtx, allowUnprotectedTxs, indexer, status) + evmBackend, err := backend.NewBackend(logger, cfg, clientCtx, allowUnprotectedTxs, indexer, status) + if err != nil { + panic(err) + } return []rpc.API{ { Namespace: InjectiveNamespace, diff --git a/internal/evm/rpc/backend/backend.go b/internal/evm/rpc/backend/backend.go index 8d970fe..17ce891 100644 --- a/internal/evm/rpc/backend/backend.go +++ b/internal/evm/rpc/backend/backend.go @@ -19,6 +19,7 @@ import ( "github.com/ethereum/go-ethereum/params" "github.com/ethereum/go-ethereum/rpc" "github.com/ethereum/go-ethereum/signer/core/apitypes" + "github.com/pkg/errors" appconfig "github.com/InjectiveLabs/evm-gateway/internal/config" rpctypes "github.com/InjectiveLabs/evm-gateway/internal/evm/rpc/types" @@ -146,6 +147,7 @@ type Backend struct { indexer txindexer.TxIndexer syncStatus *syncstatus.Tracker processBlocker ProcessBlocker + cachedChainID *big.Int // immutable once set; avoids live gRPC calls in ChainID() } // NewBackend creates a new Backend instance for cosmos and ethereum namespaces @@ -156,7 +158,7 @@ func NewBackend( allowUnprotectedTxs bool, indexer txindexer.TxIndexer, syncStatus *syncstatus.Tracker, -) *Backend { +) (*Backend, error) { b := &Backend{ ctx: context.Background(), clientCtx: clientCtx, @@ -168,5 +170,22 @@ func NewBackend( syncStatus: syncStatus, } b.processBlocker = b.processBlock - return b + + // Fetch the EVM chain ID once from the node's chain config (eip155ChainId). + // This is the canonical EVM chain ID (e.g. 1776 for Injective mainnet) and + // differs from the Cosmos chain ID string ("injective-1") which ParseChainID + // would incorrectly parse as 1. + // Fetch the EVM chain ID once from the node's chain config (eip155ChainId). + // This is the canonical EVM chain ID (e.g. 1776 for Injective mainnet) and + // differs from the Cosmos chain ID string ("injective-1") which ParseChainID + // would incorrectly parse as 1. A hard failure here is intentional: starting + // with the wrong chain ID silently breaks tx signing and tracing. + evmChainConfig := b.ChainConfig() + if evmChainConfig == nil || evmChainConfig.ChainID == nil { + return nil, errors.New("could not fetch EVM chain ID from node at startup; ensure gRPC is reachable and the node is running") + } + b.cachedChainID = evmChainConfig.ChainID + b.logger.Info("cached EVM chain ID from node", "chainID", b.cachedChainID) + + return b, nil } diff --git a/internal/evm/rpc/backend/chain_info.go b/internal/evm/rpc/backend/chain_info.go index 01cec9e..8c8c557 100644 --- a/internal/evm/rpc/backend/chain_info.go +++ b/internal/evm/rpc/backend/chain_info.go @@ -25,9 +25,13 @@ import ( ) // ChainID is the EIP-155 replay-protection chain id for the current ethereum chain config. +// It returns the value cached at construction time to avoid live gRPC calls on every request. +// A defensive copy is returned so callers cannot mutate the cached value. func (b *Backend) ChainID() *hexutil.Big { - config := b.ChainConfig() - return (*hexutil.Big)(config.ChainID) + if b.cachedChainID == nil { + return nil + } + return (*hexutil.Big)(new(big.Int).Set(b.cachedChainID)) } // ChainConfig returns the latest ethereum chain configuration diff --git a/internal/evm/rpc/types/parsed_tx.go b/internal/evm/rpc/types/parsed_tx.go index b42d40c..2b21294 100644 --- a/internal/evm/rpc/types/parsed_tx.go +++ b/internal/evm/rpc/types/parsed_tx.go @@ -67,8 +67,11 @@ func ParseTxResult(result *abci.ExecTxResult, tx sdk.Tx) (*ParsedTxs, error) { p.Txs[i].Failed = true } + // parseFromLog enriches Hash and GasUsed from the ABCI log for VM-failed + // txs. A log parse failure should not discard the tx — the zero-hash + // fallback in the indexer will recover the hash from ethMsg directly. if err := p.parseFromLog(result.Log); err != nil { - return nil, err + _ = err // hash and gas used will be 0; acceptable vs. dropping the tx entirely } return p, nil @@ -162,6 +165,19 @@ func (p *ParsedTxs) parseFromLog(logText string) error { } txHash := common.HexToHash(vmErr.Hash) + + // If the entry already exists (populated from ethereum_tx events), update + // it in-place rather than appending a duplicate. This is the common path + // for VM-failed txs that emit events but omit the hash/gasUsed attributes. + if msgIndex >= 0 && msgIndex < len(p.Txs) { + p.Txs[msgIndex].Hash = txHash + p.Txs[msgIndex].GasUsed = vmErr.GasUsed + p.Txs[msgIndex].Failed = true + p.TxHashes[txHash] = msgIndex + return nil + } + + // No event-based entry exists — append a new one. parsedTx := ParsedTx{ MsgIndex: msgIndex, Hash: txHash, @@ -169,9 +185,8 @@ func (p *ParsedTxs) parseFromLog(logText string) error { GasUsed: vmErr.GasUsed, Failed: true, } - p.Txs = append(p.Txs, parsedTx) - p.TxHashes[txHash] = msgIndex + p.TxHashes[txHash] = len(p.Txs) - 1 return nil } diff --git a/internal/indexer/kv_indexer.go b/internal/indexer/kv_indexer.go index 99afb52..5ffa826 100644 --- a/internal/indexer/kv_indexer.go +++ b/internal/indexer/kv_indexer.go @@ -65,7 +65,7 @@ func (kv *KVIndexer) IndexBlock(block *cmtypes.Block, txResults []*abci.ExecTxRe } }(&err) - kv.logger.Debug("(KVIndexer) IndexBlock", "height", block.Height, "txns:", len(block.Txs)) + kv.logger.Debug("(KVIndexer) IndexBlock", "height", block.Height, "txns", len(block.Txs)) batch := kv.db.NewBatch() defer batch.Close() @@ -141,15 +141,42 @@ func (kv *KVIndexer) IndexBlock(block *cmtypes.Block, txResults []*abci.ExecTxRe parsedTx := txs.GetTxByMsgIndex(msgIndex) if parsedTx == nil { - kv.logger.Error("msg index not found in results", "msgIndex", msgIndex) - continue + // Positional lookup failed — fall back to a semantic search by + // ParsedTx.MsgIndex for cases where event parsing is sparse or + // non-contiguous. + for i := range txs.Txs { + if txs.Txs[i].MsgIndex == msgIndex { + parsedTx = &txs.Txs[i] + break + } + } } - if parsedTx.EthTxIndex >= 0 && parsedTx.EthTxIndex != ethTxIndex { - kv.logger.Error("eth tx index don't match", "expect", ethTxIndex, "found", parsedTx.EthTxIndex) + if parsedTx == nil { + if result.Code != abci.CodeTypeOK { + // No ethereum_tx events and no parseable ABCI log — derive + // what we can directly from the decoded message so the tx + // is still recorded and findable by hash. + txResult.GasUsed = ethMsg.GetGas() + txResult.Failed = true + txHash = ethMsg.Hash() + } else { + kv.logger.Error("msg index not found in results for successful tx", "msgIndex", msgIndex) + continue + } + } else { + if parsedTx.EthTxIndex >= 0 && parsedTx.EthTxIndex != ethTxIndex { + kv.logger.Error("eth tx index don't match", "expect", ethTxIndex, "found", parsedTx.EthTxIndex) + } + txResult.GasUsed = parsedTx.GasUsed + txResult.Failed = parsedTx.Failed + txHash = parsedTx.Hash + // For VM-failed txs the hash attribute may be absent from the + // ethereum_tx event, leaving parsedTx.Hash as zero. Fall back to + // the hash embedded in the decoded message. + if txHash == (common.Hash{}) { + txHash = ethMsg.Hash() + } } - txResult.GasUsed = parsedTx.GasUsed - txResult.Failed = parsedTx.Failed - txHash = parsedTx.Hash } cumulativeTxEthGasUsed += txResult.GasUsed diff --git a/internal/indexer/reindex.go b/internal/indexer/reindex.go new file mode 100644 index 0000000..19d689b --- /dev/null +++ b/internal/indexer/reindex.go @@ -0,0 +1,178 @@ +package indexer + +import ( + "encoding/json" + "fmt" + "log/slog" + + dbm "github.com/cosmos/cosmos-db" + sdk "github.com/cosmos/cosmos-sdk/types" + "github.com/ethereum/go-ethereum/common" +) + +const clearBatchSize = 500 + +// ClearBlockRange deletes all indexed data for blocks in the inclusive range +// [from, to] from the given DB. After this call the blocks appear as gaps to +// the syncer, which will re-index them on the next startup. +func ClearBlockRange(db dbm.DB, logger *slog.Logger, from, to int64) error { + if from > to { + return fmt.Errorf("from (%d) must be <= to (%d)", from, to) + } + + logger.Info("clearing indexed block range", "from", from, "to", to) + + // Step 1: delete height-keyed entries and collect block hashes for cleanup. + // We iterate in chunks so the batch stays manageable. + blockHashes := make([]common.Hash, 0) + for start := from; start <= to; start += clearBatchSize { + end := start + clearBatchSize - 1 + if end > to { + end = to + } + hashes, err := clearHeightKeys(db, start, end) + if err != nil { + return fmt.Errorf("clearing height keys [%d, %d]: %w", start, end, err) + } + blockHashes = append(blockHashes, hashes...) + logger.Info("cleared height-keyed entries", "start", start, "end", end) + } + + // Step 2: delete BlockHash entries (hash → height mapping). + if err := clearBlockHashKeys(db, blockHashes); err != nil { + return fmt.Errorf("clearing block hash keys: %w", err) + } + + // Step 3: scan TxIndex (block+ethTxIndex → txHash) to find and delete all + // tx-keyed entries: TxHash, TxIndex, RPCtxHash, RPCtxIndex, Receipt. + for start := from; start <= to; start += clearBatchSize { + end := start + clearBatchSize - 1 + if end > to { + end = to + } + if err := clearTxKeys(db, start, end); err != nil { + return fmt.Errorf("clearing tx keys [%d, %d]: %w", start, end, err) + } + logger.Info("cleared tx-keyed entries", "start", start, "end", end) + } + + logger.Info("block range cleared; restart the gateway to re-index", "from", from, "to", to) + return nil +} + +// clearHeightKeys deletes BlockMeta and BlockLogs for [from, to] in a single +// batch. It also returns the block hashes found in the BlockMeta entries so +// BlockHash keys can be cleaned up in a second pass. +func clearHeightKeys(db dbm.DB, from, to int64) ([]common.Hash, error) { + batch := db.NewBatch() + defer batch.Close() + + var blockHashes []common.Hash + + for height := from; height <= to; height++ { + metaKey := BlockMetaKey(height) + bz, err := db.Get(metaKey) + if err != nil { + return nil, fmt.Errorf("read block meta at height %d: %w", height, err) + } + if len(bz) > 0 { + var meta CachedBlockMeta + if jsonErr := json.Unmarshal(bz, &meta); jsonErr != nil { + return nil, fmt.Errorf("decode block meta at height %d: %w", height, jsonErr) + } + if meta.Hash != "" { + blockHashes = append(blockHashes, common.HexToHash(meta.Hash)) + } + if err := batch.Delete(metaKey); err != nil { + return nil, err + } + } + + if err := batch.Delete(BlockLogsKey(height)); err != nil { + return nil, err + } + } + + return blockHashes, batch.WriteSync() +} + +// clearBlockHashKeys deletes BlockHash (hash → height) entries. +func clearBlockHashKeys(db dbm.DB, hashes []common.Hash) error { + if len(hashes) == 0 { + return nil + } + batch := db.NewBatch() + defer batch.Close() + for _, h := range hashes { + if err := batch.Delete(BlockHashKey(h)); err != nil { + return err + } + } + return batch.WriteSync() +} + +// clearTxKeys scans TxIndex entries for [from, to] and deletes the +// corresponding TxHash, TxIndex, RPCtxHash, RPCtxIndex, and Receipt keys. +func clearTxKeys(db dbm.DB, from, to int64) error { + startKey := txIndexRangeStart(from) + endKey := txIndexRangeStart(to + 1) + + it, err := db.Iterator(startKey, endKey) + if err != nil { + return err + } + defer it.Close() + + batch := db.NewBatch() + defer batch.Close() + + for ; it.Valid(); it.Next() { + txHash := common.BytesToHash(it.Value()) + + // Delete TxHash (proto tx result) and the TxIndex entry itself. + if err := batch.Delete(TxHashKey(txHash)); err != nil { + return err + } + if err := batch.Delete(it.Key()); err != nil { + return err + } + } + it.Close() + + // Also scan RPCtxIndex for the same range to clean up cached RPC tx / + // receipt entries. + rpctxStart := rpctxIndexRangeStart(from) + rpctxEnd := rpctxIndexRangeStart(to + 1) + + rpcIt, err := db.Iterator(rpctxStart, rpctxEnd) + if err != nil { + return err + } + defer rpcIt.Close() + + for ; rpcIt.Valid(); rpcIt.Next() { + txHash := common.BytesToHash(rpcIt.Value()) + + if err := batch.Delete(RPCtxHashKey(txHash)); err != nil { + return err + } + if err := batch.Delete(ReceiptKey(txHash)); err != nil { + return err + } + if err := batch.Delete(rpcIt.Key()); err != nil { + return err + } + } + + return batch.WriteSync() +} + +// txIndexRangeStart builds the inclusive start key for TxIndex range scans. +func txIndexRangeStart(height int64) []byte { + return append([]byte{KeyPrefixTxIndex}, sdk.Uint64ToBigEndian(uint64(height))...) +} + +// rpctxIndexRangeStart builds the inclusive start key for RPCtxIndex range scans. +func rpctxIndexRangeStart(height int64) []byte { + return append([]byte{KeyPrefixRPCtxIndex}, sdk.Uint64ToBigEndian(uint64(height))...) +}