Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
127 changes: 124 additions & 3 deletions auction-server/src/opportunity/repository/add_opportunity_analytics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,22 @@
use {
super::Repository,
crate::opportunity::entities,
super::{
OpportunityAnalytics,
OpportunityAnalyticsLimo,
OpportunityAnalyticsSwap,
OpportunityRemovalReason,
Repository,
},
crate::{
kernel::pyth_lazer::calculate_notional_value,
opportunity::entities,
state::Price,
},
base64::{
engine::general_purpose,
Engine,
},
solana_sdk::pubkey::Pubkey,
std::collections::HashMap,
time::OffsetDateTime,
};

Expand All @@ -10,9 +26,114 @@ impl Repository {
opportunity: entities::OpportunitySvm,
removal_time: Option<OffsetDateTime>,
removal_reason: Option<entities::OpportunityRemovalReason>,
prices: HashMap<Pubkey, Price>,
decimals: HashMap<Pubkey, u8>,
) -> anyhow::Result<()> {
let sell_token = opportunity
.sell_tokens
.first()
.ok_or(anyhow::anyhow!("Opportunity has no sell tokens"))?;
let sell_token_notional_usd_value = calculate_notional_value(
prices.get(&sell_token.token).cloned(),
sell_token.amount,
decimals.get(&sell_token.token).cloned(),
);
let buy_token = opportunity
.buy_tokens
.first()
.ok_or(anyhow::anyhow!("Opportunity has no buy tokens"))?;
let buy_token_notional_usd_value = calculate_notional_value(
prices.get(&buy_token.token).cloned(),
buy_token.amount,
decimals.get(&buy_token.token).cloned(),
);


let removal_reason: Option<OpportunityRemovalReason> = removal_reason.map(|r| r.into());
// NOTE: It's very easy to forget setting some field in one variant or the other.
// We enforced this by destructing the params and make sure all the fields are used or explicitly discarded.
// This way if we add a field to Limo or Swap variants later on, the code will not compile until we decide what we want to do with that field here.
let opportunity_analytics = match opportunity.program.clone() {
entities::OpportunitySvmProgram::Limo(entities::OpportunitySvmProgramLimo {
order,
order_address,
slot,
}) => OpportunityAnalytics::Limo(OpportunityAnalyticsLimo {
id: opportunity.id,
creation_time: opportunity.creation_time,
permission_key: opportunity.permission_key.to_string(),
chain_id: opportunity.chain_id.clone(),
removal_time,
removal_reason: removal_reason.map(|reason| {
serde_json::to_string(&reason).expect("Failed to serialize removal reason")
}),
sell_token_mint: sell_token.token.to_string(),
sell_token_amount: sell_token.amount,
sell_token_notional_usd_value,
buy_token_mint: buy_token.token.to_string(),
buy_token_amount: buy_token.amount,
buy_token_notional_usd_value,

order: general_purpose::STANDARD.encode(&order),
order_address: order_address.to_string(),
slot,

profile_id: opportunity.profile_id,
}),
entities::OpportunitySvmProgram::Swap(entities::OpportunitySvmProgramSwap {
user_wallet_address,
user_mint_user_balance,
fee_token,
referral_fee_bps,
referral_fee_ppm,
platform_fee_bps,
platform_fee_ppm,
token_program_user,
token_program_searcher,
token_account_initialization_configs,
memo,
cancellable,
minimum_lifetime,
minimum_deadline: _,
}) => OpportunityAnalytics::Swap(OpportunityAnalyticsSwap {
id: opportunity.id,
creation_time: opportunity.creation_time,
permission_key: opportunity.permission_key.to_string(),
chain_id: opportunity.chain_id.clone(),
removal_time,
removal_reason: removal_reason.map(|reason| {
serde_json::to_string(&reason).expect("Failed to serialize removal reason")
}),
searcher_token_mint: sell_token.token.to_string(),
searcher_token_amount: sell_token.amount,
searcher_token_notional_usd_value: sell_token_notional_usd_value,
user_token_mint: buy_token.token.to_string(),
user_token_amount: buy_token.amount,
user_token_notional_usd_value: buy_token_notional_usd_value,

user_wallet_address: user_wallet_address.to_string(),
fee_token: serde_json::to_string(&fee_token)
.expect("Failed to serialize fee token"),
referral_fee_bps,
referral_fee_ppm,
platform_fee_bps,
platform_fee_ppm,
token_program_user: token_program_user.to_string(),
token_program_searcher: token_program_searcher.to_string(),
user_mint_user_balance,
token_account_initialization_configs: serde_json::to_string(
&token_account_initialization_configs,
)
.expect("Failed to serialize token account initialization configs"),
memo,
cancellable,
minimum_lifetime,

profile_id: opportunity.profile_id,
}),
};
self.db_analytics
.add_opportunity(&opportunity, removal_time, removal_reason.map(|r| r.into()))
.add_opportunity(opportunity_analytics)
.await
}
}
159 changes: 25 additions & 134 deletions auction-server/src/opportunity/repository/models.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,14 @@ use {
ChainType,
ProfileId,
},
opportunity::{
entities,
entities::{
FeeToken,
OpportunitySvm,
TokenAccountInitializationConfigs,
},
opportunity::entities::{
FeeToken,
OpportunitySvm,
TokenAccountInitializationConfigs,
},
},
::uuid::Uuid,
axum::async_trait,
base64::engine::{
general_purpose,
Engine,
},
clickhouse::Row,
serde::{
de::DeserializeOwned,
Expand Down Expand Up @@ -175,12 +168,8 @@ pub struct Opportunity<T: OpportunityMetadata> {
#[cfg_attr(test, automock)]
#[async_trait]
pub trait AnalyticsDatabase: Send + Sync + 'static {
async fn add_opportunity(
&self,
opportunity: &OpportunitySvm,
removal_time: Option<OffsetDateTime>,
removal_reason: Option<OpportunityRemovalReason>,
) -> Result<(), anyhow::Error>;
async fn add_opportunity(&self, opportunity: OpportunityAnalytics)
-> Result<(), anyhow::Error>;
}

#[cfg_attr(test, automock)]
Expand All @@ -205,6 +194,11 @@ pub trait Database: Debug + Send + Sync + 'static {
) -> anyhow::Result<Option<OffsetDateTime>>;
}

pub enum OpportunityAnalytics {
Limo(OpportunityAnalyticsLimo),
Swap(OpportunityAnalyticsSwap),
}

#[derive(Row, Serialize, Deserialize, Debug)]
pub struct OpportunityAnalyticsLimo {
#[serde(with = "clickhouse::serde::uuid")]
Expand Down Expand Up @@ -285,123 +279,20 @@ impl AnalyticsDatabase for AnalyticsDatabaseInserter {
),
skip_all
)]
async fn add_opportunity(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

any particular reason to move this logic out of this function?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This trait should be isolated to work only with the analytics type. It shouldn’t be aware of the Opportunity type. The repository layer should handle any conversions between different data sources, such as analytics and Postgres. Traits in models should only accept and operate on the types they are explicitly designed to handle.

&self,
opportunity: &OpportunitySvm,
removal_time: Option<OffsetDateTime>,
removal_reason: Option<OpportunityRemovalReason>,
) -> anyhow::Result<()> {
// TODO Add USD price for tokens

let sell_token = opportunity
.sell_tokens
.first()
.ok_or(anyhow::anyhow!("Opportunity has no sell tokens"))?;

let buy_token = opportunity
.buy_tokens
.first()
.ok_or(anyhow::anyhow!("Opportunity has no buy tokens"))?;

// NOTE: It's very easy to forget setting some field in one variant or the other.
// We enforced this by destructing the params and make sure all the fields are used or explicitly discarded.
// This way if we add a field to Limo or Swap variants later on, the code will not compile until we decide what we want to do with that field here.
match opportunity.program.clone() {
entities::OpportunitySvmProgram::Limo(entities::OpportunitySvmProgramLimo {
order,
order_address,
slot,
}) => {
let opportunity_analytics = OpportunityAnalyticsLimo {
id: opportunity.id,
creation_time: opportunity.creation_time,
permission_key: opportunity.permission_key.to_string(),
chain_id: opportunity.chain_id.clone(),
removal_time,
removal_reason: removal_reason.map(|reason| {
serde_json::to_string(&reason).expect("Failed to serialize removal reason")
}),
sell_token_mint: sell_token.token.to_string(),
sell_token_amount: sell_token.amount,
sell_token_notional_usd_value: None,
buy_token_mint: buy_token.token.to_string(),
buy_token_amount: buy_token.amount,
buy_token_notional_usd_value: None,

order: general_purpose::STANDARD.encode(&order),
order_address: order_address.to_string(),
slot,

profile_id: opportunity.profile_id,
};
self.inserter_opportunity_limo
.sender
.send(opportunity_analytics)
.await
.map_err(|e| {
anyhow::anyhow!("Failed to send limo opportunity analytics {:?}", e)
})
}
entities::OpportunitySvmProgram::Swap(entities::OpportunitySvmProgramSwap {
user_wallet_address,
user_mint_user_balance,
fee_token,
referral_fee_bps,
referral_fee_ppm,
platform_fee_bps,
platform_fee_ppm,
token_program_user,
token_program_searcher,
token_account_initialization_configs,
memo,
cancellable,
minimum_lifetime,
minimum_deadline: _,
}) => {
let opportunity_analytics = OpportunityAnalyticsSwap {
id: opportunity.id,
creation_time: opportunity.creation_time,
permission_key: opportunity.permission_key.to_string(),
chain_id: opportunity.chain_id.clone(),
removal_time,
removal_reason: removal_reason.map(|reason| {
serde_json::to_string(&reason).expect("Failed to serialize removal reason")
}),
searcher_token_mint: sell_token.token.to_string(),
searcher_token_amount: sell_token.amount,
searcher_token_notional_usd_value: None,
user_token_mint: buy_token.token.to_string(),
user_token_amount: buy_token.amount,
user_token_notional_usd_value: None,

user_wallet_address: user_wallet_address.to_string(),
fee_token: serde_json::to_string(&fee_token)
.expect("Failed to serialize fee token"),
referral_fee_bps,
referral_fee_ppm,
platform_fee_bps,
platform_fee_ppm,
token_program_user: token_program_user.to_string(),
token_program_searcher: token_program_searcher.to_string(),
user_mint_user_balance,
token_account_initialization_configs: serde_json::to_string(
&token_account_initialization_configs,
)
.expect("Failed to serialize token account initialization configs"),
memo,
cancellable,
minimum_lifetime,

profile_id: opportunity.profile_id,
};
self.inserter_opportunity_swap
.sender
.send(opportunity_analytics)
.await
.map_err(|e| {
anyhow::anyhow!("Failed to send swap opportunity analytics {:?}", e)
})
}
async fn add_opportunity(&self, opportunity: OpportunityAnalytics) -> anyhow::Result<()> {
match opportunity {
OpportunityAnalytics::Limo(opportunity) => self
.inserter_opportunity_limo
.sender
.send(opportunity)
.await
.map_err(|e| anyhow::anyhow!("Failed to send limo opportunity analytics {:?}", e)),
OpportunityAnalytics::Swap(opportunity) => self
.inserter_opportunity_swap
.sender
.send(opportunity)
.await
.map_err(|e| anyhow::anyhow!("Failed to send swap opportunity analytics {:?}", e)),
}
}
}
Expand Down
27 changes: 13 additions & 14 deletions auction-server/src/opportunity/service/add_opportunity.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ use {
ws::UpdateEvent::NewOpportunity,
RestError,
},
opportunity::entities::{
self,
OpportunityCreateSvm,
OpportunitySvm,
opportunity::{
entities::{
self,
OpportunityCreateSvm,
OpportunitySvm,
},
service::add_opportunity_analytics::AddOpportunityAnalyticsInput,
},
},
};
Expand Down Expand Up @@ -65,17 +68,13 @@ impl Service {
self.task_tracker.spawn({
let (service, opportunity) = (self.clone(), opportunity.clone());
async move {
if let Err(err) = service
.repo
.add_opportunity_analytics(opportunity.clone(), None, None)
service
.add_opportunity_analytics(AddOpportunityAnalyticsInput {
opportunity,
removal_time: None,
removal_reason: None,
})
.await
{
tracing::error!(
error = ?err,
opportunity = ?opportunity,
"Failed to add opportunity analytics",
);
}
}
});
opportunity
Expand Down
Loading
Loading