diff --git a/rust/Cargo.toml b/rust/Cargo.toml index d43c192233..8ba03b9a03 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ] repository = "https://github.com/foxglove/mcap" documentation = "https://docs.rs/mcap" readme = "README.md" -version = "0.13.2" +version = "0.13.3" edition = "2021" license = "MIT" diff --git a/rust/src/sans_io/read.rs b/rust/src/sans_io/read.rs index 5d38293f73..67d6b53ca3 100644 --- a/rust/src/sans_io/read.rs +++ b/rust/src/sans_io/read.rs @@ -46,6 +46,8 @@ struct ChunkState { uncompressed_data_hasher: Option, // The number of compressed bytes left in the chunk that have not been read out of `file_data`. compressed_remaining: u64, + // The number of uncompressed bytes left in the chunk that have not been decompressed yet. + uncompressed_remaining: u64, // The total uncompressed length of the chunk records field. uncompressed_len: u64, // The number of bytes in the chunk record after the `records` field ends. @@ -283,6 +285,7 @@ impl LinearReader { uncompressed_data_hasher: Some(crc32fast::Hasher::new()), uncompressed_len: header.uncompressed_size, compressed_remaining: header.compressed_size, + uncompressed_remaining: header.uncompressed_size, padding_after_compressed_data: 0, }); Ok(result) @@ -354,13 +357,14 @@ impl LinearReader { // decompress ensures that $n bytes are available in the uncompressed_content buffer. macro_rules! decompress { - ($n: expr, $remaining: expr, $decompressor:expr) => {{ + ($n: expr, $chunk_state: expr, $decompressor:expr) => {{ match decompress_inner( $decompressor, $n, &mut self.file_data, &mut self.decompressed_content, - $remaining, + &mut $chunk_state.compressed_remaining, + &mut $chunk_state.uncompressed_remaining, ) { Ok(None) => { &self.decompressed_content.data @@ -502,6 +506,7 @@ impl LinearReader { }, compressed_remaining: header.compressed_size, uncompressed_len: header.uncompressed_size, + uncompressed_remaining: header.uncompressed_size, padding_after_compressed_data, crc: header.uncompressed_crc, }; @@ -555,11 +560,7 @@ impl LinearReader { self.currently_reading = ChunkRecord; continue; } - let _ = decompress!( - uncompressed_len, - &mut state.compressed_remaining, - decompressor - ); + let _ = decompress!(uncompressed_len, state, decompressor); } } } @@ -591,20 +592,26 @@ impl LinearReader { if self.decompressed_content.len() == 0 { self.decompressed_content.clear(); } - if state.compressed_remaining == 0 + if state.uncompressed_remaining == 0 && self.decompressed_content.len() == 0 { + // We've consumed all compressed data. It's possible for there to + // still be data left in the chunk that has not yet been read into + // `self.file_data`. This can happen when a compressor adds extra + // bytes after its last frame. We need to treat this as "padding + // after the chunk" and skip over it before reading the next record. + state.padding_after_compressed_data += + check!(len_as_usize(state.compressed_remaining)); + state.compressed_remaining = 0; self.currently_reading = PaddingAfterChunk; continue; } - let opcode_len_buf = - decompress!(9, &mut state.compressed_remaining, decompressor); + let opcode_len_buf = decompress!(9, state, decompressor); let opcode = opcode_len_buf[0]; let len = check!(len_as_usize(u64::from_le_bytes( opcode_len_buf[1..9].try_into().unwrap(), ))); - let _ = - decompress!(9 + len, &mut state.compressed_remaining, decompressor); + let _ = decompress!(9 + len, state, decompressor); self.decompressed_content.mark_read(9); let (start, end) = ( self.decompressed_content.start, @@ -707,6 +714,7 @@ fn decompress_inner( src_buf: &mut RwBuf, dest_buf: &mut RwBuf, compressed_remaining: &mut u64, + uncompressed_remaining: &mut u64, ) -> McapResult> { if dest_buf.len() >= n { return Ok(None); @@ -722,12 +730,16 @@ fn decompress_inner( if dst.is_empty() { return Ok(None); } + if *uncompressed_remaining == 0 { + return Err(McapError::UnexpectedEoc); + } let src_len = std::cmp::min(have, clamp_to_usize(*compressed_remaining)); let src = &src_buf.data[src_buf.start..src_buf.start + src_len]; let res = decompressor.decompress(src, dst)?; src_buf.mark_read(res.consumed); dest_buf.end += res.wrote; *compressed_remaining -= res.consumed as u64; + *uncompressed_remaining -= res.wrote as u64; } } @@ -1101,4 +1113,29 @@ mod tests { } Ok(()) } + + #[test] + fn test_decompression_does_not_fail() { + let mut f = std::fs::File::open("tests/data/zstd_chunk_with_padding.mcap") + .expect("failed to open file"); + let blocksize: usize = 1024; + let mut reader = LinearReader::new(); + let mut message_count = 0; + while let Some(action) = reader.next_action() { + match action.expect("failed to get next action") { + ReadAction::GetRecord { opcode, .. } => { + if opcode == op::MESSAGE { + message_count += 1; + } + } + ReadAction::NeedMore(_) => { + let read = f + .read(reader.insert(blocksize)) + .expect("failed to read from file"); + reader.set_written(read); + } + } + } + assert_eq!(message_count, 12); + } } diff --git a/rust/tests/data/zstd_chunk_with_padding.mcap b/rust/tests/data/zstd_chunk_with_padding.mcap new file mode 100644 index 0000000000..118cc01c86 --- /dev/null +++ b/rust/tests/data/zstd_chunk_with_padding.mcap @@ -0,0 +1,3 @@ +version https://git-lfs.github.com/spec/v1 +oid sha256:403825d5e8b1415c3b148ed2e299becdc5a437a8ab9fa103c740f79890892e85 +size 1459251