diff --git a/tokio-reactor-trait/Cargo.toml b/tokio-reactor-trait/Cargo.toml index f8a2c83..33cc598 100644 --- a/tokio-reactor-trait/Cargo.toml +++ b/tokio-reactor-trait/Cargo.toml @@ -24,6 +24,11 @@ version = "^1.0" default-features = false features = ["net", "rt", "time"] +[dependencies.tokio-util] +version = "^0.7" +default-features = false +features = ["compat"] + [dependencies.reactor-trait] version = "^1.1" path = ".." diff --git a/tokio-reactor-trait/src/lib.rs b/tokio-reactor-trait/src/lib.rs index 1257dea..a485eda 100644 --- a/tokio-reactor-trait/src/lib.rs +++ b/tokio-reactor-trait/src/lib.rs @@ -2,15 +2,13 @@ use async_trait::async_trait; use futures_core::Stream; use reactor_trait::{AsyncIOHandle, TcpReactor, TimeReactor}; use std::{ - io::{self, IoSlice}, + io, net::SocketAddr, - pin::Pin, - task::{Context, Poll}, time::{Duration, Instant}, }; -use tokio::io::{AsyncRead, AsyncWrite, ReadBuf}; use tokio::net::TcpStream; use tokio_stream::{wrappers::IntervalStream, StreamExt}; +use tokio_util::compat::TokioAsyncReadCompatExt; /// Dummy object implementing reactor-trait common interfaces on top of tokio #[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)] @@ -35,50 +33,7 @@ impl TcpReactor for Tokio { async fn connect + Send>( addr: A, ) -> io::Result> { - Ok(Box::new(TcpStreamWrapper( - TcpStream::connect(addr.into()).await?, - ))) - } -} - -struct TcpStreamWrapper(TcpStream); - -impl futures_io::AsyncRead for TcpStreamWrapper { - fn poll_read( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &mut [u8], - ) -> Poll> { - let mut b = ReadBuf::new(buf); - Pin::new(&mut self.0) - .poll_read(cx, &mut b) - .map(|res| res.map(|()| b.filled().len())) - } -} - -impl futures_io::AsyncWrite for TcpStreamWrapper { - fn poll_write( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - buf: &[u8], - ) -> Poll> { - Pin::new(&mut self.0).poll_write(cx, buf) - } - - fn poll_write_vectored( - mut self: Pin<&mut Self>, - cx: &mut Context<'_>, - bufs: &[IoSlice<'_>], - ) -> Poll> { - Pin::new(&mut self.0).poll_write_vectored(cx, bufs) - } - - fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_flush(cx) - } - - fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - Pin::new(&mut self.0).poll_shutdown(cx) + Ok(Box::new(TcpStream::connect(addr.into()).await?.compat())) } }