diff --git a/chitchat/src/delta.rs b/chitchat/src/delta.rs index 5399560..5066062 100644 --- a/chitchat/src/delta.rs +++ b/chitchat/src/delta.rs @@ -6,7 +6,7 @@ use crate::{ChitchatId, Heartbeat, MaxVersion, VersionedValue}; #[derive(Debug, Default, Eq, PartialEq)] pub struct Delta { pub(crate) nodes_to_reset: Vec, - pub(crate) node_deltas: Vec<(ChitchatId, NodeDelta)>, + pub(crate) node_deltas: Vec, } impl Delta { @@ -15,21 +15,18 @@ impl Delta { .nodes_to_reset .iter() .map(|node_to_reset| DeltaOpRef::NodeToReset(node_to_reset)); - let node_deltas = self - .node_deltas - .iter() - .flat_map(|(chitchat_id, node_delta)| { - std::iter::once(DeltaOpRef::Node { - chitchat_id, - heartbeat: node_delta.heartbeat, - }) - .chain(node_delta.key_values.iter().map( - |(key, versioned_value)| DeltaOpRef::KeyValue { - key, - versioned_value, - }, - )) - }); + let node_deltas = self.node_deltas.iter().flat_map(|node_delta| { + std::iter::once(DeltaOpRef::Node { + chitchat_id: &node_delta.chitchat_id, + heartbeat: node_delta.heartbeat, + }) + .chain(node_delta.key_values.iter().map(|(key, versioned_value)| { + DeltaOpRef::KeyValue { + key, + versioned_value, + } + })) + }); nodes_to_reset_ops.chain(node_deltas) } } @@ -245,39 +242,37 @@ impl Delta { pub fn num_tuples(&self) -> usize { self.node_deltas .iter() - .map(|(_, node_delta)| node_delta.num_tuples()) + .map(|node_delta| node_delta.num_tuples()) .sum() } - pub fn add_node(&mut self, target_chitchat_id: ChitchatId, heartbeat: Heartbeat) { + pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) { assert!(self .node_deltas .iter() - .find(|(chitchat_id, node_delta)| { chitchat_id == &target_chitchat_id }) + .find(|node_delta| { &node_delta.chitchat_id == &chitchat_id }) .is_none()); - self.node_deltas.push(( - target_chitchat_id, - NodeDelta { - heartbeat, - ..Default::default() - }, - )); + self.node_deltas.push(NodeDelta { + chitchat_id, + heartbeat, + key_values: Vec::new(), + max_version: 0, + }); } pub fn add_kv( &mut self, - target_chitchat_id: &ChitchatId, + chitchat_id: &ChitchatId, key: &str, value: &str, version: crate::Version, tombstone: Option, ) { - let (_, node_delta) = self + let node_delta = self .node_deltas .iter_mut() - .find(|(chitchat_id, _)| chitchat_id == target_chitchat_id) + .find(|node_delta| &node_delta.chitchat_id == chitchat_id) .unwrap(); - node_delta.max_version = node_delta.max_version.max(version); node_delta.key_values.push(( key.to_string(), @@ -289,11 +284,10 @@ impl Delta { )); } - pub(crate) fn get(&self, target_chitchat_id: &ChitchatId) -> Option<&NodeDelta> { + pub(crate) fn get(&self, chitchat_id: &ChitchatId) -> Option<&NodeDelta> { self.node_deltas .iter() - .find(|(chitchat_id, _)| chitchat_id == target_chitchat_id) - .map(|(_, node_deltas)| node_deltas) + .find(|node_delta| &node_delta.chitchat_id == chitchat_id) } pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) { @@ -301,8 +295,9 @@ impl Delta { } } -#[derive(Debug, Default, Eq, PartialEq, serde::Serialize)] +#[derive(Debug, Eq, PartialEq, serde::Serialize)] pub(crate) struct NodeDelta { + pub chitchat_id: ChitchatId, pub heartbeat: Heartbeat, pub key_values: Vec<(String, VersionedValue)>, // This attribute is computed upon deserialization. 0 if `key_values` is empty. @@ -320,8 +315,7 @@ impl NodeDelta { pub(crate) struct DeltaBuilder { existing_nodes: HashSet, delta: Delta, - current_chitchat_id: Option, - current_node_delta: NodeDelta, + current_node_delta: Option, } impl DeltaBuilder { @@ -336,22 +330,27 @@ impl DeltaBuilder { chitchat_id, heartbeat, } => { - anyhow::ensure!(self.current_chitchat_id.as_ref() != Some(&chitchat_id)); + self.flush(); anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id)); self.existing_nodes.insert(chitchat_id.clone()); - self.flush(); - self.current_chitchat_id = Some(chitchat_id); - self.current_node_delta.heartbeat = heartbeat; + // TODO what happens with `max_version` if we don't have any kv fitting the mtu? + self.current_node_delta = Some(NodeDelta { + chitchat_id, + heartbeat, + key_values: Vec::new(), + max_version: 0, + }); } DeltaOp::KeyValue { key, versioned_value, } => { - self.current_node_delta.max_version = self - .current_node_delta - .max_version - .max(versioned_value.version); - self.current_node_delta + let Some(current_node_delta) = self.current_node_delta.as_mut() else { + anyhow::bail!("received a key-value op without a node op before."); + }; + current_node_delta.max_version = + current_node_delta.max_version.max(versioned_value.version); + current_node_delta .key_values .push((key.to_string(), versioned_value)); } @@ -364,14 +363,13 @@ impl DeltaBuilder { } fn flush(&mut self) { - let Some(chitchat_id) = std::mem::take(&mut self.current_chitchat_id) else { + let Some(node_delta) = self.current_node_delta.take() else { // There are no nodes in the builder. // (this happens when the delta builder is freshly created and no ops have been received // yet.) return; }; - let node_delta = std::mem::take(&mut self.current_node_delta); - self.delta.node_deltas.push((chitchat_id, node_delta)); + self.delta.node_deltas.push(node_delta); } } diff --git a/chitchat/src/lib.rs b/chitchat/src/lib.rs index af4bbaf..a3c878a 100644 --- a/chitchat/src/lib.rs +++ b/chitchat/src/lib.rs @@ -112,7 +112,7 @@ impl Chitchat { let dead_nodes: HashSet<_> = self.dead_nodes().collect(); let self_digest = self.compute_digest(); let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1 - digest.serialized_len(); - let delta = self.cluster_state.compute_delta( + let delta = self.cluster_state.compute_partial_delta_respecting_mtu( &digest, delta_mtu, &dead_nodes, @@ -126,7 +126,7 @@ impl Chitchat { ChitchatMessage::SynAck { digest, delta } => { self.process_delta(delta); let dead_nodes = self.dead_nodes().collect::>(); - let delta = self.cluster_state.compute_delta( + let delta = self.cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1, &dead_nodes, @@ -156,16 +156,19 @@ impl Chitchat { /// Reports heartbeats to the failure detector for nodes in the delta for which we received an /// update. fn report_heartbeats(&mut self, delta: &Delta) { - for (chitchat_id, node_delta) in &delta.node_deltas { - if let Some(node_state) = self.cluster_state.node_states.get(chitchat_id) { + for node_delta in &delta.node_deltas { + if let Some(node_state) = self.cluster_state.node_states.get(&node_delta.chitchat_id) { if node_state.heartbeat() < node_delta.heartbeat || node_state.max_version() < node_delta.max_version { - self.failure_detector.report_heartbeat(chitchat_id); + self.failure_detector + .report_heartbeat(&node_delta.chitchat_id); } } else { - self.failure_detector.report_unknown(chitchat_id); - self.failure_detector.update_node_liveness(chitchat_id); + self.failure_detector + .report_unknown(&node_delta.chitchat_id); + self.failure_detector + .update_node_liveness(&node_delta.chitchat_id); } } } diff --git a/chitchat/src/server.rs b/chitchat/src/server.rs index 481cbaa..4124607 100644 --- a/chitchat/src/server.rs +++ b/chitchat/src/server.rs @@ -414,9 +414,7 @@ mod tests { use tokio_stream::{Stream, StreamExt}; use super::*; - use crate::delta::Delta; use crate::message::ChitchatMessage; - use crate::serialize::Deserializable; use crate::transport::{ChannelTransport, Transport}; use crate::{Heartbeat, NodeState, MAX_UDP_DATAGRAM_PAYLOAD_SIZE}; diff --git a/chitchat/src/state.rs b/chitchat/src/state.rs index d42bb66..e13ba88 100644 --- a/chitchat/src/state.rs +++ b/chitchat/src/state.rs @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize}; use tokio::sync::watch; use tracing::warn; -use crate::delta::{Delta, DeltaWriter}; +use crate::delta::{Delta, DeltaWriter, NodeDelta}; use crate::digest::{Digest, NodeDigest}; use crate::listener::Listeners; use crate::{ChitchatId, Heartbeat, KeyChangeEvent, MaxVersion, Version, VersionedValue}; @@ -296,17 +296,23 @@ impl ClusterState { .retain(|chitchat_id, _| !delta.nodes_to_reset.contains(chitchat_id)); // Apply delta. - for (chitchat_id, node_delta) in delta.node_deltas { + for node_delta in delta.node_deltas { + let NodeDelta { + chitchat_id, + heartbeat, + key_values, + max_version, + } = node_delta; let node_state = self .node_states .entry(chitchat_id.clone()) .or_insert_with(|| NodeState::new(chitchat_id, self.listeners.clone())); - if node_state.heartbeat < node_delta.heartbeat { - node_state.heartbeat = node_delta.heartbeat; + if node_state.heartbeat < heartbeat { + node_state.heartbeat = heartbeat; node_state.last_heartbeat = Instant::now(); } - node_state.max_version = node_state.max_version.max(node_delta.max_version); - for (key, versioned_value) in node_delta.key_values { + node_state.max_version = node_state.max_version.max(max_version); + for (key, versioned_value) in key_values { node_state.max_version = node_state.max_version.max(versioned_value.version); node_state.set_versioned_value(key, versioned_value); } @@ -337,7 +343,7 @@ impl ClusterState { } /// Implements the Scuttlebutt reconciliation with the scuttle-depth ordering. - pub fn compute_delta( + pub fn compute_partial_delta_respecting_mtu( &self, digest: &Digest, mtu: usize, @@ -535,7 +541,7 @@ fn random_generator() -> impl Rng { #[cfg(test)] mod tests { use super::*; - use crate::serialize::{Deserializable, Serializable}; + use crate::serialize::Serializable; use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE; #[test] @@ -944,12 +950,12 @@ mod tests { dead_nodes: &HashSet<&ChitchatId>, expected_delta_atoms: &[(&ChitchatId, &str, &str, Version, Option)], ) { - let max_delta = cluster_state.compute_delta(digest, usize::MAX, dead_nodes, 10_000); + let max_delta = cluster_state.compute_partial_delta_respecting_mtu(digest, usize::MAX, dead_nodes, 10_000); let mut buf = Vec::new(); Serializable::serialize(&max_delta, &mut buf); let mut mtu_per_num_entries = Vec::new(); for mtu in 100..buf.len() { - let delta = cluster_state.compute_delta(digest, mtu, dead_nodes, 10_000); + let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes, 10_000); let num_tuples = delta.num_tuples(); if mtu_per_num_entries.len() == num_tuples + 1 { continue; @@ -965,11 +971,11 @@ mod tests { expected_delta.add_kv(node, key, val, version, tombstone); } { - let delta = cluster_state.compute_delta(digest, mtu, dead_nodes, 10_000); + let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes, 10_000); assert_eq!(&delta, &expected_delta); } { - let delta = cluster_state.compute_delta(digest, mtu + 1, dead_nodes, 10_000); + let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu + 1, dead_nodes, 10_000); assert_eq!(&delta, &expected_delta); } } @@ -1098,7 +1104,7 @@ mod tests { let node1 = ChitchatId::for_local_test(10_001); digest.add_node(node1.clone(), Heartbeat(0), 1); { - let delta = cluster_state.compute_delta( + let delta = cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(), @@ -1116,7 +1122,7 @@ mod tests { // Node 1 heartbeat in digest + grace period (9_999) is inferior to the // node1's hearbeat in the cluster state. Thus we expect the cluster to compute a // delta that will reset node 1. - let delta = cluster_state.compute_delta( + let delta = cluster_state.compute_partial_delta_respecting_mtu( &digest, MAX_UDP_DATAGRAM_PAYLOAD_SIZE, &HashSet::new(),