diff --git a/bridgesync/backfill_tx_sender_test.go b/bridgesync/backfill_tx_sender_test.go index e86219eb1..4c55a80ad 100644 --- a/bridgesync/backfill_tx_sender_test.go +++ b/bridgesync/backfill_tx_sender_test.go @@ -79,23 +79,6 @@ func TestBackfillTxnSender(t *testing.T) { require.NoError(t, meddler.Insert(tx, bridgeTableName, newTestBridge(1, 0, "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"))) - // Insert test claim record - _, err = tx.Exec(` - INSERT INTO claim ( - block_num, block_pos, global_index, origin_network, origin_address, - destination_address, amount, proof_local_exit_root, proof_rollup_exit_root, - mainnet_exit_root, rollup_exit_root, global_exit_root, destination_network, - metadata, is_message, block_timestamp, tx_hash - ) VALUES ( - 1, 1, '1', 1, '0x1234567890123456789012345678901234567890', - '0x0987654321098765432109876543210987654321', '1000000000000000000', - '', '', '', '', '', 2, - '', false, 1234567890, - '0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890' - ) - `) - require.NoError(t, err) - err = tx.Commit() require.NoError(t, err) diff --git a/bridgesync/migrations/bridgesync0015.sql b/bridgesync/migrations/bridgesync0015.sql new file mode 100644 index 000000000..d045266a7 --- /dev/null +++ b/bridgesync/migrations/bridgesync0015.sql @@ -0,0 +1,50 @@ +-- +migrate Down +CREATE TABLE IF NOT EXISTS claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + global_index TEXT NOT NULL, + origin_network INTEGER NOT NULL, + origin_address VARCHAR NOT NULL, + destination_address VARCHAR NOT NULL, + amount TEXT NOT NULL, + proof_local_exit_root VARCHAR, + proof_rollup_exit_root VARCHAR, + mainnet_exit_root VARCHAR, + rollup_exit_root VARCHAR, + global_exit_root VARCHAR, + destination_network INTEGER NOT NULL, + metadata BLOB, + is_message BOOLEAN, + tx_hash VARCHAR, + block_timestamp INTEGER, + type TEXT NOT NULL DEFAULT '', + PRIMARY KEY (block_num, block_pos) +); + +CREATE INDEX IF NOT EXISTS idx_claim_block_num_block_pos_desc ON claim (block_num DESC, block_pos DESC); +CREATE INDEX IF NOT EXISTS idx_claim_block_num_block_pos_asc ON claim (block_num ASC, block_pos ASC); +CREATE INDEX IF NOT EXISTS idx_claim_type_block ON claim (type, block_num); + +CREATE TABLE IF NOT EXISTS unset_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + unset_global_index_hash_chain VARCHAR NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) +); + +CREATE TABLE IF NOT EXISTS set_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) +); + +-- +migrate Up +DROP TABLE IF EXISTS set_claim; +DROP TABLE IF EXISTS unset_claim; +DROP TABLE IF EXISTS claim; diff --git a/bridgesync/migrations/migrations_test.go b/bridgesync/migrations/migrations_test.go index e48c56b48..5b7756cda 100644 --- a/bridgesync/migrations/migrations_test.go +++ b/bridgesync/migrations/migrations_test.go @@ -49,24 +49,6 @@ func TestMigration0001(t *testing.T) { metadata, deposit_count ) VALUES (1, 0, 0, 0, '0x0000', 0, '0x0000', 0, NULL, 0); - - INSERT INTO claim ( - block_num, - block_pos, - global_index, - origin_network, - origin_address, - destination_address, - amount, - proof_local_exit_root, - proof_rollup_exit_root, - mainnet_exit_root, - rollup_exit_root, - global_exit_root, - destination_network, - metadata, - is_message - ) VALUES (1, 0, 0, 0, '0x0000', '0x0000', 0, '0x000,0x000', '0x000,0x000', '0x000', '0x000', '0x0', 0, NULL, FALSE); `) require.NoError(t, err) err = tx.Commit() @@ -118,20 +100,6 @@ func TestMigration0002(t *testing.T) { from_address ) VALUES (1, 0, 0, 0, '0x3', 0, '0x0000', 0, NULL, 0, 1739270804, '0xabcd', '0x123'); - INSERT INTO claim ( - block_num, - block_pos, - global_index, - origin_network, - origin_address, - destination_address, - amount, - destination_network, - metadata, - is_message, - block_timestamp, - tx_hash - ) VALUES (1, 0, 0, 0, '0x3', '0x0000', 0, 0, NULL, FALSE, 1739270804, '0xabcd'); `) require.NoError(t, err) err = tx.Commit() @@ -185,27 +153,6 @@ func TestMigration0002(t *testing.T) { require.NoError(t, err) require.NotNil(t, bridge) require.Equal(t, uint64(1739270804), bridge.BlockTimestamp) - - var claim struct { - BlockNum uint64 `meddler:"block_num"` - BlockPos uint64 `meddler:"block_pos"` - GlobalIndex *big.Int `meddler:"global_index,bigint"` - OriginNetwork uint32 `meddler:"origin_network"` - OriginAddress string `meddler:"origin_address"` - DestinationAddress string `meddler:"destination_address"` - Amount *big.Int `meddler:"amount,bigint"` - DestinationNetwork uint32 `meddler:"destination_network"` - Metadata []byte `meddler:"metadata"` - IsMessage bool `meddler:"is_message"` - BlockTimestamp uint64 `meddler:"block_timestamp"` - TxHash string `meddler:"tx_hash"` - } - - err = meddler.QueryRow(db, &claim, - `SELECT * FROM claim`) - require.NoError(t, err) - require.NotNil(t, claim) - require.Equal(t, uint64(1739270804), claim.BlockTimestamp) } func TestMigrations0003(t *testing.T) { @@ -763,6 +710,40 @@ func TestMigration0013(t *testing.T) { err = tx.Commit() require.NoError(t, err) } +func TestMigration0015(t *testing.T) { + dbPath := path.Join(t.TempDir(), "bridgesyncTest0015.sqlite") + + database, err := db.NewSQLiteDB(dbPath) + require.NoError(t, err) + defer database.Close() + + // Run migrations up to 0014 — claim, set_claim and unset_claim still exist. + err = db.RunMigrationsDBExtended(log.GetDefaultLogger(), + database, GetUpTo("bridgesync0014"), nil, migrate.Up, db.NoLimitMigrations) + require.NoError(t, err) + + tableExists := func(name string) bool { + var count int + err := database.QueryRow( + `SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?`, name).Scan(&count) + require.NoError(t, err) + return count > 0 + } + + require.True(t, tableExists("claim"), "claim table should exist before migration 0015") + require.True(t, tableExists("set_claim"), "set_claim table should exist before migration 0015") + require.True(t, tableExists("unset_claim"), "unset_claim table should exist before migration 0015") + + // Apply migration 0015. + err = db.RunMigrationsDBExtended(log.GetDefaultLogger(), + database, GetUpTo("bridgesync0015"), nil, migrate.Up, db.NoLimitMigrations) + require.NoError(t, err) + + require.False(t, tableExists("claim"), "claim table should be dropped by migration 0015") + require.False(t, tableExists("set_claim"), "set_claim table should be dropped by migration 0015") + require.False(t, tableExists("unset_claim"), "unset_claim table should be dropped by migration 0015") +} + func TestMigrationsDown(t *testing.T) { dbPath := path.Join(t.TempDir(), "bridgesyncTestDown.sqlite") err := RunMigrations(dbPath) diff --git a/bridgesync/processor_test.go b/bridgesync/processor_test.go index db69964d7..b41c47767 100644 --- a/bridgesync/processor_test.go +++ b/bridgesync/processor_test.go @@ -45,62 +45,6 @@ func newTestProcessor(dbPath string, syncerID string, logger *log.Logger, dbQuer return newProcessor(database, syncerID, logger, dbQueryTimeout) } -func TestBigIntString(t *testing.T) { - globalIndex := GenerateGlobalIndex(true, 0, 1093) - fmt.Println(globalIndex.String()) - - _, ok := new(big.Int).SetString(globalIndex.String(), 10) - require.True(t, ok) - - dbPath := filepath.Join(t.TempDir(), "bridgesyncTestBigIntString.sqlite") - - err := migrations.RunMigrations(dbPath) - require.NoError(t, err) - db, err := db.NewSQLiteDB(dbPath) - require.NoError(t, err) - - ctx := context.Background() - tx, err := db.BeginTx(ctx, nil) - require.NoError(t, err) - - claim := &claimsynctypes.Claim{ - BlockNum: 1, - BlockPos: 0, - GlobalIndex: GenerateGlobalIndex(true, 0, 1093), - OriginNetwork: 11, - Amount: big.NewInt(11), - OriginAddress: common.HexToAddress("0x11"), - DestinationAddress: common.HexToAddress("0x11"), - ProofLocalExitRoot: types.Proof{}, - ProofRollupExitRoot: types.Proof{}, - MainnetExitRoot: common.Hash{}, - RollupExitRoot: common.Hash{}, - GlobalExitRoot: common.Hash{}, - DestinationNetwork: 12, - Type: claimsynctypes.ClaimEvent, - } - - _, err = tx.Exec(`INSERT INTO block (num) VALUES ($1)`, claim.BlockNum) - require.NoError(t, err) - require.NoError(t, meddler.Insert(tx, "claim", claim)) - - require.NoError(t, tx.Commit()) - - tx, err = db.BeginTx(ctx, nil) - require.NoError(t, err) - - rows, err := tx.Query(` - SELECT * FROM claim - WHERE block_num >= $1 AND block_num <= $2; - `, claim.BlockNum, claim.BlockNum) - require.NoError(t, err) - - claimsFromDB := []*claimsynctypes.Claim{} - require.NoError(t, meddler.ScanAll(rows, &claimsFromDB)) - require.Len(t, claimsFromDB, 1) - require.Equal(t, claim, claimsFromDB[0]) -} - func TestProcessor(t *testing.T) { path := path.Join(t.TempDir(), "bridgeSyncerProcessor.db") logger := log.WithFields("module", "bridge-syncer") diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go new file mode 100644 index 000000000..dbfe49fec --- /dev/null +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -0,0 +1,501 @@ +package storage + +import ( + "context" + "database/sql" + "errors" + "fmt" + "os" + "strings" + + "github.com/agglayer/aggkit/claimsync/storage/migrations" + aggkitcommon "github.com/agglayer/aggkit/common" + "github.com/agglayer/aggkit/db" + "github.com/agglayer/aggkit/log" +) + +// requiredBridgeTables are the bridgesync tables that must all exist for the import to proceed. +var requiredBridgeTables = []string{"block", "claim", "set_claim", "unset_claim"} + +// requiredBridgeMigration is the ID of the last bridgesync migration that modifies the +// schema of any of the tables listed in requiredBridgeTables. +// The bridge DB must have applied at least this migration before we can safely import. +// - bridgesync0012 - ALTER TABLE claim ADD COLUMN type +const requiredBridgeMigration = "bridgesync0012" + +// BridgeSyncerStatus holds the read-only inspection results produced by +// InspectBridgeSyncer. Each field is only meaningful when the fields it depends +// on are true (see field comments). +type BridgeSyncerStatus struct { + // BridgeDBExists reports whether the bridgesync database file exists on disk. + BridgeDBExists bool + // ClaimDBExists reports whether the claimsync database file already exists on disk. + ClaimDBExists bool + // MigrationOK reports whether requiredBridgeMigration has been applied to the + // bridge DB. Only meaningful when BridgeDBExists is true. + MigrationOK bool + // HasClaimData reports whether any of the claim, set_claim or unset_claim tables + // contain at least one row. Only meaningful when BridgeDBExists is true and the + // required tables are present. + HasClaimData bool +} + +// String returns a human-readable summary of the status. +func (s BridgeSyncerStatus) String() string { + return fmt.Sprintf( + "BridgeDBExists=%t ClaimDBExists=%t MigrationOK=%t HasClaimData=%t ShouldMigrate=%t", + s.BridgeDBExists, s.ClaimDBExists, s.MigrationOK, s.HasClaimData, s.ShouldMigrate(), + ) +} + +// Validate returns an error when the status indicates a blocking condition that +// requires user intervention. Specifically, it errors when the bridge DB contains +// claim data but the required bridge migration has not been applied, meaning the +// node must first be upgraded to an intermediate version that runs that migration. +func (s BridgeSyncerStatus) Validate() error { + if s.BridgeDBExists && !s.ClaimDBExists && s.HasClaimData && !s.MigrationOK { + return fmt.Errorf( + "bridge DB contains claim data but required migration %q has not been applied; "+ + "upgrade to the intermediate version first", + requiredBridgeMigration, + ) + } + return nil +} + +// ShouldMigrate reports whether a data migration from the bridgesync DB into the +// claimsync DB should be performed. It returns true only when all of the following +// conditions hold: +// - the bridge DB exists on disk, +// - the claim DB does not yet exist (migration not already done), +// - the required bridge migration has been applied, and +// - the bridge DB contains claim data worth copying. +func (s BridgeSyncerStatus) ShouldMigrate() bool { + return s.BridgeDBExists && !s.ClaimDBExists && s.MigrationOK && s.HasClaimData +} + +// InspectBridgeSyncer performs a read-only inspection of the bridge and claim +// database files and returns a BridgeSyncerStatus summary. It never writes to +// either database. +func InspectBridgeSyncer(ctx context.Context, bridgeDBFilename, claimDBFilename string) (BridgeSyncerStatus, error) { + var status BridgeSyncerStatus + + if _, err := os.Stat(bridgeDBFilename); err == nil { + status.BridgeDBExists = true + } + if _, err := os.Stat(claimDBFilename); err == nil { + status.ClaimDBExists = true + } + + if !status.BridgeDBExists { + return status, nil + } + + bdb, err := db.NewSQLiteDB(bridgeDBFilename) + if err != nil { + return status, fmt.Errorf("InspectBridgeSyncer: failed to open bridge DB: %w", err) + } + defer bdb.Close() + + conn, err := bdb.Conn(ctx) + if err != nil { + return status, fmt.Errorf("InspectBridgeSyncer: failed to acquire connection: %w", err) + } + defer conn.Close() + + // Check that the required tables exist before querying anything else. + present, err := checkBridgeTablesOnConn(ctx, conn) + if err != nil { + return status, fmt.Errorf("InspectBridgeSyncer: failed to check bridge tables: %w", err) + } + if !present { + return status, nil + } + + // Check whether the required migration has been applied. + // A missing gorp_migrations table is treated as MigrationOK = false, not an error. + // Any other failure (corruption, permissions, …) is surfaced so it is not silently masked. + var migCount int + err = conn.QueryRowContext(ctx, + `SELECT COUNT(*) FROM gorp_migrations WHERE id = $1`, requiredBridgeMigration).Scan(&migCount) + if err != nil { + errMsg := strings.ToLower(err.Error()) + if !strings.Contains(errMsg, "no such table") || !strings.Contains(errMsg, "gorp_migrations") { + return status, fmt.Errorf("InspectBridgeSyncer: failed to query gorp_migrations: %w", err) + } + } else { + status.MigrationOK = migCount > 0 + } + + // Always check for claim data, even when the migration is missing, so that + // Validate() can distinguish a harmless empty-DB case from a blocking one. + // Check whether any claim-related tables contain data. + var rowCount int + err = conn.QueryRowContext(ctx, ` + SELECT COUNT(*) FROM ( + SELECT 1 FROM (SELECT 1 FROM claim LIMIT 1) + UNION ALL + SELECT 1 FROM (SELECT 1 FROM set_claim LIMIT 1) + UNION ALL + SELECT 1 FROM (SELECT 1 FROM unset_claim LIMIT 1) + )`).Scan(&rowCount) + if err != nil { + return status, fmt.Errorf("InspectBridgeSyncer: failed to count claim rows: %w", err) + } + status.HasClaimData = rowCount > 0 + + return status, nil +} + +// ImportDataFromBridgesyncer copies block, claim, set_claim and unset_claim data from a +// bridgesync SQLite database (bridgeDBFilename) into the claimsync SQLite database +// (claimDBFilename), creating and migrating it if it does not yet exist. +// +// The caller is responsible for deciding whether a migration is needed (e.g. via +// InspectBridgeSyncer and BridgeSyncerStatus.ShouldMigrate) before calling this +// function. No precondition checks are performed here. +// +// The import is atomic: data is written to a temporary file first and only renamed +// to claimDBFilename on success, so a crash mid-import leaves claimDBFilename absent +// and the migration will be retried on the next startup. +// +// Column-level differences between bridge schema versions are handled automatically: +// - block.hash - present since bridgesync migration 0003; defaults to ". +// - claim.tx_hash - present since bridgesync migration 0002; defaults to ". +// - claim.block_timestamp - present since bridgesync migration 0002; defaults to 0. +// - claim.type - present since bridgesync migration 0012; defaults to ". +func ImportDataFromBridgesyncer(ctx context.Context, + logger aggkitcommon.Logger, + bridgeDBFilename string, + claimDBFilename string) error { + if logger == nil { + logger = log.WithFields("module", "ImportDataFromBridgesyncer") + } + + tmpFilename := claimDBFilename + ".import.tmp" + // Remove any leftover tmp file from a previous failed attempt. + os.Remove(tmpFilename) + + // All DB work happens on tmpFilename. The defers inside importDataToTmpFile + // guarantee the DB/connection/transaction are fully closed before we return, + // so the subsequent Rename is safe even on platforms that lock open files. + blocksImported, claimsImported, setClaimsImported, unsetClaimsImported, err := + importDataToTmpFile(ctx, logger, bridgeDBFilename, tmpFilename) + if err != nil { + os.Remove(tmpFilename) + return err + } + + if err := os.Rename(tmpFilename, claimDBFilename); err != nil { + os.Remove(tmpFilename) + return fmt.Errorf("ImportDataFromBridgesyncer: failed to promote tmp DB: %w", err) + } + + logger.Infof("import from bridgesyncer complete: blocks=%d claims=%d set_claims=%d unset_claims=%d", + blocksImported, claimsImported, setClaimsImported, unsetClaimsImported) + return nil +} + +// importDataToTmpFile performs the actual copy from bridgeDBFilename into destFilename. +// It is a pure helper for ImportDataFromBridgesyncer: all deferred closes run when this +// function returns, ensuring the file is fully closed before the caller renames it. +func importDataToTmpFile(ctx context.Context, + logger aggkitcommon.Logger, + bridgeDBFilename string, + destFilename string) (blocksImported, claimsImported, setClaimsImported, unsetClaimsImported int64, err error) { + claimDB, err := db.NewSQLiteDB(destFilename) + if err != nil { + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to open claim DB: %w", err) + } + defer claimDB.Close() + + if err := migrations.RunMigrations(logger, claimDB); err != nil { + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to run claim DB migrations: %w", err) + } + + // Use a single connection so that ATTACH and the subsequent transaction share the + // same SQLite connection (ATTACH is per-connection in SQLite). + conn, err := claimDB.Conn(ctx) + if err != nil { + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to acquire DB connection: %w", err) + } + defer conn.Close() + + // ATTACH the bridge DB so we can SELECT from it in the same query. + // The three characters that act as URI delimiters inside a SQLite file URI path + // ('%', '?', '#') are percent-encoded so they cannot be mistaken for query + // separators or fragment identifiers. The full URI is then passed as a bound + // parameter (not interpolated into SQL) to eliminate any injection risk. + escapedPath := strings.NewReplacer("%", "%25", "?", "%3F", "#", "%23").Replace(bridgeDBFilename) + attachURI := "file:" + escapedPath + "?mode=ro" + if _, err := conn.ExecContext(ctx, `ATTACH DATABASE ? AS bridge`, attachURI); err != nil { + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err) + } + defer conn.ExecContext(ctx, `DETACH DATABASE bridge`) //nolint:errcheck + + hasBlockHash, err := bridgeColumnExists(ctx, conn, "block", "hash") + if err != nil { + return 0, 0, 0, 0, err + } + hasClaimTxHash, err := bridgeColumnExists(ctx, conn, "claim", "tx_hash") + if err != nil { + return 0, 0, 0, 0, err + } + hasClaimBlockTimestamp, err := bridgeColumnExists(ctx, conn, "claim", "block_timestamp") + if err != nil { + return 0, 0, 0, 0, err + } + hasClaimType, err := bridgeColumnExists(ctx, conn, "claim", "type") + if err != nil { + return 0, 0, 0, 0, err + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to begin transaction: %w", err) + } + defer tx.Rollback() //nolint:errcheck + + blocksImported, err = importBlocks(tx, hasBlockHash) + if err != nil { + return 0, 0, 0, 0, err + } + claimsImported, err = importClaims(tx, hasClaimTxHash, hasClaimBlockTimestamp, hasClaimType) + if err != nil { + return 0, 0, 0, 0, err + } + unsetClaimsImported, err = importUnsetClaims(tx) + if err != nil { + return 0, 0, 0, 0, err + } + setClaimsImported, err = importSetClaims(tx) + if err != nil { + return 0, 0, 0, 0, err + } + + if err := tx.Commit(); err != nil { + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to commit transaction: %w", err) + } + + return blocksImported, claimsImported, setClaimsImported, unsetClaimsImported, nil +} + +// checkBridgeTablesOnConn returns true only when all requiredBridgeTables exist in the +// main schema of the given connection. +func checkBridgeTablesOnConn(ctx context.Context, conn *sql.Conn) (bool, error) { + placeholders := make([]string, len(requiredBridgeTables)) + args := make([]any, len(requiredBridgeTables)) + for i, name := range requiredBridgeTables { + placeholders[i] = fmt.Sprintf("$%d", i+1) + args[i] = name + } + query := fmt.Sprintf( //nolint:gosec // placeholders contain only "$N" positional markers, no user input + `SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN (%s)`, + strings.Join(placeholders, ","), + ) + var count int + if err := conn.QueryRowContext(ctx, query, args...).Scan(&count); err != nil { + return false, fmt.Errorf("checkBridgeTablesOnConn: %w", err) + } + return count == len(requiredBridgeTables), nil +} + +// bridgeColumnExists reports whether the given column exists in the named table of the +// attached 'bridge' schema by inspecting PRAGMA table_info. +func bridgeColumnExists(ctx context.Context, conn *sql.Conn, tableName, columnName string) (bool, error) { + rows, err := conn.QueryContext(ctx, fmt.Sprintf(`PRAGMA bridge.table_info(%s)`, tableName)) + if err != nil { + return false, fmt.Errorf("bridgeColumnExists: PRAGMA table_info(%s): %w", tableName, err) + } + defer rows.Close() + + for rows.Next() { + var cid int + var name, colType string + var notNull int + var dfltValue sql.NullString + var pk int + if err := rows.Scan(&cid, &name, &colType, ¬Null, &dfltValue, &pk); err != nil { + return false, fmt.Errorf("bridgeColumnExists: scan table_info(%s): %w", tableName, err) + } + if name == columnName { + return true, nil + } + } + return false, rows.Err() +} + +func importBlocks(tx *sql.Tx, hasHash bool) (int64, error) { + hashExpr := "''" + if hasHash { + hashExpr = "COALESCE(hash, '')" + } + result, err := tx.Exec(fmt.Sprintf( + `INSERT OR IGNORE INTO main.block (num, hash) SELECT num, %s FROM bridge.block`, + hashExpr, + )) + if err != nil { + return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import blocks: %w", err) + } + n, _ := result.RowsAffected() + return n, nil +} + +func importClaims(tx *sql.Tx, hasTxHash, hasBlockTimestamp, hasType bool) (int64, error) { + txHashExpr := "''" + if hasTxHash { + txHashExpr = "COALESCE(tx_hash, '')" + } + blockTimestampExpr := "0" + if hasBlockTimestamp { + blockTimestampExpr = "COALESCE(block_timestamp, 0)" + } + typeExpr := "''" + if hasType { + typeExpr = "COALESCE(type, '')" + } + result, err := tx.Exec(fmt.Sprintf(` + INSERT OR IGNORE INTO main.claim ( + block_num, block_pos, tx_hash, global_index, + origin_network, origin_address, destination_address, amount, + proof_local_exit_root, proof_rollup_exit_root, + mainnet_exit_root, rollup_exit_root, global_exit_root, + destination_network, metadata, is_message, block_timestamp, type + ) + SELECT + block_num, block_pos, %s, global_index, + origin_network, origin_address, destination_address, amount, + proof_local_exit_root, proof_rollup_exit_root, + mainnet_exit_root, rollup_exit_root, global_exit_root, + destination_network, metadata, is_message, %s, %s + FROM bridge.claim`, txHashExpr, blockTimestampExpr, typeExpr)) + if err != nil { + return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import claims: %w", err) + } + n, _ := result.RowsAffected() + return n, nil +} + +func importUnsetClaims(tx *sql.Tx) (int64, error) { + result, err := tx.Exec(` + INSERT OR IGNORE INTO main.unset_claim + (block_num, block_pos, tx_hash, global_index, unset_global_index_hash_chain, created_at) + SELECT block_num, block_pos, tx_hash, global_index, unset_global_index_hash_chain, created_at + FROM bridge.unset_claim`) + if err != nil { + return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import unset_claims: %w", err) + } + n, _ := result.RowsAffected() + return n, nil +} + +func importSetClaims(tx *sql.Tx) (int64, error) { + result, err := tx.Exec(` + INSERT OR IGNORE INTO main.set_claim + (block_num, block_pos, tx_hash, global_index, created_at) + SELECT block_num, block_pos, tx_hash, global_index, created_at + FROM bridge.set_claim`) + if err != nil { + return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import set_claims: %w", err) + } + n, _ := result.RowsAffected() + return n, nil +} + +// ImportKeyValueFromBridgesyncer copies the single key_value row from the bridgesync +// SQLite database (bridgeDBFilename) into the claimsync SQLite database (claimDBFilename), +// replacing the original owner value with the provided owner parameter. +// +// The function is a no-op when the key_value table does not exist in the bridge DB or +// contains no rows. In that case the claimDB is not created at all. +// The import is idempotent: an existing row with the same (owner, key) is silently skipped +// (INSERT OR IGNORE). +func ImportKeyValueFromBridgesyncer( + ctx context.Context, bridgeDBFilename string, claimDBFilename string, owner string) error { + logger := log.WithFields("module", "ImportKeyValueFromBridgesyncer") + + // Phase 1 - read the single key_value row from the bridge DB without touching the claim DB. + row, err := readBridgeKeyValueRow(ctx, bridgeDBFilename) + if err != nil { + return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to read bridge key_value: %w", err) + } + if row == nil { + logger.Infof("no key_value data found in bridge DB - skipping import") + return nil + } + + // Phase 2 - open / create the claim DB, run migrations and insert the row. + claimDB, err := db.NewSQLiteDB(claimDBFilename) + if err != nil { + return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to open claim DB: %w", err) + } + defer claimDB.Close() + + if err := migrations.RunMigrations(logger, claimDB); err != nil { + return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to run claim DB migrations: %w", err) + } + + _, err = claimDB.ExecContext(ctx, ` + INSERT OR IGNORE INTO key_value (owner, key, value, updated_at) + VALUES ($1, $2, $3, $4)`, + owner, row.key, row.value, row.updatedAt) + if err != nil { + return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to insert key_value row: %w", err) + } + + logger.Infof("key_value import from bridgesyncer complete (owner=%s key=%s)", owner, row.key) + return nil +} + +// keyValueRow holds the fields of a key_value table row. +type keyValueRow struct { + key string + value string + updatedAt int64 +} + +// readBridgeKeyValueRow opens bridgeDBFilename and returns the single key_value row, or +// nil if the table does not exist or is empty. +func readBridgeKeyValueRow(ctx context.Context, bridgeDBFilename string) (*keyValueRow, error) { + bdb, err := db.NewSQLiteDB(bridgeDBFilename) + if err != nil { + return nil, fmt.Errorf("readBridgeKeyValueRow: failed to open bridge DB: %w", err) + } + defer bdb.Close() + + // Check that the key_value table exists. + var tableCount int + err = bdb.QueryRowContext(ctx, + `SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='key_value'`). + Scan(&tableCount) + if err != nil { + return nil, fmt.Errorf("readBridgeKeyValueRow: failed to check key_value table: %w", err) + } + if tableCount == 0 { + return nil, nil + } + + const compatibilityKey = "compatibility_content" + + var count int + err = bdb.QueryRowContext(ctx, + `SELECT COUNT(*) FROM key_value WHERE key = $1`, compatibilityKey).Scan(&count) + if err != nil { + return nil, fmt.Errorf("readBridgeKeyValueRow: failed to count key_value rows: %w", err) + } + if count != 1 { + return nil, fmt.Errorf("readBridgeKeyValueRow: expected exactly 1 row with key=%q, got %d", compatibilityKey, count) + } + + row := &keyValueRow{} + err = bdb.QueryRowContext(ctx, + `SELECT key, value, updated_at FROM key_value WHERE key = $1`, compatibilityKey). + Scan(&row.key, &row.value, &row.updatedAt) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("readBridgeKeyValueRow: failed to read row: %w", err) + } + return row, nil +} diff --git a/claimsync/storage/import_data_from_bridgesyncer_test.go b/claimsync/storage/import_data_from_bridgesyncer_test.go new file mode 100644 index 000000000..dafddc4e1 --- /dev/null +++ b/claimsync/storage/import_data_from_bridgesyncer_test.go @@ -0,0 +1,513 @@ +package storage + +import ( + "context" + "database/sql" + "math/big" + "os" + "path/filepath" + "testing" + + "github.com/agglayer/aggkit/db" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +// bridgeDB helpers --------------------------------------------------------------- + +// newBridgeDB creates a minimal bridgesync-like SQLite database at path and returns +// the open *sql.DB. The schema matches bridgesync after all migrations have run +// (block.hash, claim.type, set_claim, unset_claim all present). +func newBridgeDB(t *testing.T, path string) *sql.DB { + t.Helper() + bdb, err := db.NewSQLiteDB(path) + require.NoError(t, err) + t.Cleanup(func() { bdb.Close() }) + + _, err = bdb.Exec(` + CREATE TABLE gorp_migrations ( + id VARCHAR(255) NOT NULL PRIMARY KEY, + applied_at DATETIME + ); + INSERT INTO gorp_migrations (id, applied_at) VALUES ('` + requiredBridgeMigration + `', strftime('%s','now')); + CREATE TABLE block ( + num BIGINT PRIMARY KEY, + hash VARCHAR + ); + CREATE TABLE claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR, + global_index TEXT NOT NULL, + origin_network INTEGER NOT NULL, + origin_address VARCHAR NOT NULL, + destination_address VARCHAR NOT NULL, + amount TEXT NOT NULL, + proof_local_exit_root VARCHAR, + proof_rollup_exit_root VARCHAR, + mainnet_exit_root VARCHAR, + rollup_exit_root VARCHAR, + global_exit_root VARCHAR, + destination_network INTEGER NOT NULL, + metadata BLOB, + is_message BOOLEAN, + block_timestamp INTEGER, + type TEXT NOT NULL DEFAULT '', + PRIMARY KEY (block_num, block_pos) + ); + CREATE TABLE unset_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + unset_global_index_hash_chain VARCHAR NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) + ); + CREATE TABLE set_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) + ); + `) + require.NoError(t, err) + return bdb +} + +// insertBridgeBlock inserts a row into bridge.block. +func insertBridgeBlock(t *testing.T, bdb *sql.DB, num uint64, hash string) { + t.Helper() + _, err := bdb.Exec(`INSERT INTO block (num, hash) VALUES (?, ?)`, num, hash) + require.NoError(t, err) +} + +// insertBridgeClaim inserts a minimal row into bridge.claim. +func insertBridgeClaim(t *testing.T, bdb *sql.DB, blockNum uint64, blockPos uint64, globalIndex string) { + t.Helper() + _, err := bdb.Exec(` + INSERT INTO claim + (block_num, block_pos, global_index, origin_network, origin_address, + destination_address, amount, destination_network) + VALUES (?, ?, ?, 1, '0x1111', '0x2222', '100', 2)`, + blockNum, blockPos, globalIndex) + require.NoError(t, err) +} + +// insertBridgeSetClaim inserts a row into bridge.set_claim. +func insertBridgeSetClaim(t *testing.T, bdb *sql.DB, blockNum uint64, blockPos uint64, globalIndex string) { + t.Helper() + _, err := bdb.Exec(` + INSERT INTO set_claim (block_num, block_pos, tx_hash, global_index) + VALUES (?, ?, '0xabcd', ?)`, blockNum, blockPos, globalIndex) + require.NoError(t, err) +} + +// insertBridgeUnsetClaim inserts a row into bridge.unset_claim. +func insertBridgeUnsetClaim(t *testing.T, bdb *sql.DB, blockNum uint64, blockPos uint64, globalIndex string) { + t.Helper() + _, err := bdb.Exec(` + INSERT INTO unset_claim + (block_num, block_pos, tx_hash, global_index, unset_global_index_hash_chain) + VALUES (?, ?, '0xdead', ?, '0xbeef')`, blockNum, blockPos, globalIndex) + require.NoError(t, err) +} + +// count helpers + +func countRows(t *testing.T, claimPath, table string) int { + t.Helper() + d, err := db.NewSQLiteDB(claimPath) + require.NoError(t, err) + defer d.Close() + var n int + require.NoError(t, d.QueryRow(`SELECT COUNT(*) FROM `+table).Scan(&n)) + return n +} + +// --------------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------------- + +func TestImportDataFromBridgesyncer_Success(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb := newBridgeDB(t, bridgePath) + insertBridgeBlock(t, bdb, 10, common.HexToHash("0xaabb").Hex()) + insertBridgeClaim(t, bdb, 10, 0, big.NewInt(1).String()) + insertBridgeClaim(t, bdb, 10, 1, big.NewInt(2).String()) + insertBridgeSetClaim(t, bdb, 10, 2, big.NewInt(3).String()) + insertBridgeUnsetClaim(t, bdb, 10, 3, big.NewInt(4).String()) + bdb.Close() + + require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) + + require.Equal(t, 1, countRows(t, claimPath, "block")) + require.Equal(t, 2, countRows(t, claimPath, "claim")) + require.Equal(t, 1, countRows(t, claimPath, "set_claim")) + require.Equal(t, 1, countRows(t, claimPath, "unset_claim")) +} + +func TestImportDataFromBridgesyncer_Idempotent(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb := newBridgeDB(t, bridgePath) + insertBridgeBlock(t, bdb, 5, common.HexToHash("0x1234").Hex()) + insertBridgeClaim(t, bdb, 5, 0, big.NewInt(99).String()) + bdb.Close() + + require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) + // Second call must succeed and not duplicate rows. + require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) + + require.Equal(t, 1, countRows(t, claimPath, "block")) + require.Equal(t, 1, countRows(t, claimPath, "claim")) +} + +// ── BridgeSyncerStatus.ShouldMigrate ───────────────────────────────────────── + +func TestBridgeSyncerStatus_ShouldMigrate(t *testing.T) { + tests := []struct { + name string + status BridgeSyncerStatus + want bool + }{ + { + name: "all conditions met", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: true, HasClaimData: true}, + want: true, + }, + { + name: "bridge DB missing", + status: BridgeSyncerStatus{BridgeDBExists: false, ClaimDBExists: false, MigrationOK: true, HasClaimData: true}, + want: false, + }, + { + name: "claim DB already exists", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: true, MigrationOK: true, HasClaimData: true}, + want: false, + }, + { + name: "migration not applied", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: false, HasClaimData: true}, + want: false, + }, + { + name: "no claim data", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: true, HasClaimData: false}, + want: false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.want, tc.status.ShouldMigrate()) + }) + } +} + +// ── BridgeSyncerStatus.Validate ─────────────────────────────────────────────── + +func TestBridgeSyncerStatus_Validate(t *testing.T) { + tests := []struct { + name string + status BridgeSyncerStatus + wantErr bool + }{ + { + name: "blocking: bridge has data but migration missing", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: false, HasClaimData: true}, + wantErr: true, + }, + { + name: "ok: bridge has data and migration applied", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: true, HasClaimData: true}, + wantErr: false, + }, + { + name: "ok: bridge DB missing", + status: BridgeSyncerStatus{BridgeDBExists: false, ClaimDBExists: false, MigrationOK: false, HasClaimData: false}, + wantErr: false, + }, + { + name: "ok: claim DB already exists (no migration needed)", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: true, MigrationOK: false, HasClaimData: true}, + wantErr: false, + }, + { + name: "ok: migration missing but no claim data", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: false, HasClaimData: false}, + wantErr: false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.status.Validate() + if tc.wantErr { + require.ErrorContains(t, err, requiredBridgeMigration) + } else { + require.NoError(t, err) + } + }) + } +} + +// ── InspectBridgeSyncer ─────────────────────────────────────────────────────── + +func TestInspectBridgeSyncer_BridgeDBNotExist(t *testing.T) { + dir := t.TempDir() + status, err := InspectBridgeSyncer(context.Background(), + filepath.Join(dir, "bridge.db"), filepath.Join(dir, "claim.db")) + require.NoError(t, err) + require.False(t, status.BridgeDBExists) + require.False(t, status.ClaimDBExists) + require.False(t, status.MigrationOK) + require.False(t, status.HasClaimData) +} + +func TestInspectBridgeSyncer_ClaimDBExists(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb := newBridgeDB(t, bridgePath) + bdb.Close() + f, err := os.Create(claimPath) + require.NoError(t, err) + require.NoError(t, f.Close()) + + status, err := InspectBridgeSyncer(context.Background(), bridgePath, claimPath) + require.NoError(t, err) + require.True(t, status.BridgeDBExists) + require.True(t, status.ClaimDBExists) +} + +func TestInspectBridgeSyncer_MigrationMissing(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + + bdb := newBridgeDB(t, bridgePath) + _, err := bdb.Exec(`DELETE FROM gorp_migrations WHERE id = ?`, requiredBridgeMigration) + require.NoError(t, err) + insertBridgeBlock(t, bdb, 1, common.HexToHash("0x01").Hex()) + insertBridgeClaim(t, bdb, 1, 0, big.NewInt(1).String()) + bdb.Close() + + status, err := InspectBridgeSyncer(context.Background(), bridgePath, filepath.Join(dir, "claim.db")) + require.NoError(t, err) + require.True(t, status.BridgeDBExists) + require.False(t, status.MigrationOK) + // HasClaimData is populated even when the migration is missing so that + // Validate() can distinguish the blocking case from a harmless empty DB. + require.True(t, status.HasClaimData) + require.ErrorContains(t, status.Validate(), requiredBridgeMigration) +} + +func TestInspectBridgeSyncer_NoClaimData(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + + bdb := newBridgeDB(t, bridgePath) + insertBridgeBlock(t, bdb, 1, common.HexToHash("0xdeadbeef").Hex()) + bdb.Close() + + status, err := InspectBridgeSyncer(context.Background(), bridgePath, filepath.Join(dir, "claim.db")) + require.NoError(t, err) + require.True(t, status.BridgeDBExists) + require.True(t, status.MigrationOK) + require.False(t, status.HasClaimData) +} + +func TestInspectBridgeSyncer_WithClaimData(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + + bdb := newBridgeDB(t, bridgePath) + insertBridgeBlock(t, bdb, 1, common.HexToHash("0xaabb").Hex()) + insertBridgeClaim(t, bdb, 1, 0, big.NewInt(42).String()) + bdb.Close() + + status, err := InspectBridgeSyncer(context.Background(), bridgePath, filepath.Join(dir, "claim.db")) + require.NoError(t, err) + require.True(t, status.BridgeDBExists) + require.False(t, status.ClaimDBExists) + require.True(t, status.MigrationOK) + require.True(t, status.HasClaimData) +} + +// ── ImportKeyValueFromBridgesyncer ──────────────────────────────────────────── + +func TestImportKeyValueFromBridgesyncer_NoTable(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + // Bridge DB exists but has no key_value table. + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + require.NoError(t, bdb.Ping()) + bdb.Close() + + require.NoError(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "my-owner")) + + // Claim DB must not have been created. + _, statErr := os.Stat(claimPath) + require.True(t, os.IsNotExist(statErr)) +} + +func TestImportKeyValueFromBridgesyncer_EmptyTable(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + _, err = bdb.Exec(`CREATE TABLE key_value ( + owner VARCHAR NOT NULL, key VARCHAR NOT NULL, + value VARCHAR, updated_at INTEGER NOT NULL, + PRIMARY KEY (key, owner))`) + require.NoError(t, err) + bdb.Close() + + require.ErrorContains(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "my-owner"), `expected exactly 1 row with key="compatibility_content"`) +} + +func TestImportKeyValueFromBridgesyncer_Success(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + _, err = bdb.Exec(`CREATE TABLE key_value ( + owner VARCHAR NOT NULL, key VARCHAR NOT NULL, + value VARCHAR, updated_at INTEGER NOT NULL, + PRIMARY KEY (key, owner))`) + require.NoError(t, err) + _, err = bdb.Exec(`INSERT INTO key_value (owner, key, value, updated_at) VALUES ('old-owner', 'compatibility_content', 'data', 1000)`) + require.NoError(t, err) + bdb.Close() + + require.NoError(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "new-owner")) + + cdb, err := db.NewSQLiteDB(claimPath) + require.NoError(t, err) + defer cdb.Close() + + var owner, key, value string + var updatedAt int64 + err = cdb.QueryRow(`SELECT owner, key, value, updated_at FROM key_value LIMIT 1`). + Scan(&owner, &key, &value, &updatedAt) + require.NoError(t, err) + require.Equal(t, "new-owner", owner) + require.Equal(t, "compatibility_content", key) + require.Equal(t, "data", value) + require.Equal(t, int64(1000), updatedAt) +} + +func TestImportKeyValueFromBridgesyncer_Idempotent(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + _, err = bdb.Exec(`CREATE TABLE key_value ( + owner VARCHAR NOT NULL, key VARCHAR NOT NULL, + value VARCHAR, updated_at INTEGER NOT NULL, + PRIMARY KEY (key, owner))`) + require.NoError(t, err) + _, err = bdb.Exec(`INSERT INTO key_value (owner, key, value, updated_at) VALUES ('old-owner', 'compatibility_content', 'data', 1000)`) + require.NoError(t, err) + bdb.Close() + + require.NoError(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "new-owner")) + // Second call must not fail and must not duplicate the row. + require.NoError(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "new-owner")) + + require.Equal(t, 1, countRows(t, claimPath, "key_value")) +} + +// ── OldSchemaNoHash ─────────────────────────────────────────────────────────── + +func TestImportDataFromBridgesyncer_OldSchemaNoHash(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge_old.db") + claimPath := filepath.Join(dir, "claim.db") + + // Bridge DB without block.hash (pre-migration 0003) + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + // Simulates bridgesync schema after migration 0001 only: + // block has no hash, claim has no tx_hash / block_timestamp / type. + // gorp_migrations must still contain the required migration entry. + _, err = bdb.Exec(` + CREATE TABLE gorp_migrations ( + id VARCHAR(255) NOT NULL PRIMARY KEY, + applied_at DATETIME + ); + INSERT INTO gorp_migrations (id, applied_at) VALUES ('` + requiredBridgeMigration + `', strftime('%s','now')); + CREATE TABLE block (num BIGINT PRIMARY KEY); + CREATE TABLE claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + global_index TEXT NOT NULL, + origin_network INTEGER NOT NULL, + origin_address VARCHAR NOT NULL, + destination_address VARCHAR NOT NULL, + amount TEXT NOT NULL, + proof_local_exit_root VARCHAR, + proof_rollup_exit_root VARCHAR, + mainnet_exit_root VARCHAR, + rollup_exit_root VARCHAR, + global_exit_root VARCHAR, + destination_network INTEGER NOT NULL, + metadata BLOB, + is_message BOOLEAN, + PRIMARY KEY (block_num, block_pos) + ); + CREATE TABLE set_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) + ); + CREATE TABLE unset_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + unset_global_index_hash_chain VARCHAR NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) + ); + `) + require.NoError(t, err) + _, err = bdb.Exec(`INSERT INTO block (num) VALUES (1)`) + require.NoError(t, err) + _, err = bdb.Exec(` + INSERT INTO claim + (block_num, block_pos, global_index, origin_network, origin_address, + destination_address, amount, destination_network) + VALUES (1, 0, '42', 1, '0xaaaa', '0xbbbb', '50', 2)`) + require.NoError(t, err) + bdb.Close() + + err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, err) + + // block.hash should default to '' + cdb, err := db.NewSQLiteDB(claimPath) + require.NoError(t, err) + defer cdb.Close() + var hash string + require.NoError(t, cdb.QueryRowContext(context.Background(), `SELECT hash FROM block WHERE num = 1`).Scan(&hash)) + require.Equal(t, "", hash) + + require.Equal(t, 1, countRows(t, claimPath, "claim")) +} diff --git a/cmd/run.go b/cmd/run.go index cf5fa1e64..57bbc101e 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -31,6 +31,7 @@ import ( "github.com/agglayer/aggkit/bridgeservice" "github.com/agglayer/aggkit/bridgesync" "github.com/agglayer/aggkit/claimsync" + claimsyncstorage "github.com/agglayer/aggkit/claimsync/storage" claimsynctypes "github.com/agglayer/aggkit/claimsync/types" aggkitcommon "github.com/agglayer/aggkit/common" "github.com/agglayer/aggkit/config" @@ -90,7 +91,6 @@ func start(cliCtx *cli.Context) error { prometheus.Init() } log.Debugf("Components to run: %v", components) - l1Client := runL1ClientIfNeeded(cliCtx.Context, cfg.L1NetworkConfig.RPC) l2Client := runL2ClientIfNeeded(cliCtx.Context, components, cfg.Common.L2RPC) reorgDetectorL1, errChanL1 := runReorgDetectorL1IfNeeded(cliCtx.Context, components, l1Client, &cfg.ReorgDetectorL1) @@ -133,6 +133,10 @@ func start(cliCtx *cli.Context) error { rpcServices = append(rpcServices, l1InfoTreeSync.GetRPCServices()...) } + if isNeeded([]string{aggkitcommon.BRIDGE, aggkitcommon.L1BRIDGESYNC}, components) { + runImportFromBridgeSyncerIfNeeded(ctx, cfg.BridgeL1Sync.DBPath, cfg.ClaimL1Sync.DBPath, claimsynctypes.L1ClaimSyncer) + } + l1ClaimSync := runClaimSyncL1IfNeeded(ctx, components, cfg.ClaimL1Sync, reorgDetectorL1, l1Client, MainnetID) if l1ClaimSync != nil { rpcServices = append(rpcServices, l1ClaimSync.GetRPCServices()...) @@ -145,6 +149,12 @@ func start(cliCtx *cli.Context) error { return fmt.Errorf("failed to get initial local exit root: %w", err) } + if isNeeded([]string{ + aggkitcommon.AGGSENDER, aggkitcommon.AGGSENDERVALIDATOR, aggkitcommon.AGGCHAINPROOFGEN, + aggkitcommon.BRIDGE, aggkitcommon.L2CLAIMSYNC, aggkitcommon.L2BRIDGESYNC}, components) { + runImportFromBridgeSyncerIfNeeded(ctx, cfg.BridgeL2Sync.DBPath, cfg.ClaimL2Sync.DBPath, claimsynctypes.L2ClaimSyncer) + } + l2ClaimSync := runClaimSyncL2IfNeeded( ctx, components, cfg.ClaimL2Sync, reorgDetectorL2, l2Client, rollupDataQuerier.RollupID) if l2ClaimSync != nil { @@ -971,6 +981,41 @@ func runClaimSyncL2IfNeeded( return res } +// runImportFromBridgeSyncerIfNeeded migrates claim data from an existing bridgesync +// database into the claimsync database before the syncer starts. It is a no-op when +// bridgeDBPath is empty or the bridge DB contains no claim data. +func runImportFromBridgeSyncerIfNeeded( + ctx context.Context, + bridgeDBPath string, + claimDBPath string, + syncerID claimsynctypes.ClaimSyncerID, +) { + if bridgeDBPath == "" { + return + } + logger := log.WithFields("module", "ImportFromBridgeSyncer", "syncerID", syncerID.String()) + status, err := claimsyncstorage.InspectBridgeSyncer(ctx, bridgeDBPath, claimDBPath) + if err != nil { + logger.Fatalf("failed to inspect bridge DB: %v", err) + } + if err := status.Validate(); err != nil { + logger.Fatalf("bridge DB migration blocked: %v", err) + } + if !status.ShouldMigrate() { + logger.Infof("no migration needed. %s", status.String()) + return + } + logger.Infof("migration from bridgesyncer to claimsyncer needed, starting migration process. %s", status.String()) + if err := claimsyncstorage.ImportDataFromBridgesyncer(ctx, logger, bridgeDBPath, claimDBPath); err != nil { + logger.Fatalf("failed to import claim data from bridge DB: %v", err) + } + if err := claimsyncstorage.ImportKeyValueFromBridgesyncer( + ctx, bridgeDBPath, claimDBPath, syncerID.String()); err != nil { + logger.Fatalf("failed to import key_value from bridge DB: %v", err) + } + logger.Infof("migration from bridgesyncer to claimsyncer completed successfully") +} + func runAggsenderMultisigCommitteeIfNeeded( components []string, rollupAddr common.Address,