Skip to content

Commit

Permalink
rust: LinearReader: reclaim dead space when possible (#1298)
Browse files Browse the repository at this point in the history
### Changelog
- Rust: fixed a bug where a caller that did not read exactly the requested number of
bytes into the LinearReader could cause its internal buffer to grow without bound.

### Docs

<!-- Link to a Docs PR, tracking ticket in Linear, OR write "None" if no
documentation changes are needed. -->

### Description

<!-- Describe the problem, what has changed, and motivation behind those
changes. Pretend you are advocating for this change and the reader is
skeptical. -->

<!-- In addition to unit tests, describe any manual testing you did to
validate this change. -->

<table><tr><th>Before</th><th>After</th></tr><tr><td>

<!--before content goes here-->

</td><td>

<!--after content goes here-->

</td></tr></table>

<!-- If necessary, link relevant Linear or GitHub issues. Use `Fixes:
foxglove/repo#1234` to auto-close the GitHub issue or `Fixes: FG-###` for
Linear issues. -->
  • Loading branch information
james-rms authored Dec 19, 2024
1 parent 9e49a99 commit c99a5f1
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 5 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ categories = [ "science::robotics", "compression" ]
repository = "https://github.com/foxglove/mcap"
documentation = "https://docs.rs/mcap"
readme = "README.md"
version = "0.13.1"
version = "0.13.2"
edition = "2021"
license = "MIT"

Expand Down
80 changes: 76 additions & 4 deletions rust/src/sans_io/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,14 @@ impl RwBuf {
// Returns a mutable view of the un-written part of the buffer, resizing as needed to ensure
// N bytes are available to write into.
//
// The returned slice is `self.data[self.end..]` taken after `self.data` has been resized to
// exactly `self.end + n` bytes, so it is `n` bytes long. Bytes added by the resize are zeroed.
//
// Before resizing, the bytes in `start..end` may be moved to the front of the buffer (see
// below), in which case `start` and `end` are both rewritten.
fn tail_with_size(&mut self, n: usize) -> &mut [u8] {
// Number of bytes currently held between `start` and `end`.
let unread_len = self.end - self.start;
// Compact the buffer if there is sufficient free space and there is more free
// than used. "Free" here is the region before `start`: compaction only happens once it
// exceeds 4096 bytes and is also larger than the `start..end` region being moved, which
// keeps the cost of the copy below the amount of space reclaimed.
if self.start > 4096 && self.start > unread_len {
self.data.copy_within(self.start..self.end, 0);
self.start = 0;
self.end = unread_len;
}
// Size the backing vector to exactly `end + n`: this grows it (zero-filled) when it is too
// short, and truncates its length when it is longer than needed.
let desired_end = self.end + n;
self.data.resize(desired_end, 0);
&mut self.data[self.end..]
Expand Down Expand Up @@ -318,10 +326,6 @@ impl LinearReader {
self.file_data.end += written;
}

if self.file_data.len() == 0 {
self.file_data.clear();
}

/// Macros for loading data into the reader. These return early with NeedMore(n) if
/// more data is needed.
///
Expand Down Expand Up @@ -1029,4 +1033,72 @@ mod tests {
);
Ok(())
}

// Regression test: the linear reader's internal buffer must be compacted regularly so that it
// stays bounded, even when the caller hands it more bytes than it asked for.
#[test]
fn test_buffer_compaction() -> McapResult<()> {
    // Write an uncompressed, unchunked MCAP file into memory containing three 4 MiB messages.
    let file_bytes = {
        let mut sink = std::io::Cursor::new(Vec::<u8>::new());
        let payload = vec![0x20u8; 1024 * 1024 * 4];
        let mut writer = crate::WriteOptions::new()
            .compression(None)
            .chunk_size(None)
            .create(&mut sink)?;
        let channel = std::sync::Arc::new(crate::Channel {
            topic: "chat".to_owned(),
            schema: None,
            message_encoding: "json".to_owned(),
            metadata: BTreeMap::new(),
        });
        writer.add_channel(&channel)?;
        for seq in 0..3 {
            let timestamp = u64::from(seq);
            writer.write(&crate::Message {
                channel: channel.clone(),
                sequence: seq,
                log_time: timestamp,
                publish_time: timestamp,
                data: std::borrow::Cow::Borrowed(&payload[..]),
            })?;
            // Flush once, after the second message has been written.
            if seq == 1 {
                writer.flush()?;
            }
        }
        writer.finish()?;
        // Release the writer's borrow of `sink` before taking the bytes back out.
        drop(writer);
        sink.into_inner()
    };

    // Feed the file to a LinearReader, checking the buffer size after every refill.
    let mut reader = LinearReader::new();
    let mut source = std::io::Cursor::new(file_bytes);
    let mut opcodes: Vec<u8> = Vec::new();
    let mut iterations = 0;
    let mut largest_request: usize = 0;
    while let Some(action) = reader.next_action() {
        match action? {
            ReadAction::GetRecord { data, opcode } => {
                opcodes.push(opcode);
                parse_record(opcode, data)?;
            }
            ReadAction::NeedMore(requested) => {
                largest_request = largest_request.max(requested);
                // Deliberately offer one byte more than requested, so the buffer is not
                // drained to exactly zero by the next action.
                let filled = source.read(reader.insert(requested + 1))?;
                reader.set_written(filled);
                let buffer_size = reader.file_data.data.len();
                let limit = (largest_request * 2).max(4096);
                assert!(
                    buffer_size < limit,
                    "max needed: {0}, buffer size: {1}",
                    largest_request,
                    buffer_size
                );
            }
        }
        iterations += 1;
        // Bail out instead of spinning forever if the reader stops making progress.
        assert!(iterations < 10000);
    }
    Ok(())
}
}

0 comments on commit c99a5f1

Please sign in to comment.