add unit tests
kamiyaa committed Jan 9, 2025
1 parent 2654b18 commit 831a111
Showing 2 changed files with 143 additions and 23 deletions.
165 changes: 142 additions & 23 deletions rust/main/agents/relayer/src/msg/op_queue.rs
@@ -4,7 +4,7 @@ use derive_new::new;
 use hyperlane_core::{PendingOperation, PendingOperationStatus, QueueOperation};
 use prometheus::{IntGauge, IntGaugeVec};
 use tokio::sync::{broadcast::Receiver, Mutex};
-use tracing::{debug, info, instrument};
+use tracing::{debug, instrument};

 use crate::server::{MessageRetryRequest, MessageRetryResponse};

@@ -72,16 +72,21 @@ impl OpQueue {
         // The other consideration is whether to put the channel receiver in the OpQueue or in a dedicated task
         // that also holds an Arc to the Mutex. For simplicity, we'll put it in the OpQueue for now.
         let mut message_retry_requests = vec![];
-        while let Ok(retry_request) = self.retry_receiver.lock().await.try_recv() {
-            let uuid = retry_request.uuid.clone();
-            message_retry_requests.push((
-                retry_request,
-                MessageRetryResponse {
-                    uuid,
-                    evaluated: 0,
-                    matched: 0,
-                },
-            ));
-        }
+        // we only need to lock self.retry_receiver once
+        {
+            let mut retry_receiver = self.retry_receiver.lock().await;
+            while let Ok(retry_request) = retry_receiver.try_recv() {
+                let uuid = retry_request.uuid.clone();
+                message_retry_requests.push((
+                    retry_request,
+                    MessageRetryResponse {
+                        uuid,
+                        evaluated: 0,
+                        matched: 0,
+                    },
+                ));
+            }
+        }
         if message_retry_requests.is_empty() {
             return;
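
The hunk above narrows the lock to a single acquisition: the old code re-acquired the mutex on every `self.retry_receiver.lock().await.try_recv()` call, while the new code holds one guard for the whole drain. A minimal, self-contained sketch of the same pattern (assuming plain `u32` messages rather than the relayer's `MessageRetryRequest`):

    use std::sync::Arc;
    use tokio::sync::{broadcast, Mutex};

    #[tokio::main]
    async fn main() {
        let (tx, rx) = broadcast::channel::<u32>(16);
        let rx = Arc::new(Mutex::new(rx));
        tx.send(1).unwrap();
        tx.send(2).unwrap();

        let mut drained = Vec::new();
        {
            // lock once and hold the guard for the whole drain,
            // instead of re-locking on every try_recv
            let mut receiver = rx.lock().await;
            while let Ok(msg) = receiver.try_recv() {
                drained.push(msg);
            }
        } // guard dropped here, releasing the lock
        assert_eq!(drained, vec![1, 2]);
    }
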
@@ -92,21 +97,23 @@
         let mut reprioritized_queue: BinaryHeap<_> = queue
             .drain()
             .map(|Reverse(mut op)| {
-                let matched_requests: Vec<_> = message_retry_requests
+                let mut matched = false;
+                message_retry_requests
                     .iter_mut()
-                    .filter_map(|(retry_req, retry_response)| {
-                        // update retry metrics
-                        if retry_req.pattern.op_matches(&op) {
-                            debug!(uuid = retry_req.uuid, "Matched request");
-                            retry_response.matched += 1;
-                            Some(retry_req.uuid.clone())
-                        } else {
-                            None
+                    .for_each(|(retry_req, retry_response)| {
+                        if !retry_req.pattern.op_matches(&op) {
+                            return;
                         }
-                    })
-                    .collect();
-                if !matched_requests.is_empty() {
+                        debug!(
+                            uuid = retry_req.uuid,
+                            ?op,
+                            pattern = ?retry_req.pattern, "Matched retry request");
+
+                        // update retry metrics
+                        retry_response.matched += 1;
+                        matched = true;
+                    });
+                if matched {
                     op.reset_attempts();
                 }
                 Reverse(op)
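
This refactor replaces `filter_map(...).collect()`, which allocated a `Vec` of matched UUIDs only to test whether it was empty, with `for_each` and a boolean flag. Note that `return` inside a `for_each` closure exits only the closure, so it behaves like `continue` for the current element. A small sketch of that pattern, using toy data rather than the relayer's operation types:

    fn main() {
        let mut counters = vec![("a", 0u32), ("ab", 0u32), ("abc", 0u32)];
        let mut matched = false;
        counters.iter_mut().for_each(|(name, hits)| {
            if name.len() < 2 {
                return; // exits the closure only, like `continue`
            }
            *hits += 1;
            matched = true;
        });
        assert!(matched);
        assert_eq!(counters, vec![("a", 0), ("ab", 1), ("abc", 1)]);
    }
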
@@ -496,4 +503,116 @@ pub mod test {
         assert_eq!(popped[3], op_ids[0]);
         assert_eq!(popped[4], op_ids[1]);
     }
+
+    #[tracing_test::traced_test]
+    #[tokio::test]
+    async fn test_process_retry_requests_by_id() {
+        let (metrics, queue_metrics_label) = dummy_metrics_and_label();
+        let broadcaster = sync::broadcast::Sender::new(100);
+        let mut op_queue_1 = OpQueue::new(
+            metrics.clone(),
+            queue_metrics_label.clone(),
+            Arc::new(Mutex::new(broadcaster.subscribe())),
+        );
+
+        // Add some operations to the queue with increasing `next_attempt_after` values
+        let destination_domain: HyperlaneDomain = KnownHyperlaneDomain::Injective.into();
+        let messages_to_send = 5;
+        let ops: VecDeque<_> = (1..=messages_to_send)
+            .map(|seconds_to_next_attempt| {
+                Box::new(MockPendingOperation::new(
+                    seconds_to_next_attempt,
+                    destination_domain.clone(),
+                )) as QueueOperation
+            })
+            .collect();
+
+        let op_ids: Vec<_> = ops.iter().map(|op| op.id()).collect();
+
+        // push to queue 1
+        for op in ops {
+            op_queue_1
+                .push(op, Some(PendingOperationStatus::FirstPrepareAttempt))
+                .await;
+        }
+
+        let (transmitter, mut receiver) =
+            mpsc::channel::<MessageRetryResponse>(ENDPOINT_MESSAGES_QUEUE_SIZE);
+
+        // Retry by message ids
+        broadcaster
+            .send(MessageRetryRequest {
+                uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(),
+                pattern: MatchingList::with_message_id(op_ids[1]),
+                transmitter: transmitter.clone(),
+            })
+            .unwrap();
+
+        op_queue_1.process_retry_requests().await;
+
+        let retry_response = receiver.recv().await.unwrap();
+
+        assert_eq!(retry_response.evaluated, 5);
+        assert_eq!(retry_response.matched, 1);
+    }
+
+    #[tracing_test::traced_test]
+    #[tokio::test]
+    async fn test_process_retry_requests_by_domain() {
+        let (metrics, queue_metrics_label) = dummy_metrics_and_label();
+        let broadcaster = sync::broadcast::Sender::new(100);
+        let mut op_queue_1 = OpQueue::new(
+            metrics.clone(),
+            queue_metrics_label.clone(),
+            Arc::new(Mutex::new(broadcaster.subscribe())),
+        );
+
+        // Add some operations to the queue with increasing `next_attempt_after` values
+        let destination_domain: HyperlaneDomain = KnownHyperlaneDomain::Injective.into();
+        let messages_to_send = 5;
+        let mut ops: VecDeque<_> = (1..=messages_to_send)
+            .map(|seconds_to_next_attempt| {
+                Box::new(MockPendingOperation::new(
+                    seconds_to_next_attempt,
+                    destination_domain.clone(),
+                )) as QueueOperation
+            })
+            .collect();
+        ops.push_back(Box::new(MockPendingOperation::new(
+            10,
+            KnownHyperlaneDomain::Arbitrum.into(),
+        )) as QueueOperation);
+        ops.push_back(Box::new(MockPendingOperation::new(
+            10,
+            KnownHyperlaneDomain::Arbitrum.into(),
+        )) as QueueOperation);
+
+        // push to queue 1
+        for op in ops {
+            op_queue_1
+                .push(op, Some(PendingOperationStatus::FirstPrepareAttempt))
+                .await;
+        }
+
+        let (transmitter, mut receiver) =
+            mpsc::channel::<MessageRetryResponse>(ENDPOINT_MESSAGES_QUEUE_SIZE);
+
+        // Retry by destination domain
+        broadcaster
+            .send(MessageRetryRequest {
+                uuid: "0e92ace7-ba5d-4a1f-8501-51b6d9d500cf".to_string(),
+                pattern: MatchingList::with_destination_domain(
+                    KnownHyperlaneDomain::Arbitrum as u32,
+                ),
+                transmitter: transmitter.clone(),
+            })
+            .unwrap();
+
+        op_queue_1.process_retry_requests().await;
+
+        let retry_response = receiver.recv().await.unwrap();
+
+        assert_eq!(retry_response.evaluated, 7);
+        assert_eq!(retry_response.matched, 2);
+    }
 }
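
Both tests exercise the same request/response flow: a retry request fans out over a tokio broadcast channel, and each queue reports its evaluation results back on an mpsc transmitter carried inside the request itself (five Injective ops plus two Arbitrum ops give evaluated = 7, matched = 2 in the domain test). A condensed sketch of that channel topology, with hypothetical `RetryRequest`/`RetryResponse` stand-ins for the relayer's real types:

    use tokio::sync::{broadcast, mpsc};

    #[derive(Clone, Debug)]
    struct RetryRequest {
        uuid: String,
        transmitter: mpsc::Sender<RetryResponse>,
    }

    #[derive(Debug)]
    struct RetryResponse {
        uuid: String,
        matched: usize,
    }

    #[tokio::main]
    async fn main() {
        let broadcaster = broadcast::Sender::new(100);
        let mut requests = broadcaster.subscribe(); // subscribe before sending

        let (transmitter, mut responses) = mpsc::channel::<RetryResponse>(8);
        broadcaster
            .send(RetryRequest {
                uuid: "req-1".to_string(),
                transmitter,
            })
            .unwrap();

        // queue side: pick up the request and answer on its reply channel
        let request = requests.recv().await.unwrap();
        let response = RetryResponse {
            uuid: request.uuid.clone(),
            matched: 1,
        };
        request.transmitter.send(response).await.unwrap();

        // server side: await the per-request response
        let response = responses.recv().await.unwrap();
        assert_eq!(response.uuid, "req-1");
        assert_eq!(response.matched, 1);
    }

Embedding the reply channel in the request keeps responses per-request even though the request itself is broadcast to every queue.
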
1 change: 1 addition & 0 deletions rust/main/agents/relayer/src/server/message_retry.rs
@@ -36,6 +36,7 @@ async fn retry_message(
     let uuid = uuid::Uuid::new_v4();
     let uuid_string = uuid.to_string();

+    tracing::debug!(?retry_req_payload);
     tracing::debug!(uuid = uuid_string, "Sending message retry request");

     // This channel is only created to service this single
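
The added line uses tracing's `?field` shorthand, which records `retry_req_payload` with its `Debug` formatting, while `name = value` (as in the existing line below it) records a field value directly. A minimal sketch with a hypothetical `Payload` type:

    #[derive(Debug)]
    struct Payload {
        message_id: String,
    }

    fn main() {
        // a subscriber must be installed for debug events to be visible
        tracing_subscriber::fmt()
            .with_max_level(tracing::Level::DEBUG)
            .init();

        let retry_req_payload = Payload { message_id: "0xabc".into() };
        let uuid_string = "0e92ace7".to_string();
        tracing::debug!(?retry_req_payload); // Debug-formats the whole struct
        tracing::debug!(uuid = uuid_string, "Sending message retry request");
    }
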
