From 97ca6508d0489c5817f9acc034380d36ba08acf6 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Mon, 23 Mar 2026 17:42:32 +0100 Subject: [PATCH 01/12] feat: migrate bridgesync claim data into claimsync DB on startup Introduce `runImportFromBridgeSyncerIfNeeded` which copies block, claim, set_claim, unset_claim and key_value rows from an existing bridgesync SQLite database into the claimsync database before the syncers start. The migration is triggered at the call site in `start()`: - for L1 when BRIDGE or L1BRIDGESYNC are active - for L2 when any of BRIDGE, L2BRIDGESYNC, L2CLAIMSYNC, AGGSENDER, AGGSENDERVALIDATOR or AGGCHAINPROOFGEN are active It delegates to `claimsyncstorage.ImportDataFromBridgesyncer` and `claimsyncstorage.ImportKeyValueFromBridgesyncer`, both of which are idempotent and no-ops when the source DB has no relevant data. Co-Authored-By: Claude Sonnet 4.6 --- .../storage/import_data_from_bridgesyncer.go | 395 ++++++++++++++++++ .../import_data_from_bridgesyncer_test.go | 395 ++++++++++++++++++ cmd/run.go | 32 ++ 3 files changed, 822 insertions(+) create mode 100644 claimsync/storage/import_data_from_bridgesyncer.go create mode 100644 claimsync/storage/import_data_from_bridgesyncer_test.go diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go new file mode 100644 index 000000000..6b1aa308b --- /dev/null +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -0,0 +1,395 @@ +package storage + +import ( + "context" + "database/sql" + "errors" + "fmt" + "strings" + + "github.com/agglayer/aggkit/claimsync/storage/migrations" + aggkitcommon "github.com/agglayer/aggkit/common" + "github.com/agglayer/aggkit/db" + "github.com/agglayer/aggkit/log" +) + +// requiredBridgeTables are the bridgesync tables that must all exist for the import to proceed. +var requiredBridgeTables = []string{"block", "claim", "set_claim", "unset_claim"} + +// requiredBridgeMigration is the ID of the last bridgesync migration that modifies the +// schema of any of the tables listed in requiredBridgeTables. +// The bridge DB must have applied at least this migration before we can safely import. +// - bridgesync0012 – ALTER TABLE claim ADD COLUMN type +const requiredBridgeMigration = "bridgesync0012" + +// ImportDataFromBridgesyncer copies block, claim, set_claim and unset_claim data from a +// bridgesync SQLite database (bridgeDBFilename) into the claimsync SQLite database +// (claimDBFilename), creating and migrating it if it does not yet exist. +// +// The function is a no-op when the required source tables are absent in the bridge DB or +// when none of claim/set_claim/unset_claim contain any rows. In that case the claimDB is +// not created at all. +// The import is idempotent: rows that already exist in the destination are silently +// skipped (INSERT OR IGNORE). +// +// Column-level differences between schema versions are handled automatically: +// - block.hash – present since bridgesync migration 0003; defaults to ''. +// - claim.tx_hash – present since bridgesync migration 0002; defaults to ''. +// - claim.block_timestamp – present since bridgesync migration 0002; defaults to 0. +// - claim.type – present since bridgesync migration 0012; defaults to ''. +func ImportDataFromBridgesyncer(ctx context.Context, logger aggkitcommon.Logger, bridgeDBFilename string, claimDBFilename string) error { + if logger == nil { + logger = log.WithFields("module", "ImportDataFromBridgesyncer") + } + + // Phase 1 – inspect the bridge DB without touching the claim DB. + hasData, err := bridgeHasClaimData(ctx, bridgeDBFilename) + if err != nil { + return fmt.Errorf("ImportDataFromBridgesyncer: failed to inspect bridge DB: %w", err) + } + if !hasData { + logger.Infof("no claim data found in bridge DB – skipping import") + return nil + } + + // Phase 2 – open / create the claim DB and run migrations. + claimDB, err := db.NewSQLiteDB(claimDBFilename) + if err != nil { + return fmt.Errorf("ImportDataFromBridgesyncer: failed to open claim DB: %w", err) + } + defer claimDB.Close() + + if err := migrations.RunMigrations(logger, claimDB); err != nil { + return fmt.Errorf("ImportDataFromBridgesyncer: failed to run claim DB migrations: %w", err) + } + + // Use a single connection so that ATTACH and the subsequent transaction share the + // same SQLite connection (ATTACH is per-connection in SQLite). + conn, err := claimDB.Conn(ctx) + if err != nil { + return fmt.Errorf("ImportDataFromBridgesyncer: failed to acquire DB connection: %w", err) + } + defer conn.Close() + + // ATTACH the bridge DB so we can SELECT from it in the same query. + attachSQL := fmt.Sprintf(`ATTACH DATABASE 'file:%s' AS bridge`, bridgeDBFilename) + if _, err := conn.ExecContext(ctx, attachSQL); err != nil { + return fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err) + } + defer conn.ExecContext(ctx, `DETACH DATABASE bridge`) //nolint:errcheck + + hasBlockHash, err := bridgeColumnExists(ctx, conn, "block", "hash") + if err != nil { + return err + } + hasClaimTxHash, err := bridgeColumnExists(ctx, conn, "claim", "tx_hash") + if err != nil { + return err + } + hasClaimBlockTimestamp, err := bridgeColumnExists(ctx, conn, "claim", "block_timestamp") + if err != nil { + return err + } + hasClaimType, err := bridgeColumnExists(ctx, conn, "claim", "type") + if err != nil { + return err + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return fmt.Errorf("ImportDataFromBridgesyncer: failed to begin transaction: %w", err) + } + defer tx.Rollback() //nolint:errcheck + + blocksImported, err := importBlocks(tx, hasBlockHash) + if err != nil { + return err + } + claimsImported, err := importClaims(tx, hasClaimTxHash, hasClaimBlockTimestamp, hasClaimType) + if err != nil { + return err + } + unsetClaimsImported, err := importUnsetClaims(tx) + if err != nil { + return err + } + setClaimsImported, err := importSetClaims(tx) + if err != nil { + return err + } + + if err := tx.Commit(); err != nil { + return fmt.Errorf("ImportDataFromBridgesyncer: failed to commit transaction: %w", err) + } + + logger.Infof("import from bridgesyncer complete: blocks=%d claims=%d set_claims=%d unset_claims=%d", + blocksImported, claimsImported, setClaimsImported, unsetClaimsImported) + return nil +} + +// bridgeHasClaimData opens bridgeDBFilename directly, checks that all required tables +// exist, and returns true if any of claim/set_claim/unset_claim contain at least one row. +func bridgeHasClaimData(ctx context.Context, bridgeDBFilename string) (bool, error) { + bdb, err := db.NewSQLiteDB(bridgeDBFilename) + if err != nil { + return false, fmt.Errorf("bridgeHasClaimData: failed to open bridge DB: %w", err) + } + defer bdb.Close() + + conn, err := bdb.Conn(ctx) + if err != nil { + return false, fmt.Errorf("bridgeHasClaimData: failed to acquire connection: %w", err) + } + defer conn.Close() + + // Re-use the existing helper but against the main schema of the bridge DB. + present, err := checkBridgeTablesOnConn(ctx, conn) + if err != nil { + return false, err + } + if !present { + return false, nil + } + + if err := checkBridgeMigration(ctx, conn); err != nil { + return false, err + } + + var count int + err = conn.QueryRowContext(ctx, ` + SELECT COUNT(*) FROM ( + SELECT 1 FROM (SELECT 1 FROM claim LIMIT 1) + UNION ALL + SELECT 1 FROM (SELECT 1 FROM set_claim LIMIT 1) + UNION ALL + SELECT 1 FROM (SELECT 1 FROM unset_claim LIMIT 1) + )`).Scan(&count) + if err != nil { + return false, fmt.Errorf("bridgeHasClaimData: failed to count claim rows: %w", err) + } + return count > 0, nil +} + +// checkBridgeMigration returns an error if requiredBridgeMigration has not been applied +// to the bridge DB, which means its schema may be incomplete for a safe import. +func checkBridgeMigration(ctx context.Context, conn *sql.Conn) error { + var count int + err := conn.QueryRowContext(ctx, + `SELECT COUNT(*) FROM gorp_migrations WHERE id = $1`, requiredBridgeMigration). + Scan(&count) + if err != nil { + return fmt.Errorf("checkBridgeMigration: failed to query gorp_migrations: %w", err) + } + if count == 0 { + return fmt.Errorf("checkBridgeMigration: bridge DB has not applied required migration %q", requiredBridgeMigration) + } + return nil +} + +// checkBridgeTablesOnConn returns true only when all requiredBridgeTables exist in the +// main schema of the given connection. +func checkBridgeTablesOnConn(ctx context.Context, conn *sql.Conn) (bool, error) { + placeholders := make([]string, len(requiredBridgeTables)) + args := make([]any, len(requiredBridgeTables)) + for i, name := range requiredBridgeTables { + placeholders[i] = fmt.Sprintf("$%d", i+1) + args[i] = name + } + query := fmt.Sprintf( + `SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN (%s)`, + strings.Join(placeholders, ","), + ) + var count int + if err := conn.QueryRowContext(ctx, query, args...).Scan(&count); err != nil { + return false, fmt.Errorf("checkBridgeTablesOnConn: %w", err) + } + return count == len(requiredBridgeTables), nil +} + +// bridgeColumnExists reports whether the given column exists in the named table of the +// attached 'bridge' schema by inspecting PRAGMA table_info. +func bridgeColumnExists(ctx context.Context, conn *sql.Conn, tableName, columnName string) (bool, error) { + rows, err := conn.QueryContext(ctx, fmt.Sprintf(`PRAGMA bridge.table_info(%s)`, tableName)) + if err != nil { + return false, fmt.Errorf("bridgeColumnExists: PRAGMA table_info(%s): %w", tableName, err) + } + defer rows.Close() + + for rows.Next() { + var cid int + var name, colType string + var notNull int + var dfltValue sql.NullString + var pk int + if err := rows.Scan(&cid, &name, &colType, ¬Null, &dfltValue, &pk); err != nil { + return false, fmt.Errorf("bridgeColumnExists: scan table_info(%s): %w", tableName, err) + } + if name == columnName { + return true, nil + } + } + return false, rows.Err() +} + +func importBlocks(tx *sql.Tx, hasHash bool) (int64, error) { + hashExpr := "''" + if hasHash { + hashExpr = "COALESCE(hash, '')" + } + result, err := tx.Exec(fmt.Sprintf( + `INSERT OR IGNORE INTO main.block (num, hash) SELECT num, %s FROM bridge.block`, + hashExpr, + )) + if err != nil { + return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import blocks: %w", err) + } + n, _ := result.RowsAffected() + return n, nil +} + +func importClaims(tx *sql.Tx, hasTxHash, hasBlockTimestamp, hasType bool) (int64, error) { + txHashExpr := "''" + if hasTxHash { + txHashExpr = "COALESCE(tx_hash, '')" + } + blockTimestampExpr := "0" + if hasBlockTimestamp { + blockTimestampExpr = "COALESCE(block_timestamp, 0)" + } + typeExpr := "''" + if hasType { + typeExpr = "COALESCE(type, '')" + } + result, err := tx.Exec(fmt.Sprintf(` + INSERT OR IGNORE INTO main.claim ( + block_num, block_pos, tx_hash, global_index, + origin_network, origin_address, destination_address, amount, + proof_local_exit_root, proof_rollup_exit_root, + mainnet_exit_root, rollup_exit_root, global_exit_root, + destination_network, metadata, is_message, block_timestamp, type + ) + SELECT + block_num, block_pos, %s, global_index, + origin_network, origin_address, destination_address, amount, + proof_local_exit_root, proof_rollup_exit_root, + mainnet_exit_root, rollup_exit_root, global_exit_root, + destination_network, metadata, is_message, %s, %s + FROM bridge.claim`, txHashExpr, blockTimestampExpr, typeExpr)) + if err != nil { + return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import claims: %w", err) + } + n, _ := result.RowsAffected() + return n, nil +} + +func importUnsetClaims(tx *sql.Tx) (int64, error) { + result, err := tx.Exec(` + INSERT OR IGNORE INTO main.unset_claim + (block_num, block_pos, tx_hash, global_index, unset_global_index_hash_chain, created_at) + SELECT block_num, block_pos, tx_hash, global_index, unset_global_index_hash_chain, created_at + FROM bridge.unset_claim`) + if err != nil { + return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import unset_claims: %w", err) + } + n, _ := result.RowsAffected() + return n, nil +} + +func importSetClaims(tx *sql.Tx) (int64, error) { + result, err := tx.Exec(` + INSERT OR IGNORE INTO main.set_claim + (block_num, block_pos, tx_hash, global_index, created_at) + SELECT block_num, block_pos, tx_hash, global_index, created_at + FROM bridge.set_claim`) + if err != nil { + return 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to import set_claims: %w", err) + } + n, _ := result.RowsAffected() + return n, nil +} + +// ImportKeyValueFromBridgesyncer copies the single key_value row from the bridgesync +// SQLite database (bridgeDBFilename) into the claimsync SQLite database (claimDBFilename), +// replacing the original owner value with the provided owner parameter. +// +// The function is a no-op when the key_value table does not exist in the bridge DB or +// contains no rows. In that case the claimDB is not created at all. +// The import is idempotent: an existing row with the same (owner, key) is silently skipped +// (INSERT OR IGNORE). +func ImportKeyValueFromBridgesyncer(bridgeDBFilename string, claimDBFilename string, owner string) error { + logger := log.WithFields("module", "ImportKeyValueFromBridgesyncer") + ctx := context.Background() + + // Phase 1 – read the single key_value row from the bridge DB without touching the claim DB. + row, err := readBridgeKeyValueRow(ctx, bridgeDBFilename) + if err != nil { + return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to read bridge key_value: %w", err) + } + if row == nil { + logger.Infof("no key_value data found in bridge DB – skipping import") + return nil + } + + // Phase 2 – open / create the claim DB, run migrations and insert the row. + claimDB, err := db.NewSQLiteDB(claimDBFilename) + if err != nil { + return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to open claim DB: %w", err) + } + defer claimDB.Close() + + if err := migrations.RunMigrations(logger, claimDB); err != nil { + return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to run claim DB migrations: %w", err) + } + + _, err = claimDB.ExecContext(ctx, ` + INSERT OR IGNORE INTO key_value (owner, key, value, updated_at) + VALUES ($1, $2, $3, $4)`, + owner, row.key, row.value, row.updatedAt) + if err != nil { + return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to insert key_value row: %w", err) + } + + logger.Infof("key_value import from bridgesyncer complete (owner=%s key=%s)", owner, row.key) + return nil +} + +// keyValueRow holds the fields of a key_value table row. +type keyValueRow struct { + key string + value string + updatedAt int64 +} + +// readBridgeKeyValueRow opens bridgeDBFilename and returns the single key_value row, or +// nil if the table does not exist or is empty. +func readBridgeKeyValueRow(ctx context.Context, bridgeDBFilename string) (*keyValueRow, error) { + bdb, err := db.NewSQLiteDB(bridgeDBFilename) + if err != nil { + return nil, fmt.Errorf("readBridgeKeyValueRow: failed to open bridge DB: %w", err) + } + defer bdb.Close() + + // Check that the key_value table exists. + var tableCount int + err = bdb.QueryRowContext(ctx, + `SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='key_value'`). + Scan(&tableCount) + if err != nil { + return nil, fmt.Errorf("readBridgeKeyValueRow: failed to check key_value table: %w", err) + } + if tableCount == 0 { + return nil, nil + } + + row := &keyValueRow{} + err = bdb.QueryRowContext(ctx, `SELECT key, value, updated_at FROM key_value LIMIT 1`). + Scan(&row.key, &row.value, &row.updatedAt) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + return nil, nil + } + return nil, fmt.Errorf("readBridgeKeyValueRow: failed to read row: %w", err) + } + return row, nil +} + diff --git a/claimsync/storage/import_data_from_bridgesyncer_test.go b/claimsync/storage/import_data_from_bridgesyncer_test.go new file mode 100644 index 000000000..325046c47 --- /dev/null +++ b/claimsync/storage/import_data_from_bridgesyncer_test.go @@ -0,0 +1,395 @@ +package storage + +import ( + "context" + "database/sql" + "math/big" + "os" + "path/filepath" + "testing" + + "github.com/agglayer/aggkit/db" + "github.com/ethereum/go-ethereum/common" + "github.com/stretchr/testify/require" +) + +// bridgeDB helpers --------------------------------------------------------------- + +// newBridgeDB creates a minimal bridgesync-like SQLite database at path and returns +// the open *sql.DB. The schema matches bridgesync after all migrations have run +// (block.hash, claim.type, set_claim, unset_claim all present). +func newBridgeDB(t *testing.T, path string) *sql.DB { + t.Helper() + bdb, err := db.NewSQLiteDB(path) + require.NoError(t, err) + t.Cleanup(func() { bdb.Close() }) + + _, err = bdb.Exec(` + CREATE TABLE gorp_migrations ( + id VARCHAR(255) NOT NULL PRIMARY KEY, + applied_at DATETIME + ); + INSERT INTO gorp_migrations (id, applied_at) VALUES ('` + requiredBridgeMigration + `', strftime('%s','now')); + CREATE TABLE block ( + num BIGINT PRIMARY KEY, + hash VARCHAR + ); + CREATE TABLE claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR, + global_index TEXT NOT NULL, + origin_network INTEGER NOT NULL, + origin_address VARCHAR NOT NULL, + destination_address VARCHAR NOT NULL, + amount TEXT NOT NULL, + proof_local_exit_root VARCHAR, + proof_rollup_exit_root VARCHAR, + mainnet_exit_root VARCHAR, + rollup_exit_root VARCHAR, + global_exit_root VARCHAR, + destination_network INTEGER NOT NULL, + metadata BLOB, + is_message BOOLEAN, + block_timestamp INTEGER, + type TEXT NOT NULL DEFAULT '', + PRIMARY KEY (block_num, block_pos) + ); + CREATE TABLE unset_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + unset_global_index_hash_chain VARCHAR NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) + ); + CREATE TABLE set_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) + ); + `) + require.NoError(t, err) + return bdb +} + +// insertBridgeBlock inserts a row into bridge.block. +func insertBridgeBlock(t *testing.T, bdb *sql.DB, num uint64, hash string) { + t.Helper() + _, err := bdb.Exec(`INSERT INTO block (num, hash) VALUES (?, ?)`, num, hash) + require.NoError(t, err) +} + +// insertBridgeClaim inserts a minimal row into bridge.claim. +func insertBridgeClaim(t *testing.T, bdb *sql.DB, blockNum uint64, blockPos uint64, globalIndex string) { + t.Helper() + _, err := bdb.Exec(` + INSERT INTO claim + (block_num, block_pos, global_index, origin_network, origin_address, + destination_address, amount, destination_network) + VALUES (?, ?, ?, 1, '0x1111', '0x2222', '100', 2)`, + blockNum, blockPos, globalIndex) + require.NoError(t, err) +} + +// insertBridgeSetClaim inserts a row into bridge.set_claim. +func insertBridgeSetClaim(t *testing.T, bdb *sql.DB, blockNum uint64, blockPos uint64, globalIndex string) { + t.Helper() + _, err := bdb.Exec(` + INSERT INTO set_claim (block_num, block_pos, tx_hash, global_index) + VALUES (?, ?, '0xabcd', ?)`, blockNum, blockPos, globalIndex) + require.NoError(t, err) +} + +// insertBridgeUnsetClaim inserts a row into bridge.unset_claim. +func insertBridgeUnsetClaim(t *testing.T, bdb *sql.DB, blockNum uint64, blockPos uint64, globalIndex string) { + t.Helper() + _, err := bdb.Exec(` + INSERT INTO unset_claim + (block_num, block_pos, tx_hash, global_index, unset_global_index_hash_chain) + VALUES (?, ?, '0xdead', ?, '0xbeef')`, blockNum, blockPos, globalIndex) + require.NoError(t, err) +} + +// count helpers + +func countRows(t *testing.T, claimPath, table string) int { + t.Helper() + d, err := db.NewSQLiteDB(claimPath) + require.NoError(t, err) + defer d.Close() + var n int + require.NoError(t, d.QueryRow(`SELECT COUNT(*) FROM `+table).Scan(&n)) + return n +} + +// --------------------------------------------------------------------------------- +// Tests +// --------------------------------------------------------------------------------- + +func TestImportDataFromBridgesyncer_NoTables(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + // Bridge DB exists but has NO required tables + emptyDB, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + emptyDB.Close() + + err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, err) +} + +func TestImportDataFromBridgesyncer_EmptyTables(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + // Bridge DB has all required tables but no rows – claimDB must not be created. + bdb := newBridgeDB(t, bridgePath) + bdb.Close() + + err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, err) + + // The claim DB must not have been created. + _, statErr := os.Stat(claimPath) + require.True(t, os.IsNotExist(statErr), "claim DB should not be created when bridge has no claim data") +} + +func TestImportDataFromBridgesyncer_Success(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb := newBridgeDB(t, bridgePath) + insertBridgeBlock(t, bdb, 10, common.HexToHash("0xaabb").Hex()) + insertBridgeClaim(t, bdb, 10, 0, big.NewInt(1).String()) + insertBridgeClaim(t, bdb, 10, 1, big.NewInt(2).String()) + insertBridgeSetClaim(t, bdb, 10, 2, big.NewInt(3).String()) + insertBridgeUnsetClaim(t, bdb, 10, 3, big.NewInt(4).String()) + bdb.Close() + + err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, err) + + require.Equal(t, 1, countRows(t, claimPath, "block")) + require.Equal(t, 2, countRows(t, claimPath, "claim")) + require.Equal(t, 1, countRows(t, claimPath, "set_claim")) + require.Equal(t, 1, countRows(t, claimPath, "unset_claim")) +} + +func TestImportDataFromBridgesyncer_Idempotent(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb := newBridgeDB(t, bridgePath) + insertBridgeBlock(t, bdb, 5, common.HexToHash("0x1234").Hex()) + insertBridgeClaim(t, bdb, 5, 0, big.NewInt(99).String()) + bdb.Close() + + require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) + // Second call must succeed and not duplicate rows + require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) + + require.Equal(t, 1, countRows(t, claimPath, "block")) + require.Equal(t, 1, countRows(t, claimPath, "claim")) +} + +// ── ImportKeyValueFromBridgesyncer ──────────────────────────────────────────── + +func TestImportKeyValueFromBridgesyncer_NoTable(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + // Bridge DB exists but has no key_value table. + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + require.NoError(t, bdb.Ping()) + bdb.Close() + + require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "my-owner")) + + // Claim DB must not have been created. + _, statErr := os.Stat(claimPath) + require.True(t, os.IsNotExist(statErr)) +} + +func TestImportKeyValueFromBridgesyncer_EmptyTable(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + _, err = bdb.Exec(`CREATE TABLE key_value ( + owner VARCHAR NOT NULL, key VARCHAR NOT NULL, + value VARCHAR, updated_at INTEGER NOT NULL, + PRIMARY KEY (key, owner))`) + require.NoError(t, err) + bdb.Close() + + require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "my-owner")) + + _, statErr := os.Stat(claimPath) + require.True(t, os.IsNotExist(statErr)) +} + +func TestImportKeyValueFromBridgesyncer_Success(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + _, err = bdb.Exec(`CREATE TABLE key_value ( + owner VARCHAR NOT NULL, key VARCHAR NOT NULL, + value VARCHAR, updated_at INTEGER NOT NULL, + PRIMARY KEY (key, owner))`) + require.NoError(t, err) + _, err = bdb.Exec(`INSERT INTO key_value (owner, key, value, updated_at) VALUES ('old-owner', 'compat', 'data', 1000)`) + require.NoError(t, err) + bdb.Close() + + require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "new-owner")) + + cdb, err := db.NewSQLiteDB(claimPath) + require.NoError(t, err) + defer cdb.Close() + + var owner, key, value string + var updatedAt int64 + err = cdb.QueryRow(`SELECT owner, key, value, updated_at FROM key_value LIMIT 1`). + Scan(&owner, &key, &value, &updatedAt) + require.NoError(t, err) + require.Equal(t, "new-owner", owner) + require.Equal(t, "compat", key) + require.Equal(t, "data", value) + require.Equal(t, int64(1000), updatedAt) +} + +func TestImportKeyValueFromBridgesyncer_Idempotent(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + _, err = bdb.Exec(`CREATE TABLE key_value ( + owner VARCHAR NOT NULL, key VARCHAR NOT NULL, + value VARCHAR, updated_at INTEGER NOT NULL, + PRIMARY KEY (key, owner))`) + require.NoError(t, err) + _, err = bdb.Exec(`INSERT INTO key_value (owner, key, value, updated_at) VALUES ('old-owner', 'compat', 'data', 1000)`) + require.NoError(t, err) + bdb.Close() + + require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "new-owner")) + // Second call must not fail and must not duplicate the row. + require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "new-owner")) + + require.Equal(t, 1, countRows(t, claimPath, "key_value")) +} + +func TestImportDataFromBridgesyncer_MissingRequiredMigration(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + // Bridge DB has all tables and data but the required migration is absent. + bdb := newBridgeDB(t, bridgePath) + _, err := bdb.Exec(`DELETE FROM gorp_migrations WHERE id = ?`, requiredBridgeMigration) + require.NoError(t, err) + insertBridgeBlock(t, bdb, 1, common.HexToHash("0x01").Hex()) + insertBridgeClaim(t, bdb, 1, 0, big.NewInt(1).String()) + bdb.Close() + + err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.ErrorContains(t, err, requiredBridgeMigration) +} + +// ── OldSchemaNoHash ─────────────────────────────────────────────────────────── + +func TestImportDataFromBridgesyncer_OldSchemaNoHash(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge_old.db") + claimPath := filepath.Join(dir, "claim.db") + + // Bridge DB without block.hash (pre-migration 0003) + bdb, err := db.NewSQLiteDB(bridgePath) + require.NoError(t, err) + // Simulates bridgesync schema after migration 0001 only: + // block has no hash, claim has no tx_hash / block_timestamp / type. + // gorp_migrations must still contain the required migration entry. + _, err = bdb.Exec(` + CREATE TABLE gorp_migrations ( + id VARCHAR(255) NOT NULL PRIMARY KEY, + applied_at DATETIME + ); + INSERT INTO gorp_migrations (id, applied_at) VALUES ('` + requiredBridgeMigration + `', strftime('%s','now')); + CREATE TABLE block (num BIGINT PRIMARY KEY); + CREATE TABLE claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + global_index TEXT NOT NULL, + origin_network INTEGER NOT NULL, + origin_address VARCHAR NOT NULL, + destination_address VARCHAR NOT NULL, + amount TEXT NOT NULL, + proof_local_exit_root VARCHAR, + proof_rollup_exit_root VARCHAR, + mainnet_exit_root VARCHAR, + rollup_exit_root VARCHAR, + global_exit_root VARCHAR, + destination_network INTEGER NOT NULL, + metadata BLOB, + is_message BOOLEAN, + PRIMARY KEY (block_num, block_pos) + ); + CREATE TABLE set_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) + ); + CREATE TABLE unset_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + unset_global_index_hash_chain VARCHAR NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) + ); + `) + require.NoError(t, err) + _, err = bdb.Exec(`INSERT INTO block (num) VALUES (1)`) + require.NoError(t, err) + _, err = bdb.Exec(` + INSERT INTO claim + (block_num, block_pos, global_index, origin_network, origin_address, + destination_address, amount, destination_network) + VALUES (1, 0, '42', 1, '0xaaaa', '0xbbbb', '50', 2)`) + require.NoError(t, err) + bdb.Close() + + err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, err) + + // block.hash should default to '' + cdb, err := db.NewSQLiteDB(claimPath) + require.NoError(t, err) + defer cdb.Close() + var hash string + require.NoError(t, cdb.QueryRowContext(context.Background(), `SELECT hash FROM block WHERE num = 1`).Scan(&hash)) + require.Equal(t, "", hash) + + require.Equal(t, 1, countRows(t, claimPath, "claim")) +} diff --git a/cmd/run.go b/cmd/run.go index cf5fa1e64..d208bdff7 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -31,6 +31,7 @@ import ( "github.com/agglayer/aggkit/bridgeservice" "github.com/agglayer/aggkit/bridgesync" "github.com/agglayer/aggkit/claimsync" + claimsyncstorage "github.com/agglayer/aggkit/claimsync/storage" claimsynctypes "github.com/agglayer/aggkit/claimsync/types" aggkitcommon "github.com/agglayer/aggkit/common" "github.com/agglayer/aggkit/config" @@ -133,6 +134,10 @@ func start(cliCtx *cli.Context) error { rpcServices = append(rpcServices, l1InfoTreeSync.GetRPCServices()...) } + if isNeeded([]string{aggkitcommon.BRIDGE, aggkitcommon.L1BRIDGESYNC}, components) { + runImportFromBridgeSyncerIfNeeded(ctx, cfg.BridgeL1Sync.DBPath, cfg.ClaimL1Sync.DBPath, claimsynctypes.L1ClaimSyncer) + } + l1ClaimSync := runClaimSyncL1IfNeeded(ctx, components, cfg.ClaimL1Sync, reorgDetectorL1, l1Client, MainnetID) if l1ClaimSync != nil { rpcServices = append(rpcServices, l1ClaimSync.GetRPCServices()...) @@ -145,6 +150,12 @@ func start(cliCtx *cli.Context) error { return fmt.Errorf("failed to get initial local exit root: %w", err) } + if isNeeded([]string{ + aggkitcommon.AGGSENDER, aggkitcommon.AGGSENDERVALIDATOR, aggkitcommon.AGGCHAINPROOFGEN, + aggkitcommon.BRIDGE, aggkitcommon.L2CLAIMSYNC, aggkitcommon.L2BRIDGESYNC}, components) { + runImportFromBridgeSyncerIfNeeded(ctx, cfg.BridgeL2Sync.DBPath, cfg.ClaimL2Sync.DBPath, claimsynctypes.L2ClaimSyncer) + } + l2ClaimSync := runClaimSyncL2IfNeeded( ctx, components, cfg.ClaimL2Sync, reorgDetectorL2, l2Client, rollupDataQuerier.RollupID) if l2ClaimSync != nil { @@ -971,6 +982,27 @@ func runClaimSyncL2IfNeeded( return res } +// runImportFromBridgeSyncerIfNeeded migrates claim data from an existing bridgesync +// database into the claimsync database before the syncer starts. It is a no-op when +// bridgeDBPath is empty or the bridge DB contains no claim data. +func runImportFromBridgeSyncerIfNeeded( + ctx context.Context, + bridgeDBPath string, + claimDBPath string, + syncerID claimsynctypes.ClaimSyncerID, +) { + if bridgeDBPath == "" { + return + } + logger := log.WithFields("module", "ImportFromBridgeSyncer", "syncerID", syncerID.String()) + if err := claimsyncstorage.ImportDataFromBridgesyncer(ctx, logger, bridgeDBPath, claimDBPath); err != nil { + log.Fatalf("failed to import claim data from bridge DB: %v", err) + } + if err := claimsyncstorage.ImportKeyValueFromBridgesyncer(bridgeDBPath, claimDBPath, syncerID.String()); err != nil { + log.Fatalf("failed to import key_value from bridge DB: %v", err) + } +} + func runAggsenderMultisigCommitteeIfNeeded( components []string, rollupAddr common.Address, From ddbaf450900fb136f5446133b550400c65be6973 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Mon, 23 Mar 2026 17:52:43 +0100 Subject: [PATCH 02/12] fix: linter --- claimsync/storage/import_data_from_bridgesyncer.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go index 6b1aa308b..6c31e499d 100644 --- a/claimsync/storage/import_data_from_bridgesyncer.go +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -33,11 +33,14 @@ const requiredBridgeMigration = "bridgesync0012" // skipped (INSERT OR IGNORE). // // Column-level differences between schema versions are handled automatically: -// - block.hash – present since bridgesync migration 0003; defaults to ''. -// - claim.tx_hash – present since bridgesync migration 0002; defaults to ''. +// - block.hash – present since bridgesync migration 0003; defaults to ”. +// - claim.tx_hash – present since bridgesync migration 0002; defaults to ”. // - claim.block_timestamp – present since bridgesync migration 0002; defaults to 0. -// - claim.type – present since bridgesync migration 0012; defaults to ''. -func ImportDataFromBridgesyncer(ctx context.Context, logger aggkitcommon.Logger, bridgeDBFilename string, claimDBFilename string) error { +// - claim.type – present since bridgesync migration 0012; defaults to ”. +func ImportDataFromBridgesyncer(ctx context.Context, + logger aggkitcommon.Logger, + bridgeDBFilename string, + claimDBFilename string) error { if logger == nil { logger = log.WithFields("module", "ImportDataFromBridgesyncer") } @@ -195,7 +198,7 @@ func checkBridgeTablesOnConn(ctx context.Context, conn *sql.Conn) (bool, error) placeholders[i] = fmt.Sprintf("$%d", i+1) args[i] = name } - query := fmt.Sprintf( + query := fmt.Sprintf( //nolint:gosec // placeholders contain only "$N" positional markers, no user input `SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name IN (%s)`, strings.Join(placeholders, ","), ) @@ -392,4 +395,3 @@ func readBridgeKeyValueRow(ctx context.Context, bridgeDBFilename string) (*keyVa } return row, nil } - From bde3b4acab018477e3eb45fbc4d92c86c2fc804e Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 24 Mar 2026 10:16:07 +0100 Subject: [PATCH 03/12] feat: add more cases --- .../storage/import_data_from_bridgesyncer.go | 84 +++++++++++-------- .../import_data_from_bridgesyncer_test.go | 79 +++++++++++++++-- cmd/run.go | 6 +- 3 files changed, 126 insertions(+), 43 deletions(-) diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go index 6c31e499d..aaeaca03a 100644 --- a/claimsync/storage/import_data_from_bridgesyncer.go +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -5,6 +5,7 @@ import ( "database/sql" "errors" "fmt" + "os" "strings" "github.com/agglayer/aggkit/claimsync/storage/migrations" @@ -19,115 +20,128 @@ var requiredBridgeTables = []string{"block", "claim", "set_claim", "unset_claim" // requiredBridgeMigration is the ID of the last bridgesync migration that modifies the // schema of any of the tables listed in requiredBridgeTables. // The bridge DB must have applied at least this migration before we can safely import. -// - bridgesync0012 – ALTER TABLE claim ADD COLUMN type +// - bridgesync0012 - ALTER TABLE claim ADD COLUMN type const requiredBridgeMigration = "bridgesync0012" // ImportDataFromBridgesyncer copies block, claim, set_claim and unset_claim data from a // bridgesync SQLite database (bridgeDBFilename) into the claimsync SQLite database // (claimDBFilename), creating and migrating it if it does not yet exist. // -// The function is a no-op when the required source tables are absent in the bridge DB or -// when none of claim/set_claim/unset_claim contain any rows. In that case the claimDB is -// not created at all. -// The import is idempotent: rows that already exist in the destination are silently -// skipped (INSERT OR IGNORE). +// Return values: +// - (false, nil) - nothing to migrate: bridge DB not found, claimDB already exists, +// or bridge DB has no claim data. The claimDB is not created. +// - (true, nil) - migration completed successfully. +// - (true, error) - migration was needed but failed (e.g. missing required bridge +// migration, DB I/O error). The claimDB may be left in a partial state. // -// Column-level differences between schema versions are handled automatically: -// - block.hash – present since bridgesync migration 0003; defaults to ”. -// - claim.tx_hash – present since bridgesync migration 0002; defaults to ”. -// - claim.block_timestamp – present since bridgesync migration 0002; defaults to 0. -// - claim.type – present since bridgesync migration 0012; defaults to ”. +// Column-level differences between bridge schema versions are handled automatically: +// - block.hash - present since bridgesync migration 0003; defaults to ”. +// - claim.tx_hash - present since bridgesync migration 0002; defaults to ”. +// - claim.block_timestamp - present since bridgesync migration 0002; defaults to 0. +// - claim.type - present since bridgesync migration 0012; defaults to ”. func ImportDataFromBridgesyncer(ctx context.Context, logger aggkitcommon.Logger, bridgeDBFilename string, - claimDBFilename string) error { + claimDBFilename string) (bool, error) { if logger == nil { logger = log.WithFields("module", "ImportDataFromBridgesyncer") } - // Phase 1 – inspect the bridge DB without touching the claim DB. + // Skip import if the bridge DB does not exist yet. + if _, err := os.Stat(bridgeDBFilename); os.IsNotExist(err) { + logger.Infof("bridge DB not found - skipping import") + return false, nil + } + + // Skip import if the claim DB already exists (import was already performed). + if _, err := os.Stat(claimDBFilename); err == nil { + logger.Infof("claim DB already exists - skipping import") + return false, nil + } + + // Phase 1 - inspect the bridge DB without touching the claim DB. hasData, err := bridgeHasClaimData(ctx, bridgeDBFilename) if err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to inspect bridge DB: %w", err) + return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to inspect bridge DB: %w", err) } if !hasData { - logger.Infof("no claim data found in bridge DB – skipping import") - return nil + logger.Infof("no claim data found in bridge DB - skipping import") + return false, nil } - // Phase 2 – open / create the claim DB and run migrations. + // Phase 2 - open / create the claim DB and run migrations. claimDB, err := db.NewSQLiteDB(claimDBFilename) if err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to open claim DB: %w", err) + return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to open claim DB: %w", err) } defer claimDB.Close() if err := migrations.RunMigrations(logger, claimDB); err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to run claim DB migrations: %w", err) + return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to run claim DB migrations: %w", err) } // Use a single connection so that ATTACH and the subsequent transaction share the // same SQLite connection (ATTACH is per-connection in SQLite). conn, err := claimDB.Conn(ctx) if err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to acquire DB connection: %w", err) + return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to acquire DB connection: %w", err) } defer conn.Close() // ATTACH the bridge DB so we can SELECT from it in the same query. attachSQL := fmt.Sprintf(`ATTACH DATABASE 'file:%s' AS bridge`, bridgeDBFilename) if _, err := conn.ExecContext(ctx, attachSQL); err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err) + return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err) } defer conn.ExecContext(ctx, `DETACH DATABASE bridge`) //nolint:errcheck hasBlockHash, err := bridgeColumnExists(ctx, conn, "block", "hash") if err != nil { - return err + return true, err } hasClaimTxHash, err := bridgeColumnExists(ctx, conn, "claim", "tx_hash") if err != nil { - return err + return true, err } hasClaimBlockTimestamp, err := bridgeColumnExists(ctx, conn, "claim", "block_timestamp") if err != nil { - return err + return true, err } hasClaimType, err := bridgeColumnExists(ctx, conn, "claim", "type") if err != nil { - return err + return true, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to begin transaction: %w", err) + return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to begin transaction: %w", err) } defer tx.Rollback() //nolint:errcheck blocksImported, err := importBlocks(tx, hasBlockHash) if err != nil { - return err + return true, err } claimsImported, err := importClaims(tx, hasClaimTxHash, hasClaimBlockTimestamp, hasClaimType) if err != nil { - return err + return true, err } unsetClaimsImported, err := importUnsetClaims(tx) if err != nil { - return err + return true, err } setClaimsImported, err := importSetClaims(tx) if err != nil { - return err + return true, err } if err := tx.Commit(); err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to commit transaction: %w", err) + return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to commit transaction: %w", err) } logger.Infof("import from bridgesyncer complete: blocks=%d claims=%d set_claims=%d unset_claims=%d", blocksImported, claimsImported, setClaimsImported, unsetClaimsImported) - return nil + return true, nil } // bridgeHasClaimData opens bridgeDBFilename directly, checks that all required tables @@ -323,17 +337,17 @@ func ImportKeyValueFromBridgesyncer(bridgeDBFilename string, claimDBFilename str logger := log.WithFields("module", "ImportKeyValueFromBridgesyncer") ctx := context.Background() - // Phase 1 – read the single key_value row from the bridge DB without touching the claim DB. + // Phase 1 - read the single key_value row from the bridge DB without touching the claim DB. row, err := readBridgeKeyValueRow(ctx, bridgeDBFilename) if err != nil { return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to read bridge key_value: %w", err) } if row == nil { - logger.Infof("no key_value data found in bridge DB – skipping import") + logger.Infof("no key_value data found in bridge DB - skipping import") return nil } - // Phase 2 – open / create the claim DB, run migrations and insert the row. + // Phase 2 - open / create the claim DB, run migrations and insert the row. claimDB, err := db.NewSQLiteDB(claimDBFilename) if err != nil { return fmt.Errorf("ImportKeyValueFromBridgesyncer: failed to open claim DB: %w", err) diff --git a/claimsync/storage/import_data_from_bridgesyncer_test.go b/claimsync/storage/import_data_from_bridgesyncer_test.go index 325046c47..f4b2aae43 100644 --- a/claimsync/storage/import_data_from_bridgesyncer_test.go +++ b/claimsync/storage/import_data_from_bridgesyncer_test.go @@ -131,6 +131,69 @@ func countRows(t *testing.T, claimPath, table string) int { // Tests // --------------------------------------------------------------------------------- +func TestImportDataFromBridgesyncer_BridgeDBNotExist(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") // file does not exist + claimPath := filepath.Join(dir, "claim.db") + + migrated, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, err) + require.False(t, migrated, "nothing to migrate when bridge DB does not exist") + + // claim DB must not be created. + _, statErr := os.Stat(claimPath) + require.True(t, os.IsNotExist(statErr), "claim DB should not be created when bridge DB does not exist") +} + +func TestImportDataFromBridgesyncer_ClaimDBAlreadyExists(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + // Bridge DB with data that would normally be migrated. + bdb := newBridgeDB(t, bridgePath) + insertBridgeBlock(t, bdb, 1, common.HexToHash("0x01").Hex()) + insertBridgeClaim(t, bdb, 1, 0, big.NewInt(1).String()) + bdb.Close() + + // Create claim DB beforehand (simulates a node restart after a previous migration). + // Use os.Create to guarantee the file exists on disk before calling the import. + f, err := os.Create(claimPath) + require.NoError(t, err) + require.NoError(t, f.Close()) + + migrated, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, err) + require.False(t, migrated, "nothing to migrate when claim DB already exists") + + // Import must have been skipped: claim DB has no tables (migrations never ran). + cdb2, err := db.NewSQLiteDB(claimPath) + require.NoError(t, err) + defer cdb2.Close() + var tableCount int + require.NoError(t, cdb2.QueryRow(`SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='claim'`).Scan(&tableCount)) + require.Equal(t, 0, tableCount, "claim table must not exist when import was skipped") +} + +func TestImportDataFromBridgesyncer_NoDataToMigrate(t *testing.T) { + dir := t.TempDir() + bridgePath := filepath.Join(dir, "bridge.db") + claimPath := filepath.Join(dir, "claim.db") + + // Bridge DB has blocks but no claim/set_claim/unset_claim rows. + bdb := newBridgeDB(t, bridgePath) + insertBridgeBlock(t, bdb, 1, common.HexToHash("0xdeadbeef").Hex()) + bdb.Close() + + migrated, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, err) + require.False(t, migrated, "nothing to migrate when bridge has no claim rows") + + // claim DB must not be created. + _, statErr := os.Stat(claimPath) + require.True(t, os.IsNotExist(statErr), "claim DB should not be created when bridge has no claim data") +} + func TestImportDataFromBridgesyncer_NoTables(t *testing.T) { dir := t.TempDir() bridgePath := filepath.Join(dir, "bridge.db") @@ -141,7 +204,7 @@ func TestImportDataFromBridgesyncer_NoTables(t *testing.T) { require.NoError(t, err) emptyDB.Close() - err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + _, err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) require.NoError(t, err) } @@ -154,7 +217,7 @@ func TestImportDataFromBridgesyncer_EmptyTables(t *testing.T) { bdb := newBridgeDB(t, bridgePath) bdb.Close() - err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + _, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) require.NoError(t, err) // The claim DB must not have been created. @@ -175,7 +238,7 @@ func TestImportDataFromBridgesyncer_Success(t *testing.T) { insertBridgeUnsetClaim(t, bdb, 10, 3, big.NewInt(4).String()) bdb.Close() - err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + _, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) require.NoError(t, err) require.Equal(t, 1, countRows(t, claimPath, "block")) @@ -194,9 +257,11 @@ func TestImportDataFromBridgesyncer_Idempotent(t *testing.T) { insertBridgeClaim(t, bdb, 5, 0, big.NewInt(99).String()) bdb.Close() - require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) + _, errImport := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, errImport) // Second call must succeed and not duplicate rows - require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) + _, errImport = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + require.NoError(t, errImport) require.Equal(t, 1, countRows(t, claimPath, "block")) require.Equal(t, 1, countRows(t, claimPath, "claim")) @@ -311,7 +376,7 @@ func TestImportDataFromBridgesyncer_MissingRequiredMigration(t *testing.T) { insertBridgeClaim(t, bdb, 1, 0, big.NewInt(1).String()) bdb.Close() - err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + _, err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) require.ErrorContains(t, err, requiredBridgeMigration) } @@ -380,7 +445,7 @@ func TestImportDataFromBridgesyncer_OldSchemaNoHash(t *testing.T) { require.NoError(t, err) bdb.Close() - err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + _, err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) require.NoError(t, err) // block.hash should default to '' diff --git a/cmd/run.go b/cmd/run.go index d208bdff7..8eeecbacb 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -995,9 +995,13 @@ func runImportFromBridgeSyncerIfNeeded( return } logger := log.WithFields("module", "ImportFromBridgeSyncer", "syncerID", syncerID.String()) - if err := claimsyncstorage.ImportDataFromBridgesyncer(ctx, logger, bridgeDBPath, claimDBPath); err != nil { + migrated, err := claimsyncstorage.ImportDataFromBridgesyncer(ctx, logger, bridgeDBPath, claimDBPath) + if err != nil { log.Fatalf("failed to import claim data from bridge DB: %v", err) } + if !migrated { + return + } if err := claimsyncstorage.ImportKeyValueFromBridgesyncer(bridgeDBPath, claimDBPath, syncerID.String()); err != nil { log.Fatalf("failed to import key_value from bridge DB: %v", err) } From 8209bd8e2b26fb82d78987e556569ae7f2772794 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 24 Mar 2026 10:52:12 +0100 Subject: [PATCH 04/12] feat: split function --- .../storage/import_data_from_bridgesyncer.go | 225 +++++++++--------- .../import_data_from_bridgesyncer_test.go | 203 ++++++++-------- cmd/run.go | 14 +- 3 files changed, 233 insertions(+), 209 deletions(-) diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go index aaeaca03a..b35e6e665 100644 --- a/claimsync/storage/import_data_from_bridgesyncer.go +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -23,183 +23,194 @@ var requiredBridgeTables = []string{"block", "claim", "set_claim", "unset_claim" // - bridgesync0012 - ALTER TABLE claim ADD COLUMN type const requiredBridgeMigration = "bridgesync0012" +// BridgeSyncerStatus holds the read-only inspection results produced by +// InspectBridgeSyncer. Each field is only meaningful when the fields it depends +// on are true (see field comments). +type BridgeSyncerStatus struct { + // BridgeDBExists reports whether the bridgesync database file exists on disk. + BridgeDBExists bool + // ClaimDBExists reports whether the claimsync database file already exists on disk. + ClaimDBExists bool + // MigrationOK reports whether requiredBridgeMigration has been applied to the + // bridge DB. Only meaningful when BridgeDBExists is true. + MigrationOK bool + // HasClaimData reports whether any of the claim, set_claim or unset_claim tables + // contain at least one row. Only meaningful when MigrationOK is true. + HasClaimData bool +} + +// ShouldMigrate reports whether a data migration from the bridgesync DB into the +// claimsync DB should be performed. It returns true only when all of the following +// conditions hold: +// - the bridge DB exists on disk, +// - the claim DB does not yet exist (migration not already done), +// - the required bridge migration has been applied, and +// - the bridge DB contains claim data worth copying. +func (s BridgeSyncerStatus) ShouldMigrate() bool { + return s.BridgeDBExists && !s.ClaimDBExists && s.MigrationOK && s.HasClaimData +} + +// InspectBridgeSyncer performs a read-only inspection of the bridge and claim +// database files and returns a BridgeSyncerStatus summary. It never writes to +// either database. +func InspectBridgeSyncer(ctx context.Context, bridgeDBFilename, claimDBFilename string) (BridgeSyncerStatus, error) { + var status BridgeSyncerStatus + + if _, err := os.Stat(bridgeDBFilename); err == nil { + status.BridgeDBExists = true + } + if _, err := os.Stat(claimDBFilename); err == nil { + status.ClaimDBExists = true + } + + if !status.BridgeDBExists { + return status, nil + } + + bdb, err := db.NewSQLiteDB(bridgeDBFilename) + if err != nil { + return status, fmt.Errorf("InspectBridgeSyncer: failed to open bridge DB: %w", err) + } + defer bdb.Close() + + conn, err := bdb.Conn(ctx) + if err != nil { + return status, fmt.Errorf("InspectBridgeSyncer: failed to acquire connection: %w", err) + } + defer conn.Close() + + // Check that the required tables exist before querying anything else. + present, err := checkBridgeTablesOnConn(ctx, conn) + if err != nil { + return status, fmt.Errorf("InspectBridgeSyncer: failed to check bridge tables: %w", err) + } + if !present { + return status, nil + } + + // Check whether the required migration has been applied. + // A missing gorp_migrations table is treated as MigrationOK = false, not an error. + var migCount int + err = conn.QueryRowContext(ctx, + `SELECT COUNT(*) FROM gorp_migrations WHERE id = $1`, requiredBridgeMigration).Scan(&migCount) + if err == nil { + status.MigrationOK = migCount > 0 + } + + if !status.MigrationOK { + return status, nil + } + + // Check whether any claim-related tables contain data. + var rowCount int + err = conn.QueryRowContext(ctx, ` + SELECT COUNT(*) FROM ( + SELECT 1 FROM (SELECT 1 FROM claim LIMIT 1) + UNION ALL + SELECT 1 FROM (SELECT 1 FROM set_claim LIMIT 1) + UNION ALL + SELECT 1 FROM (SELECT 1 FROM unset_claim LIMIT 1) + )`).Scan(&rowCount) + if err != nil { + return status, fmt.Errorf("InspectBridgeSyncer: failed to count claim rows: %w", err) + } + status.HasClaimData = rowCount > 0 + + return status, nil +} + // ImportDataFromBridgesyncer copies block, claim, set_claim and unset_claim data from a // bridgesync SQLite database (bridgeDBFilename) into the claimsync SQLite database // (claimDBFilename), creating and migrating it if it does not yet exist. // -// Return values: -// - (false, nil) - nothing to migrate: bridge DB not found, claimDB already exists, -// or bridge DB has no claim data. The claimDB is not created. -// - (true, nil) - migration completed successfully. -// - (true, error) - migration was needed but failed (e.g. missing required bridge -// migration, DB I/O error). The claimDB may be left in a partial state. +// The caller is responsible for deciding whether a migration is needed (e.g. via +// InspectBridgeSyncer and BridgeSyncerStatus.ShouldMigrate) before calling this +// function. No precondition checks are performed here. // // Column-level differences between bridge schema versions are handled automatically: -// - block.hash - present since bridgesync migration 0003; defaults to ”. -// - claim.tx_hash - present since bridgesync migration 0002; defaults to ”. -// - claim.block_timestamp - present since bridgesync migration 0002; defaults to 0. -// - claim.type - present since bridgesync migration 0012; defaults to ”. +// - block.hash - present since bridgesync migration 0003; defaults to ”. +// - claim.tx_hash - present since bridgesync migration 0002; defaults to ”. +// - claim.block_timestamp - present since bridgesync migration 0002; defaults to 0. +// - claim.type - present since bridgesync migration 0012; defaults to ”. func ImportDataFromBridgesyncer(ctx context.Context, logger aggkitcommon.Logger, bridgeDBFilename string, - claimDBFilename string) (bool, error) { + claimDBFilename string) error { if logger == nil { logger = log.WithFields("module", "ImportDataFromBridgesyncer") } - // Skip import if the bridge DB does not exist yet. - if _, err := os.Stat(bridgeDBFilename); os.IsNotExist(err) { - logger.Infof("bridge DB not found - skipping import") - return false, nil - } - - // Skip import if the claim DB already exists (import was already performed). - if _, err := os.Stat(claimDBFilename); err == nil { - logger.Infof("claim DB already exists - skipping import") - return false, nil - } - - // Phase 1 - inspect the bridge DB without touching the claim DB. - hasData, err := bridgeHasClaimData(ctx, bridgeDBFilename) - if err != nil { - return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to inspect bridge DB: %w", err) - } - if !hasData { - logger.Infof("no claim data found in bridge DB - skipping import") - return false, nil - } - - // Phase 2 - open / create the claim DB and run migrations. claimDB, err := db.NewSQLiteDB(claimDBFilename) if err != nil { - return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to open claim DB: %w", err) + return fmt.Errorf("ImportDataFromBridgesyncer: failed to open claim DB: %w", err) } defer claimDB.Close() if err := migrations.RunMigrations(logger, claimDB); err != nil { - return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to run claim DB migrations: %w", err) + return fmt.Errorf("ImportDataFromBridgesyncer: failed to run claim DB migrations: %w", err) } // Use a single connection so that ATTACH and the subsequent transaction share the // same SQLite connection (ATTACH is per-connection in SQLite). conn, err := claimDB.Conn(ctx) if err != nil { - return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to acquire DB connection: %w", err) + return fmt.Errorf("ImportDataFromBridgesyncer: failed to acquire DB connection: %w", err) } defer conn.Close() // ATTACH the bridge DB so we can SELECT from it in the same query. attachSQL := fmt.Sprintf(`ATTACH DATABASE 'file:%s' AS bridge`, bridgeDBFilename) if _, err := conn.ExecContext(ctx, attachSQL); err != nil { - return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err) + return fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err) } defer conn.ExecContext(ctx, `DETACH DATABASE bridge`) //nolint:errcheck hasBlockHash, err := bridgeColumnExists(ctx, conn, "block", "hash") if err != nil { - return true, err + return err } hasClaimTxHash, err := bridgeColumnExists(ctx, conn, "claim", "tx_hash") if err != nil { - return true, err + return err } hasClaimBlockTimestamp, err := bridgeColumnExists(ctx, conn, "claim", "block_timestamp") if err != nil { - return true, err + return err } hasClaimType, err := bridgeColumnExists(ctx, conn, "claim", "type") if err != nil { - return true, err + return err } tx, err := conn.BeginTx(ctx, nil) if err != nil { - return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to begin transaction: %w", err) + return fmt.Errorf("ImportDataFromBridgesyncer: failed to begin transaction: %w", err) } defer tx.Rollback() //nolint:errcheck blocksImported, err := importBlocks(tx, hasBlockHash) if err != nil { - return true, err + return err } claimsImported, err := importClaims(tx, hasClaimTxHash, hasClaimBlockTimestamp, hasClaimType) if err != nil { - return true, err + return err } unsetClaimsImported, err := importUnsetClaims(tx) if err != nil { - return true, err + return err } setClaimsImported, err := importSetClaims(tx) if err != nil { - return true, err + return err } if err := tx.Commit(); err != nil { - return true, fmt.Errorf("ImportDataFromBridgesyncer: failed to commit transaction: %w", err) + return fmt.Errorf("ImportDataFromBridgesyncer: failed to commit transaction: %w", err) } logger.Infof("import from bridgesyncer complete: blocks=%d claims=%d set_claims=%d unset_claims=%d", blocksImported, claimsImported, setClaimsImported, unsetClaimsImported) - return true, nil -} - -// bridgeHasClaimData opens bridgeDBFilename directly, checks that all required tables -// exist, and returns true if any of claim/set_claim/unset_claim contain at least one row. -func bridgeHasClaimData(ctx context.Context, bridgeDBFilename string) (bool, error) { - bdb, err := db.NewSQLiteDB(bridgeDBFilename) - if err != nil { - return false, fmt.Errorf("bridgeHasClaimData: failed to open bridge DB: %w", err) - } - defer bdb.Close() - - conn, err := bdb.Conn(ctx) - if err != nil { - return false, fmt.Errorf("bridgeHasClaimData: failed to acquire connection: %w", err) - } - defer conn.Close() - - // Re-use the existing helper but against the main schema of the bridge DB. - present, err := checkBridgeTablesOnConn(ctx, conn) - if err != nil { - return false, err - } - if !present { - return false, nil - } - - if err := checkBridgeMigration(ctx, conn); err != nil { - return false, err - } - - var count int - err = conn.QueryRowContext(ctx, ` - SELECT COUNT(*) FROM ( - SELECT 1 FROM (SELECT 1 FROM claim LIMIT 1) - UNION ALL - SELECT 1 FROM (SELECT 1 FROM set_claim LIMIT 1) - UNION ALL - SELECT 1 FROM (SELECT 1 FROM unset_claim LIMIT 1) - )`).Scan(&count) - if err != nil { - return false, fmt.Errorf("bridgeHasClaimData: failed to count claim rows: %w", err) - } - return count > 0, nil -} - -// checkBridgeMigration returns an error if requiredBridgeMigration has not been applied -// to the bridge DB, which means its schema may be incomplete for a safe import. -func checkBridgeMigration(ctx context.Context, conn *sql.Conn) error { - var count int - err := conn.QueryRowContext(ctx, - `SELECT COUNT(*) FROM gorp_migrations WHERE id = $1`, requiredBridgeMigration). - Scan(&count) - if err != nil { - return fmt.Errorf("checkBridgeMigration: failed to query gorp_migrations: %w", err) - } - if count == 0 { - return fmt.Errorf("checkBridgeMigration: bridge DB has not applied required migration %q", requiredBridgeMigration) - } return nil } diff --git a/claimsync/storage/import_data_from_bridgesyncer_test.go b/claimsync/storage/import_data_from_bridgesyncer_test.go index f4b2aae43..e65f79849 100644 --- a/claimsync/storage/import_data_from_bridgesyncer_test.go +++ b/claimsync/storage/import_data_from_bridgesyncer_test.go @@ -131,140 +131,164 @@ func countRows(t *testing.T, claimPath, table string) int { // Tests // --------------------------------------------------------------------------------- -func TestImportDataFromBridgesyncer_BridgeDBNotExist(t *testing.T) { +func TestImportDataFromBridgesyncer_Success(t *testing.T) { dir := t.TempDir() - bridgePath := filepath.Join(dir, "bridge.db") // file does not exist + bridgePath := filepath.Join(dir, "bridge.db") claimPath := filepath.Join(dir, "claim.db") - migrated, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) - require.NoError(t, err) - require.False(t, migrated, "nothing to migrate when bridge DB does not exist") + bdb := newBridgeDB(t, bridgePath) + insertBridgeBlock(t, bdb, 10, common.HexToHash("0xaabb").Hex()) + insertBridgeClaim(t, bdb, 10, 0, big.NewInt(1).String()) + insertBridgeClaim(t, bdb, 10, 1, big.NewInt(2).String()) + insertBridgeSetClaim(t, bdb, 10, 2, big.NewInt(3).String()) + insertBridgeUnsetClaim(t, bdb, 10, 3, big.NewInt(4).String()) + bdb.Close() - // claim DB must not be created. - _, statErr := os.Stat(claimPath) - require.True(t, os.IsNotExist(statErr), "claim DB should not be created when bridge DB does not exist") + require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) + + require.Equal(t, 1, countRows(t, claimPath, "block")) + require.Equal(t, 2, countRows(t, claimPath, "claim")) + require.Equal(t, 1, countRows(t, claimPath, "set_claim")) + require.Equal(t, 1, countRows(t, claimPath, "unset_claim")) } -func TestImportDataFromBridgesyncer_ClaimDBAlreadyExists(t *testing.T) { +func TestImportDataFromBridgesyncer_Idempotent(t *testing.T) { dir := t.TempDir() bridgePath := filepath.Join(dir, "bridge.db") claimPath := filepath.Join(dir, "claim.db") - // Bridge DB with data that would normally be migrated. bdb := newBridgeDB(t, bridgePath) - insertBridgeBlock(t, bdb, 1, common.HexToHash("0x01").Hex()) - insertBridgeClaim(t, bdb, 1, 0, big.NewInt(1).String()) + insertBridgeBlock(t, bdb, 5, common.HexToHash("0x1234").Hex()) + insertBridgeClaim(t, bdb, 5, 0, big.NewInt(99).String()) bdb.Close() - // Create claim DB beforehand (simulates a node restart after a previous migration). - // Use os.Create to guarantee the file exists on disk before calling the import. - f, err := os.Create(claimPath) - require.NoError(t, err) - require.NoError(t, f.Close()) + require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) + // Second call must succeed and not duplicate rows. + require.NoError(t, ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath)) - migrated, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) - require.NoError(t, err) - require.False(t, migrated, "nothing to migrate when claim DB already exists") - - // Import must have been skipped: claim DB has no tables (migrations never ran). - cdb2, err := db.NewSQLiteDB(claimPath) - require.NoError(t, err) - defer cdb2.Close() - var tableCount int - require.NoError(t, cdb2.QueryRow(`SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name='claim'`).Scan(&tableCount)) - require.Equal(t, 0, tableCount, "claim table must not exist when import was skipped") + require.Equal(t, 1, countRows(t, claimPath, "block")) + require.Equal(t, 1, countRows(t, claimPath, "claim")) } -func TestImportDataFromBridgesyncer_NoDataToMigrate(t *testing.T) { - dir := t.TempDir() - bridgePath := filepath.Join(dir, "bridge.db") - claimPath := filepath.Join(dir, "claim.db") +// ── BridgeSyncerStatus.ShouldMigrate ───────────────────────────────────────── + +func TestBridgeSyncerStatus_ShouldMigrate(t *testing.T) { + tests := []struct { + name string + status BridgeSyncerStatus + want bool + }{ + { + name: "all conditions met", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: true, HasClaimData: true}, + want: true, + }, + { + name: "bridge DB missing", + status: BridgeSyncerStatus{BridgeDBExists: false, ClaimDBExists: false, MigrationOK: true, HasClaimData: true}, + want: false, + }, + { + name: "claim DB already exists", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: true, MigrationOK: true, HasClaimData: true}, + want: false, + }, + { + name: "migration not applied", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: false, HasClaimData: true}, + want: false, + }, + { + name: "no claim data", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: true, HasClaimData: false}, + want: false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.Equal(t, tc.want, tc.status.ShouldMigrate()) + }) + } +} - // Bridge DB has blocks but no claim/set_claim/unset_claim rows. - bdb := newBridgeDB(t, bridgePath) - insertBridgeBlock(t, bdb, 1, common.HexToHash("0xdeadbeef").Hex()) - bdb.Close() +// ── InspectBridgeSyncer ─────────────────────────────────────────────────────── - migrated, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) +func TestInspectBridgeSyncer_BridgeDBNotExist(t *testing.T) { + dir := t.TempDir() + status, err := InspectBridgeSyncer(context.Background(), + filepath.Join(dir, "bridge.db"), filepath.Join(dir, "claim.db")) require.NoError(t, err) - require.False(t, migrated, "nothing to migrate when bridge has no claim rows") - - // claim DB must not be created. - _, statErr := os.Stat(claimPath) - require.True(t, os.IsNotExist(statErr), "claim DB should not be created when bridge has no claim data") + require.False(t, status.BridgeDBExists) + require.False(t, status.ClaimDBExists) + require.False(t, status.MigrationOK) + require.False(t, status.HasClaimData) } -func TestImportDataFromBridgesyncer_NoTables(t *testing.T) { +func TestInspectBridgeSyncer_ClaimDBExists(t *testing.T) { dir := t.TempDir() bridgePath := filepath.Join(dir, "bridge.db") claimPath := filepath.Join(dir, "claim.db") - // Bridge DB exists but has NO required tables - emptyDB, err := db.NewSQLiteDB(bridgePath) + bdb := newBridgeDB(t, bridgePath) + bdb.Close() + f, err := os.Create(claimPath) require.NoError(t, err) - emptyDB.Close() + require.NoError(t, f.Close()) - _, err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + status, err := InspectBridgeSyncer(context.Background(), bridgePath, claimPath) require.NoError(t, err) + require.True(t, status.BridgeDBExists) + require.True(t, status.ClaimDBExists) } -func TestImportDataFromBridgesyncer_EmptyTables(t *testing.T) { +func TestInspectBridgeSyncer_MigrationMissing(t *testing.T) { dir := t.TempDir() bridgePath := filepath.Join(dir, "bridge.db") - claimPath := filepath.Join(dir, "claim.db") - // Bridge DB has all required tables but no rows – claimDB must not be created. bdb := newBridgeDB(t, bridgePath) + _, err := bdb.Exec(`DELETE FROM gorp_migrations WHERE id = ?`, requiredBridgeMigration) + require.NoError(t, err) + insertBridgeBlock(t, bdb, 1, common.HexToHash("0x01").Hex()) + insertBridgeClaim(t, bdb, 1, 0, big.NewInt(1).String()) bdb.Close() - _, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + status, err := InspectBridgeSyncer(context.Background(), bridgePath, filepath.Join(dir, "claim.db")) require.NoError(t, err) - - // The claim DB must not have been created. - _, statErr := os.Stat(claimPath) - require.True(t, os.IsNotExist(statErr), "claim DB should not be created when bridge has no claim data") + require.True(t, status.BridgeDBExists) + require.False(t, status.MigrationOK) + require.False(t, status.HasClaimData) } -func TestImportDataFromBridgesyncer_Success(t *testing.T) { +func TestInspectBridgeSyncer_NoClaimData(t *testing.T) { dir := t.TempDir() bridgePath := filepath.Join(dir, "bridge.db") - claimPath := filepath.Join(dir, "claim.db") bdb := newBridgeDB(t, bridgePath) - insertBridgeBlock(t, bdb, 10, common.HexToHash("0xaabb").Hex()) - insertBridgeClaim(t, bdb, 10, 0, big.NewInt(1).String()) - insertBridgeClaim(t, bdb, 10, 1, big.NewInt(2).String()) - insertBridgeSetClaim(t, bdb, 10, 2, big.NewInt(3).String()) - insertBridgeUnsetClaim(t, bdb, 10, 3, big.NewInt(4).String()) + insertBridgeBlock(t, bdb, 1, common.HexToHash("0xdeadbeef").Hex()) bdb.Close() - _, err := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + status, err := InspectBridgeSyncer(context.Background(), bridgePath, filepath.Join(dir, "claim.db")) require.NoError(t, err) - - require.Equal(t, 1, countRows(t, claimPath, "block")) - require.Equal(t, 2, countRows(t, claimPath, "claim")) - require.Equal(t, 1, countRows(t, claimPath, "set_claim")) - require.Equal(t, 1, countRows(t, claimPath, "unset_claim")) + require.True(t, status.BridgeDBExists) + require.True(t, status.MigrationOK) + require.False(t, status.HasClaimData) } -func TestImportDataFromBridgesyncer_Idempotent(t *testing.T) { +func TestInspectBridgeSyncer_WithClaimData(t *testing.T) { dir := t.TempDir() bridgePath := filepath.Join(dir, "bridge.db") - claimPath := filepath.Join(dir, "claim.db") bdb := newBridgeDB(t, bridgePath) - insertBridgeBlock(t, bdb, 5, common.HexToHash("0x1234").Hex()) - insertBridgeClaim(t, bdb, 5, 0, big.NewInt(99).String()) + insertBridgeBlock(t, bdb, 1, common.HexToHash("0xaabb").Hex()) + insertBridgeClaim(t, bdb, 1, 0, big.NewInt(42).String()) bdb.Close() - _, errImport := ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) - require.NoError(t, errImport) - // Second call must succeed and not duplicate rows - _, errImport = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) - require.NoError(t, errImport) - - require.Equal(t, 1, countRows(t, claimPath, "block")) - require.Equal(t, 1, countRows(t, claimPath, "claim")) + status, err := InspectBridgeSyncer(context.Background(), bridgePath, filepath.Join(dir, "claim.db")) + require.NoError(t, err) + require.True(t, status.BridgeDBExists) + require.False(t, status.ClaimDBExists) + require.True(t, status.MigrationOK) + require.True(t, status.HasClaimData) } // ── ImportKeyValueFromBridgesyncer ──────────────────────────────────────────── @@ -363,23 +387,6 @@ func TestImportKeyValueFromBridgesyncer_Idempotent(t *testing.T) { require.Equal(t, 1, countRows(t, claimPath, "key_value")) } -func TestImportDataFromBridgesyncer_MissingRequiredMigration(t *testing.T) { - dir := t.TempDir() - bridgePath := filepath.Join(dir, "bridge.db") - claimPath := filepath.Join(dir, "claim.db") - - // Bridge DB has all tables and data but the required migration is absent. - bdb := newBridgeDB(t, bridgePath) - _, err := bdb.Exec(`DELETE FROM gorp_migrations WHERE id = ?`, requiredBridgeMigration) - require.NoError(t, err) - insertBridgeBlock(t, bdb, 1, common.HexToHash("0x01").Hex()) - insertBridgeClaim(t, bdb, 1, 0, big.NewInt(1).String()) - bdb.Close() - - _, err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) - require.ErrorContains(t, err, requiredBridgeMigration) -} - // ── OldSchemaNoHash ─────────────────────────────────────────────────────────── func TestImportDataFromBridgesyncer_OldSchemaNoHash(t *testing.T) { @@ -445,7 +452,7 @@ func TestImportDataFromBridgesyncer_OldSchemaNoHash(t *testing.T) { require.NoError(t, err) bdb.Close() - _, err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) + err = ImportDataFromBridgesyncer(context.Background(), nil, bridgePath, claimPath) require.NoError(t, err) // block.hash should default to '' diff --git a/cmd/run.go b/cmd/run.go index 8eeecbacb..82bcd338d 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -995,16 +995,22 @@ func runImportFromBridgeSyncerIfNeeded( return } logger := log.WithFields("module", "ImportFromBridgeSyncer", "syncerID", syncerID.String()) - migrated, err := claimsyncstorage.ImportDataFromBridgesyncer(ctx, logger, bridgeDBPath, claimDBPath) + status, err := claimsyncstorage.InspectBridgeSyncer(ctx, bridgeDBPath, claimDBPath) if err != nil { - log.Fatalf("failed to import claim data from bridge DB: %v", err) + logger.Fatalf("failed to inspect bridge DB: %v", err) } - if !migrated { + if !status.ShouldMigrate() { + logger.Infof("no migration needed") return } + logger.Infof("migration from bridgesyncer to claimsyncer needed, starting migration process") + if err := claimsyncstorage.ImportDataFromBridgesyncer(ctx, logger, bridgeDBPath, claimDBPath); err != nil { + logger.Fatalf("failed to import claim data from bridge DB: %v", err) + } if err := claimsyncstorage.ImportKeyValueFromBridgesyncer(bridgeDBPath, claimDBPath, syncerID.String()); err != nil { - log.Fatalf("failed to import key_value from bridge DB: %v", err) + logger.Fatalf("failed to import key_value from bridge DB: %v", err) } + logger.Infof("migration from bridgesyncer to claimsyncer completed successfully") } func runAggsenderMultisigCommitteeIfNeeded( From a0f601c4076f72118826f1643d23614c2067d7a2 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 24 Mar 2026 11:56:47 +0100 Subject: [PATCH 05/12] feat: add more logs --- claimsync/storage/import_data_from_bridgesyncer.go | 14 +++++++++++--- cmd/run.go | 5 ++--- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go index b35e6e665..c78cce7aa 100644 --- a/claimsync/storage/import_data_from_bridgesyncer.go +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -39,6 +39,14 @@ type BridgeSyncerStatus struct { HasClaimData bool } +// String returns a human-readable summary of the status. +func (s BridgeSyncerStatus) String() string { + return fmt.Sprintf( + "BridgeDBExists=%t ClaimDBExists=%t MigrationOK=%t HasClaimData=%t ShouldMigrate=%t", + s.BridgeDBExists, s.ClaimDBExists, s.MigrationOK, s.HasClaimData, s.ShouldMigrate(), + ) +} + // ShouldMigrate reports whether a data migration from the bridgesync DB into the // claimsync DB should be performed. It returns true only when all of the following // conditions hold: @@ -128,10 +136,10 @@ func InspectBridgeSyncer(ctx context.Context, bridgeDBFilename, claimDBFilename // function. No precondition checks are performed here. // // Column-level differences between bridge schema versions are handled automatically: -// - block.hash - present since bridgesync migration 0003; defaults to ”. -// - claim.tx_hash - present since bridgesync migration 0002; defaults to ”. +// - block.hash - present since bridgesync migration 0003; defaults to ". +// - claim.tx_hash - present since bridgesync migration 0002; defaults to ". // - claim.block_timestamp - present since bridgesync migration 0002; defaults to 0. -// - claim.type - present since bridgesync migration 0012; defaults to ”. +// - claim.type - present since bridgesync migration 0012; defaults to ". func ImportDataFromBridgesyncer(ctx context.Context, logger aggkitcommon.Logger, bridgeDBFilename string, diff --git a/cmd/run.go b/cmd/run.go index 82bcd338d..30fbc3e8d 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -91,7 +91,6 @@ func start(cliCtx *cli.Context) error { prometheus.Init() } log.Debugf("Components to run: %v", components) - l1Client := runL1ClientIfNeeded(cliCtx.Context, cfg.L1NetworkConfig.RPC) l2Client := runL2ClientIfNeeded(cliCtx.Context, components, cfg.Common.L2RPC) reorgDetectorL1, errChanL1 := runReorgDetectorL1IfNeeded(cliCtx.Context, components, l1Client, &cfg.ReorgDetectorL1) @@ -1000,10 +999,10 @@ func runImportFromBridgeSyncerIfNeeded( logger.Fatalf("failed to inspect bridge DB: %v", err) } if !status.ShouldMigrate() { - logger.Infof("no migration needed") + logger.Infof("no migration needed. %s", status.String()) return } - logger.Infof("migration from bridgesyncer to claimsyncer needed, starting migration process") + logger.Infof("migration from bridgesyncer to claimsyncer needed, starting migration process. %s", status.String()) if err := claimsyncstorage.ImportDataFromBridgesyncer(ctx, logger, bridgeDBPath, claimDBPath); err != nil { logger.Fatalf("failed to import claim data from bridge DB: %v", err) } From 36c472588f97501dd7f6debc9d849a49010a1ea0 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 24 Mar 2026 16:43:31 +0100 Subject: [PATCH 06/12] feat: case that data is too old to be migrated, we abort and suggest update aggkit --- .../storage/import_data_from_bridgesyncer.go | 24 +++++++-- .../import_data_from_bridgesyncer_test.go | 51 ++++++++++++++++++- cmd/run.go | 3 ++ 3 files changed, 72 insertions(+), 6 deletions(-) diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go index c78cce7aa..3f8ef9ea7 100644 --- a/claimsync/storage/import_data_from_bridgesyncer.go +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -35,7 +35,8 @@ type BridgeSyncerStatus struct { // bridge DB. Only meaningful when BridgeDBExists is true. MigrationOK bool // HasClaimData reports whether any of the claim, set_claim or unset_claim tables - // contain at least one row. Only meaningful when MigrationOK is true. + // contain at least one row. Only meaningful when BridgeDBExists is true and the + // required tables are present. HasClaimData bool } @@ -47,6 +48,21 @@ func (s BridgeSyncerStatus) String() string { ) } +// Validate returns an error when the status indicates a blocking condition that +// requires user intervention. Specifically, it errors when the bridge DB contains +// claim data but the required bridge migration has not been applied, meaning the +// node must first be upgraded to an intermediate version that runs that migration. +func (s BridgeSyncerStatus) Validate() error { + if s.BridgeDBExists && !s.ClaimDBExists && s.HasClaimData && !s.MigrationOK { + return fmt.Errorf( + "bridge DB contains claim data but required migration %q has not been applied; "+ + "upgrade to the intermediate version first", + requiredBridgeMigration, + ) + } + return nil +} + // ShouldMigrate reports whether a data migration from the bridgesync DB into the // claimsync DB should be performed. It returns true only when all of the following // conditions hold: @@ -105,10 +121,8 @@ func InspectBridgeSyncer(ctx context.Context, bridgeDBFilename, claimDBFilename status.MigrationOK = migCount > 0 } - if !status.MigrationOK { - return status, nil - } - + // Always check for claim data, even when the migration is missing, so that + // Validate() can distinguish a harmless empty-DB case from a blocking one. // Check whether any claim-related tables contain data. var rowCount int err = conn.QueryRowContext(ctx, ` diff --git a/claimsync/storage/import_data_from_bridgesyncer_test.go b/claimsync/storage/import_data_from_bridgesyncer_test.go index e65f79849..fc58200ab 100644 --- a/claimsync/storage/import_data_from_bridgesyncer_test.go +++ b/claimsync/storage/import_data_from_bridgesyncer_test.go @@ -211,6 +211,52 @@ func TestBridgeSyncerStatus_ShouldMigrate(t *testing.T) { } } +// ── BridgeSyncerStatus.Validate ─────────────────────────────────────────────── + +func TestBridgeSyncerStatus_Validate(t *testing.T) { + tests := []struct { + name string + status BridgeSyncerStatus + wantErr bool + }{ + { + name: "blocking: bridge has data but migration missing", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: false, HasClaimData: true}, + wantErr: true, + }, + { + name: "ok: bridge has data and migration applied", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: true, HasClaimData: true}, + wantErr: false, + }, + { + name: "ok: bridge DB missing", + status: BridgeSyncerStatus{BridgeDBExists: false, ClaimDBExists: false, MigrationOK: false, HasClaimData: false}, + wantErr: false, + }, + { + name: "ok: claim DB already exists (no migration needed)", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: true, MigrationOK: false, HasClaimData: true}, + wantErr: false, + }, + { + name: "ok: migration missing but no claim data", + status: BridgeSyncerStatus{BridgeDBExists: true, ClaimDBExists: false, MigrationOK: false, HasClaimData: false}, + wantErr: false, + }, + } + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + err := tc.status.Validate() + if tc.wantErr { + require.ErrorContains(t, err, requiredBridgeMigration) + } else { + require.NoError(t, err) + } + }) + } +} + // ── InspectBridgeSyncer ─────────────────────────────────────────────────────── func TestInspectBridgeSyncer_BridgeDBNotExist(t *testing.T) { @@ -256,7 +302,10 @@ func TestInspectBridgeSyncer_MigrationMissing(t *testing.T) { require.NoError(t, err) require.True(t, status.BridgeDBExists) require.False(t, status.MigrationOK) - require.False(t, status.HasClaimData) + // HasClaimData is populated even when the migration is missing so that + // Validate() can distinguish the blocking case from a harmless empty DB. + require.True(t, status.HasClaimData) + require.ErrorContains(t, status.Validate(), requiredBridgeMigration) } func TestInspectBridgeSyncer_NoClaimData(t *testing.T) { diff --git a/cmd/run.go b/cmd/run.go index 30fbc3e8d..8aafc0318 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -998,6 +998,9 @@ func runImportFromBridgeSyncerIfNeeded( if err != nil { logger.Fatalf("failed to inspect bridge DB: %v", err) } + if err := status.Validate(); err != nil { + logger.Fatalf("bridge DB migration blocked: %v", err) + } if !status.ShouldMigrate() { logger.Infof("no migration needed. %s", status.String()) return From 1692f743f63bf41288cede2d3b4e5fd61085e80c Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Tue, 24 Mar 2026 16:57:48 +0100 Subject: [PATCH 07/12] feat: remove unused tables from bridgesync --- bridgesync/migrations/bridgesync0015.sql | 50 ++++++++++++++ bridgesync/migrations/migrations_test.go | 87 +++++++++--------------- 2 files changed, 84 insertions(+), 53 deletions(-) create mode 100644 bridgesync/migrations/bridgesync0015.sql diff --git a/bridgesync/migrations/bridgesync0015.sql b/bridgesync/migrations/bridgesync0015.sql new file mode 100644 index 000000000..d045266a7 --- /dev/null +++ b/bridgesync/migrations/bridgesync0015.sql @@ -0,0 +1,50 @@ +-- +migrate Down +CREATE TABLE IF NOT EXISTS claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + global_index TEXT NOT NULL, + origin_network INTEGER NOT NULL, + origin_address VARCHAR NOT NULL, + destination_address VARCHAR NOT NULL, + amount TEXT NOT NULL, + proof_local_exit_root VARCHAR, + proof_rollup_exit_root VARCHAR, + mainnet_exit_root VARCHAR, + rollup_exit_root VARCHAR, + global_exit_root VARCHAR, + destination_network INTEGER NOT NULL, + metadata BLOB, + is_message BOOLEAN, + tx_hash VARCHAR, + block_timestamp INTEGER, + type TEXT NOT NULL DEFAULT '', + PRIMARY KEY (block_num, block_pos) +); + +CREATE INDEX IF NOT EXISTS idx_claim_block_num_block_pos_desc ON claim (block_num DESC, block_pos DESC); +CREATE INDEX IF NOT EXISTS idx_claim_block_num_block_pos_asc ON claim (block_num ASC, block_pos ASC); +CREATE INDEX IF NOT EXISTS idx_claim_type_block ON claim (type, block_num); + +CREATE TABLE IF NOT EXISTS unset_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + unset_global_index_hash_chain VARCHAR NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) +); + +CREATE TABLE IF NOT EXISTS set_claim ( + block_num INTEGER NOT NULL REFERENCES block(num) ON DELETE CASCADE, + block_pos INTEGER NOT NULL, + tx_hash VARCHAR NOT NULL, + global_index TEXT NOT NULL, + created_at INTEGER NOT NULL DEFAULT (strftime('%s', 'now')), + PRIMARY KEY (block_num, block_pos) +); + +-- +migrate Up +DROP TABLE IF EXISTS set_claim; +DROP TABLE IF EXISTS unset_claim; +DROP TABLE IF EXISTS claim; diff --git a/bridgesync/migrations/migrations_test.go b/bridgesync/migrations/migrations_test.go index e48c56b48..5b7756cda 100644 --- a/bridgesync/migrations/migrations_test.go +++ b/bridgesync/migrations/migrations_test.go @@ -49,24 +49,6 @@ func TestMigration0001(t *testing.T) { metadata, deposit_count ) VALUES (1, 0, 0, 0, '0x0000', 0, '0x0000', 0, NULL, 0); - - INSERT INTO claim ( - block_num, - block_pos, - global_index, - origin_network, - origin_address, - destination_address, - amount, - proof_local_exit_root, - proof_rollup_exit_root, - mainnet_exit_root, - rollup_exit_root, - global_exit_root, - destination_network, - metadata, - is_message - ) VALUES (1, 0, 0, 0, '0x0000', '0x0000', 0, '0x000,0x000', '0x000,0x000', '0x000', '0x000', '0x0', 0, NULL, FALSE); `) require.NoError(t, err) err = tx.Commit() @@ -118,20 +100,6 @@ func TestMigration0002(t *testing.T) { from_address ) VALUES (1, 0, 0, 0, '0x3', 0, '0x0000', 0, NULL, 0, 1739270804, '0xabcd', '0x123'); - INSERT INTO claim ( - block_num, - block_pos, - global_index, - origin_network, - origin_address, - destination_address, - amount, - destination_network, - metadata, - is_message, - block_timestamp, - tx_hash - ) VALUES (1, 0, 0, 0, '0x3', '0x0000', 0, 0, NULL, FALSE, 1739270804, '0xabcd'); `) require.NoError(t, err) err = tx.Commit() @@ -185,27 +153,6 @@ func TestMigration0002(t *testing.T) { require.NoError(t, err) require.NotNil(t, bridge) require.Equal(t, uint64(1739270804), bridge.BlockTimestamp) - - var claim struct { - BlockNum uint64 `meddler:"block_num"` - BlockPos uint64 `meddler:"block_pos"` - GlobalIndex *big.Int `meddler:"global_index,bigint"` - OriginNetwork uint32 `meddler:"origin_network"` - OriginAddress string `meddler:"origin_address"` - DestinationAddress string `meddler:"destination_address"` - Amount *big.Int `meddler:"amount,bigint"` - DestinationNetwork uint32 `meddler:"destination_network"` - Metadata []byte `meddler:"metadata"` - IsMessage bool `meddler:"is_message"` - BlockTimestamp uint64 `meddler:"block_timestamp"` - TxHash string `meddler:"tx_hash"` - } - - err = meddler.QueryRow(db, &claim, - `SELECT * FROM claim`) - require.NoError(t, err) - require.NotNil(t, claim) - require.Equal(t, uint64(1739270804), claim.BlockTimestamp) } func TestMigrations0003(t *testing.T) { @@ -763,6 +710,40 @@ func TestMigration0013(t *testing.T) { err = tx.Commit() require.NoError(t, err) } +func TestMigration0015(t *testing.T) { + dbPath := path.Join(t.TempDir(), "bridgesyncTest0015.sqlite") + + database, err := db.NewSQLiteDB(dbPath) + require.NoError(t, err) + defer database.Close() + + // Run migrations up to 0014 — claim, set_claim and unset_claim still exist. + err = db.RunMigrationsDBExtended(log.GetDefaultLogger(), + database, GetUpTo("bridgesync0014"), nil, migrate.Up, db.NoLimitMigrations) + require.NoError(t, err) + + tableExists := func(name string) bool { + var count int + err := database.QueryRow( + `SELECT COUNT(*) FROM sqlite_master WHERE type='table' AND name=?`, name).Scan(&count) + require.NoError(t, err) + return count > 0 + } + + require.True(t, tableExists("claim"), "claim table should exist before migration 0015") + require.True(t, tableExists("set_claim"), "set_claim table should exist before migration 0015") + require.True(t, tableExists("unset_claim"), "unset_claim table should exist before migration 0015") + + // Apply migration 0015. + err = db.RunMigrationsDBExtended(log.GetDefaultLogger(), + database, GetUpTo("bridgesync0015"), nil, migrate.Up, db.NoLimitMigrations) + require.NoError(t, err) + + require.False(t, tableExists("claim"), "claim table should be dropped by migration 0015") + require.False(t, tableExists("set_claim"), "set_claim table should be dropped by migration 0015") + require.False(t, tableExists("unset_claim"), "unset_claim table should be dropped by migration 0015") +} + func TestMigrationsDown(t *testing.T) { dbPath := path.Join(t.TempDir(), "bridgesyncTestDown.sqlite") err := RunMigrations(dbPath) From ff70c1e7f90d82b16de1770624f7105dd7299242 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 25 Mar 2026 11:02:22 +0100 Subject: [PATCH 08/12] fix: unittets removing dead code --- bridgesync/backfill_tx_sender_test.go | 17 ----------------- 1 file changed, 17 deletions(-) diff --git a/bridgesync/backfill_tx_sender_test.go b/bridgesync/backfill_tx_sender_test.go index e86219eb1..4c55a80ad 100644 --- a/bridgesync/backfill_tx_sender_test.go +++ b/bridgesync/backfill_tx_sender_test.go @@ -79,23 +79,6 @@ func TestBackfillTxnSender(t *testing.T) { require.NoError(t, meddler.Insert(tx, bridgeTableName, newTestBridge(1, 0, "0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890"))) - // Insert test claim record - _, err = tx.Exec(` - INSERT INTO claim ( - block_num, block_pos, global_index, origin_network, origin_address, - destination_address, amount, proof_local_exit_root, proof_rollup_exit_root, - mainnet_exit_root, rollup_exit_root, global_exit_root, destination_network, - metadata, is_message, block_timestamp, tx_hash - ) VALUES ( - 1, 1, '1', 1, '0x1234567890123456789012345678901234567890', - '0x0987654321098765432109876543210987654321', '1000000000000000000', - '', '', '', '', '', 2, - '', false, 1234567890, - '0xabcdef1234567890abcdef1234567890abcdef1234567890abcdef1234567890' - ) - `) - require.NoError(t, err) - err = tx.Commit() require.NoError(t, err) From 56fdcdf74f9508c712943972518a015b93f05832 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 25 Mar 2026 11:39:10 +0100 Subject: [PATCH 09/12] fix: unittets removing dead code --- bridgesync/processor_test.go | 55 ------------------------------------ 1 file changed, 55 deletions(-) diff --git a/bridgesync/processor_test.go b/bridgesync/processor_test.go index db69964d7..042d15496 100644 --- a/bridgesync/processor_test.go +++ b/bridgesync/processor_test.go @@ -45,61 +45,6 @@ func newTestProcessor(dbPath string, syncerID string, logger *log.Logger, dbQuer return newProcessor(database, syncerID, logger, dbQueryTimeout) } -func TestBigIntString(t *testing.T) { - globalIndex := GenerateGlobalIndex(true, 0, 1093) - fmt.Println(globalIndex.String()) - - _, ok := new(big.Int).SetString(globalIndex.String(), 10) - require.True(t, ok) - - dbPath := filepath.Join(t.TempDir(), "bridgesyncTestBigIntString.sqlite") - - err := migrations.RunMigrations(dbPath) - require.NoError(t, err) - db, err := db.NewSQLiteDB(dbPath) - require.NoError(t, err) - - ctx := context.Background() - tx, err := db.BeginTx(ctx, nil) - require.NoError(t, err) - - claim := &claimsynctypes.Claim{ - BlockNum: 1, - BlockPos: 0, - GlobalIndex: GenerateGlobalIndex(true, 0, 1093), - OriginNetwork: 11, - Amount: big.NewInt(11), - OriginAddress: common.HexToAddress("0x11"), - DestinationAddress: common.HexToAddress("0x11"), - ProofLocalExitRoot: types.Proof{}, - ProofRollupExitRoot: types.Proof{}, - MainnetExitRoot: common.Hash{}, - RollupExitRoot: common.Hash{}, - GlobalExitRoot: common.Hash{}, - DestinationNetwork: 12, - Type: claimsynctypes.ClaimEvent, - } - - _, err = tx.Exec(`INSERT INTO block (num) VALUES ($1)`, claim.BlockNum) - require.NoError(t, err) - require.NoError(t, meddler.Insert(tx, "claim", claim)) - - require.NoError(t, tx.Commit()) - - tx, err = db.BeginTx(ctx, nil) - require.NoError(t, err) - - rows, err := tx.Query(` - SELECT * FROM claim - WHERE block_num >= $1 AND block_num <= $2; - `, claim.BlockNum, claim.BlockNum) - require.NoError(t, err) - - claimsFromDB := []*claimsynctypes.Claim{} - require.NoError(t, meddler.ScanAll(rows, &claimsFromDB)) - require.Len(t, claimsFromDB, 1) - require.Equal(t, claim, claimsFromDB[0]) -} func TestProcessor(t *testing.T) { path := path.Join(t.TempDir(), "bridgeSyncerProcessor.db") From 5dd8a2502f422f8c35000045afb9f43e3e7a4c94 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 25 Mar 2026 11:51:42 +0100 Subject: [PATCH 10/12] feat: the import data uses a temporary database file to maintain the atomicity --- .../storage/import_data_from_bridgesyncer.go | 77 +++++++++++++------ 1 file changed, 55 insertions(+), 22 deletions(-) diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go index 3f8ef9ea7..a9577d4fc 100644 --- a/claimsync/storage/import_data_from_bridgesyncer.go +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -149,6 +149,10 @@ func InspectBridgeSyncer(ctx context.Context, bridgeDBFilename, claimDBFilename // InspectBridgeSyncer and BridgeSyncerStatus.ShouldMigrate) before calling this // function. No precondition checks are performed here. // +// The import is atomic: data is written to a temporary file first and only renamed +// to claimDBFilename on success, so a crash mid-import leaves claimDBFilename absent +// and the migration will be retried on the next startup. +// // Column-level differences between bridge schema versions are handled automatically: // - block.hash - present since bridgesync migration 0003; defaults to ". // - claim.tx_hash - present since bridgesync migration 0002; defaults to ". @@ -162,78 +166,107 @@ func ImportDataFromBridgesyncer(ctx context.Context, logger = log.WithFields("module", "ImportDataFromBridgesyncer") } - claimDB, err := db.NewSQLiteDB(claimDBFilename) + tmpFilename := claimDBFilename + ".import.tmp" + // Remove any leftover tmp file from a previous failed attempt. + os.Remove(tmpFilename) //nolint:errcheck + + // All DB work happens on tmpFilename. The defers inside importDataToTmpFile + // guarantee the DB/connection/transaction are fully closed before we return, + // so the subsequent Rename is safe even on platforms that lock open files. + blocksImported, claimsImported, setClaimsImported, unsetClaimsImported, err := + importDataToTmpFile(ctx, logger, bridgeDBFilename, tmpFilename) if err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to open claim DB: %w", err) + os.Remove(tmpFilename) //nolint:errcheck + return err + } + + if err := os.Rename(tmpFilename, claimDBFilename); err != nil { + os.Remove(tmpFilename) //nolint:errcheck + return fmt.Errorf("ImportDataFromBridgesyncer: failed to promote tmp DB: %w", err) + } + + logger.Infof("import from bridgesyncer complete: blocks=%d claims=%d set_claims=%d unset_claims=%d", + blocksImported, claimsImported, setClaimsImported, unsetClaimsImported) + return nil +} + +// importDataToTmpFile performs the actual copy from bridgeDBFilename into destFilename. +// It is a pure helper for ImportDataFromBridgesyncer: all deferred closes run when this +// function returns, ensuring the file is fully closed before the caller renames it. +func importDataToTmpFile(ctx context.Context, + logger aggkitcommon.Logger, + bridgeDBFilename string, + destFilename string) (blocksImported, claimsImported, setClaimsImported, unsetClaimsImported int64, err error) { + claimDB, err := db.NewSQLiteDB(destFilename) + if err != nil { + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to open claim DB: %w", err) } defer claimDB.Close() if err := migrations.RunMigrations(logger, claimDB); err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to run claim DB migrations: %w", err) + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to run claim DB migrations: %w", err) } // Use a single connection so that ATTACH and the subsequent transaction share the // same SQLite connection (ATTACH is per-connection in SQLite). conn, err := claimDB.Conn(ctx) if err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to acquire DB connection: %w", err) + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to acquire DB connection: %w", err) } defer conn.Close() // ATTACH the bridge DB so we can SELECT from it in the same query. attachSQL := fmt.Sprintf(`ATTACH DATABASE 'file:%s' AS bridge`, bridgeDBFilename) if _, err := conn.ExecContext(ctx, attachSQL); err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err) + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err) } defer conn.ExecContext(ctx, `DETACH DATABASE bridge`) //nolint:errcheck hasBlockHash, err := bridgeColumnExists(ctx, conn, "block", "hash") if err != nil { - return err + return 0, 0, 0, 0, err } hasClaimTxHash, err := bridgeColumnExists(ctx, conn, "claim", "tx_hash") if err != nil { - return err + return 0, 0, 0, 0, err } hasClaimBlockTimestamp, err := bridgeColumnExists(ctx, conn, "claim", "block_timestamp") if err != nil { - return err + return 0, 0, 0, 0, err } hasClaimType, err := bridgeColumnExists(ctx, conn, "claim", "type") if err != nil { - return err + return 0, 0, 0, 0, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to begin transaction: %w", err) + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to begin transaction: %w", err) } defer tx.Rollback() //nolint:errcheck - blocksImported, err := importBlocks(tx, hasBlockHash) + blocksImported, err = importBlocks(tx, hasBlockHash) if err != nil { - return err + return 0, 0, 0, 0, err } - claimsImported, err := importClaims(tx, hasClaimTxHash, hasClaimBlockTimestamp, hasClaimType) + claimsImported, err = importClaims(tx, hasClaimTxHash, hasClaimBlockTimestamp, hasClaimType) if err != nil { - return err + return 0, 0, 0, 0, err } - unsetClaimsImported, err := importUnsetClaims(tx) + unsetClaimsImported, err = importUnsetClaims(tx) if err != nil { - return err + return 0, 0, 0, 0, err } - setClaimsImported, err := importSetClaims(tx) + setClaimsImported, err = importSetClaims(tx) if err != nil { - return err + return 0, 0, 0, 0, err } if err := tx.Commit(); err != nil { - return fmt.Errorf("ImportDataFromBridgesyncer: failed to commit transaction: %w", err) + return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to commit transaction: %w", err) } - logger.Infof("import from bridgesyncer complete: blocks=%d claims=%d set_claims=%d unset_claims=%d", - blocksImported, claimsImported, setClaimsImported, unsetClaimsImported) - return nil + return blocksImported, claimsImported, setClaimsImported, unsetClaimsImported, nil } // checkBridgeTablesOnConn returns true only when all requiredBridgeTables exist in the From f3238e27d3b09ccb06adb99c69cccb20ce5b7b51 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 25 Mar 2026 12:15:17 +0100 Subject: [PATCH 11/12] fix: PR comments --- .../storage/import_data_from_bridgesyncer.go | 35 +++++++++++++++---- .../import_data_from_bridgesyncer_test.go | 19 +++++----- cmd/run.go | 2 +- 3 files changed, 38 insertions(+), 18 deletions(-) diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go index a9577d4fc..38de010b4 100644 --- a/claimsync/storage/import_data_from_bridgesyncer.go +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -114,10 +114,16 @@ func InspectBridgeSyncer(ctx context.Context, bridgeDBFilename, claimDBFilename // Check whether the required migration has been applied. // A missing gorp_migrations table is treated as MigrationOK = false, not an error. + // Any other failure (corruption, permissions, …) is surfaced so it is not silently masked. var migCount int err = conn.QueryRowContext(ctx, `SELECT COUNT(*) FROM gorp_migrations WHERE id = $1`, requiredBridgeMigration).Scan(&migCount) - if err == nil { + if err != nil { + errMsg := strings.ToLower(err.Error()) + if !(strings.Contains(errMsg, "no such table") && strings.Contains(errMsg, "gorp_migrations")) { + return status, fmt.Errorf("InspectBridgeSyncer: failed to query gorp_migrations: %w", err) + } + } else { status.MigrationOK = migCount > 0 } @@ -216,8 +222,13 @@ func importDataToTmpFile(ctx context.Context, defer conn.Close() // ATTACH the bridge DB so we can SELECT from it in the same query. - attachSQL := fmt.Sprintf(`ATTACH DATABASE 'file:%s' AS bridge`, bridgeDBFilename) - if _, err := conn.ExecContext(ctx, attachSQL); err != nil { + // The three characters that act as URI delimiters inside a SQLite file URI path + // ('%', '?', '#') are percent-encoded so they cannot be mistaken for query + // separators or fragment identifiers. The full URI is then passed as a bound + // parameter (not interpolated into SQL) to eliminate any injection risk. + escapedPath := strings.NewReplacer("%", "%25", "?", "%3F", "#", "%23").Replace(bridgeDBFilename) + attachURI := "file:" + escapedPath + "?mode=ro" + if _, err := conn.ExecContext(ctx, `ATTACH DATABASE ? AS bridge`, attachURI); err != nil { return 0, 0, 0, 0, fmt.Errorf("ImportDataFromBridgesyncer: failed to attach bridge DB: %w", err) } defer conn.ExecContext(ctx, `DETACH DATABASE bridge`) //nolint:errcheck @@ -399,9 +410,8 @@ func importSetClaims(tx *sql.Tx) (int64, error) { // contains no rows. In that case the claimDB is not created at all. // The import is idempotent: an existing row with the same (owner, key) is silently skipped // (INSERT OR IGNORE). -func ImportKeyValueFromBridgesyncer(bridgeDBFilename string, claimDBFilename string, owner string) error { +func ImportKeyValueFromBridgesyncer(ctx context.Context, bridgeDBFilename string, claimDBFilename string, owner string) error { logger := log.WithFields("module", "ImportKeyValueFromBridgesyncer") - ctx := context.Background() // Phase 1 - read the single key_value row from the bridge DB without touching the claim DB. row, err := readBridgeKeyValueRow(ctx, bridgeDBFilename) @@ -464,8 +474,21 @@ func readBridgeKeyValueRow(ctx context.Context, bridgeDBFilename string) (*keyVa return nil, nil } + const compatibilityKey = "compatibility_content" + + var count int + err = bdb.QueryRowContext(ctx, + `SELECT COUNT(*) FROM key_value WHERE key = $1`, compatibilityKey).Scan(&count) + if err != nil { + return nil, fmt.Errorf("readBridgeKeyValueRow: failed to count key_value rows: %w", err) + } + if count != 1 { + return nil, fmt.Errorf("readBridgeKeyValueRow: expected exactly 1 row with key=%q, got %d", compatibilityKey, count) + } + row := &keyValueRow{} - err = bdb.QueryRowContext(ctx, `SELECT key, value, updated_at FROM key_value LIMIT 1`). + err = bdb.QueryRowContext(ctx, + `SELECT key, value, updated_at FROM key_value WHERE key = $1`, compatibilityKey). Scan(&row.key, &row.value, &row.updatedAt) if err != nil { if errors.Is(err, sql.ErrNoRows) { diff --git a/claimsync/storage/import_data_from_bridgesyncer_test.go b/claimsync/storage/import_data_from_bridgesyncer_test.go index fc58200ab..dafddc4e1 100644 --- a/claimsync/storage/import_data_from_bridgesyncer_test.go +++ b/claimsync/storage/import_data_from_bridgesyncer_test.go @@ -353,7 +353,7 @@ func TestImportKeyValueFromBridgesyncer_NoTable(t *testing.T) { require.NoError(t, bdb.Ping()) bdb.Close() - require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "my-owner")) + require.NoError(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "my-owner")) // Claim DB must not have been created. _, statErr := os.Stat(claimPath) @@ -374,10 +374,7 @@ func TestImportKeyValueFromBridgesyncer_EmptyTable(t *testing.T) { require.NoError(t, err) bdb.Close() - require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "my-owner")) - - _, statErr := os.Stat(claimPath) - require.True(t, os.IsNotExist(statErr)) + require.ErrorContains(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "my-owner"), `expected exactly 1 row with key="compatibility_content"`) } func TestImportKeyValueFromBridgesyncer_Success(t *testing.T) { @@ -392,11 +389,11 @@ func TestImportKeyValueFromBridgesyncer_Success(t *testing.T) { value VARCHAR, updated_at INTEGER NOT NULL, PRIMARY KEY (key, owner))`) require.NoError(t, err) - _, err = bdb.Exec(`INSERT INTO key_value (owner, key, value, updated_at) VALUES ('old-owner', 'compat', 'data', 1000)`) + _, err = bdb.Exec(`INSERT INTO key_value (owner, key, value, updated_at) VALUES ('old-owner', 'compatibility_content', 'data', 1000)`) require.NoError(t, err) bdb.Close() - require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "new-owner")) + require.NoError(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "new-owner")) cdb, err := db.NewSQLiteDB(claimPath) require.NoError(t, err) @@ -408,7 +405,7 @@ func TestImportKeyValueFromBridgesyncer_Success(t *testing.T) { Scan(&owner, &key, &value, &updatedAt) require.NoError(t, err) require.Equal(t, "new-owner", owner) - require.Equal(t, "compat", key) + require.Equal(t, "compatibility_content", key) require.Equal(t, "data", value) require.Equal(t, int64(1000), updatedAt) } @@ -425,13 +422,13 @@ func TestImportKeyValueFromBridgesyncer_Idempotent(t *testing.T) { value VARCHAR, updated_at INTEGER NOT NULL, PRIMARY KEY (key, owner))`) require.NoError(t, err) - _, err = bdb.Exec(`INSERT INTO key_value (owner, key, value, updated_at) VALUES ('old-owner', 'compat', 'data', 1000)`) + _, err = bdb.Exec(`INSERT INTO key_value (owner, key, value, updated_at) VALUES ('old-owner', 'compatibility_content', 'data', 1000)`) require.NoError(t, err) bdb.Close() - require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "new-owner")) + require.NoError(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "new-owner")) // Second call must not fail and must not duplicate the row. - require.NoError(t, ImportKeyValueFromBridgesyncer(bridgePath, claimPath, "new-owner")) + require.NoError(t, ImportKeyValueFromBridgesyncer(context.Background(), bridgePath, claimPath, "new-owner")) require.Equal(t, 1, countRows(t, claimPath, "key_value")) } diff --git a/cmd/run.go b/cmd/run.go index 8aafc0318..f85034231 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -1009,7 +1009,7 @@ func runImportFromBridgeSyncerIfNeeded( if err := claimsyncstorage.ImportDataFromBridgesyncer(ctx, logger, bridgeDBPath, claimDBPath); err != nil { logger.Fatalf("failed to import claim data from bridge DB: %v", err) } - if err := claimsyncstorage.ImportKeyValueFromBridgesyncer(bridgeDBPath, claimDBPath, syncerID.String()); err != nil { + if err := claimsyncstorage.ImportKeyValueFromBridgesyncer(ctx, bridgeDBPath, claimDBPath, syncerID.String()); err != nil { logger.Fatalf("failed to import key_value from bridge DB: %v", err) } logger.Infof("migration from bridgesyncer to claimsyncer completed successfully") From bedb1e2af0e0f5bf1b6a2b4b9c164950b1ed7b28 Mon Sep 17 00:00:00 2001 From: joanestebanr <129153821+joanestebanr@users.noreply.github.com> Date: Wed, 25 Mar 2026 13:21:46 +0100 Subject: [PATCH 12/12] fix: linter --- bridgesync/processor_test.go | 1 - claimsync/storage/import_data_from_bridgesyncer.go | 11 ++++++----- cmd/run.go | 3 ++- 3 files changed, 8 insertions(+), 7 deletions(-) diff --git a/bridgesync/processor_test.go b/bridgesync/processor_test.go index 042d15496..b41c47767 100644 --- a/bridgesync/processor_test.go +++ b/bridgesync/processor_test.go @@ -45,7 +45,6 @@ func newTestProcessor(dbPath string, syncerID string, logger *log.Logger, dbQuer return newProcessor(database, syncerID, logger, dbQueryTimeout) } - func TestProcessor(t *testing.T) { path := path.Join(t.TempDir(), "bridgeSyncerProcessor.db") logger := log.WithFields("module", "bridge-syncer") diff --git a/claimsync/storage/import_data_from_bridgesyncer.go b/claimsync/storage/import_data_from_bridgesyncer.go index 38de010b4..dbfe49fec 100644 --- a/claimsync/storage/import_data_from_bridgesyncer.go +++ b/claimsync/storage/import_data_from_bridgesyncer.go @@ -120,7 +120,7 @@ func InspectBridgeSyncer(ctx context.Context, bridgeDBFilename, claimDBFilename `SELECT COUNT(*) FROM gorp_migrations WHERE id = $1`, requiredBridgeMigration).Scan(&migCount) if err != nil { errMsg := strings.ToLower(err.Error()) - if !(strings.Contains(errMsg, "no such table") && strings.Contains(errMsg, "gorp_migrations")) { + if !strings.Contains(errMsg, "no such table") || !strings.Contains(errMsg, "gorp_migrations") { return status, fmt.Errorf("InspectBridgeSyncer: failed to query gorp_migrations: %w", err) } } else { @@ -174,7 +174,7 @@ func ImportDataFromBridgesyncer(ctx context.Context, tmpFilename := claimDBFilename + ".import.tmp" // Remove any leftover tmp file from a previous failed attempt. - os.Remove(tmpFilename) //nolint:errcheck + os.Remove(tmpFilename) // All DB work happens on tmpFilename. The defers inside importDataToTmpFile // guarantee the DB/connection/transaction are fully closed before we return, @@ -182,12 +182,12 @@ func ImportDataFromBridgesyncer(ctx context.Context, blocksImported, claimsImported, setClaimsImported, unsetClaimsImported, err := importDataToTmpFile(ctx, logger, bridgeDBFilename, tmpFilename) if err != nil { - os.Remove(tmpFilename) //nolint:errcheck + os.Remove(tmpFilename) return err } if err := os.Rename(tmpFilename, claimDBFilename); err != nil { - os.Remove(tmpFilename) //nolint:errcheck + os.Remove(tmpFilename) return fmt.Errorf("ImportDataFromBridgesyncer: failed to promote tmp DB: %w", err) } @@ -410,7 +410,8 @@ func importSetClaims(tx *sql.Tx) (int64, error) { // contains no rows. In that case the claimDB is not created at all. // The import is idempotent: an existing row with the same (owner, key) is silently skipped // (INSERT OR IGNORE). -func ImportKeyValueFromBridgesyncer(ctx context.Context, bridgeDBFilename string, claimDBFilename string, owner string) error { +func ImportKeyValueFromBridgesyncer( + ctx context.Context, bridgeDBFilename string, claimDBFilename string, owner string) error { logger := log.WithFields("module", "ImportKeyValueFromBridgesyncer") // Phase 1 - read the single key_value row from the bridge DB without touching the claim DB. diff --git a/cmd/run.go b/cmd/run.go index f85034231..57bbc101e 100644 --- a/cmd/run.go +++ b/cmd/run.go @@ -1009,7 +1009,8 @@ func runImportFromBridgeSyncerIfNeeded( if err := claimsyncstorage.ImportDataFromBridgesyncer(ctx, logger, bridgeDBPath, claimDBPath); err != nil { logger.Fatalf("failed to import claim data from bridge DB: %v", err) } - if err := claimsyncstorage.ImportKeyValueFromBridgesyncer(ctx, bridgeDBPath, claimDBPath, syncerID.String()); err != nil { + if err := claimsyncstorage.ImportKeyValueFromBridgesyncer( + ctx, bridgeDBPath, claimDBPath, syncerID.String()); err != nil { logger.Fatalf("failed to import key_value from bridge DB: %v", err) } logger.Infof("migration from bridgesyncer to claimsyncer completed successfully")