diff --git a/rust/main/Cargo.lock b/rust/main/Cargo.lock index 4f728ab69f..47aec03422 100644 --- a/rust/main/Cargo.lock +++ b/rust/main/Cargo.lock @@ -512,6 +512,7 @@ checksum = "3b829e4e32b91e643de6eafe82b1d90675f5874230191a4ffbc1b336dec4d6bf" dependencies = [ "async-trait", "axum-core", + "axum-macros", "bitflags 1.3.2", "bytes", "futures-util", @@ -553,6 +554,18 @@ dependencies = [ "tower-service", ] +[[package]] +name = "axum-macros" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cdca6a10ecad987bda04e95606ef85a5417dcaac1a78455242d72e031e2b6b62" +dependencies = [ + "heck 0.4.1", + "proc-macro2 1.0.86", + "quote 1.0.37", + "syn 2.0.77", +] + [[package]] name = "backtrace" version = "0.3.71" @@ -7015,7 +7028,9 @@ dependencies = [ "tokio-test", "tracing", "tracing-futures", + "tracing-test", "typetag", + "uuid 1.11.0", ] [[package]] @@ -7150,7 +7165,7 @@ dependencies = [ "rkyv_derive", "seahash", "tinyvec", - "uuid 1.10.0", + "uuid 1.11.0", ] [[package]] @@ -7250,6 +7265,7 @@ dependencies = [ "once_cell", "regex", "relayer", + "reqwest", "ripemd", "serde", "serde_json", @@ -7749,7 +7765,7 @@ dependencies = [ "time", "tracing", "url", - "uuid 1.10.0", + "uuid 1.11.0", ] [[package]] @@ -7810,7 +7826,7 @@ dependencies = [ "sea-query-derive", "serde_json", "time", - "uuid 1.10.0", + "uuid 1.11.0", ] [[package]] @@ -7826,7 +7842,7 @@ dependencies = [ "serde_json", "sqlx", "time", - "uuid 1.10.0", + "uuid 1.11.0", ] [[package]] @@ -9126,7 +9142,7 @@ dependencies = [ "time", "tokio-stream", "url", - "uuid 1.10.0", + "uuid 1.11.0", "whoami", ] @@ -10363,10 +10379,11 @@ dependencies = [ [[package]] name = "uuid" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "81dfa00651efa65069b0b6b651f4aaa31ba9e3c3ce0137aaad053604ee7e0314" +checksum = "f8c5f0a0af699448548ad1a2fbf920fb4bee257eae39953ba95cb84891a0446a" dependencies = [ + "getrandom 0.2.15", "serde", ] diff --git a/rust/main/Cargo.toml b/rust/main/Cargo.toml index 8b1a9b58e0..240e301571 100644 --- a/rust/main/Cargo.toml +++ b/rust/main/Cargo.toml @@ -151,6 +151,7 @@ tracing-subscriber = { version = "0.3", default-features = false } tracing-test = "0.2.2" typetag = "0.2" uint = "0.9.5" +uuid = { version = "1.11.0", features = ["v4"] } ureq = { version = "2.4", default-features = false } url = "2.3" walkdir = "2" diff --git a/rust/main/agents/relayer/Cargo.toml b/rust/main/agents/relayer/Cargo.toml index 5a891d912c..b3725a8662 100644 --- a/rust/main/agents/relayer/Cargo.toml +++ b/rust/main/agents/relayer/Cargo.toml @@ -44,6 +44,7 @@ tokio-metrics.workspace = true tracing-futures.workspace = true tracing.workspace = true typetag.workspace = true +uuid.workspace = true hyperlane-core = { path = "../../hyperlane-core", features = [ "agent", @@ -53,9 +54,11 @@ hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] } hyperlane-ethereum = { path = "../../chains/hyperlane-ethereum" } [dev-dependencies] +axum = { workspace = true, features = ["macros"] } once_cell.workspace = true mockall.workspace = true tokio-test.workspace = true +tracing-test.workspace = true hyperlane-test = { path = "../../hyperlane-test" } hyperlane-base = { path = "../../hyperlane-base", features = ["test-utils"] } hyperlane-core = { path = "../../hyperlane-core", features = ["agent", "async", "test-utils"] } diff --git a/rust/main/agents/relayer/src/lib.rs b/rust/main/agents/relayer/src/lib.rs index 62b896d628..9a6e1e4147 100644 --- a/rust/main/agents/relayer/src/lib.rs +++ b/rust/main/agents/relayer/src/lib.rs @@ -3,8 +3,9 @@ mod msg; mod processor; mod prover; mod relayer; -mod server; mod settings; +pub mod server; + pub use msg::GAS_EXPENDITURE_LOG_MESSAGE; pub use relayer::*; diff --git a/rust/main/agents/relayer/src/msg/op_queue.rs b/rust/main/agents/relayer/src/msg/op_queue.rs index 99c9dde39f..881a282f56 100644 --- a/rust/main/agents/relayer/src/msg/op_queue.rs +++ b/rust/main/agents/relayer/src/msg/op_queue.rs @@ -4,9 +4,9 @@ use derive_new::new; use hyperlane_core::{PendingOperation, PendingOperationStatus, QueueOperation}; use prometheus::{IntGauge, IntGaugeVec}; use tokio::sync::{broadcast::Receiver, Mutex}; -use tracing::{debug, info, instrument}; +use tracing::{debug, instrument}; -use crate::settings::matching_list::MatchingList; +use crate::server::{MessageRetryRequest, MessageRetryResponse}; pub type OperationPriorityQueue = Arc>>>; @@ -16,7 +16,7 @@ pub type OperationPriorityQueue = Arc>> pub struct OpQueue { metrics: IntGaugeVec, queue_metrics_label: String, - retry_rx: Arc>>, + retry_receiver: Arc>>, #[new(default)] pub queue: OperationPriorityQueue, } @@ -72,27 +72,67 @@ impl OpQueue { // The other consideration is whether to put the channel receiver in the OpQueue or in a dedicated task // that also holds an Arc to the Mutex. For simplicity, we'll put it in the OpQueue for now. let mut message_retry_requests = vec![]; - while let Ok(message_id) = self.retry_rx.lock().await.try_recv() { - message_retry_requests.push(message_id); + + // we only need to lock self.retry_receiver once + { + let mut retry_receiver = self.retry_receiver.lock().await; + while let Ok(retry_request) = retry_receiver.try_recv() { + let uuid = retry_request.uuid.clone(); + message_retry_requests.push(( + retry_request, + MessageRetryResponse { + uuid, + evaluated: 0, + matched: 0, + }, + )); + } } if message_retry_requests.is_empty() { return; } let mut queue = self.queue.lock().await; + let queue_length = queue.len(); + let mut reprioritized_queue: BinaryHeap<_> = queue .drain() .map(|Reverse(mut op)| { - if message_retry_requests.iter().any(|r| r.op_matches(&op)) { - info!( - operation = %op, - queue_label = %self.queue_metrics_label, - "Retrying OpQueue operation" - ); - op.reset_attempts() + let mut matched = false; + message_retry_requests + .iter_mut() + .for_each(|(retry_req, retry_response)| { + if !retry_req.pattern.op_matches(&op) { + return; + } + debug!( + uuid = retry_req.uuid, + ?op, + pattern = ?retry_req.pattern, "Matched retry request"); + + // update retry metrics + retry_response.matched += 1; + matched = true; + }); + if matched { + op.reset_attempts(); } Reverse(op) }) .collect(); + + for (retry_req, mut retry_response) in message_retry_requests { + retry_response.evaluated = queue_length; + tracing::debug!( + uuid = retry_response.uuid, + evaluated = retry_response.evaluated, + matched = retry_response.matched, + "Sending relayer retry response back" + ); + if let Err(err) = retry_req.transmitter.send(retry_response).await { + tracing::error!(?err, "Failed to send retry response"); + } + drop(retry_req.transmitter); + } queue.append(&mut reprioritized_queue); } @@ -115,7 +155,10 @@ impl OpQueue { #[cfg(test)] pub mod test { + use crate::{server::ENDPOINT_MESSAGES_QUEUE_SIZE, settings::matching_list::MatchingList}; + use super::*; + use hyperlane_core::{ HyperlaneDomain, HyperlaneDomainProtocol, HyperlaneDomainTechnicalStack, HyperlaneDomainType, HyperlaneMessage, KnownHyperlaneDomain, PendingOperationResult, @@ -127,7 +170,7 @@ pub mod test { str::FromStr, time::{Duration, Instant}, }; - use tokio::sync; + use tokio::sync::{self, mpsc}; #[derive(Debug, Clone, Serialize)] pub struct MockPendingOperation { @@ -364,12 +407,22 @@ pub mod test { .await; } + let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE); + // Retry by message ids broadcaster - .send(MatchingList::with_message_id(op_ids[1])) + .send(MessageRetryRequest { + uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(), + pattern: MatchingList::with_message_id(op_ids[1]), + transmitter: transmitter.clone(), + }) .unwrap(); broadcaster - .send(MatchingList::with_message_id(op_ids[2])) + .send(MessageRetryRequest { + uuid: "59400966-e7fa-4fb9-9372-9a671d4392c3".to_string(), + pattern: MatchingList::with_message_id(op_ids[2]), + transmitter, + }) .unwrap(); // Pop elements from queue 1 @@ -425,11 +478,15 @@ pub mod test { .await; } + let (transmitter, _receiver) = mpsc::channel(ENDPOINT_MESSAGES_QUEUE_SIZE); + // Retry by domain broadcaster - .send(MatchingList::with_destination_domain( - destination_domain_2.id(), - )) + .send(MessageRetryRequest { + uuid: "a5b39473-7cc5-48a1-8bed-565454ba1037".to_string(), + pattern: MatchingList::with_destination_domain(destination_domain_2.id()), + transmitter, + }) .unwrap(); // Pop elements from queue @@ -447,4 +504,116 @@ pub mod test { assert_eq!(popped[3], op_ids[0]); assert_eq!(popped[4], op_ids[1]); } + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_process_retry_requests_by_id() { + let (metrics, queue_metrics_label) = dummy_metrics_and_label(); + let broadcaster = sync::broadcast::Sender::new(100); + let mut op_queue_1 = OpQueue::new( + metrics.clone(), + queue_metrics_label.clone(), + Arc::new(Mutex::new(broadcaster.subscribe())), + ); + + // Add some operations to the queue with increasing `next_attempt_after` values + let destination_domain: HyperlaneDomain = KnownHyperlaneDomain::Injective.into(); + let messages_to_send = 5; + let ops: VecDeque<_> = (1..=messages_to_send) + .map(|seconds_to_next_attempt| { + Box::new(MockPendingOperation::new( + seconds_to_next_attempt, + destination_domain.clone(), + )) as QueueOperation + }) + .collect(); + + let op_ids: Vec<_> = ops.iter().map(|op| op.id()).collect(); + + // push to queue 1 + for op in ops { + op_queue_1 + .push(op, Some(PendingOperationStatus::FirstPrepareAttempt)) + .await; + } + + let (transmitter, mut receiver) = + mpsc::channel::(ENDPOINT_MESSAGES_QUEUE_SIZE); + + // Retry by message ids + broadcaster + .send(MessageRetryRequest { + uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(), + pattern: MatchingList::with_message_id(op_ids[1]), + transmitter: transmitter.clone(), + }) + .unwrap(); + + op_queue_1.process_retry_requests().await; + + let retry_response = receiver.recv().await.unwrap(); + + assert_eq!(retry_response.evaluated, 5); + assert_eq!(retry_response.matched, 1); + } + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_process_retry_requests_by_domain() { + let (metrics, queue_metrics_label) = dummy_metrics_and_label(); + let broadcaster = sync::broadcast::Sender::new(100); + let mut op_queue_1 = OpQueue::new( + metrics.clone(), + queue_metrics_label.clone(), + Arc::new(Mutex::new(broadcaster.subscribe())), + ); + + // Add some operations to the queue with increasing `next_attempt_after` values + let destination_domain: HyperlaneDomain = KnownHyperlaneDomain::Injective.into(); + let messages_to_send = 5; + let mut ops: VecDeque<_> = (1..=messages_to_send) + .map(|seconds_to_next_attempt| { + Box::new(MockPendingOperation::new( + seconds_to_next_attempt, + destination_domain.clone(), + )) as QueueOperation + }) + .collect(); + ops.push_back(Box::new(MockPendingOperation::new( + 10, + KnownHyperlaneDomain::Arbitrum.into(), + )) as QueueOperation); + ops.push_back(Box::new(MockPendingOperation::new( + 10, + KnownHyperlaneDomain::Arbitrum.into(), + )) as QueueOperation); + + // push to queue 1 + for op in ops { + op_queue_1 + .push(op, Some(PendingOperationStatus::FirstPrepareAttempt)) + .await; + } + + let (transmitter, mut receiver) = + mpsc::channel::(ENDPOINT_MESSAGES_QUEUE_SIZE); + + // Retry by message ids + broadcaster + .send(MessageRetryRequest { + uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(), + pattern: MatchingList::with_destination_domain( + KnownHyperlaneDomain::Arbitrum as u32, + ), + transmitter: transmitter.clone(), + }) + .unwrap(); + + op_queue_1.process_retry_requests().await; + + let retry_response = receiver.recv().await.unwrap(); + + assert_eq!(retry_response.evaluated, 7); + assert_eq!(retry_response.matched, 2); + } } diff --git a/rust/main/agents/relayer/src/msg/op_submitter.rs b/rust/main/agents/relayer/src/msg/op_submitter.rs index c1e295a24a..8e6043d9e2 100644 --- a/rust/main/agents/relayer/src/msg/op_submitter.rs +++ b/rust/main/agents/relayer/src/msg/op_submitter.rs @@ -32,7 +32,7 @@ use hyperlane_core::{ }; use crate::msg::pending_message::CONFIRM_DELAY; -use crate::settings::matching_list::MatchingList; +use crate::server::MessageRetryRequest; use super::op_queue::OpQueue; use super::op_queue::OperationPriorityQueue; @@ -105,7 +105,7 @@ impl SerialSubmitter { pub fn new( domain: HyperlaneDomain, rx: mpsc::UnboundedReceiver, - retry_op_transmitter: Sender, + retry_op_transmitter: &Sender, metrics: SerialSubmitterMetrics, max_batch_size: u32, task_monitor: TaskMonitor, diff --git a/rust/main/agents/relayer/src/relayer.rs b/rust/main/agents/relayer/src/relayer.rs index b1f013b6ae..0962678df6 100644 --- a/rust/main/agents/relayer/src/relayer.rs +++ b/rust/main/agents/relayer/src/relayer.rs @@ -318,7 +318,7 @@ impl BaseAgent for Relayer { })); tasks.push(console_server.instrument(info_span!("Tokio console server"))); } - let sender = BroadcastSender::::new(ENDPOINT_MESSAGES_QUEUE_SIZE); + let sender = BroadcastSender::new(ENDPOINT_MESSAGES_QUEUE_SIZE); // send channels by destination chain let mut send_channels = HashMap::with_capacity(self.destination_chains.len()); let mut prep_queues = HashMap::with_capacity(self.destination_chains.len()); @@ -328,7 +328,7 @@ impl BaseAgent for Relayer { let serial_submitter = SerialSubmitter::new( dest_domain.clone(), receive_channel, - sender.clone(), + &sender, SerialSubmitterMetrics::new(&self.core.metrics, dest_domain), // Default to submitting one message at a time if there is no batch config self.core.settings.chains[dest_domain.name()] @@ -385,7 +385,7 @@ impl BaseAgent for Relayer { ); } // run server - let custom_routes = relayer_server::Server::new() + let custom_routes = relayer_server::Server::new(self.destination_chains.len()) .with_op_retry(sender.clone()) .with_message_queue(prep_queues) .routes(); diff --git a/rust/main/agents/relayer/src/server/message_retry.rs b/rust/main/agents/relayer/src/server/message_retry.rs index 6d160355a5..bf6ad0c6a7 100644 --- a/rust/main/agents/relayer/src/server/message_retry.rs +++ b/rust/main/agents/relayer/src/server/message_retry.rs @@ -1,32 +1,88 @@ use crate::settings::matching_list::MatchingList; use axum::{extract::State, routing, Json, Router}; use derive_new::new; -use tokio::sync::broadcast::Sender; +use serde::{Deserialize, Serialize}; +use tokio::sync::{broadcast::Sender, mpsc}; const MESSAGE_RETRY_API_BASE: &str = "/message_retry"; -#[derive(new, Clone)] +#[derive(Clone, Debug, new)] pub struct MessageRetryApi { - tx: Sender, + retry_request_transmitter: Sender, + destination_chains: usize, +} + +#[derive(Clone, Debug)] +pub struct MessageRetryRequest { + pub uuid: String, + pub pattern: MatchingList, + pub transmitter: mpsc::Sender, +} + +#[derive(Clone, Debug, Deserialize, Serialize, PartialEq)] +pub struct MessageRetryResponse { + /// ID of the retry request + pub uuid: String, + /// how many pending operations were evaluated + pub evaluated: usize, + /// how many of the pending operations matched the retry request pattern + pub matched: u64, } async fn retry_message( - State(tx): State>, + State(state): State, Json(retry_req_payload): Json, -) -> String { - match tx.send(retry_req_payload) { - Ok(_) => "Moved message(s) to the front of the queue".to_string(), - // Technically it's bad practice to print the error message to the user, but - // this endpoint is for debugging purposes only. - Err(err) => format!("Failed to send retry request to the queue: {}", err), +) -> Result, String> { + let uuid = uuid::Uuid::new_v4(); + let uuid_string = uuid.to_string(); + + tracing::debug!(?retry_req_payload); + tracing::debug!(uuid = uuid_string, "Sending message retry request"); + + // Create a channel that can hold each chain's SerialSubmitter + // message retry responses. + // 3 queues for each chain (prepare, submit, confirm) + let (transmitter, mut receiver) = mpsc::channel(3 * state.destination_chains); + state + .retry_request_transmitter + .send(MessageRetryRequest { + uuid: uuid_string.clone(), + pattern: retry_req_payload, + transmitter, + }) + .map_err(|err| { + // Technically it's bad practice to print the error message to the user, but + // this endpoint is for debugging purposes only. + format!("Failed to send retry request to the queue: {}", err) + })?; + + let mut resp = MessageRetryResponse { + uuid: uuid_string, + evaluated: 0, + matched: 0, + }; + + // Wait for responses from relayer + tracing::debug!(uuid = resp.uuid, "Waiting for response from relayer"); + while let Some(relayer_resp) = receiver.recv().await { + tracing::debug!( + uuid = resp.uuid, + evaluated = resp.evaluated, + matched = resp.matched, + "Submitter response to retry request" + ); + resp.evaluated += relayer_resp.evaluated; + resp.matched += relayer_resp.matched; } + + Ok(Json(resp)) } impl MessageRetryApi { pub fn router(&self) -> Router { Router::new() .route("/", routing::post(retry_message)) - .with_state(self.tx.clone()) + .with_state(self.clone()) } pub fn get_route(&self) -> (&'static str, Router) { @@ -41,13 +97,21 @@ mod tests { use super::*; use axum::http::StatusCode; use hyperlane_core::{HyperlaneMessage, QueueOperation}; + use serde::de::DeserializeOwned; use serde_json::json; use std::net::SocketAddr; use tokio::sync::broadcast::{Receiver, Sender}; - fn setup_test_server() -> (SocketAddr, Receiver) { - let broadcast_tx = Sender::::new(ENDPOINT_MESSAGES_QUEUE_SIZE); - let message_retry_api = MessageRetryApi::new(broadcast_tx.clone()); + #[derive(Debug)] + struct TestServerSetup { + pub socket_address: SocketAddr, + pub retry_req_rx: Receiver, + } + + fn setup_test_server() -> TestServerSetup { + let broadcast_tx = Sender::new(ENDPOINT_MESSAGES_QUEUE_SIZE); + + let message_retry_api = MessageRetryApi::new(broadcast_tx.clone(), 10); let (path, retry_router) = message_retry_api.get_route(); let app = Router::new().nest(path, retry_router); @@ -58,12 +122,51 @@ mod tests { let addr = server.local_addr(); tokio::spawn(server); - (addr, broadcast_tx.subscribe()) + let retry_req_rx = broadcast_tx.subscribe(); + + TestServerSetup { + socket_address: addr, + retry_req_rx, + } } + async fn send_retry_responses_future( + mut retry_request_receiver: Receiver, + pending_operations: Vec, + metrics: Vec<(usize, u64)>, + ) { + if let Ok(req) = retry_request_receiver.recv().await { + for (op, (evaluated, matched)) in pending_operations.iter().zip(metrics) { + // Check that the list received by the server matches the pending operation + assert!(req.pattern.op_matches(&op)); + let resp = MessageRetryResponse { + uuid: req.uuid.clone(), + evaluated, + matched, + }; + req.transmitter.send(resp).await.unwrap(); + } + } + } + + async fn parse_response_to_json(response: reqwest::Response) -> T { + let resp_body = response + .text() + .await + .expect("Failed to parse response body"); + let resp_json: T = + serde_json::from_str(&resp_body).expect("Failed to deserialize response body"); + resp_json + } + + #[tracing_test::traced_test] #[tokio::test] async fn test_message_id_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); // Create a random message with a random message ID @@ -75,25 +178,37 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + let (_t1, response_res) = tokio::join!(respond_task, response); + + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_destination_domain_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage { @@ -108,25 +223,37 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + let (_t1, response_res) = tokio::join!(respond_task, response); + + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_origin_domain_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage { @@ -141,25 +268,38 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + + let (_t1, response_res) = tokio::join!(respond_task, response); + + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_sender_address_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage::default(); @@ -170,25 +310,37 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + + let (_t1, response_res) = tokio::join!(respond_task, response); + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_recipient_address_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage::default(); @@ -199,25 +351,37 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + let (_t1, response_res) = tokio::join!(respond_task, response); + + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } #[tokio::test] async fn test_multiple_retry() { - let (addr, mut rx) = setup_test_server(); + let TestServerSetup { + socket_address: addr, + retry_req_rx, + .. + } = setup_test_server(); let client = reqwest::Client::new(); let message = HyperlaneMessage { @@ -238,19 +402,27 @@ mod tests { } ]); + // spawn a task to respond to message retry request + let respond_task = send_retry_responses_future( + retry_req_rx, + vec![Box::new(pending_operation.clone()) as QueueOperation], + vec![(1, 1)], + ); + // Send a POST request to the server let response = client .post(format!("http://{}{}", addr, MESSAGE_RETRY_API_BASE)) .json(&matching_list_body) // Set the request body - .send() - .await - .unwrap(); + .send(); + + let (_t1, response_res) = tokio::join!(respond_task, response); + let response = response_res.unwrap(); // Check that the response status code is OK assert_eq!(response.status(), StatusCode::OK); - let list = rx.try_recv().unwrap(); - // Check that the list received by the server matches the pending operation - assert!(list.op_matches(&(Box::new(pending_operation) as QueueOperation))); + let resp_json: MessageRetryResponse = parse_response_to_json(response).await; + assert_eq!(resp_json.evaluated, 1); + assert_eq!(resp_json.matched, 1); } } diff --git a/rust/main/agents/relayer/src/server/mod.rs b/rust/main/agents/relayer/src/server/mod.rs index 083f8d94d2..03019d9835 100644 --- a/rust/main/agents/relayer/src/server/mod.rs +++ b/rust/main/agents/relayer/src/server/mod.rs @@ -3,7 +3,7 @@ use derive_new::new; use std::collections::HashMap; use tokio::sync::broadcast::Sender; -use crate::{msg::op_queue::OperationPriorityQueue, settings::matching_list::MatchingList}; +use crate::msg::op_queue::OperationPriorityQueue; pub const ENDPOINT_MESSAGES_QUEUE_SIZE: usize = 100; @@ -15,14 +15,15 @@ mod message_retry; #[derive(new)] pub struct Server { + destination_chains: usize, #[new(default)] - retry_transmitter: Option>, + retry_transmitter: Option>, #[new(default)] op_queues: Option>, } impl Server { - pub fn with_op_retry(mut self, transmitter: Sender) -> Self { + pub fn with_op_retry(mut self, transmitter: Sender) -> Self { self.retry_transmitter = Some(transmitter); self } @@ -36,8 +37,8 @@ impl Server { /// Can be extended with additional routes and feature flags to enable/disable individually. pub fn routes(self) -> Vec<(&'static str, Router)> { let mut routes = vec![]; - if let Some(retry_transmitter) = self.retry_transmitter { - routes.push(MessageRetryApi::new(retry_transmitter).get_route()); + if let Some(tx) = self.retry_transmitter { + routes.push(MessageRetryApi::new(tx, self.destination_chains).get_route()); } if let Some(op_queues) = self.op_queues { routes.push(ListOperationsApi::new(op_queues).get_route()); diff --git a/rust/main/utils/run-locally/Cargo.toml b/rust/main/utils/run-locally/Cargo.toml index 9dedae9cea..a994324687 100644 --- a/rust/main/utils/run-locally/Cargo.toml +++ b/rust/main/utils/run-locally/Cargo.toml @@ -14,6 +14,7 @@ hyperlane-cosmos = { path = "../../chains/hyperlane-cosmos" } toml_edit.workspace = true k256.workspace = true jobserver.workspace = true +reqwest.workspace = true ripemd.workspace = true sha2.workspace = true serde.workspace = true diff --git a/rust/main/utils/run-locally/src/main.rs b/rust/main/utils/run-locally/src/main.rs index 4686c15446..8855a08d86 100644 --- a/rust/main/utils/run-locally/src/main.rs +++ b/rust/main/utils/run-locally/src/main.rs @@ -51,6 +51,7 @@ mod invariants; mod logging; mod metrics; mod program; +mod server; mod solana; mod utils; @@ -483,6 +484,10 @@ fn main() -> ExitCode { // give things a chance to fully start. sleep(Duration::from_secs(10)); + // test retry request + let resp = server::run_retry_request().expect("Failed to process retry request"); + assert!(resp.matched > 0); + if !post_startup_invariants(&checkpoints_dirs) { log!("Failure: Post startup invariants are not met"); return report_test_result(true); diff --git a/rust/main/utils/run-locally/src/server.rs b/rust/main/utils/run-locally/src/server.rs new file mode 100644 index 0000000000..4df7df78f0 --- /dev/null +++ b/rust/main/utils/run-locally/src/server.rs @@ -0,0 +1,55 @@ +use std::io; + +use reqwest::Url; + +use relayer::server::MessageRetryResponse; + +use crate::RELAYER_METRICS_PORT; + +/// create tokio runtime to send a retry request to +/// relayer to retry all existing messages in the queues +pub fn run_retry_request() -> io::Result { + let runtime = tokio::runtime::Builder::new_current_thread() + .enable_all() + .build(); + runtime + .unwrap() + .block_on(async { call_retry_request().await }) +} + +/// sends a request to relayer to retry all existing messages +/// in the queues +async fn call_retry_request() -> io::Result { + let client = reqwest::Client::new(); + + let url = Url::parse(&format!( + "http://0.0.0.0:{RELAYER_METRICS_PORT}/message_retry" + )) + .map_err(|err| { + eprintln!("Failed to parse url: {err}"); + io::Error::new(io::ErrorKind::InvalidInput, err.to_string()) + })?; + + let body = vec![serde_json::json!({ + "message_id": "*" + })]; + let retry_response = client.post(url).json(&body).send().await.map_err(|err| { + eprintln!("Failed to send request: {err}"); + io::Error::new(io::ErrorKind::InvalidData, err.to_string()) + })?; + + let response_text = retry_response.text().await.map_err(|err| { + eprintln!("Failed to parse response body: {err}"); + io::Error::new(io::ErrorKind::InvalidData, err.to_string()) + })?; + + println!("Retry Request Response: {:?}", response_text); + + let response_json: MessageRetryResponse = + serde_json::from_str(&response_text).map_err(|err| { + eprintln!("Failed to parse response body to json: {err}"); + io::Error::new(io::ErrorKind::InvalidData, err.to_string()) + })?; + + Ok(response_json) +}