diff --git a/packages/centipede_control/src/pure.rs b/packages/centipede_control/src/pure.rs index e0e9875..15c94b5 100644 --- a/packages/centipede_control/src/pure.rs +++ b/packages/centipede_control/src/pure.rs @@ -4,17 +4,21 @@ use std::{ collections::{BTreeMap, HashMap, HashSet}, + mem, net::SocketAddr, - ops::{Add, Deref}, + ops::Deref, time::{Duration, SystemTime}, }; -use centipede_proto::{control::Message, marker::auth}; -use chacha20poly1305::ChaCha20Poly1305; -use rand::CryptoRng; +use centipede_proto::{ + control::{Content, Message}, + marker::auth, +}; +use chacha20poly1305::{ChaCha20Poly1305, KeyInit}; +use rand::{CryptoRng, Rng}; /// A Centipede control daemon, implemented as a pure state machine. -pub struct Controller { +pub struct Controller { /// Our private key. private_key: ed25519_dalek::SigningKey, @@ -42,7 +46,7 @@ enum PeerState { /// We're listening for an incoming handshake. Listening { /// Addresses on which we're listening for incoming packets, and will advertise to the peer. - rx_local_addrs: Vec, + local_addrs: HashSet, /// The maximum time we're willing to wait between the peer's heartbeats. rx_max_heartbeat_interval: Duration, @@ -50,40 +54,46 @@ enum PeerState { /// We've initiated a handshake and are waiting for a response. Initiating { /// The time at which we initiated the handshake. - timestamp: SystemTime, + handshake_timestamp: SystemTime, /// The ephemeral key we used to initiate the handshake. ecdh_secret: x25519_dalek::EphemeralSecret, + /// Addresses on which we're listening for incoming packets, and will advertise to the peer. + local_addrs: HashSet, + /// Addresses we've received heartbeats from, by the time they were last received. - tx_remote_addrs: BTreeMap>, + tx_remote_addrs: BTreeMap>, /// The maximum time we're willing to wait between the peer's heartbeats. rx_max_heartbeat_interval: Duration, }, /// We share a cipher with the peer and are exchanging heartbeats. Connected { + /// The timestamp of the handshake that established the connection, **on the initiator's clock**. + handshake_timestamp: SystemTime, + /// The shared cipher. cipher: ChaCha20Poly1305, - /// Addresses on which we're listening for incoming packets, by the last time we sent a heartbeat. - rx_local_addrs: BTreeMap>, + /// Addreses on which we're listening for incoming packets, and are sending heartbeats from. + local_addrs: HashSet, + + /// Addresses on which we're listening for incoming packets, by the next time we should send a heartbeat. + queued_heartbeats: BTreeMap>, /// Addresses we've received heartbeats from, by the time they were last received. - tx_remote_addrs: BTreeMap>, + tx_remote_addrs: BTreeMap>, /// The maximum time we're willing to wait between the peer's heartbeats. rx_max_heartbeat_interval: Duration, /// The target interval between our heartbeats. tx_heartbeat_interval: Duration, - - /// The (peer clock) timestamp of the last heartbeat we received. - rx_heartbeat_timestamp: u64, }, } -impl Controller { +impl Controller { /// Create a new, empty controller. /// /// # Arguments @@ -111,20 +121,61 @@ impl Controller { // TODO: add settings for generating links to the remote addresses we get heartbeats from /// Register (or reregister) a peer and listen for incoming connections. /// + /// Note that this will never initiate a handshake, even when doing so would be necessary to change the max heartbeat interval. + /// For this reason, you should not reduce the max heartbeat interval for a connected peer, lest the peer be disconnected. + /// /// # Arguments /// /// * `now` - the current time. /// * `public_key` - the public key of the peer. - /// * `rx_local_addrs` - addresses we'll tell the peer to send packets to. + /// * `local_addrs` - addresses we'll tell the peer to send packets to. /// * `max_heartbeat_interval` - the maximum time we're willing to wait between the peer's heartbeats. pub fn listen( &mut self, now: SystemTime, public_key: ed25519_dalek::VerifyingKey, - rx_local_addrs: Vec, + local_addrs: HashSet, max_heartbeat_interval: Duration, ) { - todo!() + // Add the addresses to the router config, and mark it as changed if necessary. + // This must happen for any current state of the peer, + // since we're going to need to listen on these addresses regardless. + for &addr in &local_addrs { + if self.router_config.recv_addrs.insert(addr) { + self.router_config_changed = true; + } + } + + match self.peers.get_mut(&public_key) { + Some(PeerState::Listening { .. }) | None => { + self.peers.insert( + public_key, + PeerState::Listening { + local_addrs, + rx_max_heartbeat_interval: max_heartbeat_interval, + }, + ); + } + Some(PeerState::Initiating { + local_addrs: state_rx_local_addrs, + .. + }) => { + *state_rx_local_addrs = local_addrs; + } + Some(PeerState::Connected { + local_addrs: old_local_addrs, + queued_heartbeats, + .. + }) => { + for addr in local_addrs { + // Check if the address is new. I.e., if it's already queued for a heartbeat. + // Otherwise, we immediately queue a heartbeat for it. + if !old_local_addrs.insert(addr) { + queued_heartbeats.entry(now).or_default().insert(addr); + } + } + } + } } /// Initiate a connection to a peer. Must be called after `listen`. @@ -140,7 +191,50 @@ impl Controller { public_key: ed25519_dalek::VerifyingKey, tx_remote_addrs: Vec, ) { - todo!() + // Get the old state, ensuring that we know the peer. + let old_state = self + .peers + .remove(&public_key) + .expect("initiate must be called on a connected peer. call listen first"); + + // Extract the local addresses and the max heartbeat interval from the old state. + let (local_addrs, rx_max_heartbeat_interval) = + old_state.forget_connection_and_destructure(); + + let ecdh_secret = x25519_dalek::EphemeralSecret::random_from_rng(&mut self.rng); + + // Generate the initiate message. + let message = Message::new( + &self.private_key, + public_key, + Content::Initiate { + handshake_timestamp: now, + ecdh_public_key: (&ecdh_secret).into(), + max_heartbeat_interval: rx_max_heartbeat_interval, + }, + ); + for &local_addr in &local_addrs { + for &remote_addr in &tx_remote_addrs { + self.send_queue.push(OutgoingMessage { + from: local_addr, + to: remote_addr, + message: message.clone(), + }); + } + } + + // Insert the new state. + self.peers.insert( + public_key, + PeerState::Initiating { + handshake_timestamp: now, + ecdh_secret, + local_addrs, + // We shouldn't start sending to the remote addresses until we've received a response. + tx_remote_addrs: BTreeMap::new(), + rx_max_heartbeat_interval, + }, + ); } /// Disconnect from a peer. @@ -162,9 +256,164 @@ impl Controller { pub fn handle_incoming>( &mut self, now: SystemTime, - message: IncomingMessage, + incoming: IncomingMessage, ) { - todo!() + // First we make all the checks we can without verifying the message's signature. + + // Check that the message even claims to be addressed to us. + if incoming.message.claimed_recipient() != &self.private_key.verifying_key() { + return; + } + + // Check that we know the peer. + if !self.peers.contains_key(incoming.message.claimed_sender()) { + return; + } + + // Now we verify the message's signature, and return if it's invalid. + let message = match incoming.message.authenticate() { + Ok(message) => message, + Err(_) => return, + }; + + // Note that, since we haven't returned, the claimed sender and recipient are now guaranteed to + // be the actual sender and recipient and we know that the we are the actual recipient from the first guard. + + // Now we can proceed to handling the message. + let new_state = match ( + self.peers.remove(message.sender()).unwrap(), + message.content(), + ) { + // Receive an incoming initiate. + ( + old_state, + Content::Initiate { + handshake_timestamp, + ecdh_public_key: peer_ecdh_public_key, + max_heartbeat_interval: tx_max_heartbeat_interval, + }, + ) if old_state.should_accept_incoming_initiate(*handshake_timestamp) => { + // Since we want to accept the incoming initiate, we can forget the old state and extract the local addresses and the max heartbeat interval. + let (local_addrs, rx_max_heartbeat_interval) = + old_state.forget_connection_and_destructure(); + + // Generate our ephemeral keypair for the ECDH key exchange. + let ecdh_secret_key = x25519_dalek::EphemeralSecret::random_from_rng(&mut self.rng); + let ecdh_public_key = (&ecdh_secret_key).into(); + + // Send the initiate acknowledgement. + let response = Message::new( + &self.private_key, + *message.sender(), + Content::InitiateAcknowledge { + handshake_timestamp: *handshake_timestamp, + ecdh_public_key, + max_heartbeat_interval: rx_max_heartbeat_interval, + }, + ); + for &local_addr in local_addrs.iter() { + self.send_queue.push(OutgoingMessage { + from: local_addr, + to: incoming.from, + message: response.clone(), + }); + } + + // Generate the cipher using the shared secret of the ECDH key exchange. + let cipher = ChaCha20Poly1305::new(&chacha20poly1305::Key::from( + ecdh_secret_key + .diffie_hellman(peer_ecdh_public_key) + .to_bytes(), + )); + + // Initialize the receiving tunnel. + self.router_config.recv_tunnels.insert( + public_key_to_peer_id(message.sender()), + centipede_router::config::RecvTunnel { + cipher: cipher.clone(), + }, + ); + self.router_config_changed = true; + + PeerState::Connected { + handshake_timestamp: *handshake_timestamp, + cipher, + // Create the heartbeat queue, with the initial heartbeats queued. + queued_heartbeats: [(now, local_addrs.clone())].into_iter().collect(), + local_addrs, + // We have not yet received any heartbeats, but we know that we can send to the iniating address. + tx_remote_addrs: [(now, [incoming.from].into_iter().collect())] + .into_iter() + .collect(), + // Use the max heartbeat interval from the `listen` call. + rx_max_heartbeat_interval, + // Aim to beat three times in each interval, in case packets are dropped, but + // don't beat more than once per second. + tx_heartbeat_interval: (*tx_max_heartbeat_interval / 4) + .min(Duration::from_secs(1)), + } + } + (old_state, Content::Initiate { .. }) => { + // We don't want to accept the incoming initiate, so we just put the old state back. + old_state + } + + // Receive an incoming initiate acknowledgement. + ( + PeerState::Initiating { + handshake_timestamp, + ecdh_secret, + local_addrs, + mut tx_remote_addrs, + rx_max_heartbeat_interval, + }, + Content::InitiateAcknowledge { + handshake_timestamp: peer_handshake_timestamp, + ecdh_public_key: peer_ecdh_public_key, + max_heartbeat_interval: tx_max_heartbeat_interval, + }, + ) if handshake_timestamp == *peer_handshake_timestamp => { + // Generate the cipher using the shared secret of the ECDH key exchange. + let cipher = ChaCha20Poly1305::new(&chacha20poly1305::Key::from( + ecdh_secret.diffie_hellman(peer_ecdh_public_key).to_bytes(), + )); + + // Initialize the receiving tunnel. + self.router_config.recv_tunnels.insert( + public_key_to_peer_id(message.sender()), + centipede_router::config::RecvTunnel { + cipher: cipher.clone(), + }, + ); + self.router_config_changed = true; + + // The acknowledgement counts as a heartbeat, so we ensure its source is in the tx_remote_addrs. + for addr_set in tx_remote_addrs.values_mut() { + addr_set.remove(&incoming.from); + } + tx_remote_addrs + .entry(now) + .or_default() + .insert(incoming.from); + + PeerState::Connected { + handshake_timestamp, + cipher, + // Create the heartbeat queue, with the initial heartbeats queued. + queued_heartbeats: [(now, local_addrs.clone())].into_iter().collect(), + local_addrs, + tx_remote_addrs, + rx_max_heartbeat_interval, + // Aim to beat three times in each interval, in case packets are dropped, but + // don't beat more than once per second. + tx_heartbeat_interval: (*tx_max_heartbeat_interval / 4) + .min(Duration::from_secs(1)), + } + } + + _ => todo!(), + }; + self.peers.insert(*message.sender(), new_state); } /// Poll for events. @@ -174,36 +423,42 @@ impl Controller { /// * `now` - the current time. pub fn poll(&mut self, now: SystemTime) -> Events { for (peer_key, peer_state) in self.peers.iter_mut() { - // send heartbeats + // Send queued heartbeats. if let PeerState::Connected { tx_heartbeat_interval, + queued_heartbeats, tx_remote_addrs, .. } = peer_state { - while let Some(first_entry) = tx_remote_addrs.first_entry() { - if now.duration_since(*first_entry.key()).unwrap() >= *tx_heartbeat_interval { - for local_addr in first_entry.remove() { - for &remote_addr in tx_remote_addrs.values().flatten() { - self.send_queue.push(OutgoingMessage { - from: local_addr, - to: remote_addr, - message: Message::new( - &self.private_key, - *peer_key, - centipede_proto::control::Content::Heartbeat { - timestamp: now - .duration_since(SystemTime::UNIX_EPOCH) - .unwrap(), - }, - ), - }); - } + let mut to_requeue = HashSet::new(); + + while queued_heartbeats + .first_key_value() + .is_some_and(|(&t, _)| t <= now) + { + let (_, addrs) = queued_heartbeats.pop_first().unwrap(); + + for &local_addr in &addrs { + for &remote_addr in tx_remote_addrs.values().flatten() { + self.send_queue.push(OutgoingMessage { + from: local_addr, + to: remote_addr, + message: Message::new( + &self.private_key, + *peer_key, + Content::Heartbeat, + ), + }); } - } else { - break; } + + to_requeue.extend(addrs); } + + // Queue the next set of heartbeats for all the addresses we sent heartbeats to. + queued_heartbeats + .insert(now.checked_add(*tx_heartbeat_interval).unwrap(), to_requeue); } // expire old tx_remote_addrs @@ -254,7 +509,45 @@ impl Controller { } } +impl PeerState { + /// Extract the local addresses and the max heartbeat interval from the state, forgetting the connection. + fn forget_connection_and_destructure(self) -> (HashSet, Duration) { + match self { + PeerState::Listening { + local_addrs, + rx_max_heartbeat_interval, + } + | PeerState::Initiating { + local_addrs, + rx_max_heartbeat_interval, + .. + } + | PeerState::Connected { + local_addrs, + rx_max_heartbeat_interval, + .. + } => (local_addrs, rx_max_heartbeat_interval), + } + } + + /// Returns `true` iff the current state should be superseded by accepting an incoming initiate with the given timestamp. + fn should_accept_incoming_initiate(&self, timestamp: SystemTime) -> bool { + match self { + PeerState::Listening { .. } => true, + PeerState::Initiating { + handshake_timestamp, + .. + } + | PeerState::Connected { + handshake_timestamp, + .. + } => *handshake_timestamp < timestamp, + } + } +} + /// An incoming message, consisting of a message and the address it claims to be from. +#[derive(Debug)] pub struct IncomingMessage> { /// The address the message claims to be from. pub from: SocketAddr, @@ -264,6 +557,7 @@ pub struct IncomingMessage> { } /// An outgoing message, consisting of a message and how it should be sent. +#[derive(Debug)] pub struct OutgoingMessage { /// The address the message should be sent from. pub from: SocketAddr, @@ -293,7 +587,7 @@ fn public_key_to_peer_id(public_key: &ed25519_dalek::VerifyingKey) -> [u8; 8] { mod tests { use std::{iter, vec}; - use centipede_proto::control::{Content as MessageContent, Message}; + use centipede_proto::control::Message; use rand::{Rng, SeedableRng}; use rand_chacha::ChaChaRng; @@ -363,15 +657,15 @@ mod tests { controller.listen( clock.now(), peer_key.verifying_key(), - vec![local_addr], + [local_addr].into_iter().collect(), Duration::from_secs(60), ); clock.increment(Duration::from_millis(1)); - let poll = controller.poll(clock.now()); + let events = controller.poll(clock.now()); - let router_config = poll + let router_config = events .router_config .expect("controller should produce a router config after listen"); assert!( @@ -390,14 +684,14 @@ mod tests { ); assert!( - poll.outgoing_messages.is_empty(), + events.outgoing_messages.is_empty(), "there should be no outgoing messages immediately after listen" ); clock.increment(Duration::from_secs(10)); let peer_secret = x25519_dalek::EphemeralSecret::random_from_rng(&mut rng); - let handshake_timestamp = clock.now_since_epoch(); + let handshake_timestamp = clock.now(); controller.handle_incoming( clock.now(), IncomingMessage { @@ -405,7 +699,7 @@ mod tests { message: Message::new( &peer_key, private_key.verifying_key(), - MessageContent::Initiate { + Content::Initiate { handshake_timestamp, ecdh_public_key: (&peer_secret).into(), max_heartbeat_interval: Duration::from_secs(60), @@ -415,9 +709,9 @@ mod tests { }, ); - let event = controller.poll(clock.now()); + let events = controller.poll(clock.now()); - let router_config = event + let router_config = events .router_config .expect("controller should produce a router config after listening for and receiving an incoming initiate"); assert!( @@ -432,7 +726,7 @@ mod tests { "controller cannot know where to send packets until receiving heartbeats" ); - let mut outgoing_msgs = poll.outgoing_messages.into_iter(); + let mut outgoing_msgs = events.outgoing_messages.into_iter(); let response = outgoing_msgs .next() @@ -451,7 +745,7 @@ mod tests { "response should be from the local peer" ); match response.message.content() { - MessageContent::InitiateAcknowledge { + Content::InitiateAcknowledge { handshake_timestamp: timestamp, .. } => { @@ -473,9 +767,7 @@ mod tests { ); assert_eq!( heartbeat.message.content(), - &MessageContent::Heartbeat { - timestamp: clock.now_since_epoch() - }, + &Content::Heartbeat, "second message from a listening controller should be the first heartbeat" ); @@ -508,7 +800,7 @@ mod tests { controller.listen( clock.now(), peer_key.verifying_key(), - vec![local_addr], + [local_addr].into_iter().collect(), Duration::from_secs(60), ); @@ -529,7 +821,7 @@ mod tests { "handshake initiation should not change the router config immediately" ); - let handshake_timestamp = clock.now_since_epoch(); + let handshake_timestamp = clock.now(); let initiate = events .outgoing_messages .pop() @@ -549,7 +841,7 @@ mod tests { "initiate should be from the local peer" ); match initiate.message.content() { - MessageContent::Initiate { + Content::Initiate { handshake_timestamp: timestamp, .. } => { @@ -575,7 +867,7 @@ mod tests { message: Message::new( &peer_key, private_key.verifying_key(), - MessageContent::InitiateAcknowledge { + Content::InitiateAcknowledge { handshake_timestamp, ecdh_public_key: (&x25519_dalek::EphemeralSecret::random_from_rng(&mut rng)) @@ -617,7 +909,7 @@ mod tests { ); assert_eq!( heartbeat.message.content(), - &MessageContent::Heartbeat { timestamp: clock.now_since_epoch() }, + &Content::Heartbeat, "second message from an initiating controller should be stamped with the current time" ); @@ -644,10 +936,6 @@ mod tests { self.now } - fn now_since_epoch(&self) -> Duration { - self.now.duration_since(SystemTime::UNIX_EPOCH).unwrap() - } - fn increment(&mut self, duration: Duration) { self.now += duration; } @@ -659,7 +947,7 @@ mod tests { iter::repeat_with(move || { Duration::from_secs(rng.gen_range(0..(100 * 365 * 24 * 60 * 60))) }) - .take(100), + .take(31), ) .map(|dur| TestClock { now: SystemTime::UNIX_EPOCH + dur, @@ -667,7 +955,7 @@ mod tests { } fn test_keys(mut rng: ChaChaRng) -> impl Iterator { - iter::repeat_with(move || ed25519_dalek::SigningKey::generate(&mut rng)).take(100) + iter::repeat_with(move || ed25519_dalek::SigningKey::generate(&mut rng)).take(32) } fn test_rng() -> rand_chacha::ChaChaRng { diff --git a/packages/centipede_proto/src/control.rs b/packages/centipede_proto/src/control.rs index 9046e6f..76be4dd 100644 --- a/packages/centipede_proto/src/control.rs +++ b/packages/centipede_proto/src/control.rs @@ -1,13 +1,17 @@ -use std::{fmt::Debug, marker::PhantomData, ops::Deref, time::Duration}; - -use ed25519_dalek::{ed25519::signature::Keypair, Signer}; +use std::{ + fmt::Debug, + marker::PhantomData, + ops::Deref, + time::{Duration, SystemTime}, +}; + +use ed25519_dalek::Signer; use serde::{Deserialize, Serialize}; use thiserror::Error; use crate::marker::auth; /// A parsed control message used to establish and maintain a connection. -#[derive(Clone, PartialEq, Eq)] pub struct Message where B: Deref, @@ -40,7 +44,7 @@ pub enum Content { Initiate { /// Timestamp of the initiation **on the initiator's clock**, measured from the UNIX epoch. /// Used to identify and order the handshake, and prevent replay attacks. - handshake_timestamp: Duration, + handshake_timestamp: SystemTime, /// The initiator's ECDH public key. ecdh_public_key: x25519_dalek::PublicKey, @@ -53,7 +57,7 @@ pub enum Content { InitiateAcknowledge { /// Timestamp of the initiation **on the initiator's clock**, measured from the UNIX epoch. /// Used to match the acknowledgement to the initiation. - handshake_timestamp: Duration, + handshake_timestamp: SystemTime, /// The responder's ECDH public key. ecdh_public_key: x25519_dalek::PublicKey, @@ -64,12 +68,7 @@ pub enum Content { /// Inform the receiver that the initiator is listening on /// (and reachable at) the address from which the message was sent. - Heartbeat { - /// The time at which the heartbeat was sent **on the sender's clock**, measured from the UNIX epoch. - /// Note that this is _not_ compared to the receiver's clock, but instead used - /// to discard old heartbeats (again by the sender's reckoning), preventing replay attacks. - timestamp: Duration, - }, + Heartbeat, // FIXME: add measures to prevent replay attacks } impl Message @@ -228,6 +227,23 @@ impl Message, auth::Valid> { } } +impl Clone for Message +where + B: Deref + Clone, + A: auth::Status, +{ + fn clone(&self) -> Self { + Self { + buffer: self.buffer.clone(), + sender: self.sender, + recipient: self.recipient, + signature: self.signature, + content: self.content.clone(), + _auth: PhantomData::, + } + } +} + impl Debug for Message where B: Deref, @@ -295,7 +311,7 @@ mod tests { let verifying_key = signing_key.verifying_key(); let content = Content::Initiate { - handshake_timestamp: Duration::ZERO, + handshake_timestamp: SystemTime::UNIX_EPOCH, ecdh_public_key: x25519_dalek::PublicKey::from([0; 32]), max_heartbeat_interval: Duration::from_secs(60), }; @@ -318,7 +334,7 @@ mod tests { let verifying_key = signing_key.verifying_key(); let content = Content::Initiate { - handshake_timestamp: Duration::ZERO, + handshake_timestamp: SystemTime::UNIX_EPOCH, ecdh_public_key: x25519_dalek::PublicKey::from([0; 32]), max_heartbeat_interval: Duration::from_secs(60), }; @@ -345,7 +361,7 @@ mod tests { let signing_key = ed25519_dalek::SigningKey::from_bytes(&[1; 32]); let content = Content::Initiate { - handshake_timestamp: Duration::ZERO, + handshake_timestamp: SystemTime::UNIX_EPOCH, ecdh_public_key: x25519_dalek::PublicKey::from([0; 32]), max_heartbeat_interval: Duration::from_secs(60), };