From 87db74521989202f4edde006aaba402a763ec055 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 19 Feb 2024 12:25:32 +0900 Subject: [PATCH] Use the heartbeat in digest for cluster membership. Right now we send the heartbeat of all of the known nodes in the cluster during digests. We can use that information for cluster membership. This will allow nodes to detect new members, join a cluster, and detect failures considerably faster. This PR also removes the heartbeat from delta messages. Instead, a delta now carries the last_gc_version, to fix a bug introduced in #122. Bugfix: Avoid updating the time of death of a node when it is redetected as faulty. We remove ourselves from the list of seeds. Previously, nodes were regularly gossiping with themselves. Code improvement: In the failure detector, when we have few samples, we now use additive smoothing. --- chitchat/src/delta.rs | 81 +++++----- chitchat/src/failure_detector.rs | 246 +++++++++++++++++++------------ chitchat/src/lib.rs | 70 +++++---- chitchat/src/message.rs | 4 +- chitchat/src/server.rs | 22 ++- chitchat/src/state.rs | 113 ++++++++------ chitchat/src/types.rs | 2 +- chitchat/tests/cluster_test.rs | 91 +++++++++++- 8 files changed, 394 insertions(+), 235 deletions(-) diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index 90b2d6a..79f9ccd 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -3,7 +3,7 @@ use std::collections::HashSet; use tokio::time::Instant; use crate::serialize::*; -use crate::{ChitchatId, Heartbeat, VersionedValue}; +use crate::{ChitchatId, Version, VersionedValue}; /// A delta is the message we send to another node to update it. 
/// @@ -32,7 +32,7 @@ impl Delta { let node_deltas = self.node_deltas.iter().flat_map(|node_delta| { std::iter::once(DeltaOpRef::Node { chitchat_id: &node_delta.chitchat_id, - heartbeat: node_delta.heartbeat, + last_gc_version: node_delta.last_gc_version, }) .chain(node_delta.key_values.iter().map(|(key, versioned_value)| { DeltaOpRef::KeyValue { @@ -49,7 +49,7 @@ enum DeltaOp { NodeToReset(ChitchatId), Node { chitchat_id: ChitchatId, - heartbeat: Heartbeat, + last_gc_version: Version, }, KeyValue { key: String, @@ -61,7 +61,7 @@ enum DeltaOpRef<'a> { NodeToReset(&'a ChitchatId), Node { chitchat_id: &'a ChitchatId, - heartbeat: Heartbeat, + last_gc_version: Version, }, KeyValue { key: &'a str, @@ -108,10 +108,10 @@ impl Deserializable for DeltaOp { } DeltaOpTag::Node => { let chitchat_id = ChitchatId::deserialize(buf)?; - let heartbeat = Heartbeat::deserialize(buf)?; + let last_gc_version = Version::deserialize(buf)?; Ok(DeltaOp::Node { chitchat_id, - heartbeat, + last_gc_version, }) } DeltaOpTag::KeyValue => { @@ -139,10 +139,10 @@ impl DeltaOp { match self { DeltaOp::Node { chitchat_id, - heartbeat, + last_gc_version, } => DeltaOpRef::Node { chitchat_id, - heartbeat: *heartbeat, + last_gc_version: *last_gc_version, }, DeltaOp::KeyValue { key, @@ -171,11 +171,11 @@ impl<'a> Serializable for DeltaOpRef<'a> { match self { Self::Node { chitchat_id, - heartbeat, + last_gc_version, } => { buf.push(DeltaOpTag::Node.into()); chitchat_id.serialize(buf); - heartbeat.serialize(buf); + last_gc_version.serialize(buf); } Self::KeyValue { key, @@ -198,8 +198,8 @@ impl<'a> Serializable for DeltaOpRef<'a> { 1 + match self { Self::Node { chitchat_id, - heartbeat, - } => chitchat_id.serialized_len() + heartbeat.serialized_len(), + last_gc_version, + } => chitchat_id.serialized_len() + last_gc_version.serialized_len(), Self::KeyValue { key, versioned_value, @@ -252,14 +252,14 @@ impl Delta { .sum() } - pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) 
{ + pub(crate) fn add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) { assert!(!self .node_deltas .iter() .any(|node_delta| { node_delta.chitchat_id == chitchat_id })); self.node_deltas.push(NodeDelta { chitchat_id, - heartbeat, + last_gc_version, key_values: Vec::new(), }); } @@ -306,7 +306,7 @@ impl Delta { #[derive(Debug, Eq, PartialEq, serde::Serialize)] pub(crate) struct NodeDelta { pub chitchat_id: ChitchatId, - pub heartbeat: Heartbeat, + pub last_gc_version: Version, pub key_values: Vec<(String, VersionedValue)>, } @@ -335,14 +335,14 @@ impl DeltaBuilder { match op { DeltaOp::Node { chitchat_id, - heartbeat, + last_gc_version, } => { self.flush(); anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id)); self.existing_nodes.insert(chitchat_id.clone()); self.current_node_delta = Some(NodeDelta { chitchat_id, - heartbeat, + last_gc_version, key_values: Vec::new(), }); } @@ -441,10 +441,10 @@ impl DeltaSerializer { } /// Returns false if the node could not be added because the payload would exceed the mtu. - pub fn try_add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) -> bool { + pub fn try_add_node(&mut self, chitchat_id: ChitchatId, last_gc_version: Version) -> bool { let new_node_op = DeltaOp::Node { chitchat_id, - heartbeat, + last_gc_version, }; self.try_add_op(new_node_op) } @@ -471,9 +471,8 @@ mod tests { // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node). - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0u64)); // +23 bytes: 2 bytes (key length) + 5 bytes (key) + 7 bytes (values) + 8 bytes (version) + // 1 bytes (empty tombstone). 
@@ -497,9 +496,8 @@ mod tests { )); let node2 = ChitchatId::for_local_test(10_002); - let heartbeat = Heartbeat(0); // +37 bytes - assert!(delta_writer.try_add_node(node2, heartbeat)); + assert!(delta_writer.try_add_node(node2, 0)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -529,9 +527,8 @@ mod tests { // ChitchatId takes 27 bytes = 15 bytes + 2 bytes for node length + "node-10001".len(). let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); - // +37 bytes = 8 bytes (heartbeat) + 27 bytes (node) + 2bytes (block length) - assert!(delta_writer.try_add_node(node1, heartbeat)); + // +37 bytes = 8 bytes (last gc version) + 27 bytes (node) + 2bytes (block length) + assert!(delta_writer.try_add_node(node1, 0)); // +24 bytes (kv + op tag) assert!(delta_writer.try_add_kv( @@ -554,9 +551,8 @@ mod tests { )); let node2 = ChitchatId::for_local_test(10_002); - let heartbeat = Heartbeat(0); - // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (node). - assert!(delta_writer.try_add_node(node2, heartbeat)); + // +37 bytes = 8 bytes (last gc version) + 2 bytes (empty node delta) + 27 bytes (node). 
+ assert!(delta_writer.try_add_node(node2, 0)); test_aux_delta_writer(delta_writer, 80); } @@ -576,11 +572,10 @@ mod tests { assert!(delta_writer.try_add_node_to_reset(ChitchatId::for_local_test(10_000))); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); - // +8 bytes (heartbeat) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic new - // block) = 71 - assert!(delta_writer.try_add_node(node1, heartbeat)); + // +8 bytes (last gc version) + 27 bytes (ChitchatId) + (1 op tag) + 3 bytes (pessimistic + // new block) = 71 + assert!(delta_writer.try_add_node(node1, 0u64)); // +23 bytes (kv) + 1 (op tag) // = 95 @@ -604,10 +599,9 @@ mod tests { )); let node2 = ChitchatId::for_local_test(10_002); - let heartbeat = Heartbeat(0); - // +8 bytes (heartbeat) + 27 bytes (ChitchatId) + 1 byte (op tag) + // +8 bytes (last gc version) + 27 bytes (ChitchatId) + 1 byte (op tag) // = 155 - assert!(delta_writer.try_add_node(node2, heartbeat)); + assert!(delta_writer.try_add_node(node2, 0u64)); // The block got compressed. test_aux_delta_writer(delta_writer, 85); } @@ -618,9 +612,8 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(100); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -642,9 +635,8 @@ mod tests { )); let node2 = ChitchatId::for_local_test(10_002); - let heartbeat = Heartbeat(0); // +37 bytes = 8 bytes (heartbeat) + 2 bytes (empty node delta) + 27 bytes (ChitchatId). - assert!(!delta_writer.try_add_node(node2, heartbeat)); + assert!(!delta_writer.try_add_node(node2, 0u64)); // The block got compressed. 
test_aux_delta_writer(delta_writer, 72); @@ -656,9 +648,8 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(100); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); // +37 bytes. - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0u64)); // +23 bytes. assert!(delta_writer.try_add_kv( @@ -692,11 +683,10 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(100); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); // + 3 bytes (block tag) + 35 bytes (node) + 1 byte (op tag) // = 40 - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0u64)); // +23 bytes (kv) + 1 (op tag) + 3 bytes (pessimistic block tag) // = 67 @@ -728,8 +718,7 @@ mod tests { let mut delta_writer = DeltaSerializer::with_mtu(62); let node1 = ChitchatId::for_local_test(10_001); - let heartbeat = Heartbeat(0); - assert!(delta_writer.try_add_node(node1, heartbeat)); + assert!(delta_writer.try_add_node(node1, 0u64)); assert!(delta_writer.try_add_kv( "key11", diff --git a/chitchat/src/failure_detector.rs b/chitchat/src/failure_detector.rs index d8b8038..2415642 100644 --- a/chitchat/src/failure_detector.rs +++ b/chitchat/src/failure_detector.rs @@ -1,4 +1,5 @@ use std::collections::{HashMap, HashSet}; +use std::num::NonZeroUsize; use std::time::Duration; use serde::{Deserialize, Serialize}; @@ -32,21 +33,6 @@ impl FailureDetector { /// Reports node heartbeat. 
pub fn report_heartbeat(&mut self, chitchat_id: &ChitchatId) { debug!(node_id=%chitchat_id.node_id, "reporting node heartbeat."); - let heartbeat_window = self - .node_samples - .entry(chitchat_id.clone()) - .or_insert_with(|| { - SamplingWindow::new( - self.config.sampling_window_size, - self.config.max_interval, - self.config.initial_interval, - ) - }); - heartbeat_window.report_heartbeat(); - } - - pub fn report_unknown(&mut self, chitchat_id: &ChitchatId) { - debug!(node_id=%chitchat_id.node_id, "reporting unknown node heartbeat."); self.node_samples .entry(chitchat_id.clone()) .or_insert_with(|| { @@ -55,23 +41,30 @@ impl FailureDetector { self.config.max_interval, self.config.initial_interval, ) - }); + }) + .report_heartbeat(); } /// Marks the node as dead or alive based on the current phi value. pub fn update_node_liveness(&mut self, chitchat_id: &ChitchatId) { - if let Some(phi) = self.phi(chitchat_id) { - debug!(node_id=%chitchat_id.node_id, phi=phi, "updating node liveness"); - if phi > self.config.phi_threshold { - self.live_nodes.remove(chitchat_id); + let phi_opt = self.phi(chitchat_id); + let is_alive = self + .phi(chitchat_id) + .map(|phi| phi <= self.config.phi_threshold) + .unwrap_or(false); + debug!(node_id=%chitchat_id.node_id, phi=?phi_opt, is_alive=is_alive, "computing node liveness"); + if is_alive { + self.live_nodes.insert(chitchat_id.clone()); + self.dead_nodes.remove(chitchat_id); + } else { + self.live_nodes.remove(chitchat_id); + if !self.dead_nodes.contains_key(chitchat_id) { self.dead_nodes.insert(chitchat_id.clone(), Instant::now()); - // Remove current sampling window so that when the node - // comes back online, we start with a fresh sampling window. - self.node_samples.remove(chitchat_id); - } else { - self.live_nodes.insert(chitchat_id.clone()); - self.dead_nodes.remove(chitchat_id); } + // Remove current sampling window so that when the node + // comes back online, we start with a fresh sampling window. 
+ // TODO is this the right idea? + // self.node_samples.remove(chitchat_id); } } @@ -118,10 +111,10 @@ impl FailureDetector { } /// Returns the current phi value of a node. + /// + /// If we have received less than 2 heartbeat, `phi()` returns `None`. fn phi(&mut self, chitchat_id: &ChitchatId) -> Option { - self.node_samples - .get(chitchat_id) - .map(|sampling_window| sampling_window.phi()) + self.node_samples.get(chitchat_id)?.phi() } } @@ -170,6 +163,18 @@ impl Default for FailureDetectorConfig { } } +#[derive(Debug)] +struct AdditiveSmoothing { + prior_mean: f64, + prior_weight: f64, +} + +impl AdditiveSmoothing { + fn compute_mean(&self, len: NonZeroUsize, sum: f64) -> f64 { + (sum + self.prior_weight * self.prior_mean) / (len.get() as f64 + self.prior_weight) + } +} + /// A fixed-sized window that keeps track of the most recent heartbeat arrival intervals. #[derive(Debug)] struct SamplingWindow { @@ -179,44 +184,55 @@ struct SamplingWindow { last_heartbeat: Option, /// Heartbeat intervals greater than this value are ignored. max_interval: Duration, - /// The initial interval on startup. - initial_interval: Duration, + /// We may not have many intervals in the beginning. + /// For this reason we use additive smoothing to make sure we are + /// lenient on the first few intervals, and we don't have nodes flapping from + /// life to death. + additive_smoothing: AdditiveSmoothing, } impl SamplingWindow { // Construct a new instance. 
- pub fn new(window_size: usize, max_interval: Duration, initial_interval: Duration) -> Self { - Self { - intervals: BoundedArrayStats::new(window_size), + pub fn new(window_size: usize, max_interval: Duration, prior_interval: Duration) -> Self { + let additive_smoothing = AdditiveSmoothing { + prior_mean: prior_interval.as_secs_f64(), + prior_weight: 5.0f64, + }; + SamplingWindow { + intervals: BoundedArrayStats::with_capacity(window_size), last_heartbeat: None, max_interval, - initial_interval, + additive_smoothing, } } /// Reports a heartbeat. pub fn report_heartbeat(&mut self) { - if let Some(last_value) = &self.last_heartbeat { - let interval = last_value.elapsed(); + let now = Instant::now(); + if let Some(last_value) = self.last_heartbeat { + let interval = now.duration_since(last_value); if interval <= self.max_interval { self.intervals.append(interval.as_secs_f64()); } } else { - self.intervals.append(self.initial_interval.as_secs_f64()); - }; - self.last_heartbeat = Some(Instant::now()); + // This is our first heartbeat. + // No way to compute an interval. + // This is fine. + } + self.last_heartbeat = Some(now); } /// Computes the sampling window's phi value. - pub fn phi(&self) -> f64 { - if let Some(last_heartbeat) = self.last_heartbeat { - assert!(self.intervals.mean() > 0.0); - let elapsed_time = last_heartbeat.elapsed().as_secs_f64(); - elapsed_time / self.intervals.mean() - } else { - // if we phi is called before we have a sample, we assume the node isn't really alive. - f64::INFINITY - } + /// Returns `None` if have not received two heartbeat yet. + pub fn phi(&self) -> Option { + // We avoid computing phi if we have only received one heartbeat. + // It could be data from an old dead node after all. 
+ let len_non_zero = NonZeroUsize::new(self.intervals.len())?; + let sum = self.intervals.sum(); + let last_heartbeat = self.last_heartbeat?; + let interval_mean = self.additive_smoothing.compute_mean(len_non_zero, sum); + let elapsed_time = last_heartbeat.elapsed().as_secs_f64(); + Some(elapsed_time / interval_mean) } } @@ -224,57 +240,48 @@ impl SamplingWindow { #[derive(Debug)] struct BoundedArrayStats { /// The values. - data: Vec, - /// Number of accumulated values. - size: usize, + values: Box<[f64]>, /// Is the values array filled? is_filled: bool, - /// Position of the index within the values array. + /// Position of the next value to be written in the values array. index: usize, /// The accumulated sum of values. sum: f64, - /// The accumulated mean of values. - mean: f64, } impl BoundedArrayStats { - pub fn new(size: usize) -> Self { + pub fn with_capacity(capacity: usize) -> Self { Self { - data: vec![0.0; size], - size, + values: vec![0.0; capacity].into_boxed_slice(), is_filled: false, index: 0, sum: 0.0, - mean: 0.0, } } /// Returns the mean. - pub fn mean(&self) -> f64 { - self.mean + pub fn sum(&self) -> f64 { + self.sum } /// Appends a new value and updates the statistics. 
pub fn append(&mut self, interval: f64) { - if self.index == self.size { - self.is_filled = true; - self.index = 0; - } - if self.is_filled { - self.sum -= self.data[self.index]; + self.sum -= self.values[self.index]; } + self.values[self.index] = interval; self.sum += interval; - - self.data[self.index] = interval; - self.index += 1; - - self.mean = self.sum / self.len() as f64; + if self.index == self.values.len() - 1 { + self.is_filled = true; + self.index = 0; + } else { + self.index += 1; + } } fn len(&self) -> usize { if self.is_filled { - return self.size; + return self.values.len(); } self.index } @@ -282,6 +289,7 @@ impl BoundedArrayStats { #[cfg(test)] mod tests { + use std::collections::VecDeque; use std::time::Duration; use rand::prelude::*; @@ -290,6 +298,17 @@ mod tests { use crate::failure_detector::{FailureDetector, FailureDetectorConfig}; use crate::ChitchatId; + #[test] + fn test_failure_detector_does_not_see_a_node_as_alive_with_a_single_heartbeat() { + let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); + let chitchat_id = ChitchatId::for_local_test(10_001); + failure_detector.report_heartbeat(&chitchat_id); + failure_detector.update_node_liveness(&chitchat_id); + let dead_nodes: Vec<&ChitchatId> = failure_detector.dead_nodes().collect(); + assert_eq!(dead_nodes.len(), 1); + assert!(failure_detector.live_nodes().next().is_none()); + } + #[tokio::test] async fn test_failure_detector() { tokio::time::pause(); @@ -413,14 +432,22 @@ mod tests { } #[tokio::test] - async fn test_failure_detector_node_state_after_initial_interval() { + async fn test_failure_detector_node_state_additive_smoothing_predominant_in_the_beginning() { tokio::time::pause(); let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); + // We add a few very short samples. 
let chitchat_id = ChitchatId::for_local_test(10_001); failure_detector.report_heartbeat(&chitchat_id); tokio::time::advance(Duration::from_secs(1)).await; + let chitchat_id = ChitchatId::for_local_test(10_001); + for _ in 0..5 { + tokio::time::advance(Duration::from_millis(200)).await; + failure_detector.report_heartbeat(&chitchat_id); + } + + tokio::time::advance(Duration::from_secs(6)).await; failure_detector.update_node_liveness(&chitchat_id); let live_nodes = failure_detector @@ -428,6 +455,7 @@ mod tests { .map(|chitchat_id| chitchat_id.node_id.as_str()) .collect::>(); assert_eq!(live_nodes, vec!["node-10001"]); + tokio::time::advance(Duration::from_secs(40)).await; failure_detector.update_node_liveness(&chitchat_id); @@ -438,6 +466,28 @@ mod tests { assert_eq!(live_nodes, Vec::<&str>::new()); } + #[tokio::test] + async fn test_failure_detector_node_state_additive_smoothing_effect_fades_off() { + tokio::time::pause(); + let mut failure_detector = FailureDetector::new(FailureDetectorConfig::default()); + + // We add a few very short samples. + let chitchat_id = ChitchatId::for_local_test(10_001); + failure_detector.report_heartbeat(&chitchat_id); + + let chitchat_id = ChitchatId::for_local_test(10_001); + for _ in 0..1000 { + tokio::time::advance(Duration::from_millis(200)).await; + failure_detector.report_heartbeat(&chitchat_id); + } + + tokio::time::advance(Duration::from_secs(6)).await; + + failure_detector.update_node_liveness(&chitchat_id); + + assert!(failure_detector.live_nodes().next().is_none()); + } + #[tokio::test] async fn test_sampling_window() { tokio::time::pause(); @@ -448,43 +498,57 @@ mod tests { tokio::time::advance(Duration::from_secs(3)).await; sampling_window.report_heartbeat(); - // Now intervals window is: [2.0, 3.0]. - let mean = (2.0 + 3.0) / 2.0; + // Now intervals window is: [3.0]. + let mean = (3.0 + 2.0 * 5.0) / (1.0f64 + 5.0f64); // 0s elapsed since last reported heartbeat. 
- assert!((sampling_window.phi() - (0.0 / mean)).abs() < f64::EPSILON); + assert_nearly_equal(sampling_window.phi().unwrap(), 0.0f64); // 1s elapsed since last reported heartbeat. tokio::time::advance(Duration::from_secs(1)).await; - assert!((sampling_window.phi() - (1.0 / mean)).abs() < f64::EPSILON); + assert_nearly_equal(sampling_window.phi().unwrap(), 1.0f64 / mean); // Check reported heartbeat later than max_interval is ignore. tokio::time::advance(Duration::from_secs(5)).await; sampling_window.report_heartbeat(); tokio::time::advance(Duration::from_secs(2)).await; + + assert_nearly_equal(sampling_window.phi().unwrap(), 2.0f64 / mean); + } + + #[track_caller] + fn assert_nearly_equal(value: f64, expected: f64) { assert!( - (sampling_window.phi() - (2.0 / mean)).abs() < f64::EPSILON, - "Mean value should not change." + (value - expected).abs() < f64::EPSILON, + "value ({value}) is not not nearly equal to expected {expected}" ); } #[test] fn test_bounded_array_stats() { - let mut bounded_array = BoundedArrayStats::new(10); - for i in 1..10 { + let capacity = 5; + let mut bounded_array = BoundedArrayStats::with_capacity(capacity); + let mut queue: VecDeque = VecDeque::new(); + for i in 1..=capacity { + assert!(bounded_array.len() < capacity); + assert!(!bounded_array.is_filled); bounded_array.append(i as f64); + queue.push_back(i as f64); + assert_eq!(bounded_array.len(), i); + assert_eq!(queue.len(), i); + assert_nearly_equal(bounded_array.sum(), queue.iter().copied().sum::()); } - assert_eq!(bounded_array.index, 9); - assert_eq!(bounded_array.len(), 9); - assert!(!bounded_array.is_filled); - assert!((bounded_array.mean() - 5.0f64).abs() < f64::EPSILON); - for i in 10..14 { + assert!(bounded_array.is_filled); + assert_nearly_equal(bounded_array.sum(), queue.iter().copied().sum()); + + for i in capacity + 1..capacity * 2 { bounded_array.append(i as f64); + queue.push_back(i as f64); + queue.pop_front(); + assert_nearly_equal(bounded_array.sum(), 
queue.iter().copied().sum::()); + assert_eq!(queue.len(), capacity); + assert_eq!(bounded_array.len(), capacity); } - assert_eq!(bounded_array.index, 3); - assert_eq!(bounded_array.len(), 10); - assert!(bounded_array.is_filled); - assert!((bounded_array.mean() - 8.5f64).abs() < f64::EPSILON); } } diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index dcd0a4f..04c124f 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -76,7 +76,7 @@ impl Chitchat { let self_node_state = chitchat.self_node_state(); // Immediately mark the node as alive to ensure it responds to SYN messages. - self_node_state.update_heartbeat(); + self_node_state.inc_heartbeat(); // Set initial key/value pairs. for (key, value) in initial_key_values { @@ -94,15 +94,19 @@ impl Chitchat { } } + /// Digest contains important information about the list of members in + /// the cluster. + fn report_heartbeats_in_digest(&mut self, digest: &Digest) { + for (chitchat_id, node_digest) in &digest.node_digests { + self.report_heartbeat(chitchat_id, node_digest.heartbeat); + } + } + fn process_delta(&mut self, delta: Delta) { - // Warning: order matters here. - // `report_heartbeats` will compare the current known heartbeat with the one - // in the delta, while `apply_delta` is actually updating this heartbeat. 
- self.report_heartbeats(&delta); self.cluster_state.apply_delta(delta); } - pub(crate) fn process_message(&mut self, msg: ChitchatMessage) -> Option { + self.update_self_heartbeat(); match msg { ChitchatMessage::Syn { cluster_id, digest } => { if cluster_id != self.cluster_id() { @@ -113,7 +117,7 @@ impl Chitchat { ); return Some(ChitchatMessage::BadCluster); } - + self.report_heartbeats_in_digest(&digest); let scheduled_for_deletion: HashSet<_> = self.scheduled_for_deletion_nodes().collect(); let self_digest = self.compute_digest(&scheduled_for_deletion); @@ -129,6 +133,7 @@ impl Chitchat { }) } ChitchatMessage::SynAck { digest, delta } => { + self.report_heartbeats_in_digest(&digest); self.process_delta(delta); let scheduled_for_deletion = self.scheduled_for_deletion_nodes().collect::>(); @@ -157,32 +162,21 @@ impl Chitchat { /// Reports heartbeats to the failure detector for nodes in the delta for which we received an /// update. - fn report_heartbeats(&mut self, delta: &Delta) { - for node_delta in &delta.node_deltas { - if let Some(node_state) = self.cluster_state.node_states.get(&node_delta.chitchat_id) { - if node_state.heartbeat() < node_delta.heartbeat { - self.failure_detector - .report_heartbeat(&node_delta.chitchat_id); - } - } else { - self.failure_detector - .report_unknown(&node_delta.chitchat_id); - self.failure_detector - .update_node_liveness(&node_delta.chitchat_id); - } + fn report_heartbeat(&mut self, chitchat_id: &ChitchatId, heartbeat: Heartbeat) { + let node_state = self.cluster_state.node_state_mut(chitchat_id); + if node_state.try_set_heartbeat(heartbeat) { + self.failure_detector.report_heartbeat(chitchat_id); } } /// Marks the node as dead or alive depending on the new phi values and updates the live nodes /// watcher accordingly. 
pub(crate) fn update_nodes_liveness(&mut self) { - self.cluster_state - .nodes() - .filter(|&chitchat_id| *chitchat_id != self.config.chitchat_id) - .for_each(|chitchat_id| { + for chitchat_id in self.cluster_state.nodes() { + if chitchat_id != &self.config.chitchat_id { self.failure_detector.update_node_liveness(chitchat_id); - }); - + } + } let current_live_nodes = self .live_nodes() .flat_map(|chitchat_id| { @@ -278,8 +272,8 @@ impl Chitchat { ClusterStateSnapshot::from(&self.cluster_state) } - pub(crate) fn update_heartbeat(&mut self) { - self.self_node_state().update_heartbeat(); + pub(crate) fn update_self_heartbeat(&mut self) { + self.self_node_state().inc_heartbeat(); } pub(crate) fn cluster_state(&self) -> &ClusterState { @@ -579,7 +573,7 @@ mod tests { } #[tokio::test] - async fn test_dead_node_kvs_are_when_node_joins() -> anyhow::Result<()> { + async fn test_dead_node_kvs_are_gossiped_too_when_node_joins() -> anyhow::Result<()> { let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE); // starting 2 nodes. let mut nodes = setup_nodes(40001..=40002, &transport).await; @@ -608,7 +602,7 @@ mod tests { ) .await; let node2_chitchat = node2.chitchat(); - // We have received node3's key + // We have received node1's key let value = node2_chitchat .lock() .await @@ -622,30 +616,30 @@ mod tests { // Take node 1 down. let node1 = nodes.remove(0); - assert_eq!(node1.chitchat_id().advertise_port(), 40001); + assert_eq!(node1.chitchat_id().advertise_port(), 40_001); node1.shutdown().await.unwrap(); // Node 2 has detected that node 1 is missing. 
- { + let node_id2 = { let node2 = nodes.first().unwrap(); - assert_eq!(node2.chitchat_id().advertise_port(), 40002); + assert_eq!(node2.chitchat_id().advertise_port(), 40_002); wait_for_chitchat_state(node2.chitchat(), &[ChitchatId::for_local_test(40_002)]).await; - } + node2.chitchat_id().clone() + }; // Restart node at localhost:40001 with new name let mut new_config = ChitchatConfig::for_test(40_001); new_config.chitchat_id.node_id = "new_node".to_string(); let new_chitchat_id = new_config.chitchat_id.clone(); - let seed_addr = ChitchatId::for_local_test(40_001).gossip_advertise_addr; + let seed_addr = ChitchatId::for_local_test(40_002).gossip_advertise_addr; new_config.seed_nodes = vec![seed_addr.to_string()]; let new_node_chitchat_handle = spawn_chitchat(new_config, Vec::new(), &transport) .await .unwrap(); - let new_node_chitchat = new_node_chitchat_handle.chitchat(); wait_for_chitchat_state( new_node_chitchat.clone(), - &[ChitchatId::for_local_test(40_002), new_chitchat_id], + &[new_chitchat_id.clone(), node_id2.clone()], ) .await; @@ -735,6 +729,7 @@ mod tests { .node_state(&dead_chitchat_id) .is_some()); } + // Wait a bit more than `dead_node_grace_period` since all nodes will not // notice cluster change at the same time. let wait_for = DEAD_NODE_GRACE_PERIOD.add(Duration::from_secs(5)); @@ -749,6 +744,7 @@ mod tests { .node_state(&dead_chitchat_id) .is_none()); } + shutdown_nodes(nodes).await?; Ok(()) } diff --git a/chitchat/src/message.rs b/chitchat/src/message.rs index 8952669..d2b2caa 100644 --- a/chitchat/src/message.rs +++ b/chitchat/src/message.rs @@ -163,7 +163,7 @@ mod tests { let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), Heartbeat(0)); + delta.add_node(node.clone(), 0u64); // +29 bytes. 
delta.add_kv(&node, "key", "value", 0, true); delta.set_serialized_len(62); @@ -186,7 +186,7 @@ mod tests { let mut delta = Delta::default(); let node = ChitchatId::for_local_test(10_001); // +37 bytes = 27 bytes (ChitchatId) + 2 bytes (node delta len) + 8 bytes (heartbeat). - delta.add_node(node.clone(), Heartbeat(0)); + delta.add_node(node.clone(), 0u64); // +29 bytes. delta.add_kv(&node, "key", "value", 0, true); delta.set_serialized_len(62); diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 4124607..acd0dc2 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -272,7 +272,11 @@ impl Server { .dead_nodes() .map(|chitchat_id| chitchat_id.gossip_advertise_addr) .collect::>(); - let seed_nodes: HashSet = chitchat_guard.seed_nodes(); + let seed_nodes: HashSet = chitchat_guard + .seed_nodes() + .into_iter() + .filter(|addr| *addr != chitchat_guard.self_chitchat_id().gossip_advertise_addr) + .collect(); let (selected_nodes, random_dead_node_opt, random_seed_node_opt) = select_nodes_for_gossip( &mut self.rng, peer_nodes, @@ -281,7 +285,7 @@ impl Server { seed_nodes, ); - chitchat_guard.update_heartbeat(); + chitchat_guard.update_self_heartbeat(); chitchat_guard.gc_keys_marked_for_deletion(); // Drop lock to prevent deadlock in [`UdpSocket::gossip`]. @@ -579,13 +583,21 @@ mod tests { // Add our test socket to the server's nodes. 
server_handle .with_chitchat(|server_chitchat| { - server_chitchat.update_heartbeat(); + server_chitchat.update_self_heartbeat(); let syn = server_chitchat.create_syn_message(); let syn_ack = test_chitchat.process_message(syn).unwrap(); server_chitchat.process_message(syn_ack); }) .await; + let node_state = test_chitchat + .cluster_state() + .node_state(&server_id) + .unwrap(); + let heartbeat = node_state.heartbeat(); + + assert_eq!(heartbeat, Heartbeat(2)); + // Wait for syn, with updated heartbeat let (_, syn) = timeout(test_transport.recv()).await.unwrap(); @@ -602,8 +614,8 @@ mod tests { }; let node_delta = delta.get(&server_id).unwrap(); - let heartbeat = node_delta.heartbeat; - assert_eq!(heartbeat, Heartbeat(3)); + let heartbeat = node_delta.last_gc_version; + assert_eq!(heartbeat, 0u64); server_handle.shutdown().await.unwrap(); } diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index 3397268..881032a 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -30,7 +30,8 @@ pub struct NodeState { // // After we GC the tombstones, we cannot safely do replication with // nodes that are asking for a diff with version lower than this. - max_garbage_collected_tombstone_version: Option, + // nodes that are asking for a diff from with version lower than this. 
+ last_gc_version: Version, } impl Debug for NodeState { @@ -49,9 +50,9 @@ impl NodeState { node_id, heartbeat: Heartbeat(0), key_values: Default::default(), - max_version: Default::default(), + max_version: 0u64, listeners, - max_garbage_collected_tombstone_version: None, + last_gc_version: 0u64, } } @@ -66,7 +67,7 @@ impl NodeState { key_values: Default::default(), max_version: Default::default(), listeners: Listeners::default(), - max_garbage_collected_tombstone_version: None, + last_gc_version: 0u64, } } @@ -146,10 +147,22 @@ impl NodeState { versioned_value.value = "".to_string(); } - pub(crate) fn update_heartbeat(&mut self) { + pub(crate) fn inc_heartbeat(&mut self) { self.heartbeat.inc(); } + /// Attempts to set the heartbeat of another node. + /// If the value is actually not an update, just ignore the data and return false. + /// Otherwise, returns true. + pub fn try_set_heartbeat(&mut self, heartbeat_new_value: Heartbeat) -> bool { + if heartbeat_new_value > self.heartbeat { + self.heartbeat = heartbeat_new_value; + true + } else { + false + } + } + fn digest(&self) -> NodeDigest { NodeDigest { heartbeat: self.heartbeat, @@ -168,7 +181,7 @@ impl NodeState { /// Removes the keys marked for deletion such that `tombstone + grace_period > heartbeat`. fn gc_keys_marked_for_deletion(&mut self, grace_period: Duration) { let now = Instant::now(); - let mut max_deleted_version = self.max_garbage_collected_tombstone_version; + let mut max_deleted_version = self.last_gc_version; self.key_values .retain(|_, versioned_value: &mut VersionedValue| { let Some(deleted_instant) = versioned_value.tombstone else { @@ -180,10 +193,10 @@ impl NodeState { return true; } // We have exceeded the tombstone grace period. Time to remove it. 
- max_deleted_version = Some(versioned_value.version).max(max_deleted_version); + max_deleted_version = versioned_value.version.max(max_deleted_version); false }); - self.max_garbage_collected_tombstone_version = max_deleted_version; + self.last_gc_version = max_deleted_version; } /// Returns an iterator over the versioned values that are strictly greater than @@ -280,6 +293,8 @@ impl ClusterState { pub(crate) fn node_state_mut(&mut self, chitchat_id: &ChitchatId) -> &mut NodeState { // TODO use the `hash_raw_entry` feature once it gets stabilized. + // Most of the time the entry is already present, but `entry` still requires cloning + // chitchat_id on every call. self.node_states .entry(chitchat_id.clone()) .or_insert_with(|| NodeState::new(chitchat_id.clone(), self.listeners.clone())) @@ -302,6 +317,11 @@ impl ClusterState { } pub(crate) fn apply_delta(&mut self, delta: Delta) { + // Log the nodes that are about to be reset. + if !delta.nodes_to_reset.is_empty() { + tracing::info!(nodes_to_reset=?delta.nodes_to_reset, "nodes to reset"); + } + // Clearing the node of the states to reset. for node_to_reset in delta.nodes_to_reset { // We don't want to remove the entire state here: the node could be alive and the @@ -315,15 +335,30 @@ impl ClusterState { for node_delta in delta.node_deltas { let NodeDelta { chitchat_id, - heartbeat, + last_gc_version, key_values, } = node_delta; - let node_state = self - .node_states - .entry(chitchat_id.clone()) - .or_insert_with(|| NodeState::new(chitchat_id, self.listeners.clone())); - if node_state.heartbeat < heartbeat { - node_state.heartbeat = heartbeat; + let node_state = self.node_state_mut(&chitchat_id); + if node_state.last_gc_version == 0u64 { + // You may have expected `node_state.last_gc_version = max(last_gc_version, + // node_state.last_gc_version)`. This is correct too of course, but + // slightly too restrictive. + // + // `last_gc_version` expresses the idea: what is the oldest version from which I can + // confidently emit a delta.
The reason why we update it here is + // because a node that was just reset or just joined the cluster will get updates + // from another node that actually only make sense in the context of the + // emission of a delta from a `last_gc_version`. + // + // We want to avoid the case where: + // - Cluster with Node A, B, C + // - Node A inserts Key K that gets replicated. + // - Network partition as {A} {B,C} + // - A deletes Key K and GCs it. + // - Network restoration + // - B gossips with A and gets reset + // - C gossips with B and does NOT get reset. + node_state.last_gc_version = last_gc_version; } for (key, versioned_value) in key_values { node_state.max_version = node_state.max_version.max(versioned_value.version); @@ -369,12 +404,7 @@ impl ClusterState { stale_nodes.insert(chitchat_id, node_state); continue; }; - let should_reset = - if let Some(max_gc_version) = node_state.max_garbage_collected_tombstone_version { - max_gc_version >= node_digest.max_version - } else { - false - }; + let should_reset = node_state.last_gc_version > node_digest.max_version; if should_reset { warn!("Node to reset {chitchat_id:?}"); nodes_to_reset.push(chitchat_id); @@ -392,8 +422,10 @@ impl ClusterState { } for stale_node in stale_nodes.into_iter() { - if !delta_serializer.try_add_node(stale_node.chitchat_id.clone(), stale_node.heartbeat) - { + if !delta_serializer.try_add_node( + stale_node.chitchat_id.clone(), + stale_node.node_state.last_gc_version, + ) { break; } let mut added_something = false; @@ -437,12 +469,9 @@ impl<'a> SortedStaleNodes<'a> { /// Adds a node to the list of stale nodes. fn insert(&mut self, chitchat_id: &'a ChitchatId, node_state: &'a NodeState) { let staleness = node_state.num_key_values() + 1; // +1 for the heartbeat.
- let heartbeat = node_state.heartbeat; let floor_version = 0; - let stale_node = StaleNode { chitchat_id, - heartbeat, node_state, floor_version, }; @@ -468,7 +497,6 @@ impl<'a> SortedStaleNodes<'a> { if staleness > 0 { let stale_node = StaleNode { chitchat_id, - heartbeat, node_state, floor_version, }; @@ -498,7 +526,6 @@ impl<'a> SortedStaleNodes<'a> { #[derive(Debug)] struct StaleNode<'a> { chitchat_id: &'a ChitchatId, - heartbeat: Heartbeat, node_state: &'a NodeState, floor_version: u64, } @@ -567,7 +594,6 @@ mod tests { let node_state = NodeState::for_test(); let stale_node = StaleNode { chitchat_id: &node, - heartbeat: Heartbeat(0), node_state: &node_state, floor_version: 0, }; @@ -588,7 +614,6 @@ mod tests { let stale_node = StaleNode { chitchat_id: &node, - heartbeat: Heartbeat(0), node_state: &node_state, floor_version: 1, }; @@ -692,7 +717,6 @@ mod tests { let mut stale_nodes = SortedStaleNodes::default(); let stale_node1 = StaleNode { chitchat_id: &ChitchatId::for_local_test(10_001), - heartbeat: Heartbeat(0), node_state: &NodeState::for_test(), floor_version: 0, }; @@ -700,19 +724,16 @@ mod tests { let stale_node2 = StaleNode { chitchat_id: &ChitchatId::for_local_test(10_002), - heartbeat: Heartbeat(0), node_state: &NodeState::for_test(), floor_version: 0, }; let stale_node3 = StaleNode { chitchat_id: &ChitchatId::for_local_test(10_003), - heartbeat: Heartbeat(0), node_state: &NodeState::for_test(), floor_version: 0, }; let stale_node4 = StaleNode { chitchat_id: &ChitchatId::for_local_test(10_004), - heartbeat: Heartbeat(0), node_state: &NodeState::for_test(), floor_version: 0, }; @@ -919,13 +940,13 @@ mod tests { node2_state.set_with_version("key_c".to_string(), "3".to_string(), 1); // 1 let mut delta = Delta::default(); - delta.add_node(node1.clone(), Heartbeat(0)); + delta.add_node(node1.clone(), 0u64); delta.add_kv(&node1, "key_a", "4", 4, false); delta.add_kv(&node1, "key_b", "2", 2, false); // Reset node 2. 
delta.add_node_to_reset(node2.clone()); - delta.add_node(node2.clone(), Heartbeat(0)); + delta.add_node(node2.clone(), 0u64); delta.add_kv(&node2, "key_d", "4", 4, false); cluster_state.apply_delta(delta); @@ -986,7 +1007,7 @@ mod tests { for (num_entries, &mtu) in mtu_per_num_entries.iter().enumerate() { let mut expected_delta = Delta::default(); for &(node, key, val, version, tombstone) in &expected_delta_atoms[..num_entries] { - expected_delta.add_node(node.clone(), Heartbeat(0)); + expected_delta.add_node(node.clone(), 0u64); expected_delta.add_kv(node, key, val, version, tombstone); } { @@ -1135,11 +1156,11 @@ mod tests { ); assert!(delta.nodes_to_reset.is_empty()); let mut expected_delta = Delta::default(); - expected_delta.add_node(node1.clone(), Heartbeat(10_000)); + expected_delta.add_node(node1.clone(), 0u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); - expected_delta.add_node(node2.clone(), Heartbeat(0)); + expected_delta.add_node(node2.clone(), 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.set_serialized_len(78); + expected_delta.set_serialized_len(73); assert_eq!(delta, expected_delta); } @@ -1160,12 +1181,12 @@ mod tests { ); assert!(delta.nodes_to_reset.is_empty()); let mut expected_delta = Delta::default(); - expected_delta.add_node(node1.clone(), Heartbeat(10_000)); + expected_delta.add_node(node1.clone(), 0u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); expected_delta.add_kv(&node1, "key_a", "", 3, true); - expected_delta.add_node(node2.clone(), Heartbeat(0)); + expected_delta.add_node(node2.clone(), 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.set_serialized_len(90); + expected_delta.set_serialized_len(83); assert_eq!(delta, expected_delta); } @@ -1186,11 +1207,11 @@ mod tests { ); let mut expected_delta = Delta::default(); expected_delta.add_node_to_reset(node1.clone()); - expected_delta.add_node(node1.clone(), Heartbeat(10_000)); + 
expected_delta.add_node(node1.clone(), 3u64); expected_delta.add_kv(&node1, "key_b", "2", 2, false); - expected_delta.add_node(node2.clone(), Heartbeat(0)); + expected_delta.add_node(node2.clone(), 0u64); expected_delta.add_kv(&node2.clone(), "key_c", "3", 2, false); - expected_delta.set_serialized_len(81); + expected_delta.set_serialized_len(80); assert_eq!(delta, expected_delta); } } diff --git a/chitchat/src/types.rs b/chitchat/src/types.rs index afcf37b..1e14e35 100644 --- a/chitchat/src/types.rs +++ b/chitchat/src/types.rs @@ -122,7 +122,7 @@ pub struct Heartbeat(pub(crate) u64); impl Heartbeat { pub(crate) fn inc(&mut self) { - self.0 += 1; + self.0 = self.0.wrapping_add(1); } } diff --git a/chitchat/tests/cluster_test.rs b/chitchat/tests/cluster_test.rs index 807a380..6c720ec 100644 --- a/chitchat/tests/cluster_test.rs +++ b/chitchat/tests/cluster_test.rs @@ -232,6 +232,14 @@ pub fn create_chitchat_id(id: &str) -> ChitchatId { } } +pub fn test_chitchat_id(port: u16) -> ChitchatId { + ChitchatId { + node_id: format!("node_{port}"), + generation_id: 0, + gossip_advertise_addr: ([127, 0, 0, 1], port).into(), + } +} + /// Copy-pasted from Quickwit repo. /// Finds a random available TCP port. 
/// @@ -325,14 +333,82 @@ async fn test_simple_simulation_with_network_partition() { } #[tokio::test] -async fn test_marked_for_deletion_gc_with_network_partition() { - const TIMEOUT: Duration = Duration::from_millis(500); +async fn test_marked_for_deletion_gc_with_network_partition_2_nodes() { // let _ = tracing_subscriber::fmt::try_init(); + const TIMEOUT: Duration = Duration::from_millis(500); let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); - let chitchat_id_1 = create_chitchat_id("node-1"); - let chitchat_id_2 = create_chitchat_id("node-2"); - let chitchat_id_3 = create_chitchat_id("node-3"); - let chitchat_id_4 = create_chitchat_id("node-4"); + let chitchat_id_1 = test_chitchat_id(1); + let chitchat_id_2 = test_chitchat_id(2); + let peer_seeds = vec![chitchat_id_1.clone(), chitchat_id_2.clone()]; + let operations = vec![ + Operation::AddNode { + chitchat_id: chitchat_id_1.clone(), + peer_seeds: Some(peer_seeds.clone()), + }, + Operation::AddNode { + chitchat_id: chitchat_id_2.clone(), + peer_seeds: Some(peer_seeds.clone()), + }, + Operation::InsertKeysValues { + chitchat_id: chitchat_id_1.clone(), + keys_values: vec![("key_a".to_string(), "0".to_string())], + }, + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), + timeout_opt: Some(TIMEOUT), + }, + // Isolate node 2. + Operation::RemoveNetworkLink(chitchat_id_1.clone(), chitchat_id_2.clone()), + Operation::Wait(Duration::from_secs(5)), + // Mark for deletion key. + Operation::MarkKeyForDeletion { + chitchat_id: chitchat_id_1.clone(), + key: "key_a".to_string(), + }, + // Check marked for deletion is not propagated to node 2.
+ Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::MarkedForDeletion("key_a".to_string(), false), + timeout_opt: None, + }, + // Wait for garbage collection: grace period * heartbeat ~ 1 second + margin of 1 second. + Operation::Wait(Duration::from_secs(2)), + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_1.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), + timeout_opt: None, + }, + Operation::Wait(TIMEOUT), + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), true), + timeout_opt: None, + }, + // Relink node 2 + Operation::AddNetworkLink(chitchat_id_1.clone(), chitchat_id_2.clone()), + Operation::NodeStateAssert { + server_chitchat_id: chitchat_id_2.clone(), + chitchat_id: chitchat_id_1.clone(), + // The key should be deleted. + predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), + timeout_opt: Some(TIMEOUT), + }, + ]; + simulator.execute(operations).await; +} +#[tokio::test] +async fn test_marked_for_deletion_gc_with_network_partition_4_nodes() { + const TIMEOUT: Duration = Duration::from_millis(500); + let mut simulator = Simulator::new(Duration::from_millis(100), Duration::from_secs(1)); + let chitchat_id_1 = test_chitchat_id(1); + let chitchat_id_2 = test_chitchat_id(2); + let chitchat_id_3 = test_chitchat_id(3); + let chitchat_id_4 = test_chitchat_id(4); let peer_seeds = vec![ chitchat_id_1.clone(), chitchat_id_2.clone(), @@ -431,6 +507,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { Operation::NodeStateAssert { server_chitchat_id: chitchat_id_3.clone(), chitchat_id: chitchat_id_1.clone(), + // The key should be deleted. 
predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), timeout_opt: Some(TIMEOUT), }, @@ -438,7 +515,7 @@ async fn test_marked_for_deletion_gc_with_network_partition() { server_chitchat_id: chitchat_id_4.clone(), chitchat_id: chitchat_id_1.clone(), predicate: NodeStatePredicate::KeyPresent("key_a".to_string(), false), - timeout_opt: Some(TIMEOUT), + timeout_opt: Some(TIMEOUT * 10), }, ]; simulator.execute(operations).await;