Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

wrapper around jemalloc to track allocator usage by thread #4336

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
99 changes: 54 additions & 45 deletions Cargo.lock

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -452,6 +452,7 @@ soketto = "0.7"
solana-account = { path = "sdk/account", version = "=2.2.0" }
solana-account-decoder = { path = "account-decoder", version = "=2.2.0" }
solana-account-decoder-client-types = { path = "account-decoder-client-types", version = "=2.2.0" }
solana-memory-management = { path = "memory-management", version = "=2.2.0" }
solana-account-info = { path = "sdk/account-info", version = "=2.2.0" }
solana-accounts-db = { path = "accounts-db", version = "=2.2.0" }
solana-address-lookup-table-interface = { path = "sdk/address-lookup-table-interface", version = "=2.2.0" }
Expand Down
10 changes: 10 additions & 0 deletions memory-management/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,13 @@ repository = { workspace = true }
homepage = { workspace = true }
license = { workspace = true }
edition = { workspace = true }

[dependencies]
arrayvec = { workspace = true }
libc = { workspace = true }
log = { workspace = true }
memchr = "2.7.4"
solana-metrics = { workspace = true }

[target.'cfg(not(any(target_env = "msvc", target_os = "freebsd")))'.dependencies]
jemallocator = { workspace = true }
217 changes: 217 additions & 0 deletions memory-management/src/jemalloc_monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,217 @@
#![cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
use {
jemallocator::Jemalloc,
std::{
alloc::{GlobalAlloc, Layout},
cell::RefCell,
sync::{
atomic::{AtomicUsize, Ordering},
RwLock,
},
},
};

/// Capacity, in bytes, of a stored thread name or thread-name prefix.
/// Also used as the buffer length when the OS thread name is queried.
const NAME_LEN: usize = 16;
/// Fixed-capacity inline byte buffer holding a thread name or a name prefix.
type ThreadName = arrayvec::ArrayVec<u8, NAME_LEN>;

/// Process-wide monitoring state shared by the allocator hooks and the public
/// `view_*` / `init_allocator` / `deinit_allocator` functions.
static SELF: JemWrapStats = JemWrapStats {
// `None` until `init_allocator` installs a prefix table.
named_thread_stats: RwLock::new(None),
// Fallback counters for activity the allocator hooks could not attribute
// through `named_thread_stats`.
unnamed_thread_stats: Counters::new(),
// Totals for every successful allocation and deallocation in the process.
process_stats: Counters::new(),
};

/// Running totals of allocator activity, updated without locking.
///
/// Every field only ever grows. All accesses use `Ordering::Relaxed`, so the
/// four totals are individually consistent but are not updated (or read) as
/// one atomic group.
#[derive(Debug)]
pub struct Counters {
    allocations_total: AtomicUsize,
    deallocations_total: AtomicUsize,
    bytes_allocated_total: AtomicUsize,
    bytes_deallocated_total: AtomicUsize,
}

/// Plain-integer snapshot of a [`Counters`] value, as returned by [`Counters::view`].
#[derive(Debug, Clone, Copy, Default, PartialEq, Eq)]
pub struct CountersView {
    /// Number of recorded allocations.
    pub allocations_total: usize,
    /// Number of recorded deallocations.
    pub deallocations_total: usize,
    /// Sum of the sizes of all recorded allocations, in bytes.
    pub bytes_allocated_total: usize,
    /// Sum of the sizes of all recorded deallocations, in bytes.
    pub bytes_deallocated_total: usize,
}

impl Default for Counters {
    fn default() -> Self {
        Self::new()
    }
}

impl Counters {
    /// Creates a zeroed set of counters; `const` so it can initialise statics.
    const fn new() -> Self {
        Self {
            allocations_total: AtomicUsize::new(0),
            deallocations_total: AtomicUsize::new(0),
            bytes_allocated_total: AtomicUsize::new(0),
            bytes_deallocated_total: AtomicUsize::new(0),
        }
    }

    /// Returns a snapshot of the current totals.
    ///
    /// Each field is loaded separately, so concurrent updates may be only
    /// partially reflected in the returned value.
    pub fn view(&self) -> CountersView {
        CountersView {
            allocations_total: self.allocations_total.load(Ordering::Relaxed),
            deallocations_total: self.deallocations_total.load(Ordering::Relaxed),
            bytes_allocated_total: self.bytes_allocated_total.load(Ordering::Relaxed),
            bytes_deallocated_total: self.bytes_deallocated_total.load(Ordering::Relaxed),
        }
    }

    /// Records one allocation of `size` bytes.
    pub fn alloc(&self, size: usize) {
        self.bytes_allocated_total
            .fetch_add(size, Ordering::Relaxed);
        self.allocations_total.fetch_add(1, Ordering::Relaxed);
    }

    /// Records one deallocation of `size` bytes.
    pub fn dealloc(&self, size: usize) {
        self.bytes_deallocated_total
            .fetch_add(size, Ordering::Relaxed);
        self.deallocations_total.fetch_add(1, Ordering::Relaxed);
    }
}

/// Global-allocator front end that delegates to jemalloc.
///
/// Install it with `#[global_allocator]`; the accounting happens in its
/// `GlobalAlloc` implementation.
// NOTE(review): the reason for the 4096-byte alignment is not visible in this
// file — confirm it is still required.
#[repr(C, align(4096))]
pub struct JemWrapAllocator {
    jemalloc: Jemalloc,
}

impl JemWrapAllocator {
    /// Builds the wrapper; `const` so it can initialise a `static`.
    pub const fn new() -> Self {
        JemWrapAllocator { jemalloc: Jemalloc }
    }
}

impl Default for JemWrapAllocator {
    /// Equivalent to [`JemWrapAllocator::new`].
    fn default() -> Self {
        JemWrapAllocator::new()
    }
}

/// Layout of the process-wide monitoring state held in `SELF`.
struct JemWrapStats {
// Per-prefix counters; `None` while monitoring is not initialised.
pub named_thread_stats: RwLock<Option<MemPoolStats>>,
// Fallback counters for activity the allocator hooks could not attribute
// through `named_thread_stats`.
pub unnamed_thread_stats: Counters,
// Totals for every successful allocation and deallocation in the process.
pub process_stats: Counters,
}

/// Calls `f` with the current per-prefix statistics while holding the read lock.
///
/// `f` is not called when monitoring has not been initialised (or has been
/// deinitialised).
///
/// # Panics
/// Panics if the statistics lock is poisoned.
pub fn view_allocations(f: impl FnOnce(&MemPoolStats)) {
    let guard = SELF.named_thread_stats.read().unwrap();
    let Some(pools) = guard.as_ref() else {
        return;
    };
    f(pools);
}
/// Returns snapshots of the global counters as `(unnamed_threads, whole_process)`.
pub fn view_global_allocations() -> (CountersView, CountersView) {
    let unnamed = SELF.unnamed_thread_stats.view();
    let process = SELF.process_stats.view();
    (unnamed, process)
}

#[derive(Debug, Default)]
pub struct MemPoolStats {
pub data: Vec<(ThreadName, Counters)>,
alexpyattaev marked this conversation as resolved.
Show resolved Hide resolved
}

impl MemPoolStats {
pub fn add(&mut self, prefix: &str) {
let key: ThreadName = prefix
.as_bytes()
.try_into()
.unwrap_or_else(|_| panic!("Prefix can not be over {} bytes long", NAME_LEN));

self.data.push((key, Counters::default()));
// keep data sorted with longest prefixes first (this avoids short-circuiting)
// no need for this to be efficient since we do not run time in a tight loop and vec is typically short
self.data.sort_unstable_by(|a, b| b.0.len().cmp(&a.0.len()));
}
}

/// Installs `mps` as the active prefix table, replacing any previous one.
///
/// # Panics
/// Panics if the statistics lock is poisoned.
pub fn init_allocator(mps: MemPoolStats) {
    let mut slot = SELF.named_thread_stats.write().unwrap();
    *slot = Some(mps);
}

/// Removes the active prefix table and hands it back to the caller.
///
/// # Panics
/// Panics if the statistics lock is poisoned or if no table is installed.
pub fn deinit_allocator() -> MemPoolStats {
    let mut slot = SELF.named_thread_stats.write().unwrap();
    slot.take().unwrap()
}

// SAFETY: the wrapper's only field is the `Jemalloc` handle and all counters
// live in the `SELF` static, so the wrapper itself holds no mutable state.
// NOTE(review): `Jemalloc` is presumably already `Sync`, which would make this
// manual impl redundant — confirm and consider removing it.
unsafe impl Sync for JemWrapAllocator {}

unsafe impl GlobalAlloc for JemWrapAllocator {
unsafe fn alloc(&self, layout: Layout) -> *mut u8 {
let alloc = self.jemalloc.alloc(layout);
if alloc.is_null() {
return alloc;
}
SELF.process_stats.alloc(layout.size());
if let Ok(stats) = SELF.named_thread_stats.try_read() {
if let Some(stats) = stats.as_ref() {
if let Some(stats) = match_thread_name_safely(stats, true) {
stats.alloc(layout.size());
}
}
} else {
SELF.unnamed_thread_stats.alloc(layout.size());
}
alexpyattaev marked this conversation as resolved.
Show resolved Hide resolved
alloc
}
unsafe fn dealloc(&self, ptr: *mut u8, layout: Layout) {
self.jemalloc.dealloc(ptr, layout);
if ptr.is_null() {
return;
}
SELF.process_stats.dealloc(layout.size());
if let Ok(stats) = SELF.named_thread_stats.try_read() {
if let Some(stats) = stats.as_ref() {
if let Some(stats) = match_thread_name_safely(stats, false) {
stats.dealloc(layout.size());
}
}
} else {
SELF.unnamed_thread_stats.dealloc(layout.size());
}
}
}

// Per-thread cache of the thread's name, so the OS is not queried on every
// allocation. Starts empty and is filled in by `match_thread_name_safely`.
thread_local! (
static THREAD_NAME: RefCell<ThreadName> = RefCell::new(ThreadName::new())
);

fn match_thread_name_safely(stats: &MemPoolStats, insert_if_missing: bool) -> Option<&Counters> {
let name: Option<ThreadName> = THREAD_NAME
.try_with(|v| {
let mut name = v.borrow_mut();
if name.is_empty() {
if insert_if_missing {
unsafe {
name.set_len(NAME_LEN);
let res = libc::pthread_getname_np(
libc::pthread_self(),
name.as_mut_ptr() as *mut i8,
name.capacity(),
);
if res == 0 {
let name_len = memchr::memchr(0, &name).unwrap_or(name.len());
name.set_len(name_len);
}
}
} else {
return None;
}
}
Some(name.clone())
})
.ok()
.flatten();
match name {
Some(name) => {
for (prefix, stats) in stats.data.iter() {
if !name.starts_with(prefix) {
continue;
}
return Some(stats);
alexpyattaev marked this conversation as resolved.
Show resolved Hide resolved
}
None
}
None => None,
}
}
90 changes: 90 additions & 0 deletions memory-management/src/jemalloc_monitor_metrics.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
#![cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
use {
crate::jemalloc_monitor::*, log::Level, solana_metrics::datapoint::DataPoint,
std::time::Duration,
};

fn watcher_thread() {
fn extend_lifetime<'b>(r: &'b str) -> &'static str {
// SAFETY: it is safe to extend lifetimes here since we can never write any metrics beyond the point
// where allocator is deinitialized. The function is private so can not be called from outside
// Metrics can not work with non-static strings due to design limitations.
unsafe { std::mem::transmute::<&'b str, &'static str>(r) }
}
let mut exit = false;
while !exit {
view_allocations(|stats| {
if stats.data.is_empty() {
exit = true;
}
let mut datapoint = DataPoint::new("MemoryBytesAllocatedTotal");
for (name, counters) in stats.data.iter() {
let s = counters.view();
let name = extend_lifetime(std::str::from_utf8(name).unwrap());
datapoint.add_field_i64(name, s.bytes_allocated_total as i64);
}
solana_metrics::submit(datapoint, Level::Info);
alexpyattaev marked this conversation as resolved.
Show resolved Hide resolved
let mut datapoint = DataPoint::new("MemoryBytesDeallocated");
for (name, counters) in stats.data.iter() {
let s = counters.view();
let name = extend_lifetime(std::str::from_utf8(name).unwrap());
datapoint.add_field_i64(name, s.bytes_deallocated_total as i64);
}
solana_metrics::submit(datapoint, Level::Info);
});
let (cunnamed, _cproc) = view_global_allocations();
let mut datapoint = solana_metrics::datapoint::DataPoint::new("MemoryUnnamedThreads");
alexpyattaev marked this conversation as resolved.
Show resolved Hide resolved
datapoint.add_field_i64(
"bytes_allocated_total",
cunnamed.bytes_allocated_total as i64,
);
datapoint.add_field_i64(
"bytes_deallocated_total",
cunnamed.bytes_deallocated_total as i64,
);
solana_metrics::submit(datapoint, Level::Info);

std::thread::sleep(Duration::from_millis(1000));
alexpyattaev marked this conversation as resolved.
Show resolved Hide resolved
}
}

//Agave specific helper to watch for memory usage
pub fn setup_watch_memory_usage() {
let mut mps = MemPoolStats::default();
// this list is brittle but there does not appear to be a better way
// Order of entries matters here, as first matching prefix will be used
// So solGossip will match solGossipConsume as well
for thread in [
alexpyattaev marked this conversation as resolved.
Show resolved Hide resolved
"solPohTickProd",
"solSigVerTpuVot",
"solRcvrGossip",
"solSigVerTpu",
"solClusterSlots",
"solGossipCons",
"solGossipWork",
"solGossip",
"solRepairSvc",
"solRepairListen",
"solReplayTx",
"solReplayFork",
"solRayonGlob",
"solSvrfyShred",
"solSigVerify",
"solRetransmit",
"solRunGossip",
"solWinInsert",
"solAccountsLo",
"solAccounts",
"solAcctHash",
"solQuicClientRt",
"solQuicTVo",
"solQuicTpu",
"solQuicTpuFwd",
"solRepairQuic",
"solTurbineQuic",
] {
mps.add(thread);
}
init_allocator(mps);
std::thread::spawn(watcher_thread);
}
5 changes: 5 additions & 0 deletions memory-management/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,11 @@
#![deny(clippy::arithmetic_side_effects)]
pub mod aligned_memory;

#[cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
pub mod jemalloc_monitor;
#[cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
pub mod jemalloc_monitor_metrics;

/// Returns true if `ptr` is aligned to `align`.
pub fn is_memory_aligned(ptr: usize, align: usize) -> bool {
ptr.checked_rem(align)
Expand Down
48 changes: 48 additions & 0 deletions memory-management/tests/jemalloc_wrap.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#![cfg(not(any(target_env = "msvc", target_os = "freebsd")))]
use {solana_memory_management::jemalloc_monitor::*, std::time::Duration};

// Route every heap allocation of this test binary through the monitoring wrapper.
#[global_allocator]
static GLOBAL_ALLOC_WRAP: JemWrapAllocator = JemWrapAllocator::new();

/// Prints the current per-prefix counters; prints nothing when monitoring is
/// not initialised.
pub fn print_allocations() {
    view_allocations(|pools| println!("allocated so far: {:?}", pools));
}

// This is not really a test as such, more a canary to check if the logic works and does not deadlock.
// None of the reported data is "exact science"
fn main() {
    let mut mps = MemPoolStats::default();
    mps.add("Foo");
    mps.add("Boo");
    init_allocator(mps);

    let _s = "allocating a string!".to_owned();
    print_allocations();

    let jh1 = std::thread::Builder::new()
        .name("Foo thread 1".to_string())
        .spawn(|| {
            let _s2 = "allocating a string!".to_owned();
            let _s3 = "allocating a string!".to_owned();
            let _s4 = "allocating a string!".to_owned();
            // Nested thread with a different prefix, alive at the same time as its parent.
            let jh2 = std::thread::Builder::new()
                .name("Boo thread 1".to_string())
                .spawn(|| {
                    let _s2 = "allocating a string!".to_owned();
                    let _s3 = "allocating a string!".to_owned();
                    let _s4 = "allocating a string!".to_owned();
                    std::thread::sleep(Duration::from_millis(200));
                })
                .unwrap();
            std::thread::sleep(Duration::from_millis(200));
            jh2.join().unwrap();
        })
        .unwrap();
    // Sample once while both worker threads are still running.
    std::thread::sleep(Duration::from_millis(100));
    print_allocations();
    jh1.join().unwrap();
    print_allocations();
    deinit_allocator();
}

/// Runs the canary under Cargo's default libtest harness, which supplies its
/// own entry point and would otherwise never call `main`.
// NOTE(review): `main` is kept as the entry point in case this test target is
// configured with `harness = false` outside the visible part of Cargo.toml.
#[test]
fn jemalloc_wrap_canary() {
    main();
}
Loading
Loading