Revert "Revert "quic: increase timeout and keep alive (#4585)" (#4637)" #4684

Open · wants to merge 12 commits into base: master
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 2 additions & 0 deletions accounts-db/src/append_vec.rs
@@ -316,6 +316,8 @@ impl Drop for AppendVec {
        if self.remove_file_on_drop.load(Ordering::Acquire) {
            // If we're reopening in readonly mode, we don't delete the file. See
            // AppendVec::reopen_as_readonly.
            let bt = std::backtrace::Backtrace::capture();
            info!("Removing appendvec file {:?} {:?}", self.path, bt);
            if let Err(_err) = remove_file(&self.path) {
                // promote this to panic soon.
                // disabled due to many false positive warnings while running tests.
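A note on the backtrace added above: std::backtrace::Backtrace::capture() only records frames when backtraces are enabled through the RUST_BACKTRACE (or RUST_LIB_BACKTRACE) environment variable; otherwise the new log line prints a disabled placeholder. A minimal illustrative sketch of that gating (the helper name is hypothetical, not part of this diff):

use std::backtrace::{Backtrace, BacktraceStatus};

// Capture a backtrace only when the environment enables it
// (e.g. RUST_BACKTRACE=1); otherwise return None instead of logging
// a "disabled backtrace" placeholder.
fn backtrace_if_enabled() -> Option<Backtrace> {
    let bt = Backtrace::capture();
    matches!(bt.status(), BacktraceStatus::Captured).then_some(bt)
}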
97 changes: 71 additions & 26 deletions local-cluster/src/local_cluster.rs
@@ -144,6 +144,13 @@ impl Default for ClusterConfig {
    }
}

struct QuicConnectionCacheConfig {
    stake: u64,
    total_stake: u64,
    client_keypair: Keypair,
    staked_nodes: Arc<RwLock<StakedNodes>>,
}

pub struct LocalCluster {
    /// Keypair with funding to participate in the network
    pub funding_keypair: Keypair,
@@ -152,6 +159,8 @@ pub struct LocalCluster {
    pub validators: HashMap<Pubkey, ClusterValidatorInfo>,
    pub genesis_config: GenesisConfig,
    pub connection_cache: Arc<ConnectionCache>,
    quic_connection_cache_config: Option<QuicConnectionCacheConfig>,
    tpu_connection_pool_size: usize,
}

impl LocalCluster {
@@ -191,21 +200,9 @@ impl LocalCluster {
    pub fn new(config: &mut ClusterConfig, socket_addr_space: SocketAddrSpace) -> Self {
        assert_eq!(config.validator_configs.len(), config.node_stakes.len());

        let connection_cache = if config.tpu_use_quic {
            let client_keypair = Keypair::new();
        let quic_connection_cache_config = if config.tpu_use_quic {
            let client_keypair: Keypair = Keypair::new();
            let stake = DEFAULT_NODE_STAKE;

            for validator_config in config.validator_configs.iter_mut() {
                let mut overrides = HashMap::new();
                overrides.insert(client_keypair.pubkey(), stake);
                validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides));
            }

            assert!(
                config.tpu_use_quic,
                "no support for staked override forwarding without quic"
            );

            let total_stake = config.node_stakes.iter().sum::<u64>();
            let stakes = HashMap::from([
                (client_keypair.pubkey(), stake),
@@ -216,20 +213,26 @@
                HashMap::<Pubkey, u64>::default(), // overrides
            )));

            Arc::new(ConnectionCache::new_with_client_options(
                "connection_cache_local_cluster_quic_staked",
                config.tpu_connection_pool_size,
                None,
                Some((&client_keypair, IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)))),
                Some((&staked_nodes, &client_keypair.pubkey())),
            ))
            for validator_config in config.validator_configs.iter_mut() {
                let mut overrides = HashMap::new();
                overrides.insert(client_keypair.pubkey(), stake);
                validator_config.staked_nodes_overrides = Arc::new(RwLock::new(overrides));
            }
            Some(QuicConnectionCacheConfig {
                stake,
                total_stake,
                client_keypair,
                staked_nodes,
            })
        } else {
            Arc::new(ConnectionCache::with_udp(
                "connection_cache_local_cluster_udp",
                config.tpu_connection_pool_size,
            ))
            None
        };

        let connection_cache = create_connection_cache(
            &quic_connection_cache_config,
            config.tpu_connection_pool_size,
        );

        let mut validator_keys = {
            if let Some(ref keys) = config.validator_keys {
                assert_eq!(config.validator_configs.len(), keys.len());
@@ -379,6 +382,8 @@ impl LocalCluster {
            validators,
            genesis_config,
            connection_cache,
            quic_connection_cache_config,
            tpu_connection_pool_size: config.tpu_connection_pool_size,
        };

        let node_pubkey_to_vote_key: HashMap<Pubkey, Arc<Keypair>> = keys_in_genesis
@@ -709,6 +714,10 @@ impl LocalCluster {

            while now.elapsed().as_secs() < wait_time as u64 {
                if num_confirmed == 0 {
                    info!(
                        "zzzzz sending transaction {:?}, {} over {attempts}",
                        transaction, attempt
                    );
                    client.send_transaction_to_upcoming_leaders(transaction)?;
                }

@@ -717,6 +726,10 @@ impl LocalCluster {
                    pending_confirmations,
                ) {
                    num_confirmed = confirmed_blocks;
                    info!(
                        "zzzzz confirmed blocks: {} pending: {} for {transaction:?}",
                        confirmed_blocks, pending_confirmations
                    );
                    if confirmed_blocks >= pending_confirmations {
                        return Ok(transaction.signatures[0]);
                    }
@@ -726,9 +739,11 @@ impl LocalCluster {
                    wait_time = wait_time.max(
                        MAX_PROCESSING_AGE * pending_confirmations.saturating_sub(num_confirmed),
                    );
                } else {
                    info!("zzzzz failed to poll_for_signature_confirmation num_confirmed: {} pending: {} for {transaction:?}", num_confirmed, pending_confirmations);
                }
            }
            info!("{attempt} tries failed transfer");
            info!("{attempt}/{attempts} tries failed transfer");
            let blockhash = client.rpc_client().get_latest_blockhash()?;
            transaction.sign(keypairs, blockhash);
        }
@@ -973,6 +988,30 @@ impl LocalCluster {
    }
}

fn create_connection_cache(
    quic_connection_cache_config: &Option<QuicConnectionCacheConfig>,
    tpu_connection_pool_size: usize,
) -> Arc<ConnectionCache> {
    let connection_cache = if let Some(config) = quic_connection_cache_config {
        Arc::new(ConnectionCache::new_with_client_options(
            "connection_cache_local_cluster_quic_staked",
            tpu_connection_pool_size,
            None,
            Some((
                &config.client_keypair,
                IpAddr::V4(Ipv4Addr::new(0, 0, 0, 0)),
            )),
            Some((&config.staked_nodes, &config.client_keypair.pubkey())),
        ))
    } else {
        Arc::new(ConnectionCache::with_udp(
            "connection_cache_local_cluster_udp",
            tpu_connection_pool_size,
        ))
    };
    connection_cache
}

impl Cluster for LocalCluster {
    fn get_node_pubkeys(&self) -> Vec<Pubkey> {
        self.validators.keys().cloned().collect()
@@ -1059,6 +1098,12 @@ impl Cluster for LocalCluster {
            socket_addr_space,
        );
        self.add_node(pubkey, cluster_validator_info);

        // reset the connection cache as we are connecting to the new nodes
        self.connection_cache = create_connection_cache(
            &self.quic_connection_cache_config,
            self.tpu_connection_pool_size,
        );
}

    fn add_node(&mut self, pubkey: &Pubkey, cluster_validator_info: ClusterValidatorInfo) {
6 changes: 6 additions & 0 deletions local-cluster/src/local_cluster_snapshot_utils.rs
@@ -93,6 +93,7 @@ impl LocalCluster {
        if let Some(full_snapshot_archive_info) =
            snapshot_utils::get_highest_full_snapshot_archive_info(&full_snapshot_archives_dir)
        {
            trace!("Got snapshot info: {:?}", full_snapshot_archive_info);
            match next_snapshot_type {
                NextSnapshotType::FullSnapshot => {
                    if full_snapshot_archive_info.slot() >= last_slot {
@@ -115,6 +116,11 @@ impl LocalCluster {
                    }
                }
            }
        } else {
            trace!(
                "Could not get snapshot info from {:?}",
                full_snapshot_archives_dir.as_ref()
            );
        }
        if let Some(max_wait_duration) = max_wait_duration {
            assert!(
4 changes: 4 additions & 0 deletions local-cluster/tests/local_cluster.rs
@@ -1448,13 +1448,17 @@ fn test_snapshots_restart_validity() {
        10,
    );

    trace!("Waiting for full snapshot");

    expected_balances.extend(new_balances);

    cluster.wait_for_next_full_snapshot(
        full_snapshot_archives_dir,
        Some(Duration::from_secs(5 * 60)),
    );

    trace!("generate_account_paths...");

    // Create new account paths since validator exit is not guaranteed to cleanup RPC threads,
    // which may delete the old accounts on exit at any point
    let (new_account_storage_dirs, new_account_storage_paths) =
8 changes: 6 additions & 2 deletions sdk/quic-definitions/src/lib.rs
@@ -16,8 +16,12 @@ pub const QUIC_TOTAL_STAKED_CONCURRENT_STREAMS: usize = 100_000;
// forwarded packets from staked nodes.
pub const QUIC_MAX_STAKED_CONCURRENT_STREAMS: usize = 512;

pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(2);
pub const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(1);
// Connection idle timeout and keep-alive interval.
// QUIC closes a connection after it has been idle for QUIC_MAX_TIMEOUT,
// and sends a keep-alive ping every QUIC_KEEP_ALIVE.
// Neither should be set too low, to avoid unnecessary ping traffic.
pub const QUIC_MAX_TIMEOUT: Duration = Duration::from_secs(60);
pub const QUIC_KEEP_ALIVE: Duration = Duration::from_secs(45);

// Disable Quic send fairness.
// When set to false, streams are still scheduled based on priority,
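For context, here is a minimal sketch of how an idle-timeout/keep-alive pair like this is typically applied to a quinn TransportConfig (illustrative only, not taken from this diff; the constant values mirror the ones above). The keep-alive interval has to stay below the idle timeout so pings reach the peer before it gives up on an otherwise idle connection.

use std::time::Duration;
use quinn::{IdleTimeout, TransportConfig};

// Values mirroring QUIC_MAX_TIMEOUT / QUIC_KEEP_ALIVE above.
const MAX_TIMEOUT: Duration = Duration::from_secs(60);
const KEEP_ALIVE: Duration = Duration::from_secs(45);

fn example_transport_config() -> TransportConfig {
    let mut config = TransportConfig::default();
    // Close connections that have been idle for MAX_TIMEOUT ...
    let idle = IdleTimeout::try_from(MAX_TIMEOUT).expect("timeout fits in a QUIC varint");
    config.max_idle_timeout(Some(idle));
    // ... and ping every KEEP_ALIVE so active connections never hit that limit.
    config.keep_alive_interval(Some(KEEP_ALIVE));
    config
}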
1 change: 1 addition & 0 deletions streamer/src/nonblocking/quic.rs
@@ -441,6 +441,7 @@ async fn run_server(
            debug!("accept(): Timed out waiting for connection");
        }
    }
    info!("zzzzz quic server {:?} {}", endpoints[0].local_addr(), name);
}

fn prune_unstaked_connection_table(
1 change: 1 addition & 0 deletions transaction-metrics-tracker/Cargo.toml
@@ -13,6 +13,7 @@ edition = { workspace = true }
base64 = { workspace = true }
bincode = { workspace = true }
# Update this borsh dependency to the workspace version once
bs58 = { workspace = true }
lazy_static = { workspace = true }
log = { workspace = true }
rand = { workspace = true }
6 changes: 5 additions & 1 deletion transaction-metrics-tracker/src/lib.rs
@@ -15,7 +15,11 @@ lazy_static! {
pub fn should_track_transaction(signature: &[u8; SIGNATURE_BYTES]) -> bool {
    // We do not use the highest signature byte as it is not really random
    let match_portion: u16 = u16::from_le_bytes([signature[61], signature[62]]) >> 4;
    trace!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK);
    info!(
        "zzzzz Txn signature: {}",
        bs58::encode(signature).into_string()
    );
    info!("Matching txn: {match_portion:016b} {:016b}", *TXN_MASK);
    *TXN_MASK == match_portion
}

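For reference, the check above keeps the 12 bits that survive the >> 4 shift of signature bytes 61..=62, so with a fixed 12-bit TXN_MASK roughly 1 in 4096 transactions is selected (assuming signature bytes are uniformly distributed). A self-contained sketch of the same comparison, with the mask passed in explicitly for illustration (the real TXN_MASK is the lazy_static value in this crate):

// Stand-alone version of the sampling check, for illustration only.
fn should_track(signature: &[u8; 64], txn_mask: u16) -> bool {
    // Bytes 61..=62 interpreted little-endian, low 4 bits dropped: a 12-bit key.
    let match_portion = u16::from_le_bytes([signature[61], signature[62]]) >> 4;
    // A fixed 12-bit mask matches a uniformly random key with probability 1/4096.
    match_portion == txn_mask
}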