Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 26 additions & 0 deletions auction-server/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,32 @@ pub struct RunOptions {
#[arg(long = "secret-key")]
#[arg(env = "SECRET_KEY")]
pub secret_key: String,

#[command(flatten)]
pub delete_pg_rows: DeletePgRowsOptions,
}

/// CLI/environment options controlling the periodic deletion of stale rows
/// from the Postgres database. Flattened into `RunOptions` via
/// `#[command(flatten)]`.
#[derive(Args, Clone, Debug)]
#[command(next_help_heading = "Delete PG Rows Options")]
#[group(id = "DeletePgRows")]
pub struct DeletePgRowsOptions {
    /// Whether to enable the deletion of rows from the database.
    // NOTE(review): with clap derive, a plain `bool` field normally gets
    // `ArgAction::SetTrue`; combined with `default_value = "true"` the flag
    // may never be settable to `false` from the CLI — confirm against the
    // clap version in use (an `ArgAction::Set` + `default_value_t` form may
    // be intended).
    #[arg(long = "delete-enabled")]
    #[arg(env = "DELETE_ENABLED")]
    #[arg(default_value = "true")]
    pub delete_enabled: bool,

    /// How often to delete rows from the database.
    #[arg(long = "delete-interval-seconds")]
    #[arg(env = "DELETE_INTERVAL_SECONDS")]
    #[arg(default_value = "1")]
    pub delete_interval_secs: u64,

    /// The threshold staleness for whether a row should be deleted.
    #[arg(long = "delete-threshold-seconds")]
    #[arg(env = "DELETE_THRESHOLD_SECONDS")]
    #[arg(default_value = "172800")] // 2 days in seconds
    pub delete_threshold_secs: u64,
}

#[derive(Args, Clone, Debug)]
Expand Down
138 changes: 134 additions & 4 deletions auction-server/src/kernel/workers.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
use {
crate::{
config::DeletePgRowsOptions,
kernel::pyth_lazer::{
PriceFeed,
PythLazer,
Expand All @@ -13,11 +14,21 @@ use {
Store,
},
},
axum_prometheus::metrics,
reqwest::Url,
std::sync::{
atomic::Ordering,
Arc,
sqlx::PgPool,
std::{
sync::{
atomic::Ordering,
Arc,
},
time::Duration,
},
time::{
OffsetDateTime,
PrimitiveDateTime,
},
tracing::instrument,
};


Expand Down Expand Up @@ -69,6 +80,125 @@ pub async fn run_price_subscription(
_ = exit_check_interval.tick() => {}
}
}
tracing::info!("Shutting down transaction submitter...");
tracing::info!("Shutting down price subscription...");
Ok(())
}


/// Upper bound on the number of rows removed per table in a single delete
/// pass (used as the SQL `LIMIT` in the deletion CTEs below).
const DELETE_BATCH_SIZE: u64 = 5000;

pub async fn run_delete_pg_db_history(
db: &PgPool,
chain_ids: Vec<String>,
delete_pg_rows_options: DeletePgRowsOptions,
) -> anyhow::Result<()> {
if delete_pg_rows_options.delete_enabled {
let delete_interval_secs = delete_pg_rows_options.delete_interval_secs;
let delete_threshold_secs = delete_pg_rows_options.delete_threshold_secs;

tracing::info!("Starting delete PG DB history worker, deleting every {} seconds rows that are {} seconds stale...", delete_interval_secs, delete_threshold_secs);
let mut delete_history_interval =
tokio::time::interval(Duration::from_secs(delete_interval_secs));

while !SHOULD_EXIT.load(Ordering::Acquire) {
tokio::select! {
_ = delete_history_interval.tick() => {
delete_pg_db_bid_history(
db,
delete_threshold_secs,
)
.await?;

let futures = chain_ids.iter().map(|chain_id| {
let db = db.clone();
async move {
delete_pg_db_opportunity_history(
&db,
chain_id,
delete_threshold_secs,
)
.await?;
Ok::<(), anyhow::Error>(())
}
});
futures::future::try_join_all(futures).await?;
}
}
}
tracing::info!("Shutting down delete PG DB history worker...");
Ok(())
} else {
tracing::info!("Skipping PG DB history deletion loop...");
Ok(())
}
}

/// Deletes up to `DELETE_BATCH_SIZE` rows from the `bid` table whose
/// `creation_time` is older than `delete_threshold_secs` seconds before now.
///
/// The count of deleted rows is recorded in the `db_delete_pg_bid_count`
/// histogram. On query failure the span's `result` field is set to "error"
/// and the error is propagated.
//
// Fix: added the missing comma after `name = "..."` in the `#[instrument]`
// attribute — without it the attribute's token stream is malformed.
#[instrument(
    target = "metrics",
    name = "db_delete_pg_bid_history",
    fields(category = "db_queries", result = "success", name = "delete_pg_bid_history", tracing_enabled),
    skip_all
)]
pub async fn delete_pg_db_bid_history(
    db: &PgPool,
    delete_threshold_secs: u64,
) -> anyhow::Result<()> {
    // Rows created before this cutoff are considered stale.
    let threshold = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs);
    // Batched delete: select at most DELETE_BATCH_SIZE victim ids in a CTE so
    // one pass never deletes an unbounded number of rows.
    let n_bids_deleted = sqlx::query!(
        "WITH rows_to_delete AS (
            SELECT id FROM bid WHERE creation_time < $1 LIMIT $2
        ) DELETE FROM bid WHERE id IN (SELECT id FROM rows_to_delete)",
        PrimitiveDateTime::new(threshold.date(), threshold.time()),
        DELETE_BATCH_SIZE as i64,
    )
    .execute(db)
    .await
    .map_err(|e| {
        tracing::Span::current().record("result", "error");
        tracing::error!("Failed to delete PG DB bid history: {}", e);
        e
    })?
    .rows_affected();

    metrics::histogram!("db_delete_pg_bid_count").record(n_bids_deleted as f64);

    Ok(())
}

/// Deletes up to `DELETE_BATCH_SIZE` rows from the `opportunity` table for
/// the given `chain_id` whose `creation_time` is older than
/// `delete_threshold_secs` seconds before now.
///
/// The count of deleted rows is recorded in the
/// `db_delete_pg_opportunity_count` histogram, labeled by `chain_id`. On
/// query failure the span's `result` field is set to "error" and the error
/// is propagated.
//
// Fix: added the missing comma after `name = "..."` in the `#[instrument]`
// attribute — without it the attribute's token stream is malformed.
#[instrument(
    target = "metrics",
    name = "db_delete_pg_opportunity_history",
    fields(category = "db_queries", result = "success", name = "delete_pg_opportunity_history", tracing_enabled),
    skip_all
)]
pub async fn delete_pg_db_opportunity_history(
    db: &PgPool,
    chain_id: &str,
    delete_threshold_secs: u64,
) -> anyhow::Result<()> {
    // Rows created before this cutoff are considered stale.
    let threshold = OffsetDateTime::now_utc() - Duration::from_secs(delete_threshold_secs);
    // Batched delete scoped to one chain: victim ids are selected in a CTE so
    // one pass never deletes an unbounded number of rows.
    let n_opportunities_deleted = sqlx::query!(
        "WITH rows_to_delete AS (
            SELECT id FROM opportunity WHERE chain_id = $1 AND creation_time < $2 LIMIT $3
        ) DELETE FROM opportunity WHERE id IN (SELECT id FROM rows_to_delete)",
        chain_id,
        PrimitiveDateTime::new(threshold.date(), threshold.time()),
        DELETE_BATCH_SIZE as i64,
    )
    .execute(db)
    .await
    .map_err(|e| {
        tracing::Span::current().record("result", "error");
        tracing::error!("Failed to delete PG DB opportunity history: {}", e);
        e
    })?
    .rows_affected();

    metrics::histogram!(
        "db_delete_pg_opportunity_count",
        &[("chain_id", chain_id.to_string())]
    )
    .record(n_opportunities_deleted as f64);

    Ok(())
}
16 changes: 15 additions & 1 deletion auction-server/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,10 @@ use {
},
kernel::{
traced_sender_svm::TracedSenderSvm,
workers::run_price_subscription,
workers::{
run_delete_pg_db_history,
run_price_subscription,
},
},
models,
opportunity::{
Expand Down Expand Up @@ -651,6 +654,17 @@ pub async fn start_server(run_options: RunOptions) -> Result<()> {
run_options.clone(),
server_state.clone(),
)),
fault_tolerant_handler("pg deletion loop".to_string(), {
let pool = pool.clone();
let delete_pg_rows = run_options.delete_pg_rows.clone();
let chain_ids = auction_services.keys().cloned().collect::<Vec<_>>();
move || {
let pool = pool.clone();
let delete_pg_rows = delete_pg_rows.clone();
let chain_ids = chain_ids.clone();
async move { run_delete_pg_db_history(&pool, chain_ids, delete_pg_rows).await }
}
}),
);

// To make sure all the spawned tasks will finish their job before shut down
Expand Down
3 changes: 3 additions & 0 deletions integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,9 @@ def main():
with open('tilt-resources.env', 'w') as f:
f.write('export SECRET_KEY=admin\n')
f.write(f'export PRIVATE_KEY_SVM={str(relayer_key_svm)}\n')
f.write(f'export DELETE_ENABLED=true\n')
f.write(f'export DELETE_INTERVAL_SECONDS={1}\n')
f.write(f'export DELETE_THRESHOLD_SECONDS={60*60*24*2}\n')

mint_buy = Keypair.from_json((open('keypairs/mint_buy.json').read())).pubkey()
mint_sell = Keypair.from_json((open('keypairs/mint_sell.json').read())).pubkey()
Expand Down
Loading