Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(cat-gateway): Add memory metrics to improve observability #1499

Merged
merged 20 commits into from
Jan 16, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ regex = "1.11.1"
minijinja = "2.5.0"
bytes = "1.9.0"
mime = "0.3.17"
stats_alloc = "0.1.10"
memory-stats = "1.0.0"

[dev-dependencies]
proptest = "1.5.0"
Expand Down
166 changes: 166 additions & 0 deletions catalyst-gateway/bin/src/metrics/memory.rs
Original file line number Diff line number Diff line change
@@ -1 +1,167 @@
//! Metrics related to memory analytics.
use std::{
alloc::System,
sync::atomic::{AtomicBool, Ordering},
thread,
};

use memory_stats::{memory_stats, MemoryStats};
use stats_alloc::{Region, StatsAlloc, INSTRUMENTED_SYSTEM};

use crate::settings::Settings;

/// Use the instrumented allocator for gathering allocation statistics.
/// Note: This wraps the global allocator.
/// All structs that use the global allocator can be tracked.
#[global_allocator]
static GLOBAL: &StatsAlloc<System> = &INSTRUMENTED_SYSTEM;

/// This is to prevent the init function from accidentally being called multiple times.
static IS_INITIALIZED: AtomicBool = AtomicBool::new(false);

/// Starts a background thread to periodically update memory metrics.
///
/// This function spawns a thread that updates the global `MemoryMetrics`
/// structure at regular intervals defined by `UPDATE_INTERVAL_MILLI`.
pub(crate) fn init_metrics_updater() {
if IS_INITIALIZED.swap(true, Ordering::SeqCst) {
return;
}

let stats = Region::new(GLOBAL);
let api_host_names = Settings::api_host_names().join(",");
let service_id = Settings::service_id();

thread::spawn(move || {
loop {
{
let allocator_stats = stats.change();
let mem_stats = memory_stats().unwrap_or({
MemoryStats {
physical_mem: 0,
virtual_mem: 0,
}
});

reporter::MEMORY_PHYSICAL_USAGE
.with_label_values(&[&api_host_names, service_id])
stevenj marked this conversation as resolved.
Show resolved Hide resolved
.set(i64::try_from(mem_stats.physical_mem).unwrap_or(-1));
reporter::MEMORY_VIRTUAL_USAGE
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(mem_stats.virtual_mem).unwrap_or(-1));
reporter::MEMORY_ALLOCATION_COUNT
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.allocations).unwrap_or(-1));
reporter::MEMORY_DEALLOCATION_COUNT
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.deallocations).unwrap_or(-1));
reporter::MEMORY_REALLOCATION_COUNT
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.reallocations).unwrap_or(-1));
reporter::MEMORY_BYTES_ALLOCATED
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.bytes_allocated).unwrap_or(-1));
reporter::MEMORY_BYTES_DEALLOCATED
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.bytes_deallocated).unwrap_or(-1));
reporter::MEMORY_BYTES_REALLOCATED
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.bytes_reallocated).unwrap_or(-1));
}

thread::sleep(Settings::metrics_memory_interval());
}
});
}

/// All the related memory reporting metrics to the Prometheus service are inside this
/// module.
mod reporter {
use std::sync::LazyLock;

use prometheus::{register_int_gauge_vec, IntGaugeVec};

/// Labels for the client metrics
const MEMORY_METRIC_LABELS: [&str; 2] = ["api_host_names", "service_id"];

/// The "physical" memory used by this process, in bytes.
pub(super) static MEMORY_PHYSICAL_USAGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_physical_usage",
"Amount of physical memory usage in bytes",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The "virtual" memory used by this process, in bytes.
pub(super) static MEMORY_VIRTUAL_USAGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_virtual_usage",
"Amount of physical virtual usage in bytes",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The number of allocation count in the heap.
pub(super) static MEMORY_ALLOCATION_COUNT: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_allocation_count",
"Number of allocation count in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The number of deallocation count in the heap.
pub(super) static MEMORY_DEALLOCATION_COUNT: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_deallocation_count",
"Number of deallocation count in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The number of reallocation count in the heap.
pub(super) static MEMORY_REALLOCATION_COUNT: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_reallocation_count",
"Number of reallocation count in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The amount of accumulative allocated bytes in the heap.
pub(super) static MEMORY_BYTES_ALLOCATED: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_bytes_allocated",
"Amount of accumulative allocated bytes in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The amount of accumulative deallocated bytes in the heap.
pub(super) static MEMORY_BYTES_DEALLOCATED: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_bytes_deallocated",
"Amount of accumulative deallocated bytes in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The amount of accumulative reallocated bytes in the heap.
pub(super) static MEMORY_BYTES_REALLOCATED: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_bytes_reallocated",
"Amount of accumulative reallocated bytes in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});
}
2 changes: 2 additions & 0 deletions catalyst-gateway/bin/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ pub(crate) mod memory;
/// Returns the default prometheus registry.
#[must_use]
pub(crate) fn init_prometheus() -> Registry {
memory::init_metrics_updater();

default_registry().clone()
}
34 changes: 19 additions & 15 deletions catalyst-gateway/bin/src/settings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use anyhow::anyhow;
use cardano_chain_follower::Network;
use clap::Args;
use dotenvy::dotenv;
use duration_string::DurationString;
use str_env_var::StringEnvVar;
use tracing::error;
use url::Url;
Expand Down Expand Up @@ -52,6 +51,9 @@ const API_URL_PREFIX_DEFAULT: &str = "/api";
/// Default `CHECK_CONFIG_TICK` used in development.
const CHECK_CONFIG_TICK_DEFAULT: &str = "5s";

/// Default `METRICS_MEMORY_INTERVAL`.
const METRICS_MEMORY_INTERVAL_DEFAULT: &str = "1s";

/// Default Event DB URL.
const EVENT_DB_URL_DEFAULT: &str =
"postgresql://postgres:postgres@localhost/catalyst_events?sslmode=disable";
Expand Down Expand Up @@ -144,6 +146,9 @@ struct EnvVars {
/// Tick every N seconds until config exists in db
#[allow(unused)]
check_config_tick: Duration,

/// Interval for updating and sending memory metrics.
metrics_memory_interval: Duration,
}

// Lazy initialization of all env vars which are not command line parameters.
Expand All @@ -157,19 +162,6 @@ static ENV_VARS: LazyLock<EnvVars> = LazyLock::new(|| {
// Support env vars in a `.env` file, doesn't need to exist.
dotenv().ok();

let check_interval = StringEnvVar::new("CHECK_CONFIG_TICK", CHECK_CONFIG_TICK_DEFAULT.into());
let check_config_tick = match DurationString::try_from(check_interval.as_string()) {
Ok(duration) => duration.into(),
Err(error) => {
error!(
"Invalid Check Config Tick Duration: {} : {}. Defaulting to 5 seconds.",
check_interval.as_str(),
error
);
Duration::from_secs(5)
},
};

EnvVars {
github_repo_owner: StringEnvVar::new("GITHUB_REPO_OWNER", GITHUB_REPO_OWNER_DEFAULT.into()),
github_repo_name: StringEnvVar::new("GITHUB_REPO_NAME", GITHUB_REPO_NAME_DEFAULT.into()),
Expand All @@ -194,7 +186,14 @@ static ENV_VARS: LazyLock<EnvVars> = LazyLock::new(|| {
),
chain_follower: chain_follower::EnvVars::new(),
internal_api_key: StringEnvVar::new_optional("INTERNAL_API_KEY", true),
check_config_tick,
check_config_tick: StringEnvVar::new_as_duration(
"CHECK_CONFIG_TICK",
CHECK_CONFIG_TICK_DEFAULT,
),
metrics_memory_interval: StringEnvVar::new_as_duration(
"METRICS_MEMORY_INTERVAL",
METRICS_MEMORY_INTERVAL_DEFAULT,
),
}
});

Expand Down Expand Up @@ -288,6 +287,11 @@ impl Settings {
ENV_VARS.service_id.as_str()
}

/// The memory metrics interval
pub(crate) fn metrics_memory_interval() -> Duration {
ENV_VARS.metrics_memory_interval
}

/// Get a list of all host names to serve the API on.
///
/// Used by the `OpenAPI` Documentation to point to the correct backend.
Expand Down
38 changes: 38 additions & 0 deletions catalyst-gateway/bin/src/settings/str_env_var.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
//! Processing for String Environment Variables
// cspell: words smhdwy

use std::{
env::{self, VarError},
fmt::{self, Display},
str::FromStr,
time::Duration,
};

use duration_string::DurationString;
use strum::VariantNames;
use tracing::{error, info};

Expand Down Expand Up @@ -168,6 +173,39 @@ impl StringEnvVar {
value
}

/// Convert an Envvar into the required Duration type.
pub(crate) fn new_as_duration(var_name: &str, default: &str) -> Duration {
let choices = "A value in the format of `[0-9]+(ns|us|ms|[smhdwy])`";

let raw_value = StringEnvVar::new(
var_name,
(default.to_string().as_str(), false, choices).into(),
)
.as_string();

match DurationString::try_from(raw_value.clone()) {
Ok(duration) => duration.into(),
Err(error) => {
error!(
"Invalid Duration: {} : {}. Defaulting to {}.",
raw_value, error, default
);

match DurationString::try_from(default.to_string()) {
Ok(duration) => duration.into(),
// The error from parsing the default value must not happen
Err(error) => {
error!(
"Invalid Default Duration: {} : {}. Defaulting to 1s.",
default, error
);
Duration::from_secs(1)
},
}
},
}
}

/// Convert an Envvar into an integer in the bounded range.
pub(super) fn new_as<T>(var_name: &str, default: T, min: T, max: T) -> T
where
Expand Down
Loading