diff --git a/packages/centipede/src/main.rs b/packages/centipede/src/main.rs index c58eb06..7474a88 100644 --- a/packages/centipede/src/main.rs +++ b/packages/centipede/src/main.rs @@ -103,7 +103,7 @@ fn main() -> Result<()> { .queue_nonblocking(0) .into_diagnostic() .wrap_err("failed to get TUN queue 0 (for the special first worker)")?, - ); + )?; s.spawn(move || { let mut events = mio::Events::with_capacity(1024); @@ -158,7 +158,7 @@ fn main() -> Result<()> { .queue_nonblocking(0) .into_diagnostic() .wrap_err_with(|| format!("failed to get TUN queue {}", i))?, - ); + )?; s.spawn(move || { let mut events = mio::Events::with_capacity(1024); loop { diff --git a/packages/centipede_worker/src/lib.rs b/packages/centipede_worker/src/lib.rs index 3415120..c76c09f 100644 --- a/packages/centipede_worker/src/lib.rs +++ b/packages/centipede_worker/src/lib.rs @@ -47,14 +47,22 @@ impl<'r> Worker<'r> { dyn FnMut(SocketAddr, ControlMessage, auth::Unknown>) + Send + 'r, >, tun_queue: hypertube::Queue<'r, false>, - ) -> Self { - Self { + ) -> Result { + let poll = mio::Poll::new()?; + + poll.registry().register( + &mut SourceFd(&tun_queue.as_raw_fd()), + TUN_TOKEN, + mio::Interest::READABLE, + )?; + + Ok(Self { router_handle, control_message_sink, tun_queue, sockets: Sockets::new(), - poll: mio::Poll::new().unwrap(), - } + poll, + }) } /// Send a control message using the worker's set of sockets. @@ -112,16 +120,22 @@ impl<'r> Worker<'r> { .unwrap(); } + // FIXME: deregister closed sockets + Ok(()) } /// Handle a readable event on the TUN device. fn handle_tun_readable(&mut self) -> Result<(), Error> { + log::trace!("tun readable"); + // TODO: optimize this let mut read_buf = [0; PACKET_BUFFER_SIZE]; let mut write_buf = Vec::new(); while let Poll::Ready(n) = self.tun_queue.read(&mut read_buf).map_err(Error::ReadTun)? { + log::trace!("tun read {} byte packet", n); + let buf = &mut read_buf[..n]; let mut obligations = self.router_handle.handle_outgoing(buf); @@ -147,6 +161,8 @@ impl<'r> Worker<'r> { /// Handle a readable event on a socket. fn handle_socket_readable(&mut self, idx: usize) -> Result<(), Error> { + log::trace!("socket {} readable", idx); + let socket = self .sockets .resolve_index(idx) @@ -157,6 +173,8 @@ impl<'r> Worker<'r> { loop { match socket.recv_from(&mut buf) { Ok((n, from)) => { + log::trace!("socket {} read {} byte packet", idx, n); + // SAFETY: we just read `n` bytes into the buffer. let msg = unsafe { MaybeUninit::slice_assume_init_mut(&mut buf[..n]) };