diff --git a/artifacts/checksums.txt b/artifacts/checksums.txt index 2bc072d..e94e95e 100644 --- a/artifacts/checksums.txt +++ b/artifacts/checksums.txt @@ -1,2 +1,2 @@ -6751147df3820468674b76e49dd069642513f36b59404847de1ca5e9ce4eca7c dex_aggregator.wasm -46dcce8409830a30b424dd342887f1cc203ef9c203f4b754b172c2203a7c0ad7 mock_swap.wasm +d565fcb50e8379e218e8896f90a9f1e02aa737bd0eb7c1b77756f918c0d38ec0 dex_aggregator.wasm +f33d353816a46e3219f95c046682efeef68408252c9a31ce15599b2de9ba7aaa mock_swap.wasm diff --git a/artifacts/dex_aggregator.wasm b/artifacts/dex_aggregator.wasm index 30a8ec1..d6189d2 100644 Binary files a/artifacts/dex_aggregator.wasm and b/artifacts/dex_aggregator.wasm differ diff --git a/artifacts/mock_swap.wasm b/artifacts/mock_swap.wasm index ee648fc..e340b53 100644 Binary files a/artifacts/mock_swap.wasm and b/artifacts/mock_swap.wasm differ diff --git a/contracts/dex_aggregator/src/msg.rs b/contracts/dex_aggregator/src/msg.rs index 6de5d9e..ac2a8bb 100644 --- a/contracts/dex_aggregator/src/msg.rs +++ b/contracts/dex_aggregator/src/msg.rs @@ -155,6 +155,8 @@ pub struct Stage { pub struct PlannedSwap { pub operation: Operation, pub amount: Uint128, + pub split_index: usize, + pub op_index: usize, } pub struct StagePlan { diff --git a/contracts/dex_aggregator/src/reply.rs b/contracts/dex_aggregator/src/reply.rs index 4b61b3c..6ff76dc 100644 --- a/contracts/dex_aggregator/src/reply.rs +++ b/contracts/dex_aggregator/src/reply.rs @@ -2,8 +2,8 @@ use crate::error::ContractError; use crate::execute::create_swap_cosmos_msg; use crate::msg::{amm, cw20_adapter, Operation, PlannedSwap, Stage, StagePlan}; use crate::state::{ - Awaiting, Config, ExecutionState, PendingPathOp, RoutePlan, CONFIG, EXECUTION_STATES, FEE_MAP, - ROUTE_PLANS, + Awaiting, Config, ExecutionState, PendingPathOp, RoutePlan, SubmsgReplyState, CONFIG, + EXECUTION_STATES, FEE_MAP, REPLY_ID_COUNTER, ROUTE_PLANS, SUBMSG_REPLY_STATES, }; use cosmwasm_std::{ to_json_binary, Addr, Coin, CosmosMsg, DepsMut, Env, Reply, Response, StdError, SubMsg, @@ -17,18 +17,32 @@ pub fn handle_reply( env: Env, msg: Reply, ) -> Result, ContractError> { - let reply_id = msg.id; - let mut exec_state = EXECUTION_STATES.load(deps.storage, reply_id)?; - let plan = ROUTE_PLANS.load(deps.storage, reply_id)?; - - match exec_state.awaiting { - Awaiting::Swaps => handle_swap_reply(deps, env, msg, &mut exec_state, &plan), - Awaiting::Conversions => handle_conversion_reply(deps, env, msg, &mut exec_state, &plan), - Awaiting::FinalConversions => { - handle_final_conversion_reply(deps, env, msg, &mut exec_state, &plan) - } - Awaiting::PathConversion => { - handle_path_conversion_reply(deps, env, msg, &mut exec_state, &plan) + if let Ok(submsg_state) = SUBMSG_REPLY_STATES.load(deps.storage, msg.id) { + let master_reply_id = submsg_state.master_reply_id; + let mut exec_state = EXECUTION_STATES.load(deps.storage, master_reply_id)?; + let plan = ROUTE_PLANS.load(deps.storage, master_reply_id)?; + SUBMSG_REPLY_STATES.remove(deps.storage, msg.id); + + handle_swap_reply(deps, env, msg, &mut exec_state, &plan, submsg_state) + } else { + let master_reply_id = msg.id; + let mut exec_state = EXECUTION_STATES.load(deps.storage, master_reply_id)?; + let plan = ROUTE_PLANS.load(deps.storage, master_reply_id)?; + + match exec_state.awaiting { + Awaiting::Conversions => { + handle_conversion_reply(deps, env, msg, &mut exec_state, &plan) + } + Awaiting::FinalConversions => { + handle_final_conversion_reply(deps, env, msg, &mut exec_state, &plan) + } + Awaiting::PathConversion => { + handle_path_conversion_reply(deps, env, msg, &mut exec_state, &plan) + } + Awaiting::Swaps => Err(ContractError::Std(StdError::generic_err(format!( + "Unregistered swap reply ID received: {}", + msg.id + )))), } } } @@ -87,8 +101,9 @@ fn handle_swap_reply( msg: Reply, exec_state: &mut ExecutionState, plan: &RoutePlan, + submsg_state: SubmsgReplyState, ) -> Result, ContractError> { - let master_reply_id = msg.id; + let master_reply_id = submsg_state.master_reply_id; let events = &msg .result @@ -105,124 +120,129 @@ fn handle_swap_reply( if swap_event_opt.is_none() { exec_state.replies_expected -= 1; - - let response = if exec_state.replies_expected > 0 { + if exec_state.replies_expected == 0 { + exec_state.current_stage_index += 1; + return proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id); + } else { EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; - Response::new() + return Ok(Response::new() .add_attribute("action", "accumulating_path_outputs") - .add_attribute("info", "zero_value_path_completed_or_irrelevant_reply") - } else { - exec_state.current_stage_index += 1; - proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id)? - }; - return Ok(response); + .add_attribute("info", "zero_value_path_completed")); + } } - let swap_event = swap_event_opt.unwrap(); - - let replying_pool_addr_str = swap_event - .attributes - .iter() - .find(|a| a.key == "_contract_address") - .map(|a| a.value.clone()) - .ok_or_else(|| StdError::generic_err("Swap result event is missing '_contract_address'"))?; - - let replying_pool_addr = deps.api.addr_validate(&replying_pool_addr_str)?; + let split_index = submsg_state.split_index; + let op_index = submsg_state.op_index; let current_stage = plan .stages .get(exec_state.current_stage_index as usize) .ok_or(ContractError::EmptyRoute {})?; - let mut replied_path_info = None; - 'outer: for (split_idx, split) in current_stage.splits.iter().enumerate() { - for (op_idx, op) in split.path.iter().enumerate() { - if get_operation_address(op) == replying_pool_addr.as_str() { - replied_path_info = Some(((split_idx, op_idx), op)); - break 'outer; - } - } - } + let replied_op = ¤t_stage.splits[split_index].path[op_index]; + + let received_amount = parse_amount_from_swap_reply(&msg)?; + let received_asset_info = get_operation_output(replied_op)?; - if let Some(((split_index, op_index), replied_op)) = replied_path_info { - let received_amount = parse_amount_from_swap_reply(&msg)?; - let received_asset_info = get_operation_output(replied_op)?; + let replied_path = ¤t_stage.splits[split_index].path; - let replied_path = ¤t_stage.splits[split_index].path; + if let Some(next_op) = replied_path.get(op_index + 1) { + // This is a multi-hop path, proceed to the next operation. + let offer_asset_for_next_op = amm::Asset { + info: received_asset_info, + amount: received_amount, + }; - if let Some(next_op) = replied_path.get(op_index + 1) { - let required_input_info = get_operation_input(next_op)?; - let offer_asset_for_next_op = amm::Asset { - info: received_asset_info, + // Before dispatching the next message, check for asset mismatch. + let required_input_info = get_operation_input(next_op)?; + if offer_asset_for_next_op.info != required_input_info { + // A mid-path conversion is needed. + exec_state.awaiting = Awaiting::PathConversion; + exec_state.pending_path_op = Some(PendingPathOp { + operation: next_op.clone(), amount: received_amount, - }; - if offer_asset_for_next_op.info != required_input_info { - exec_state.awaiting = Awaiting::PathConversion; - exec_state.pending_path_op = Some(PendingPathOp { - operation: next_op.clone(), - amount: received_amount, - }); - let config = CONFIG.load(deps.storage)?; - let conversion_msg = - create_conversion_msg(&offer_asset_for_next_op, &config, &env)?; - let sub_msg = SubMsg::reply_on_success(conversion_msg, master_reply_id); - EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; - return Ok(Response::new() - .add_submessage(sub_msg) - .add_attribute("action", "performing_path_conversion")); - } - let next_msg = create_swap_cosmos_msg( - &mut deps, - next_op, - &offer_asset_for_next_op.info, - offer_asset_for_next_op.amount, - &env, - )?; - let sub_msg = SubMsg::reply_on_success(next_msg, master_reply_id); + }); + + let config = CONFIG.load(deps.storage)?; + let conversion_msg = create_conversion_msg(&offer_asset_for_next_op, &config, &env)?; + + // The reply for this conversion will use the master_reply_id + let sub_msg = SubMsg::reply_on_success(conversion_msg, master_reply_id); EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; - Ok(Response::new() + + return Ok(Response::new() .add_submessage(sub_msg) - .add_attribute("action", "proceeding_to_next_op_in_path") - .add_attribute("split_index", split_index.to_string()) - .add_attribute("op_index", (op_index + 1).to_string())) - } else { - let fee = match FEE_MAP.may_load(deps.storage, &replying_pool_addr)? { - Some(fee_percent) => { - let numerator = fee_percent.atomics(); - let denominator = Uint128::new(1_000_000_000_000_000_000u128); - received_amount.multiply_ratio(numerator, denominator) - } - None => Uint128::zero(), - }; - let amount_after_fee = received_amount.checked_sub(fee).map_err(StdError::from)?; - exec_state.accumulated_assets.push(amm::Asset { - info: received_asset_info.clone(), - amount: amount_after_fee, - }); - exec_state.replies_expected -= 1; - let mut response; - if exec_state.replies_expected > 0 { - EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; - response = Response::new().add_attribute("action", "accumulating_path_outputs"); - } else { - exec_state.current_stage_index += 1; - response = proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id)?; - } - if !fee.is_zero() { - let config = CONFIG.load(deps.storage)?; - let fee_send_msg = - create_send_msg(&config.fee_collector, &received_asset_info, fee)?; - response = response - .add_message(fee_send_msg) - .add_attribute("fee_collected", fee.to_string()) - .add_attribute("fee_pool", replying_pool_addr.to_string()); - } - Ok(response) + .add_attribute("action", "performing_path_conversion")); } - } else { + + // Create the message for the next step. + let next_msg = create_swap_cosmos_msg( + &mut deps, + next_op, + &offer_asset_for_next_op.info, + offer_asset_for_next_op.amount, + &env, + )?; + + let mut reply_id_counter = REPLY_ID_COUNTER.load(deps.storage)?; + reply_id_counter += 1; + REPLY_ID_COUNTER.save(deps.storage, &reply_id_counter)?; + let next_submsg_id = reply_id_counter; + + SUBMSG_REPLY_STATES.save( + deps.storage, + next_submsg_id, + &SubmsgReplyState { + master_reply_id, + split_index, + op_index: op_index + 1, + }, + )?; + + let sub_msg = SubMsg::reply_on_success(next_msg, next_submsg_id); + + EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; + Ok(Response::new() - .add_attribute("action", "ignored_unmatched_reply") - .add_attribute("source", replying_pool_addr)) + .add_submessage(sub_msg) + .add_attribute("action", "proceeding_to_next_op_in_path") + .add_attribute("split_index", split_index.to_string()) + .add_attribute("op_index", (op_index + 1).to_string())) + } else { + let replying_pool_addr = deps.api.addr_validate(get_operation_address(replied_op))?; + + let fee = match FEE_MAP.may_load(deps.storage, &replying_pool_addr)? { + Some(fee_percent) => { + received_amount.multiply_ratio(fee_percent.atomics(), 1_000_000_000_000_000_000u128) + } + None => Uint128::zero(), + }; + + let amount_after_fee = received_amount.checked_sub(fee).map_err(StdError::from)?; + exec_state.accumulated_assets.push(amm::Asset { + info: received_asset_info.clone(), + amount: amount_after_fee, + }); + exec_state.replies_expected -= 1; + + let mut response; + if exec_state.replies_expected > 0 { + EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; + response = Response::new().add_attribute("action", "accumulating_path_outputs"); + } else { + exec_state.current_stage_index += 1; + response = proceed_to_next_step(&mut deps, env, exec_state, plan, master_reply_id)?; + } + + if !fee.is_zero() { + let config = CONFIG.load(deps.storage)?; + let fee_send_msg = create_send_msg(&config.fee_collector, &received_asset_info, fee)?; + response = response + .add_message(fee_send_msg) + .add_attribute("fee_collected", fee.to_string()) + .add_attribute("fee_pool", replying_pool_addr.to_string()); + } + Ok(response) } } @@ -655,6 +675,8 @@ fn plan_next_stage( swaps_to_execute.push(PlannedSwap { operation: first_op.clone(), amount: amount_for_split, + split_index: i, + op_index: 0, }); } @@ -675,28 +697,48 @@ fn execute_planned_swaps( deps: &mut DepsMut, env: Env, exec_state: &mut ExecutionState, - plan: &RoutePlan, - reply_id: u64, + _plan: &RoutePlan, + master_reply_id: u64, swaps: Vec, ) -> Result, ContractError> { let mut submessages = vec![]; + let filtered_swaps: Vec = + swaps.into_iter().filter(|s| !s.amount.is_zero()).collect(); + + let mut reply_id_counter = REPLY_ID_COUNTER.load(deps.storage)?; + + for swap in &filtered_swaps { + reply_id_counter += 1; + let submsg_id = reply_id_counter; + + SUBMSG_REPLY_STATES.save( + deps.storage, + submsg_id, + &SubmsgReplyState { + master_reply_id, + split_index: swap.split_index, + op_index: swap.op_index, + }, + )?; - for swap in swaps.into_iter().filter(|s| !s.amount.is_zero()) { let offer_asset_info = get_operation_input(&swap.operation)?; let msg = create_swap_cosmos_msg(deps, &swap.operation, &offer_asset_info, swap.amount, &env)?; - submessages.push(SubMsg::reply_on_success(msg, reply_id)); + + submessages.push(SubMsg::reply_on_success(msg, submsg_id)); } + REPLY_ID_COUNTER.save(deps.storage, &reply_id_counter)?; + if submessages.is_empty() { exec_state.current_stage_index += 1; - return proceed_to_next_step(deps, env, exec_state, plan, reply_id); + return proceed_to_next_step(deps, env, exec_state, _plan, master_reply_id); } exec_state.awaiting = Awaiting::Swaps; exec_state.replies_expected = submessages.len() as u64; - EXECUTION_STATES.save(deps.storage, reply_id, exec_state)?; + EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; Ok(Response::new() .add_submessages(submessages) @@ -716,18 +758,34 @@ fn handle_path_conversion_reply( env: Env, msg: Reply, exec_state: &mut ExecutionState, - _plan: &RoutePlan, + plan: &RoutePlan, ) -> Result, ContractError> { let master_reply_id = msg.id; - let converted_amount = parse_amount_from_conversion_reply(&msg, &env)?; let pending_op_details = exec_state.pending_path_op.take().ok_or_else(|| { StdError::generic_err("Path conversion state is invalid: no pending operation found") })?; - let converted_asset_info = get_operation_input(&pending_op_details.operation)?; + let current_stage = plan + .stages + .get(exec_state.current_stage_index as usize) + .unwrap(); + let (split_index, op_index) = current_stage + .splits + .iter() + .enumerate() + .find_map(|(si, split)| { + split + .path + .iter() + .enumerate() + .find(|(_, op)| **op == pending_op_details.operation) + .map(|(oi, _)| (si, oi)) + }) + .ok_or_else(|| StdError::generic_err("Could not find pending op in route plan"))?; + let converted_asset_info = get_operation_input(&pending_op_details.operation)?; let swap_msg = create_swap_cosmos_msg( &mut deps, &pending_op_details.operation, @@ -735,10 +793,25 @@ fn handle_path_conversion_reply( converted_amount, &env, )?; - let sub_msg = SubMsg::reply_on_success(swap_msg, master_reply_id); - exec_state.awaiting = Awaiting::Swaps; + let mut reply_id_counter = REPLY_ID_COUNTER.load(deps.storage)?; + reply_id_counter += 1; + REPLY_ID_COUNTER.save(deps.storage, &reply_id_counter)?; + let submsg_id = reply_id_counter; + SUBMSG_REPLY_STATES.save( + deps.storage, + submsg_id, + &SubmsgReplyState { + master_reply_id, + split_index, + op_index, + }, + )?; + + let sub_msg = SubMsg::reply_on_success(swap_msg, submsg_id); + + exec_state.awaiting = Awaiting::Swaps; EXECUTION_STATES.save(deps.storage, master_reply_id, exec_state)?; Ok(Response::new() diff --git a/contracts/dex_aggregator/src/state.rs b/contracts/dex_aggregator/src/state.rs index e81556e..b5cb91a 100644 --- a/contracts/dex_aggregator/src/state.rs +++ b/contracts/dex_aggregator/src/state.rs @@ -46,7 +46,14 @@ pub struct ExecutionState { pub pending_path_op: Option, } +#[cw_serde] +pub struct SubmsgReplyState { + pub master_reply_id: u64, + pub split_index: usize, + pub op_index: usize, +} + pub const ROUTE_PLANS: Map = Map::new("route_plans"); pub const EXECUTION_STATES: Map = Map::new("execution_states"); - +pub const SUBMSG_REPLY_STATES: Map = Map::new("submsg_reply_states"); pub const REPLY_ID_COUNTER: Item = Item::new("reply_id_counter");