From f4a754674526100dc026a7ec6e4279608998f801 Mon Sep 17 00:00:00 2001 From: Max Niederman Date: Wed, 17 Jan 2024 00:37:08 -0800 Subject: [PATCH] fix(centipede_router): add integration test and fix handle_outgoing bug --- packages/centipede_router/src/controller.rs | 10 +- packages/centipede_router/src/lib.rs | 10 +- packages/centipede_router/src/worker.rs | 10 +- .../centipede_router/tests/udp_threads.rs | 225 ++++++++++++++++++ 4 files changed, 240 insertions(+), 15 deletions(-) create mode 100644 packages/centipede_router/tests/udp_threads.rs diff --git a/packages/centipede_router/src/controller.rs b/packages/centipede_router/src/controller.rs index 3ebb1f8..e11e5ed 100644 --- a/packages/centipede_router/src/controller.rs +++ b/packages/centipede_router/src/controller.rs @@ -111,7 +111,7 @@ mod tests { #[test] fn construct() { - Router::new([0; 8], vec![]); + Router::new([0; 8]); } fn state<'c>(controller: &Controller) -> Arc { @@ -120,7 +120,7 @@ mod tests { #[test] fn crud_receive_tunnel() { - let mut router = Router::new([0; 8], vec![]); + let mut router = Router::new([0; 8]); let (mut controller, _) = router.handles(0); controller.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into())); @@ -135,7 +135,7 @@ mod tests { #[test] fn crud_send_tunnel() { - let mut router = Router::new([0; 8], vec![]); + let mut router = Router::new([0; 8]); let (mut controller, _) = router.handles(0); let link = Link { @@ -158,7 +158,7 @@ mod tests { #[test] fn receive_updates_preserve_state() { - let mut router = Router::new([0; 8], vec![]); + let mut router = Router::new([0; 8]); let (mut controller, _) = router.handles(0); controller.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into())); @@ -175,7 +175,7 @@ mod tests { #[test] fn send_updates_preserve_state() { - let mut router = Router::new([0; 8], vec![]); + let mut router = Router::new([0; 8]); let (mut controller, _) = router.handles(0); controller.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into()), vec![]); diff --git a/packages/centipede_router/src/lib.rs b/packages/centipede_router/src/lib.rs index c2db9d4..4b95bca 100644 --- a/packages/centipede_router/src/lib.rs +++ b/packages/centipede_router/src/lib.rs @@ -31,9 +31,6 @@ struct ConfiguredRouter { /// Our local peer identifier. local_id: PeerId, - /// Local addresses on which to receive messages. - recv_addrs: Vec, - /// Set of receiving tunnels, by sender identifier. recv_tunnels: HashMap, @@ -68,22 +65,21 @@ struct SendTunnel { #[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)] pub struct Link { /// The local address. - local: SocketAddr, + pub local: SocketAddr, /// The remote address. - remote: SocketAddr, + pub remote: SocketAddr, } pub type PeerId = [u8; 8]; impl Router { /// Create a new router. - pub fn new(peer_id: PeerId, recv_addrs: Vec) -> Self { + pub fn new(peer_id: PeerId) -> Self { Self { state: ArcSwap::from_pointee(ConfiguredRouter { generation: 0, local_id: peer_id, - recv_addrs, recv_tunnels: HashMap::new(), send_tunnels: HashMap::new(), }), diff --git a/packages/centipede_router/src/worker.rs b/packages/centipede_router/src/worker.rs index 76b51af..a74d53d 100644 --- a/packages/centipede_router/src/worker.rs +++ b/packages/centipede_router/src/worker.rs @@ -68,6 +68,10 @@ impl> ReceivePacket { } /// A coroutine yielding packets to send. +/// +/// Once the `coroutines` feature is stable, +/// this will be replaced with a coroutine literal. +#[must_use = "outgoing packet coroutine must be resumed"] pub struct HandleOutgoing<'p> { /// Plaintext of the outgoing packet being handled. packet_plaintext: &'p [u8], @@ -137,8 +141,8 @@ impl<'p> HandleOutgoing<'p> { Some(SendPacket { link, message }) } None => { - self.tunnels.next()?; - self.remaining_links = self.tunnels.peek().unwrap().links.iter(); + self.remaining_links = self.tunnels.peek()?.links.iter(); + self.tunnels.next().unwrap(); None } } @@ -148,7 +152,7 @@ impl<'p> HandleOutgoing<'p> { /// The obligation to /// send a packet from the system's networking stack /// to another peer on the Centipede network. -#[must_use] +#[must_use = "send packet obligation must be fulfilled"] pub struct SendPacket { /// The link to send the packet over. link: Link, diff --git a/packages/centipede_router/tests/udp_threads.rs b/packages/centipede_router/tests/udp_threads.rs new file mode 100644 index 0000000..46516ec --- /dev/null +++ b/packages/centipede_router/tests/udp_threads.rs @@ -0,0 +1,225 @@ +use std::{ + collections::HashMap, + io, + net::{SocketAddr, UdpSocket}, + thread, + time::Duration, +}; + +use centipede_proto::{ + marker::{auth, text}, + PacketMessage, +}; +use centipede_router::{ + controller::Controller, + worker::{SendPacket, Worker}, + Link, PeerId, Router, +}; +use chacha20poly1305::{ChaCha20Poly1305, KeyInit}; + +/// Test a network with zero peers. +/// +/// Primarily a meta-test on the test harness. +#[test] +fn empty_network() { + PeerCtx::run(vec![]); +} + +/// Test a network with one peer. +/// +/// Primarily a meta-test on the test harness. +#[test] +fn single_peer() { + PeerCtx::run(vec![PeerSpec { + id: [0; 8], + addr_count: 1, + entrypoint: Box::new(|mut ctx: PeerCtx<'_>| { + let mut obligations = ctx.worker.handle_outgoing(&[0; 1024]); + assert!( + obligations.resume(vec![]).is_none(), + "router is alone, but attempted to send a packet" + ); + }), + }]); +} + +/// Send a single message over a half-duplex connection. +#[test] +fn half_duplex_single_message() { + const PACKET: &[u8] = "Hello, centipede!".as_bytes(); + + PeerCtx::run(vec![ + PeerSpec { + id: [0; 8], + addr_count: 1, + entrypoint: Box::new(|mut ctx: PeerCtx<'_>| { + ctx.controller.upsert_send_tunnel( + [1; 8], + dummy_cipher(), + ctx.possible_links_to([1; 8]), + ); + + let mut obligations = ctx.worker.handle_outgoing(PACKET); + let mut scratch = Vec::new(); + + scratch = match obligations.resume(scratch) { + Some(obligation) => { + assert_eq!(obligation.message().sequence_number(), 0); + assert_eq!(obligation.message().sender(), [0; 8]); + + ctx.fulfill_send(obligation) + } + None => panic!("sending router did not attempt to send a packet"), + }; + + assert!( + obligations.resume(scratch).is_none(), + "sending router attempted to send a second packet when only one link was configured" + ); + }), + }, + PeerSpec { + id: [1; 8], + addr_count: 1, + entrypoint: Box::new(|mut ctx: PeerCtx<'_>| { + ctx.controller.upsert_receive_tunnel([0; 8], dummy_cipher()); + + let packets = ctx.receive_block(); + assert_eq!(packets.len(), 1, "received wrong number of packets"); + let packet = packets.into_iter().next().unwrap(); + + match ctx.worker.handle_incoming(packet) { + Some(obligation) => { + assert_eq!(obligation.packet(), PACKET, "received wrong packet"); + } + None => panic!("receiving router did not attempt to receive a packet"), + } + }), + }, + ]); +} + +/// The context in which a peer test program runs. +struct PeerCtx<'r> { + controller: Controller<'r>, + worker: Worker<'r>, + + sockets: Vec, + peers: HashMap>, +} + +struct PeerSpec { + id: PeerId, + addr_count: usize, + entrypoint: Box, +} + +impl<'r> PeerCtx<'r> { + fn run(peers: Vec) { + let mut sockets: HashMap> = peers + .iter() + .map(|spec| { + ( + spec.id, + (0..spec.addr_count) + .map(|_| UdpSocket::bind("127.0.0.1:0").unwrap()) + .inspect(|s| s.set_nonblocking(true).unwrap()) + .collect(), + ) + }) + .collect(); + + let peer_addrs: HashMap> = peers + .iter() + .map(|spec| { + ( + spec.id, + sockets[&spec.id] + .iter() + .map(|s| s.local_addr().unwrap()) + .collect(), + ) + }) + .collect(); + + thread::scope(move |s| { + for spec in peers { + let peer_addrs = peer_addrs.clone(); + let sockets = sockets.remove(&spec.id).unwrap(); + + s.spawn(move || { + let mut router = Router::new(spec.id); + let (controller, workers) = router.handles(1); + let worker = workers.into_iter().next().unwrap(); + + (spec.entrypoint)(PeerCtx { + controller, + worker, + sockets, + peers: peer_addrs, + }); + }); + } + }); + } + + fn possible_links_to(&self, peer_id: PeerId) -> Vec { + self.sockets + .iter() + .map(|s| s.local_addr().unwrap()) + .flat_map(|local| { + self.peers[&peer_id] + .iter() + .map(move |&remote| Link { local, remote }) + }) + .collect() + } + + fn receive(&self) -> Vec, auth::Unknown, text::Ciphertext>> { + self.sockets + .iter() + .filter_map(|s| { + let mut buf = vec![0; 1024]; + + match s.recv(&mut buf) { + Ok(n) => { + buf.truncate(n); + Some(PacketMessage::from_buffer(buf).expect("received invalid packet")) + } + Err(err) if err.kind() == io::ErrorKind::WouldBlock => None, + Err(err) => panic!("error receiving: {}", err), + } + }) + .collect() + } + + fn receive_block(&self) -> Vec, auth::Unknown, text::Ciphertext>> { + loop { + let packets = self.receive(); + + if !packets.is_empty() { + return packets; + } + + thread::sleep(Duration::from_millis(1)); + } + } + + fn fulfill_send(&self, obligation: SendPacket) -> Vec { + let socket = self + .sockets + .iter() + .find(|s| s.local_addr().unwrap() == obligation.link().local) + .expect("no socket for link specified by router"); + + socket + .send_to(obligation.message().as_buffer(), obligation.link().remote) + .unwrap(); + + obligation.fulfill() + } +} + +fn dummy_cipher() -> ChaCha20Poly1305 { + ChaCha20Poly1305::new(&[0; 32].into()) +}