diff --git a/catalyst-gateway/bin/Cargo.toml b/catalyst-gateway/bin/Cargo.toml
index f55833967ad..565f0ed396c 100644
--- a/catalyst-gateway/bin/Cargo.toml
+++ b/catalyst-gateway/bin/Cargo.toml
@@ -98,6 +98,8 @@ regex = "1.11.1"
 minijinja = "2.5.0"
 bytes = "1.9.0"
 mime = "0.3.17"
+stats_alloc = "0.1.10"
+memory-stats = "1.0.0"
 
 [dev-dependencies]
 proptest = "1.5.0"
diff --git a/catalyst-gateway/bin/src/metrics/memory.rs b/catalyst-gateway/bin/src/metrics/memory.rs
index 7da8bd333e9..13874d98744 100644
--- a/catalyst-gateway/bin/src/metrics/memory.rs
+++ b/catalyst-gateway/bin/src/metrics/memory.rs
@@ -1 +1,167 @@
 //! Metrics related to memory analytics.
+
+use std::{
+    alloc::System,
+    sync::atomic::{AtomicBool, Ordering},
+    thread,
+};
+
+use memory_stats::{memory_stats, MemoryStats};
+use stats_alloc::{Region, StatsAlloc, INSTRUMENTED_SYSTEM};
+
+use crate::settings::Settings;
+
+/// Use the instrumented allocator for gathering allocation statistics.
+/// Note: This wraps the global allocator.
+/// All structs that use the global allocator can be tracked.
+#[global_allocator]
+static GLOBAL: &StatsAlloc<System> = &INSTRUMENTED_SYSTEM;
+
+/// This is to prevent the init function from accidentally being called multiple times.
+static IS_INITIALIZED: AtomicBool = AtomicBool::new(false);
+
+/// Starts a background thread to periodically update memory metrics.
+///
+/// The spawned thread refreshes the Prometheus gauges in the `reporter` module,
+/// sleeping for `Settings::metrics_memory_interval()` between updates.
+pub(crate) fn init_metrics_updater() {
+    /// Converts a raw counter into a gauge value, reporting `-1` when it does not fit.
+    fn gauge_value<T: TryInto<i64>>(raw: T) -> i64 {
+        raw.try_into().unwrap_or(-1)
+    }
+
+    // `swap` hands back the previous flag: `true` means an updater is already running.
+    let already_started = IS_INITIALIZED.swap(true, Ordering::SeqCst);
+    if already_started {
+        return;
+    }
+
+    let region = Region::new(GLOBAL);
+    let host_names = Settings::api_host_names().join(",");
+    let service_id = Settings::service_id();
+
+    // The thread is detached; it keeps looping for as long as the process lives.
+    thread::spawn(move || {
+        // Every gauge is published under the same label values.
+        let labels = [host_names.as_str(), service_id];
+
+        loop {
+            // Allocator statistics as reported by the region created above.
+            let heap = region.change();
+            // Report zeroes when the process memory statistics cannot be read.
+            let process = memory_stats().unwrap_or(MemoryStats {
+                physical_mem: 0,
+                virtual_mem: 0,
+            });
+
+            // Gauges are updated in the order listed here.
+            let samples = [
+                (&*reporter::MEMORY_PHYSICAL_USAGE, gauge_value(process.physical_mem)),
+                (&*reporter::MEMORY_VIRTUAL_USAGE, gauge_value(process.virtual_mem)),
+                (&*reporter::MEMORY_ALLOCATION_COUNT, gauge_value(heap.allocations)),
+                (&*reporter::MEMORY_DEALLOCATION_COUNT, gauge_value(heap.deallocations)),
+                (&*reporter::MEMORY_REALLOCATION_COUNT, gauge_value(heap.reallocations)),
+                (&*reporter::MEMORY_BYTES_ALLOCATED, gauge_value(heap.bytes_allocated)),
+                (&*reporter::MEMORY_BYTES_DEALLOCATED, gauge_value(heap.bytes_deallocated)),
+                (&*reporter::MEMORY_BYTES_REALLOCATED, gauge_value(heap.bytes_reallocated)),
+            ];
+            for (gauge, value) in samples {
+                gauge.with_label_values(&labels).set(value);
+            }
+
+            // Wait one reporting interval before sampling again.
+            thread::sleep(Settings::metrics_memory_interval());
+        }
+    });
+}
+
+/// All the related memory reporting metrics to the Prometheus service are inside this
+/// module.
+mod reporter {
+    use std::sync::LazyLock;
+
+    use prometheus::{register_int_gauge_vec, IntGaugeVec};
+
+    /// Label names attached to every memory metric.
+    const MEMORY_METRIC_LABELS: [&str; 2] = ["api_host_names", "service_id"];
+
+    /// The "physical" memory used by this process, in bytes.
+    pub(super) static MEMORY_PHYSICAL_USAGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
+        register_int_gauge_vec!(
+            "memory_physical_usage",
+            "Amount of physical memory usage in bytes",
+            &MEMORY_METRIC_LABELS
+        )
+        .unwrap()
+    });
+
+    /// The "virtual" memory used by this process, in bytes.
+    pub(super) static MEMORY_VIRTUAL_USAGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
+        register_int_gauge_vec!(
+            "memory_virtual_usage",
+            "Amount of virtual memory usage in bytes",
+            &MEMORY_METRIC_LABELS
+        )
+        .unwrap()
+    });
+
+    /// The number of allocations in the heap.
+    pub(super) static MEMORY_ALLOCATION_COUNT: LazyLock<IntGaugeVec> = LazyLock::new(|| {
+        register_int_gauge_vec!(
+            "memory_allocation_count",
+            "Number of allocation count in the heap",
+            &MEMORY_METRIC_LABELS
+        )
+        .unwrap()
+    });
+
+    /// The number of deallocations in the heap.
+    pub(super) static MEMORY_DEALLOCATION_COUNT: LazyLock<IntGaugeVec> = LazyLock::new(|| {
+        register_int_gauge_vec!(
+            "memory_deallocation_count",
+            "Number of deallocation count in the heap",
+            &MEMORY_METRIC_LABELS
+        )
+        .unwrap()
+    });
+
+    /// The number of reallocations in the heap.
+    pub(super) static MEMORY_REALLOCATION_COUNT: LazyLock<IntGaugeVec> = LazyLock::new(|| {
+        register_int_gauge_vec!(
+            "memory_reallocation_count",
+            "Number of reallocation count in the heap",
+            &MEMORY_METRIC_LABELS
+        )
+        .unwrap()
+    });
+
+    /// The accumulated number of bytes allocated in the heap.
+    pub(super) static MEMORY_BYTES_ALLOCATED: LazyLock<IntGaugeVec> = LazyLock::new(|| {
+        register_int_gauge_vec!(
+            "memory_bytes_allocated",
+            "Amount of accumulative allocated bytes in the heap",
+            &MEMORY_METRIC_LABELS
+        )
+        .unwrap()
+    });
+
+    /// The accumulated number of bytes deallocated in the heap.
+    pub(super) static MEMORY_BYTES_DEALLOCATED: LazyLock<IntGaugeVec> = LazyLock::new(|| {
+        register_int_gauge_vec!(
+            "memory_bytes_deallocated",
+            "Amount of accumulative deallocated bytes in the heap",
+            &MEMORY_METRIC_LABELS
+        )
+        .unwrap()
+    });
+
+    /// The accumulated number of bytes reallocated in the heap.
+    pub(super) static MEMORY_BYTES_REALLOCATED: LazyLock<IntGaugeVec> = LazyLock::new(|| {
+        register_int_gauge_vec!(
+            "memory_bytes_reallocated",
+            "Amount of accumulative reallocated bytes in the heap",
+            &MEMORY_METRIC_LABELS
+        )
+        .unwrap()
+    });
+}
diff --git a/catalyst-gateway/bin/src/metrics/mod.rs b/catalyst-gateway/bin/src/metrics/mod.rs
index ba8d71503da..74725efee27 100644
--- a/catalyst-gateway/bin/src/metrics/mod.rs
+++ b/catalyst-gateway/bin/src/metrics/mod.rs
@@ -14,5 +14,7 @@ pub(crate) mod memory;
 /// Returns the default prometheus registry.
 #[must_use]
 pub(crate) fn init_prometheus() -> Registry {
+    memory::init_metrics_updater();
+
     default_registry().clone()
 }
diff --git a/catalyst-gateway/bin/src/settings/mod.rs b/catalyst-gateway/bin/src/settings/mod.rs
index 6be03945050..d3702d39373 100644
--- a/catalyst-gateway/bin/src/settings/mod.rs
+++ b/catalyst-gateway/bin/src/settings/mod.rs
@@ -11,7 +11,6 @@ use anyhow::anyhow;
 use cardano_chain_follower::Network;
 use clap::Args;
 use dotenvy::dotenv;
-use duration_string::DurationString;
 use str_env_var::StringEnvVar;
 use tracing::error;
 use url::Url;
@@ -52,6 +51,9 @@ const API_URL_PREFIX_DEFAULT: &str = "/api";
 /// Default `CHECK_CONFIG_TICK` used in development.
 const CHECK_CONFIG_TICK_DEFAULT: &str = "5s";
 
+/// Default `METRICS_MEMORY_INTERVAL`.
+const METRICS_MEMORY_INTERVAL_DEFAULT: &str = "1s";
+
 /// Default Event DB URL.
const EVENT_DB_URL_DEFAULT: &str = "postgresql://postgres:postgres@localhost/catalyst_events?sslmode=disable"; @@ -144,6 +146,9 @@ struct EnvVars { /// Tick every N seconds until config exists in db #[allow(unused)] check_config_tick: Duration, + + /// Interval for updating and sending memory metrics. + metrics_memory_interval: Duration, } // Lazy initialization of all env vars which are not command line parameters. @@ -157,19 +162,6 @@ static ENV_VARS: LazyLock = LazyLock::new(|| { // Support env vars in a `.env` file, doesn't need to exist. dotenv().ok(); - let check_interval = StringEnvVar::new("CHECK_CONFIG_TICK", CHECK_CONFIG_TICK_DEFAULT.into()); - let check_config_tick = match DurationString::try_from(check_interval.as_string()) { - Ok(duration) => duration.into(), - Err(error) => { - error!( - "Invalid Check Config Tick Duration: {} : {}. Defaulting to 5 seconds.", - check_interval.as_str(), - error - ); - Duration::from_secs(5) - }, - }; - EnvVars { github_repo_owner: StringEnvVar::new("GITHUB_REPO_OWNER", GITHUB_REPO_OWNER_DEFAULT.into()), github_repo_name: StringEnvVar::new("GITHUB_REPO_NAME", GITHUB_REPO_NAME_DEFAULT.into()), @@ -194,7 +186,14 @@ static ENV_VARS: LazyLock = LazyLock::new(|| { ), chain_follower: chain_follower::EnvVars::new(), internal_api_key: StringEnvVar::new_optional("INTERNAL_API_KEY", true), - check_config_tick, + check_config_tick: StringEnvVar::new_as_duration( + "CHECK_CONFIG_TICK", + CHECK_CONFIG_TICK_DEFAULT, + ), + metrics_memory_interval: StringEnvVar::new_as_duration( + "METRICS_MEMORY_INTERVAL", + METRICS_MEMORY_INTERVAL_DEFAULT, + ), } }); @@ -288,6 +287,11 @@ impl Settings { ENV_VARS.service_id.as_str() } + /// The memory metrics interval + pub(crate) fn metrics_memory_interval() -> Duration { + ENV_VARS.metrics_memory_interval + } + /// Get a list of all host names to serve the API on. /// /// Used by the `OpenAPI` Documentation to point to the correct backend. 
diff --git a/catalyst-gateway/bin/src/settings/str_env_var.rs b/catalyst-gateway/bin/src/settings/str_env_var.rs
index 18e8a6414b7..4676e29eb93 100644
--- a/catalyst-gateway/bin/src/settings/str_env_var.rs
+++ b/catalyst-gateway/bin/src/settings/str_env_var.rs
@@ -1,10 +1,15 @@
 //! Processing for String Environment Variables
+
+// cspell: words smhdwy
+
 use std::{
     env::{self, VarError},
     fmt::{self, Display},
     str::FromStr,
+    time::Duration,
 };
 
+use duration_string::DurationString;
 use strum::VariantNames;
 use tracing::{error, info};
 
@@ -168,6 +173,42 @@ impl StringEnvVar {
         value
     }
 
+    /// Read the environment variable `var_name` and convert it into a `Duration`.
+    ///
+    /// The raw value is expected in the form `[0-9]+(ns|us|ms|[smhdwy])`. When it
+    /// cannot be parsed, an error is logged and `default` is parsed instead. Should
+    /// `default` itself be unparsable, a second error is logged and one second is
+    /// returned.
+    pub(crate) fn new_as_duration(var_name: &str, default: &str) -> Duration {
+        let choices = "A value in the format of `[0-9]+(ns|us|ms|[smhdwy])`";
+
+        // `default` is already a `&str`; pass it on without building a temporary `String`.
+        let raw_value = StringEnvVar::new(var_name, (default, false, choices).into()).as_string();
+
+        // `try_from` consumes its argument, and the raw text is still needed for the log line.
+        match DurationString::try_from(raw_value.clone()) {
+            Ok(duration) => return duration.into(),
+            Err(error) => {
+                error!(
+                    "Invalid Duration: {} : {}. Defaulting to {}.",
+                    raw_value, error, default
+                );
+            },
+        }
+
+        match DurationString::try_from(default.to_string()) {
+            Ok(duration) => duration.into(),
+            // The error from parsing the default value must not happen
+            Err(error) => {
+                error!(
+                    "Invalid Default Duration: {} : {}. Defaulting to 1s.",
+                    default, error
+                );
+                Duration::from_secs(1)
+            },
+        }
+    }
+
     /// Convert an Envvar into an integer in the bounded range.
     pub(super) fn new_as(var_name: &str, default: T, min: T, max: T) -> T
     where