Skip to content

Commit

Permalink
feat(worker): forward control messages to an mpsc channel
Browse files Browse the repository at this point in the history
  • Loading branch information
max-niederman committed Mar 31, 2024
1 parent afb7f69 commit 3d6ba49
Show file tree
Hide file tree
Showing 2 changed files with 39 additions and 4 deletions.
12 changes: 12 additions & 0 deletions packages/centipede_proto/src/control.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,18 @@ where
pub fn as_buffer(&self) -> &B {
&self.buffer
}

/// Copy the message's underlying buffer into a `Message<Vec<u8>, A>`.
pub fn to_vec_backed(self) -> Message<Vec<u8>, A> {
Message {
buffer: self.buffer.to_vec(),
sender: self.sender,
recipient: self.recipient,
signature: self.signature,
content: self.content,
_auth: PhantomData::<A>,
}
}
}

impl<B> Message<B, auth::Unknown>
Expand Down
31 changes: 27 additions & 4 deletions packages/centipede_worker/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@ use std::{
io,
mem::{self, MaybeUninit},
os::fd::AsRawFd,
sync::atomic::{AtomicBool, Ordering},
sync::{
atomic::{AtomicBool, Ordering},
mpsc,
},
task::Poll,
time::Duration,
};

use centipede_proto::{MessageDiscriminant, PacketMessage};
use centipede_proto::{marker::auth, ControlMessage, MessageDiscriminant, PacketMessage};
use centipede_router::worker::{ConfigChanged, WorkerHandle};
use mio::unix::SourceFd;
use sockets::Sockets;
Expand All @@ -23,6 +26,9 @@ pub struct Worker<'r> {
/// The underlying handle to the router.
router_handle: WorkerHandle<'r>,

/// The sending half of the control message channel.
control_message_channel: mpsc::Sender<ControlMessage<Vec<u8>, auth::Unknown>>,

/// The TUN queue.
tun_queue: hypertube::Queue<'r, false>,

Expand All @@ -35,9 +41,14 @@ pub struct Worker<'r> {

impl<'r> Worker<'r> {
/// Create a new worker.
pub fn new(router_handle: WorkerHandle<'r>, tun_queue: hypertube::Queue<'r, false>) -> Self {
pub fn new(
router_handle: WorkerHandle<'r>,
control_message_channel: mpsc::Sender<ControlMessage<Vec<u8>, auth::Unknown>>,
tun_queue: hypertube::Queue<'r, false>,
) -> Self {
Self {
router_handle,
control_message_channel,
tun_queue,
sockets: Sockets::new(),
poll: mio::Poll::new().unwrap(),
Expand Down Expand Up @@ -148,7 +159,19 @@ impl<'r> Worker<'r> {
let msg = unsafe { MaybeUninit::slice_assume_init_mut(&mut buf[..n]) };

match centipede_proto::discriminate(&*msg) {
Ok(MessageDiscriminant::Control) => todo!(),
Ok(MessageDiscriminant::Control) => {
let control = match ControlMessage::deserialize(&*msg) {
Ok(control) => control,
Err(e) => {
log::warn!("failed to parse packet message: {}", e);
continue;
}
};

self.control_message_channel
.send(control.to_vec_backed())
.expect("failed to send control message");
}
Ok(MessageDiscriminant::Packet) => {
let packet = match PacketMessage::from_buffer(msg) {
Ok(packet) => packet,
Expand Down

0 comments on commit 3d6ba49

Please sign in to comment.