Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

uses SipHasher24 for gossip ping tokens #3974

Merged
merged 3 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 0 additions & 5 deletions core/src/banking_trace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,11 +62,6 @@ pub struct BankingTracer {
active_tracer: Option<ActiveTracer>,
}

#[cfg_attr(
feature = "frozen-abi",
derive(AbiExample),
frozen_abi(digest = "6PCDw6YSEivfbwhbPmE4NAsXb88ZX6hkFnruP8B38nma")
)]
#[derive(Serialize, Deserialize, Debug)]
pub struct TimedTracedEvent(pub std::time::SystemTime, pub TracedEvent);

Expand Down
9 changes: 3 additions & 6 deletions core/src/repair/ancestor_hashes_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ use {
replay_stage::DUPLICATE_THRESHOLD,
shred_fetch_stage::receive_quic_datagrams,
},
bincode::serialize,
bytes::Bytes,
crossbeam_channel::{unbounded, Receiver, RecvTimeoutError, Sender},
dashmap::{mapref::entry::Entry::Occupied, DashMap},
Expand Down Expand Up @@ -454,11 +453,9 @@ impl AncestorHashesService {
return None;
}
stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let _ignore = ancestor_socket.send_to(&pong_bytes[..], from_addr);
}
let pong = RepairProtocol::Pong(Pong::new(&ping, keypair));
if let Ok(pong) = bincode::serialize(&pong) {
let _ = ancestor_socket.send_to(&pong, from_addr);
}
None
}
Expand Down
45 changes: 21 additions & 24 deletions core/src/repair/serve_repair.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use {
solana_gossip::{
cluster_info::{ClusterInfo, ClusterInfoError},
contact_info::{ContactInfo, Protocol},
ping_pong::{self, PingCache, Pong},
ping_pong::{self, Pong},
weighted_shuffle::WeightedShuffle,
},
solana_ledger::{
Expand Down Expand Up @@ -81,7 +81,7 @@ pub const MAX_ANCESTOR_BYTES_IN_PACKET: usize =
pub const MAX_ANCESTOR_RESPONSES: usize =
MAX_ANCESTOR_BYTES_IN_PACKET / std::mem::size_of::<(Slot, Hash)>();
/// Number of bytes in the randomly generated token sent with ping messages.
pub(crate) const REPAIR_PING_TOKEN_SIZE: usize = HASH_BYTES;
const REPAIR_PING_TOKEN_SIZE: usize = HASH_BYTES;
pub const REPAIR_PING_CACHE_CAPACITY: usize = 65536;
pub const REPAIR_PING_CACHE_TTL: Duration = Duration::from_secs(1280);
const REPAIR_PING_CACHE_RATE_LIMIT_DELAY: Duration = Duration::from_secs(2);
Expand Down Expand Up @@ -141,11 +141,6 @@ impl AncestorHashesRepairType {
}
}

#[cfg_attr(
feature = "frozen-abi",
derive(AbiEnumVisitor, AbiExample),
frozen_abi(digest = "GPS6e6pgUdbXLwXN6XHTqrUVMwAL2YKLPDawgMi5hHzi")
)]
#[derive(Debug, Deserialize, Serialize)]
pub enum AncestorHashesResponse {
Hashes(Vec<(Slot, Hash)>),
Expand Down Expand Up @@ -219,7 +214,8 @@ impl RepairRequestHeader {
}
}

pub(crate) type Ping = ping_pong::Ping<[u8; REPAIR_PING_TOKEN_SIZE]>;
type Ping = ping_pong::Ping<REPAIR_PING_TOKEN_SIZE>;
type PingCache = ping_pong::PingCache<REPAIR_PING_TOKEN_SIZE>;
Comment on lines +217 to +218

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is there any reason why REPAIR_PING_TOKEN_SIZE may differ from GOSSIP_PING_TOKEN_SIZE? And would it make sense to just turn it into one variable called PING_TOKEN_SIZE and make it public in gossip/src/protocol.rs?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a legacy thing. I have commented in the ping_pong.rs file that the new code should only use [u8; 8]. Old code was unnecessarily generic to allow different protocols specify different ping tokens.
I guess that could have been useful to distinguish types but with the new siphasher it no longer is applicable.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

debug_assert!(N >= std::mem::size_of::<u64>());

is this the only safeguard if someone accidentally defines N < 8? is it worth making sure someone can't define N < 8 for non debug builds?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I have commented this out where struct Ping is defined.

is it worth making sure someone can't define N < 8 for non debug builds?

how?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could we just change that line to:

const_assert!(N >= std::mem::size_of::<u64>());

This would fail at compile time and wouldn't add runtime overhead.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried that before, but const_assert wouldn't work because N is const generic parameter, and apparently static_assertions cannot handle it.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ahh ok not ideal but ya not sure exactly how this could be enforced at compile time. and runtime check probably doesn't make sense. happy to stick with how you've documented the requirements for N


/// Window protocol messages
#[cfg_attr(
Expand Down Expand Up @@ -270,11 +266,6 @@ fn discard_malformed_repair_requests(
requests.len()
}

#[cfg_attr(
feature = "frozen-abi",
derive(AbiEnumVisitor, AbiExample),
frozen_abi(digest = "9A6ae44qpdT7PaxiDZbybMM2mewnSnPs3C4CxhpbbYuV")
)]
#[derive(Debug, Deserialize, Serialize)]
pub(crate) enum RepairResponse {
Ping(Ping),
Expand Down Expand Up @@ -824,6 +815,8 @@ impl ServeRepair {
assert!(REPAIR_PING_CACHE_RATE_LIMIT_DELAY > Duration::from_millis(REPAIR_MS));

let mut ping_cache = PingCache::new(
&mut rand::thread_rng(),
Instant::now(),
REPAIR_PING_CACHE_TTL,
REPAIR_PING_CACHE_RATE_LIMIT_DELAY,
REPAIR_PING_CACHE_CAPACITY,
Expand Down Expand Up @@ -924,10 +917,16 @@ impl ServeRepair {
identity_keypair: &Keypair,
) -> (bool, Option<Packet>) {
let mut rng = rand::thread_rng();
let mut pingf = move || Ping::new_rand(&mut rng, identity_keypair).ok();
let (check, ping) = request
.sender()
.map(|&sender| ping_cache.check(Instant::now(), (sender, *from_addr), &mut pingf))
.map(|&sender| {
ping_cache.check(
&mut rng,
identity_keypair,
Instant::now(),
(sender, *from_addr),
)
})
.unwrap_or_default();
let ping_pkt = if let Some(ping) = ping {
match request {
Expand Down Expand Up @@ -1232,12 +1231,10 @@ impl ServeRepair {
}
packet.meta_mut().set_discard(true);
stats.ping_count += 1;
if let Ok(pong) = Pong::new(&ping, keypair) {
let pong = RepairProtocol::Pong(pong);
if let Ok(pong_bytes) = serialize(&pong) {
let from_addr = packet.meta().socket_addr();
pending_pongs.push((pong_bytes, from_addr));
}
let pong = RepairProtocol::Pong(Pong::new(&ping, keypair));
if let Ok(pong) = bincode::serialize(&pong) {
let from_addr = packet.meta().socket_addr();
pending_pongs.push((pong, from_addr));
}
}
}
Expand Down Expand Up @@ -1462,7 +1459,7 @@ mod tests {
fn test_serialized_ping_size() {
let mut rng = rand::thread_rng();
let keypair = Keypair::new();
let ping = Ping::new_rand(&mut rng, &keypair).unwrap();
let ping = Ping::new(rng.gen(), &keypair);
let ping = RepairResponse::Ping(ping);
let pkt = Packet::from_data(None, ping).unwrap();
assert_eq!(pkt.meta().size, REPAIR_RESPONSE_SERIALIZED_PING_BYTES);
Expand Down Expand Up @@ -1516,8 +1513,8 @@ mod tests {
fn test_check_well_formed_repair_request() {
let mut rng = rand::thread_rng();
let keypair = Keypair::new();
let ping = ping_pong::Ping::<[u8; 32]>::new_rand(&mut rng, &keypair).unwrap();
let pong = Pong::new(&ping, &keypair).unwrap();
let ping = Ping::new(rng.gen(), &keypair);
let pong = Pong::new(&ping, &keypair);
let request = RepairProtocol::Pong(pong);
let mut pkt = Packet::from_data(None, request).unwrap();
let mut batch = vec![make_remote_request(&pkt)];
Expand Down
2 changes: 2 additions & 0 deletions gossip/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@ rand = { workspace = true }
rand_chacha = { workspace = true }
rayon = { workspace = true }
serde = { workspace = true }
serde-big-array = { workspace = true }
serde_bytes = { workspace = true }
serde_derive = { workspace = true }
siphasher = { workspace = true }
solana-bloom = { workspace = true }
solana-clap-utils = { workspace = true }
solana-client = { workspace = true }
Expand Down
42 changes: 18 additions & 24 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ use {
epoch_slots::EpochSlots,
gossip_error::GossipError,
legacy_contact_info::LegacyContactInfo,
ping_pong::{PingCache, Pong},
ping_pong::Pong,
protocol::{
split_gossip_messages, Ping, Protocol, PruneData, DUPLICATE_SHRED_MAX_PAYLOAD_SIZE,
MAX_INCREMENTAL_SNAPSHOT_HASHES, MAX_PRUNE_DATA_NODES,
PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE,
split_gossip_messages, Ping, PingCache, Protocol, PruneData,
DUPLICATE_SHRED_MAX_PAYLOAD_SIZE, MAX_INCREMENTAL_SNAPSHOT_HASHES,
MAX_PRUNE_DATA_NODES, PULL_RESPONSE_MIN_SERIALIZED_SIZE, PUSH_MESSAGE_MAX_PAYLOAD_SIZE,
},
restart_crds_values::{
RestartHeaviestFork, RestartLastVotedForkSlots, RestartLastVotedForkSlotsError,
Expand Down Expand Up @@ -217,6 +217,8 @@ impl ClusterInfo {
outbound_budget: DataBudget::default(),
my_contact_info: RwLock::new(contact_info),
ping_cache: Mutex::new(PingCache::new(
&mut rand::thread_rng(),
Instant::now(),
GOSSIP_PING_CACHE_TTL,
GOSSIP_PING_CACHE_RATE_LIMIT_DELAY,
GOSSIP_PING_CACHE_CAPACITY,
Expand Down Expand Up @@ -1729,24 +1731,19 @@ impl ClusterInfo {
// Returns a predicate checking if the pull request is from a valid
// address, and if the address have responded to a ping request. Also
// appends ping packets for the addresses which need to be (re)verified.
//
// allow lint false positive trait bound requirement (`CryptoRng` only
// implemented on `&'a mut T`
#[allow(clippy::needless_pass_by_ref_mut)]
fn check_pull_request<'a, R>(
&'a self,
now: Instant,
mut rng: &'a mut R,
rng: &'a mut R,
packet_batch: &'a mut PacketBatch,
) -> impl FnMut(&PullData) -> bool + 'a
where
R: Rng + CryptoRng,
{
let mut cache = HashMap::<(Pubkey, SocketAddr), bool>::new();
let mut pingf = move || Ping::new_rand(&mut rng, &self.keypair()).ok();
let mut ping_cache = self.ping_cache.lock().unwrap();
let mut hard_check = move |node| {
let (check, ping) = ping_cache.check(now, node, &mut pingf);
let (check, ping) = ping_cache.check(rng, &self.keypair(), now, node);
if let Some(ping) = ping {
let ping = Protocol::PingMessage(ping);
match Packet::from_data(Some(&node.1), ping) {
Expand Down Expand Up @@ -1964,10 +1961,9 @@ impl ClusterInfo {
let keypair = self.keypair();
let pongs_and_dests: Vec<_> = pings
.into_iter()
.filter_map(|(addr, ping)| {
let pong = Pong::new(&ping, &keypair).ok()?;
let pong = Protocol::PongMessage(pong);
Some((addr, pong))
.map(|(addr, ping)| {
let pong = Pong::new(&ping, &keypair);
(addr, Protocol::PongMessage(pong))
})
.collect();
if pongs_and_dests.is_empty() {
Expand Down Expand Up @@ -3110,9 +3106,8 @@ fn verify_gossip_addr<R: Rng + CryptoRng>(
};
let (out, ping) = {
let node = (*pubkey, addr);
let mut pingf = move || Ping::new_rand(rng, keypair).ok();
let mut ping_cache = ping_cache.lock().unwrap();
ping_cache.check(Instant::now(), node, &mut pingf)
ping_cache.check(rng, keypair, Instant::now(), node)
};
if let Some(ping) = ping {
pings.push((addr, Protocol::PingMessage(ping)));
Expand Down Expand Up @@ -3209,12 +3204,11 @@ mod tests {
.collect();
let pings: Vec<_> = {
let mut ping_cache = cluster_info.ping_cache.lock().unwrap();
let mut pingf = || Ping::new_rand(&mut rng, &this_node).ok();
remote_nodes
.iter()
.map(|(keypair, socket)| {
let node = (keypair.pubkey(), *socket);
let (check, ping) = ping_cache.check(now, node, &mut pingf);
let (check, ping) = ping_cache.check(&mut rng, &this_node, now, node);
// Assert that initially remote nodes will not pass the
// ping/pong check.
assert!(!check);
Expand All @@ -3225,7 +3219,7 @@ mod tests {
let pongs: Vec<(SocketAddr, Pong)> = pings
.iter()
.zip(&remote_nodes)
.map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair).unwrap()))
.map(|(ping, (keypair, socket))| (*socket, Pong::new(ping, keypair)))
.collect();
let now = now + Duration::from_millis(1);
cluster_info.handle_batch_pong_messages(pongs, now);
Expand All @@ -3234,7 +3228,7 @@ mod tests {
let mut ping_cache = cluster_info.ping_cache.lock().unwrap();
for (keypair, socket) in &remote_nodes {
let node = (keypair.pubkey(), *socket);
let (check, _) = ping_cache.check(now, node, || -> Option<Ping> { None });
let (check, _) = ping_cache.check(&mut rng, &this_node, now, node);
assert!(check);
}
}
Expand All @@ -3243,7 +3237,7 @@ mod tests {
let mut ping_cache = cluster_info.ping_cache.lock().unwrap();
let (keypair, socket) = new_rand_remote_node(&mut rng);
let node = (keypair.pubkey(), socket);
let (check, _) = ping_cache.check(now, node, || -> Option<Ping> { None });
let (check, _) = ping_cache.check(&mut rng, &this_node, now, node);
assert!(!check);
}
}
Expand All @@ -3263,11 +3257,11 @@ mod tests {
.collect();
let pings: Vec<_> = remote_nodes
.iter()
.map(|(keypair, _)| Ping::new_rand(&mut rng, keypair).unwrap())
.map(|(keypair, _)| Ping::new(rng.gen(), keypair))
.collect();
let pongs: Vec<_> = pings
.iter()
.map(|ping| Pong::new(ping, &this_node).unwrap())
.map(|ping| Pong::new(ping, &this_node))
.collect();
let recycler = PacketBatchRecycler::default();
let packets = cluster_info
Expand Down
8 changes: 4 additions & 4 deletions gossip/src/crds_gossip.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,7 @@ use {
crds_gossip_push::CrdsGossipPush,
crds_value::CrdsValue,
duplicate_shred::{self, DuplicateShredIndex, MAX_DUPLICATE_SHREDS},
ping_pong::PingCache,
protocol::Ping,
protocol::{Ping, PingCache},
},
itertools::Itertools,
rand::{CryptoRng, Rng},
Expand Down Expand Up @@ -386,7 +385,6 @@ pub(crate) fn maybe_ping_gossip_addresses<R: Rng + CryptoRng>(
pings: &mut Vec<(SocketAddr, Ping)>,
) -> Vec<ContactInfo> {
let mut ping_cache = ping_cache.lock().unwrap();
let mut pingf = move || Ping::new_rand(rng, keypair).ok();
let now = Instant::now();
nodes
.into_iter()
Expand All @@ -396,7 +394,7 @@ pub(crate) fn maybe_ping_gossip_addresses<R: Rng + CryptoRng>(
};
let (check, ping) = {
let node = (*node.pubkey(), node_gossip);
ping_cache.check(now, node, &mut pingf)
ping_cache.check(rng, keypair, now, node)
};
if let Some(ping) = ping {
pings.push((node_gossip, ping));
Expand Down Expand Up @@ -431,6 +429,8 @@ mod test {
)
.unwrap();
let ping_cache = PingCache::new(
&mut rand::thread_rng(),
Instant::now(),
Duration::from_secs(20 * 60), // ttl
Duration::from_secs(20 * 60) / 64, // rate_limit_delay
128, // capacity
Expand Down
Loading
Loading