Skip to content

Commit

Permalink
No delta buffer
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Feb 12, 2024
1 parent 31df52d commit bfc4bcf
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 124 deletions.
93 changes: 55 additions & 38 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,18 +10,25 @@ pub struct Delta {
}

impl Delta {
fn get_operations<'a>(&'a self) -> impl Iterator<Item=DeltaOpRef<'a>> {
let nodes_to_reset_ops = self.nodes_to_reset.iter()
fn get_operations<'a>(&'a self) -> impl Iterator<Item = DeltaOpRef<'a>> {
let nodes_to_reset_ops = self
.nodes_to_reset
.iter()
.map(|node_to_reset| DeltaOpRef::NodeToReset(node_to_reset));
let node_deltas = self.node_deltas
let node_deltas = self
.node_deltas
.iter()
.flat_map(|(chitchat_id, node_delta)| {
std::iter::once(DeltaOpRef::Node { chitchat_id, heartbeat: node_delta.heartbeat })
.chain(
node_delta.key_values.iter().map(|(key, versioned_value)| {
DeltaOpRef::KeyValue { key, versioned_value }
})
)
std::iter::once(DeltaOpRef::Node {
chitchat_id,
heartbeat: node_delta.heartbeat,
})
.chain(node_delta.key_values.iter().map(
|(key, versioned_value)| DeltaOpRef::KeyValue {
key,
versioned_value,
},
))
});
nodes_to_reset_ops.chain(node_deltas)
}
Expand All @@ -48,7 +55,7 @@ pub(crate) enum DeltaOpRef<'a> {
KeyValue {
key: &'a str,
versioned_value: &'a VersionedValue,
}
},
}

#[repr(u8)]
Expand Down Expand Up @@ -111,26 +118,28 @@ impl Deserializable for DeltaOp {
versioned_value,
})
}

}
}
}

impl DeltaOp {
pub(crate) fn as_ref(&self) -> DeltaOpRef {
match self {
DeltaOp::Node { chitchat_id, heartbeat } => {
DeltaOpRef::Node {
chitchat_id,
heartbeat: *heartbeat,
}
},
DeltaOp::KeyValue { key, versioned_value } => {
DeltaOpRef::KeyValue { key, versioned_value }
DeltaOp::Node {
chitchat_id,
heartbeat,
} => DeltaOpRef::Node {
chitchat_id,
heartbeat: *heartbeat,
},
DeltaOp::NodeToReset(node_to_reset) => {
DeltaOpRef::NodeToReset(node_to_reset)
DeltaOp::KeyValue {
key,
versioned_value,
} => DeltaOpRef::KeyValue {
key,
versioned_value,
},
DeltaOp::NodeToReset(node_to_reset) => DeltaOpRef::NodeToReset(node_to_reset),
}
}
}
Expand Down Expand Up @@ -194,7 +203,6 @@ impl<'a> Serializable for DeltaOpRef<'a> {
}

/// Slow serializable implementation but it is only here for tests.
#[cfg(test)]
impl Serializable for Delta {
fn serialize(&self, buf: &mut Vec<u8>) {
let mut compressed_stream_writer = CompressedStreamWriter::with_block_threshold(16_384);
Expand Down Expand Up @@ -242,13 +250,18 @@ impl Delta {
}

pub fn add_node(&mut self, target_chitchat_id: ChitchatId, heartbeat: Heartbeat) {
assert!(self.node_deltas.iter().find(|(chitchat_id, node_delta)| {
chitchat_id == &target_chitchat_id
}).is_none());
self.node_deltas.push((target_chitchat_id, NodeDelta {
heartbeat,
.. Default::default()
}));
assert!(self
.node_deltas
.iter()
.find(|(chitchat_id, node_delta)| { chitchat_id == &target_chitchat_id })
.is_none());
self.node_deltas.push((
target_chitchat_id,
NodeDelta {
heartbeat,
..Default::default()
},
));
}

pub fn add_kv(
Expand All @@ -259,7 +272,11 @@ impl Delta {
version: crate::Version,
tombstone: Option<u64>,
) {
let (_, node_delta) = self.node_deltas.iter_mut().find(|(chitchat_id, _)| chitchat_id == target_chitchat_id).unwrap();
let (_, node_delta) = self
.node_deltas
.iter_mut()
.find(|(chitchat_id, _)| chitchat_id == target_chitchat_id)
.unwrap();

node_delta.max_version = node_delta.max_version.max(version);
node_delta.key_values.push((
Expand All @@ -268,12 +285,14 @@ impl Delta {
value: value.to_string(),
version,
tombstone,
})
);
},
));
}

pub(crate) fn get(&self, target_chitchat_id: &ChitchatId) -> Option<&NodeDelta> {
self.node_deltas.iter().find(|(chitchat_id, _)| chitchat_id == target_chitchat_id)
self.node_deltas
.iter()
.find(|(chitchat_id, _)| chitchat_id == target_chitchat_id)
.map(|(_, node_deltas)| node_deltas)
}

Expand Down Expand Up @@ -413,8 +432,8 @@ impl DeltaWriter {
self.add_op(delta_op)
}

pub fn finish(self) -> Vec<u8> {
self.compressed_stream_writer.finish()
pub fn finish(self) -> Delta {
self.delta_builder.finish()
}
}

Expand Down Expand Up @@ -525,9 +544,7 @@ mod tests {

#[track_caller]
fn test_aux_delta_writer(delta_writer: DeltaWriter, expected_len: usize) {
let delta_payload: Vec<u8> = delta_writer.finish();
assert_eq!(delta_payload.len(), expected_len);
let delta = Delta::deserialize(&mut &delta_payload[..]).unwrap();
let delta: Delta = delta_writer.finish();
test_serdeser_aux(&delta, expected_len)
}

Expand Down
37 changes: 10 additions & 27 deletions chitchat/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use delta::Delta;
use failure_detector::FailureDetector;
pub use failure_detector::FailureDetectorConfig;
pub use listener::ListenerHandle;
use serialize::Deserializable;
pub use serialize::Serializable;
use tokio::sync::watch;
use tokio_stream::wrappers::WatchStream;
Expand All @@ -30,7 +29,6 @@ use tracing::{error, warn};
pub use self::configuration::ChitchatConfig;
pub use self::state::{ClusterStateSnapshot, NodeState};
use crate::digest::Digest;
use crate::message::syn_ack_serialized_len;
pub use crate::message::ChitchatMessage;
pub use crate::server::{spawn_chitchat, ChitchatHandle};
use crate::state::ClusterState;
Expand Down Expand Up @@ -95,17 +93,6 @@ impl Chitchat {
}
}

fn process_delta_payload(&mut self, mut delta_payload: &[u8]) {
let Ok(delta) = Delta::deserialize(&mut delta_payload) else {
warn!("invalid delta");
return;
};
if !delta_payload.is_empty() {
warn!("data remaining after delta");
}
self.process_delta(delta);
}

fn process_delta(&mut self, delta: Delta) {
self.report_heartbeats(&delta);
self.cluster_state.apply_delta(delta);
Expand All @@ -124,35 +111,31 @@ impl Chitchat {
}
let dead_nodes: HashSet<_> = self.dead_nodes().collect();
let self_digest = self.compute_digest();
let delta_mtu =
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - syn_ack_serialized_len(&digest, &Vec::new());
let delta_payload = self.cluster_state.compute_delta(
let delta_mtu = MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1 - digest.serialized_len();
let delta = self.cluster_state.compute_delta(
&digest,
delta_mtu,
&dead_nodes,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::SynAck {
digest: self_digest,
delta_payload,
delta,
})
}
ChitchatMessage::SynAck {
digest,
delta_payload,
} => {
self.process_delta_payload(&delta_payload);
ChitchatMessage::SynAck { digest, delta } => {
self.process_delta(delta);
let dead_nodes = self.dead_nodes().collect::<HashSet<_>>();
let delta_payload = self.cluster_state.compute_delta(
let delta = self.cluster_state.compute_delta(
&digest,
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1 - Vec::<u8>::new().serialized_len(),
MAX_UDP_DATAGRAM_PAYLOAD_SIZE - 1,
&dead_nodes,
self.config.marked_for_deletion_grace_period as u64,
);
Some(ChitchatMessage::Ack { delta_payload })
Some(ChitchatMessage::Ack { delta })
}
ChitchatMessage::Ack { delta_payload } => {
self.process_delta_payload(&delta_payload);
ChitchatMessage::Ack { delta } => {
self.process_delta(delta);
None
}
ChitchatMessage::BadCluster => {
Expand Down
Loading

0 comments on commit bfc4bcf

Please sign in to comment.