diff --git a/Cargo.lock b/Cargo.lock index 6d726ab..a08c82d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12,6 +12,15 @@ dependencies = [ "generic-array", ] +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + [[package]] name = "android-tzdata" version = "0.1.1" @@ -27,6 +36,54 @@ dependencies = [ "libc", ] +[[package]] +name = "anstream" +version = "0.6.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d96bd03f33fe50a863e394ee9718a706f988b9079b20c3784fb726e7678b62fb" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "utf8parse", +] + +[[package]] +name = "anstyle" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8901269c6307e8d93993578286ac0edf7f195079ffff5ebdeea6a59ffb7e36bc" + +[[package]] +name = "anstyle-parse" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c75ac65da39e5fe5ab759307499ddad880d724eed2f6ce5b5e8a26f4f387928c" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e28923312444cdd728e4738b3f9c9cac739500909bb3d3c94b43551b16517648" +dependencies = [ + "windows-sys 0.52.0", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1cd54b81ec8d6180e24654d0b371ad22fc3dd083b6ff8ba325b72e00c87660a7" +dependencies = [ + "anstyle", + "windows-sys 0.52.0", +] + [[package]] name = "arc-swap" version = "1.6.0" @@ -60,6 +117,12 @@ dependencies = [ "serde", ] +[[package]] +name = "bitflags" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" + [[package]] name = "block-buffer" version = "0.10.4" @@ -89,11 +152,19 @@ name = "centipede" version = "0.1.0" dependencies = [ "base64", + "centipede_control", "centipede_router", "centipede_worker", "cidr", + "clap", + "ctrlc", "ed25519-dalek", + "hypertube", + "log", + "mio", "num_cpus", + "pretty_env_logger", + "rand", "serde", "serde_with", "toml", @@ -152,6 +223,12 @@ version = "1.0.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" +[[package]] +name = "cfg_aliases" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fd16c4719339c4530435d38e511904438d07cce7950afa3718a84ac36c10e89e" + [[package]] name = "chacha20" version = "0.9.1" @@ -209,6 +286,52 @@ dependencies = [ "zeroize", ] +[[package]] +name = "clap" +version = "4.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "90bc066a67923782aa8515dbaea16946c5bcc5addbd668bb80af688e53e548a0" +dependencies = [ + "clap_builder", + "clap_derive", +] + +[[package]] +name = "clap_builder" +version = "4.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ae129e2e766ae0ec03484e609954119f123cc1fe650337e155d03b022f24f7b4" +dependencies = [ + "anstream", + "anstyle", + "clap_lex", + "strsim 0.11.0", +] + +[[package]] +name = "clap_derive" +version = "4.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "528131438037fd55894f62d6e9f068b8f45ac57ffa77517819645d10aed04f64" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "clap_lex" +version = "0.7.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "98cc8fbded0c607b7ba9dd60cd98df59af97e84d24e49c8557331cfc26d301ce" + +[[package]] +name = "colorchoice" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "acbf1af155f9b9ef647e42cdc158db4b64a1b61f743629225fde6f3e0be2a7c7" + [[package]] name = "const-oid" version = "0.9.6" @@ -241,6 +364,16 @@ dependencies = [ "typenum", ] +[[package]] +name = "ctrlc" +version = "3.4.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "672465ae37dc1bc6380a6547a8883d5dd397b0f1faaad4f265726cc7042a5345" +dependencies = [ + "nix", + "windows-sys 0.52.0", +] + [[package]] name = "curve25519-dalek" version = "4.1.2" @@ -290,7 +423,7 @@ dependencies = [ "ident_case", "proc-macro2", "quote", - "strsim", + "strsim 0.10.0", "syn", ] @@ -361,6 +494,19 @@ dependencies = [ "zeroize", ] +[[package]] +name = "env_logger" +version = "0.10.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4cd405aab171cb85d6735e5c8d9db038c17d3ca007a4d2c25f337935c3d90580" +dependencies = [ + "humantime", + "is-terminal", + "log", + "regex", + "termcolor", +] + [[package]] name = "equivalent" version = "1.0.1" @@ -412,6 +558,12 @@ version = "0.14.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "290f1a1d9242c78d09ce40a5e87e7554ee637af1351968159f4952f028f75604" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.5" @@ -424,6 +576,12 @@ version = "0.4.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7f24254aa9a54b5c858eaee2f5bccdb46aaf0e486a595ed5fd8f86ba55232a70" +[[package]] +name = "humantime" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9a3a5bfb195931eeb336b2a7b4d761daec841b97f947d34394601737a7bba5e4" + [[package]] name = "hypertube" version = "0.2.2" @@ -501,6 +659,17 @@ version = "0.8.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "8bd11f3a29434026f5ff98c730b668ba74b1033637b8817940b54d040696133c" +[[package]] +name = "is-terminal" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f23ff5ef2b80d608d61efee834934d862cd92461afc0560dedf493e4c033738b" +dependencies = [ + "hermit-abi", + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "itoa" version = "1.0.10" @@ -524,9 +693,9 @@ checksum = "9c198f91728a82281a64e1f4f9eeb25d82cb32a5de251c6bd1b5154d63a8e7bd" [[package]] name = "log" -version = "0.4.20" +version = "0.4.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b5e6163cb8c49088c2c36f57875e58ccd8c87c7427f7fbd50ea6710b2f3f2e8f" +checksum = "90ed8c1e510134f979dbc4f070f87d4313098b704861a105fe34231c70a3901c" [[package]] name = "memchr" @@ -536,14 +705,26 @@ checksum = "523dc4f511e55ab87b694dc30d0f820d60906ef06413f93d4d7a1385599cc149" [[package]] name = "mio" -version = "0.8.10" +version = "0.8.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f3d0b296e374a4e6f3c7b0a1f5a51d748a0d34c85e7dc48fc3fa9a87657fe09" +checksum = "a4a650543ca06a924e8b371db273b2756685faae30f8487da1b56505a8f78b0c" dependencies = [ "libc", "log", "wasi", - "windows-sys", + "windows-sys 0.48.0", +] + +[[package]] +name = "nix" +version = "0.28.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ab2156c4fce2f8df6c499cc1c763e4394b7482525bf2a9701c9d79d215f519e4" +dependencies = [ + "bitflags", + "cfg-if", + "cfg_aliases", + "libc", ] [[package]] @@ -622,6 +803,16 @@ version = "0.2.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" +[[package]] +name = "pretty_env_logger" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "865724d4dbe39d9f3dd3b52b88d859d66bcb2d6a0acfd5ea68a65fb66d4bdc1c" +dependencies = [ + "env_logger", + "log", +] + [[package]] name = "proc-macro2" version = "1.0.78" @@ -670,6 +861,35 @@ dependencies = [ "getrandom", ] +[[package]] +name = "regex" +version = "1.10.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c117dbdfde9c8308975b6a18d71f3f385c89461f7b3fb054288ecf2a2058ba4c" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86b83b8b9847f9bf95ef68afb0b8e6cdb80f498442f5179a29fad448fcc1eaea" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "adad44e29e4c806119491a7f06f03de4d1af22c3a680dd47f1e6e179439d1f56" + [[package]] name = "rustc_version" version = "0.4.0" @@ -788,7 +1008,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5fac59a5cb5dd637972e5fca70daf0523c9067fcdc4842f053dae04a18f8e9" dependencies = [ "libc", - "windows-sys", + "windows-sys 0.48.0", ] [[package]] @@ -807,6 +1027,12 @@ version = "0.10.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "73473c0e59e6d5812c5dfe2a064a6444949f089e20eec9a2e5506596494e4623" +[[package]] +name = "strsim" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5ee073c9e4cd00e28217186dbe12796d692868f432bf2e97ee73bed0c56dfa01" + [[package]] name = "subtle" version = "2.5.0" @@ -824,6 +1050,15 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "termcolor" +version = "1.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06794f8f6c5c898b3275aebefa6b8a1cb24cd2c6c79397ab15774837a0bc5755" +dependencies = [ + "winapi-util", +] + [[package]] name = "thiserror" version = "1.0.56" @@ -931,6 +1166,12 @@ dependencies = [ "subtle", ] +[[package]] +name = "utf8parse" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "711b9620af191e0cdc7468a8d14e709c3dcdb115b36f838e601583af800a370a" + [[package]] name = "version_check" version = "0.9.4" @@ -997,6 +1238,37 @@ version = "0.2.91" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4f186bd2dcf04330886ce82d6f33dd75a7bfcf69ecf5763b89fcde53b6ac9838" +[[package]] +name = "winapi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419" +dependencies = [ + "winapi-i686-pc-windows-gnu", + "winapi-x86_64-pc-windows-gnu", +] + +[[package]] +name = "winapi-i686-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6" + +[[package]] +name = "winapi-util" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f29e6f9198ba0d26b4c9f07dbe6f9ed633e1f3d5b8b414090084349e46a52596" +dependencies = [ + "winapi", +] + +[[package]] +name = "winapi-x86_64-pc-windows-gnu" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + [[package]] name = "windows-core" version = "0.52.0" @@ -1015,6 +1287,15 @@ dependencies = [ "windows-targets 0.48.5", ] +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets 0.52.0", +] + [[package]] name = "windows-targets" version = "0.48.5" diff --git a/packages/centipede/Cargo.toml b/packages/centipede/Cargo.toml index 3bc847a..6161ca1 100644 --- a/packages/centipede/Cargo.toml +++ b/packages/centipede/Cargo.toml @@ -7,9 +7,17 @@ edition = "2021" base64 = "0.21.7" centipede_router = { version = "0.1.0", path = "../centipede_router" } centipede_worker = { version = "0.1.0", path = "../centipede_worker" } +centipede_control = { version = "0.1.0", path = "../centipede_control" } cidr = { version = "0.2.2", features = ["serde"] } +clap = { version = "4.5.4", features = ["derive"] } ed25519-dalek = { version = "2.1.0", features = ["serde"] } +log = "0.4.21" num_cpus = "1.16.0" +pretty_env_logger = "0.5.0" serde = { version = "1.0.195", features = ["derive"] } serde_with = { version = "3.5.0", features = ["base64"] } toml = "0.8.8" +rand = "0.8.5" +hypertube = "0.2.2" +mio = "0.8.11" +ctrlc = "3.4.4" diff --git a/packages/centipede/src/config.rs b/packages/centipede/src/config.rs index f87ac73..74a53ed 100644 --- a/packages/centipede/src/config.rs +++ b/packages/centipede/src/config.rs @@ -21,7 +21,7 @@ pub struct Centipede { /// Addresses on which the daemon should listen for incoming packets. pub recv_addrs: Vec, - /// Number of workers to spawn. + /// of workers to spawn. #[serde(default = "num_cpus::get")] pub workers: usize, @@ -36,6 +36,10 @@ pub struct Peer { #[serde_as(as = "Base64")] pub public_key: [u8; ed25519_dalek::PUBLIC_KEY_LENGTH], + // TODO: rename to make it clear that these are only for sending messages. + /// Local addresses from which to send messages to the peer. + pub local_addrs: Vec, + /// Known remote addresses of the peer. pub remote_addrs: Vec, } diff --git a/packages/centipede/src/main.rs b/packages/centipede/src/main.rs index e0265f2..3583199 100644 --- a/packages/centipede/src/main.rs +++ b/packages/centipede/src/main.rs @@ -1,3 +1,160 @@ +use std::{ + path::PathBuf, + sync::{ + atomic::{AtomicBool, Ordering}, + mpsc, Arc, + }, + thread, + time::{Duration, SystemTime}, +}; + +use centipede_control::{Controller, IncomingMessage}; +use centipede_router::Router; +use centipede_worker::Worker; +use rand::thread_rng; + mod config; -fn main() {} +#[derive(Debug, clap::Parser)] +struct Opt { + /// Path to config file. + config: PathBuf, +} + +fn main() { + pretty_env_logger::init(); + + let opt = ::parse(); + log::debug!("opt: {:#?}", opt); + + let config: config::Centipede = toml::from_str( + std::fs::read_to_string(&opt.config) + .expect("failed to open config file") + .as_str(), + ) + .expect("failed to parse config"); + log::debug!("config: {:#?}", config); + + let (mut controller, init_router_config) = Controller::new( + SystemTime::now(), + ed25519_dalek::SigningKey::from_bytes(&config.private_key), + thread_rng(), + ); + + let router = Router::new(&init_router_config); + let router_configurator = router.configurator(); + + let tun_dev = hypertube::builder() + .with_name(config.interface_name) + .with_address(config.address.address()) + .with_netmask(config.address.network()) + .with_num_queues(config.workers) + .build() + .expect("failed to create tun device"); + + let (tx_incoming_control, rx_incoming_control) = + mpsc::channel::>>(); + let (tx_outgoing_control, rx_outgoing_control) = + mpsc::channel::(); + let control_message_sink = Box::new(|from, message| { + tx_incoming_control + .send(IncomingMessage { from, message }) + .unwrap(); + }); + + let shutdown = Arc::new(AtomicBool::new(false)); + log::debug!("starting shutdown signal handler"); + ctrlc::set_handler({ + let shutdown = shutdown.clone(); + move || { + shutdown.store(true, Ordering::SeqCst); + } + }) + .expect("failed to set shutdown signal handler"); + + thread::scope(|s| { + { + let shutdown = shutdown.clone(); + let mut worker = Worker::new( + router.worker(), + control_message_sink.clone(), + tun_dev + .queue_nonblocking(0) + .expect("failed getting first tun queue"), + ); + + s.spawn(move || { + let mut events = mio::Events::with_capacity(1024); + loop { + if shutdown.load(Ordering::Relaxed) { + break; + } + + if let Ok(outgoing) = rx_outgoing_control.try_recv() { + worker + .send_control_message::>( + outgoing.from, + outgoing.to, + outgoing.message, + ) + .expect("failed sending control message"); + } + + worker.wait_and_handle(&mut events).unwrap(); + } + }); + } + + for i in 1..config.workers { + let shutdown = shutdown.clone(); + let mut worker = Worker::new( + router.worker(), + control_message_sink.clone(), + tun_dev + .queue_nonblocking(i) + .expect("failed getting additional tun queue"), + ); + s.spawn(move || { + let mut events = mio::Events::with_capacity(1024); + loop { + if shutdown.load(Ordering::Relaxed) { + break; + } + + worker.wait_and_handle(&mut events).unwrap(); + } + }); + } + + loop { + if shutdown.load(Ordering::Relaxed) { + log::info!("Received shutdown signal, waiting for workers to finish..."); + break; + } + + let incoming = match rx_incoming_control.recv_timeout(Duration::from_millis(10)) { + Ok(incoming) => Some(incoming), + Err(mpsc::RecvTimeoutError::Timeout) => None, + Err(mpsc::RecvTimeoutError::Disconnected) => { + panic!("incoming control message channel disconnected") + } + }; + + let now = SystemTime::now(); + + if let Some(incoming) = incoming { + controller.handle_incoming(now, incoming); + } + + let events = controller.poll(now); + + if let Some(new_router_config) = events.router_config { + router_configurator.configure(&new_router_config); + } + + for outgoing in events.outgoing_messages { + tx_outgoing_control.send(outgoing).unwrap(); + } + } + }); +} diff --git a/packages/centipede_worker/src/lib.rs b/packages/centipede_worker/src/lib.rs index 37d6411..1d4ff8d 100644 --- a/packages/centipede_worker/src/lib.rs +++ b/packages/centipede_worker/src/lib.rs @@ -25,8 +25,9 @@ pub struct Worker<'r> { /// The underlying handle to the router. router_handle: WorkerHandle<'r>, - /// The sending half of the control message channel. - control_message_channel: mpsc::Sender, auth::Unknown>>, + /// A callback for received control messages. + control_message_sink: + Box, auth::Unknown>) + Send + 'r>, /// The TUN queue. tun_queue: hypertube::Queue<'r, false>, @@ -42,12 +43,14 @@ impl<'r> Worker<'r> { /// Create a new worker. pub fn new( router_handle: WorkerHandle<'r>, - control_message_channel: mpsc::Sender, auth::Unknown>>, + control_message_sink: Box< + dyn FnMut(SocketAddr, ControlMessage, auth::Unknown>) + Send + 'r, + >, tun_queue: hypertube::Queue<'r, false>, ) -> Self { Self { router_handle, - control_message_channel, + control_message_sink, tun_queue, sockets: Sockets::new(), poll: mio::Poll::new().unwrap(), @@ -151,8 +154,8 @@ impl<'r> Worker<'r> { let mut buf: [MaybeUninit; PACKET_BUFFER_SIZE] = MaybeUninit::uninit_array(); loop { - match socket.recv(&mut buf) { - Ok(n) => { + match socket.recv_from(&mut buf) { + Ok((n, from)) => { // SAFETY: we just read `n` bytes into the buffer. let msg = unsafe { MaybeUninit::slice_assume_init_mut(&mut buf[..n]) }; @@ -166,9 +169,11 @@ impl<'r> Worker<'r> { } }; - self.control_message_channel - .send(control.to_vec_backed()) - .expect("failed to send control message"); + (self.control_message_sink)( + from.as_socket() + .expect("socket should have an IP family address"), + control.to_vec_backed(), + ) } Ok(MessageDiscriminant::Packet) => { let packet = match PacketMessage::from_buffer(msg) {