Skip to content

Commit

Permalink
Clean up block producing unified scheduler setup
Browse files Browse the repository at this point in the history
  • Loading branch information
ryoqun committed Jan 14, 2025
1 parent 2548f9d commit 5a4a928
Show file tree
Hide file tree
Showing 6 changed files with 97 additions and 81 deletions.
32 changes: 17 additions & 15 deletions banking-bench/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ use {
rayon::prelude::*,
solana_client::connection_cache::ConnectionCache,
solana_core::{
banking_stage::{update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage},
banking_stage::{
unified_scheduler::ensure_banking_stage_setup,
update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage,
},
banking_trace::{BankingTracer, Channels, BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT},
validator::BlockProductionMethod,
},
Expand Down Expand Up @@ -445,7 +448,13 @@ fn main() {
)))
.unwrap();
let prioritization_fee_cache = Arc::new(PrioritizationFeeCache::new(0u64));
let scheduler_pool = if matches!(
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
};
let cluster_info = Arc::new(cluster_info);
let banking_tracer_channels = if matches!(
block_production_method,
BlockProductionMethod::UnifiedScheduler
) {
Expand All @@ -458,13 +467,12 @@ fn main() {
prioritization_fee_cache.clone(),
poh_recorder.read().unwrap().new_recorder(),
);
bank_forks
.write()
.unwrap()
.install_scheduler_pool(pool.clone());
Some(pool)
let channels = banking_tracer.create_channels_for_scheduler_pool(&pool);
ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder);
bank_forks.write().unwrap().install_scheduler_pool(pool);
channels
} else {
None
banking_tracer.create_channels(false)
};
let Channels {
non_vote_sender,
Expand All @@ -473,13 +481,7 @@ fn main() {
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = banking_tracer.create_channels_for_scheduler_pool(scheduler_pool.as_ref());
let cluster_info = {
let keypair = Arc::new(Keypair::new());
let node = Node::new_localhost_with_pubkey(&keypair.pubkey());
ClusterInfo::new(node.info, keypair, SocketAddrSpace::Unspecified)
};
let cluster_info = Arc::new(cluster_info);
} = banking_tracer_channels;
let tpu_disable_quic = matches.is_present("tpu_disable_quic");
let connection_cache = if tpu_disable_quic {
ConnectionCache::with_udp(
Expand Down
18 changes: 13 additions & 5 deletions core/src/banking_simulation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
use {
crate::{
banking_stage::{
unified_scheduler::ensure_banking_stage_setup,
update_bank_forks_and_poh_recorder_for_new_tpu_bank, BankingStage, LikeClusterInfo,
},
banking_trace::{
Expand Down Expand Up @@ -770,14 +771,25 @@ impl BankingSimulator {
BANKING_TRACE_DIR_DEFAULT_BYTE_LIMIT,
);

// Create a partially-dummy ClusterInfo for the banking stage.
let cluster_info = Arc::new(DummyClusterInfo {
id: simulated_leader.into(),
});
let banking_tracer_channels = if let Some(pool) = unified_scheduler_pool {
let channels = retracer.create_channels_for_scheduler_pool(&pool);
ensure_banking_stage_setup(&pool, &bank_forks, &channels, &cluster_info, &poh_recorder);
channels
} else {
retracer.create_channels(false)
};
let Channels {
non_vote_sender,
non_vote_receiver,
tpu_vote_sender,
tpu_vote_receiver,
gossip_vote_sender,
gossip_vote_receiver,
} = retracer.create_channels_for_scheduler_pool(unified_scheduler_pool.as_ref());
} = banking_tracer_channels;

let connection_cache = Arc::new(ConnectionCache::new("connection_cache_sim"));
let (replay_vote_sender, _replay_vote_receiver) = unbounded();
Expand Down Expand Up @@ -812,10 +824,6 @@ impl BankingSimulator {
);

info!("Start banking stage!...");
// Create a partially-dummy ClusterInfo for the banking stage.
let cluster_info = Arc::new(DummyClusterInfo {
id: simulated_leader.into(),
});
let prioritization_fee_cache = &Arc::new(PrioritizationFeeCache::new(0u64));
let banking_stage = BankingStage::new_num_threads(
block_production_method.clone(),
Expand Down
6 changes: 6 additions & 0 deletions core/src/banking_stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ mod packet_receiver;
mod read_write_account_set;
mod scheduler_messages;
mod transaction_scheduler;

// proc_macro_hygiene needs to be stabilized to use qualifier_attr...
// error[E0658]: non-inline modules in proc macro input are unstable
#[cfg(not(feature = "dev-context-only-utils"))]
pub(crate) mod unified_scheduler;
#[cfg(feature = "dev-context-only-utils")]
pub mod unified_scheduler;

// Fixed thread size seems to be fastest on GCP setup
Expand Down
51 changes: 36 additions & 15 deletions core/src/banking_stage/unified_scheduler.rs
Original file line number Diff line number Diff line change
@@ -1,24 +1,19 @@
#[cfg(feature = "dev-context-only-utils")]
use qualifier_attr::qualifiers;
use {
super::decision_maker::{BufferedPacketsDecision, DecisionMaker},
crate::banking_stage::packet_deserializer::PacketDeserializer,
agave_banking_stage_ingress_types::BankingPacketReceiver,
crate::{
banking_stage::{packet_deserializer::PacketDeserializer, BankingStage, LikeClusterInfo},
banking_trace::Channels,
},
solana_poh::poh_recorder::PohRecorder,
solana_runtime::bank_forks::BankForks,
solana_unified_scheduler_pool::{BankingStageAdapter, BatchConverterCreator},
solana_unified_scheduler_pool::{
BankingStageAdapter, BatchConverterCreator, DefaultSchedulerPool,
},
std::sync::{Arc, RwLock},
};

pub(crate) fn unified_receiver(
non_vote_receiver: BankingPacketReceiver,
tpu_vote_receiver: BankingPacketReceiver,
gossip_vote_receiver: BankingPacketReceiver,
) -> BankingPacketReceiver {
assert!(non_vote_receiver.same_channel(&tpu_vote_receiver));
assert!(non_vote_receiver.same_channel(&gossip_vote_receiver));
drop((tpu_vote_receiver, gossip_vote_receiver));

non_vote_receiver
}

pub(crate) fn batch_converter_creator(
decision_maker: DecisionMaker,
bank_forks: Arc<RwLock<BankForks>>,
Expand Down Expand Up @@ -58,3 +53,29 @@ pub(crate) fn batch_converter_creator(
})
})
}

#[cfg_attr(feature = "dev-context-only-utils", qualifiers(pub))]
pub(crate) fn ensure_banking_stage_setup(
    pool: &DefaultSchedulerPool,
    bank_forks: &Arc<RwLock<BankForks>>,
    channels: &Channels,
    cluster_info: &impl LikeClusterInfo,
    poh_recorder: &Arc<RwLock<PohRecorder>>,
) {
    // Nothing to wire up unless this pool is configured for block production.
    if !pool.block_production_supported() {
        return;
    }

    // The decision maker doubles as the banking-stage monitor (cloned) and as
    // the input to the batch converter (moved).
    let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());

    pool.register_banking_stage(
        channels.unified_receiver().clone(),
        BankingStage::num_threads() as usize,
        Box::new(decision_maker.clone()),
        batch_converter_creator(decision_maker, bank_forks.clone()),
    );
}
21 changes: 13 additions & 8 deletions core/src/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,16 @@ pub struct Channels {
pub gossip_vote_receiver: BankingPacketReceiver,
}

impl Channels {
    /// Returns the one receiver shared by all three packet channels.
    ///
    /// Panics if the non-vote, TPU-vote, and gossip-vote receivers were not
    /// all created from the same underlying channel.
    pub(crate) fn unified_receiver(&self) -> &BankingPacketReceiver {
        let unified = &self.non_vote_receiver;
        assert!(unified.same_channel(&self.tpu_vote_receiver));
        assert!(unified.same_channel(&self.gossip_vote_receiver));
        unified
    }
}

impl BankingTracer {
pub fn new(
maybe_config: Option<(&PathBuf, Arc<AtomicBool>, DirByteLimit)>,
Expand Down Expand Up @@ -264,14 +274,9 @@ impl BankingTracer {
}
}

pub fn create_channels_for_scheduler_pool(
&self,
pool: Option<&Arc<DefaultSchedulerPool>>,
) -> Channels {
self.create_channels(
pool.map(|pool| pool.block_production_supported())
.unwrap_or_default(),
)
pub fn create_channels_for_scheduler_pool(&self, pool: &DefaultSchedulerPool) -> Channels {
    // Unify the channels exactly when the pool can produce blocks.
    self.create_channels(pool.block_production_supported())
}

fn create_channel(&self, label: ChannelLabel) -> (BankingPacketSender, BankingPacketReceiver) {
Expand Down
50 changes: 12 additions & 38 deletions core/src/validator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use {
crate::{
accounts_hash_verifier::AccountsHashVerifier,
admin_rpc_post_init::AdminRpcRequestMetadataPostInit,
banking_stage::unified_scheduler::ensure_banking_stage_setup,
banking_trace::{self, BankingTracer, TraceError},
cluster_info_vote_listener::VoteTracker,
completed_data_sets_service::CompletedDataSetsService,
Expand Down Expand Up @@ -943,31 +944,7 @@ impl Validator {
) {
methods @ (BlockVerificationMethod::UnifiedScheduler, _)
| methods @ (_, BlockProductionMethod::UnifiedScheduler) => {
let banking_tracer_channels = banking_tracer.create_channels(true);
use crate::{
banking_stage::{
decision_maker::DecisionMaker, unified_scheduler, BankingStage,
},
banking_trace::Channels,
};

let Channels {
non_vote_sender: _,
non_vote_receiver,
tpu_vote_sender: _,
tpu_vote_receiver,
gossip_vote_sender: _,
gossip_vote_receiver,
} = &banking_tracer_channels;
let unified_receiver = unified_scheduler::unified_receiver(
non_vote_receiver.clone(),
tpu_vote_receiver.clone(),
gossip_vote_receiver.clone(),
);
let block_producing_scheduler_handler_threads =
BankingStage::num_threads() as usize;

let pool = DefaultSchedulerPool::new(
let scheduler_pool = DefaultSchedulerPool::new(
supported_scheduling_mode(methods),
config.unified_scheduler_handler_threads,
config.runtime_config.log_messages_bytes_limit,
Expand All @@ -976,23 +953,20 @@ impl Validator {
prioritization_fee_cache.clone(),
poh_recorder.read().unwrap().new_recorder(),
);
let decision_maker = DecisionMaker::new(cluster_info.id(), poh_recorder.clone());
let banking_stage_monitor = Box::new(decision_maker.clone());
let converter =
unified_scheduler::batch_converter_creator(decision_maker, bank_forks.clone());
pool.register_banking_stage(
unified_receiver,
block_producing_scheduler_handler_threads,
banking_stage_monitor,
converter,
);

let channels = banking_tracer.create_channels_for_scheduler_pool(&scheduler_pool);
ensure_banking_stage_setup(
&scheduler_pool,
&bank_forks,
&channels,
&cluster_info,
&poh_recorder,
);
bank_forks
.write()
.unwrap()
.install_scheduler_pool(pool.clone());
// this actually won't be used, but we return it for type safety
banking_tracer_channels
.install_scheduler_pool(scheduler_pool);
channels
}
_ => {
info!("no scheduler pool is installed for block verification/production...");
Expand Down

0 comments on commit 5a4a928

Please sign in to comment.