Skip to content

Commit

Permalink
Change test_tcp to use Poll API. (tokio-rs#712)
Browse files Browse the repository at this point in the history
  • Loading branch information
ahmedcharles authored and alexcrichton committed Aug 23, 2017
1 parent 369d67b commit c6447d9
Showing 1 changed file with 132 additions and 138 deletions.
270 changes: 132 additions & 138 deletions test/test_tcp.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
extern crate mio;
extern crate env_logger;

use std::cmp;
use std::io::prelude::*;
use std::io;
Expand All @@ -14,26 +11,11 @@ use net2::{self, TcpStreamExt};
use {TryRead, TryWrite};
use mio::{Token, Ready, PollOpt, Poll, Events};
use iovec::IoVec;
use mio::deprecated::{EventLoop, Handler};
use mio::net::{TcpListener, TcpStream};

#[test]
fn accept() {
struct H { hit: bool, listener: TcpListener }

impl Handler for H {
type Timeout = ();
type Message = ();

fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token,
events: Ready) {
self.hit = true;
assert_eq!(token, Token(1));
assert!(events.is_readable());
assert!(self.listener.accept().is_ok());
event_loop.shutdown();
}
}
struct H { hit: bool, listener: TcpListener, shutdown: bool }

let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let addr = l.local_addr().unwrap();
Expand All @@ -42,37 +24,32 @@ fn accept() {
net::TcpStream::connect(&addr).unwrap();
});

let mut e = EventLoop::new().unwrap();
let poll = Poll::new().unwrap();

poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();

let mut events = Events::with_capacity(128);

e.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
let mut h = H { hit: false, listener: l, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();

let mut h = H { hit: false, listener: l };
e.run(&mut h).unwrap();
for event in &events {
h.hit = true;
assert_eq!(event.token(), Token(1));
assert!(event.readiness().is_readable());
assert!(h.listener.accept().is_ok());
h.shutdown = true;
}
}
assert!(h.hit);
assert!(h.listener.accept().unwrap_err().kind() == io::ErrorKind::WouldBlock);
t.join().unwrap();
}

#[test]
fn connect() {
struct H { hit: u32 }

impl Handler for H {
type Timeout = ();
type Message = ();

fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token,
events: Ready) {
assert_eq!(token, Token(1));
match self.hit {
0 => assert!(events.is_writable()),
1 => assert!(events.is_readable()),
_ => panic!(),
}
self.hit += 1;
event_loop.shutdown();
}
}
struct H { hit: u32, shutdown: bool }

let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();
Expand All @@ -86,47 +63,54 @@ fn connect() {
tx2.send(()).unwrap();
});

let mut e = EventLoop::new().unwrap();
let poll = Poll::new().unwrap();
let s = TcpStream::connect(&addr).unwrap();

e.register(&s, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();
poll.register(&s, Token(1), Ready::readable() | Ready::writable(), PollOpt::edge()).unwrap();

let mut events = Events::with_capacity(128);

let mut h = H { hit: 0 };
e.run(&mut h).unwrap();
let mut h = H { hit: 0, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();

for event in &events {
assert_eq!(event.token(), Token(1));
match h.hit {
0 => assert!(event.readiness().is_writable()),
1 => assert!(event.readiness().is_readable()),
_ => panic!(),
}
h.hit += 1;
h.shutdown = true;
}
}
assert_eq!(h.hit, 1);
tx.send(()).unwrap();
rx2.recv().unwrap();
e.run(&mut h).unwrap();
h.shutdown = false;
while !h.shutdown {
poll.poll(&mut events, None).unwrap();

for event in &events {
assert_eq!(event.token(), Token(1));
match h.hit {
0 => assert!(event.readiness().is_writable()),
1 => assert!(event.readiness().is_readable()),
_ => panic!(),
}
h.hit += 1;
h.shutdown = true;
}
}
assert_eq!(h.hit, 2);
t.join().unwrap();
}

#[test]
fn read() {
const N: usize = 16 * 1024 * 1024;
struct H { amt: usize, socket: TcpStream }

impl Handler for H {
type Timeout = ();
type Message = ();

fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token,
_events: Ready) {
assert_eq!(token, Token(1));
let mut b = [0; 1024];
loop {
if let Some(amt) = self.socket.try_read(&mut b).unwrap() {
self.amt += amt;
} else {
break
}
if self.amt >= N {
event_loop.shutdown();
break
}
}
}
}
struct H { amt: usize, socket: TcpStream, shutdown: bool }

let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();
Expand All @@ -140,13 +124,33 @@ fn read() {
}
});

let mut e = EventLoop::new().unwrap();
let poll = Poll::new().unwrap();
let s = TcpStream::connect(&addr).unwrap();

e.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
poll.register(&s, Token(1), Ready::readable(), PollOpt::edge()).unwrap();

let mut events = Events::with_capacity(128);

let mut h = H { amt: 0, socket: s };
e.run(&mut h).unwrap();
let mut h = H { amt: 0, socket: s, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();

for event in &events {
assert_eq!(event.token(), Token(1));
let mut b = [0; 1024];
loop {
if let Some(amt) = h.socket.try_read(&mut b).unwrap() {
h.amt += amt;
} else {
break
}
if h.amt >= N {
h.shutdown = true;
break
}
}
}
}
t.join().unwrap();
}

Expand Down Expand Up @@ -225,29 +229,7 @@ fn read_bufs() {
#[test]
fn write() {
const N: usize = 16 * 1024 * 1024;
struct H { amt: usize, socket: TcpStream }

impl Handler for H {
type Timeout = ();
type Message = ();

fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token,
_events: Ready) {
assert_eq!(token, Token(1));
let b = [0; 1024];
loop {
if let Some(amt) = self.socket.try_write(&b).unwrap() {
self.amt += amt;
} else {
break
}
if self.amt >= N {
event_loop.shutdown();
break
}
}
}
}
struct H { amt: usize, socket: TcpStream, shutdown: bool }

let l = net::TcpListener::bind("127.0.0.1:0").unwrap();
let addr = l.local_addr().unwrap();
Expand All @@ -261,13 +243,33 @@ fn write() {
}
});

let mut e = EventLoop::new().unwrap();
let poll = Poll::new().unwrap();
let s = TcpStream::connect(&addr).unwrap();

e.register(&s, Token(1), Ready::writable(), PollOpt::edge()).unwrap();
poll.register(&s, Token(1), Ready::writable(), PollOpt::edge()).unwrap();

let mut events = Events::with_capacity(128);

let mut h = H { amt: 0, socket: s };
e.run(&mut h).unwrap();
let mut h = H { amt: 0, socket: s, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();

for event in &events {
assert_eq!(event.token(), Token(1));
let b = [0; 1024];
loop {
if let Some(amt) = h.socket.try_write(&b).unwrap() {
h.amt += amt;
} else {
break
}
if h.amt >= N {
h.shutdown = true;
break
}
}
}
}
t.join().unwrap();
}

Expand Down Expand Up @@ -327,59 +329,51 @@ fn write_bufs() {

#[test]
fn connect_then_close() {
struct H { listener: TcpListener }

impl Handler for H {
type Timeout = ();
type Message = ();

fn ready(&mut self, event_loop: &mut EventLoop<Self>, token: Token,
_events: Ready) {
if token == Token(1) {
let s = self.listener.accept().unwrap().0;
event_loop.register(&s, Token(3), Ready::readable() | Ready::writable(),
PollOpt::edge()).unwrap();
drop(s);
} else if token == Token(2) {
event_loop.shutdown();
}
}
}
struct H { listener: TcpListener, shutdown: bool }

let mut e = EventLoop::new().unwrap();
let poll = Poll::new().unwrap();
let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();
let s = TcpStream::connect(&l.local_addr().unwrap()).unwrap();

e.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
e.register(&s, Token(2), Ready::readable(), PollOpt::edge()).unwrap();
poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
poll.register(&s, Token(2), Ready::readable(), PollOpt::edge()).unwrap();

let mut h = H { listener: l };
e.run(&mut h).unwrap();
}

#[test]
fn listen_then_close() {
struct H;
let mut events = Events::with_capacity(128);

impl Handler for H {
type Timeout = ();
type Message = ();
let mut h = H { listener: l, shutdown: false };
while !h.shutdown {
poll.poll(&mut events, None).unwrap();

fn ready(&mut self, _: &mut EventLoop<Self>, token: Token, _: Ready) {
if token == Token(1) {
panic!("recieved ready() on a closed TcpListener")
for event in &events {
if event.token() == Token(1) {
let s = h.listener.accept().unwrap().0;
poll.register(&s, Token(3), Ready::readable() | Ready::writable(),
PollOpt::edge()).unwrap();
drop(s);
} else if event.token() == Token(2) {
h.shutdown = true;
}
}
}
}

let mut e = EventLoop::new().unwrap();
#[test]
fn listen_then_close() {
let poll = Poll::new().unwrap();
let l = TcpListener::bind(&"127.0.0.1:0".parse().unwrap()).unwrap();

e.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
poll.register(&l, Token(1), Ready::readable(), PollOpt::edge()).unwrap();
drop(l);

let mut h = H;
e.run_once(&mut h, Some(Duration::from_millis(100))).unwrap();
let mut events = Events::with_capacity(128);

poll.poll(&mut events, Some(Duration::from_millis(100))).unwrap();

for event in &events {
if event.token() == Token(1) {
panic!("recieved ready() on a closed TcpListener")
}
}
}

fn assert_send<T: Send>() {
Expand Down

0 comments on commit c6447d9

Please sign in to comment.