Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add dependent requests to check native routes #45

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,3 @@
/target

pks.json
49 changes: 35 additions & 14 deletions src/aws_utils/cloudwatch_utils.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
use std::{future::Future, pin::Pin, sync::Arc};

use aws_sdk_cloudwatch::{config::http::HttpResponse, error::SdkError, operation::put_metric_data::{PutMetricDataError, PutMetricDataOutput}, types::Dimension};
use aws_sdk_cloudwatch::Client as CloudWatchClient;
use aws_sdk_cloudwatch::{
config::http::HttpResponse,
error::SdkError,
operation::put_metric_data::{PutMetricDataError, PutMetricDataOutput},
types::Dimension,
};

/// Constants for dimension names and values
pub const SERVICE_DIMENSION: &str = "Service";
Expand Down Expand Up @@ -90,8 +95,12 @@ impl From<CwMetrics> for String {
CwMetrics::RoutingMs => ROUTING_MS.to_string(),
CwMetrics::Unprofitable => UNPROFITABLE_METRIC.to_string(),
CwMetrics::ExecutionAttempted => EXECUTION_ATTEMPTED_METRIC.to_string(),
CwMetrics::ExecutionSkippedAlreadyFilled => EXECUTION_SKIPPED_ALREADY_FILLED_METRIC.to_string(),
CwMetrics::ExecutionSkippedPastDeadline => EXECUTION_SKIPPED_PAST_DEADLINE_METRIC.to_string(),
CwMetrics::ExecutionSkippedAlreadyFilled => {
EXECUTION_SKIPPED_ALREADY_FILLED_METRIC.to_string()
}
CwMetrics::ExecutionSkippedPastDeadline => {
EXECUTION_SKIPPED_PAST_DEADLINE_METRIC.to_string()
}
CwMetrics::TxSucceeded => TX_SUCCEEDED_METRIC.to_string(),
CwMetrics::TxReverted => TX_REVERTED_METRIC.to_string(),
CwMetrics::TxSubmitted => TX_SUBMITTED_METRIC.to_string(),
Expand All @@ -102,7 +111,6 @@ impl From<CwMetrics> for String {
}
}


pub const ARTEMIS_NAMESPACE: &str = "Artemis";

pub struct MetricBuilder {
Expand Down Expand Up @@ -156,25 +164,38 @@ pub fn receipt_status_to_metric(status: u64) -> CwMetrics {
}
}

pub fn build_metric_future(cloudwatch_client: Option<Arc<CloudWatchClient>>, dimension_value: DimensionValue, metric: CwMetrics, value: f64)
-> Option<Pin<Box<impl Future<Output = Result<PutMetricDataOutput, SdkError<PutMetricDataError, HttpResponse>>> + Send + 'static>>> {
cloudwatch_client.map(|client| {
Box::pin(async move {
client
pub fn build_metric_future(
cloudwatch_client: Option<Arc<CloudWatchClient>>,
dimension_value: DimensionValue,
metric: CwMetrics,
value: f64,
) -> Option<
Pin<
Box<
impl Future<
Output = Result<
PutMetricDataOutput,
SdkError<PutMetricDataError, HttpResponse>,
>,
> + Send
+ 'static,
>,
>,
> {
cloudwatch_client.map(|client| {
Box::pin(async move {
client
.put_metric_data()
.namespace(ARTEMIS_NAMESPACE)
.metric_data(
MetricBuilder::new(metric)
.add_dimension(
DimensionName::Service.as_ref(),
dimension_value.as_ref(),
)
.add_dimension(DimensionName::Service.as_ref(), dimension_value.as_ref())
.with_value(value)
.build(),
)
.send()
.await
})
})
})
}

Expand Down
170 changes: 136 additions & 34 deletions src/collectors/uniswapx_route_collector.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
use std::{collections::HashSet, sync::Arc};
use std::{collections::HashSet, future::Future, sync::Arc};

use alloy_primitives::Uint;
use anyhow::{anyhow, Result};
use aws_sdk_cloudwatch::Client as CloudWatchClient;
use reqwest::header::ORIGIN;
use reqwest::{header::ORIGIN, Error, Response};
use serde::{Deserialize, Serialize};
use tokio::sync::mpsc::{Receiver, Sender};
use tracing::{error, info};
Expand All @@ -16,11 +16,17 @@ use futures::stream::FuturesUnordered;
use futures::StreamExt;
use reqwest::{Client, StatusCode};

use crate::{aws_utils::cloudwatch_utils::{build_metric_future, CwMetrics, DimensionValue}, shared::send_metric_with_order_hash};
use crate::{
aws_utils::cloudwatch_utils::{build_metric_future, CwMetrics, DimensionValue},
shared::send_metric_with_order_hash,
};

const ROUTING_API: &str = "https://api.uniswap.org/v1/quote";
const QUICKROUTES_API: &str = "https://trade-api.gateway.uniswap.org/v1/indicative_quote";
const SLIPPAGE_TOLERANCE: &str = "2.5";
const DEADLINE: u64 = 1000;
const ETH_ADDRESS: &str = "0x0000000000000000000000000000000000000000";
const ONE_ETHER: u64 = 1e18 as u64;

#[derive(Debug, Clone)]
pub struct OrderData {
Expand Down Expand Up @@ -125,12 +131,15 @@ pub struct OrderRoute {
pub method_parameters: MethodParameters,
}

#[derive(Clone, Debug)]
pub struct RouteOrderParams {
pub chain_id: u64,
pub token_in: String,
pub token_out: String,
pub amount: String,
pub recipient: String,
pub slippage_tolerance: String,
pub deadline: u64,
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -174,57 +183,150 @@ impl UniswapXRouteCollector {
}
}

pub async fn route_order(&self, params: RouteOrderParams, order_hash: String) -> Result<OrderRoute> {
// TODO: support exactOutput
/// Fetches the route for a token to native currency
/// uses quickroutes
pub fn fetch_native_route(
&self,
chain_id: u64,
token_in: String,
) -> impl Future<Output = Result<Response, Error>> + Send {
let client = Client::new();
async move {
let response = client
.post(QUICKROUTES_API)
.header("accept", "application/json")
.header("content-type", "application/json")
// public api key
.header("x-api-key", "JoyCGj29tT4pymvhaGciK4r1aIPvqW6W53xT1fwo")
.json(&serde_json::json!({
"type": "EXACT_OUTPUT",
"tokenInChainId": chain_id,
"tokenOutChainId": chain_id,
"tokenIn": resolve_address(token_in),
"tokenOut": resolve_address(ETH_ADDRESS.to_string()),
"amount": ONE_ETHER
}))
.send()
.await;
response
}
}

pub async fn route_order(
&self,
params: RouteOrderParams,
order_hash: String,
) -> Result<OrderRoute> {
let client = Client::new();

// Original routing logic
let query = RoutingApiQuery {
token_in_address: resolve_address(params.token_in),
token_out_address: resolve_address(params.token_out),
token_in_address: resolve_address(params.token_in.clone()),
token_in_chain_id: params.chain_id,
token_out_address: resolve_address(params.token_out.clone()),
token_out_chain_id: params.chain_id,
trade_type: TradeType::ExactIn,
amount: params.amount,
recipient: params.recipient,
slippage_tolerance: SLIPPAGE_TOLERANCE.to_string(),
enable_universal_router: false,
deadline: DEADLINE,
amount: params.amount.clone(),
recipient: params.recipient.clone(),
slippage_tolerance: params.slippage_tolerance.clone(),
deadline: params.deadline,
enable_universal_router: true,
protocols: "v2,v3,mixed".to_string(),
};

let query_string = serde_qs::to_string(&query).unwrap();
let full_query = format!("{}?{}", ROUTING_API, query_string);
info!("{} - full query: {}", order_hash, full_query);
let client = reqwest::Client::new();
let start = std::time::Instant::now();

let response = client
.get(format!("{}?{}", ROUTING_API, query_string))
.header(ORIGIN, "https://app.uniswap.org")
.header("x-request-source", "uniswap-web")
.send()
.await
.map_err(|e| anyhow!("Quote request failed with error: {}", e))?;
// Start all futures in parallel
let (routing_result, native_token_in_result, native_token_out_result) = tokio::join!(
client
.get(format!("{}?{}", ROUTING_API, query_string))
.header(ORIGIN, "https://app.uniswap.org")
.header("accept", "application/json")
.send(),
self.fetch_native_route(params.chain_id, params.token_in.clone()),
self.fetch_native_route(params.chain_id, params.token_out.clone())
);

// Check results from all dependency futures
let dependent_futures = [
("token in to native request", native_token_in_result),
("token out to native request", native_token_out_result),
];

for (request_type, result) in dependent_futures {
match result {
Ok(response) => {
match response.status() {
// Noop
StatusCode::OK => {}
_ => {
error!(
"{} - [Quickroutes] {} failed with status code: {}",
order_hash,
request_type,
response.status()
);
}
}
}
Err(e) => {
error!(
"{} - [Quickroutes] {} failed with error: {}",
order_hash,
request_type,
e
);
}
}
}

if let Err(e) = routing_result {
error!("{} - Routing request failed with error: {}", order_hash, e);
return Err(anyhow!("{} - Routing request failed with error: {}", order_hash, e));
}
let routing_response = routing_result.unwrap();

let elapsed = start.elapsed();
let metric_future = build_metric_future(self.cloudwatch_client.clone(), DimensionValue::Router02, CwMetrics::RoutingMs, elapsed.as_millis() as f64);
let metric_future = build_metric_future(
self.cloudwatch_client.clone(),
DimensionValue::Router02,
CwMetrics::RoutingMs,
elapsed.as_millis() as f64,
);
if let Some(metric_future) = metric_future {
send_metric_with_order_hash!(&Arc::new(""), metric_future);
}

match response.status() {
StatusCode::OK => Ok(response
match routing_response.status() {
StatusCode::OK => Ok(routing_response
.json::<OrderRoute>()
.await
.map_err(|e| anyhow!("{} - Failed to parse response: {}", order_hash, e))?),
StatusCode::BAD_REQUEST => Err(anyhow!("{} - Bad request: {}", order_hash, response.status())),
StatusCode::NOT_FOUND => Err(anyhow!("{} - Not quote found: {}", order_hash, response.status())),
StatusCode::TOO_MANY_REQUESTS => Err(anyhow!("{} - Too many requests: {}", order_hash, response.status())),
StatusCode::INTERNAL_SERVER_ERROR => {
Err(anyhow!("{} - Internal server error: {}", order_hash, response.status()))
}
StatusCode::BAD_REQUEST => Err(anyhow!(
"{} - Bad request: {}",
order_hash,
routing_response.status()
)),
StatusCode::NOT_FOUND => Err(anyhow!(
"{} - Not quote found: {}",
order_hash,
routing_response.status()
)),
StatusCode::TOO_MANY_REQUESTS => Err(anyhow!(
"{} - Too many requests: {}",
order_hash,
routing_response.status()
)),
StatusCode::INTERNAL_SERVER_ERROR => Err(anyhow!(
"{} - Internal server error: {}",
order_hash,
routing_response.status()
)),
_ => Err(anyhow!(
"{} - Unexpected error with status code: {}",
order_hash,
response.status()
routing_response.status()
)),
}
}
Expand Down Expand Up @@ -285,6 +387,8 @@ impl Collector<RoutedOrder> for UniswapXRouteCollector {
token_out: token_out.clone(),
amount: amount_in.to_string(),
recipient: self.executor_address.clone(),
slippage_tolerance: SLIPPAGE_TOLERANCE.to_string(),
deadline: DEADLINE,
}, order_hash).await;
(batch, route_result)
};
Expand All @@ -311,10 +415,8 @@ impl Collector<RoutedOrder> for UniswapXRouteCollector {

Ok(Box::pin(stream))
}

}


// The Uniswap routing API requires that "ETH" be used instead of the zero address
fn resolve_address(token: String) -> String {
if token == "0x0000000000000000000000000000000000000000" {
Expand Down
Loading