Skip to content

Commit

Permalink
fix: initialize send tunnel (but not links) after receiving an incomi…
Browse files Browse the repository at this point in the history
…ng initate
  • Loading branch information
max-niederman committed Apr 22, 2024
1 parent 9cd3303 commit 713e521
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 14 deletions.
12 changes: 10 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions packages/centipede_control/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ edition = "2021"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
base64 = "0.22.0"
centipede_proto = { version = "0.1.0", path = "../centipede_proto" }
centipede_router = { version = "0.1.0", path = "../centipede_router" }
chacha20poly1305 = "0.10.1"
Expand Down
71 changes: 64 additions & 7 deletions packages/centipede_control/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ use std::{
time::{Duration, SystemTime},
};

use base64::prelude::*;
use centipede_proto::{
control::{Content, Message},
marker::auth,
Expand Down Expand Up @@ -150,7 +151,10 @@ impl<R: Rng + CryptoRng> Controller<R> {
local_addrs: HashSet<SocketAddr>,
max_heartbeat_interval: Duration,
) {
log::debug!("listening for incoming connections from {public_key:?}");
log::debug!(
"listening for incoming connections from `{}`",
BASE64_STANDARD.encode(public_key)
);

// Add the addresses to the router config, and mark it as changed if necessary.
// This must happen for any current state of the peer,
Expand Down Expand Up @@ -238,7 +242,10 @@ impl<R: Rng + CryptoRng> Controller<R> {
public_key: ed25519_dalek::VerifyingKey,
remote_addrs: Vec<SocketAddr>,
) {
log::debug!("initiating connection to {public_key:?} at {remote_addrs:?}");
log::debug!(
"initiating connection to `{peer}` at {remote_addrs:?}",
peer = BASE64_STANDARD.encode(public_key)
);

// Get the old state, ensuring that we know the peer.
let old_state = self
Expand Down Expand Up @@ -353,7 +360,7 @@ impl<R: Rng + CryptoRng> Controller<R> {
now: SystemTime,
incoming: IncomingMessage<B>,
) {
log::debug!(
log::trace!(
"handling incoming message from remote addr {:?} with claimed content {:?}",
incoming.from,
incoming.message.claimed_content(),
Expand All @@ -363,18 +370,23 @@ impl<R: Rng + CryptoRng> Controller<R> {

// Check that the message even claims to be addressed to us.
if incoming.message.claimed_recipient() != &self.private_key.verifying_key() {
log::warn!("received message not addressed to us");
return;
}

// Check that we know the peer.
if !self.peers.contains_key(incoming.message.claimed_sender()) {
log::warn!("received message from unknown peer");
return;
}

// Now we verify the message's signature, and return if it's invalid.
let message = match incoming.message.authenticate() {
Ok(message) => message,
Err(_) => return,
Err(_) => {
log::warn!("received message with invalid signature");
return;
}
};

// Note that, since we haven't returned, the claimed sender and recipient are now guaranteed to
Expand All @@ -394,6 +406,16 @@ impl<R: Rng + CryptoRng> Controller<R> {
max_heartbeat_interval: tx_max_heartbeat_interval,
},
) if old_state.should_accept_incoming_initiate(*handshake_timestamp) => {
log::info!(
"accepting incoming handshake from `{peer}` at address {from} and time {time}",
peer = BASE64_STANDARD.encode(message.sender()),
from = incoming.from,
time = handshake_timestamp
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs(),
);

// Since we want to accept the incoming initiate, we can forget the old state and extract the local addresses and the max heartbeat interval.
let (local_addrs, rx_max_heartbeat_interval) =
old_state.forget_connection_and_destructure();
Expand All @@ -403,6 +425,11 @@ impl<R: Rng + CryptoRng> Controller<R> {
let ecdh_public_key = (&ecdh_secret_key).into();

// Send the initiate acknowledgement.
log::debug!(
"sending acknowledgements to `{peer}` from addresses {from:?}",
peer = BASE64_STANDARD.encode(message.sender()),
from = local_addrs
);
let response = Message::new(
&self.private_key,
*message.sender(),
Expand All @@ -427,7 +454,13 @@ impl<R: Rng + CryptoRng> Controller<R> {
.to_bytes(),
));

log::debug!(
"initializing send/receive tunnels to/from `{peer}`",
peer = BASE64_STANDARD.encode(message.sender())
);

// Initialize the receiving tunnel.
// FIXME: reset packet memory
self.router_config.recv_tunnels.insert(
public_key_to_peer_id(message.sender()),
centipede_router::config::RecvTunnel {
Expand All @@ -436,6 +469,16 @@ impl<R: Rng + CryptoRng> Controller<R> {
);
self.router_config_changed = true;

// Initialize the sending tunnel, but without any links yet.
// We should only add links once we've received an initiate acknowledgement and know that the peer knows the cipher.
self.router_config.send_tunnels.insert(
public_key_to_peer_id(message.sender()),
centipede_router::config::SendTunnel {
cipher: cipher.clone(),
links: HashSet::new(),
},
);

PeerState::Connected {
handshake_timestamp: *handshake_timestamp,
cipher,
Expand All @@ -446,7 +489,7 @@ impl<R: Rng + CryptoRng> Controller<R> {
remote_addrs: [(now, [incoming.from].into_iter().collect())]
.into_iter()
.collect(),
// However, we shouldn't send packets to it until we've received an initiate acknowledgement and know that the peer knows the cipher.
// We have a send tunnel, but no links are associated with it yet.
sending_to: HashSet::new(),
// Use the max heartbeat interval from the `listen` call.
rx_max_heartbeat_interval,
Expand Down Expand Up @@ -476,6 +519,11 @@ impl<R: Rng + CryptoRng> Controller<R> {
max_heartbeat_interval: tx_max_heartbeat_interval,
},
) if handshake_timestamp == *peer_handshake_timestamp => {
log::info!(
"received initiation acknowledgement from `{peer}`",
peer = BASE64_STANDARD.encode(message.sender())
);

// Generate the cipher using the shared secret of the ECDH key exchange.
let cipher = ChaCha20Poly1305::new(&chacha20poly1305::Key::from(
ecdh_secret.diffie_hellman(peer_ecdh_public_key).to_bytes(),
Expand Down Expand Up @@ -545,6 +593,15 @@ impl<R: Rng + CryptoRng> Controller<R> {
}

(mut state, Content::Heartbeat) => {
log::debug!(
"received heartbeat from `{peer}` at {time}",
peer = BASE64_STANDARD.encode(message.sender()),
time = now
.duration_since(SystemTime::UNIX_EPOCH)
.unwrap_or_default()
.as_secs()
);

// Delay expiration of the remote address.
if let PeerState::Connected { remote_addrs, .. }
| PeerState::Initiating { remote_addrs, .. } = &mut state
Expand Down Expand Up @@ -669,10 +726,10 @@ impl<R: Rng + CryptoRng> Controller<R> {
}

if self.router_config_changed {
log::debug!("poll resulted in router config change",);
log::trace!("poll resulted in router config change",);
}
if !self.send_queue.is_empty() {
log::debug!("poll resulted in sending messages: {:#?}", self.send_queue);
log::trace!("poll resulted in sending messages: {:#?}", self.send_queue);
}

Events {
Expand Down
4 changes: 3 additions & 1 deletion packages/centipede_control/tests/simple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,9 @@ fn listen() {
router_config
.send_tunnels
.get(&public_key_to_peer_id(&peer_key.verifying_key()))
.is_none(),
.expect("controller should have a send tunnel after listening for and receiving an incoming initiate")
.links
.is_empty(),
"controller cannot know where to send packets until receiving heartbeats"
);

Expand Down
1 change: 1 addition & 0 deletions packages/centipede_router/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,4 @@ edition = "2021"
arc-swap = "1.6.0"
centipede_proto = { version = "0.1.0", path = "../centipede_proto" }
chacha20poly1305 = "0.10.1"
log = "0.4.21"
26 changes: 22 additions & 4 deletions packages/centipede_router/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,18 @@ impl<'r> WorkerHandle<'r> {
let decrypted = message.decrypt_in_place(&tunnel.cipher).ok()?;

match tunnel.memory.observe(decrypted.sequence_number()) {
PacketRecollection::New => Some(ReceivePacket { decrypted }),
PacketRecollection::Seen => None,
PacketRecollection::Confusing => None,
PacketRecollection::New => {
log::trace!("new packet observed, creating receive obligation");
Some(ReceivePacket { decrypted })
}
PacketRecollection::Seen => {
log::trace!("duplicate packet observed, ignoring");
None
}
PacketRecollection::Confusing => {
log::warn!("confusing packet observed, ignoring");
None
}
}
}

Expand Down Expand Up @@ -170,6 +179,8 @@ impl<'p> HandleOutgoing<'p> {
.map(|tunnel| tunnel.links.iter())
.unwrap_or_default();

log::trace!("creating outgoing packet coroutine with remaining links: {remaining_links:?}");

Self {
packet_plaintext: packet,
router_state: Pin::new(router_state),
Expand All @@ -181,6 +192,8 @@ impl<'p> HandleOutgoing<'p> {
// TODO: consider taking the scratch buffer by reference, so the caller doesn't have to `mem::take` it or similar.
/// Resume the coroutine, yielding the next packet to send.
pub fn resume(&mut self, scratch: Vec<u8>) -> Option<SendPacket> {
log::trace!("resuming outgoing packet coroutine");

match self.remaining_links.next() {
Some(&link) => {
let tunnel = *self.tunnels.peek()?;
Expand All @@ -192,12 +205,17 @@ impl<'p> HandleOutgoing<'p> {
message.overwrite_packet(self.packet_plaintext.iter().copied());
let message = message.encrypt_in_place(&tunnel.cipher);

log::trace!("creating obligation to send packet along {link:?}");

Some(SendPacket { link, message })
}
None => {
self.remaining_links = self.tunnels.peek()?.links.iter();
self.tunnels.next().unwrap();
None

log::trace!("advancing handle outgoing to next send tunnel");

self.resume(scratch)
}
}
}
Expand Down

0 comments on commit 713e521

Please sign in to comment.