Skip to content

Commit

Permalink
feat(util): introduce a "fused" body combinator
Browse files Browse the repository at this point in the history
this commit introduces a new `Body` combinator to the `http-body-util`
library, `http_body_util::combinators::Fuse<B>`.

this combinator is roughly equivalent to the `std::iter::Fuse<I>`
iterator, which returns `None` after the inner iterator returns it once.

while bodies *should* return `Poll::Ready(None)` indefinitely after
reaching the end of the stream or returning an error, this combinator
can help prevent further polling of an underlying body implementation,
in the same manner that `std::iter::Iterator::fuse()` helps prevent an
underlying iterator that might e.g. yield `Some(value)` after yielding
`None`, or panic.

Signed-off-by: katelyn martin <[email protected]>
  • Loading branch information
cratelyn committed Feb 3, 2025
1 parent 1090bff commit 4c77dab
Show file tree
Hide file tree
Showing 3 changed files with 247 additions and 0 deletions.
232 changes: 232 additions & 0 deletions http-body-util/src/combinators/fuse.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
use std::{
pin::Pin,
task::{Context, Poll},
};

use http_body::{Body, Frame, SizeHint};

/// A "fused" [`Body`].
///
/// This [`Body`] yields [`Poll::Ready(None)`] forever after the underlying body yields
/// [`Poll::Ready(None)`], or an error [`Poll::Ready(Some(Err(_)))`], once.
///
/// Bodies should ideally continue to return [`Poll::Ready(None)`] indefinitely after the end of
/// the stream is reached. [`Fuse<B>`] avoids polling its underlying body `B` further after the
/// underlying stream as ended, which can be useful for implementation that cannot uphold this
/// guarantee.
///
/// This is akin to the functionality that [`std::iter::Iterator::fuse()`] provides for
/// [`Iterator`][std::iter::Iterator]s.
#[derive(Debug)]
pub struct Fuse<B> {
inner: Option<B>,
}

impl<B> Fuse<B>
where
B: Body,
{
/// Returns a fused body.
pub fn new(body: B) -> Self {
Self {
inner: if body.is_end_stream() {
None
} else {
Some(body)
},
}
}
}

impl<B> Body for Fuse<B>
where
B: Body + Unpin,
{
type Data = B::Data;
type Error = B::Error;

fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<B::Data>, B::Error>>> {
let Self { inner } = self.get_mut();

let Some((frame, eos)) =
inner
.as_mut()
.map(|mut inner| match Pin::new(&mut inner).poll_frame(cx) {
frame @ Poll::Ready(Some(Ok(_))) => (frame, inner.is_end_stream()),
end @ Poll::Ready(Some(Err(_)) | None) => (end, true),
poll @ Poll::Pending => (poll, false),
})
else {
return Poll::Ready(None);
};

eos.then(|| inner.take());
frame
}

fn is_end_stream(&self) -> bool {
self.inner.is_none()
}

fn size_hint(&self) -> SizeHint {
self.inner
.as_ref()
.map(B::size_hint)
.unwrap_or_else(|| SizeHint::with_exact(0))
}
}

#[cfg(test)]
mod tests {
use super::*;
use bytes::Bytes;
use std::collections::VecDeque;

/// A value returned by a call to [`Body::poll_frame()`].
type PollFrame = Poll<Option<Result<Frame<Bytes>, Error>>>;

type Error = &'static str;

struct Mock<'count> {
poll_count: &'count mut u8,
polls: VecDeque<PollFrame>,
}

#[test]
fn empty_never_polls() {
let mut count = 0_u8;
let empty = Mock::new(&mut count, []);
debug_assert!(empty.is_end_stream());
let fused = Fuse::new(empty);
assert!(fused.inner.is_none());
drop(fused);
assert_eq!(count, 0);
}

#[test]
fn stops_polling_after_none() {
let mut count = 0_u8;
let empty = Mock::new(&mut count, [Poll::Ready(None)]);
debug_assert!(!empty.is_end_stream());
let mut fused = Fuse::new(empty);
assert!(fused.inner.is_some());

let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);
match Pin::new(&mut fused).poll_frame(&mut cx) {
Poll::Ready(None) => {}
other => panic!("unexpected poll outcome: {:?}", other),
}

assert!(fused.inner.is_none());
match Pin::new(&mut fused).poll_frame(&mut cx) {
Poll::Ready(None) => {}
other => panic!("unexpected poll outcome: {:?}", other),
}

drop(fused);
assert_eq!(count, 1);
}

#[test]
fn stops_polling_after_some_eos() {
let mut count = 0_u8;
let body = Mock::new(
&mut count,
[Poll::Ready(Some(Ok(Frame::data(Bytes::from_static(
b"hello",
)))))],
);
debug_assert!(!body.is_end_stream());
let mut fused = Fuse::new(body);
assert!(fused.inner.is_some());

let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);

match Pin::new(&mut fused).poll_frame(&mut cx) {
Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes.into_data().expect("data"), "hello"),
other => panic!("unexpected poll outcome: {:?}", other),
}

assert!(fused.inner.is_none());
match Pin::new(&mut fused).poll_frame(&mut cx) {
Poll::Ready(None) => {}
other => panic!("unexpected poll outcome: {:?}", other),
}

drop(fused);
assert_eq!(count, 1);
}

#[test]
fn stops_polling_after_some_error() {
let mut count = 0_u8;
let body = Mock::new(
&mut count,
[
Poll::Ready(Some(Ok(Frame::data(Bytes::from_static(b"hello"))))),
Poll::Ready(Some(Err("oh no"))),
Poll::Ready(Some(Ok(Frame::data(Bytes::from_static(b"world"))))),
],
);
debug_assert!(!body.is_end_stream());
let mut fused = Fuse::new(body);
assert!(fused.inner.is_some());

let waker = futures_util::task::noop_waker();
let mut cx = Context::from_waker(&waker);

match Pin::new(&mut fused).poll_frame(&mut cx) {
Poll::Ready(Some(Ok(bytes))) => assert_eq!(bytes.into_data().expect("data"), "hello"),
other => panic!("unexpected poll outcome: {:?}", other),
}

assert!(fused.inner.is_some());
match Pin::new(&mut fused).poll_frame(&mut cx) {
Poll::Ready(Some(Err("oh no"))) => {}
other => panic!("unexpected poll outcome: {:?}", other),
}

assert!(fused.inner.is_none());
match Pin::new(&mut fused).poll_frame(&mut cx) {
Poll::Ready(None) => {}
other => panic!("unexpected poll outcome: {:?}", other),
}

drop(fused);
assert_eq!(count, 2);
}

// === impl Mock ===

impl<'count> Mock<'count> {
fn new(poll_count: &'count mut u8, polls: impl IntoIterator<Item = PollFrame>) -> Self {
Self {
poll_count,
polls: polls.into_iter().collect(),
}
}
}

impl<'a> Body for Mock<'a> {
type Data = Bytes;
type Error = &'static str;

fn poll_frame(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let Self { poll_count, polls } = self.get_mut();
**poll_count = poll_count.saturating_add(1);
polls.pop_front().unwrap_or(Poll::Ready(None))
}

fn is_end_stream(&self) -> bool {
self.polls.is_empty()
}
}
}
2 changes: 2 additions & 0 deletions http-body-util/src/combinators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
mod box_body;
mod collect;
mod frame;
mod fuse;
mod map_err;
mod map_frame;
mod with_trailers;
Expand All @@ -11,6 +12,7 @@ pub use self::{
box_body::{BoxBody, UnsyncBoxBody},
collect::Collect,
frame::Frame,
fuse::Fuse,
map_err::MapErr,
map_frame::MapFrame,
with_trailers::WithTrailers,
Expand Down
13 changes: 13 additions & 0 deletions http-body-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,19 @@ pub trait BodyExt: http_body::Body {
{
BodyDataStream::new(self)
}

/// Creates a "fused" body.
///
/// This [`Body`][http_body::Body] yields [`Poll::Ready(None)`] forever after the underlying
/// body yields [`Poll::Ready(None)`], or an error [`Poll::Ready(Some(Err(_)))`], once.
///
/// See [`Fuse<B>`][combinators::Fuse] for more information.
fn fuse(self) -> combinators::Fuse<Self>
where
Self: Sized,
{
combinators::Fuse::new(self)
}
}

impl<T: ?Sized> BodyExt for T where T: http_body::Body {}

0 comments on commit 4c77dab

Please sign in to comment.