Skip to content

Commit

Permalink
Merge branch 'main' into test/nightly-schemathesis
Browse files Browse the repository at this point in the history
  • Loading branch information
stevenj authored Jan 16, 2025
2 parents 370a7dc + 21263b1 commit a4b96c0
Show file tree
Hide file tree
Showing 5 changed files with 227 additions and 15 deletions.
2 changes: 2 additions & 0 deletions catalyst-gateway/bin/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,8 @@ regex = "1.11.1"
minijinja = "2.5.0"
bytes = "1.9.0"
mime = "0.3.17"
stats_alloc = "0.1.10"
memory-stats = "1.0.0"

[dev-dependencies]
proptest = "1.5.0"
Expand Down
166 changes: 166 additions & 0 deletions catalyst-gateway/bin/src/metrics/memory.rs
Original file line number Diff line number Diff line change
@@ -1 +1,167 @@
//! Metrics related to memory analytics.
use std::{
alloc::System,
sync::atomic::{AtomicBool, Ordering},
thread,
};

use memory_stats::{memory_stats, MemoryStats};
use stats_alloc::{Region, StatsAlloc, INSTRUMENTED_SYSTEM};

use crate::settings::Settings;

/// Use the instrumented allocator for gathering allocation statistics.
/// Note: This wraps the global allocator.
/// All structs that use the global allocator can be tracked.
#[global_allocator]
static GLOBAL: &StatsAlloc<System> = &INSTRUMENTED_SYSTEM;

/// This is to prevent the init function from accidentally being called multiple times.
static IS_INITIALIZED: AtomicBool = AtomicBool::new(false);

/// Starts a background thread to periodically update memory metrics.
///
/// This function spawns a thread that updates the global `MemoryMetrics`
/// structure at regular intervals defined by `UPDATE_INTERVAL_MILLI`.
pub(crate) fn init_metrics_updater() {
if IS_INITIALIZED.swap(true, Ordering::SeqCst) {
return;
}

let stats = Region::new(GLOBAL);
let api_host_names = Settings::api_host_names().join(",");
let service_id = Settings::service_id();

thread::spawn(move || {
loop {
{
let allocator_stats = stats.change();
let mem_stats = memory_stats().unwrap_or({
MemoryStats {
physical_mem: 0,
virtual_mem: 0,
}
});

reporter::MEMORY_PHYSICAL_USAGE
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(mem_stats.physical_mem).unwrap_or(-1));
reporter::MEMORY_VIRTUAL_USAGE
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(mem_stats.virtual_mem).unwrap_or(-1));
reporter::MEMORY_ALLOCATION_COUNT
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.allocations).unwrap_or(-1));
reporter::MEMORY_DEALLOCATION_COUNT
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.deallocations).unwrap_or(-1));
reporter::MEMORY_REALLOCATION_COUNT
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.reallocations).unwrap_or(-1));
reporter::MEMORY_BYTES_ALLOCATED
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.bytes_allocated).unwrap_or(-1));
reporter::MEMORY_BYTES_DEALLOCATED
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.bytes_deallocated).unwrap_or(-1));
reporter::MEMORY_BYTES_REALLOCATED
.with_label_values(&[&api_host_names, service_id])
.set(i64::try_from(allocator_stats.bytes_reallocated).unwrap_or(-1));
}

thread::sleep(Settings::metrics_memory_interval());
}
});
}

/// All the related memory reporting metrics to the Prometheus service are inside this
/// module.
mod reporter {
use std::sync::LazyLock;

use prometheus::{register_int_gauge_vec, IntGaugeVec};

/// Labels for the client metrics
const MEMORY_METRIC_LABELS: [&str; 2] = ["api_host_names", "service_id"];

/// The "physical" memory used by this process, in bytes.
pub(super) static MEMORY_PHYSICAL_USAGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_physical_usage",
"Amount of physical memory usage in bytes",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The "virtual" memory used by this process, in bytes.
pub(super) static MEMORY_VIRTUAL_USAGE: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_virtual_usage",
"Amount of physical virtual usage in bytes",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The number of allocation count in the heap.
pub(super) static MEMORY_ALLOCATION_COUNT: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_allocation_count",
"Number of allocation count in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The number of deallocation count in the heap.
pub(super) static MEMORY_DEALLOCATION_COUNT: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_deallocation_count",
"Number of deallocation count in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The number of reallocation count in the heap.
pub(super) static MEMORY_REALLOCATION_COUNT: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_reallocation_count",
"Number of reallocation count in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The amount of accumulative allocated bytes in the heap.
pub(super) static MEMORY_BYTES_ALLOCATED: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_bytes_allocated",
"Amount of accumulative allocated bytes in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The amount of accumulative deallocated bytes in the heap.
pub(super) static MEMORY_BYTES_DEALLOCATED: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_bytes_deallocated",
"Amount of accumulative deallocated bytes in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});

/// The amount of accumulative reallocated bytes in the heap.
pub(super) static MEMORY_BYTES_REALLOCATED: LazyLock<IntGaugeVec> = LazyLock::new(|| {
register_int_gauge_vec!(
"memory_bytes_reallocated",
"Amount of accumulative reallocated bytes in the heap",
&MEMORY_METRIC_LABELS
)
.unwrap()
});
}
2 changes: 2 additions & 0 deletions catalyst-gateway/bin/src/metrics/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,7 @@ pub(crate) mod memory;
/// Returns the default prometheus registry.
#[must_use]
pub(crate) fn init_prometheus() -> Registry {
memory::init_metrics_updater();

default_registry().clone()
}
34 changes: 19 additions & 15 deletions catalyst-gateway/bin/src/settings/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ use anyhow::anyhow;
use cardano_chain_follower::Network;
use clap::Args;
use dotenvy::dotenv;
use duration_string::DurationString;
use str_env_var::StringEnvVar;
use tracing::error;
use url::Url;
Expand Down Expand Up @@ -52,6 +51,9 @@ const API_URL_PREFIX_DEFAULT: &str = "/api";
/// Default `CHECK_CONFIG_TICK` used in development.
const CHECK_CONFIG_TICK_DEFAULT: &str = "5s";

/// Default `METRICS_MEMORY_INTERVAL`.
const METRICS_MEMORY_INTERVAL_DEFAULT: &str = "1s";

/// Default Event DB URL.
const EVENT_DB_URL_DEFAULT: &str =
"postgresql://postgres:postgres@localhost/catalyst_events?sslmode=disable";
Expand Down Expand Up @@ -144,6 +146,9 @@ struct EnvVars {
/// Tick every N seconds until config exists in db
#[allow(unused)]
check_config_tick: Duration,

/// Interval for updating and sending memory metrics.
metrics_memory_interval: Duration,
}

// Lazy initialization of all env vars which are not command line parameters.
Expand All @@ -157,19 +162,6 @@ static ENV_VARS: LazyLock<EnvVars> = LazyLock::new(|| {
// Support env vars in a `.env` file, doesn't need to exist.
dotenv().ok();

let check_interval = StringEnvVar::new("CHECK_CONFIG_TICK", CHECK_CONFIG_TICK_DEFAULT.into());
let check_config_tick = match DurationString::try_from(check_interval.as_string()) {
Ok(duration) => duration.into(),
Err(error) => {
error!(
"Invalid Check Config Tick Duration: {} : {}. Defaulting to 5 seconds.",
check_interval.as_str(),
error
);
Duration::from_secs(5)
},
};

EnvVars {
github_repo_owner: StringEnvVar::new("GITHUB_REPO_OWNER", GITHUB_REPO_OWNER_DEFAULT.into()),
github_repo_name: StringEnvVar::new("GITHUB_REPO_NAME", GITHUB_REPO_NAME_DEFAULT.into()),
Expand All @@ -194,7 +186,14 @@ static ENV_VARS: LazyLock<EnvVars> = LazyLock::new(|| {
),
chain_follower: chain_follower::EnvVars::new(),
internal_api_key: StringEnvVar::new_optional("INTERNAL_API_KEY", true),
check_config_tick,
check_config_tick: StringEnvVar::new_as_duration(
"CHECK_CONFIG_TICK",
CHECK_CONFIG_TICK_DEFAULT,
),
metrics_memory_interval: StringEnvVar::new_as_duration(
"METRICS_MEMORY_INTERVAL",
METRICS_MEMORY_INTERVAL_DEFAULT,
),
}
});

Expand Down Expand Up @@ -288,6 +287,11 @@ impl Settings {
ENV_VARS.service_id.as_str()
}

/// The memory metrics interval
pub(crate) fn metrics_memory_interval() -> Duration {
ENV_VARS.metrics_memory_interval
}

/// Get a list of all host names to serve the API on.
///
/// Used by the `OpenAPI` Documentation to point to the correct backend.
Expand Down
38 changes: 38 additions & 0 deletions catalyst-gateway/bin/src/settings/str_env_var.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
//! Processing for String Environment Variables
// cspell: words smhdwy

use std::{
env::{self, VarError},
fmt::{self, Display},
str::FromStr,
time::Duration,
};

use duration_string::DurationString;
use strum::VariantNames;
use tracing::{error, info};

Expand Down Expand Up @@ -168,6 +173,39 @@ impl StringEnvVar {
value
}

/// Convert an Envvar into the required Duration type.
pub(crate) fn new_as_duration(var_name: &str, default: &str) -> Duration {
let choices = "A value in the format of `[0-9]+(ns|us|ms|[smhdwy])`";

let raw_value = StringEnvVar::new(
var_name,
(default.to_string().as_str(), false, choices).into(),
)
.as_string();

match DurationString::try_from(raw_value.clone()) {
Ok(duration) => duration.into(),
Err(error) => {
error!(
"Invalid Duration: {} : {}. Defaulting to {}.",
raw_value, error, default
);

match DurationString::try_from(default.to_string()) {
Ok(duration) => duration.into(),
// The error from parsing the default value must not happen
Err(error) => {
error!(
"Invalid Default Duration: {} : {}. Defaulting to 1s.",
default, error
);
Duration::from_secs(1)
},
}
},
}
}

/// Convert an Envvar into an integer in the bounded range.
pub(super) fn new_as<T>(var_name: &str, default: T, min: T, max: T) -> T
where
Expand Down

0 comments on commit a4b96c0

Please sign in to comment.