Skip to content

Commit

Permalink
feat: add uTP duration metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
KolbyML committed Jan 20, 2025
1 parent 0b54f24 commit a8b2ac9
Show file tree
Hide file tree
Showing 9 changed files with 86 additions and 28 deletions.
30 changes: 28 additions & 2 deletions crates/metrics/src/overlay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ use ethportal_api::types::portal_wire::{Request, Response};
use prometheus_exporter::{
self,
prometheus::{
opts, register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
histogram_opts, opts, register_histogram_vec_with_registry,
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, HistogramVec,
IntCounterVec, IntGaugeVec, Registry,
},
};

use crate::labels::{MessageDirectionLabel, MessageLabel, UtpDirectionLabel, UtpOutcomeLabel};
use crate::{
labels::{MessageDirectionLabel, MessageLabel, UtpDirectionLabel, UtpOutcomeLabel},
timer::DiscardOnDropHistogramTimer,
};

/// Contains metrics reporters for use in the overlay network
/// (eg. `portalnet/src/overlay.rs` & `portalnet/src/overlay_service.rs`).
Expand All @@ -18,6 +22,7 @@ pub struct OverlayMetrics {
pub message_total: IntCounterVec,
pub utp_outcome_total: IntCounterVec,
pub utp_active_gauge: IntGaugeVec,
pub utp_connection_duration: HistogramVec,
pub validation_total: IntCounterVec,
}

Expand Down Expand Up @@ -47,6 +52,14 @@ impl OverlayMetrics {
&["protocol", "direction"],
registry
)?;
let utp_connection_duration = register_histogram_vec_with_registry!(
histogram_opts!(
"trin_utp_connection_duration",
"the time taken to complete a utp transfer"
),
&["protocol", "direction"],
registry
)?;
let validation_total = register_int_counter_vec_with_registry!(
opts!(
"trin_validation_total",
Expand All @@ -59,6 +72,7 @@ impl OverlayMetrics {
message_total,
utp_outcome_total,
utp_active_gauge,
utp_connection_duration,
validation_total,
})
}
Expand Down Expand Up @@ -157,6 +171,18 @@ impl OverlayMetricsReporter {
.dec();
}

pub fn start_utp_process_timer(
&self,
direction: UtpDirectionLabel,
) -> DiscardOnDropHistogramTimer {
DiscardOnDropHistogramTimer::new(
self.overlay_metrics
.utp_connection_duration
.with_label_values(&[&self.protocol, direction.into()])
.clone(),
)
}

//
// Validations
//
Expand Down
2 changes: 1 addition & 1 deletion crates/portalnet/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,4 @@ pub mod put_content;
pub mod socket;
pub mod types;
pub mod utils;
pub mod utp_controller;
pub mod utp;
2 changes: 1 addition & 1 deletion crates/portalnet/src/overlay/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ use crate::{
kbucket::{Entry, SharedKBucketsTable},
node::Node,
},
utp_controller::UtpController,
utp::controller::UtpController,
};

/// Overlay protocol is a layer on top of discv5 that handles all requests from the overlay networks
Expand Down
9 changes: 4 additions & 5 deletions crates/portalnet/src/overlay/request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,9 @@ use ethportal_api::types::{
portal_wire::{Request, Response},
};
use futures::channel::oneshot;
use tokio::sync::OwnedSemaphorePermit;

use super::errors::OverlayRequestError;
use crate::find::query_pool::QueryId;
use crate::{find::query_pool::QueryId, utp::timed_semaphore::OwnedTimedSemaphorePermit};

/// An incoming or outgoing request.
#[derive(Debug, PartialEq)]
Expand Down Expand Up @@ -44,7 +43,7 @@ pub struct OverlayRequest {
/// Will be None for requests that are not associated with a query.
pub query_id: Option<QueryId>,
/// An optional permit to allow for transfer caps
pub request_permit: Option<OwnedSemaphorePermit>,
pub request_permit: Option<OwnedTimedSemaphorePermit>,
}

impl OverlayRequest {
Expand All @@ -54,7 +53,7 @@ impl OverlayRequest {
direction: RequestDirection,
responder: Option<OverlayResponder>,
query_id: Option<QueryId>,
request_permit: Option<OwnedSemaphorePermit>,
request_permit: Option<OwnedTimedSemaphorePermit>,
) -> Self {
OverlayRequest {
id: rand::random(),
Expand All @@ -77,7 +76,7 @@ pub struct ActiveOutgoingRequest {
/// An optional QueryID for the query that this request is associated with.
pub query_id: Option<QueryId>,
/// An optional permit to allow for transfer caps
pub request_permit: Option<OwnedSemaphorePermit>,
pub request_permit: Option<OwnedTimedSemaphorePermit>,
}

/// A response for a particular overlay request.
Expand Down
17 changes: 8 additions & 9 deletions crates/portalnet/src/overlay/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ use tokio::{
sync::{
broadcast,
mpsc::{self, UnboundedReceiver, UnboundedSender},
OwnedSemaphorePermit,
},
task::JoinHandle,
};
Expand Down Expand Up @@ -85,7 +84,7 @@ use crate::{
node::Node,
},
utils::portal_wire,
utp_controller::UtpController,
utp::{controller::UtpController, timed_semaphore::OwnedTimedSemaphorePermit},
};

pub const FIND_NODES_MAX_NODES: usize = 32;
Expand Down Expand Up @@ -1031,7 +1030,7 @@ impl<
let utp = Arc::clone(&self.utp_controller);
tokio::spawn(async move {
utp.accept_outbound_stream(cid, &content).await;
drop(permit);
permit.drop();
});

// Connection id is sent as BE because uTP header values are stored also as BE
Expand Down Expand Up @@ -1218,7 +1217,7 @@ impl<
})
.collect();
let _ = join_all(handles).await;
drop(permit);
permit.drop();
return;
}
};
Expand All @@ -1242,7 +1241,7 @@ impl<
})
.collect();
let _ = join_all(handles).await;
drop(permit);
permit.drop();
return;
}
};
Expand Down Expand Up @@ -1302,7 +1301,7 @@ impl<
Some(utp_processing.utp_controller),
);
// explicitly drop semaphore permit in thread so the permit is moved into the thread
drop(permit);
permit.drop();
});

let accept = Accept {
Expand Down Expand Up @@ -1433,7 +1432,7 @@ impl<
source: Enr,
request: Request,
query_id: Option<QueryId>,
request_permit: Option<OwnedSemaphorePermit>,
request_permit: Option<OwnedTimedSemaphorePermit>,
) {
// If the node is present in the routing table, but the node is not connected, then
// use the existing entry's value and direction. Otherwise, build a new entry from
Expand Down Expand Up @@ -1493,7 +1492,7 @@ impl<
response: Accept,
enr: Enr,
offer: Request,
request_permit: Option<OwnedSemaphorePermit>,
request_permit: Option<OwnedTimedSemaphorePermit>,
) -> anyhow::Result<Accept> {
// Check that a valid triggering request was sent
let mut gossip_result_tx = None;
Expand Down Expand Up @@ -1593,7 +1592,7 @@ impl<
}
// explicitly drop permit in the thread so the permit is included in the thread
if let Some(permit) = request_permit {
drop(permit);
permit.drop();
}
});

Expand Down
2 changes: 1 addition & 1 deletion crates/portalnet/src/put_content.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use crate::{
request::{OverlayRequest, RequestDirection},
},
types::kbucket::SharedKBucketsTable,
utp_controller::UtpController,
utp::controller::UtpController,
};

/// Datatype to store the result of a put content request.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ use std::{sync::Arc, time::Duration};
use anyhow::anyhow;
use bytes::Bytes;
use lazy_static::lazy_static;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tokio::sync::Semaphore;
use tracing::debug;
use trin_metrics::{
labels::{UtpDirectionLabel, UtpOutcomeLabel},
overlay::OverlayMetricsReporter,
};
use utp_rs::{cid::ConnectionId, conn::ConnectionConfig, socket::UtpSocket};

use super::timed_semaphore::OwnedTimedSemaphorePermit;
use crate::discovery::UtpEnr;
/// UtpController is meant to be a container which contains all code related to/for managing uTP
/// streams We are implementing this because we want the utils of controlling uTP connection to be
Expand Down Expand Up @@ -65,29 +66,45 @@ impl UtpController {
}

/// Non-blocking method to try and acquire a permit for an outbound uTP transfer.
// `try_acquire_owned()` isn't blocking and will instantly return with
// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available
pub fn get_outbound_semaphore(&self) -> Option<OwnedSemaphorePermit> {
/// `try_acquire_owned()` isn't blocking and will instantly return with
/// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available
pub fn get_outbound_semaphore(&self) -> Option<OwnedTimedSemaphorePermit> {
match self
.outbound_utp_transfer_semaphore
.clone()
.try_acquire_owned()
{
Ok(permit) => Some(permit),
Ok(permit) => {
let histogram_timer = self
.metrics
.start_utp_process_timer(UtpDirectionLabel::Outbound);
Some(OwnedTimedSemaphorePermit {
permit,
histogram_timer,
})
}
Err(_) => None,
}
}

/// Non-blocking method to try and acquire a permit for an inbound uTP transfer.
// `try_acquire_owned()` isn't blocking and will instantly return with
// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available
pub fn get_inbound_semaphore(&self) -> Option<OwnedSemaphorePermit> {
/// `try_acquire_owned()` isn't blocking and will instantly return with
/// `Some(TryAcquireError::NoPermits)` error if there isn't a permit available
pub fn get_inbound_semaphore(&self) -> Option<OwnedTimedSemaphorePermit> {
match self
.inbound_utp_transfer_semaphore
.clone()
.try_acquire_owned()
{
Ok(permit) => Some(permit),
Ok(permit) => {
let histogram_timer = self
.metrics
.start_utp_process_timer(UtpDirectionLabel::Inbound);
Some(OwnedTimedSemaphorePermit {
permit,
histogram_timer,
})
}
Err(_) => None,
}
}
Expand Down
2 changes: 2 additions & 0 deletions crates/portalnet/src/utp/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod controller;
pub mod timed_semaphore;
15 changes: 15 additions & 0 deletions crates/portalnet/src/utp/timed_semaphore.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use tokio::sync::OwnedSemaphorePermit;
use trin_metrics::timer::DiscardOnDropHistogramTimer;

#[derive(Debug)]
pub struct OwnedTimedSemaphorePermit {
pub permit: OwnedSemaphorePermit,
pub histogram_timer: DiscardOnDropHistogramTimer,
}

impl OwnedTimedSemaphorePermit {
pub fn drop(self) {
self.histogram_timer.stop_and_record();
drop(self.permit);
}
}

0 comments on commit a8b2ac9

Please sign in to comment.