Spreading KVs for half of the dead node grace period.
See README for more information.

This change is made because we use chitchat as a reliable broadcast to
update published position in Quickwit.
fulmicoton committed Feb 13, 2024
1 parent c3a5baf commit c349456
Showing 4 changed files with 183 additions and 46 deletions.
84 changes: 82 additions & 2 deletions README.md
@@ -4,9 +4,9 @@ This crate is used at the core of Quickwit for
- cluster membership
- failure detection
- sharing configuration, and extra metadata values.

The idea of relying on scuttlebutt reconciliation and phi-accrual detection is borrowed from Cassandra, itself borrowing it from DynamoDB.

An anti-entropy gossip algorithm called scuttlebutt is in charge of spreading
a common state to all nodes.
@@ -28,6 +28,9 @@ Not receiving any update from a node for a given amount of time can therefore be
regarded as a sign of failure. Rather than using a hard threshold,
we use phi-accrual detection to dynamically compute a threshold.
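
As a rough illustration, here is a minimal sketch of such a phi score, assuming
(purely for simplicity) an exponential model of heartbeat inter-arrival times;
the real detector maintains a sliding window of observed intervals, and the
names below are illustrative, not the actual chitchat API:

```rust
/// Phi score for a node we have not heard from for `elapsed_secs`, assuming
/// heartbeat inter-arrival times follow an exponential distribution with mean
/// `mean_interval_secs`: phi = -log10(P(a heartbeat is still to come)).
fn phi(elapsed_secs: f64, mean_interval_secs: f64) -> f64 {
    elapsed_secs / (mean_interval_secs * std::f64::consts::LN_10)
}

/// A node is suspected dead once its phi score crosses the configured threshold.
fn is_suspected_dead(phi_score: f64, threshold: f64) -> bool {
    phi_score > threshold
}
```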

We also abuse `chitchat` in Quickwit and use it as a reliable broadcast,
with a few caveats.

# References

- ScuttleButt paper: https://www.cs.cornell.edu/home/rvr/papers/flowgossip.pdf
@@ -36,3 +39,80 @@ we use phi-accrual detection to dynamically compute a threshold.
https://www.youtube.com/watch?v=FuP1Fvrv6ZQ
- https://docs.datastax.com/en/articles/cassandra/cassandrathenandnow.html
- https://github.com/apache/cassandra/blob/f5fb1b0bd32b5dc7da13ec66d43acbdad7fe9dbf/src/java/org/apache/cassandra/gms/Gossiper.java#L1749

# Heartbeat

In order to get a constant flow of updates to feed into phi-accrual detection,
chitchat's node state includes a key-value pair called `heartbeat`. The heartbeat of a given node starts at 0 and is incremented after each round of gossip the node initiates.

Each node then reports all heartbeat updates to a phi-accrual detector to
assess the liveness of its peers. Liveness is a local concept: every single
node computes its own view of the liveness of all other nodes.
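
Below is a minimal sketch of this mechanism, assuming a simplified node state
keyed by strings; the names `NodeState`, `tick_heartbeat`, and
`record_peer_heartbeat` are illustrative, not the actual chitchat API:

```rust
use std::collections::BTreeMap;
use std::time::Instant;

#[derive(Default)]
struct NodeState {
    kvs: BTreeMap<String, String>,
    heartbeat: u64,
}

impl NodeState {
    /// Called once per gossip round initiated by this node.
    fn tick_heartbeat(&mut self) {
        self.heartbeat += 1;
        self.kvs
            .insert("heartbeat".to_string(), self.heartbeat.to_string());
    }
}

/// On the receiving side, every observed heartbeat increase for a peer counts
/// as an arrival sample feeding that peer's phi-accrual window.
fn record_peer_heartbeat(
    last_heartbeats: &mut BTreeMap<String, (u64, Instant)>,
    peer: &str,
    heartbeat: u64,
) {
    let entry = last_heartbeats
        .entry(peer.to_string())
        .or_insert((0, Instant::now()));
    if heartbeat > entry.0 {
        // Only a strictly increasing heartbeat counts as a liveness signal.
        *entry = (heartbeat, Instant::now());
    }
}
```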

# KV deletion

The deletion of a KV is just another type of mutation: a deletion is
associated with a version and replicated using the same mechanism as a KV update.

The library then interprets this versioned tombstone before exposing KVs to
the user.

To avoid keeping deleted KVs around indefinitely, the library includes a GC mechanism. Any tombstone older than a given `grace period threshold`
(its age being measured in heartbeat ticks) can safely be deleted.
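
Here is a minimal sketch of a versioned tombstone and its GC, under the
simplifying assumption that values are plain strings; `VersionedValue` and
`gc_tombstones` are illustrative names, not the actual chitchat types:

```rust
use std::collections::BTreeMap;

struct VersionedValue {
    value: String,
    version: u64,
    /// Heartbeat tick at which the key was deleted, if any.
    deleted_at_heartbeat: Option<u64>,
}

/// Drops entries whose tombstone is older than the grace period threshold.
fn gc_tombstones(
    kvs: &mut BTreeMap<String, VersionedValue>,
    current_heartbeat: u64,
    grace_period_ticks: u64,
) {
    kvs.retain(|_key, versioned| match versioned.deleted_at_heartbeat {
        Some(deleted_at) => current_heartbeat.saturating_sub(deleted_at) < grace_period_ticks,
        None => true,
    });
}
```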

This yields the following problem: if a node was disconnected for more than
`marked_for_deletion_grace_period`, it could have missed the deletion of a KV and never become aware of it.

To address this problem, nodes that are too outdated have to reset their state.

More accurately, let's assume a node A sends a Syn message to a node B, with a digest containing an outdated version V for a node N.
Node B will compare the version in the digest with its own version.

If V is fresher than `own version - marked_for_deletion_grace_period`,
Node B knows that no GC has impacted key-values with a version above V. It can
safely emit a normal delta to A.
If however V is older than `own version - marked_for_deletion_grace_period`,
a GC could have been executed. Instead of sending a delta to A, Node B will
instruct A to reset its state.

A will wipe out whatever information it has about N, and will start syncing from a blank state.
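
Here is a minimal sketch of the decision B makes, assuming plain `u64`
versions; the names are illustrative and only loosely mirror the version check
performed in `ClusterState::compute_delta`:

```rust
enum Reply {
    /// Send a regular delta containing everything fresher than the digest version.
    Delta { from_version: u64 },
    /// The digest is too old: a GC may have run, so instruct A to reset its state.
    Reset,
}

fn reply_for_digest(own_version: u64, digest_version: u64, grace_period: u64) -> Reply {
    if digest_version >= own_version.saturating_sub(grace_period) {
        // No tombstone with a version above `digest_version` can have been GCed.
        Reply::Delta { from_version: digest_version }
    } else {
        Reply::Reset
    }
}
```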

# Node deletion

In Quickwit, we also use chitchat as a "reliable broadcast with caveats".
The idea of reliable broadcast is that an emitted message is supposed
to eventually reach either all correct nodes or none of them.

Of course, if the emitter fails before emitting its message, one cannot expect the message to reach anyone.
However, if at least one correct node receives the message, it will
eventually reach all correct nodes (assuming those nodes stay correct).

For this reason, we keep emitting KVs from dead nodes too.

In order to avoid keeping the state of dead nodes indefinitely, we make
a very important trade-off.

If a node is marked as dead for more than `DEAD_NODE_GRACE_PERIOD`, we assume that its state can be safely removed from the system. The grace period is
computed from the last time we received an update from the dead node.

Simply deleting the state outright is of course not possible. Instead, after `DEAD_NODE_GRACE_PERIOD / 2`, we mark the dead node as `ScheduledForDeletion`.

We first stop sharing data about nodes in the `ScheduledForDeletion` state
and stop listing them in our digest.

We also ignore any updates received about the dead node. For simplicity, we do not even keep track of the last update received. Eventually, all the nodes of the cluster will have marked the dead node as `ScheduledForDeletion`.

After another `DEAD_NODE_GRACE_PERIOD / 2` has elapsed since the last update received, we delete the dead node's state.

It is important to set `DEAD_NODE_GRACE_PERIOD` to a value such that `DEAD_NODE_GRACE_PERIOD / 2` is much greater than the time it takes to detect a faulty node.
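
A minimal sketch of the resulting timeline, loosely mirroring the failure
detector logic shown further below (the `DeadNodePhase` enum and `phase`
helper are illustrative, not part of the chitchat API):

```rust
use std::time::{Duration, Instant};

enum DeadNodePhase {
    /// Dead, but its KVs are still gossiped so the "reliable broadcast" property holds.
    StillGossiped,
    /// Dead for more than half of the grace period: no longer gossiped, not
    /// listed in digests, and updates about it are ignored.
    ScheduledForDeletion,
    /// Dead for more than the full grace period: its state can be garbage collected.
    GarbageCollectible,
}

fn phase(now: Instant, time_of_death: Instant, grace_period: Duration) -> DeadNodePhase {
    if now >= time_of_death + grace_period {
        DeadNodePhase::GarbageCollectible
    } else if now >= time_of_death + grace_period / 2 {
        DeadNodePhase::ScheduledForDeletion
    } else {
        DeadNodePhase::StillGossiped
    }
}
```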

Note that we are breaking the reliable broadcast nature of chitchat here.
New nodes joining after `DEAD_NODE_GRACE_PERIOD`, for instance, will never know about the state of the dead node.

Also, if a node was disconnected from the cluster for more than `DEAD_NODE_GRACE_PERIOD / 2` and reconnects, it is likely to spread information
about the dead node again. Worse, it could be unaware of the deletion
of some specific KVs and spread them again.

The chitchat library does not include any mechanism to prevent this from happening. These KVs should however eventually get deleted (after a bit more than `DEAD_NODE_GRACE_PERIOD`) if the node is really dead.

If the node is alive, it should be able to fix everyone's state via a reset or a regular delta.
22 changes: 20 additions & 2 deletions chitchat/src/failure_detector.rs
@@ -81,8 +81,9 @@ impl FailureDetector {
/// Removes and returns the list of garbage collectible nodes.
pub fn garbage_collect(&mut self) -> Vec<ChitchatId> {
let mut garbage_collected_nodes = Vec::new();
for (chitchat_id, instant) in self.dead_nodes.iter() {
if instant.elapsed() >= self.config.dead_node_grace_period {
let now = Instant::now();
for (chitchat_id, &time_of_death) in &self.dead_nodes {
if now >= time_of_death + self.config.dead_node_grace_period {
garbage_collected_nodes.push(chitchat_id.clone())
}
}
@@ -102,6 +103,23 @@ impl FailureDetector {
self.dead_nodes.keys()
}

/// Returns the list of dead nodes that are scheduled for deletion, i.e. nodes that have
/// been dead for more than half of the dead node grace period.
pub fn scheduled_for_deletion_nodes(&self) -> impl Iterator<Item = &ChitchatId> {
let now = Instant::now();
let half_dead_node_grace_period = self.config.dead_node_grace_period.div_f32(2.0f32);
// Note: we can't just compute the threshold `now - half_dead_node_grace_period`, because it
// would underflow on some platforms (macOS).
self.dead_nodes
.iter()
.filter_map(move |(chitchat_id, time_of_death)| {
if *time_of_death + half_dead_node_grace_period < now {
Some(chitchat_id)
} else {
None
}
})
}

/// Returns the current phi value of a node.
fn phi(&mut self, chitchat_id: &ChitchatId) -> Option<f64> {
self.node_samples
112 changes: 74 additions & 38 deletions chitchat/src/lib.rs
@@ -87,7 +87,8 @@ impl Chitchat {
}

pub(crate) fn create_syn_message(&self) -> ChitchatMessage {
let digest = self.compute_digest();
let scheduled_for_deletion: HashSet<_> = self.scheduled_for_deletion_nodes().collect();
let digest = self.compute_digest(&scheduled_for_deletion);
ChitchatMessage::Syn {
cluster_id: self.config.cluster_id.clone(),
digest,
@@ -106,15 +107,16 @@ impl Chitchat {
return Some(ChitchatMessage::BadCluster);
}
// Ensure for every reply from this node, at least the heartbeat is changed.
let dead_nodes: HashSet<_> = self.dead_nodes().collect();
let self_digest = self.compute_digest();
let scheduled_for_deletion: HashSet<_> =
self.scheduled_for_deletion_nodes().collect();
let self_digest = self.compute_digest(&scheduled_for_deletion);
let empty_delta = Delta::default();
let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE
- syn_ack_serialized_len(&self_digest, &empty_delta);
let delta = self.cluster_state.compute_delta(
&digest,
delta_mtu,
&dead_nodes,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::SynAck {
@@ -125,11 +127,12 @@ impl Chitchat {
ChitchatMessage::SynAck { digest, delta } => {
self.report_heartbeats(&delta);
self.cluster_state.apply_delta(delta);
let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
let scheduled_for_deletion =
self.scheduled_for_deletion_nodes().collect::<HashSet<_>>();
let delta = self.cluster_state.compute_delta(
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1,
&dead_nodes,
&scheduled_for_deletion,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::Ack { delta })
@@ -249,6 +252,11 @@ impl Chitchat {
self.failure_detector.dead_nodes()
}

/// Returns the set of dead nodes that are scheduled for deletion by the failure detector.
pub fn scheduled_for_deletion_nodes(&self) -> impl Iterator<Item = &ChitchatId> {
self.failure_detector.scheduled_for_deletion_nodes()
}

/// Returns the set of seed nodes.
pub fn seed_nodes(&self) -> HashSet<SocketAddr> {
self.cluster_state.seed_addrs()
@@ -277,8 +285,9 @@ impl Chitchat {
}

/// Computes the node's digest.
fn compute_digest(&self) -> Digest {
self.cluster_state.compute_digest()
fn compute_digest(&self, scheduled_for_deletion_nodes: &HashSet<&ChitchatId>) -> Digest {
self.cluster_state
.compute_digest(scheduled_for_deletion_nodes)
}

/// Subscribes a callback that will be called every time a key matching the supplied prefix
@@ -577,9 +586,23 @@ mod tests {
}

#[tokio::test]
async fn test_dead_node_should_not_be_gossiped_when_node_joins() -> anyhow::Result<()> {
async fn test_dead_node_kvs_are_when_node_joins() -> anyhow::Result<()> {
let transport = ChannelTransport::with_mtu(MAX_UDP_DATAGRAM_PAYLOAD_SIZE);
let mut nodes = setup_nodes(40001..=40004, &transport).await;
// starting 2 nodes.
let mut nodes = setup_nodes(40001..=40002, &transport).await;

// Let's add a key to node1.
let node1_id = {
let node1 = nodes.first().unwrap();
let node1_chitchat = node1.chitchat();
node1_chitchat
.lock()
.await
.self_node_state()
.set("test_key", "test_val");
node1.chitchat_id().clone()
};

{
let node2 = nodes.get(1).unwrap();
assert_eq!(node2.chitchat_id().advertise_port(), 40002);
@@ -588,54 +611,67 @@
&[
ChitchatId::for_local_test(40001),
ChitchatId::for_local_test(40002),
ChitchatId::for_local_test(40003),
ChitchatId::for_local_test(40004),
],
)
.await;
let node2_chitchat = node2.chitchat();
// We have received node1's key
let value = node2_chitchat
.lock()
.await
.node_state(&node1_id)
.unwrap()
.get("test_key")
.unwrap()
.to_string();
assert_eq!(&value, "test_val");
}

// Take node 3 down.
let node3 = nodes.remove(2);
assert_eq!(node3.chitchat_id().advertise_port(), 40003);
node3.shutdown().await.unwrap();
// Take node 1 down.
let node1 = nodes.remove(0);
assert_eq!(node1.chitchat_id().advertise_port(), 40001);
node1.shutdown().await.unwrap();

// Node 2 has detected that node 1 is missing.
{
let node2 = nodes.get(1).unwrap();
let node2 = nodes.first().unwrap();
assert_eq!(node2.chitchat_id().advertise_port(), 40002);
wait_for_chitchat_state(
node2.chitchat(),
&[
ChitchatId::for_local_test(40_001),
ChitchatId::for_local_test(40002),
ChitchatId::for_local_test(40_004),
],
)
.await;
wait_for_chitchat_state(node2.chitchat(), &[ChitchatId::for_local_test(40_002)]).await;
}

// Restart node at localhost:40003 with new name
let mut new_config = ChitchatConfig::for_test(40_003);
// Restart node at localhost:40001 with new name
let mut new_config = ChitchatConfig::for_test(40_001);
new_config.chitchat_id.node_id = "new_node".to_string();
let new_chitchat_id = new_config.chitchat_id.clone();
let seed_addr = ChitchatId::for_local_test(40_002).gossip_advertise_addr;
let seed_addr = ChitchatId::for_local_test(40_001).gossip_advertise_addr;
new_config.seed_nodes = vec![seed_addr.to_string()];
let new_node_chitchat = spawn_chitchat(new_config, Vec::new(), &transport)
let new_node_chitchat_handle = spawn_chitchat(new_config, Vec::new(), &transport)
.await
.unwrap();

let new_node_chitchat = new_node_chitchat_handle.chitchat();
wait_for_chitchat_state(
new_node_chitchat.chitchat(),
&[
ChitchatId::for_local_test(40_001),
ChitchatId::for_local_test(40_002),
new_chitchat_id,
ChitchatId::for_local_test(40_004),
],
new_node_chitchat.clone(),
&[ChitchatId::for_local_test(40_002), new_chitchat_id],
)
.await;

nodes.push(new_node_chitchat);
{
let new_node_chitchat_guard = new_node_chitchat.lock().await;
let test_val = new_node_chitchat_guard
.node_state(&node1_id)
.unwrap()
.get("test_key")
.unwrap();
assert_eq!(test_val, "test_val");

// Let's check that node1 is seen as dead.
let dead_nodes: HashSet<&ChitchatId> = new_node_chitchat_guard.dead_nodes().collect();
assert_eq!(dead_nodes.len(), 1);
assert!(dead_nodes.contains(&node1_id));
}

nodes.push(new_node_chitchat_handle);
shutdown_nodes(nodes).await?;
Ok(())
}
11 changes: 7 additions & 4 deletions chitchat/src/state.rs
@@ -312,11 +312,12 @@ impl ClusterState {
}
}

pub fn compute_digest(&self) -> Digest {
pub fn compute_digest(&self, scheduled_for_deletion: &HashSet<&ChitchatId>) -> Digest {
Digest {
node_digests: self
.node_states
.iter()
.filter(|(chitchat_id, _)| !scheduled_for_deletion.contains(chitchat_id))
.map(|(chitchat_id, node_state)| (chitchat_id.clone(), node_state.digest()))
.collect(),
}
@@ -336,18 +337,20 @@ impl ClusterState {
}

/// Implements the Scuttlebutt reconciliation with the scuttle-depth ordering.
///
/// Nodes that are scheduled for deletion (passed as an argument) are not shared.
pub fn compute_delta(
&self,
digest: &Digest,
mtu: usize,
dead_nodes: &HashSet<&ChitchatId>,
scheduled_for_deletion: &HashSet<&ChitchatId>,
marked_for_deletion_grace_period: u64,
) -> Delta {
let mut stale_nodes = SortedStaleNodes::default();
let mut nodes_to_reset = Vec::new();

for (chitchat_id, node_state) in &self.node_states {
if dead_nodes.contains(chitchat_id) {
if scheduled_for_deletion.contains(chitchat_id) {
continue;
}
let Some(node_digest) = digest.node_digests.get(chitchat_id) else {
@@ -833,7 +836,7 @@ mod tests {
node2_state.set("key_a", "");
node2_state.set("key_b", "");

let digest = cluster_state.compute_digest();
let digest = cluster_state.compute_digest(&HashSet::new());

let mut expected_node_digests = Digest::default();
expected_node_digests.add_node(node1.clone(), Heartbeat(0), 1);
