Skip to content

Commit

Permalink
tokio: use tokio-util for futures compat
Browse files Browse the repository at this point in the history
  • Loading branch information
Keruspe committed May 1, 2024
1 parent 1cb2d11 commit f0510a7
Show file tree
Hide file tree
Showing 2 changed files with 8 additions and 48 deletions.
5 changes: 5 additions & 0 deletions tokio-reactor-trait/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,11 @@ version = "^1.0"
default-features = false
features = ["net", "rt", "time"]

[dependencies.tokio-util]
version = "^0.7"
default-features = false
features = ["compat"]

[dependencies.reactor-trait]
version = "^1.1"
path = ".."
Expand Down
51 changes: 3 additions & 48 deletions tokio-reactor-trait/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,13 @@ use async_trait::async_trait;
use futures_core::Stream;
use reactor_trait::{AsyncIOHandle, TcpReactor, TimeReactor};
use std::{
io::{self, IoSlice},
io,
net::SocketAddr,
pin::Pin,
task::{Context, Poll},
time::{Duration, Instant},
};
use tokio::io::{AsyncRead, AsyncWrite, ReadBuf};
use tokio::net::TcpStream;
use tokio_stream::{wrappers::IntervalStream, StreamExt};
use tokio_util::compat::TokioAsyncReadCompatExt;

/// Dummy object implementing reactor-trait common interfaces on top of tokio
#[derive(Debug, Default, Copy, Clone, PartialEq, Eq, PartialOrd, Ord)]
Expand All @@ -35,50 +33,7 @@ impl TcpReactor for Tokio {
async fn connect<A: Into<SocketAddr> + Send>(
addr: A,
) -> io::Result<Box<dyn AsyncIOHandle + Send>> {
Ok(Box::new(TcpStreamWrapper(
TcpStream::connect(addr.into()).await?,
)))
}
}

struct TcpStreamWrapper(TcpStream);

impl futures_io::AsyncRead for TcpStreamWrapper {
fn poll_read(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &mut [u8],
) -> Poll<futures_io::Result<usize>> {
let mut b = ReadBuf::new(buf);
Pin::new(&mut self.0)
.poll_read(cx, &mut b)
.map(|res| res.map(|()| b.filled().len()))
}
}

impl futures_io::AsyncWrite for TcpStreamWrapper {
fn poll_write(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
buf: &[u8],
) -> Poll<futures_io::Result<usize>> {
Pin::new(&mut self.0).poll_write(cx, buf)
}

fn poll_write_vectored(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
bufs: &[IoSlice<'_>],
) -> Poll<futures_io::Result<usize>> {
Pin::new(&mut self.0).poll_write_vectored(cx, bufs)
}

fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
Pin::new(&mut self.0).poll_flush(cx)
}

fn poll_close(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<futures_io::Result<()>> {
Pin::new(&mut self.0).poll_shutdown(cx)
Ok(Box::new(TcpStream::connect(addr.into()).await?.compat()))
}
}

Expand Down

0 comments on commit f0510a7

Please sign in to comment.