diff --git a/crates/subspace-farmer/src/plotter/gpu/gpu_encoders_manager.rs b/crates/subspace-farmer/src/plotter/gpu/gpu_encoders_manager.rs index b131cea021..517a927b2f 100644 --- a/crates/subspace-farmer/src/plotter/gpu/gpu_encoders_manager.rs +++ b/crates/subspace-farmer/src/plotter/gpu/gpu_encoders_manager.rs @@ -96,12 +96,6 @@ where let listener = event.listen(); if let Some(thread_pool_pair) = mutex.lock().pop() { - drop(listener); - // It is possible that we got here because there was the last free pair available - // and in the meantime listener received notification. Just in case that was the - // case, notify one more listener (if there is any) to make sure all available - // thread pools are utilized when there is demand for them. - event.notify(1); break thread_pool_pair; } diff --git a/crates/subspace-farmer/src/plotter/pool.rs b/crates/subspace-farmer/src/plotter/pool.rs index b3ad2fdd7c..d4d6a1b00a 100644 --- a/crates/subspace-farmer/src/plotter/pool.rs +++ b/crates/subspace-farmer/src/plotter/pool.rs @@ -2,8 +2,11 @@ use crate::plotter::{Plotter, SectorPlottingProgress}; use async_trait::async_trait; +use event_listener::Event; use futures::channel::mpsc; +use futures::future; use std::any::type_name_of_val; +use std::pin::pin; use std::time::Duration; use subspace_core_primitives::sectors::SectorIndex; use subspace_core_primitives::PublicKey; @@ -18,6 +21,7 @@ use tracing::{error, trace}; pub struct PoolPlotter { plotters: Vec>, retry_interval: Duration, + notification: Event, } #[async_trait] @@ -66,6 +70,7 @@ impl Plotter for PoolPlotter { ) .await { + self.notification.notify_relaxed(1); return; } } @@ -74,7 +79,11 @@ impl Plotter for PoolPlotter { retry_interval = ?self.retry_interval, "All plotters are busy, will wait and try again later" ); - tokio::time::sleep(self.retry_interval).await; + future::select( + pin!(tokio::time::sleep(self.retry_interval)), + self.notification.listen(), + ) + .await; } } @@ -99,6 +108,7 @@ impl Plotter for PoolPlotter { ) .await { + self.notification.notify_relaxed(1); return true; } } @@ -113,6 +123,7 @@ impl PoolPlotter { Self { plotters, retry_interval, + notification: Event::new(), } } } diff --git a/crates/subspace-farmer/src/thread_pool_manager.rs b/crates/subspace-farmer/src/thread_pool_manager.rs index 2a06866177..0178037d16 100644 --- a/crates/subspace-farmer/src/thread_pool_manager.rs +++ b/crates/subspace-farmer/src/thread_pool_manager.rs @@ -109,12 +109,6 @@ impl PlottingThreadPoolManager { let listener = event.listen(); if let Some(thread_pool_pair) = mutex.lock().thread_pool_pairs.pop() { - drop(listener); - // It is possible that we got here because there was the last free pair available - // and in the meantime listener received notification. Just in case that was the - // case, notify one more listener (if there is any) to make sure all available - // thread pools are utilized when there is demand for them. - event.notify(1); break thread_pool_pair; }