diff --git a/rust/main/agents/relayer/src/msg/op_queue.rs b/rust/main/agents/relayer/src/msg/op_queue.rs index e293ed2794..79cd3b5e79 100644 --- a/rust/main/agents/relayer/src/msg/op_queue.rs +++ b/rust/main/agents/relayer/src/msg/op_queue.rs @@ -4,7 +4,7 @@ use derive_new::new; use hyperlane_core::{PendingOperation, PendingOperationStatus, QueueOperation}; use prometheus::{IntGauge, IntGaugeVec}; use tokio::sync::{broadcast::Receiver, Mutex}; -use tracing::{debug, info, instrument}; +use tracing::{debug, instrument}; use crate::server::{MessageRetryRequest, MessageRetryResponse}; @@ -72,16 +72,21 @@ impl OpQueue { // The other consideration is whether to put the channel receiver in the OpQueue or in a dedicated task // that also holds an Arc to the Mutex. For simplicity, we'll put it in the OpQueue for now. let mut message_retry_requests = vec![]; - while let Ok(retry_request) = self.retry_receiver.lock().await.try_recv() { - let uuid = retry_request.uuid.clone(); - message_retry_requests.push(( - retry_request, - MessageRetryResponse { - uuid, - evaluated: 0, - matched: 0, - }, - )); + + // we only need to lock self.retry_receiver once + { + let mut retry_receiver = self.retry_receiver.lock().await; + while let Ok(retry_request) = retry_receiver.try_recv() { + let uuid = retry_request.uuid.clone(); + message_retry_requests.push(( + retry_request, + MessageRetryResponse { + uuid, + evaluated: 0, + matched: 0, + }, + )); + } } if message_retry_requests.is_empty() { return; @@ -92,21 +97,23 @@ impl OpQueue { let mut reprioritized_queue: BinaryHeap<_> = queue .drain() .map(|Reverse(mut op)| { - let matched_requests: Vec<_> = message_retry_requests + let mut matched = false; + message_retry_requests .iter_mut() - .filter_map(|(retry_req, retry_response)| { - // update retry metrics - if retry_req.pattern.op_matches(&op) { - debug!(uuid = retry_req.uuid, "Matched request"); - retry_response.matched += 1; - Some(retry_req.uuid.clone()) - } else { - None + .for_each(|(retry_req, retry_response)| { + if !retry_req.pattern.op_matches(&op) { + return; } - }) - .collect(); + debug!( + uuid = retry_req.uuid, + ?op, + pattern = ?retry_req.pattern, "Matched retry request"); - if !matched_requests.is_empty() { + // update retry metrics + retry_response.matched += 1; + matched = true; + }); + if matched { op.reset_attempts(); } Reverse(op) @@ -496,4 +503,116 @@ pub mod test { assert_eq!(popped[3], op_ids[0]); assert_eq!(popped[4], op_ids[1]); } + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_process_retry_requests_by_id() { + let (metrics, queue_metrics_label) = dummy_metrics_and_label(); + let broadcaster = sync::broadcast::Sender::new(100); + let mut op_queue_1 = OpQueue::new( + metrics.clone(), + queue_metrics_label.clone(), + Arc::new(Mutex::new(broadcaster.subscribe())), + ); + + // Add some operations to the queue with increasing `next_attempt_after` values + let destination_domain: HyperlaneDomain = KnownHyperlaneDomain::Injective.into(); + let messages_to_send = 5; + let ops: VecDeque<_> = (1..=messages_to_send) + .map(|seconds_to_next_attempt| { + Box::new(MockPendingOperation::new( + seconds_to_next_attempt, + destination_domain.clone(), + )) as QueueOperation + }) + .collect(); + + let op_ids: Vec<_> = ops.iter().map(|op| op.id()).collect(); + + // push to queue 1 + for op in ops { + op_queue_1 + .push(op, Some(PendingOperationStatus::FirstPrepareAttempt)) + .await; + } + + let (transmitter, mut receiver) = + mpsc::channel::(ENDPOINT_MESSAGES_QUEUE_SIZE); + + // Retry by message ids + broadcaster + .send(MessageRetryRequest { + uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(), + pattern: MatchingList::with_message_id(op_ids[1]), + transmitter: transmitter.clone(), + }) + .unwrap(); + + op_queue_1.process_retry_requests().await; + + let retry_response = receiver.recv().await.unwrap(); + + assert_eq!(retry_response.evaluated, 5); + assert_eq!(retry_response.matched, 1); + } + + #[tracing_test::traced_test] + #[tokio::test] + async fn test_process_retry_requests_by_domain() { + let (metrics, queue_metrics_label) = dummy_metrics_and_label(); + let broadcaster = sync::broadcast::Sender::new(100); + let mut op_queue_1 = OpQueue::new( + metrics.clone(), + queue_metrics_label.clone(), + Arc::new(Mutex::new(broadcaster.subscribe())), + ); + + // Add some operations to the queue with increasing `next_attempt_after` values + let destination_domain: HyperlaneDomain = KnownHyperlaneDomain::Injective.into(); + let messages_to_send = 5; + let mut ops: VecDeque<_> = (1..=messages_to_send) + .map(|seconds_to_next_attempt| { + Box::new(MockPendingOperation::new( + seconds_to_next_attempt, + destination_domain.clone(), + )) as QueueOperation + }) + .collect(); + ops.push_back(Box::new(MockPendingOperation::new( + 10, + KnownHyperlaneDomain::Arbitrum.into(), + )) as QueueOperation); + ops.push_back(Box::new(MockPendingOperation::new( + 10, + KnownHyperlaneDomain::Arbitrum.into(), + )) as QueueOperation); + + // push to queue 1 + for op in ops { + op_queue_1 + .push(op, Some(PendingOperationStatus::FirstPrepareAttempt)) + .await; + } + + let (transmitter, mut receiver) = + mpsc::channel::(ENDPOINT_MESSAGES_QUEUE_SIZE); + + // Retry by message ids + broadcaster + .send(MessageRetryRequest { + uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(), + pattern: MatchingList::with_destination_domain( + KnownHyperlaneDomain::Arbitrum as u32, + ), + transmitter: transmitter.clone(), + }) + .unwrap(); + + op_queue_1.process_retry_requests().await; + + let retry_response = receiver.recv().await.unwrap(); + + assert_eq!(retry_response.evaluated, 7); + assert_eq!(retry_response.matched, 2); + } } diff --git a/rust/main/agents/relayer/src/server/message_retry.rs b/rust/main/agents/relayer/src/server/message_retry.rs index 62a3cffd55..f81cba8813 100644 --- a/rust/main/agents/relayer/src/server/message_retry.rs +++ b/rust/main/agents/relayer/src/server/message_retry.rs @@ -36,6 +36,7 @@ async fn retry_message( let uuid = uuid::Uuid::new_v4(); let uuid_string = uuid.to_string(); + tracing::debug!(?retry_req_payload); tracing::debug!(uuid = uuid_string, "Sending message retry request"); // This channel is only created to service this single