@@ -92,11 +92,13 @@ import {
9292 connectPostgres ,
9393 isProdEnv ,
9494 isTestEnv ,
95+ parseBoolean ,
9596 runMigrations ,
9697} from '@hirosystems/api-toolkit' ;
9798import { PgServer , getConnectionArgs , getConnectionConfig } from './connection' ;
9899import { BigNumber } from 'bignumber.js' ;
99100import { RedisNotifier } from './redis-notifier' ;
101+ import { log } from 'console' ;
100102
101103const MIGRATIONS_TABLE = 'pgmigrations' ;
102104const INSERT_BATCH_SIZE = 500 ;
@@ -183,6 +185,11 @@ export class PgWriteStore extends PgStore {
183185 connectionConfig : getConnectionConfig ( PgServer . primary ) ,
184186 } ) ;
185187 if ( ! skipMigrations ) {
188+ // Before running new migrations, check if we need to migrate default schema
189+ // PG_OLD_SCHEMA must be set to the current schema name (usually 'public')
190+ if ( parseBoolean ( process . env [ 'PG_MIGRATE_SCHEMA_ON_STARTUP' ] ) ) {
191+ await this . migrateSchema ( sql ) ;
192+ }
186193 await runMigrations ( MIGRATIONS_DIR , 'up' , getConnectionArgs ( PgServer . primary ) , {
187194 logger : {
188195 debug : _ => { } ,
@@ -203,6 +210,22 @@ export class PgWriteStore extends PgStore {
203210 return store ;
204211 }
205212
213+ static async migrateSchema ( sql : PgSqlClient ) : Promise < void > {
214+ const oldSchema = process . env [ 'PG_OLD_SCHEMA' ] || 'public' ;
215+ const newSchema = process . env [ 'PG_SCHEMA' ] || 'stacks_blockchain_api' ;
216+
217+ // Fetch schemas to see if new schema already exists; skip migration if it exists
218+ const schemas = await sql < { schema_name : string } [ ] > `
219+ SELECT schema_name FROM information_schema.schemata
220+ ` ;
221+ if ( schemas . find ( s => s . schema_name === newSchema ) ) {
222+ logger . warn ( `Schema ${ newSchema } already exists, skipping migration.` ) ;
223+ logger . warn ( `Set PG_MIGRATE_SCHEMA_ON_STARTUP=false to disable this check on startup.` ) ;
224+ return ;
225+ }
226+ await sql `ALTER SCHEMA ${ sql ( oldSchema ) } RENAME TO ${ sql ( newSchema ) } ;` ;
227+ }
228+
206229 async storeRawEventRequest ( eventPath : string , payload : any ) : Promise < void > {
207230 if ( eventPath === '/new_block' && typeof payload === 'object' ) {
208231 for ( const tx of payload . transactions ) {
@@ -3209,15 +3232,15 @@ export class PgWriteStore extends PgStore {
32093232 RETURNING tx_id, sender_address, nonce, sponsor_address, fee_rate, sponsored, canonical
32103233 ),
32113234 affected_addresses AS (
3212- SELECT
3235+ SELECT
32133236 sender_address AS address,
32143237 fee_rate AS fee_change,
32153238 canonical,
32163239 sponsored
32173240 FROM updated_txs
32183241 WHERE sponsored = false
32193242 UNION ALL
3220- SELECT
3243+ SELECT
32213244 sponsor_address AS address,
32223245 fee_rate AS fee_change,
32233246 canonical,
@@ -3277,12 +3300,12 @@ export class PgWriteStore extends PgStore {
32773300 RETURNING recipient, coinbase_amount, tx_fees_anchored, tx_fees_streamed_confirmed, tx_fees_streamed_produced, canonical
32783301 ),
32793302 reward_changes AS (
3280- SELECT
3303+ SELECT
32813304 recipient AS address,
3282- SUM(CASE WHEN canonical THEN
3283- (coinbase_amount + tx_fees_anchored + tx_fees_streamed_confirmed + tx_fees_streamed_produced)
3284- ELSE
3285- -(coinbase_amount + tx_fees_anchored + tx_fees_streamed_confirmed + tx_fees_streamed_produced)
3305+ SUM(CASE WHEN canonical THEN
3306+ (coinbase_amount + tx_fees_anchored + tx_fees_streamed_confirmed + tx_fees_streamed_produced)
3307+ ELSE
3308+ -(coinbase_amount + tx_fees_anchored + tx_fees_streamed_confirmed + tx_fees_streamed_produced)
32863309 END) AS balance_change
32873310 FROM updated_rewards
32883311 GROUP BY recipient
@@ -3296,7 +3319,7 @@ export class PgWriteStore extends PgStore {
32963319 SET balance = ft_balances.balance + EXCLUDED.balance
32973320 RETURNING ft_balances.address
32983321 )
3299- SELECT
3322+ SELECT
33003323 (SELECT COUNT(*)::int FROM updated_rewards) AS updated_rewards_count
33013324 ` ;
33023325 const updateCount = minerRewardResults [ 0 ] ?. updated_rewards_count ?? 0 ;
@@ -3327,18 +3350,18 @@ export class PgWriteStore extends PgStore {
33273350 RETURNING sender, recipient, amount, asset_event_type_id, canonical
33283351 ),
33293352 event_changes AS (
3330- SELECT
3353+ SELECT
33313354 address,
33323355 SUM(balance_change) AS balance_change
33333356 FROM (
3334- SELECT
3357+ SELECT
33353358 sender AS address,
33363359 SUM(CASE WHEN canonical THEN -amount ELSE amount END) AS balance_change
33373360 FROM updated_events
33383361 WHERE asset_event_type_id IN (1, 3) -- Transfers and Burns affect the sender's balance
33393362 GROUP BY sender
33403363 UNION ALL
3341- SELECT
3364+ SELECT
33423365 recipient AS address,
33433366 SUM(CASE WHEN canonical THEN amount ELSE -amount END) AS balance_change
33443367 FROM updated_events
@@ -3356,7 +3379,7 @@ export class PgWriteStore extends PgStore {
33563379 SET balance = ft_balances.balance + EXCLUDED.balance
33573380 RETURNING ft_balances.address
33583381 )
3359- SELECT
3382+ SELECT
33603383 (SELECT COUNT(*)::int FROM updated_events) AS updated_events_count
33613384 ` ;
33623385 const updateCount = stxResults [ 0 ] ?. updated_events_count ?? 0 ;
@@ -3400,7 +3423,7 @@ export class PgWriteStore extends PgStore {
34003423 SET balance = ft_balances.balance + EXCLUDED.balance
34013424 RETURNING ft_balances.address
34023425 )
3403- SELECT
3426+ SELECT
34043427 (SELECT COUNT(*)::int FROM updated_events) AS updated_events_count
34053428 ` ;
34063429 const updateCount = ftResult [ 0 ] ?. updated_events_count ?? 0 ;
0 commit comments