diff --git a/Cargo.lock b/Cargo.lock index 197e76531ee..663f5e9fc13 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -395,7 +395,7 @@ dependencies = [ "log", "memchr", "once_cell", - "pin-project-lite 0.2.12", + "pin-project-lite", "pin-utils", "slab", "wasm-bindgen-futures", @@ -443,7 +443,7 @@ dependencies = [ "futures-sink", "futures-util", "memchr", - "pin-project-lite 0.2.12", + "pin-project-lite", ] [[package]] @@ -499,7 +499,7 @@ dependencies = [ "memchr", "mime", "percent-encoding", - "pin-project-lite 0.2.12", + "pin-project-lite", "rustversion", "serde", "serde_json", @@ -913,7 +913,7 @@ dependencies = [ "bytes", "futures-core", "memchr", - "pin-project-lite 0.2.12", + "pin-project-lite", "tokio", "tokio-util", ] @@ -1638,7 +1638,7 @@ dependencies = [ "futures-io", "memchr", "parking", - "pin-project-lite 0.2.12", + "pin-project-lite", "waker-fn", ] @@ -1709,7 +1709,7 @@ dependencies = [ "futures-sink", "futures-task", "memchr", - "pin-project-lite 0.2.12", + "pin-project-lite", "pin-utils", "slab", ] @@ -1969,7 +1969,7 @@ checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1" dependencies = [ "bytes", "http", - "pin-project-lite 0.2.12", + "pin-project-lite", ] [[package]] @@ -2012,7 +2012,7 @@ dependencies = [ "httparse", "httpdate", "itoa", - "pin-project-lite 0.2.12", + "pin-project-lite", "socket2 0.4.9", "tokio", "tower-service", @@ -3254,7 +3254,7 @@ dependencies = [ "libp2p-tcp", "log", "parking_lot", - "quicksink", + "pin-project-lite", "rcgen", "rw-stream-sink", "soketto", @@ -3984,15 +3984,9 @@ dependencies = [ [[package]] name = "pin-project-lite" -version = "0.1.12" +version = "0.2.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "257b64915a082f7811703966789728173279bdebb956b143dbcd23f6f970a777" - -[[package]] -name = "pin-project-lite" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" +checksum = "8afb450f006bf6385ca15ef45d71d2288452bc3683ce2e2cacc0d18e4be60b58" [[package]] name = "pin-utils" @@ -4072,7 +4066,7 @@ dependencies = [ "concurrent-queue", "libc", "log", - "pin-project-lite 0.2.12", + "pin-project-lite", "windows-sys", ] @@ -4238,17 +4232,6 @@ dependencies = [ "quickcheck", ] -[[package]] -name = "quicksink" -version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "77de3c815e5a160b1539c6592796801df2043ae35e123b46d73380cfa57af858" -dependencies = [ - "futures-core", - "futures-sink", - "pin-project-lite 0.1.12", -] - [[package]] name = "quinn" version = "0.10.2" @@ -4259,7 +4242,7 @@ dependencies = [ "async-std", "bytes", "futures-io", - "pin-project-lite 0.2.12", + "pin-project-lite", "quinn-proto", "quinn-udp", "rustc-hash", @@ -4426,7 +4409,7 @@ dependencies = [ "futures-util", "itoa", "percent-encoding", - "pin-project-lite 0.2.12", + "pin-project-lite", "ryu", "tokio", "tokio-util", @@ -4554,7 +4537,7 @@ dependencies = [ "native-tls", "once_cell", "percent-encoding", - "pin-project-lite 0.2.12", + "pin-project-lite", "serde", "serde_json", "serde_urlencoded", @@ -5528,7 +5511,7 @@ dependencies = [ "mio", "num_cpus", "parking_lot", - "pin-project-lite 0.2.12", + "pin-project-lite", "signal-hook-registry", "socket2 0.5.5", "tokio-macros", @@ -5587,7 +5570,7 @@ dependencies = [ "futures-core", "futures-io", "futures-sink", - "pin-project-lite 0.2.12", + "pin-project-lite", "tokio", "tracing", ] @@ -5601,7 +5584,7 @@ dependencies = [ "futures-core", "futures-util", "pin-project", - "pin-project-lite 0.2.12", + "pin-project-lite", "tokio", "tower-layer", "tower-service", @@ -5625,7 +5608,7 @@ dependencies = [ "mime", "mime_guess", "percent-encoding", - "pin-project-lite 0.2.12", + "pin-project-lite", "tokio", "tokio-util", "tower-layer", @@ -5652,7 +5635,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" dependencies = [ "log", - "pin-project-lite 0.2.12", + "pin-project-lite", "tracing-attributes", "tracing-core", ] diff --git a/transports/websocket/Cargo.toml b/transports/websocket/Cargo.toml index a9ffdbe7c7e..cf3e9607953 100644 --- a/transports/websocket/Cargo.toml +++ b/transports/websocket/Cargo.toml @@ -18,7 +18,7 @@ libp2p-core = { workspace = true } libp2p-identity = { workspace = true } log = "0.4.20" parking_lot = "0.12.0" -quicksink = "0.1" +pin-project-lite = "0.2.13" rw-stream-sink = { workspace = true } soketto = "0.7.0" url = "2.4" diff --git a/transports/websocket/src/framed.rs b/transports/websocket/src/framed.rs index fc3f6514f12..07013973fdc 100644 --- a/transports/websocket/src/framed.rs +++ b/transports/websocket/src/framed.rs @@ -18,7 +18,7 @@ // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER // DEALINGS IN THE SOFTWARE. -use crate::{error::Error, tls}; +use crate::{error::Error, quicksink, tls}; use either::Either; use futures::{future::BoxFuture, prelude::*, ready, stream::BoxStream}; use futures_rustls::{client, rustls, server}; diff --git a/transports/websocket/src/lib.rs b/transports/websocket/src/lib.rs index d7dd7628888..e0b3d09ca25 100644 --- a/transports/websocket/src/lib.rs +++ b/transports/websocket/src/lib.rs @@ -24,6 +24,7 @@ pub mod error; pub mod framed; +mod quicksink; pub mod tls; use error::Error; diff --git a/transports/websocket/src/quicksink.rs b/transports/websocket/src/quicksink.rs new file mode 100644 index 00000000000..d9edb4dfe0d --- /dev/null +++ b/transports/websocket/src/quicksink.rs @@ -0,0 +1,350 @@ +// Copyright (c) 2019-2020 Parity Technologies (UK) Ltd. +// +// Licensed under the Apache License, Version 2.0 +// or the MIT +// license , at your +// option. All files in the project carrying such notice may not be copied, +// modified, or distributed except according to those terms. +// +// Forked into rust-libp2p and further distributed under the MIT license. + +// Create a [`Sink`] implementation from an initial value and a closure +// returning a [`Future`]. +// +// This is very similar to how `futures::stream::unfold` creates a `Stream` +// implementation from a seed value and a future-returning closure. +// +// # Examples +// +// ```no_run +// use async_std::io; +// use futures::prelude::*; +// use crate::quicksink::Action; +// +// crate::quicksink::make_sink(io::stdout(), |mut stdout, action| async move { +// match action { +// Action::Send(x) => stdout.write_all(x).await?, +// Action::Flush => stdout.flush().await?, +// Action::Close => stdout.close().await? +// } +// Ok::<_, io::Error>(stdout) +// }); +// ``` +// +// # Panics +// +// - If any of the [`Sink`] methods produce an error, the sink transitions +// to a failure state and none of its methods must be called afterwards or +// else a panic will occur. +// - If [`Sink::poll_close`] has been called, no other sink method must be +// called afterwards or else a panic will be caused. + +use futures::{ready, sink::Sink}; +use pin_project_lite::pin_project; +use std::{ + future::Future, + pin::Pin, + task::{Context, Poll}, +}; + +/// Returns a `Sink` impl based on the initial value and the given closure. +/// +/// The closure will be applied to the initial value and an [`Action`] that +/// informs it about the action it should perform. The returned [`Future`] +/// will resolve to another value and the process starts over using this +/// output. +pub(crate) fn make_sink(init: S, f: F) -> SinkImpl +where + F: FnMut(S, Action) -> T, + T: Future>, +{ + SinkImpl { + lambda: f, + future: None, + param: Some(init), + state: State::Empty, + _mark: std::marker::PhantomData, + } +} + +/// The command given to the closure so that it can perform appropriate action. +/// +/// Presumably the closure encapsulates a resource to perform I/O. The commands +/// correspond to methods of the [`Sink`] trait and provide the closure with +/// sufficient information to know what kind of action to perform with it. +#[derive(Clone, Debug, PartialEq, Eq)] +pub(crate) enum Action { + /// Send the given value. + /// Corresponds to [`Sink::start_send`]. + Send(A), + /// Flush the resource. + /// Corresponds to [`Sink::poll_flush`]. + Flush, + /// Close the resource. + /// Corresponds to [`Sink::poll_close`]. + Close, +} + +/// The various states the `Sink` may be in. +#[derive(Debug, PartialEq, Eq)] +enum State { + /// The `Sink` is idle. + Empty, + /// The `Sink` is sending a value. + Sending, + /// The `Sink` is flushing its resource. + Flushing, + /// The `Sink` is closing its resource. + Closing, + /// The `Sink` is closed (terminal state). + Closed, + /// The `Sink` experienced an error (terminal state). + Failed, +} + +pin_project! { + /// `SinkImpl` implements the `Sink` trait. + #[derive(Debug)] + pub(crate) struct SinkImpl { + lambda: F, + #[pin] future: Option, + param: Option, + state: State, + _mark: std::marker::PhantomData<(A, E)> + } +} + +impl Sink for SinkImpl +where + F: FnMut(S, Action) -> T, + T: Future>, +{ + type Error = E; + + fn poll_ready(self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + let mut this = self.project(); + match this.state { + State::Sending | State::Flushing => { + match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) { + Ok(p) => { + this.future.set(None); + *this.param = Some(p); + *this.state = State::Empty; + Poll::Ready(Ok(())) + } + Err(e) => { + this.future.set(None); + *this.state = State::Failed; + Poll::Ready(Err(e)) + } + } + } + State::Closing => match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) { + Ok(_) => { + this.future.set(None); + *this.state = State::Closed; + panic!("SinkImpl::poll_ready called on a closing sink.") + } + Err(e) => { + this.future.set(None); + *this.state = State::Failed; + Poll::Ready(Err(e)) + } + }, + State::Empty => { + assert!(this.param.is_some()); + Poll::Ready(Ok(())) + } + State::Closed => panic!("SinkImpl::poll_ready called on a closed sink."), + State::Failed => panic!("SinkImpl::poll_ready called after error."), + } + } + + fn start_send(self: Pin<&mut Self>, item: A) -> Result<(), Self::Error> { + assert_eq!(State::Empty, self.state); + let mut this = self.project(); + let param = this.param.take().unwrap(); + let future = (this.lambda)(param, Action::Send(item)); + this.future.set(Some(future)); + *this.state = State::Sending; + Ok(()) + } + + fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + loop { + let mut this = self.as_mut().project(); + match this.state { + State::Empty => { + if let Some(p) = this.param.take() { + let future = (this.lambda)(p, Action::Flush); + this.future.set(Some(future)); + *this.state = State::Flushing + } else { + return Poll::Ready(Ok(())); + } + } + State::Sending => match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) + { + Ok(p) => { + this.future.set(None); + *this.param = Some(p); + *this.state = State::Empty + } + Err(e) => { + this.future.set(None); + *this.state = State::Failed; + return Poll::Ready(Err(e)); + } + }, + State::Flushing => { + match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) { + Ok(p) => { + this.future.set(None); + *this.param = Some(p); + *this.state = State::Empty; + return Poll::Ready(Ok(())); + } + Err(e) => { + this.future.set(None); + *this.state = State::Failed; + return Poll::Ready(Err(e)); + } + } + } + State::Closing => match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) + { + Ok(_) => { + this.future.set(None); + *this.state = State::Closed; + return Poll::Ready(Ok(())); + } + Err(e) => { + this.future.set(None); + *this.state = State::Failed; + return Poll::Ready(Err(e)); + } + }, + State::Closed => return Poll::Ready(Ok(())), + State::Failed => panic!("SinkImpl::poll_flush called after error."), + } + } + } + + fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context) -> Poll> { + loop { + let mut this = self.as_mut().project(); + match this.state { + State::Empty => { + if let Some(p) = this.param.take() { + let future = (this.lambda)(p, Action::Close); + this.future.set(Some(future)); + *this.state = State::Closing; + } else { + return Poll::Ready(Ok(())); + } + } + State::Sending => match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) + { + Ok(p) => { + this.future.set(None); + *this.param = Some(p); + *this.state = State::Empty + } + Err(e) => { + this.future.set(None); + *this.state = State::Failed; + return Poll::Ready(Err(e)); + } + }, + State::Flushing => { + match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) { + Ok(p) => { + this.future.set(None); + *this.param = Some(p); + *this.state = State::Empty + } + Err(e) => { + this.future.set(None); + *this.state = State::Failed; + return Poll::Ready(Err(e)); + } + } + } + State::Closing => match ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)) + { + Ok(_) => { + this.future.set(None); + *this.state = State::Closed; + return Poll::Ready(Ok(())); + } + Err(e) => { + this.future.set(None); + *this.state = State::Failed; + return Poll::Ready(Err(e)); + } + }, + State::Closed => return Poll::Ready(Ok(())), + State::Failed => panic!("SinkImpl::poll_closed called after error."), + } + } + } +} + +#[cfg(test)] +mod tests { + use crate::quicksink::{make_sink, Action}; + use async_std::{io, task}; + use futures::{channel::mpsc, prelude::*, stream}; + + #[test] + fn smoke_test() { + task::block_on(async { + let sink = make_sink(io::stdout(), |mut stdout, action| async move { + match action { + Action::Send(x) => stdout.write_all(x).await?, + Action::Flush => stdout.flush().await?, + Action::Close => stdout.close().await?, + } + Ok::<_, io::Error>(stdout) + }); + + let values = vec![Ok(&b"hello\n"[..]), Ok(&b"world\n"[..])]; + assert!(stream::iter(values).forward(sink).await.is_ok()) + }) + } + + #[test] + fn replay() { + task::block_on(async { + let (tx, rx) = mpsc::channel(5); + + let sink = make_sink(tx, |mut tx, action| async move { + tx.send(action.clone()).await?; + if action == Action::Close { + tx.close().await? + } + Ok::<_, mpsc::SendError>(tx) + }); + + futures::pin_mut!(sink); + + let expected = [ + Action::Send("hello\n"), + Action::Flush, + Action::Send("world\n"), + Action::Flush, + Action::Close, + ]; + + for &item in &["hello\n", "world\n"] { + sink.send(item).await.unwrap() + } + + sink.close().await.unwrap(); + + let actual = rx.collect::>().await; + + assert_eq!(&expected[..], &actual[..]) + }); + } +}