Skip to content

Commit

Permalink
rust: read: handle padded compressed chunks properly (#1299)
Browse files Browse the repository at this point in the history
### Changelog
- rust: fixed case where a compressed chunk with padding bytes added by
the compressor at the end would break decompression.

### Docs

None.

### Description

When MCAPs are written, the compressor may insert padding bytes at the
end of a compressed block after all useful data bytes are written. This
means that when the decompressor is reading messages out, it will have
decompressed all message bytes out of a compressed chunk _before_ it
reaches the end of the compressed data. Right now, the rust MCAP library
will see that there is more unused data in the compressed chunk, try to
decompress another message, but get no output from of the decompressor
when it tries. Right now this causes the reader to loop infinitely. This
PR adds a file to LFS which triggers this condition and fixes it.
<!-- Describe the problem, what has changed, and motivation behind those
changes. Pretend you are advocating for this change and the reader is
skeptical. -->

<!-- In addition to unit tests, describe any manual testing you did to
validate this change. -->

<table><tr><th>Before</th><th>After</th></tr><tr><td>

<!--before content goes here-->

</td><td>

<!--after content goes here-->

</td></tr></table>

<!-- If necessary, link relevant Linear or Github issues. Use `Fixes:
foxglove/repo#1234` to auto-close the Github issue or Fixes: FG-### for
Linear isses. -->
  • Loading branch information
james-rms authored Jan 6, 2025
1 parent c99a5f1 commit 1b8f30e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 13 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ]
repository = "https://github.com/foxglove/mcap"
documentation = "https://docs.rs/mcap"
readme = "README.md"
version = "0.13.2"
version = "0.13.3"
edition = "2021"
license = "MIT"

Expand Down
61 changes: 49 additions & 12 deletions rust/src/sans_io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,8 @@ struct ChunkState {
uncompressed_data_hasher: Option<crc32fast::Hasher>,
// The number of compressed bytes left in the chunk that have not been read out of `file_data`.
compressed_remaining: u64,
// The number of uncompressed bytes left in the chunk that have not been decompressed yet.
uncompressed_remaining: u64,
// The total uncompressed length of the chunk records field.
uncompressed_len: u64,
// The number of bytes in the chunk record after the `records` field ends.
Expand Down Expand Up @@ -283,6 +285,7 @@ impl LinearReader {
uncompressed_data_hasher: Some(crc32fast::Hasher::new()),
uncompressed_len: header.uncompressed_size,
compressed_remaining: header.compressed_size,
uncompressed_remaining: header.uncompressed_size,
padding_after_compressed_data: 0,
});
Ok(result)
Expand Down Expand Up @@ -354,13 +357,14 @@ impl LinearReader {

// decompress ensures that $n bytes are available in the uncompressed_content buffer.
macro_rules! decompress {
($n: expr, $remaining: expr, $decompressor:expr) => {{
($n: expr, $chunk_state: expr, $decompressor:expr) => {{
match decompress_inner(
$decompressor,
$n,
&mut self.file_data,
&mut self.decompressed_content,
$remaining,
&mut $chunk_state.compressed_remaining,
&mut $chunk_state.uncompressed_remaining,
) {
Ok(None) => {
&self.decompressed_content.data
Expand Down Expand Up @@ -502,6 +506,7 @@ impl LinearReader {
},
compressed_remaining: header.compressed_size,
uncompressed_len: header.uncompressed_size,
uncompressed_remaining: header.uncompressed_size,
padding_after_compressed_data,
crc: header.uncompressed_crc,
};
Expand Down Expand Up @@ -555,11 +560,7 @@ impl LinearReader {
self.currently_reading = ChunkRecord;
continue;
}
let _ = decompress!(
uncompressed_len,
&mut state.compressed_remaining,
decompressor
);
let _ = decompress!(uncompressed_len, state, decompressor);
}
}
}
Expand Down Expand Up @@ -591,20 +592,26 @@ impl LinearReader {
if self.decompressed_content.len() == 0 {
self.decompressed_content.clear();
}
if state.compressed_remaining == 0
if state.uncompressed_remaining == 0
&& self.decompressed_content.len() == 0
{
// We've consumed all compressed data. It's possible for there to
// still be data left in the chunk that has not yet been read into
// `self.file_data`. This can happen when a compressor adds extra
// bytes after its last frame. We need to treat this as "padding
// after the chunk" and skip over it before reading the next record.
state.padding_after_compressed_data +=
check!(len_as_usize(state.compressed_remaining));
state.compressed_remaining = 0;
self.currently_reading = PaddingAfterChunk;
continue;
}
let opcode_len_buf =
decompress!(9, &mut state.compressed_remaining, decompressor);
let opcode_len_buf = decompress!(9, state, decompressor);
let opcode = opcode_len_buf[0];
let len = check!(len_as_usize(u64::from_le_bytes(
opcode_len_buf[1..9].try_into().unwrap(),
)));
let _ =
decompress!(9 + len, &mut state.compressed_remaining, decompressor);
let _ = decompress!(9 + len, state, decompressor);
self.decompressed_content.mark_read(9);
let (start, end) = (
self.decompressed_content.start,
Expand Down Expand Up @@ -707,6 +714,7 @@ fn decompress_inner(
src_buf: &mut RwBuf,
dest_buf: &mut RwBuf,
compressed_remaining: &mut u64,
uncompressed_remaining: &mut u64,
) -> McapResult<Option<usize>> {
if dest_buf.len() >= n {
return Ok(None);
Expand All @@ -722,12 +730,16 @@ fn decompress_inner(
if dst.is_empty() {
return Ok(None);
}
if *uncompressed_remaining == 0 {
return Err(McapError::UnexpectedEoc);
}
let src_len = std::cmp::min(have, clamp_to_usize(*compressed_remaining));
let src = &src_buf.data[src_buf.start..src_buf.start + src_len];
let res = decompressor.decompress(src, dst)?;
src_buf.mark_read(res.consumed);
dest_buf.end += res.wrote;
*compressed_remaining -= res.consumed as u64;
*uncompressed_remaining -= res.wrote as u64;
}
}

Expand Down Expand Up @@ -1101,4 +1113,29 @@ mod tests {
}
Ok(())
}

#[test]
fn test_decompression_does_not_fail() {
let mut f = std::fs::File::open("tests/data/zstd_chunk_with_padding.mcap")
.expect("failed to open file");
let blocksize: usize = 1024;
let mut reader = LinearReader::new();
let mut message_count = 0;
while let Some(action) = reader.next_action() {
match action.expect("failed to get next action") {
ReadAction::GetRecord { opcode, .. } => {
if opcode == op::MESSAGE {
message_count += 1;
}
}
ReadAction::NeedMore(_) => {
let read = f
.read(reader.insert(blocksize))
.expect("failed to read from file");
reader.set_written(read);
}
}
}
assert_eq!(message_count, 12);
}
}
3 changes: 3 additions & 0 deletions rust/tests/data/zstd_chunk_with_padding.mcap
Git LFS file not shown

0 comments on commit 1b8f30e

Please sign in to comment.