Skip to content

Commit

Permalink
fix obscure validator shutdown issues when running in async context
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Pyattaev committed Jan 24, 2025
1 parent 16858f8 commit 28e1ca9
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
5 changes: 3 additions & 2 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,7 @@ pub struct Validator {
repair_quic_endpoints: Option<[Endpoint; 3]>,
repair_quic_endpoints_runtime: Option<TokioRuntime>,
repair_quic_endpoints_join_handle: Option<repair::quic_endpoint::AsyncTryJoinHandle>,
_thread_manager: ThreadManager,
thread_manager: ThreadManager,
}

impl Validator {
Expand Down Expand Up @@ -1664,7 +1664,7 @@ impl Validator {
repair_quic_endpoints,
repair_quic_endpoints_runtime,
repair_quic_endpoints_join_handle,
_thread_manager: thread_manager,
thread_manager,
})
}

Expand Down Expand Up @@ -1822,6 +1822,7 @@ impl Validator {
self.poh_timing_report_service
.join()
.expect("poh_timing_report_service");
self.thread_manager.destroy();
}
}

Expand Down
35 changes: 32 additions & 3 deletions thread-manager/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
use {
anyhow::Ok,
std::{collections::HashMap, ops::Deref, sync::Arc},
log::{debug, error, warn},
std::{
collections::HashMap,
ops::Deref,
sync::{atomic::Ordering, Arc},
},
};

pub mod config;
Expand Down Expand Up @@ -87,7 +91,7 @@ impl ThreadManager {
Some(n) => runtimes.get(n),
None => match mapping.get("default") {
Some(n) => {
log::warn!("Falling back to default runtime for {name}");
warn!("Falling back to default runtime for {name}");
runtimes.get(n)
}
None => None,
Expand Down Expand Up @@ -164,6 +168,31 @@ impl ThreadManager {
inner: Arc::new(manager),
})
}

pub fn destroy(self) {
let Ok(mut inner) = Arc::try_unwrap(self.inner) else {
error!(
"References to Thread Manager are still active, clean shutdown may not be possible!"
);
return;
};

for (name, runtime) in inner.tokio_runtimes.drain() {
let active_cnt = runtime.counters.active_threads_cnt.load(Ordering::SeqCst);
match active_cnt {
0 => debug!("Shutting down Tokio runtime {name}"),
_ => warn!("Tokio runtime {name} has active workers during shutdown!"),
}
runtime.tokio.shutdown_background();
}
for (name, runtime) in inner.native_thread_runtimes.drain() {
let active_cnt = runtime.running_count.load(Ordering::SeqCst);
match active_cnt {
0 => debug!("Shutting down Native thread pool {name}"),
_ => warn!("Native pool {name} has active threads during shutdown!"),
}
}
}
}

#[cfg(test)]
Expand Down

0 comments on commit 28e1ca9

Please sign in to comment.