Spreading KVs for half of the dead node grace period.
See README for more information.

This change is made because we use chitchat as a reliable broadcast to
update published position in Quickwit.
fulmicoton committed Feb 13, 2024
1 parent c83727c commit 267a2e6
Showing 4 changed files with 47 additions and 17 deletions.
7 changes: 4 additions & 3 deletions README.md
@@ -100,12 +100,13 @@ Just deleting the state is of course impossible. After the given `DEAD_NODE_GRACE_PERIOD`
We first stop sharing data about nodes in the `ScheduledForDeletion` state,
and stop listing them in our digest.

We also ignore any updates received about the dead node, but we keep track of the last update received. Eventually, all the nodes of the cluster will have
marked the dead node as `ScheduledForDeletion`.
We also ignore any updates received about the dead node. For simplification, we do not even keep track of the last update received. Eventually, all the nodes of the cluster will have marked the dead node as `ScheduledForDeletion`.

After another `DEAD_NODE_GRACE_PERIOD` / 2 has elapsed since the last update received, we delete the dead node state.
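
To make the timeline concrete, here is a minimal standalone sketch of the phases a dead node's state goes through under this scheme. The `DeadNodePhase` enum and `phase` helper are illustrative names only, not part of chitchat:

```rust
use std::time::{Duration, Instant};

/// Illustration only: the three phases a dead node's state goes through.
#[derive(Debug)]
enum DeadNodePhase {
    /// First half of the grace period: the node is dead, but its key-values
    /// are still listed in digests and shared, so the broadcast can complete.
    DeadButStillGossiped,
    /// Second half: the node is `ScheduledForDeletion`. It is no longer listed
    /// in digests, its data is no longer shared, and updates about it are ignored.
    ScheduledForDeletion,
    /// Full grace period elapsed: the node state is garbage collected.
    Deleted,
}

fn phase(time_of_death: Instant, now: Instant, grace_period: Duration) -> DeadNodePhase {
    let elapsed = now.saturating_duration_since(time_of_death);
    if elapsed >= grace_period {
        DeadNodePhase::Deleted
    } else if elapsed >= grace_period / 2 {
        DeadNodePhase::ScheduledForDeletion
    } else {
        DeadNodePhase::DeadButStillGossiped
    }
}

fn main() {
    let grace_period = Duration::from_secs(3_600);
    let time_of_death = Instant::now();
    println!("{:?}", phase(time_of_death, Instant::now(), grace_period));
}
```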

Note that this absolutely breaks the reliable broadcast nature of this.
It is important to set `DEAD_NODE_GRACE_PERIOD` to a value such that `DEAD_NODE_GRACE_PERIOD / 2` is much greater than the time it takes to detect a faulty node.
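
For illustration, with made-up numbers (not taken from this commit or from Quickwit's configuration), the constraint can be sanity-checked like this:

```rust
use std::time::Duration;

fn main() {
    // Hypothetical figure: assume the failure detector typically needs
    // about 10 seconds to flag a crashed node as dead.
    let failure_detection_time = Duration::from_secs(10);

    // A one-hour grace period leaves 30 minutes in each half, which is
    // much greater than the detection time, as recommended above.
    let dead_node_grace_period = Duration::from_secs(3_600);

    assert!(dead_node_grace_period / 2 > 100 * failure_detection_time);
}
```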

Note that we are breaking the reliable broadcast nature of chitchat here.
New nodes joining after `DEAD_NODE_GRACE_PERIOD`, for instance, will never know about the state of the dead node.

Also, if a node was disconnected from the cluster for more than `DEAD_NODE_GRACE_PERIOD / 2` and reconnects, it is likely to spread information
22 changes: 20 additions & 2 deletions chitchat/src/failure_detector.rs
@@ -81,8 +81,9 @@ impl FailureDetector {
    /// Removes and returns the list of garbage collectible nodes.
    pub fn garbage_collect(&mut self) -> Vec<ChitchatId> {
        let mut garbage_collected_nodes = Vec::new();
        for (chitchat_id, instant) in self.dead_nodes.iter() {
            if instant.elapsed() >= self.config.dead_node_grace_period {
        let now = Instant::now();
        for (chitchat_id, time_of_death) in self.dead_nodes.iter() {
            if (now - *time_of_death) >= self.config.dead_node_grace_period {
                garbage_collected_nodes.push(chitchat_id.clone())
            }
        }
@@ -102,6 +103,23 @@ impl FailureDetector {
        self.dead_nodes.keys()
    }

    /// Returns the list of nodes that have been dead for more than half of the
    /// `dead_node_grace_period` and are therefore scheduled for deletion.
    pub fn scheduled_for_deletion_nodes(&self) -> impl Iterator<Item = &ChitchatId> {
        let now = Instant::now();
        let half_dead_node_grace_period = self.config.dead_node_grace_period.div_f32(2.0f32);
        // Note: we can't just compute the threshold `now - half_dead_node_grace_period`, because
        // it would underflow on some platforms (e.g. macOS).
        self.dead_nodes
            .iter()
            .filter_map(move |(chitchat_id, time_of_death)| {
                if *time_of_death + half_dead_node_grace_period < now {
                    Some(chitchat_id)
                } else {
                    None
                }
            })
    }

    /// Returns the current phi value of a node.
    fn phi(&mut self, chitchat_id: &ChitchatId) -> Option<f64> {
        self.node_samples
24 changes: 16 additions & 8 deletions chitchat/src/lib.rs
@@ -87,7 +87,8 @@ impl Chitchat {
    }

    pub(crate) fn create_syn_message(&self) -> ChitchatMessage {
        let digest = self.compute_digest();
        let scheduled_for_deletion: HashSet<_> = self.scheduled_for_deletion_nodes().collect();
        let digest = self.compute_digest(&scheduled_for_deletion);
        ChitchatMessage::Syn {
            cluster_id: self.config.cluster_id.clone(),
            digest,
@@ -106,15 +107,16 @@
                    return Some(ChitchatMessage::BadCluster);
                }
                // Ensure for every reply from this node, at least the heartbeat is changed.
                let dead_nodes: HashSet<_> = self.dead_nodes().collect();
                let self_digest = self.compute_digest();
                let scheduled_for_deletion: HashSet<_> =
                    self.scheduled_for_deletion_nodes().collect();
                let self_digest = self.compute_digest(&scheduled_for_deletion);
                let empty_delta = Delta::default();
                let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE
                    - syn_ack_serialized_len(&self_digest, &empty_delta);
                let delta = self.cluster_state.compute_delta(
                    &digest,
                    delta_mtu,
                    &dead_nodes,
                    &scheduled_for_deletion,
                    self.config.marked_for_deletion_grace_period as u64,
                );
                Some(ChitchatMessage::SynAck {
@@ -125,11 +127,11 @@
            ChitchatMessage::SynAck { digest, delta } => {
                self.report_heartbeats(&delta);
                self.cluster_state.apply_delta(delta);
                let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
                let scheduled_for_deletion = self.dead_nodes().collect::<HashSet<_>>();
                let delta = self.cluster_state.compute_delta(
                    &digest,
                    MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1,
                    &dead_nodes,
                    &scheduled_for_deletion,
                    self.config.marked_for_deletion_grace_period as u64,
                );
                Some(ChitchatMessage::Ack { delta })
@@ -249,6 +251,11 @@ impl Chitchat {
        self.failure_detector.dead_nodes()
    }

    /// Returns the set of nodes scheduled for deletion by the failure detector.
    pub fn scheduled_for_deletion_nodes(&self) -> impl Iterator<Item = &ChitchatId> {
        self.failure_detector.scheduled_for_deletion_nodes()
    }

    /// Returns the set of seed nodes.
    pub fn seed_nodes(&self) -> HashSet<SocketAddr> {
        self.cluster_state.seed_addrs()
@@ -277,8 +284,9 @@ impl Chitchat {
    }

    /// Computes the node's digest.
    fn compute_digest(&self) -> Digest {
        self.cluster_state.compute_digest()
    fn compute_digest(&self, scheduled_for_deletion_nodes: &HashSet<&ChitchatId>) -> Digest {
        self.cluster_state
            .compute_digest(scheduled_for_deletion_nodes)
    }

    /// Subscribes a callback that will be called every time a key matching the supplied prefix
11 changes: 7 additions & 4 deletions chitchat/src/state.rs
@@ -312,11 +312,12 @@ impl ClusterState {
        }
    }

    pub fn compute_digest(&self) -> Digest {
    pub fn compute_digest(&self, scheduled_for_deletion: &HashSet<&ChitchatId>) -> Digest {
        Digest {
            node_digests: self
                .node_states
                .iter()
                .filter(|(chitchat_id, _)| !scheduled_for_deletion.contains(chitchat_id))
                .map(|(chitchat_id, node_state)| (chitchat_id.clone(), node_state.digest()))
                .collect(),
        }
@@ -336,18 +337,20 @@
    }

    /// Implements the Scuttlebutt reconciliation with the scuttle-depth ordering.
    ///
    /// Nodes that are scheduled for deletion (as passed by argument) are not shared.
    pub fn compute_delta(
        &self,
        digest: &Digest,
        mtu: usize,
        dead_nodes: &HashSet<&ChitchatId>,
        scheduled_for_deletion: &HashSet<&ChitchatId>,
        marked_for_deletion_grace_period: u64,
    ) -> Delta {
        let mut stale_nodes = SortedStaleNodes::default();
        let mut nodes_to_reset = Vec::new();

        for (chitchat_id, node_state) in &self.node_states {
            if dead_nodes.contains(chitchat_id) {
            if scheduled_for_deletion.contains(chitchat_id) {
                continue;
            }
            let Some(node_digest) = digest.node_digests.get(chitchat_id) else {
@@ -833,7 +836,7 @@ mod tests {
node2_state.set("key_a", "");
node2_state.set("key_b", "");

let digest = cluster_state.compute_digest();
let digest = cluster_state.compute_digest(&HashSet::new());

let mut expected_node_digests = Digest::default();
expected_node_digests.add_node(node1.clone(), Heartbeat(0), 1);
