Skip to content

Commit

Permalink
code simplification
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Feb 12, 2024
1 parent bfc4bcf commit f4d8661
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 71 deletions.
94 changes: 46 additions & 48 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ use crate::{ChitchatId, Heartbeat, MaxVersion, VersionedValue};
#[derive(Debug, Default, Eq, PartialEq)]
pub struct Delta {
pub(crate) nodes_to_reset: Vec<ChitchatId>,
pub(crate) node_deltas: Vec<(ChitchatId, NodeDelta)>,
pub(crate) node_deltas: Vec<NodeDelta>,
}

impl Delta {
Expand All @@ -15,21 +15,18 @@ impl Delta {
.nodes_to_reset
.iter()
.map(|node_to_reset| DeltaOpRef::NodeToReset(node_to_reset));
let node_deltas = self
.node_deltas
.iter()
.flat_map(|(chitchat_id, node_delta)| {
std::iter::once(DeltaOpRef::Node {
chitchat_id,
heartbeat: node_delta.heartbeat,
})
.chain(node_delta.key_values.iter().map(
|(key, versioned_value)| DeltaOpRef::KeyValue {
key,
versioned_value,
},
))
});
let node_deltas = self.node_deltas.iter().flat_map(|node_delta| {
std::iter::once(DeltaOpRef::Node {
chitchat_id: &node_delta.chitchat_id,
heartbeat: node_delta.heartbeat,
})
.chain(node_delta.key_values.iter().map(|(key, versioned_value)| {
DeltaOpRef::KeyValue {
key,
versioned_value,
}
}))
});
nodes_to_reset_ops.chain(node_deltas)
}
}
Expand Down Expand Up @@ -245,39 +242,37 @@ impl Delta {
pub fn num_tuples(&self) -> usize {
self.node_deltas
.iter()
.map(|(_, node_delta)| node_delta.num_tuples())
.map(|node_delta| node_delta.num_tuples())
.sum()
}

pub fn add_node(&mut self, target_chitchat_id: ChitchatId, heartbeat: Heartbeat) {
pub fn add_node(&mut self, chitchat_id: ChitchatId, heartbeat: Heartbeat) {
assert!(self
.node_deltas
.iter()
.find(|(chitchat_id, node_delta)| { chitchat_id == &target_chitchat_id })
.find(|node_delta| { &node_delta.chitchat_id == &chitchat_id })
.is_none());
self.node_deltas.push((
target_chitchat_id,
NodeDelta {
heartbeat,
..Default::default()
},
));
self.node_deltas.push(NodeDelta {
chitchat_id,
heartbeat,
key_values: Vec::new(),
max_version: 0,
});
}

pub fn add_kv(
&mut self,
target_chitchat_id: &ChitchatId,
chitchat_id: &ChitchatId,
key: &str,
value: &str,
version: crate::Version,
tombstone: Option<u64>,
) {
let (_, node_delta) = self
let node_delta = self
.node_deltas
.iter_mut()
.find(|(chitchat_id, _)| chitchat_id == target_chitchat_id)
.find(|node_delta| &node_delta.chitchat_id == chitchat_id)
.unwrap();

node_delta.max_version = node_delta.max_version.max(version);
node_delta.key_values.push((
key.to_string(),
Expand All @@ -289,20 +284,20 @@ impl Delta {
));
}

pub(crate) fn get(&self, target_chitchat_id: &ChitchatId) -> Option<&NodeDelta> {
pub(crate) fn get(&self, chitchat_id: &ChitchatId) -> Option<&NodeDelta> {
self.node_deltas
.iter()
.find(|(chitchat_id, _)| chitchat_id == target_chitchat_id)
.map(|(_, node_deltas)| node_deltas)
.find(|node_delta| &node_delta.chitchat_id == chitchat_id)
}

pub fn add_node_to_reset(&mut self, chitchat_id: ChitchatId) {
self.nodes_to_reset.push(chitchat_id);
}
}

#[derive(Debug, Default, Eq, PartialEq, serde::Serialize)]
#[derive(Debug, Eq, PartialEq, serde::Serialize)]
pub(crate) struct NodeDelta {
pub chitchat_id: ChitchatId,
pub heartbeat: Heartbeat,
pub key_values: Vec<(String, VersionedValue)>,
// This attribute is computed upon deserialization. 0 if `key_values` is empty.
Expand All @@ -320,8 +315,7 @@ impl NodeDelta {
pub(crate) struct DeltaBuilder {
existing_nodes: HashSet<ChitchatId>,
delta: Delta,
current_chitchat_id: Option<ChitchatId>,
current_node_delta: NodeDelta,
current_node_delta: Option<NodeDelta>,
}

impl DeltaBuilder {
Expand All @@ -336,22 +330,27 @@ impl DeltaBuilder {
chitchat_id,
heartbeat,
} => {
anyhow::ensure!(self.current_chitchat_id.as_ref() != Some(&chitchat_id));
self.flush();
anyhow::ensure!(!self.existing_nodes.contains(&chitchat_id));
self.existing_nodes.insert(chitchat_id.clone());
self.flush();
self.current_chitchat_id = Some(chitchat_id);
self.current_node_delta.heartbeat = heartbeat;
// TODO what happens with `max_version` if we don't have any kv fitting the mtu?
self.current_node_delta = Some(NodeDelta {
chitchat_id,
heartbeat,
key_values: Vec::new(),
max_version: 0,
});
}
DeltaOp::KeyValue {
key,
versioned_value,
} => {
self.current_node_delta.max_version = self
.current_node_delta
.max_version
.max(versioned_value.version);
self.current_node_delta
let Some(current_node_delta) = self.current_node_delta.as_mut() else {
anyhow::bail!("received a key-value op without a node op before.");
};
current_node_delta.max_version =
current_node_delta.max_version.max(versioned_value.version);
current_node_delta
.key_values
.push((key.to_string(), versioned_value));
}
Expand All @@ -364,14 +363,13 @@ impl DeltaBuilder {
}

fn flush(&mut self) {
let Some(chitchat_id) = std::mem::take(&mut self.current_chitchat_id) else {
let Some(node_delta) = self.current_node_delta.take() else {
// There are no nodes in the builder.
// (this happens when the delta builder is freshly created and no ops have been received
// yet.)
return;
};
let node_delta = std::mem::take(&mut self.current_node_delta);
self.delta.node_deltas.push((chitchat_id, node_delta));
self.delta.node_deltas.push(node_delta);
}
}

Expand Down
17 changes: 10 additions & 7 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl Chitchat {
let dead_nodes: HashSet<_> = self.dead_nodes().collect();
let self_digest = self.compute_digest();
let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1 - digest.serialized_len();
let delta = self.cluster_state.compute_delta(
let delta = self.cluster_state.compute_partial_delta_respecting_mtu(
&digest,
delta_mtu,
&dead_nodes,
Expand All @@ -126,7 +126,7 @@ impl Chitchat {
ChitchatMessage::SynAck { digest, delta } => {
self.process_delta(delta);
let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
let delta = self.cluster_state.compute_delta(
let delta = self.cluster_state.compute_partial_delta_respecting_mtu(
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1,
&dead_nodes,
Expand Down Expand Up @@ -156,16 +156,19 @@ impl Chitchat {
/// Reports heartbeats to the failure detector for nodes in the delta for which we received an
/// update.
fn report_heartbeats(&mut self, delta: &Delta) {
for (chitchat_id, node_delta) in &delta.node_deltas {
if let Some(node_state) = self.cluster_state.node_states.get(chitchat_id) {
for node_delta in &delta.node_deltas {
if let Some(node_state) = self.cluster_state.node_states.get(&node_delta.chitchat_id) {
if node_state.heartbeat() < node_delta.heartbeat
|| node_state.max_version() < node_delta.max_version
{
self.failure_detector.report_heartbeat(chitchat_id);
self.failure_detector
.report_heartbeat(&node_delta.chitchat_id);
}
} else {
self.failure_detector.report_unknown(chitchat_id);
self.failure_detector.update_node_liveness(chitchat_id);
self.failure_detector
.report_unknown(&node_delta.chitchat_id);
self.failure_detector
.update_node_liveness(&node_delta.chitchat_id);
}
}
}
Expand Down
2 changes: 0 additions & 2 deletions chitchat/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -414,9 +414,7 @@ mod tests {
use tokio_stream::{Stream, StreamExt};

use super::*;
use crate::delta::Delta;
use crate::message::ChitchatMessage;
use crate::serialize::Deserializable;
use crate::transport::{ChannelTransport, Transport};
use crate::{Heartbeat, NodeState, MAX_UDP_DATAGRAM_PAYLOAD_SIZE};

Expand Down
34 changes: 20 additions & 14 deletions chitchat/src/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use serde::{Deserialize, Serialize};
use tokio::sync::watch;
use tracing::warn;

use crate::delta::{Delta, DeltaWriter};
use crate::delta::{Delta, DeltaWriter, NodeDelta};
use crate::digest::{Digest, NodeDigest};
use crate::listener::Listeners;
use crate::{ChitchatId, Heartbeat, KeyChangeEvent, MaxVersion, Version, VersionedValue};
Expand Down Expand Up @@ -296,17 +296,23 @@ impl ClusterState {
.retain(|chitchat_id, _| !delta.nodes_to_reset.contains(chitchat_id));

// Apply delta.
for (chitchat_id, node_delta) in delta.node_deltas {
for node_delta in delta.node_deltas {
let NodeDelta {
chitchat_id,
heartbeat,
key_values,
max_version,
} = node_delta;
let node_state = self
.node_states
.entry(chitchat_id.clone())
.or_insert_with(|| NodeState::new(chitchat_id, self.listeners.clone()));
if node_state.heartbeat < node_delta.heartbeat {
node_state.heartbeat = node_delta.heartbeat;
if node_state.heartbeat < heartbeat {
node_state.heartbeat = heartbeat;
node_state.last_heartbeat = Instant::now();
}
node_state.max_version = node_state.max_version.max(node_delta.max_version);
for (key, versioned_value) in node_delta.key_values {
node_state.max_version = node_state.max_version.max(max_version);
for (key, versioned_value) in key_values {
node_state.max_version = node_state.max_version.max(versioned_value.version);
node_state.set_versioned_value(key, versioned_value);
}
Expand Down Expand Up @@ -337,7 +343,7 @@ impl ClusterState {
}

/// Implements the Scuttlebutt reconciliation with the scuttle-depth ordering.
pub fn compute_delta(
pub fn compute_partial_delta_respecting_mtu(
&self,
digest: &Digest,
mtu: usize,
Expand Down Expand Up @@ -535,7 +541,7 @@ fn random_generator() -> impl Rng {
#[cfg(test)]
mod tests {
use super::*;
use crate::serialize::{Deserializable, Serializable};
use crate::serialize::Serializable;
use crate::MAX_UDP_DATAGRAM_PAYLOAD_SIZE;

#[test]
Expand Down Expand Up @@ -944,12 +950,12 @@ mod tests {
dead_nodes: &HashSet<&ChitchatId>,
expected_delta_atoms: &[(&ChitchatId, &str, &str, Version, Option<u64>)],
) {
let max_delta = cluster_state.compute_delta(digest, usize::MAX, dead_nodes, 10_000);
let max_delta = cluster_state.compute_partial_delta_respecting_mtu(digest, usize::MAX, dead_nodes, 10_000);
let mut buf = Vec::new();
Serializable::serialize(&max_delta, &mut buf);
let mut mtu_per_num_entries = Vec::new();
for mtu in 100..buf.len() {
let delta = cluster_state.compute_delta(digest, mtu, dead_nodes, 10_000);
let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes, 10_000);
let num_tuples = delta.num_tuples();
if mtu_per_num_entries.len() == num_tuples + 1 {
continue;
Expand All @@ -965,11 +971,11 @@ mod tests {
expected_delta.add_kv(node, key, val, version, tombstone);
}
{
let delta = cluster_state.compute_delta(digest, mtu, dead_nodes, 10_000);
let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu, dead_nodes, 10_000);
assert_eq!(&delta, &expected_delta);
}
{
let delta = cluster_state.compute_delta(digest, mtu + 1, dead_nodes, 10_000);
let delta = cluster_state.compute_partial_delta_respecting_mtu(digest, mtu + 1, dead_nodes, 10_000);
assert_eq!(&delta, &expected_delta);
}
}
Expand Down Expand Up @@ -1098,7 +1104,7 @@ mod tests {
let node1 = ChitchatId::for_local_test(10_001);
digest.add_node(node1.clone(), Heartbeat(0), 1);
{
let delta = cluster_state.compute_delta(
let delta = cluster_state.compute_partial_delta_respecting_mtu(
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE,
&HashSet::new(),
Expand All @@ -1116,7 +1122,7 @@ mod tests {
// Node 1 heartbeat in digest + grace period (9_999) is inferior to the
// node1's hearbeat in the cluster state. Thus we expect the cluster to compute a
// delta that will reset node 1.
let delta = cluster_state.compute_delta(
let delta = cluster_state.compute_partial_delta_respecting_mtu(
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE,
&HashSet::new(),
Expand Down

0 comments on commit f4d8661

Please sign in to comment.