diff --git a/auction-server/src/opportunity/repository/add_opportunity_analytics.rs b/auction-server/src/opportunity/repository/add_opportunity_analytics.rs index c7917d0c..ade7ff43 100644 --- a/auction-server/src/opportunity/repository/add_opportunity_analytics.rs +++ b/auction-server/src/opportunity/repository/add_opportunity_analytics.rs @@ -1,6 +1,22 @@ use { - super::Repository, - crate::opportunity::entities, + super::{ + OpportunityAnalytics, + OpportunityAnalyticsLimo, + OpportunityAnalyticsSwap, + OpportunityRemovalReason, + Repository, + }, + crate::{ + kernel::pyth_lazer::calculate_notional_value, + opportunity::entities, + state::Price, + }, + base64::{ + engine::general_purpose, + Engine, + }, + solana_sdk::pubkey::Pubkey, + std::collections::HashMap, time::OffsetDateTime, }; @@ -10,9 +26,114 @@ impl Repository { opportunity: entities::OpportunitySvm, removal_time: Option, removal_reason: Option, + prices: HashMap, + decimals: HashMap, ) -> anyhow::Result<()> { + let sell_token = opportunity + .sell_tokens + .first() + .ok_or(anyhow::anyhow!("Opportunity has no sell tokens"))?; + let sell_token_notional_usd_value = calculate_notional_value( + prices.get(&sell_token.token).cloned(), + sell_token.amount, + decimals.get(&sell_token.token).cloned(), + ); + let buy_token = opportunity + .buy_tokens + .first() + .ok_or(anyhow::anyhow!("Opportunity has no buy tokens"))?; + let buy_token_notional_usd_value = calculate_notional_value( + prices.get(&buy_token.token).cloned(), + buy_token.amount, + decimals.get(&buy_token.token).cloned(), + ); + + + let removal_reason: Option = removal_reason.map(|r| r.into()); + // NOTE: It's very easy to forget setting some field in one variant or the other. + // We enforced this by destructing the params and make sure all the fields are used or explicitly discarded. + // This way if we add a field to Limo or Swap variants later on, the code will not compile until we decide what we want to do with that field here. + let opportunity_analytics = match opportunity.program.clone() { + entities::OpportunitySvmProgram::Limo(entities::OpportunitySvmProgramLimo { + order, + order_address, + slot, + }) => OpportunityAnalytics::Limo(OpportunityAnalyticsLimo { + id: opportunity.id, + creation_time: opportunity.creation_time, + permission_key: opportunity.permission_key.to_string(), + chain_id: opportunity.chain_id.clone(), + removal_time, + removal_reason: removal_reason.map(|reason| { + serde_json::to_string(&reason).expect("Failed to serialize removal reason") + }), + sell_token_mint: sell_token.token.to_string(), + sell_token_amount: sell_token.amount, + sell_token_notional_usd_value, + buy_token_mint: buy_token.token.to_string(), + buy_token_amount: buy_token.amount, + buy_token_notional_usd_value, + + order: general_purpose::STANDARD.encode(&order), + order_address: order_address.to_string(), + slot, + + profile_id: opportunity.profile_id, + }), + entities::OpportunitySvmProgram::Swap(entities::OpportunitySvmProgramSwap { + user_wallet_address, + user_mint_user_balance, + fee_token, + referral_fee_bps, + referral_fee_ppm, + platform_fee_bps, + platform_fee_ppm, + token_program_user, + token_program_searcher, + token_account_initialization_configs, + memo, + cancellable, + minimum_lifetime, + minimum_deadline: _, + }) => OpportunityAnalytics::Swap(OpportunityAnalyticsSwap { + id: opportunity.id, + creation_time: opportunity.creation_time, + permission_key: opportunity.permission_key.to_string(), + chain_id: opportunity.chain_id.clone(), + removal_time, + removal_reason: removal_reason.map(|reason| { + serde_json::to_string(&reason).expect("Failed to serialize removal reason") + }), + searcher_token_mint: sell_token.token.to_string(), + searcher_token_amount: sell_token.amount, + searcher_token_notional_usd_value: sell_token_notional_usd_value, + user_token_mint: buy_token.token.to_string(), + user_token_amount: buy_token.amount, + user_token_notional_usd_value: buy_token_notional_usd_value, + + user_wallet_address: user_wallet_address.to_string(), + fee_token: serde_json::to_string(&fee_token) + .expect("Failed to serialize fee token"), + referral_fee_bps, + referral_fee_ppm, + platform_fee_bps, + platform_fee_ppm, + token_program_user: token_program_user.to_string(), + token_program_searcher: token_program_searcher.to_string(), + user_mint_user_balance, + token_account_initialization_configs: serde_json::to_string( + &token_account_initialization_configs, + ) + .expect("Failed to serialize token account initialization configs"), + memo, + cancellable, + minimum_lifetime, + + profile_id: opportunity.profile_id, + }), + }; self.db_analytics - .add_opportunity(&opportunity, removal_time, removal_reason.map(|r| r.into())) + .add_opportunity(opportunity_analytics) .await } } diff --git a/auction-server/src/opportunity/repository/models.rs b/auction-server/src/opportunity/repository/models.rs index b4366e4f..efee7964 100644 --- a/auction-server/src/opportunity/repository/models.rs +++ b/auction-server/src/opportunity/repository/models.rs @@ -18,21 +18,14 @@ use { ChainType, ProfileId, }, - opportunity::{ - entities, - entities::{ - FeeToken, - OpportunitySvm, - TokenAccountInitializationConfigs, - }, + opportunity::entities::{ + FeeToken, + OpportunitySvm, + TokenAccountInitializationConfigs, }, }, ::uuid::Uuid, axum::async_trait, - base64::engine::{ - general_purpose, - Engine, - }, clickhouse::Row, serde::{ de::DeserializeOwned, @@ -175,12 +168,8 @@ pub struct Opportunity { #[cfg_attr(test, automock)] #[async_trait] pub trait AnalyticsDatabase: Send + Sync + 'static { - async fn add_opportunity( - &self, - opportunity: &OpportunitySvm, - removal_time: Option, - removal_reason: Option, - ) -> Result<(), anyhow::Error>; + async fn add_opportunity(&self, opportunity: OpportunityAnalytics) + -> Result<(), anyhow::Error>; } #[cfg_attr(test, automock)] @@ -205,6 +194,11 @@ pub trait Database: Debug + Send + Sync + 'static { ) -> anyhow::Result>; } +pub enum OpportunityAnalytics { + Limo(OpportunityAnalyticsLimo), + Swap(OpportunityAnalyticsSwap), +} + #[derive(Row, Serialize, Deserialize, Debug)] pub struct OpportunityAnalyticsLimo { #[serde(with = "clickhouse::serde::uuid")] @@ -285,123 +279,20 @@ impl AnalyticsDatabase for AnalyticsDatabaseInserter { ), skip_all )] - async fn add_opportunity( - &self, - opportunity: &OpportunitySvm, - removal_time: Option, - removal_reason: Option, - ) -> anyhow::Result<()> { - // TODO Add USD price for tokens - - let sell_token = opportunity - .sell_tokens - .first() - .ok_or(anyhow::anyhow!("Opportunity has no sell tokens"))?; - - let buy_token = opportunity - .buy_tokens - .first() - .ok_or(anyhow::anyhow!("Opportunity has no buy tokens"))?; - - // NOTE: It's very easy to forget setting some field in one variant or the other. - // We enforced this by destructing the params and make sure all the fields are used or explicitly discarded. - // This way if we add a field to Limo or Swap variants later on, the code will not compile until we decide what we want to do with that field here. - match opportunity.program.clone() { - entities::OpportunitySvmProgram::Limo(entities::OpportunitySvmProgramLimo { - order, - order_address, - slot, - }) => { - let opportunity_analytics = OpportunityAnalyticsLimo { - id: opportunity.id, - creation_time: opportunity.creation_time, - permission_key: opportunity.permission_key.to_string(), - chain_id: opportunity.chain_id.clone(), - removal_time, - removal_reason: removal_reason.map(|reason| { - serde_json::to_string(&reason).expect("Failed to serialize removal reason") - }), - sell_token_mint: sell_token.token.to_string(), - sell_token_amount: sell_token.amount, - sell_token_notional_usd_value: None, - buy_token_mint: buy_token.token.to_string(), - buy_token_amount: buy_token.amount, - buy_token_notional_usd_value: None, - - order: general_purpose::STANDARD.encode(&order), - order_address: order_address.to_string(), - slot, - - profile_id: opportunity.profile_id, - }; - self.inserter_opportunity_limo - .sender - .send(opportunity_analytics) - .await - .map_err(|e| { - anyhow::anyhow!("Failed to send limo opportunity analytics {:?}", e) - }) - } - entities::OpportunitySvmProgram::Swap(entities::OpportunitySvmProgramSwap { - user_wallet_address, - user_mint_user_balance, - fee_token, - referral_fee_bps, - referral_fee_ppm, - platform_fee_bps, - platform_fee_ppm, - token_program_user, - token_program_searcher, - token_account_initialization_configs, - memo, - cancellable, - minimum_lifetime, - minimum_deadline: _, - }) => { - let opportunity_analytics = OpportunityAnalyticsSwap { - id: opportunity.id, - creation_time: opportunity.creation_time, - permission_key: opportunity.permission_key.to_string(), - chain_id: opportunity.chain_id.clone(), - removal_time, - removal_reason: removal_reason.map(|reason| { - serde_json::to_string(&reason).expect("Failed to serialize removal reason") - }), - searcher_token_mint: sell_token.token.to_string(), - searcher_token_amount: sell_token.amount, - searcher_token_notional_usd_value: None, - user_token_mint: buy_token.token.to_string(), - user_token_amount: buy_token.amount, - user_token_notional_usd_value: None, - - user_wallet_address: user_wallet_address.to_string(), - fee_token: serde_json::to_string(&fee_token) - .expect("Failed to serialize fee token"), - referral_fee_bps, - referral_fee_ppm, - platform_fee_bps, - platform_fee_ppm, - token_program_user: token_program_user.to_string(), - token_program_searcher: token_program_searcher.to_string(), - user_mint_user_balance, - token_account_initialization_configs: serde_json::to_string( - &token_account_initialization_configs, - ) - .expect("Failed to serialize token account initialization configs"), - memo, - cancellable, - minimum_lifetime, - - profile_id: opportunity.profile_id, - }; - self.inserter_opportunity_swap - .sender - .send(opportunity_analytics) - .await - .map_err(|e| { - anyhow::anyhow!("Failed to send swap opportunity analytics {:?}", e) - }) - } + async fn add_opportunity(&self, opportunity: OpportunityAnalytics) -> anyhow::Result<()> { + match opportunity { + OpportunityAnalytics::Limo(opportunity) => self + .inserter_opportunity_limo + .sender + .send(opportunity) + .await + .map_err(|e| anyhow::anyhow!("Failed to send limo opportunity analytics {:?}", e)), + OpportunityAnalytics::Swap(opportunity) => self + .inserter_opportunity_swap + .sender + .send(opportunity) + .await + .map_err(|e| anyhow::anyhow!("Failed to send swap opportunity analytics {:?}", e)), } } } diff --git a/auction-server/src/opportunity/service/add_opportunity.rs b/auction-server/src/opportunity/service/add_opportunity.rs index ada5c4f1..24b4d695 100644 --- a/auction-server/src/opportunity/service/add_opportunity.rs +++ b/auction-server/src/opportunity/service/add_opportunity.rs @@ -5,10 +5,13 @@ use { ws::UpdateEvent::NewOpportunity, RestError, }, - opportunity::entities::{ - self, - OpportunityCreateSvm, - OpportunitySvm, + opportunity::{ + entities::{ + self, + OpportunityCreateSvm, + OpportunitySvm, + }, + service::add_opportunity_analytics::AddOpportunityAnalyticsInput, }, }, }; @@ -65,17 +68,13 @@ impl Service { self.task_tracker.spawn({ let (service, opportunity) = (self.clone(), opportunity.clone()); async move { - if let Err(err) = service - .repo - .add_opportunity_analytics(opportunity.clone(), None, None) + service + .add_opportunity_analytics(AddOpportunityAnalyticsInput { + opportunity, + removal_time: None, + removal_reason: None, + }) .await - { - tracing::error!( - error = ?err, - opportunity = ?opportunity, - "Failed to add opportunity analytics", - ); - } } }); opportunity diff --git a/auction-server/src/opportunity/service/add_opportunity_analytics.rs b/auction-server/src/opportunity/service/add_opportunity_analytics.rs new file mode 100644 index 00000000..a1d93bf4 --- /dev/null +++ b/auction-server/src/opportunity/service/add_opportunity_analytics.rs @@ -0,0 +1,66 @@ +use { + super::{ + get_token_mint::GetTokenMintInput, + Service, + }, + crate::{ + api::RestError, + opportunity::entities, + }, + std::collections::{ + hash_map::Entry, + HashMap, + }, + time::OffsetDateTime, +}; + +pub struct AddOpportunityAnalyticsInput { + pub opportunity: entities::OpportunitySvm, + pub removal_time: Option, + pub removal_reason: Option, +} + +impl Service { + pub async fn add_opportunity_analytics( + &self, + input: AddOpportunityAnalyticsInput, + ) -> Result<(), RestError> { + let mut decimals = HashMap::new(); + for token in input + .opportunity + .sell_tokens + .iter() + .chain(input.opportunity.buy_tokens.iter()) + { + if let Entry::Vacant(e) = decimals.entry(token.token) { + let mint = self + .get_token_mint(GetTokenMintInput { + chain_id: input.opportunity.chain_id.clone(), + mint: token.token, + }) + .await?; + e.insert(mint.decimals); + } + } + let prices = self.store.prices.read().await.clone(); + if let Err(err) = self + .repo + .add_opportunity_analytics( + input.opportunity.clone(), + input.removal_time, + input.removal_reason, + prices, + decimals, + ) + .await + { + tracing::error!( + error = ?err, + opportunity = ?input.opportunity, + "Failed to add opportunity analytics", + ); + return Err(RestError::TemporarilyUnavailable); + } + Ok(()) + } +} diff --git a/auction-server/src/opportunity/service/mod.rs b/auction-server/src/opportunity/service/mod.rs index 20afd2af..76ad3993 100644 --- a/auction-server/src/opportunity/service/mod.rs +++ b/auction-server/src/opportunity/service/mod.rs @@ -55,6 +55,7 @@ pub mod remove_invalid_or_expired_opportunities; pub mod remove_opportunities; pub mod remove_opportunity; +mod add_opportunity_analytics; mod get_quote_request_account_balances; mod unwrap_referral_fee_info; diff --git a/auction-server/src/opportunity/service/remove_opportunities.rs b/auction-server/src/opportunity/service/remove_opportunities.rs index 3decbbb0..8605768a 100644 --- a/auction-server/src/opportunity/service/remove_opportunities.rs +++ b/auction-server/src/opportunity/service/remove_opportunities.rs @@ -1,5 +1,8 @@ use { - super::Service, + super::{ + add_opportunity_analytics::AddOpportunityAnalyticsInput, + Service, + }, crate::{ api::{ ws::UpdateEvent, @@ -56,20 +59,12 @@ impl Service { (self.clone(), opportunity.clone(), reason.clone()); async move { service - .repo - .add_opportunity_analytics( - opportunity.clone(), - Some(removal_time), - Some(reason.clone()), - ) - .await - .map_err(|err| { - tracing::error!( - error = ?err, - opportunity = ?opportunity, - "Failed to add opportunity analytics", - ); + .add_opportunity_analytics(AddOpportunityAnalyticsInput { + opportunity, + removal_time: Some(removal_time), + removal_reason: Some(reason), }) + .await } }); }); diff --git a/auction-server/src/opportunity/service/remove_opportunity.rs b/auction-server/src/opportunity/service/remove_opportunity.rs index 0f8be2fd..9551e37b 100644 --- a/auction-server/src/opportunity/service/remove_opportunity.rs +++ b/auction-server/src/opportunity/service/remove_opportunity.rs @@ -1,5 +1,8 @@ use { - super::Service, + super::{ + add_opportunity_analytics::AddOpportunityAnalyticsInput, + Service, + }, crate::{ api::RestError, opportunity::entities, @@ -28,23 +31,15 @@ impl Service { })? { self.task_tracker.spawn({ - let (service, opportunity) = (self.clone(), input.opportunity.clone()); + let service = self.clone(); async move { service - .repo - .add_opportunity_analytics( - opportunity.clone(), - Some(removal_time), - Some(input.reason), - ) - .await - .map_err(|err| { - tracing::error!( - error = ?err, - opportunity = ?opportunity, - "Failed to add opportunity analytics", - ); + .add_opportunity_analytics(AddOpportunityAnalyticsInput { + opportunity: input.opportunity, + removal_time: Some(removal_time), + removal_reason: Some(input.reason), }) + .await } }); }