From 713e5219984efa7151d738ff821b0538e7a1e01b Mon Sep 17 00:00:00 2001 From: Max Niederman Date: Sun, 21 Apr 2024 23:29:34 -0700 Subject: [PATCH] fix: initialize send tunnel (but not links) after receiving an incoming initate --- Cargo.lock | 12 +++- packages/centipede_control/Cargo.toml | 1 + packages/centipede_control/src/lib.rs | 71 +++++++++++++++++++--- packages/centipede_control/tests/simple.rs | 4 +- packages/centipede_router/Cargo.toml | 1 + packages/centipede_router/src/worker.rs | 26 ++++++-- 6 files changed, 101 insertions(+), 14 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index d7055eb..ca3f433 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -141,6 +141,12 @@ version = "0.21.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9d297deb1925b89f2ccc13d7635fa0714f12c87adce1c75356b39ca9b7178567" +[[package]] +name = "base64" +version = "0.22.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9475866fec1451be56a3c2400fd081ff546538961565ccb5b7142cbd22bc7a51" + [[package]] name = "base64ct" version = "1.6.0" @@ -178,7 +184,7 @@ checksum = "8cd6604a82acf3039f1144f54b8eb34e91ffba622051189e71b781822d5ee1f5" name = "centipede" version = "0.1.0" dependencies = [ - "base64", + "base64 0.21.7", "centipede_control", "centipede_router", "centipede_worker", @@ -203,6 +209,7 @@ dependencies = [ name = "centipede_control" version = "0.1.0" dependencies = [ + "base64 0.22.0", "centipede_proto", "centipede_router", "chacha20poly1305", @@ -233,6 +240,7 @@ dependencies = [ "arc-swap", "centipede_proto", "chacha20poly1305", + "log", ] [[package]] @@ -1091,7 +1099,7 @@ version = "3.6.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "15d167997bd841ec232f5b2b8e0e26606df2e7caa4c31b95ea9ca52b200bd270" dependencies = [ - "base64", + "base64 0.21.7", "chrono", "hex", "indexmap 1.9.3", diff --git a/packages/centipede_control/Cargo.toml b/packages/centipede_control/Cargo.toml index 57ec7ed..423f47a 100644 --- a/packages/centipede_control/Cargo.toml +++ b/packages/centipede_control/Cargo.toml @@ -6,6 +6,7 @@ edition = "2021" # See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html [dependencies] +base64 = "0.22.0" centipede_proto = { version = "0.1.0", path = "../centipede_proto" } centipede_router = { version = "0.1.0", path = "../centipede_router" } chacha20poly1305 = "0.10.1" diff --git a/packages/centipede_control/src/lib.rs b/packages/centipede_control/src/lib.rs index 7ae8c81..7dc158a 100644 --- a/packages/centipede_control/src/lib.rs +++ b/packages/centipede_control/src/lib.rs @@ -11,6 +11,7 @@ use std::{ time::{Duration, SystemTime}, }; +use base64::prelude::*; use centipede_proto::{ control::{Content, Message}, marker::auth, @@ -150,7 +151,10 @@ impl Controller { local_addrs: HashSet, max_heartbeat_interval: Duration, ) { - log::debug!("listening for incoming connections from {public_key:?}"); + log::debug!( + "listening for incoming connections from `{}`", + BASE64_STANDARD.encode(public_key) + ); // Add the addresses to the router config, and mark it as changed if necessary. // This must happen for any current state of the peer, @@ -238,7 +242,10 @@ impl Controller { public_key: ed25519_dalek::VerifyingKey, remote_addrs: Vec, ) { - log::debug!("initiating connection to {public_key:?} at {remote_addrs:?}"); + log::debug!( + "initiating connection to `{peer}` at {remote_addrs:?}", + peer = BASE64_STANDARD.encode(public_key) + ); // Get the old state, ensuring that we know the peer. let old_state = self @@ -353,7 +360,7 @@ impl Controller { now: SystemTime, incoming: IncomingMessage, ) { - log::debug!( + log::trace!( "handling incoming message from remote addr {:?} with claimed content {:?}", incoming.from, incoming.message.claimed_content(), @@ -363,18 +370,23 @@ impl Controller { // Check that the message even claims to be addressed to us. if incoming.message.claimed_recipient() != &self.private_key.verifying_key() { + log::warn!("received message not addressed to us"); return; } // Check that we know the peer. if !self.peers.contains_key(incoming.message.claimed_sender()) { + log::warn!("received message from unknown peer"); return; } // Now we verify the message's signature, and return if it's invalid. let message = match incoming.message.authenticate() { Ok(message) => message, - Err(_) => return, + Err(_) => { + log::warn!("received message with invalid signature"); + return; + } }; // Note that, since we haven't returned, the claimed sender and recipient are now guaranteed to @@ -394,6 +406,16 @@ impl Controller { max_heartbeat_interval: tx_max_heartbeat_interval, }, ) if old_state.should_accept_incoming_initiate(*handshake_timestamp) => { + log::info!( + "accepting incoming handshake from `{peer}` at address {from} and time {time}", + peer = BASE64_STANDARD.encode(message.sender()), + from = incoming.from, + time = handshake_timestamp + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs(), + ); + // Since we want to accept the incoming initiate, we can forget the old state and extract the local addresses and the max heartbeat interval. let (local_addrs, rx_max_heartbeat_interval) = old_state.forget_connection_and_destructure(); @@ -403,6 +425,11 @@ impl Controller { let ecdh_public_key = (&ecdh_secret_key).into(); // Send the initiate acknowledgement. + log::debug!( + "sending acknowledgements to `{peer}` from addresses {from:?}", + peer = BASE64_STANDARD.encode(message.sender()), + from = local_addrs + ); let response = Message::new( &self.private_key, *message.sender(), @@ -427,7 +454,13 @@ impl Controller { .to_bytes(), )); + log::debug!( + "initializing send/receive tunnels to/from `{peer}`", + peer = BASE64_STANDARD.encode(message.sender()) + ); + // Initialize the receiving tunnel. + // FIXME: reset packet memory self.router_config.recv_tunnels.insert( public_key_to_peer_id(message.sender()), centipede_router::config::RecvTunnel { @@ -436,6 +469,16 @@ impl Controller { ); self.router_config_changed = true; + // Initialize the sending tunnel, but without any links yet. + // We should only add links once we've received an initiate acknowledgement and know that the peer knows the cipher. + self.router_config.send_tunnels.insert( + public_key_to_peer_id(message.sender()), + centipede_router::config::SendTunnel { + cipher: cipher.clone(), + links: HashSet::new(), + }, + ); + PeerState::Connected { handshake_timestamp: *handshake_timestamp, cipher, @@ -446,7 +489,7 @@ impl Controller { remote_addrs: [(now, [incoming.from].into_iter().collect())] .into_iter() .collect(), - // However, we shouldn't send packets to it until we've received an initiate acknowledgement and know that the peer knows the cipher. + // We have a send tunnel, but no links are associated with it yet. sending_to: HashSet::new(), // Use the max heartbeat interval from the `listen` call. rx_max_heartbeat_interval, @@ -476,6 +519,11 @@ impl Controller { max_heartbeat_interval: tx_max_heartbeat_interval, }, ) if handshake_timestamp == *peer_handshake_timestamp => { + log::info!( + "received initiation acknowledgement from `{peer}`", + peer = BASE64_STANDARD.encode(message.sender()) + ); + // Generate the cipher using the shared secret of the ECDH key exchange. let cipher = ChaCha20Poly1305::new(&chacha20poly1305::Key::from( ecdh_secret.diffie_hellman(peer_ecdh_public_key).to_bytes(), @@ -545,6 +593,15 @@ impl Controller { } (mut state, Content::Heartbeat) => { + log::debug!( + "received heartbeat from `{peer}` at {time}", + peer = BASE64_STANDARD.encode(message.sender()), + time = now + .duration_since(SystemTime::UNIX_EPOCH) + .unwrap_or_default() + .as_secs() + ); + // Delay expiration of the remote address. if let PeerState::Connected { remote_addrs, .. } | PeerState::Initiating { remote_addrs, .. } = &mut state @@ -669,10 +726,10 @@ impl Controller { } if self.router_config_changed { - log::debug!("poll resulted in router config change",); + log::trace!("poll resulted in router config change",); } if !self.send_queue.is_empty() { - log::debug!("poll resulted in sending messages: {:#?}", self.send_queue); + log::trace!("poll resulted in sending messages: {:#?}", self.send_queue); } Events { diff --git a/packages/centipede_control/tests/simple.rs b/packages/centipede_control/tests/simple.rs index 6849dba..429a456 100644 --- a/packages/centipede_control/tests/simple.rs +++ b/packages/centipede_control/tests/simple.rs @@ -145,7 +145,9 @@ fn listen() { router_config .send_tunnels .get(&public_key_to_peer_id(&peer_key.verifying_key())) - .is_none(), + .expect("controller should have a send tunnel after listening for and receiving an incoming initiate") + .links + .is_empty(), "controller cannot know where to send packets until receiving heartbeats" ); diff --git a/packages/centipede_router/Cargo.toml b/packages/centipede_router/Cargo.toml index 6e4585b..06dca62 100644 --- a/packages/centipede_router/Cargo.toml +++ b/packages/centipede_router/Cargo.toml @@ -7,3 +7,4 @@ edition = "2021" arc-swap = "1.6.0" centipede_proto = { version = "0.1.0", path = "../centipede_proto" } chacha20poly1305 = "0.10.1" +log = "0.4.21" diff --git a/packages/centipede_router/src/worker.rs b/packages/centipede_router/src/worker.rs index 70d5501..0c8975d 100644 --- a/packages/centipede_router/src/worker.rs +++ b/packages/centipede_router/src/worker.rs @@ -64,9 +64,18 @@ impl<'r> WorkerHandle<'r> { let decrypted = message.decrypt_in_place(&tunnel.cipher).ok()?; match tunnel.memory.observe(decrypted.sequence_number()) { - PacketRecollection::New => Some(ReceivePacket { decrypted }), - PacketRecollection::Seen => None, - PacketRecollection::Confusing => None, + PacketRecollection::New => { + log::trace!("new packet observed, creating receive obligation"); + Some(ReceivePacket { decrypted }) + } + PacketRecollection::Seen => { + log::trace!("duplicate packet observed, ignoring"); + None + } + PacketRecollection::Confusing => { + log::warn!("confusing packet observed, ignoring"); + None + } } } @@ -170,6 +179,8 @@ impl<'p> HandleOutgoing<'p> { .map(|tunnel| tunnel.links.iter()) .unwrap_or_default(); + log::trace!("creating outgoing packet coroutine with remaining links: {remaining_links:?}"); + Self { packet_plaintext: packet, router_state: Pin::new(router_state), @@ -181,6 +192,8 @@ impl<'p> HandleOutgoing<'p> { // TODO: consider taking the scratch buffer by reference, so the caller doesn't have to `mem::take` it or similar. /// Resume the coroutine, yielding the next packet to send. pub fn resume(&mut self, scratch: Vec) -> Option { + log::trace!("resuming outgoing packet coroutine"); + match self.remaining_links.next() { Some(&link) => { let tunnel = *self.tunnels.peek()?; @@ -192,12 +205,17 @@ impl<'p> HandleOutgoing<'p> { message.overwrite_packet(self.packet_plaintext.iter().copied()); let message = message.encrypt_in_place(&tunnel.cipher); + log::trace!("creating obligation to send packet along {link:?}"); + Some(SendPacket { link, message }) } None => { self.remaining_links = self.tunnels.peek()?.links.iter(); self.tunnels.next().unwrap(); - None + + log::trace!("advancing handle outgoing to next send tunnel"); + + self.resume(scratch) } } }