Skip to content

Commit

Permalink
Rename Reactor to EventLoop
Browse files Browse the repository at this point in the history
This helps to reflect that the goals of EventLoop are limited to
implementing what an epoll loop would involve. This also frees up
Reactor to be used as a higher level, more involved, abstraction.
  • Loading branch information
carllerche committed Oct 2, 2014
1 parent f38a7be commit 3909056
Show file tree
Hide file tree
Showing 10 changed files with 124 additions and 124 deletions.
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ The following are specifically omitted from MIO and are left to the user
or higher level libraries.

* File operations
* Thread pools / multi-threaded reactor
* Thread pools / multi-threaded event loop

## Platforms

Expand Down
14 changes: 7 additions & 7 deletions src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,12 @@ pub struct MioError {

#[deriving(Show, PartialEq, Clone)]
pub enum MioErrorKind {
Eof, // End of file or socket closed
WouldBlock, // The operation would have blocked
BufUnderflow, // Buf does not contain enough data to perform read op
BufOverflow, // Buf does not contain enough capacity to perform write op
ReactorTerminated, // The reactor is not running anymore
OtherError, // System error not covered by other kinds
Eof, // End of file or socket closed
WouldBlock, // The operation would have blocked
BufUnderflow, // Buf does not contain enough data to perform read op
BufOverflow, // Buf does not contain enough capacity to perform write op
EventLoopTerminated, // The event loop is not running anymore
OtherError, // System error not covered by other kinds
}

impl MioError {
Expand Down Expand Up @@ -91,7 +91,7 @@ impl MioError {
Some(err) => io::IoError::from_errno(err.kind as uint, false),
None => io::standard_error(io::OtherIoError)
},
ReactorTerminated => io::standard_error(OtherIoError)
EventLoopTerminated => io::standard_error(OtherIoError)
}
}
}
96 changes: 48 additions & 48 deletions src/reactor.rs → src/event_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,13 @@ use socket::{Socket, SockAddr};
use timer::{Timer, Timeout, TimerResult};
use token::Token;

/// A lightweight IO reactor.
/// A lightweight event loop.
///
/// TODO:
/// - Enforce private tokens
#[deriving(Clone, Show)]
pub struct ReactorConfig {
pub struct EventLoopConfig {
pub io_poll_timeout_ms: uint,

// == Notifications ==
Expand All @@ -29,9 +29,9 @@ pub struct ReactorConfig {
pub timer_capacity: uint,
}

impl Default for ReactorConfig {
fn default() -> ReactorConfig {
ReactorConfig {
impl Default for EventLoopConfig {
fn default() -> EventLoopConfig {
EventLoopConfig {
io_poll_timeout_ms: 1_000,
notify_capacity: 1_024,
messages_per_tick: 64,
Expand All @@ -42,24 +42,24 @@ impl Default for ReactorConfig {
}
}

pub struct Reactor<T, M: Send> {
pub struct EventLoop<T, M: Send> {
run: bool,
poll: Poll,
timer: Timer<T>,
notify: Notify<M>,
config: ReactorConfig,
config: EventLoopConfig,
}

// Token used to represent notifications
static NOTIFY: Token = Token(uint::MAX);

impl<T, M: Send> Reactor<T, M> {
/// Initializes a new reactor. The reactor will not be running yet.
pub fn new() -> MioResult<Reactor<T, M>> {
Reactor::configured(Default::default())
impl<T, M: Send> EventLoop<T, M> {
/// Initializes a new event loop. The event loop will not be running yet.
pub fn new() -> MioResult<EventLoop<T, M>> {
EventLoop::configured(Default::default())
}

pub fn configured(config: ReactorConfig) -> MioResult<Reactor<T, M>> {
pub fn configured(config: EventLoopConfig) -> MioResult<EventLoop<T, M>> {
// Create the IO poller
let mut poll = try!(Poll::new());

Expand All @@ -78,7 +78,7 @@ impl<T, M: Send> Reactor<T, M> {
// Set the timer's starting time reference point
timer.setup();

Ok(Reactor {
Ok(EventLoop {
run: true,
poll: poll,
timer: timer,
Expand All @@ -87,10 +87,10 @@ impl<T, M: Send> Reactor<T, M> {
})
}

/// Returns a sender that allows sending messages to the reactor in a
/// thread-safe way, waking up the reactor if needed.
pub fn channel(&self) -> ReactorSender<M> {
ReactorSender::new(self.notify.clone())
/// Returns a sender that allows sending messages to the event loop in a
/// thread-safe way, waking up the event loop if needed.
pub fn channel(&self) -> EventLoopSender<M> {
EventLoopSender::new(self.notify.clone())
}

/// After the requested time interval, the handler's `timeout` function
Expand All @@ -105,28 +105,28 @@ impl<T, M: Send> Reactor<T, M> {
self.timer.clear(timeout)
}

/// Tells the reactor to exit after it is done handling all events in the
/// Tells the event loop to exit after it is done handling all events in the
/// current iteration.
pub fn shutdown(&mut self) {
self.run = false;
}

/// Tells the reactor to exit immidiately. All pending events will be dropped.
/// Tells the event loop to exit immidiately. All pending events will be dropped.
pub fn shutdown_now(&mut self) {
unimplemented!()
}

/// Registers an IO handle with the reactor.
/// Registers an IO handle with the event loop.
pub fn register<H: IoHandle>(&mut self, io: &H, token: Token) -> MioResult<()> {
self.poll.register(io, token)
}

/// Connects the socket to the specified address. When the operation
/// completes, the handler will be notified with the supplied token.
///
/// The goal of this method is to ensure that the reactor will always
/// The goal of this method is to ensure that the event loop will always
/// notify about the connection, even if the connection happens
/// immediately. Otherwise, every consumer of the reactor would have
/// immediately. Otherwise, every consumer of the event loop would have
/// to worry about possibly-immediate connection.
pub fn connect<S: Socket>(&mut self, io: &S, addr: &SockAddr, token: Token) -> MioResult<()> {
debug!("socket connect; addr={}", addr);
Expand All @@ -135,11 +135,11 @@ impl<T, M: Send> Reactor<T, M> {
if try!(os::connect(io.desc(), addr)) {
// On some OSs, connecting to localhost succeeds immediately. In
// this case, queue the writable callback for execution during the
// next reactor tick.
// next event loop tick.
debug!("socket connected immediately; addr={}", addr);
}

// Register interest with socket on the reactor
// Register interest with socket on the event loop
try!(self.register(io, token));

Ok(())
Expand All @@ -159,41 +159,41 @@ impl<T, M: Send> Reactor<T, M> {
Ok(())
}

/// Keep spinning the reactor indefinitely, and notify the handler whenever
/// Keep spinning the event loop indefinitely, and notify the handler whenever
/// any of the registered handles are ready.
pub fn run<H: Handler<T, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
pub fn run<H: Handler<T, M>>(&mut self, mut handler: H) -> EventLoopResult<H> {
self.run = true;

while self.run {
// Execute ticks as long as the reactor is running
// Execute ticks as long as the event loop is running
match self.tick(&mut handler) {
Err(e) => return Err(ReactorError::new(handler, e)),
Err(e) => return Err(EventLoopError::new(handler, e)),
_ => {}
}
}

Ok(handler)
}

/// Spin the reactor once, with a timeout of one second, and notify the
/// Spin the event loop once, with a timeout of one second, and notify the
/// handler if any of the registered handles become ready during that
/// time.
pub fn run_once<H: Handler<T, M>>(&mut self, mut handler: H) -> ReactorResult<H> {
pub fn run_once<H: Handler<T, M>>(&mut self, mut handler: H) -> EventLoopResult<H> {
// Execute a single tick
match self.tick(&mut handler) {
Err(e) => return Err(ReactorError::new(handler, e)),
Err(e) => return Err(EventLoopError::new(handler, e)),
_ => {}
}

Ok(handler)
}

// Executes a single run of the reactor loop
// Executes a single run of the event loop loop
fn tick<H: Handler<T, M>>(&mut self, handler: &mut H) -> MioResult<()> {
let mut messages;
let mut pending;

debug!("reactor tick");
debug!("event loop tick");

// Check the notify channel for any pending messages. If there are any,
// avoid blocking when polling for IO events. Messages will be
Expand Down Expand Up @@ -296,30 +296,30 @@ impl<T, M: Send> Reactor<T, M> {
}

#[deriving(Clone)]
pub struct ReactorSender<M: Send> {
pub struct EventLoopSender<M: Send> {
notify: Notify<M>
}

impl<M: Send> ReactorSender<M> {
fn new(notify: Notify<M>) -> ReactorSender<M> {
ReactorSender { notify: notify }
impl<M: Send> EventLoopSender<M> {
fn new(notify: Notify<M>) -> EventLoopSender<M> {
EventLoopSender { notify: notify }
}

pub fn send(&self, msg: M) -> Result<(), M> {
self.notify.notify(msg)
}
}

pub type ReactorResult<H> = Result<H, ReactorError<H>>;
pub type EventLoopResult<H> = Result<H, EventLoopError<H>>;

pub struct ReactorError<H> {
pub struct EventLoopError<H> {
pub handler: H,
pub error: MioError
}

impl<H> ReactorError<H> {
fn new(handler: H, error: MioError) -> ReactorError<H> {
ReactorError {
impl<H> EventLoopError<H> {
fn new(handler: H, error: MioError) -> EventLoopError<H> {
EventLoopError {
handler: handler,
error: error
}
Expand All @@ -331,11 +331,11 @@ mod tests {
use std::str;
use std::sync::Arc;
use std::sync::atomics::{AtomicInt, SeqCst};
use super::Reactor;
use super::EventLoop;
use io::{IoWriter, IoReader};
use {io, buf, Buf, Handler, Token};

type TestReactor = Reactor<uint, ()>;
type TestEventLoop = EventLoop<uint, ()>;

struct Funtimes {
rcount: Arc<AtomicInt>,
Expand All @@ -352,15 +352,15 @@ mod tests {
}

impl Handler<uint, ()> for Funtimes {
fn readable(&mut self, _reactor: &mut TestReactor, token: Token) {
fn readable(&mut self, _event_loop: &mut TestEventLoop, token: Token) {
(*self.rcount).fetch_add(1, SeqCst);
assert_eq!(token, Token(10));
}
}

#[test]
fn test_readable() {
let mut reactor = Reactor::new().ok().expect("Couldn't make reactor");
let mut event_loop = EventLoop::new().ok().expect("Couldn't make event loop");

let (mut reader, mut writer) = io::pipe().unwrap();

Expand All @@ -369,9 +369,9 @@ mod tests {
let handler = Funtimes::new(rcount.clone(), wcount.clone());

writer.write(&mut buf::wrap("hello".as_bytes())).unwrap();
reactor.register(&reader, Token(10)).unwrap();
event_loop.register(&reader, Token(10)).unwrap();

let _ = reactor.run_once(handler);
let _ = event_loop.run_once(handler);
let mut b = buf::ByteBuf::new(16);

assert_eq!((*rcount).load(SeqCst), 1);
Expand Down
10 changes: 5 additions & 5 deletions src/handler.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
use reactor::Reactor;
use event_loop::EventLoop;
use token::Token;

#[allow(unused_variable)]
pub trait Handler<T, M: Send> {
fn readable(&mut self, reactor: &mut Reactor<T, M>, token: Token) {
fn readable(&mut self, event_loop: &mut EventLoop<T, M>, token: Token) {
}

fn writable(&mut self, reactor: &mut Reactor<T, M>, token: Token) {
fn writable(&mut self, event_loop: &mut EventLoop<T, M>, token: Token) {
}

fn notify(&mut self, reactor: &mut Reactor<T, M>, msg: M) {
fn notify(&mut self, event_loop: &mut EventLoop<T, M>, msg: M) {
}

fn timeout(&mut self, reactor: &mut Reactor<T, M>, timeout: T) {
fn timeout(&mut self, event_loop: &mut EventLoop<T, M>, timeout: T) {
}
}
12 changes: 6 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ pub use poll::{
IoEvent,
IoEventKind,
};
pub use reactor::{
Reactor,
ReactorConfig,
ReactorResult,
ReactorSender,
pub use event_loop::{
EventLoop,
EventLoopConfig,
EventLoopResult,
EventLoopSender,
};
pub use slab::Slab;
pub use socket::{
Expand All @@ -61,12 +61,12 @@ pub use token::{

pub mod buf;
mod error;
mod event_loop;
mod handler;
mod io;
mod notify;
mod os;
mod poll;
mod reactor;
mod slab;
mod socket;
mod timer;
Expand Down
8 changes: 4 additions & 4 deletions src/notify.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ use os;

static SLEEP: int = -1;

/// Send notifications to the reactor, waking it up if necessary. If the
/// reactor is not currently sleeping, avoid using an OS wake-up strategy
/// Send notifications to the event loop, waking it up if necessary. If the
/// event loop is not currently sleeping, avoid using an OS wake-up strategy
/// (eventfd, pipe, ...). Backed by a pre-allocated lock free MPMC queue.
///
/// TODO: Use more efficient wake-up strategy if available
Expand Down Expand Up @@ -75,7 +75,7 @@ impl<M: Send> NotifyInner<M> {
let mut val;

loop {
// If there are pending messages, then whether or not the reactor
// If there are pending messages, then whether or not the event loop
// was planning to sleep does not matter - it will not sleep.
if cur > 0 {
if max >= cur {
Expand Down Expand Up @@ -136,7 +136,7 @@ impl<M: Send> NotifyInner<M> {
if cur == SLEEP {
if self.awaken.wakeup().is_err() {
// TODO: Don't fail
fail!("failed to awaken reactor");
fail!("failed to awaken event loop");
}
}

Expand Down
Loading

0 comments on commit 3909056

Please sign in to comment.