From 1b8f30e118ea2d211e8b37560d7686577421eb35 Mon Sep 17 00:00:00 2001 From: james-rms Date: Tue, 7 Jan 2025 08:05:28 +1100 Subject: [PATCH] rust: read: handle padded compressed chunks properly (#1299) ### Changelog - rust: fixed case where a compressed chunk with padding bytes added by the compressor at the end would break decompression. ### Docs None. ### Description When MCAPs are written, the compressor may insert padding bytes at the end of a compressed block after all useful data bytes are written. This means that when the decompressor is reading messages out, it will have decompressed all message bytes out of a compressed chunk _before_ it reaches the end of the compressed data. Right now, the rust MCAP library will see that there is more unused data in the compressed chunk, try to decompress another message, but get no output from the decompressor when it tries. This causes the reader to loop infinitely. This PR adds a file to LFS which triggers this condition and fixes it.
Before / After
--- rust/Cargo.toml | 2 +- rust/src/sans_io/read.rs | 61 ++++++++++++++++---- rust/tests/data/zstd_chunk_with_padding.mcap | 3 + 3 files changed, 53 insertions(+), 13 deletions(-) create mode 100644 rust/tests/data/zstd_chunk_with_padding.mcap diff --git a/rust/Cargo.toml b/rust/Cargo.toml index d43c192233..8ba03b9a03 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ] repository = "https://github.com/foxglove/mcap" documentation = "https://docs.rs/mcap" readme = "README.md" -version = "0.13.2" +version = "0.13.3" edition = "2021" license = "MIT" diff --git a/rust/src/sans_io/read.rs b/rust/src/sans_io/read.rs index 5d38293f73..67d6b53ca3 100644 --- a/rust/src/sans_io/read.rs +++ b/rust/src/sans_io/read.rs @@ -46,6 +46,8 @@ struct ChunkState { uncompressed_data_hasher: Option, // The number of compressed bytes left in the chunk that have not been read out of `file_data`. compressed_remaining: u64, + // The number of uncompressed bytes left in the chunk that have not been decompressed yet. + uncompressed_remaining: u64, // The total uncompressed length of the chunk records field. uncompressed_len: u64, // The number of bytes in the chunk record after the `records` field ends. @@ -283,6 +285,7 @@ impl LinearReader { uncompressed_data_hasher: Some(crc32fast::Hasher::new()), uncompressed_len: header.uncompressed_size, compressed_remaining: header.compressed_size, + uncompressed_remaining: header.uncompressed_size, padding_after_compressed_data: 0, }); Ok(result) @@ -354,13 +357,14 @@ impl LinearReader { // decompress ensures that $n bytes are available in the uncompressed_content buffer. macro_rules! 
decompress { - ($n: expr, $remaining: expr, $decompressor:expr) => {{ + ($n: expr, $chunk_state: expr, $decompressor:expr) => {{ match decompress_inner( $decompressor, $n, &mut self.file_data, &mut self.decompressed_content, - $remaining, + &mut $chunk_state.compressed_remaining, + &mut $chunk_state.uncompressed_remaining, ) { Ok(None) => { &self.decompressed_content.data @@ -502,6 +506,7 @@ impl LinearReader { }, compressed_remaining: header.compressed_size, uncompressed_len: header.uncompressed_size, + uncompressed_remaining: header.uncompressed_size, padding_after_compressed_data, crc: header.uncompressed_crc, }; @@ -555,11 +560,7 @@ impl LinearReader { self.currently_reading = ChunkRecord; continue; } - let _ = decompress!( - uncompressed_len, - &mut state.compressed_remaining, - decompressor - ); + let _ = decompress!(uncompressed_len, state, decompressor); } } } @@ -591,20 +592,26 @@ impl LinearReader { if self.decompressed_content.len() == 0 { self.decompressed_content.clear(); } - if state.compressed_remaining == 0 + if state.uncompressed_remaining == 0 && self.decompressed_content.len() == 0 { + // We've consumed all compressed data. It's possible for there to + // still be data left in the chunk that has not yet been read into + // `self.file_data`. This can happen when a compressor adds extra + // bytes after its last frame. We need to treat this as "padding + // after the chunk" and skip over it before reading the next record. 
+ state.padding_after_compressed_data += + check!(len_as_usize(state.compressed_remaining)); + state.compressed_remaining = 0; self.currently_reading = PaddingAfterChunk; continue; } - let opcode_len_buf = - decompress!(9, &mut state.compressed_remaining, decompressor); + let opcode_len_buf = decompress!(9, state, decompressor); let opcode = opcode_len_buf[0]; let len = check!(len_as_usize(u64::from_le_bytes( opcode_len_buf[1..9].try_into().unwrap(), ))); - let _ = - decompress!(9 + len, &mut state.compressed_remaining, decompressor); + let _ = decompress!(9 + len, state, decompressor); self.decompressed_content.mark_read(9); let (start, end) = ( self.decompressed_content.start, @@ -707,6 +714,7 @@ fn decompress_inner( src_buf: &mut RwBuf, dest_buf: &mut RwBuf, compressed_remaining: &mut u64, + uncompressed_remaining: &mut u64, ) -> McapResult> { if dest_buf.len() >= n { return Ok(None); @@ -722,12 +730,16 @@ fn decompress_inner( if dst.is_empty() { return Ok(None); } + if *uncompressed_remaining == 0 { + return Err(McapError::UnexpectedEoc); + } let src_len = std::cmp::min(have, clamp_to_usize(*compressed_remaining)); let src = &src_buf.data[src_buf.start..src_buf.start + src_len]; let res = decompressor.decompress(src, dst)?; src_buf.mark_read(res.consumed); dest_buf.end += res.wrote; *compressed_remaining -= res.consumed as u64; + *uncompressed_remaining -= res.wrote as u64; } } @@ -1101,4 +1113,29 @@ mod tests { } Ok(()) } + + #[test] + fn test_decompression_does_not_fail() { + let mut f = std::fs::File::open("tests/data/zstd_chunk_with_padding.mcap") + .expect("failed to open file"); + let blocksize: usize = 1024; + let mut reader = LinearReader::new(); + let mut message_count = 0; + while let Some(action) = reader.next_action() { + match action.expect("failed to get next action") { + ReadAction::GetRecord { opcode, .. 
} => { + if opcode == op::MESSAGE { + message_count += 1; + } + } + ReadAction::NeedMore(_) => { + let read = f + .read(reader.insert(blocksize)) + .expect("failed to read from file"); + reader.set_written(read); + } + } + } + assert_eq!(message_count, 12); + } } diff --git a/rust/tests/data/zstd_chunk_with_padding.mcap b/rust/tests/data/zstd_chunk_with_padding.mcap new file mode 100644 index 0000000000..118cc01c86 --- /dev/null +++ b/rust/tests/data/zstd_chunk_with_padding.mcap @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:403825d5e8b1415c3b148ed2e299becdc5a437a8ab9fa103c740f79890892e85 +size 1459251