Skip to content

Commit

Permalink
introducing deserializable
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed Feb 9, 2024
1 parent 783bfd0 commit 5ef5af5
Show file tree
Hide file tree
Showing 6 changed files with 504 additions and 115 deletions.
9 changes: 8 additions & 1 deletion chitchat/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,16 @@ bytes = "1"
itertools = "0.12"
rand = { version = "0.8", features = ["small_rng"] }
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.28.0", features = ["net", "sync", "rt-multi-thread", "macros", "time"] }
tokio = { version = "1.28.0", features = [
"net",
"sync",
"rt-multi-thread",
"macros",
"time",
] }
tokio-stream = { version = "0.1", features = ["sync"] }
tracing = "0.1"
zstd = "0.13"

[dev-dependencies]
assert-json-diff = "2"
Expand Down
163 changes: 129 additions & 34 deletions chitchat/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,98 @@ pub struct Delta {
pub(crate) nodes_to_reset: HashSet<ChitchatId>,
}

enum DeltaOp {
Node(ChitchatId),
KeyValue {
key: String,

Check failure on line 16 in chitchat/src/delta.rs

View workflow job for this annotation

GitHub Actions / clippy

fields `key` and `versioned_value` are never read

error: fields `key` and `versioned_value` are never read --> chitchat/src/delta.rs:16:9 | 15 | KeyValue { | -------- fields in this variant 16 | key: String, | ^^^ 17 | versioned_value: VersionedValue, | ^^^^^^^^^^^^^^^ | = note: `-D dead-code` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(dead_code)]`

Check failure on line 16 in chitchat/src/delta.rs

View workflow job for this annotation

GitHub Actions / clippy

fields `key` and `versioned_value` are never read

error: fields `key` and `versioned_value` are never read --> chitchat/src/delta.rs:16:9 | 15 | KeyValue { | -------- fields in this variant 16 | key: String, | ^^^ 17 | versioned_value: VersionedValue, | ^^^^^^^^^^^^^^^ | = note: `-D dead-code` implied by `-D warnings` = help: to override `-D warnings` add `#[allow(dead_code)]`
versioned_value: VersionedValue,
},
NodesToReset(ChitchatId),
}

impl Deserializable for DeltaOp {
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let tag_bytes: [u8; 1] = Deserializable::deserialize(buf)?;
let tag = tag_bytes[0];
match tag {
0 => {
let chitchat_id = ChitchatId::deserialize(buf)?;
Ok(DeltaOp::Node(chitchat_id))
}
1 => {
let key = String::deserialize(buf)?;
let value = String::deserialize(buf)?;
let version = u64::deserialize(buf)?;
let tombstone = Option::<u64>::deserialize(buf)?;
let versioned_value: VersionedValue = VersionedValue {
value,
version,
tombstone,
};
Ok(DeltaOp::KeyValue {
key,
versioned_value,
})
}
2 => {
let chitchat_id = ChitchatId::deserialize(buf)?;
Ok(DeltaOp::NodesToReset(chitchat_id))
}
_ => Err(anyhow::anyhow!("Invalid tag: {}", tag)),
}
}
}

enum DeltaOpRef<'a> {
Node(&'a ChitchatId),

Check failure on line 56 in chitchat/src/delta.rs

View workflow job for this annotation

GitHub Actions / clippy

variants `Node`, `KeyValue`, and `NodesToReset` are never constructed

error: variants `Node`, `KeyValue`, and `NodesToReset` are never constructed --> chitchat/src/delta.rs:56:5 | 55 | enum DeltaOpRef<'a> { | ---------- variants in this enum 56 | Node(&'a ChitchatId), | ^^^^ 57 | KeyValue { | ^^^^^^^^ ... 61 | NodesToReset(&'a ChitchatId), | ^^^^^^^^^^^^

Check failure on line 56 in chitchat/src/delta.rs

View workflow job for this annotation

GitHub Actions / clippy

variants `Node`, `KeyValue`, and `NodesToReset` are never constructed

error: variants `Node`, `KeyValue`, and `NodesToReset` are never constructed --> chitchat/src/delta.rs:56:5 | 55 | enum DeltaOpRef<'a> { | ---------- variants in this enum 56 | Node(&'a ChitchatId), | ^^^^ 57 | KeyValue { | ^^^^^^^^ ... 61 | NodesToReset(&'a ChitchatId), | ^^^^^^^^^^^^
KeyValue {
key: &'a str,
versioned_value: &'a VersionedValue,
},
NodesToReset(&'a ChitchatId),
}

impl<'a> Serializable for DeltaOpRef<'a> {
fn serialize(&self, buf: &mut Vec<u8>) {
match self {
DeltaOpRef::Node(chitchat_id) => {
buf.push(0u8);
chitchat_id.serialize(buf);
}
DeltaOpRef::KeyValue {
key,
versioned_value,
} => {
buf.push(1u8);
key.serialize(buf);
versioned_value.value.serialize(buf);
versioned_value.version.serialize(buf);
versioned_value.tombstone.serialize(buf);
}
DeltaOpRef::NodesToReset(chitchat_id) => {
buf.push(2u8);
chitchat_id.serialize(buf);
}
}
}

fn serialized_len(&self) -> usize {
1 + match self {
DeltaOpRef::Node(chitchat_id) => chitchat_id.serialized_len(),
DeltaOpRef::KeyValue {
key,
versioned_value,
} => {
key.serialized_len()
+ versioned_value.value.serialized_len()
+ versioned_value.version.serialized_len()
+ versioned_value.tombstone.serialized_len()
}
DeltaOpRef::NodesToReset(chitchat_id) => chitchat_id.serialized_len(),
}
}
}

impl Serializable for Delta {
fn serialize(&self, buf: &mut Vec<u8>) {
(self.node_deltas.len() as u16).serialize(buf);
Expand All @@ -23,6 +115,21 @@ impl Serializable for Delta {
}
}

fn serialized_len(&self) -> usize {
let mut len = 2;
for (chitchat_id, node_delta) in &self.node_deltas {
len += chitchat_id.serialized_len();
len += node_delta.serialized_len();
}
len += 2;
for chitchat_id in &self.nodes_to_reset {
len += chitchat_id.serialized_len();
}
len
}
}

impl Deserializable for Delta {
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let mut node_deltas: BTreeMap<ChitchatId, NodeDelta> = Default::default();
let num_nodes = u16::deserialize(buf)?;
Expand All @@ -42,19 +149,6 @@ impl Serializable for Delta {
nodes_to_reset,
})
}

fn serialized_len(&self) -> usize {
let mut len = 2;
for (chitchat_id, node_delta) in &self.node_deltas {
len += chitchat_id.serialized_len();
len += node_delta.serialized_len();
}
len += 2;
for chitchat_id in &self.nodes_to_reset {
len += chitchat_id.serialized_len();
}
len
}
}

#[cfg(test)]
Expand Down Expand Up @@ -234,7 +328,29 @@ impl Serializable for NodeDelta {
tombstone.serialize(buf);
}
}
fn serialized_len(&self) -> usize {
let mut len = 2;
len += self.heartbeat.serialized_len();

for (
key,
VersionedValue {
value,
version,
tombstone,
},
) in &self.key_values
{
len += key.serialized_len();
len += value.serialized_len();
len += version.serialized_len();
len += tombstone.serialized_len();
}
len
}
}

impl Deserializable for NodeDelta {
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let heartbeat = Heartbeat::deserialize(buf)?;
let mut key_values: BTreeMap<String, VersionedValue> = Default::default();
Expand All @@ -261,27 +377,6 @@ impl Serializable for NodeDelta {
max_version,
})
}

fn serialized_len(&self) -> usize {
let mut len = 2;
len += self.heartbeat.serialized_len();

for (
key,
VersionedValue {
value,
version,
tombstone,
},
) in &self.key_values
{
len += key.serialized_len();
len += value.serialized_len();
len += version.serialized_len();
len += tombstone.serialized_len();
}
len
}
}

#[cfg(test)]
Expand Down
21 changes: 11 additions & 10 deletions chitchat/src/digest.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,18 @@ impl Serializable for Digest {
node_digest.max_version.serialize(buf);
}
}
fn serialized_len(&self) -> usize {
let mut len = (self.node_digests.len() as u16).serialized_len();
for (chitchat_id, node_digest) in &self.node_digests {
len += chitchat_id.serialized_len();
len += node_digest.heartbeat.serialized_len();
len += node_digest.max_version.serialized_len();
}
len
}
}

impl Deserializable for Digest {
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let num_nodes = u16::deserialize(buf)?;
let mut node_digests: BTreeMap<ChitchatId, NodeDigest> = Default::default();
Expand All @@ -59,14 +70,4 @@ impl Serializable for Digest {
}
Ok(Digest { node_digests })
}

fn serialized_len(&self) -> usize {
let mut len = (self.node_digests.len() as u16).serialized_len();
for (chitchat_id, node_digest) in &self.node_digests {
len += chitchat_id.serialized_len();
len += node_digest.heartbeat.serialized_len();
len += node_digest.max_version.serialized_len();
}
len
}
}
26 changes: 14 additions & 12 deletions chitchat/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use anyhow::Context;

use crate::delta::Delta;
use crate::digest::Digest;
use crate::serialize::Serializable;
use crate::serialize::{Deserializable, Serializable};

/// Chitchat message.
///
Expand Down Expand Up @@ -73,6 +73,19 @@ impl Serializable for ChitchatMessage {
}
}

fn serialized_len(&self) -> usize {
match self {
ChitchatMessage::Syn { cluster_id, digest } => {
1 + cluster_id.serialized_len() + digest.serialized_len()
}
ChitchatMessage::SynAck { digest, delta } => syn_ack_serialized_len(digest, delta),
ChitchatMessage::Ack { delta } => 1 + delta.serialized_len(),
ChitchatMessage::BadCluster => 1,
}
}
}

impl Deserializable for ChitchatMessage {
fn deserialize(buf: &mut &[u8]) -> anyhow::Result<Self> {
let code = buf
.first()
Expand All @@ -98,17 +111,6 @@ impl Serializable for ChitchatMessage {
MessageType::BadCluster => Ok(Self::BadCluster),
}
}

fn serialized_len(&self) -> usize {
match self {
ChitchatMessage::Syn { cluster_id, digest } => {
1 + cluster_id.serialized_len() + digest.serialized_len()
}
ChitchatMessage::SynAck { digest, delta } => syn_ack_serialized_len(digest, delta),
ChitchatMessage::Ack { delta } => 1 + delta.serialized_len(),
ChitchatMessage::BadCluster => 1,
}
}
}

pub(crate) fn syn_ack_serialized_len(digest: &Digest, delta: &Delta) -> usize {
Expand Down
Loading

0 comments on commit 5ef5af5

Please sign in to comment.