Skip to content

Commit

Permalink
fix: get correct encode blob info (#545)
Browse files Browse the repository at this point in the history
* fix: get correct encode blob info

* fix: is_snapshot

* test: fix test case

* test: update ts tests

* chore: add change file

* chore: fix warnings
  • Loading branch information
zxch3n authored Nov 9, 2024
1 parent 715bf75 commit 8486234
Show file tree
Hide file tree
Showing 11 changed files with 290 additions and 51 deletions.
5 changes: 5 additions & 0 deletions .changeset/strong-penguins-peel.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"loro-crdt": patch
---

Fix decoding of the metadata of encoded blobs
11 changes: 7 additions & 4 deletions crates/loro-ffi/src/doc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,11 @@ impl LoroDoc {

/// Decodes the metadata for an imported blob from the provided bytes.
#[inline]
pub fn decode_import_blob_meta(&self, bytes: &[u8]) -> LoroResult<ImportBlobMetadata> {
let s = InnerLoroDoc::decode_import_blob_meta(bytes)?;
pub fn decode_import_blob_meta(
bytes: &[u8],
check_checksum: bool,
) -> LoroResult<ImportBlobMetadata> {
let s = InnerLoroDoc::decode_import_blob_meta(bytes, check_checksum)?;
Ok(s.into())
}

Expand Down Expand Up @@ -626,7 +629,7 @@ pub struct ImportBlobMetadata {
pub start_frontiers: Arc<Frontiers>,
pub end_timestamp: i64,
pub change_num: u32,
pub is_snapshot: bool,
pub mode: String,
}

impl From<loro::ImportBlobMetadata> for ImportBlobMetadata {
Expand All @@ -638,7 +641,7 @@ impl From<loro::ImportBlobMetadata> for ImportBlobMetadata {
start_frontiers: Arc::new(value.start_frontiers.into()),
end_timestamp: value.end_timestamp,
change_num: value.change_num,
is_snapshot: value.is_snapshot,
mode: value.mode.to_string(),
}
}
}
Expand Down
59 changes: 53 additions & 6 deletions crates/loro-internal/src/encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -372,7 +372,10 @@ impl ParsedHeaderAndBody<'_> {
}

const MIN_HEADER_SIZE: usize = 22;
pub(crate) fn parse_header_and_body(bytes: &[u8]) -> Result<ParsedHeaderAndBody, LoroError> {
pub(crate) fn parse_header_and_body(
bytes: &[u8],
check_checksum: bool,
) -> Result<ParsedHeaderAndBody, LoroError> {
let reader = &bytes;
if bytes.len() < MIN_HEADER_SIZE {
return Err(LoroError::DecodeError("Invalid import data".into()));
Expand All @@ -396,7 +399,9 @@ pub(crate) fn parse_header_and_body(bytes: &[u8]) -> Result<ParsedHeaderAndBody,
body: reader,
};

ans.check_checksum()?;
if check_checksum {
ans.check_checksum()?;
}
Ok(ans)
}

Expand Down Expand Up @@ -525,6 +530,38 @@ pub(crate) fn decode_snapshot(
})
}

/// The encoding format reported in the metadata of an import blob.
///
/// The [`Display`](std::fmt::Display) form of each variant is a stable
/// kebab-case label (listed on the variants below).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum EncodedBlobMode {
    /// Displayed as `"outdated-update"`.
    OutdatedRle,
    /// Displayed as `"outdated-snapshot"`.
    OutdatedSnapshot,
    /// Displayed as `"snapshot"`.
    Snapshot,
    /// Displayed as `"shallow-snapshot"`.
    ShallowSnapshot,
    /// Displayed as `"update"`.
    Updates,
}

impl EncodedBlobMode {
    /// Returns `true` for [`Snapshot`](Self::Snapshot),
    /// [`ShallowSnapshot`](Self::ShallowSnapshot) and
    /// [`OutdatedSnapshot`](Self::OutdatedSnapshot); `false` for the two
    /// update modes.
    pub fn is_snapshot(&self) -> bool {
        // Spelled out exhaustively so that adding a variant forces a decision here.
        match self {
            Self::Snapshot | Self::ShallowSnapshot | Self::OutdatedSnapshot => true,
            Self::OutdatedRle | Self::Updates => false,
        }
    }

    /// The label written by the `Display` implementation.
    fn label(&self) -> &'static str {
        match self {
            Self::OutdatedRle => "outdated-update",
            Self::OutdatedSnapshot => "outdated-snapshot",
            Self::Snapshot => "snapshot",
            Self::ShallowSnapshot => "shallow-snapshot",
            Self::Updates => "update",
        }
    }
}

impl std::fmt::Display for EncodedBlobMode {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(self.label())
    }
}

#[derive(Debug, Clone)]
pub struct ImportBlobMetadata {
/// The partial start version vector.
Expand All @@ -543,19 +580,29 @@ pub struct ImportBlobMetadata {
pub start_frontiers: Frontiers,
pub end_timestamp: i64,
pub change_num: u32,
pub is_snapshot: bool,
pub mode: EncodedBlobMode,
}

impl LoroDoc {
/// Decodes the metadata for an imported blob from the provided bytes.
pub fn decode_import_blob_meta(blob: &[u8]) -> LoroResult<ImportBlobMetadata> {
outdated_encode_reordered::decode_import_blob_meta(blob)
pub fn decode_import_blob_meta(
    blob: &[u8],
    check_checksum: bool,
) -> LoroResult<ImportBlobMetadata> {
    // Parse the header first; the mode recorded there selects which decoder
    // understands the body.
    let header_and_body = parse_header_and_body(blob, check_checksum)?;
    match header_and_body.mode {
        EncodeMode::FastSnapshot => fast_snapshot::decode_snapshot_blob_meta(header_and_body),
        EncodeMode::FastUpdates => fast_snapshot::decode_updates_blob_meta(header_and_body),
        EncodeMode::OutdatedRle | EncodeMode::OutdatedSnapshot => {
            outdated_encode_reordered::decode_import_blob_meta(header_and_body)
        }
        // NOTE(review): relies on `parse_header_and_body` never yielding
        // `Auto` for a parsed blob — confirm.
        EncodeMode::Auto => unreachable!(),
    }
}
}

#[cfg(test)]
mod test {


use loro_common::{loro_value, ContainerID, ContainerType, LoroValue, ID};

Expand Down
75 changes: 73 additions & 2 deletions crates/loro-internal/src/encoding/fast_snapshot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ use crate::{
change::Change, encoding::shallow_snapshot, oplog::ChangeStore, LoroDoc, OpLog, VersionVector,
};
use bytes::{Buf, Bytes};
use loro_common::{IdSpan, LoroError, LoroResult};
use loro_common::{HasCounterSpan, IdSpan, LoroError, LoroResult};
use tracing::trace;

use super::{EncodedBlobMode, ImportBlobMetadata, ParsedHeaderAndBody};
pub(crate) const EMPTY_MARK: &[u8] = b"E";
pub(crate) struct Snapshot {
pub oplog_bytes: Bytes,
Expand Down Expand Up @@ -63,6 +65,23 @@ pub(super) fn _decode_snapshot_bytes(bytes: Bytes) -> LoroResult<Snapshot> {
})
}

/// Walks the length-prefixed sections of a snapshot body without decoding them.
///
/// Reads, in order: a little-endian `u32` oplog length, the oplog bytes, a
/// little-endian `u32` state length, the state bytes (skipped), and a
/// little-endian `u32` shallow length.
///
/// Returns the oplog bytes together with a flag that is `true` when the
/// shallow length is non-zero.
///
/// # Panics
///
/// Panics when `bytes` is too short for any of the length prefixes or for a
/// section length it declares.
pub(super) fn _decode_snapshot_meta_partial(bytes: &[u8]) -> (&[u8], bool) {
    let mut cursor = bytes;

    // Oplog section: keep a view of its bytes and step past it.
    let oplog_len = read_u32_le_slice(&mut cursor) as usize;
    let oplog_section = &cursor[..oplog_len];
    cursor = &cursor[oplog_len..];

    // State section: only its length matters here, the content is skipped.
    let state_len = read_u32_le_slice(&mut cursor) as usize;
    cursor = &cursor[state_len..];

    // Shallow section: only whether it is non-empty is reported.
    let has_shallow_section = read_u32_le_slice(&mut cursor) != 0;
    (oplog_section, has_shallow_section)
}

/// Reads a little-endian `u32` from the front of `r` and advances `r` past
/// the four bytes consumed.
///
/// # Panics
///
/// Panics if fewer than four bytes remain in `r`.
fn read_u32_le_slice(r: &mut &[u8]) -> u32 {
    let mut word = [0u8; 4];
    std::io::Read::read_exact(r, &mut word).unwrap();
    u32::from_le_bytes(word)
}

fn read_u32_le(r: &mut bytes::buf::Reader<Bytes>) -> u32 {
let mut buf = [0; 4];
r.read_exact(&mut buf).unwrap();
Expand Down Expand Up @@ -247,7 +266,6 @@ pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<Vec<Chang
let old_reader_len = reader.len();
let len = leb128::read::unsigned(&mut reader).unwrap() as usize;
index += old_reader_len - reader.len();
trace!("index={}", index);
let block_bytes = body.slice(index..index + len);
trace!("decoded block_bytes = {:?}", &block_bytes);
let new_changes = ChangeStore::decode_block_bytes(block_bytes, &oplog.arena, self_vv)?;
Expand All @@ -259,3 +277,56 @@ pub(crate) fn decode_updates(oplog: &mut OpLog, body: Bytes) -> Result<Vec<Chang
changes.sort_unstable_by_key(|x| x.lamport);
Ok(changes)
}

/// Builds the [`ImportBlobMetadata`] for a blob whose body is a snapshot.
///
/// The oplog section of the body is decoded into the oplog of a throwaway
/// [`LoroDoc`], and the version vectors, frontiers, timestamps and change
/// count are then read back from that oplog. The mode is reported as
/// [`EncodedBlobMode::ShallowSnapshot`] when `_decode_snapshot_meta_partial`
/// flags the body as shallow, and as [`EncodedBlobMode::Snapshot`] otherwise.
///
/// # Errors
///
/// Propagates the error returned by `decode_change_store`.
pub(crate) fn decode_snapshot_blob_meta(
    parsed: ParsedHeaderAndBody,
) -> LoroResult<ImportBlobMetadata> {
    let (oplog_section, has_shallow_section) = _decode_snapshot_meta_partial(parsed.body);

    // A scratch document: it exists only to host the oplog we decode into.
    let scratch_doc = LoroDoc::new();
    let mut oplog = scratch_doc.oplog.try_lock().unwrap();
    oplog.decode_change_store(oplog_section.to_vec().into())?;

    let end_timestamp = oplog.get_greatest_timestamp(oplog.dag.frontiers());
    let start_frontiers = oplog.dag.shallow_since_frontiers().clone();
    let start_timestamp = oplog.get_timestamp_of_version(&start_frontiers);
    let change_num = oplog.change_store().change_num() as u32;
    let partial_start_vv = oplog.dag.shallow_since_vv().to_vv();
    let partial_end_vv = oplog.vv().clone();

    let mode = match has_shallow_section {
        true => EncodedBlobMode::ShallowSnapshot,
        false => EncodedBlobMode::Snapshot,
    };

    Ok(ImportBlobMetadata {
        mode,
        partial_start_vv,
        partial_end_vv,
        start_timestamp,
        start_frontiers,
        end_timestamp,
        change_num,
    })
}

/// Builds the [`ImportBlobMetadata`] for a blob whose body is a set of updates.
///
/// The body is decoded with [`decode_updates`] into the oplog of a throwaway
/// [`LoroDoc`], and the metadata is derived from the returned changes:
///
/// - `partial_start_vv` maps each peer to the start counter of its first
///   change in the blob;
/// - `partial_end_vv` maps each peer to the end counter of its last change;
/// - `start_timestamp` / `end_timestamp` are the timestamps of the first and
///   last change in the returned order (`0` when the blob has no changes);
/// - `start_frontiers` is left at its default value;
/// - `change_num` is the number of decoded changes.
///
/// # Errors
///
/// Propagates the error returned by [`decode_updates`].
pub(crate) fn decode_updates_blob_meta(
    parsed: ParsedHeaderAndBody,
) -> LoroResult<ImportBlobMetadata> {
    // A scratch document: it exists only to host the oplog we decode into.
    let doc = LoroDoc::new();
    let mut oplog = doc.oplog.try_lock().unwrap();
    let changes = decode_updates(&mut oplog, parsed.body.to_vec().into())?;

    // `decode_updates` returns the changes sorted by ascending lamport.
    // NOTE(review): the two loops below assume that `VersionVector::insert`
    // replaces an existing entry and that, within one peer, a later lamport
    // means a larger counter — confirm.
    let mut start_vv = VersionVector::new();
    let mut end_vv = VersionVector::new();

    // Forward pass: the last write per peer comes from that peer's latest
    // change, so each entry ends up at the greatest end counter.
    for c in changes.iter() {
        end_vv.insert(c.id.peer, c.ctr_end());
    }

    // Reverse pass: the last write per peer now comes from that peer's
    // earliest change. Inserting in forward order (as was done previously)
    // left each entry at the start of the peer's *last* change, which is
    // wrong whenever a peer contributes more than one change.
    for c in changes.iter().rev() {
        start_vv.insert(c.id.peer, c.id.counter);
    }

    Ok(ImportBlobMetadata {
        mode: EncodedBlobMode::Updates,
        partial_start_vv: start_vv,
        partial_end_vv: end_vv,
        start_timestamp: changes.first().map(|x| x.timestamp).unwrap_or(0),
        start_frontiers: Default::default(),
        end_timestamp: changes.last().map(|x| x.timestamp).unwrap_or(0),
        change_num: changes.len() as u32,
    })
}
16 changes: 9 additions & 7 deletions crates/loro-internal/src/encoding/outdated_encode_reordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,12 @@ use crate::{

use self::encode::{encode_changes, encode_ops, init_encode, TempOp};

use super::ImportStatus;
use super::{
arena::*,
parse_header_and_body,
value::{Value, ValueDecodedArenasTrait, ValueKind, ValueReader, ValueWriter},
ImportBlobMetadata,
};
use super::{ImportStatus, ParsedHeaderAndBody};

pub(crate) use crate::encoding::value_register::ValueRegister;

Expand Down Expand Up @@ -169,9 +168,7 @@ pub(crate) fn decode_updates(oplog: &mut OpLog, bytes: &[u8]) -> LoroResult<Vec<
Ok(changes)
}

pub fn decode_import_blob_meta(bytes: &[u8]) -> LoroResult<ImportBlobMetadata> {
let parsed = parse_header_and_body(bytes)?;
let is_snapshot = parsed.mode.is_snapshot();
pub fn decode_import_blob_meta(parsed: ParsedHeaderAndBody) -> LoroResult<ImportBlobMetadata> {
let iterators = serde_columnar::iter_from_bytes::<EncodedDoc>(parsed.body)?;
let DecodedArenas { peer_ids, .. } = decode_arena(&iterators.arenas)?;
let start_vv: VersionVector = iterators
Expand Down Expand Up @@ -210,7 +207,13 @@ pub fn decode_import_blob_meta(bytes: &[u8]) -> LoroResult<ImportBlobMetadata> {
}

Ok(ImportBlobMetadata {
is_snapshot,
mode: match parsed.mode {
super::EncodeMode::OutdatedRle => super::EncodedBlobMode::OutdatedRle,
super::EncodeMode::OutdatedSnapshot => super::EncodedBlobMode::OutdatedSnapshot,
super::EncodeMode::FastSnapshot => super::EncodedBlobMode::Snapshot,
super::EncodeMode::FastUpdates => super::EncodedBlobMode::Updates,
super::EncodeMode::Auto => unreachable!(),
},
start_frontiers: frontiers,
partial_start_vv: start_vv,
partial_end_vv: VersionVector::from_iter(
Expand Down Expand Up @@ -1666,7 +1669,6 @@ struct EncodedStateInfo {

#[cfg(test)]
mod test {


use loro_common::LoroValue;

Expand Down
4 changes: 2 additions & 2 deletions crates/loro-internal/src/loro.rs
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ impl LoroDoc {

pub fn from_snapshot(bytes: &[u8]) -> LoroResult<Self> {
let doc = Self::new();
let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes)?;
let ParsedHeaderAndBody { mode, body, .. } = parse_header_and_body(bytes, true)?;
if mode.is_snapshot() {
decode_snapshot(&doc, mode, body)?;
Ok(doc)
Expand Down Expand Up @@ -424,7 +424,7 @@ impl LoroDoc {
origin: InternalString,
) -> Result<ImportStatus, LoroError> {
ensure_cov::notify_cov("loro_internal::import");
let parsed = parse_header_and_body(bytes)?;
let parsed = parse_header_and_body(bytes, true)?;
info!("Importing with mode={:?}", &parsed.mode);
let result = match parsed.mode {
EncodeMode::OutdatedRle => {
Expand Down
4 changes: 2 additions & 2 deletions crates/loro-wasm/src/convert.rs
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ impl From<ImportBlobMetadata> for JsImportBlobMetadata {
let end_vv: JsValue = end_vv.into();
let start_timestamp: JsValue = JsValue::from_f64(meta.start_timestamp as f64);
let end_timestamp: JsValue = JsValue::from_f64(meta.end_timestamp as f64);
let is_snapshot: JsValue = JsValue::from_bool(meta.is_snapshot);
let mode: JsValue = JsValue::from_str(&meta.mode.to_string());
let change_num: JsValue = JsValue::from_f64(meta.change_num as f64);
let ans = Object::new();
js_sys::Reflect::set(
Expand All @@ -335,7 +335,7 @@ impl From<ImportBlobMetadata> for JsImportBlobMetadata {
js_sys::Reflect::set(&ans, &JsValue::from_str("startFrontiers"), &js_frontiers).unwrap();
js_sys::Reflect::set(&ans, &JsValue::from_str("startTimestamp"), &start_timestamp).unwrap();
js_sys::Reflect::set(&ans, &JsValue::from_str("endTimestamp"), &end_timestamp).unwrap();
js_sys::Reflect::set(&ans, &JsValue::from_str("isSnapshot"), &is_snapshot).unwrap();
js_sys::Reflect::set(&ans, &JsValue::from_str("mode"), &mode).unwrap();
js_sys::Reflect::set(&ans, &JsValue::from_str("changeNum"), &change_num).unwrap();
let ans: JsValue = ans.into();
ans.into()
Expand Down
11 changes: 7 additions & 4 deletions crates/loro-wasm/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4362,11 +4362,14 @@ impl Container {
/// - endVersionVector
/// - startTimestamp
/// - endTimestamp
/// - isSnapshot
/// - mode
/// - changeNum
#[wasm_bindgen(js_name = "decodeImportBlobMeta")]
pub fn decode_import_blob_meta(blob: &[u8]) -> JsResult<JsImportBlobMetadata> {
let meta: ImportBlobMetadata = LoroDocInner::decode_import_blob_meta(blob)?;
pub fn decode_import_blob_meta(
blob: &[u8],
check_checksum: bool,
) -> JsResult<JsImportBlobMetadata> {
let meta: ImportBlobMetadata = LoroDocInner::decode_import_blob_meta(blob, check_checksum)?;
Ok(meta.into())
}

Expand Down Expand Up @@ -4670,7 +4673,7 @@ export interface ImportBlobMetadata {
startFrontiers: OpId[],
startTimestamp: number;
endTimestamp: number;
isSnapshot: boolean;
mode: "outdated-snapshot" | "outdated-update" | "snapshot" | "shallow-snapshot" | "update";
changeNum: number;
}
Expand Down
Loading

0 comments on commit 8486234

Please sign in to comment.