From 7ac53366c48380d89e7792621f3cd1161bb38bcb Mon Sep 17 00:00:00 2001 From: Max Niederman Date: Sun, 21 Jan 2024 17:34:45 -0800 Subject: [PATCH] feat(router): use declarative configuration --- packages/centipede_router/src/config.rs | 288 ++++++++++++++++++++ packages/centipede_router/src/controller.rs | 230 ---------------- packages/centipede_router/src/lib.rs | 52 ++-- packages/centipede_router/src/worker.rs | 14 +- packages/centipede_router/tests/common.rs | 10 +- packages/centipede_router/tests/udp_conn.rs | 58 +++- packages/centipede_router/tests/udp_perf.rs | 51 ++-- packages/centipede_worker/src/lib.rs | 4 +- 8 files changed, 404 insertions(+), 303 deletions(-) create mode 100644 packages/centipede_router/src/config.rs delete mode 100644 packages/centipede_router/src/controller.rs diff --git a/packages/centipede_router/src/config.rs b/packages/centipede_router/src/config.rs new file mode 100644 index 0000000..841e1ec --- /dev/null +++ b/packages/centipede_router/src/config.rs @@ -0,0 +1,288 @@ +use std::{ + collections::{HashMap, HashSet}, + net::SocketAddr, + sync::{atomic::AtomicU64, Arc}, +}; + +use chacha20poly1305::ChaCha20Poly1305; + +use crate::{Link, PeerId}; + +/// Configuration of a router. +#[derive(Clone)] +pub struct Router { + /// The router's local peer identifier. + pub local_id: PeerId, + + /// Addresses on which workers should listen for incoming packets. + pub recv_addrs: HashSet, + + /// Set of receiving tunnels, by sender identifier. + pub recv_tunnels: HashMap, + + /// Set of sending tunnels, by receiver identifier. + pub send_tunnels: HashMap, +} + +/// Configuration of a receiving tunnel. +#[derive(Clone)] +pub struct RecvTunnel { + /// Cipher with which to decrypt messages. + pub cipher: ChaCha20Poly1305, +} + +/// Configuration of a sending tunnel. +#[derive(Clone)] +pub struct SendTunnel { + /// Cipher with which to encrypt messages. + pub cipher: ChaCha20Poly1305, + + /// Address pairs on which to send messages. + pub links: HashSet, +} + +// TODO: optimize to avoid incrementing the generation where possible +/// Apply a configuration to the state of a router. +pub(crate) fn apply(config: &Router, state: &crate::ConfiguredRouter) -> crate::ConfiguredRouter { + crate::ConfiguredRouter { + generation: state.generation.wrapping_add(1), + local_id: config.local_id, + recv_addrs: config.recv_addrs.iter().copied().collect(), + recv_tunnels: config + .recv_tunnels + .iter() + .map(|(id, tun)| { + ( + *id, + crate::RecvTunnel { + cipher: tun.cipher.clone(), + memory: state + .recv_tunnels + .get(id) + .map(|tun| tun.memory.clone()) + .unwrap_or_default(), + }, + ) + }) + .collect(), + send_tunnels: config + .send_tunnels + .iter() + .map(|(id, tun)| { + ( + *id, + crate::SendTunnel { + links: tun.links.iter().copied().collect(), + cipher: tun.cipher.clone(), + next_sequence_number: state + .send_tunnels + .get(id) + .map(|tun| tun.next_sequence_number.clone()) + .unwrap_or(Arc::new(AtomicU64::new(0))), + }, + ) + }) + .collect(), + } +} + +/// A handle to the router for configuration. +pub struct ConfiguratorHandle<'r> { + router: &'r crate::Router, +} + +impl<'r> ConfiguratorHandle<'r> { + /// Create a new configurator handle. + pub(crate) fn new(router: &'r crate::Router) -> Self { + Self { router } + } + + /// Drive the router to a new configuration. + pub fn configure(&self, config: &Router) { + let state = self.router.state.load(); + let new_state = apply(config, &state); + self.router.state.store(Arc::new(new_state)); + } +} + +#[cfg(test)] +mod tests { + use chacha20poly1305::KeyInit; + + use super::*; + + #[test] + fn apply_config_to_default_state() { + let mut state = crate::ConfiguredRouter::default(); + + state = apply( + &super::Router { + local_id: [1; 8], + recv_addrs: [SocketAddr::from(([127, 0, 0, 1], 0))] + .into_iter() + .collect(), + recv_tunnels: HashMap::new(), + send_tunnels: HashMap::new(), + }, + &state, + ); + assert_eq!(state.generation, 1); + assert_eq!(state.local_id, [1; 8]); + assert_eq!( + state.recv_addrs.iter().copied().collect::>(), + [SocketAddr::from(([127, 0, 0, 1], 0))] + .into_iter() + .collect() + ); + assert!(state.recv_tunnels.is_empty()); + assert!(state.send_tunnels.is_empty()); + } + + #[test] + fn updating_send_tunnel_preserves_sequence() { + let mut state = crate::ConfiguredRouter::default(); + + state = apply( + &super::Router { + local_id: [1; 8], + recv_addrs: [SocketAddr::from(([127, 0, 0, 1], 0))] + .into_iter() + .collect(), + recv_tunnels: HashMap::new(), + send_tunnels: { + let mut map = HashMap::new(); + map.insert( + [2; 8], + super::SendTunnel { + cipher: ChaCha20Poly1305::new((&[0; 32]).into()), + links: [Link { + local: SocketAddr::from(([127, 0, 0, 1], 0)), + remote: SocketAddr::from(([127, 0, 0, 1], 0)), + }] + .into_iter() + .collect(), + }, + ); + map + }, + }, + &state, + ); + assert_eq!(state.generation, 1); + assert_eq!(state.local_id, [1; 8]); + assert_eq!( + state.recv_addrs.iter().copied().collect::>(), + [SocketAddr::from(([127, 0, 0, 1], 0))] + .into_iter() + .collect() + ); + assert!(state.recv_tunnels.is_empty()); + assert_eq!(state.send_tunnels.len(), 1); + assert_eq!( + state.send_tunnels[&[2; 8]].links, + vec![Link { + local: SocketAddr::from(([127, 0, 0, 1], 0)), + remote: SocketAddr::from(([127, 0, 0, 1], 0)), + }] + ); + let sequence = state.send_tunnels[&[2; 8]].next_sequence_number.as_ref() as *const _; + + state = apply( + &super::Router { + local_id: [1; 8], + recv_addrs: [SocketAddr::from(([127, 0, 0, 1], 0))] + .into_iter() + .collect(), + recv_tunnels: HashMap::new(), + send_tunnels: { + let mut map = HashMap::new(); + map.insert( + [2; 8], + super::SendTunnel { + cipher: ChaCha20Poly1305::new((&[0; 32]).into()), + links: [Link { + local: SocketAddr::from(([127, 0, 0, 1], 0)), + remote: SocketAddr::from(([127, 0, 0, 1], 0)), + }] + .into_iter() + .collect(), + }, + ); + map + }, + }, + &state, + ); + + assert_eq!( + state.send_tunnels[&[2; 8]].next_sequence_number.as_ref() as *const _, + sequence, + "sequence number generator was not preserved" + ); + } + + #[test] + fn updating_recv_tunnel_preserves_memory() { + let mut state = crate::ConfiguredRouter::default(); + + state = apply( + &super::Router { + local_id: [1; 8], + recv_addrs: [SocketAddr::from(([127, 0, 0, 1], 0))] + .into_iter() + .collect(), + recv_tunnels: { + let mut map = HashMap::new(); + map.insert( + [2; 8], + super::RecvTunnel { + cipher: ChaCha20Poly1305::new((&[0; 32]).into()), + }, + ); + map + }, + send_tunnels: HashMap::new(), + }, + &state, + ); + assert_eq!(state.generation, 1); + assert_eq!(state.local_id, [1; 8]); + assert_eq!( + state.recv_addrs.iter().copied().collect::>(), + [SocketAddr::from(([127, 0, 0, 1], 0))] + .into_iter() + .collect() + ); + assert_eq!(state.recv_tunnels.len(), 1); + let memory = state.recv_tunnels[&[2; 8]].memory.as_ref() as *const _; + assert!(state.send_tunnels.is_empty()); + + state = apply( + &super::Router { + local_id: [1; 8], + recv_addrs: [SocketAddr::from(([127, 0, 0, 1], 0))] + .into_iter() + .collect(), + recv_tunnels: { + let mut map = HashMap::new(); + map.insert( + [2; 8], + super::RecvTunnel { + cipher: ChaCha20Poly1305::new((&[1; 32]).into()), + }, + ); + map + }, + send_tunnels: HashMap::new(), + }, + &state, + ); + + assert_eq!(state.generation, 2); + assert_eq!( + state.recv_tunnels[&[2; 8]].memory.as_ref() as *const _, + memory, + "packet memory was not preserved" + ); + } +} diff --git a/packages/centipede_router/src/controller.rs b/packages/centipede_router/src/controller.rs deleted file mode 100644 index 9ea0616..0000000 --- a/packages/centipede_router/src/controller.rs +++ /dev/null @@ -1,230 +0,0 @@ -use std::{ - collections::HashSet, - net::SocketAddr, - sync::{atomic::AtomicU64, Arc}, -}; - -use chacha20poly1305::ChaCha20Poly1305; - -use crate::{ - packet_memory::PacketMemory, ConfiguredRouter, Link, PeerId, RecvTunnel, Router, SendTunnel, -}; - -/// A handle to the router for reconfiguration. -pub struct ControllerHandle<'r> { - router: &'r Router, -} - -impl<'r> ControllerHandle<'r> { - /// Create a new controller, given a router. - /// - /// It is a logic error to create a controller for a router when there is already a controller for that router. - pub(crate) fn new(router: &'r Router) -> Self { - Self { router } - } - - /// Complete a transaction on the router. - /// - /// This function is the only way to mutate the router's state, - /// and there can only be one controller for a router at a time. - /// This guarantees that the state cannot be mutated concurrently. - pub fn transaction(&mut self, f: impl FnOnce(&mut Transaction) -> R) -> R { - let mut transaction = Transaction { - config: (*self.router.state.load_full()).clone(), - }; - let ret = f(&mut transaction); - - transaction.config.generation = transaction.config.generation.wrapping_add(1); - self.router.state.store(Arc::new(transaction.config)); - - ret - } -} - -pub struct Transaction { - config: ConfiguredRouter, -} - -impl Transaction { - /// Update the addresses on which to listen. - /// - /// # Note - /// This function deduplicates the given addresses by their hashes. - pub fn set_recv_addrs(&mut self, addrs: impl IntoIterator) { - let set: HashSet<_> = addrs.into_iter().collect(); - self.config.recv_addrs = set.into_iter().collect(); - } - - /// Insert or update a receive tunnel. - pub fn upsert_receive_tunnel(&mut self, sender_id: PeerId, cipher: ChaCha20Poly1305) { - if let Some(tunnel) = self.config.recv_tunnels.get_mut(&sender_id) { - tunnel.cipher = cipher; - } else { - self.config.recv_tunnels.insert( - sender_id, - RecvTunnel { - cipher, - memory: Arc::new(PacketMemory::default()), - }, - ); - } - } - - /// Delete a receive tunnel. - pub fn delete_receive_tunnel(&mut self, sender_id: PeerId) { - self.config.recv_tunnels.remove(&sender_id); - } - - /// Insert or update a send tunnel. - pub fn upsert_send_tunnel( - &mut self, - receiver_id: PeerId, - cipher: ChaCha20Poly1305, - links: Vec, - ) { - if let Some(tunnel) = self.config.send_tunnels.get_mut(&receiver_id) { - tunnel.cipher = cipher; - tunnel.links = links; - } else { - self.config.send_tunnels.insert( - receiver_id, - SendTunnel { - links, - cipher, - next_sequence_number: Arc::new(AtomicU64::new(0)), - }, - ); - } - } - - /// Delete a send tunnel. - pub fn delete_send_tunnel(&mut self, receiver_id: PeerId) { - self.config.send_tunnels.remove(&receiver_id); - } -} - -#[cfg(test)] -mod tests { - use std::{net::SocketAddr, sync::atomic::Ordering}; - - use chacha20poly1305::KeyInit; - - use crate::{packet_memory::PacketRecollection, Router}; - - use super::*; - - #[test] - fn construct() { - Router::new([0; 8], vec![]); - } - - fn state(controller: &ControllerHandle) -> Arc { - controller.router.state.load_full() - } - - #[test] - fn set_recv_addrs() { - let mut router = Router::new([0; 8], vec![]); - let (mut controller, _) = router.handles(0); - - assert_eq!(state(&controller).recv_addrs, vec![]); - let prev_generation = state(&controller).generation; - - controller.transaction(|trans| { - trans.set_recv_addrs([SocketAddr::from(([0, 0, 0, 0], 0))]); - }); - - assert_eq!( - state(&controller).recv_addrs, - vec![SocketAddr::from(([0, 0, 0, 0], 0))] - ); - assert_ne!(state(&controller).generation, prev_generation); - } - - #[test] - fn crud_receive_tunnel() { - let mut router = Router::new([0; 8], vec![]); - let (mut controller, _) = router.handles(0); - - controller.transaction(|trans| { - trans.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into())); - trans.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into())); - }); - assert!(state(&controller).recv_tunnels.contains_key(&[1; 8])); - - controller.transaction(|trans| trans.delete_receive_tunnel([1; 8])); - assert!(!state(&controller).recv_tunnels.contains_key(&[1; 8])); - } - - #[test] - fn crud_send_tunnel() { - let mut router = Router::new([0; 8], vec![]); - let (mut controller, _) = router.handles(0); - - let link = Link { - local: SocketAddr::from(([0, 0, 0, 0], 0)), - remote: SocketAddr::from(([0, 0, 0, 1], 1)), - }; - - controller.transaction(|trans| { - trans.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into()), vec![link]) - }); - assert_eq!(state(&controller).send_tunnels[&[1; 8]].links, vec![link]); - - controller.transaction(|trans| { - trans.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into()), vec![link]); - }); - assert_eq!(state(&controller).send_tunnels[&[1; 8]].links, vec![link]); - - controller.transaction(|trans| { - trans.delete_send_tunnel([1; 8]); - }); - assert!(state(&controller).send_tunnels.is_empty()); - } - - #[test] - fn receive_updates_preserve_state() { - let mut router = Router::new([0; 8], vec![]); - let (mut controller, _) = router.handles(0); - - controller.transaction(|trans| { - trans.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into())); - }); - - state(&controller).recv_tunnels[&[1; 8]].memory.observe(0); - - controller.transaction(|trans| { - trans.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into())); - }); - - assert_eq!( - state(&controller).recv_tunnels[&[1; 8]].memory.observe(0), - PacketRecollection::Seen - ) - } - - #[test] - fn send_updates_preserve_state() { - let mut router = Router::new([0; 8], vec![]); - let (mut controller, _) = router.handles(0); - - controller.transaction(|trans| { - trans.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into()), vec![]); - }); - - state(&controller).send_tunnels[&[1; 8]] - .next_sequence_number - .store(1, Ordering::SeqCst); - - controller.transaction(|trans| { - trans.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[1; 32]).into()), vec![]) - }); - - assert_eq!( - state(&controller).send_tunnels[&[1; 8]] - .next_sequence_number - .load(Ordering::SeqCst), - 1 - ); - } -} diff --git a/packages/centipede_router/src/lib.rs b/packages/centipede_router/src/lib.rs index 00885fd..75139e2 100644 --- a/packages/centipede_router/src/lib.rs +++ b/packages/centipede_router/src/lib.rs @@ -1,38 +1,44 @@ pub mod worker; -pub mod controller; +pub mod config; mod packet_memory; use std::{ collections::HashMap, - iter, net::SocketAddr, - sync::{atomic::AtomicU64, Arc}, + sync::{ + atomic::{AtomicBool, AtomicU64, Ordering}, + Arc, + }, }; use arc_swap::ArcSwap; use chacha20poly1305::ChaCha20Poly1305; -use controller::ControllerHandle; use packet_memory::PacketMemory; -use worker::WorkerHandle; + +pub use config::ConfiguratorHandle; +pub use worker::WorkerHandle; /// The shared state of a Centipede tunnel router. pub struct Router { /// The configured state of the router. state: ArcSwap, + + /// Lock to prevent multiple configurators from existing at once. + configurator_lock: AtomicBool, } /// The shared state of a configured Centipede tunnel router. -#[derive(Clone)] +#[derive(Clone, Default)] struct ConfiguredRouter { /// The generation of this configuration. - generation: u64, + generation: usize, /// Our local peer identifier. local_id: PeerId, /// Addresses on which to listen for incoming packets. - /// + /// /// This list should not contain duplicates. recv_addrs: Vec, @@ -80,25 +86,27 @@ pub type PeerId = [u8; 8]; impl Router { /// Create a new router. - pub fn new(peer_id: PeerId, recv_addrs: Vec) -> Self { + pub fn new(config: &config::Router) -> Self { Self { - state: ArcSwap::from_pointee(ConfiguredRouter { - generation: 0, - local_id: peer_id, - recv_addrs, - recv_tunnels: HashMap::new(), - send_tunnels: HashMap::new(), - }), + state: ArcSwap::from_pointee(config::apply(config, &ConfiguredRouter::default())), + configurator_lock: AtomicBool::new(false), } } - /// Get one controller and N worker handles to the router. - pub fn handles(&mut self, n: usize) -> (ControllerHandle<'_>, Vec>) { - let this = &*self; + /// Get a configurator handle to the router. + /// + /// # Panics + /// Panics if another configurator handle already exists. + pub fn configurator(&self) -> ConfiguratorHandle<'_> { + if self.configurator_lock.swap(true, Ordering::Acquire) { + panic!("another configurator already exists"); + } - let controller = ControllerHandle::new(this); - let workers = iter::repeat_with(|| WorkerHandle::new(this)).take(n).collect(); + ConfiguratorHandle::new(self) + } - (controller, workers) + /// Get a worker handle to the router. + pub fn worker(&self) -> WorkerHandle<'_> { + WorkerHandle::new(self) } } diff --git a/packages/centipede_router/src/worker.rs b/packages/centipede_router/src/worker.rs index 2acebe4..70d5501 100644 --- a/packages/centipede_router/src/worker.rs +++ b/packages/centipede_router/src/worker.rs @@ -15,6 +15,7 @@ use std::{ pin::Pin, slice, sync::{atomic::Ordering, Arc}, + task::Poll, }; /// A handle to the router for the use of a worker. @@ -23,7 +24,7 @@ pub struct WorkerHandle<'r> { router: &'r Router, /// The last observed configuration generation, if any. - last_config_generation: Option, + last_config_generation: Option, } impl<'r> WorkerHandle<'r> { @@ -36,16 +37,16 @@ impl<'r> WorkerHandle<'r> { } /// Check if the configuration has changed. - pub fn check_config(&mut self) -> Option { + pub fn poll_config_changed(&mut self) -> Poll { let config = self.router.state.load(); if self.last_config_generation != Some(config.generation) { self.last_config_generation = Some(config.generation); - Some(ConfigChanged { + Poll::Ready(ConfigChanged { router_state: self.router.state.load(), }) } else { - None + Poll::Pending } } @@ -86,14 +87,14 @@ pub struct ConfigChanged { impl ConfigChanged { /// Addresses on which the worker should listen for incoming packets. - /// + /// /// This iterator will not yield duplicates. pub fn recv_addrs(&self) -> impl Iterator + '_ { self.router_state.recv_addrs.iter().copied() } /// Addresses from which the worker may be expected to send outgoing packets. - /// + /// /// This iterator may yield duplicates. pub fn send_addrs(&self) -> impl Iterator + '_ { self.router_state @@ -177,6 +178,7 @@ impl<'p> HandleOutgoing<'p> { } } + // TODO: consider taking the scratch buffer by reference, so the caller doesn't have to `mem::take` it or similar. /// Resume the coroutine, yielding the next packet to send. pub fn resume(&mut self, scratch: Vec) -> Option { match self.remaining_links.next() { diff --git a/packages/centipede_router/tests/common.rs b/packages/centipede_router/tests/common.rs index 710f785..55ad220 100644 --- a/packages/centipede_router/tests/common.rs +++ b/packages/centipede_router/tests/common.rs @@ -1,15 +1,7 @@ -use centipede_router::{controller::ControllerHandle, worker::WorkerHandle, Router}; +use centipede_router::{config::ConfiguratorHandle, worker::WorkerHandle, Router}; use chacha20poly1305::{ChaCha20Poly1305, KeyInit}; /// A dummy cipher for testing. pub fn dummy_cipher() -> ChaCha20Poly1305 { ChaCha20Poly1305::new(&[0; 32].into()) } - -/// Get one controller and one worker handle to the router. -pub fn get_single_handles(router: &mut Router) -> (ControllerHandle<'_>, WorkerHandle<'_>) { - let (mut controller, workers) = router.handles(1); - let mut worker = workers.into_iter().next().unwrap(); - - (controller, worker) -} diff --git a/packages/centipede_router/tests/udp_conn.rs b/packages/centipede_router/tests/udp_conn.rs index bdf7ca3..3aea809 100644 --- a/packages/centipede_router/tests/udp_conn.rs +++ b/packages/centipede_router/tests/udp_conn.rs @@ -1,5 +1,5 @@ use std::{ - collections::HashMap, + collections::{HashMap, HashSet}, io, net::{SocketAddr, UdpSocket}, thread, @@ -11,7 +11,7 @@ use centipede_proto::{ PacketMessage, }; use centipede_router::{ - controller::ControllerHandle, + config::{self, ConfiguratorHandle}, worker::{SendPacket, WorkerHandle}, Link, PeerId, Router, }; @@ -55,10 +55,21 @@ fn half_duplex_single_message() { id: [0; 8], addr_count: 1, entrypoint: Box::new(|mut ctx: PeerCtx<'_>| { - let links = ctx.possible_links_to([1; 8]); - ctx.controller.transaction(move |trans| { - trans.upsert_send_tunnel([1; 8], dummy_cipher(), links.clone()) + ctx.configurator.configure(&config::Router { + send_tunnels: { + let mut map = HashMap::new(); + map.insert( + [1; 8], + config::SendTunnel { + cipher: dummy_cipher(), + links: ctx.possible_links_to([1; 8]), + }, + ); + map + }, + ..ctx.init_config.clone() }); + let mut obligations = ctx.worker.handle_outgoing(PACKET); let mut scratch = Vec::new(); @@ -82,8 +93,19 @@ fn half_duplex_single_message() { id: [1; 8], addr_count: 1, entrypoint: Box::new(|mut ctx: PeerCtx<'_>| { - ctx.controller - .transaction(|trans| trans.upsert_receive_tunnel([0; 8], dummy_cipher())); + ctx.configurator.configure(&config::Router { + recv_tunnels: { + let mut map = HashMap::new(); + map.insert( + [0; 8], + config::RecvTunnel { + cipher: dummy_cipher(), + }, + ); + map + }, + ..ctx.init_config.clone() + }); let packets = ctx.receive_block(); assert_eq!(packets.len(), 1, "received wrong number of packets"); @@ -102,7 +124,8 @@ fn half_duplex_single_message() { /// The context in which a peer test program runs. struct PeerCtx<'r> { - controller: ControllerHandle<'r>, + init_config: config::Router, + configurator: ConfiguratorHandle<'r>, worker: WorkerHandle<'r>, sockets: Vec, @@ -149,13 +172,18 @@ impl<'r> PeerCtx<'r> { let sockets = sockets.remove(&spec.id).unwrap(); s.spawn(move || { - let mut router = - Router::new(spec.id, peer_addrs.get(&spec.id).unwrap().clone()); - let (controller, worker) = get_single_handles(&mut router); - + let init_config = config::Router { + local_id: spec.id, + recv_addrs: peer_addrs.get(&spec.id).unwrap().iter().copied().collect(), + recv_tunnels: HashMap::new(), + send_tunnels: HashMap::new(), + }; + + let router = Router::new(&init_config); (spec.entrypoint)(PeerCtx { - controller, - worker, + init_config, + configurator: router.configurator(), + worker: router.worker(), sockets, peers: peer_addrs, }); @@ -164,7 +192,7 @@ impl<'r> PeerCtx<'r> { }); } - fn possible_links_to(&self, peer_id: PeerId) -> Vec { + fn possible_links_to(&self, peer_id: PeerId) -> HashSet { self.sockets .iter() .map(|s| s.local_addr().unwrap()) diff --git a/packages/centipede_router/tests/udp_perf.rs b/packages/centipede_router/tests/udp_perf.rs index 58f082e..a18cf97 100644 --- a/packages/centipede_router/tests/udp_perf.rs +++ b/packages/centipede_router/tests/udp_perf.rs @@ -4,7 +4,7 @@ extern crate test; use std::{mem, net::UdpSocket, thread}; use centipede_proto::PacketMessage; -use centipede_router::{Link, Router}; +use centipede_router::{config, Link, Router}; mod common; use common::*; @@ -39,12 +39,20 @@ fn half_duplex_iter(packet_size: usize, num_packets: usize) { // receiver s.spawn(move || { - let mut router = Router::new([1; 8], vec![recv_addr]); - let (mut controller, mut worker) = get_single_handles(&mut router); - - controller.transaction(|trans| { - trans.upsert_receive_tunnel([0; 8], dummy_cipher()); + let router = Router::new(&config::Router { + local_id: [1; 8], + recv_addrs: [recv_addr].into(), + recv_tunnels: [( + [0; 8], + config::RecvTunnel { + cipher: dummy_cipher(), + }, + )] + .into_iter() + .collect(), + send_tunnels: [].into(), }); + let mut worker = router.worker(); let mut read_buf = vec![0; PacketMessage::measure(packet_size)]; @@ -63,8 +71,24 @@ fn half_duplex_iter(packet_size: usize, num_packets: usize) { // sender s.spawn(move || { - let mut router = Router::new([0; 8], vec![]); - let (mut controller, mut worker) = get_single_handles(&mut router); + let router = Router::new(&config::Router { + local_id: [0; 8], + recv_addrs: [].into(), + recv_tunnels: [].into(), + send_tunnels: [( + [1; 8], + config::SendTunnel { + links: [Link { + local: recv_addr, + remote: recv_addr, + }] + .into(), + cipher: dummy_cipher(), + }, + )] + .into(), + }); + let mut worker = router.worker(); let send_socket = UdpSocket::bind("127.0.0.1:0").unwrap(); @@ -73,17 +97,6 @@ fn half_duplex_iter(packet_size: usize, num_packets: usize) { send_socket.local_addr().unwrap(), ); - controller.transaction(|trans| { - trans.upsert_send_tunnel( - [1; 8], - dummy_cipher(), - vec![Link { - local: send_socket.local_addr().unwrap(), - remote: recv_addr, - }], - ); - }); - let mut read_buf = vec![0; packet_size]; let mut write_buf = Vec::new(); diff --git a/packages/centipede_worker/src/lib.rs b/packages/centipede_worker/src/lib.rs index 1cbc84f..631c06f 100644 --- a/packages/centipede_worker/src/lib.rs +++ b/packages/centipede_worker/src/lib.rs @@ -48,7 +48,7 @@ impl<'r> Worker<'r> { /// /// Mutably borrows an event buffer for scratch space, to avoid reallocating it. pub fn wait_and_handle(&mut self, events: &mut mio::Events) -> Result<(), Error> { - if let Some(change) = self.router_handle.check_config() { + if let Poll::Ready(change) = self.router_handle.poll_config_changed() { self.handle_config_change(change)?; } @@ -106,7 +106,7 @@ impl<'r> Worker<'r> { fn handle_tun_readable(&mut self) -> Result<(), Error> { // TODO: optimize this let mut read_buf = [0; PACKET_BUFFER_SIZE]; - let mut write_buf = vec![0; PACKET_BUFFER_SIZE]; + let mut write_buf = Vec::new(); while let Poll::Ready(n) = self.tun_queue.read(&mut read_buf).map_err(Error::ReadTun)? { let buf = &mut read_buf[..n];