Skip to content

Commit

Permalink
customizes override logic for gossip ContactInfo (#2579)
Browse files Browse the repository at this point in the history
If there are two running instances of the same node, we want the
ContactInfo with more recent start time to be propagated through
gossip regardless of wallclocks.

The commit adds custom override logic for ContactInfo to first compare
by outset timestamp.
  • Loading branch information
behzadnouri authored Aug 15, 2024
1 parent 92acf94 commit a5bde5f
Show file tree
Hide file tree
Showing 3 changed files with 71 additions and 26 deletions.
43 changes: 43 additions & 0 deletions gossip/src/contact_info.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ use {
solana_streamer::socket::SocketAddrSpace,
static_assertions::const_assert_eq,
std::{
cmp::Ordering,
collections::HashSet,
net::{IpAddr, Ipv4Addr, SocketAddr},
time::{SystemTime, UNIX_EPOCH},
Expand Down Expand Up @@ -443,6 +444,24 @@ impl ContactInfo {
pub(crate) fn check_duplicate(&self, other: &ContactInfo) -> bool {
self.pubkey == other.pubkey && self.outset < other.outset
}

// Returns None if the contact-infos have different pubkey.
// Otherwise returns true if (self.outset, self.wallclock) tuple is larger
// than (other.outset, other.wallclock).
// If the tuples are equal it returns None.
#[inline]
#[must_use]
pub(crate) fn overrides(&self, other: &ContactInfo) -> Option<bool> {
if self.pubkey != other.pubkey {
return None;
}
let other = (other.outset, other.wallclock);
match (self.outset, self.wallclock).cmp(&other) {
Ordering::Less => Some(false),
Ordering::Greater => Some(true),
Ordering::Equal => None,
}
}
}

impl Default for ContactInfo {
Expand Down Expand Up @@ -1038,6 +1057,8 @@ mod tests {
let other = node.clone();
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
}
// Updated socket address is not a duplicate instance.
{
Expand All @@ -1046,16 +1067,28 @@ mod tests {
while other.set_serve_repair(new_rand_socket(&mut rng)).is_err() {}
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
other.remove_serve_repair();
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
}
// Updated wallclock is not a duplicate instance.
{
let other = node.clone();
node.set_wallclock(rng.gen());
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(
node.overrides(&other),
Some(other.wallclock < node.wallclock)
);
assert_eq!(
other.overrides(&node),
Some(node.wallclock < other.wallclock)
);
}
// Different pubkey is not a duplicate instance.
{
Expand All @@ -1066,6 +1099,8 @@ mod tests {
);
assert!(!node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), None);
assert_eq!(other.overrides(&node), None);
}
// Same pubkey, more recent outset timestamp is a duplicate instance.
{
Expand All @@ -1077,6 +1112,14 @@ mod tests {
assert!(node.outset < other.outset);
assert!(node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(false));
assert_eq!(other.overrides(&node), Some(true));
node.set_wallclock(other.wallclock);
assert!(node.outset < other.outset);
assert!(node.check_duplicate(&other));
assert!(!other.check_duplicate(&node));
assert_eq!(node.overrides(&other), Some(false));
assert_eq!(other.overrides(&node), Some(true));
}
}
}
52 changes: 27 additions & 25 deletions gossip/src/crds.rs
Original file line number Diff line number Diff line change
Expand Up @@ -196,14 +196,21 @@ impl Default for Crds {
// Both values should have the same key/label.
fn overrides(value: &CrdsValue, other: &VersionedCrdsValue) -> bool {
assert_eq!(value.label(), other.value.label(), "labels mismatch!");
// Node instances are special cased so that if there are two running
// instances of the same node, the more recent start is propagated through
// gossip regardless of wallclocks.
// Contact-infos and node instances are special cased so that if there are
// two running instances of the same node, the more recent start is
// propagated through gossip regardless of wallclocks.
if let CrdsData::NodeInstance(value) = &value.data {
if let Some(out) = value.overrides(&other.value) {
return out;
}
}
if let CrdsData::ContactInfo(value) = &value.data {
if let CrdsData::ContactInfo(other) = &other.value.data {
if let Some(out) = value.overrides(other) {
return out;
}
}
}
match value.wallclock().cmp(&other.value.wallclock()) {
Ordering::Less => false,
Ordering::Greater => true,
Expand Down Expand Up @@ -1334,15 +1341,17 @@ mod tests {
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
let wallclock = node.wallclock();
node.set_shred_version(42);
let node = CrdsData::ContactInfo(node);
let node = CrdsValue::new_unsigned(node);
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
{
let node = CrdsData::ContactInfo(node.clone());
let node = CrdsValue::new_unsigned(node);
assert_eq!(
crds.insert(node, timestamp(), GossipRoute::LocalMessage),
Ok(())
);
}
assert_eq!(crds.get_shred_version(&pubkey), Some(42));
// An outdated value should not update shred-version:
let mut node = ContactInfo::new_rand(&mut rng, Some(pubkey));
let mut node = node.clone();
node.set_wallclock(wallclock - 1); // outdated.
node.set_shred_version(8);
let node = CrdsData::ContactInfo(node);
Expand Down Expand Up @@ -1481,20 +1490,17 @@ mod tests {
#[test]
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_hash_order() {
let mut node = ContactInfo::new_localhost(&Pubkey::default(), 0);
let v1 = VersionedCrdsValue::new(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
))),
CrdsValue::new_unsigned(CrdsData::ContactInfo(node.clone())),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);
let v2 = VersionedCrdsValue::new(
{
let mut contact_info = ContactInfo::new_localhost(&Pubkey::default(), 0);
contact_info.set_rpc((Ipv4Addr::LOCALHOST, 1244)).unwrap();
CrdsValue::new_unsigned(CrdsData::ContactInfo(contact_info))
node.set_rpc((Ipv4Addr::LOCALHOST, 1244)).unwrap();
CrdsValue::new_unsigned(CrdsData::ContactInfo(node))
},
Cursor::default(),
1, // local_timestamp
Expand All @@ -1517,20 +1523,16 @@ mod tests {
#[test]
#[allow(clippy::neg_cmp_op_on_partial_ord)]
fn test_wallclock_order() {
let mut node = ContactInfo::new_localhost(&Pubkey::default(), 1);
let v1 = VersionedCrdsValue::new(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
1,
))),
CrdsValue::new_unsigned(CrdsData::ContactInfo(node.clone())),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
);
node.set_wallclock(0);
let v2 = VersionedCrdsValue::new(
CrdsValue::new_unsigned(CrdsData::ContactInfo(ContactInfo::new_localhost(
&Pubkey::default(),
0,
))),
CrdsValue::new_unsigned(CrdsData::ContactInfo(node)),
Cursor::default(),
1, // local_timestamp
GossipRoute::LocalMessage,
Expand Down
2 changes: 1 addition & 1 deletion gossip/src/crds_gossip_pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1148,6 +1148,7 @@ pub(crate) mod tests {

let mut dest_crds = Crds::default();
let new_id = solana_sdk::pubkey::new_rand();
let same_key = ContactInfo::new_localhost(&new_id, 0);
let new = ContactInfo::new_localhost(&new_id, 1);
ping_cache.mock_pong(*new.pubkey(), new.gossip().unwrap(), Instant::now());
let new = CrdsValue::new_unsigned(CrdsData::ContactInfo(new));
Expand All @@ -1157,7 +1158,6 @@ pub(crate) mod tests {
let dest_crds = RwLock::new(dest_crds);

// node contains a key from the dest node, but at an older local timestamp
let same_key = ContactInfo::new_localhost(&new_id, 0);
ping_cache.mock_pong(
*same_key.pubkey(),
same_key.gossip().unwrap(),
Expand Down

0 comments on commit a5bde5f

Please sign in to comment.