From d53dfb619dbb84cc490a46a711a3b2db60fec384 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 14:32:09 +0100 Subject: [PATCH 01/13] add db delete loop --- auction-server/src/config.rs | 18 ++++++++++ auction-server/src/kernel/workers.rs | 52 +++++++++++++++++++++++++--- auction-server/src/server.rs | 19 +++++++++- integration.py | 2 ++ 4 files changed, 86 insertions(+), 5 deletions(-) diff --git a/auction-server/src/config.rs b/auction-server/src/config.rs index b9acdeca..9b5037b7 100644 --- a/auction-server/src/config.rs +++ b/auction-server/src/config.rs @@ -75,6 +75,24 @@ pub struct RunOptions { #[arg(long = "secret-key")] #[arg(env = "SECRET_KEY")] pub secret_key: String, + + #[command(flatten)] + pub delete_pg_rows: DeletePgRowsOptions, +} + +#[derive(Args, Clone, Debug)] +#[command(next_help_heading = "Delete PG Rows Options")] +#[group(id = "DeletePgRows")] +pub struct DeletePgRowsOptions { + /// How often to delete rows from the database. + #[arg(long = "delete-interval-seconds")] + #[arg(env = "DELETE_INTERVAL_SECONDS")] + pub delete_interval_secs: u64, + + /// The threshold staleness for whether a row should be deleted. + #[arg(long = "delete-threshold-seconds")] + #[arg(env = "DELETE_THRESHOLD_SECONDS")] + pub delete_threshold_secs: u64, } #[derive(Args, Clone, Debug)] diff --git a/auction-server/src/kernel/workers.rs b/auction-server/src/kernel/workers.rs index 83f2fe3f..6af2e13e 100644 --- a/auction-server/src/kernel/workers.rs +++ b/auction-server/src/kernel/workers.rs @@ -1,5 +1,6 @@ use { crate::{ + config::DeletePgRowsOptions, kernel::pyth_lazer::{ PriceFeed, PythLazer, @@ -14,9 +15,17 @@ use { }, }, reqwest::Url, - std::sync::{ - atomic::Ordering, - Arc, + sqlx::PgPool, + std::{ + sync::{ + atomic::Ordering, + Arc, + }, + time::Duration, + }, + time::{ + OffsetDateTime, + PrimitiveDateTime, }, }; @@ -69,6 +78,41 @@ pub async fn run_price_subscription( _ = exit_check_interval.tick() => {} } } - tracing::info!("Shutting down transaction submitter..."); + tracing::info!("Shutting down price subscription..."); + Ok(()) +} + + +pub async fn run_delete_pg_db_history( + db: &PgPool, + delete_pg_rows_options: DeletePgRowsOptions, +) -> anyhow::Result<()> { + tracing::info!("Starting delete PG DB history worker..."); + let mut delete_history_interval = tokio::time::interval(Duration::from_secs( + delete_pg_rows_options.delete_interval_secs, + )); + + while !SHOULD_EXIT.load(Ordering::Acquire) { + tokio::select! { + _ = delete_history_interval.tick() => { + let threshold = OffsetDateTime::now_utc() - Duration::from_secs(delete_pg_rows_options.delete_threshold_secs); + + sqlx::query!( + "DELETE FROM opportunity WHERE creation_time < $1", + PrimitiveDateTime::new(threshold.date(), threshold.time()) + ) + .execute(db) + .await?; + + sqlx::query!( + "DELETE FROM bid WHERE creation_time < $1", + PrimitiveDateTime::new(threshold.date(), threshold.time()) + ) + .execute(db) + .await?; + } + } + } + tracing::info!("Shutting down delete PG DB history worker..."); Ok(()) } diff --git a/auction-server/src/server.rs b/auction-server/src/server.rs index ab410179..4227aed2 100644 --- a/auction-server/src/server.rs +++ b/auction-server/src/server.rs @@ -23,7 +23,10 @@ use { }, kernel::{ traced_sender_svm::TracedSenderSvm, - workers::run_price_subscription, + workers::{ + run_delete_pg_db_history, + run_price_subscription, + }, }, models, opportunity::{ @@ -651,6 +654,20 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { run_options.clone(), server_state.clone(), )), + fault_tolerant_handler("pg deletion loop".to_string(), { + let pool = pool.clone(); + let delete_pg_rows = run_options.delete_pg_rows.clone(); + move || { + tracing::info!( + "Running deletion of pg rows that are {} seconds stale, every {} seconds", + delete_pg_rows.delete_threshold_secs, + delete_pg_rows.delete_interval_secs + ); + let pool = pool.clone(); + let delete_pg_rows = delete_pg_rows.clone(); + async move { run_delete_pg_db_history(&pool, delete_pg_rows).await } + } + }), ); // To make sure all the spawned tasks will finish their job before shut down diff --git a/integration.py b/integration.py index c77a01c4..0be83ea9 100644 --- a/integration.py +++ b/integration.py @@ -13,6 +13,8 @@ def main(): with open('tilt-resources.env', 'w') as f: f.write('export SECRET_KEY=admin\n') f.write(f'export PRIVATE_KEY_SVM={str(relayer_key_svm)}\n') + f.write(f'export DELETE_INTERVAL_SECONDS={60}\n') + f.write(f'export DELETE_THRESHOLD_SECONDS={60*60*24*2}\n') mint_buy = Keypair.from_json((open('keypairs/mint_buy.json').read())).pubkey() mint_sell = Keypair.from_json((open('keypairs/mint_sell.json').read())).pubkey() From c579273e5024d6b8bc7b4c2edd8cbf47cebc9659 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 15:02:33 +0100 Subject: [PATCH 02/13] add optionality to the deletion args --- auction-server/src/config.rs | 4 +-- auction-server/src/kernel/workers.rs | 52 +++++++++++++++++----------- auction-server/src/server.rs | 5 --- 3 files changed, 33 insertions(+), 28 deletions(-) diff --git a/auction-server/src/config.rs b/auction-server/src/config.rs index 9b5037b7..aebb46ad 100644 --- a/auction-server/src/config.rs +++ b/auction-server/src/config.rs @@ -87,12 +87,12 @@ pub struct DeletePgRowsOptions { /// How often to delete rows from the database. #[arg(long = "delete-interval-seconds")] #[arg(env = "DELETE_INTERVAL_SECONDS")] - pub delete_interval_secs: u64, + pub delete_interval_secs: Option, /// The threshold staleness for whether a row should be deleted. #[arg(long = "delete-threshold-seconds")] #[arg(env = "DELETE_THRESHOLD_SECONDS")] - pub delete_threshold_secs: u64, + pub delete_threshold_secs: Option, } #[derive(Args, Clone, Debug)] diff --git a/auction-server/src/kernel/workers.rs b/auction-server/src/kernel/workers.rs index 6af2e13e..b11ef24b 100644 --- a/auction-server/src/kernel/workers.rs +++ b/auction-server/src/kernel/workers.rs @@ -87,32 +87,42 @@ pub async fn run_delete_pg_db_history( db: &PgPool, delete_pg_rows_options: DeletePgRowsOptions, ) -> anyhow::Result<()> { - tracing::info!("Starting delete PG DB history worker..."); - let mut delete_history_interval = tokio::time::interval(Duration::from_secs( + match ( delete_pg_rows_options.delete_interval_secs, - )); + delete_pg_rows_options.delete_threshold_secs, + ) { + (Some(delete_interval_secs), Some(delete_threshold_secs)) => { + tracing::info!("Starting delete PG DB history worker, deleting every {} seconds rows that are {} seconds stale...", delete_interval_secs, delete_threshold_secs); + let mut delete_history_interval = + tokio::time::interval(Duration::from_secs(delete_interval_secs)); - while !SHOULD_EXIT.load(Ordering::Acquire) { - tokio::select! { - _ = delete_history_interval.tick() => { - let threshold = OffsetDateTime::now_utc() - Duration::from_secs(delete_pg_rows_options.delete_threshold_secs); + while !SHOULD_EXIT.load(Ordering::Acquire) { + tokio::select! { + _ = delete_history_interval.tick() => { + let threshold = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); - sqlx::query!( - "DELETE FROM opportunity WHERE creation_time < $1", - PrimitiveDateTime::new(threshold.date(), threshold.time()) - ) - .execute(db) - .await?; + sqlx::query!( + "DELETE FROM opportunity WHERE creation_time < $1", + PrimitiveDateTime::new(threshold.date(), threshold.time()) + ) + .execute(db) + .await?; - sqlx::query!( - "DELETE FROM bid WHERE creation_time < $1", - PrimitiveDateTime::new(threshold.date(), threshold.time()) - ) - .execute(db) - .await?; + sqlx::query!( + "DELETE FROM bid WHERE creation_time < $1", + PrimitiveDateTime::new(threshold.date(), threshold.time()) + ) + .execute(db) + .await?; + } + } } + tracing::info!("Shutting down delete PG DB history worker..."); + Ok(()) + } + _ => { + tracing::info!("Skipping PG DB history deletion loop..."); + Ok(()) } } - tracing::info!("Shutting down delete PG DB history worker..."); - Ok(()) } diff --git a/auction-server/src/server.rs b/auction-server/src/server.rs index 4227aed2..298f753e 100644 --- a/auction-server/src/server.rs +++ b/auction-server/src/server.rs @@ -658,11 +658,6 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { let pool = pool.clone(); let delete_pg_rows = run_options.delete_pg_rows.clone(); move || { - tracing::info!( - "Running deletion of pg rows that are {} seconds stale, every {} seconds", - delete_pg_rows.delete_threshold_secs, - delete_pg_rows.delete_interval_secs - ); let pool = pool.clone(); let delete_pg_rows = delete_pg_rows.clone(); async move { run_delete_pg_db_history(&pool, delete_pg_rows).await } From a9776b8cc1fb5fbd3e28c8b59992eb92614cd1b8 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 15:17:46 +0100 Subject: [PATCH 03/13] improve option parsing --- auction-server/src/config.rs | 22 ++++++++++++++++++++++ auction-server/src/kernel/workers.rs | 14 +++++++------- auction-server/src/server.rs | 2 +- 3 files changed, 30 insertions(+), 8 deletions(-) diff --git a/auction-server/src/config.rs b/auction-server/src/config.rs index aebb46ad..9c2d0d9c 100644 --- a/auction-server/src/config.rs +++ b/auction-server/src/config.rs @@ -95,6 +95,28 @@ pub struct DeletePgRowsOptions { pub delete_threshold_secs: Option, } +#[derive(Clone, Debug)] +pub struct DeletePgRowsFlags { + pub delete_interval_secs: u64, + pub delete_threshold_secs: u64, +} + +impl DeletePgRowsOptions { + pub fn into_option(self) -> Option { + match (self.delete_interval_secs, self.delete_threshold_secs) { + (Some(interval), Some(threshold)) => Some(DeletePgRowsFlags { + delete_interval_secs: interval, + delete_threshold_secs: threshold, + }), + (None, None) => None, + _ => { + tracing::error!("Both --delete-interval-seconds and --delete-threshold-seconds must be set together."); + None + } + } + } +} + #[derive(Args, Clone, Debug)] #[command(next_help_heading = "Config Options")] #[group(id = "Config")] diff --git a/auction-server/src/kernel/workers.rs b/auction-server/src/kernel/workers.rs index b11ef24b..f8848318 100644 --- a/auction-server/src/kernel/workers.rs +++ b/auction-server/src/kernel/workers.rs @@ -1,6 +1,6 @@ use { crate::{ - config::DeletePgRowsOptions, + config::DeletePgRowsFlags, kernel::pyth_lazer::{ PriceFeed, PythLazer, @@ -85,13 +85,13 @@ pub async fn run_price_subscription( pub async fn run_delete_pg_db_history( db: &PgPool, - delete_pg_rows_options: DeletePgRowsOptions, + delete_pg_rows_flags: Option, ) -> anyhow::Result<()> { - match ( - delete_pg_rows_options.delete_interval_secs, - delete_pg_rows_options.delete_threshold_secs, - ) { - (Some(delete_interval_secs), Some(delete_threshold_secs)) => { + match delete_pg_rows_flags { + Some(DeletePgRowsFlags { + delete_interval_secs, + delete_threshold_secs, + }) => { tracing::info!("Starting delete PG DB history worker, deleting every {} seconds rows that are {} seconds stale...", delete_interval_secs, delete_threshold_secs); let mut delete_history_interval = tokio::time::interval(Duration::from_secs(delete_interval_secs)); diff --git a/auction-server/src/server.rs b/auction-server/src/server.rs index 298f753e..2fcb7d90 100644 --- a/auction-server/src/server.rs +++ b/auction-server/src/server.rs @@ -660,7 +660,7 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { move || { let pool = pool.clone(); let delete_pg_rows = delete_pg_rows.clone(); - async move { run_delete_pg_db_history(&pool, delete_pg_rows).await } + async move { run_delete_pg_db_history(&pool, delete_pg_rows.into_option()).await } } }), ); From 673269a19b3b89ae3e2ad19713f7ef177e11e0e6 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 15:19:19 +0100 Subject: [PATCH 04/13] warn rather than error --- auction-server/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auction-server/src/config.rs b/auction-server/src/config.rs index 9c2d0d9c..d3a2b7a1 100644 --- a/auction-server/src/config.rs +++ b/auction-server/src/config.rs @@ -110,7 +110,7 @@ impl DeletePgRowsOptions { }), (None, None) => None, _ => { - tracing::error!("Both --delete-interval-seconds and --delete-threshold-seconds must be set together."); + tracing::warn!("Both --delete-interval-seconds and --delete-threshold-seconds must be set together."); None } } From 316183ebb26d8efd233189114738f1284813e94c Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 16:32:50 +0100 Subject: [PATCH 05/13] batch deletes --- auction-server/src/kernel/workers.rs | 44 +++++++++++++++++++--------- 1 file changed, 30 insertions(+), 14 deletions(-) diff --git a/auction-server/src/kernel/workers.rs b/auction-server/src/kernel/workers.rs index f8848318..3e53c2dc 100644 --- a/auction-server/src/kernel/workers.rs +++ b/auction-server/src/kernel/workers.rs @@ -83,6 +83,8 @@ pub async fn run_price_subscription( } +const DELETE_BATCH_SIZE: u64 = 1000; + pub async fn run_delete_pg_db_history( db: &PgPool, delete_pg_rows_flags: Option, @@ -99,21 +101,35 @@ pub async fn run_delete_pg_db_history( while !SHOULD_EXIT.load(Ordering::Acquire) { tokio::select! { _ = delete_history_interval.tick() => { - let threshold = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); - - sqlx::query!( - "DELETE FROM opportunity WHERE creation_time < $1", - PrimitiveDateTime::new(threshold.date(), threshold.time()) - ) - .execute(db) - .await?; + let mut n_bids_deleted: Option = None; + let threshold_bid = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); + while n_bids_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { + n_bids_deleted = Some(sqlx::query!( + "WITH rows_to_delete AS ( + SELECT id FROM bid WHERE creation_time < $1 LIMIT $2 + ) DELETE FROM bid WHERE id IN (SELECT id FROM rows_to_delete)", + PrimitiveDateTime::new(threshold_bid.date(), threshold_bid.time()), + DELETE_BATCH_SIZE as i64, + ) + .execute(db) + .await? + .rows_affected()); + } - sqlx::query!( - "DELETE FROM bid WHERE creation_time < $1", - PrimitiveDateTime::new(threshold.date(), threshold.time()) - ) - .execute(db) - .await?; + let mut n_opportunities_deleted: Option = None; + let threshold_opportunity = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); + while n_opportunities_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { + n_opportunities_deleted = Some(sqlx::query!( + "WITH rows_to_delete AS ( + SELECT id FROM opportunity WHERE creation_time < $1 LIMIT $2 + ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", + PrimitiveDateTime::new(threshold_opportunity.date(), threshold_opportunity.time()), + DELETE_BATCH_SIZE as i64, + ) + .execute(db) + .await? + .rows_affected()); + } } } } From 33062e1edf255b87761758ddba3ca98fdd6d121b Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 16:34:41 +0100 Subject: [PATCH 06/13] updated sqlx queries --- ...7edbd374eae20bc8692800a97618de2d63eeb735f.json | 15 +++++++++++++++ ...5b34cd13ec49afd5d32c949d54797f9bffece4a72.json | 15 +++++++++++++++ 2 files changed, 30 insertions(+) create mode 100644 auction-server/.sqlx/query-8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f.json create mode 100644 auction-server/.sqlx/query-e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72.json diff --git a/auction-server/.sqlx/query-8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f.json b/auction-server/.sqlx/query-8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f.json new file mode 100644 index 00000000..fafb2b9b --- /dev/null +++ b/auction-server/.sqlx/query-8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH rows_to_delete AS (\n SELECT id FROM bid WHERE creation_time < $1 LIMIT $2\n ) DELETE FROM bid WHERE id IN (SELECT id FROM rows_to_delete)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamp", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f" +} diff --git a/auction-server/.sqlx/query-e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72.json b/auction-server/.sqlx/query-e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72.json new file mode 100644 index 00000000..6339c0c7 --- /dev/null +++ b/auction-server/.sqlx/query-e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH rows_to_delete AS (\n SELECT id FROM opportunity WHERE creation_time < $1 LIMIT $2\n ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamp", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72" +} From 53252e2e80b3e875e64a9a997770657e5c420445 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 17:20:08 +0100 Subject: [PATCH 07/13] add enabled flag, get rid of option --- auction-server/src/config.rs | 29 ++------- auction-server/src/kernel/workers.rs | 91 ++++++++++++++-------------- auction-server/src/server.rs | 2 +- integration.py | 1 + 4 files changed, 52 insertions(+), 71 deletions(-) diff --git a/auction-server/src/config.rs b/auction-server/src/config.rs index d3a2b7a1..b8fa5bda 100644 --- a/auction-server/src/config.rs +++ b/auction-server/src/config.rs @@ -84,39 +84,22 @@ pub struct RunOptions { #[command(next_help_heading = "Delete PG Rows Options")] #[group(id = "DeletePgRows")] pub struct DeletePgRowsOptions { + /// Whether to enable the deletion of rows from the database. + #[arg(long = "delete-enabled")] + #[arg(env = "DELETE_ENABLED")] + pub delete_enabled: bool, + /// How often to delete rows from the database. #[arg(long = "delete-interval-seconds")] #[arg(env = "DELETE_INTERVAL_SECONDS")] - pub delete_interval_secs: Option, + pub delete_interval_secs: u64, /// The threshold staleness for whether a row should be deleted. #[arg(long = "delete-threshold-seconds")] #[arg(env = "DELETE_THRESHOLD_SECONDS")] - pub delete_threshold_secs: Option, -} - -#[derive(Clone, Debug)] -pub struct DeletePgRowsFlags { - pub delete_interval_secs: u64, pub delete_threshold_secs: u64, } -impl DeletePgRowsOptions { - pub fn into_option(self) -> Option { - match (self.delete_interval_secs, self.delete_threshold_secs) { - (Some(interval), Some(threshold)) => Some(DeletePgRowsFlags { - delete_interval_secs: interval, - delete_threshold_secs: threshold, - }), - (None, None) => None, - _ => { - tracing::warn!("Both --delete-interval-seconds and --delete-threshold-seconds must be set together."); - None - } - } - } -} - #[derive(Args, Clone, Debug)] #[command(next_help_heading = "Config Options")] #[group(id = "Config")] diff --git a/auction-server/src/kernel/workers.rs b/auction-server/src/kernel/workers.rs index 3e53c2dc..bf27e3eb 100644 --- a/auction-server/src/kernel/workers.rs +++ b/auction-server/src/kernel/workers.rs @@ -1,6 +1,6 @@ use { crate::{ - config::DeletePgRowsFlags, + config::DeletePgRowsOptions, kernel::pyth_lazer::{ PriceFeed, PythLazer, @@ -87,58 +87,55 @@ const DELETE_BATCH_SIZE: u64 = 1000; pub async fn run_delete_pg_db_history( db: &PgPool, - delete_pg_rows_flags: Option, + delete_pg_rows_options: DeletePgRowsOptions, ) -> anyhow::Result<()> { - match delete_pg_rows_flags { - Some(DeletePgRowsFlags { - delete_interval_secs, - delete_threshold_secs, - }) => { - tracing::info!("Starting delete PG DB history worker, deleting every {} seconds rows that are {} seconds stale...", delete_interval_secs, delete_threshold_secs); - let mut delete_history_interval = - tokio::time::interval(Duration::from_secs(delete_interval_secs)); + if delete_pg_rows_options.delete_enabled { + let delete_interval_secs = delete_pg_rows_options.delete_interval_secs; + let delete_threshold_secs = delete_pg_rows_options.delete_threshold_secs; - while !SHOULD_EXIT.load(Ordering::Acquire) { - tokio::select! { - _ = delete_history_interval.tick() => { - let mut n_bids_deleted: Option = None; - let threshold_bid = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); - while n_bids_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { - n_bids_deleted = Some(sqlx::query!( - "WITH rows_to_delete AS ( - SELECT id FROM bid WHERE creation_time < $1 LIMIT $2 - ) DELETE FROM bid WHERE id IN (SELECT id FROM rows_to_delete)", - PrimitiveDateTime::new(threshold_bid.date(), threshold_bid.time()), - DELETE_BATCH_SIZE as i64, - ) - .execute(db) - .await? - .rows_affected()); - } + tracing::info!("Starting delete PG DB history worker, deleting every {} seconds rows that are {} seconds stale...", delete_interval_secs, delete_threshold_secs); + let mut delete_history_interval = + tokio::time::interval(Duration::from_secs(delete_interval_secs)); - let mut n_opportunities_deleted: Option = None; - let threshold_opportunity = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); - while n_opportunities_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { - n_opportunities_deleted = Some(sqlx::query!( - "WITH rows_to_delete AS ( - SELECT id FROM opportunity WHERE creation_time < $1 LIMIT $2 - ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", - PrimitiveDateTime::new(threshold_opportunity.date(), threshold_opportunity.time()), - DELETE_BATCH_SIZE as i64, - ) - .execute(db) - .await? - .rows_affected()); - } + while !SHOULD_EXIT.load(Ordering::Acquire) { + tokio::select! { + _ = delete_history_interval.tick() => { + let mut n_bids_deleted: Option = None; + let threshold_bid = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); + while n_bids_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { + n_bids_deleted = Some(sqlx::query!( + "WITH rows_to_delete AS ( + SELECT id FROM bid WHERE creation_time < $1 LIMIT $2 + ) DELETE FROM bid WHERE id IN (SELECT id FROM rows_to_delete)", + PrimitiveDateTime::new(threshold_bid.date(), threshold_bid.time()), + DELETE_BATCH_SIZE as i64, + ) + .execute(db) + .await? + .rows_affected()); + } + + let mut n_opportunities_deleted: Option = None; + let threshold_opportunity = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); + while n_opportunities_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { + n_opportunities_deleted = Some(sqlx::query!( + "WITH rows_to_delete AS ( + SELECT id FROM opportunity WHERE creation_time < $1 LIMIT $2 + ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", + PrimitiveDateTime::new(threshold_opportunity.date(), threshold_opportunity.time()), + DELETE_BATCH_SIZE as i64, + ) + .execute(db) + .await? + .rows_affected()); } } } - tracing::info!("Shutting down delete PG DB history worker..."); - Ok(()) - } - _ => { - tracing::info!("Skipping PG DB history deletion loop..."); - Ok(()) } + tracing::info!("Shutting down delete PG DB history worker..."); + Ok(()) + } else { + tracing::info!("Skipping PG DB history deletion loop..."); + Ok(()) } } diff --git a/auction-server/src/server.rs b/auction-server/src/server.rs index 2fcb7d90..298f753e 100644 --- a/auction-server/src/server.rs +++ b/auction-server/src/server.rs @@ -660,7 +660,7 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { move || { let pool = pool.clone(); let delete_pg_rows = delete_pg_rows.clone(); - async move { run_delete_pg_db_history(&pool, delete_pg_rows.into_option()).await } + async move { run_delete_pg_db_history(&pool, delete_pg_rows).await } } }), ); diff --git a/integration.py b/integration.py index 0be83ea9..3618c995 100644 --- a/integration.py +++ b/integration.py @@ -13,6 +13,7 @@ def main(): with open('tilt-resources.env', 'w') as f: f.write('export SECRET_KEY=admin\n') f.write(f'export PRIVATE_KEY_SVM={str(relayer_key_svm)}\n') + f.write(f'export DELETE_ENABLED=true\n') f.write(f'export DELETE_INTERVAL_SECONDS={60}\n') f.write(f'export DELETE_THRESHOLD_SECONDS={60*60*24*2}\n') From faf55ac7134a10d1a75ab1f8775adb4a4757e947 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 17:23:31 +0100 Subject: [PATCH 08/13] add default vals --- auction-server/src/config.rs | 3 +++ 1 file changed, 3 insertions(+) diff --git a/auction-server/src/config.rs b/auction-server/src/config.rs index b8fa5bda..8169f818 100644 --- a/auction-server/src/config.rs +++ b/auction-server/src/config.rs @@ -87,16 +87,19 @@ pub struct DeletePgRowsOptions { /// Whether to enable the deletion of rows from the database. #[arg(long = "delete-enabled")] #[arg(env = "DELETE_ENABLED")] + #[arg(default_value = "true")] pub delete_enabled: bool, /// How often to delete rows from the database. #[arg(long = "delete-interval-seconds")] #[arg(env = "DELETE_INTERVAL_SECONDS")] + #[arg(default_value = "60")] pub delete_interval_secs: u64, /// The threshold staleness for whether a row should be deleted. #[arg(long = "delete-threshold-seconds")] #[arg(env = "DELETE_THRESHOLD_SECONDS")] + #[arg(default_value = "86400")] pub delete_threshold_secs: u64, } From 25de37f2e33312dedeaad3c3c1a31778c189c281 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 17:24:14 +0100 Subject: [PATCH 09/13] add corrected default vals --- auction-server/src/config.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/auction-server/src/config.rs b/auction-server/src/config.rs index 8169f818..c37e4201 100644 --- a/auction-server/src/config.rs +++ b/auction-server/src/config.rs @@ -99,7 +99,7 @@ pub struct DeletePgRowsOptions { /// The threshold staleness for whether a row should be deleted. #[arg(long = "delete-threshold-seconds")] #[arg(env = "DELETE_THRESHOLD_SECONDS")] - #[arg(default_value = "86400")] + #[arg(default_value = "172800")] // 2 days in seconds pub delete_threshold_secs: u64, } From 0a50ab0bbb838d1d037756208777a65134de7126 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Tue, 3 Jun 2025 17:44:11 +0100 Subject: [PATCH 10/13] added chain_id index --- auction-server/src/kernel/workers.rs | 35 +++++++++++++++++----------- auction-server/src/server.rs | 4 +++- 2 files changed, 25 insertions(+), 14 deletions(-) diff --git a/auction-server/src/kernel/workers.rs b/auction-server/src/kernel/workers.rs index bf27e3eb..ed4cde57 100644 --- a/auction-server/src/kernel/workers.rs +++ b/auction-server/src/kernel/workers.rs @@ -87,6 +87,7 @@ const DELETE_BATCH_SIZE: u64 = 1000; pub async fn run_delete_pg_db_history( db: &PgPool, + chain_ids: Vec, delete_pg_rows_options: DeletePgRowsOptions, ) -> anyhow::Result<()> { if delete_pg_rows_options.delete_enabled { @@ -115,20 +116,28 @@ pub async fn run_delete_pg_db_history( .rows_affected()); } - let mut n_opportunities_deleted: Option = None; let threshold_opportunity = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); - while n_opportunities_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { - n_opportunities_deleted = Some(sqlx::query!( - "WITH rows_to_delete AS ( - SELECT id FROM opportunity WHERE creation_time < $1 LIMIT $2 - ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", - PrimitiveDateTime::new(threshold_opportunity.date(), threshold_opportunity.time()), - DELETE_BATCH_SIZE as i64, - ) - .execute(db) - .await? - .rows_affected()); - } + let futures = chain_ids.iter().map(|chain_id| { + let db = db.clone(); + async move { + let mut n_opportunities_deleted: Option = None; + while n_opportunities_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { + n_opportunities_deleted = Some(sqlx::query!( + "WITH rows_to_delete AS ( + SELECT id FROM opportunity WHERE chain_id = $1 AND creation_time < $2 LIMIT $3 + ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", + chain_id, + PrimitiveDateTime::new(threshold_opportunity.date(), threshold_opportunity.time()), + DELETE_BATCH_SIZE as i64, + ) + .execute(&db) + .await? + .rows_affected()); + } + Ok::<(), anyhow::Error>(()) + } + }); + futures::future::try_join_all(futures).await?; } } } diff --git a/auction-server/src/server.rs b/auction-server/src/server.rs index 298f753e..11411c12 100644 --- a/auction-server/src/server.rs +++ b/auction-server/src/server.rs @@ -657,10 +657,12 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> { fault_tolerant_handler("pg deletion loop".to_string(), { let pool = pool.clone(); let delete_pg_rows = run_options.delete_pg_rows.clone(); + let chain_ids = auction_services.keys().cloned().collect::>(); move || { let pool = pool.clone(); let delete_pg_rows = delete_pg_rows.clone(); - async move { run_delete_pg_db_history(&pool, delete_pg_rows).await } + let chain_ids = chain_ids.clone(); + async move { run_delete_pg_db_history(&pool, chain_ids, delete_pg_rows).await } } }), ); From 30267c45f62acd985182d379da1b87a9d100fcdb Mon Sep 17 00:00:00 2001 From: --systemdf Date: Wed, 4 Jun 2025 00:45:20 +0100 Subject: [PATCH 11/13] add instrumentation --- auction-server/src/kernel/workers.rs | 112 ++++++++++++++++++++------- 1 file changed, 83 insertions(+), 29 deletions(-) diff --git a/auction-server/src/kernel/workers.rs b/auction-server/src/kernel/workers.rs index ed4cde57..8e8095f3 100644 --- a/auction-server/src/kernel/workers.rs +++ b/auction-server/src/kernel/workers.rs @@ -14,6 +14,7 @@ use { Store, }, }, + axum_prometheus::metrics, reqwest::Url, sqlx::PgPool, std::{ @@ -27,6 +28,7 @@ use { OffsetDateTime, PrimitiveDateTime, }, + tracing::instrument, }; @@ -101,39 +103,21 @@ pub async fn run_delete_pg_db_history( while !SHOULD_EXIT.load(Ordering::Acquire) { tokio::select! { _ = delete_history_interval.tick() => { - let mut n_bids_deleted: Option = None; - let threshold_bid = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); - while n_bids_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { - n_bids_deleted = Some(sqlx::query!( - "WITH rows_to_delete AS ( - SELECT id FROM bid WHERE creation_time < $1 LIMIT $2 - ) DELETE FROM bid WHERE id IN (SELECT id FROM rows_to_delete)", - PrimitiveDateTime::new(threshold_bid.date(), threshold_bid.time()), - DELETE_BATCH_SIZE as i64, - ) - .execute(db) - .await? - .rows_affected()); - } + delete_pg_db_bid_history( + db, + delete_threshold_secs, + ) + .await?; - let threshold_opportunity = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); let futures = chain_ids.iter().map(|chain_id| { let db = db.clone(); async move { - let mut n_opportunities_deleted: Option = None; - while n_opportunities_deleted.unwrap_or(DELETE_BATCH_SIZE) >= DELETE_BATCH_SIZE { - n_opportunities_deleted = Some(sqlx::query!( - "WITH rows_to_delete AS ( - SELECT id FROM opportunity WHERE chain_id = $1 AND creation_time < $2 LIMIT $3 - ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", - chain_id, - PrimitiveDateTime::new(threshold_opportunity.date(), threshold_opportunity.time()), - DELETE_BATCH_SIZE as i64, - ) - .execute(&db) - .await? - .rows_affected()); - } + delete_pg_db_opportunity_history( + &db, + chain_id, + delete_threshold_secs, + ) + .await?; Ok::<(), anyhow::Error>(()) } }); @@ -148,3 +132,73 @@ pub async fn run_delete_pg_db_history( Ok(()) } } + +#[instrument( + target = "metrics", + name = "db_delete_pg_bid_history" + fields(category = "db_queries", result = "success", name = "delete_pg_bid_history", tracing_enabled), + skip_all +)] +pub async fn delete_pg_db_bid_history( + db: &PgPool, + delete_threshold_secs: u64, +) -> anyhow::Result<()> { + let threshold = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); + let n_bids_deleted = sqlx::query!( + "WITH rows_to_delete AS ( + SELECT id FROM bid WHERE creation_time < $1 LIMIT $2 + ) DELETE FROM bid WHERE id IN (SELECT id FROM rows_to_delete)", + PrimitiveDateTime::new(threshold.date(), threshold.time()), + DELETE_BATCH_SIZE as i64, + ) + .execute(db) + .await + .map_err(|e| { + tracing::Span::current().record("result", "error"); + tracing::error!("Failed to delete PG DB bid history: {}", e); + e + })? + .rows_affected(); + + metrics::histogram!("db_delete_pg_bid_count").record(n_bids_deleted as f64); + + Ok(()) +} + +#[instrument( + target = "metrics", + name = "db_delete_pg_opportunity_history" + fields(category = "db_queries", result = "success", name = "delete_pg_opportunity_history", tracing_enabled), + skip_all +)] +pub async fn delete_pg_db_opportunity_history( + db: &PgPool, + chain_id: &str, + delete_threshold_secs: u64, +) -> anyhow::Result<()> { + let threshold = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs); + let n_opportunities_deleted = sqlx::query!( + "WITH rows_to_delete AS ( + SELECT id FROM opportunity WHERE chain_id = $1 AND creation_time < $2 LIMIT $3 + ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", + chain_id, + PrimitiveDateTime::new(threshold.date(), threshold.time()), + DELETE_BATCH_SIZE as i64, + ) + .execute(db) + .await + .map_err(|e| { + tracing::Span::current().record("result", "error"); + tracing::error!("Failed to delete PG DB opportunity history: {}", e); + e + })? + .rows_affected(); + + metrics::histogram!( + "db_delete_pg_opportunity_count", + &[("chain_id", chain_id.to_string())] + ) + .record(n_opportunities_deleted as f64); + + Ok(()) +} From 6deafea0576f38827bae6337be16b95d6c26c14b Mon Sep 17 00:00:00 2001 From: --systemdf Date: Wed, 4 Jun 2025 00:55:11 +0100 Subject: [PATCH 12/13] update sqlx queries --- ...776169796e24781c48d7c99a334f6293362609a5.json | 15 +++++++++++++++ ...edbd374eae20bc8692800a97618de2d63eeb735f.json | 15 --------------- ...1dccd42901a70a3d1ff736d5cafc6aaa6edf3669.json | 16 ++++++++++++++++ ...b34cd13ec49afd5d32c949d54797f9bffece4a72.json | 15 --------------- 4 files changed, 31 insertions(+), 30 deletions(-) create mode 100644 auction-server/.sqlx/query-6a3cc6bf52fde898997d788c776169796e24781c48d7c99a334f6293362609a5.json delete mode 100644 auction-server/.sqlx/query-8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f.json create mode 100644 auction-server/.sqlx/query-da61b53df51ffad6f7ed8baa1dccd42901a70a3d1ff736d5cafc6aaa6edf3669.json delete mode 100644 auction-server/.sqlx/query-e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72.json diff --git a/auction-server/.sqlx/query-6a3cc6bf52fde898997d788c776169796e24781c48d7c99a334f6293362609a5.json b/auction-server/.sqlx/query-6a3cc6bf52fde898997d788c776169796e24781c48d7c99a334f6293362609a5.json new file mode 100644 index 00000000..db5e0623 --- /dev/null +++ b/auction-server/.sqlx/query-6a3cc6bf52fde898997d788c776169796e24781c48d7c99a334f6293362609a5.json @@ -0,0 +1,15 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH rows_to_delete AS (\n SELECT id FROM bid WHERE creation_time < $1 LIMIT $2\n ) DELETE FROM bid WHERE id IN (SELECT id FROM rows_to_delete)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Timestamp", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "6a3cc6bf52fde898997d788c776169796e24781c48d7c99a334f6293362609a5" +} diff --git a/auction-server/.sqlx/query-8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f.json b/auction-server/.sqlx/query-8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f.json deleted file mode 100644 index fafb2b9b..00000000 --- a/auction-server/.sqlx/query-8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH rows_to_delete AS (\n SELECT id FROM bid WHERE creation_time < $1 LIMIT $2\n ) DELETE FROM bid WHERE id IN (SELECT id FROM rows_to_delete)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Timestamp", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "8903c96cea10b683b67203b7edbd374eae20bc8692800a97618de2d63eeb735f" -} diff --git a/auction-server/.sqlx/query-da61b53df51ffad6f7ed8baa1dccd42901a70a3d1ff736d5cafc6aaa6edf3669.json b/auction-server/.sqlx/query-da61b53df51ffad6f7ed8baa1dccd42901a70a3d1ff736d5cafc6aaa6edf3669.json new file mode 100644 index 00000000..5cffde84 --- /dev/null +++ b/auction-server/.sqlx/query-da61b53df51ffad6f7ed8baa1dccd42901a70a3d1ff736d5cafc6aaa6edf3669.json @@ -0,0 +1,16 @@ +{ + "db_name": "PostgreSQL", + "query": "WITH rows_to_delete AS (\n SELECT id FROM opportunity WHERE chain_id = $1 AND creation_time < $2 LIMIT $3\n ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", + "describe": { + "columns": [], + "parameters": { + "Left": [ + "Text", + "Timestamp", + "Int8" + ] + }, + "nullable": [] + }, + "hash": "da61b53df51ffad6f7ed8baa1dccd42901a70a3d1ff736d5cafc6aaa6edf3669" +} diff --git a/auction-server/.sqlx/query-e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72.json b/auction-server/.sqlx/query-e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72.json deleted file mode 100644 index 6339c0c7..00000000 --- a/auction-server/.sqlx/query-e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72.json +++ /dev/null @@ -1,15 +0,0 @@ -{ - "db_name": "PostgreSQL", - "query": "WITH rows_to_delete AS (\n SELECT id FROM opportunity WHERE creation_time < $1 LIMIT $2\n ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)", - "describe": { - "columns": [], - "parameters": { - "Left": [ - "Timestamp", - "Int8" - ] - }, - "nullable": [] - }, - "hash": "e1926635bb48243338f17ff5b34cd13ec49afd5d32c949d54797f9bffece4a72" -} From 7871fa091eefcd2d6a54f0eac9342e92181c9287 Mon Sep 17 00:00:00 2001 From: --systemdf Date: Wed, 4 Jun 2025 10:29:54 +0100 Subject: [PATCH 13/13] set defaults --- auction-server/src/config.rs | 2 +- auction-server/src/kernel/workers.rs | 2 +- integration.py | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/auction-server/src/config.rs b/auction-server/src/config.rs index c37e4201..d6684218 100644 --- a/auction-server/src/config.rs +++ b/auction-server/src/config.rs @@ -93,7 +93,7 @@ pub struct DeletePgRowsOptions { /// How often to delete rows from the database. #[arg(long = "delete-interval-seconds")] #[arg(env = "DELETE_INTERVAL_SECONDS")] - #[arg(default_value = "60")] + #[arg(default_value = "1")] pub delete_interval_secs: u64, /// The threshold staleness for whether a row should be deleted. diff --git a/auction-server/src/kernel/workers.rs b/auction-server/src/kernel/workers.rs index 8e8095f3..e06b7a32 100644 --- a/auction-server/src/kernel/workers.rs +++ b/auction-server/src/kernel/workers.rs @@ -85,7 +85,7 @@ pub async fn run_price_subscription( } -const DELETE_BATCH_SIZE: u64 = 1000; +const DELETE_BATCH_SIZE: u64 = 5000; pub async fn run_delete_pg_db_history( db: &PgPool, diff --git a/integration.py b/integration.py index 3618c995..da017a83 100644 --- a/integration.py +++ b/integration.py @@ -14,7 +14,7 @@ def main(): f.write('export SECRET_KEY=admin\n') f.write(f'export PRIVATE_KEY_SVM={str(relayer_key_svm)}\n') f.write(f'export DELETE_ENABLED=true\n') - f.write(f'export DELETE_INTERVAL_SECONDS={60}\n') + f.write(f'export DELETE_INTERVAL_SECONDS={1}\n') f.write(f'export DELETE_THRESHOLD_SECONDS={60*60*24*2}\n') mint_buy = Keypair.from_json((open('keypairs/mint_buy.json').read())).pubkey()