Skip to content

Commit

Permalink
fix(centipede_router): add integration test and fix handle_outgoing bug
Browse files Browse the repository at this point in the history
  • Loading branch information
max-niederman committed Jan 17, 2024
1 parent 9899160 commit f4a7546
Show file tree
Hide file tree
Showing 4 changed files with 240 additions and 15 deletions.
10 changes: 5 additions & 5 deletions packages/centipede_router/src/controller.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ mod tests {

#[test]
fn construct() {
Router::new([0; 8], vec![]);
Router::new([0; 8]);
}

fn state<'c>(controller: &Controller) -> Arc<ConfiguredRouter> {
Expand All @@ -120,7 +120,7 @@ mod tests {

#[test]
fn crud_receive_tunnel() {
let mut router = Router::new([0; 8], vec![]);
let mut router = Router::new([0; 8]);
let (mut controller, _) = router.handles(0);

controller.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into()));
Expand All @@ -135,7 +135,7 @@ mod tests {

#[test]
fn crud_send_tunnel() {
let mut router = Router::new([0; 8], vec![]);
let mut router = Router::new([0; 8]);
let (mut controller, _) = router.handles(0);

let link = Link {
Expand All @@ -158,7 +158,7 @@ mod tests {

#[test]
fn receive_updates_preserve_state() {
let mut router = Router::new([0; 8], vec![]);
let mut router = Router::new([0; 8]);
let (mut controller, _) = router.handles(0);

controller.upsert_receive_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into()));
Expand All @@ -175,7 +175,7 @@ mod tests {

#[test]
fn send_updates_preserve_state() {
let mut router = Router::new([0; 8], vec![]);
let mut router = Router::new([0; 8]);
let (mut controller, _) = router.handles(0);

controller.upsert_send_tunnel([1; 8], ChaCha20Poly1305::new((&[0; 32]).into()), vec![]);
Expand Down
10 changes: 3 additions & 7 deletions packages/centipede_router/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ struct ConfiguredRouter {
/// Our local peer identifier.
local_id: PeerId,

/// Local addresses on which to receive messages.
recv_addrs: Vec<SocketAddr>,

/// Set of receiving tunnels, by sender identifier.
recv_tunnels: HashMap<PeerId, RecvTunnel>,

Expand Down Expand Up @@ -68,22 +65,21 @@ struct SendTunnel {
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct Link {
/// The local address.
local: SocketAddr,
pub local: SocketAddr,

/// The remote address.
remote: SocketAddr,
pub remote: SocketAddr,
}

pub type PeerId = [u8; 8];

impl Router {
/// Create a new router.
pub fn new(peer_id: PeerId, recv_addrs: Vec<SocketAddr>) -> Self {
pub fn new(peer_id: PeerId) -> Self {
Self {
state: ArcSwap::from_pointee(ConfiguredRouter {
generation: 0,
local_id: peer_id,
recv_addrs,
recv_tunnels: HashMap::new(),
send_tunnels: HashMap::new(),
}),
Expand Down
10 changes: 7 additions & 3 deletions packages/centipede_router/src/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,10 @@ impl<B: Deref<Target = [u8]>> ReceivePacket<B> {
}

/// A coroutine yielding packets to send.
///
/// Once the `coroutines` feature is stable,
/// this will be replaced with a coroutine literal.
#[must_use = "outgoing packet coroutine must be resumed"]
pub struct HandleOutgoing<'p> {
/// Plaintext of the outgoing packet being handled.
packet_plaintext: &'p [u8],
Expand Down Expand Up @@ -137,8 +141,8 @@ impl<'p> HandleOutgoing<'p> {
Some(SendPacket { link, message })
}
None => {
self.tunnels.next()?;
self.remaining_links = self.tunnels.peek().unwrap().links.iter();
self.remaining_links = self.tunnels.peek()?.links.iter();
self.tunnels.next().unwrap();
None
}
}
Expand All @@ -148,7 +152,7 @@ impl<'p> HandleOutgoing<'p> {
/// The obligation to
/// send a packet from the system's networking stack
/// to another peer on the Centipede network.
#[must_use]
#[must_use = "send packet obligation must be fulfilled"]
pub struct SendPacket {
/// The link to send the packet over.
link: Link,
Expand Down
225 changes: 225 additions & 0 deletions packages/centipede_router/tests/udp_threads.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,225 @@
use std::{
collections::HashMap,
io,
net::{SocketAddr, UdpSocket},
thread,
time::Duration,
};

use centipede_proto::{
marker::{auth, text},
PacketMessage,
};
use centipede_router::{
controller::Controller,
worker::{SendPacket, Worker},
Link, PeerId, Router,
};
use chacha20poly1305::{ChaCha20Poly1305, KeyInit};

/// Test a network with zero peers.
///
/// Primarily a meta-test on the test harness.
#[test]
fn empty_network() {
PeerCtx::run(vec![]);
}

/// Test a network with one peer.
///
/// Primarily a meta-test on the test harness.
#[test]
fn single_peer() {
PeerCtx::run(vec![PeerSpec {
id: [0; 8],
addr_count: 1,
entrypoint: Box::new(|mut ctx: PeerCtx<'_>| {
let mut obligations = ctx.worker.handle_outgoing(&[0; 1024]);
assert!(
obligations.resume(vec![]).is_none(),
"router is alone, but attempted to send a packet"
);
}),
}]);
}

/// Send a single message over a half-duplex connection.
#[test]
fn half_duplex_single_message() {
const PACKET: &[u8] = "Hello, centipede!".as_bytes();

PeerCtx::run(vec![
PeerSpec {
id: [0; 8],
addr_count: 1,
entrypoint: Box::new(|mut ctx: PeerCtx<'_>| {
ctx.controller.upsert_send_tunnel(
[1; 8],
dummy_cipher(),
ctx.possible_links_to([1; 8]),
);

let mut obligations = ctx.worker.handle_outgoing(PACKET);
let mut scratch = Vec::new();

scratch = match obligations.resume(scratch) {
Some(obligation) => {
assert_eq!(obligation.message().sequence_number(), 0);
assert_eq!(obligation.message().sender(), [0; 8]);

ctx.fulfill_send(obligation)
}
None => panic!("sending router did not attempt to send a packet"),
};

assert!(
obligations.resume(scratch).is_none(),
"sending router attempted to send a second packet when only one link was configured"
);
}),
},
PeerSpec {
id: [1; 8],
addr_count: 1,
entrypoint: Box::new(|mut ctx: PeerCtx<'_>| {
ctx.controller.upsert_receive_tunnel([0; 8], dummy_cipher());

let packets = ctx.receive_block();
assert_eq!(packets.len(), 1, "received wrong number of packets");
let packet = packets.into_iter().next().unwrap();

match ctx.worker.handle_incoming(packet) {
Some(obligation) => {
assert_eq!(obligation.packet(), PACKET, "received wrong packet");
}
None => panic!("receiving router did not attempt to receive a packet"),
}
}),
},
]);
}

/// The context in which a peer test program runs.
struct PeerCtx<'r> {
controller: Controller<'r>,
worker: Worker<'r>,

sockets: Vec<UdpSocket>,
peers: HashMap<PeerId, Vec<SocketAddr>>,
}

struct PeerSpec {
id: PeerId,
addr_count: usize,
entrypoint: Box<dyn FnOnce(PeerCtx) + Send>,
}

impl<'r> PeerCtx<'r> {
fn run(peers: Vec<PeerSpec>) {
let mut sockets: HashMap<PeerId, Vec<UdpSocket>> = peers
.iter()
.map(|spec| {
(
spec.id,
(0..spec.addr_count)
.map(|_| UdpSocket::bind("127.0.0.1:0").unwrap())
.inspect(|s| s.set_nonblocking(true).unwrap())
.collect(),
)
})
.collect();

let peer_addrs: HashMap<PeerId, Vec<SocketAddr>> = peers
.iter()
.map(|spec| {
(
spec.id,
sockets[&spec.id]
.iter()
.map(|s| s.local_addr().unwrap())
.collect(),
)
})
.collect();

thread::scope(move |s| {
for spec in peers {
let peer_addrs = peer_addrs.clone();
let sockets = sockets.remove(&spec.id).unwrap();

s.spawn(move || {
let mut router = Router::new(spec.id);
let (controller, workers) = router.handles(1);
let worker = workers.into_iter().next().unwrap();

(spec.entrypoint)(PeerCtx {
controller,
worker,
sockets,
peers: peer_addrs,
});
});
}
});
}

fn possible_links_to(&self, peer_id: PeerId) -> Vec<Link> {
self.sockets
.iter()
.map(|s| s.local_addr().unwrap())
.flat_map(|local| {
self.peers[&peer_id]
.iter()
.map(move |&remote| Link { local, remote })
})
.collect()
}

fn receive(&self) -> Vec<PacketMessage<Vec<u8>, auth::Unknown, text::Ciphertext>> {
self.sockets
.iter()
.filter_map(|s| {
let mut buf = vec![0; 1024];

match s.recv(&mut buf) {
Ok(n) => {
buf.truncate(n);
Some(PacketMessage::from_buffer(buf).expect("received invalid packet"))
}
Err(err) if err.kind() == io::ErrorKind::WouldBlock => None,
Err(err) => panic!("error receiving: {}", err),
}
})
.collect()
}

fn receive_block(&self) -> Vec<PacketMessage<Vec<u8>, auth::Unknown, text::Ciphertext>> {
loop {
let packets = self.receive();

if !packets.is_empty() {
return packets;
}

thread::sleep(Duration::from_millis(1));
}
}

fn fulfill_send(&self, obligation: SendPacket) -> Vec<u8> {
let socket = self
.sockets
.iter()
.find(|s| s.local_addr().unwrap() == obligation.link().local)
.expect("no socket for link specified by router");

socket
.send_to(obligation.message().as_buffer(), obligation.link().remote)
.unwrap();

obligation.fulfill()
}
}

fn dummy_cipher() -> ChaCha20Poly1305 {
ChaCha20Poly1305::new(&[0; 32].into())
}

0 comments on commit f4a7546

Please sign in to comment.