Skip to content

Commit

Permalink
stops refreshing legacy gossip NodeInstance and Version
Browse files Browse the repository at this point in the history
Mainnet nodes are running at least v2.0.15 or v2.1.5, both of which
use the new ContactInfo to look up a node's version:
https://github.com/anza-xyz/agave/blob/v2.0.15/gossip/src/cluster_info.rs#L1336-L1338
https://github.com/anza-xyz/agave/blob/v2.1.5/gossip/src/cluster_info.rs#L1316-L1317

and to check for duplicate instances:
https://github.com/anza-xyz/agave/blob/v2.0.15/gossip/src/cluster_info.rs#L2504-L2505
https://github.com/anza-xyz/agave/blob/v2.1.5/gossip/src/cluster_info.rs#L2481-L2482

So we no longer need to refresh and push the legacy gossip NodeInstance and
Version values.
  • Loading branch information
behzadnouri committed Jan 29, 2025
1 parent 0d68275 commit 085ab7f
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 105 deletions.
70 changes: 19 additions & 51 deletions gossip/src/cluster_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,7 @@ use {
},
contact_info::{self, ContactInfo, ContactInfoQuery, Error as ContactInfoError},
crds::{Crds, Cursor, GossipRoute},
crds_data::{
self, CrdsData, EpochSlotsIndex, LowestSlot, NodeInstance, SnapshotHashes, Version,
Vote,
},
crds_data::{self, CrdsData, EpochSlotsIndex, LowestSlot, SnapshotHashes, Vote},
crds_gossip::CrdsGossip,
crds_gossip_error::CrdsGossipError,
crds_gossip_pull::{
Expand All @@ -48,7 +45,7 @@ use {
},
crossbeam_channel::{Receiver, RecvTimeoutError, Sender},
itertools::Itertools,
rand::{seq::SliceRandom, thread_rng, CryptoRng, Rng},
rand::{seq::SliceRandom, CryptoRng, Rng},
rayon::{prelude::*, ThreadPool, ThreadPoolBuilder},
solana_ledger::shred::Shred,
solana_measure::measure::Measure,
Expand Down Expand Up @@ -157,7 +154,6 @@ pub struct ClusterInfo {
local_message_pending_push_queue: Mutex<Vec<CrdsValue>>,
contact_debug_interval: u64, // milliseconds, 0 = disabled
contact_save_interval: u64, // milliseconds, 0 = disabled
instance: RwLock<NodeInstance>,
contact_info_path: PathBuf,
socket_addr_space: SocketAddrSpace,
}
Expand Down Expand Up @@ -211,7 +207,6 @@ impl ClusterInfo {
socket_addr_space: SocketAddrSpace,
) -> Self {
assert_eq!(contact_info.pubkey(), &keypair.pubkey());
let id = *contact_info.pubkey();
let me = Self {
gossip: CrdsGossip::default(),
keypair: RwLock::new(keypair),
Expand All @@ -228,7 +223,6 @@ impl ClusterInfo {
stats: GossipStats::default(),
local_message_pending_push_queue: Mutex::default(),
contact_debug_interval: DEFAULT_CONTACT_DEBUG_INTERVAL_MILLIS,
instance: RwLock::new(NodeInstance::new(&mut thread_rng(), id, timestamp())),
contact_info_path: PathBuf::default(),
contact_save_interval: 0, // disabled
socket_addr_space,
Expand Down Expand Up @@ -431,18 +425,10 @@ impl ClusterInfo {

pub fn set_keypair(&self, new_keypair: Arc<Keypair>) {
let id = new_keypair.pubkey();
{
let mut instance = self.instance.write().unwrap();
*instance = NodeInstance::new(&mut thread_rng(), id, timestamp());
}
*self.keypair.write().unwrap() = new_keypair;
self.my_contact_info.write().unwrap().hot_swap_pubkey(id);

self.refresh_my_gossip_contact_info();
self.push_message(CrdsValue::new(
CrdsData::Version(Version::new(self.id())),
&self.keypair(),
));
}

pub fn set_tpu(&self, tpu_addr: SocketAddr) -> Result<(), ContactInfoError> {
Expand Down Expand Up @@ -1174,24 +1160,17 @@ impl ClusterInfo {

fn refresh_my_gossip_contact_info(&self) {
let keypair: Arc<Keypair> = self.keypair().clone();
let instance = self.instance.read().unwrap().with_wallclock(timestamp());
let node = {
let mut node = self.my_contact_info.write().unwrap();
node.set_wallclock(timestamp());
node.clone()
};
let entries: Vec<_> = [
CrdsData::ContactInfo(node),
CrdsData::NodeInstance(instance),
]
.into_iter()
.map(|entry| CrdsValue::new(entry, &keypair))
.collect();
let mut gossip_crds = self.gossip.crds.write().unwrap();
for entry in entries {
if let Err(err) = gossip_crds.insert(entry, timestamp(), GossipRoute::LocalMessage) {
error!("Insert self failed: {err:?}");
}
let node = CrdsValue::new(CrdsData::ContactInfo(node), &keypair);
if let Err(err) = {
let mut gossip_crds = self.gossip.crds.write().unwrap();
gossip_crds.insert(node, timestamp(), GossipRoute::LocalMessage)
} {
error!("refresh_my_gossip_contact_info failed: {err:?}");
}
}

Expand Down Expand Up @@ -1528,16 +1507,6 @@ impl ClusterInfo {
let mut last_contact_info_save = timestamp();
let mut entrypoints_processed = false;
let recycler = PacketBatchRecycler::default();
let crds_data = vec![
CrdsData::Version(Version::new(self.id())),
CrdsData::NodeInstance(
self.instance.read().unwrap().with_wallclock(timestamp()),
),
];
for value in crds_data {
let value = CrdsValue::new(value, &self.keypair());
self.push_message(value);
}
let mut generate_pull_requests = true;
while !exit.load(Ordering::Relaxed) {
let start = timestamp();
Expand Down Expand Up @@ -2137,19 +2106,14 @@ impl ClusterInfo {
// Check if there is a duplicate instance of
// this node with more recent timestamp.
let check_duplicate_instance = {
let instance = self.instance.read().unwrap();
let my_contact_info = self.my_contact_info();
move |values: &[CrdsValue]| {
if should_check_duplicate_instance
&& values.iter().any(|value| match value.data() {
CrdsData::ContactInfo(other) => my_contact_info.check_duplicate(other),
CrdsData::NodeInstance(other) => instance.check_duplicate(other),
_ => false,
})
{
return Err(GossipError::DuplicateNodeInstance);
let mut nodes = values.iter().filter_map(CrdsValue::contact_info);
if nodes.any(|other| my_contact_info.check_duplicate(other)) {
Err(GossipError::DuplicateNodeInstance)
} else {
Ok(())
}
Ok(())
}
};
let mut pings = Vec::new();
Expand Down Expand Up @@ -2186,14 +2150,18 @@ impl ClusterInfo {
}
}
Protocol::PullResponse(_, mut data) => {
check_duplicate_instance(&data)?;
if should_check_duplicate_instance {
check_duplicate_instance(&data)?;
}
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
pull_responses.append(&mut data);
}
}
Protocol::PushMessage(from, mut data) => {
check_duplicate_instance(&data)?;
if should_check_duplicate_instance {
check_duplicate_instance(&data)?;
}
data.retain(&mut verify_gossip_addr);
if !data.is_empty() {
push_messages.push((from, data));
Expand Down
62 changes: 8 additions & 54 deletions gossip/src/crds_data.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use {
legacy_contact_info::LegacyContactInfo,
restart_crds_values::{RestartHeaviestFork, RestartLastVotedForkSlots},
},
rand::{CryptoRng, Rng},
rand::Rng,
serde::de::{Deserialize, Deserializer},
solana_sanitize::{Sanitize, SanitizeError},
solana_sdk::{
Expand Down Expand Up @@ -116,7 +116,7 @@ pub(crate) fn new_rand_timestamp<R: Rng>(rng: &mut R) -> u64 {
impl CrdsData {
/// New random CrdsData for tests and benchmarks.
pub(crate) fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> CrdsData {
let kind = rng.gen_range(0..9);
let kind = rng.gen_range(0..8);
// TODO: Implement other kinds of CrdsData here.
// TODO: Assign ranges to each arm proportional to their frequency in
// the mainnet crds table.
Expand All @@ -126,12 +126,11 @@ impl CrdsData {
1 => CrdsData::LowestSlot(0, LowestSlot::new_rand(rng, pubkey)),
2 => CrdsData::LegacySnapshotHashes(LegacySnapshotHashes::new_rand(rng, pubkey)),
3 => CrdsData::AccountsHashes(AccountsHashes::new_rand(rng, pubkey)),
4 => CrdsData::Version(Version::new_rand(rng, pubkey)),
5 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)),
6 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
4 => CrdsData::Vote(rng.gen_range(0..MAX_VOTES), Vote::new_rand(rng, pubkey)),
5 => CrdsData::RestartLastVotedForkSlots(RestartLastVotedForkSlots::new_rand(
rng, pubkey,
)),
7 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)),
6 => CrdsData::RestartHeaviestFork(RestartHeaviestFork::new_rand(rng, pubkey)),
_ => CrdsData::EpochSlots(
rng.gen_range(0..MAX_EPOCH_SLOTS),
EpochSlots::new_rand(rng, pubkey),
Expand Down Expand Up @@ -437,31 +436,6 @@ impl Sanitize for Version {
}
}

impl Version {
pub(crate) fn new(from: Pubkey) -> Self {
Self {
from,
wallclock: timestamp(),
version: solana_version::LegacyVersion2::default(),
}
}

/// New random Version for tests and benchmarks.
fn new_rand<R: Rng>(rng: &mut R, pubkey: Option<Pubkey>) -> Self {
Self {
from: pubkey.unwrap_or_else(pubkey::new_rand),
wallclock: new_rand_timestamp(rng),
version: solana_version::LegacyVersion2 {
major: rng.gen(),
minor: rng.gen(),
patch: rng.gen(),
commit: Some(rng.gen()),
feature_set: rng.gen(),
},
}
}
}

#[cfg_attr(feature = "frozen-abi", derive(AbiExample))]
#[derive(Clone, Debug, PartialEq, Eq, Deserialize, Serialize)]
pub(crate) struct NodeInstance {
Expand All @@ -472,9 +446,10 @@ pub(crate) struct NodeInstance {
}

impl NodeInstance {
#[cfg(test)]
pub(crate) fn new<R>(rng: &mut R, from: Pubkey, now: u64) -> Self
where
R: Rng + CryptoRng,
R: Rng + rand::CryptoRng,
{
Self {
from,
Expand All @@ -484,21 +459,12 @@ impl NodeInstance {
}
}

#[cfg(test)]
// Clones the value with an updated wallclock.
pub(crate) fn with_wallclock(&self, wallclock: u64) -> Self {
Self { wallclock, ..*self }
}

// Returns true if the crds-value is a duplicate instance of this node,
// with a more recent timestamp.
// The older instance is considered the duplicate instance. If a staked
// node is restarted it will receive its old instance value from gossip.
// Considering the new instance as the duplicate would prevent the node
// from restarting.
pub(crate) fn check_duplicate(&self, other: &NodeInstance) -> bool {
self.token != other.token && self.timestamp <= other.timestamp && self.from == other.from
}

// Returns None if tokens are the same or other is not a node-instance from
// the same owner. Otherwise returns true if self has more recent timestamp
// than other, and so overrides it.
Expand Down Expand Up @@ -686,8 +652,6 @@ mod test {
timestamp: now + 1,
token: node.token,
};
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
// Older timestamp is not a duplicate.
Expand All @@ -697,8 +661,6 @@ mod test {
timestamp: now - 1,
token: rng.gen(),
};
assert!(!node.check_duplicate(&other));
assert!(other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(true));
assert_eq!(other.overrides(&node), Some(false));
// Updated wallclock is not a duplicate.
Expand All @@ -712,8 +674,6 @@ mod test {
token: node.token,
}
);
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
// Duplicate instance; tied timestamp.
Expand All @@ -724,8 +684,6 @@ mod test {
timestamp: now,
token: rng.gen(),
};
assert!(node.check_duplicate(&other));
assert!(other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(other.token < node.token));
assert_eq!(other.overrides(&node), Some(node.token < other.token));
}
Expand All @@ -737,8 +695,6 @@ mod test {
timestamp: now + 1,
token: rng.gen(),
};
assert!(node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(false));
assert_eq!(other.overrides(&node), Some(true));
}
Expand All @@ -749,8 +705,6 @@ mod test {
timestamp: now + 1,
token: rng.gen(),
};
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
}
Expand Down

0 comments on commit 085ab7f

Please sign in to comment.