diff --git a/packages/centipede/src/config.rs b/packages/centipede/src/config.rs index 392cd48..94a4b14 100644 --- a/packages/centipede/src/config.rs +++ b/packages/centipede/src/config.rs @@ -21,10 +21,6 @@ pub struct Centipede { /// Addresses on which the daemon should listen for incoming packets. pub recv_addrs: Vec, - /// of workers to spawn. - #[serde(default = "num_cpus::get")] - pub workers: usize, - /// List of peers. pub peers: Vec, } diff --git a/packages/centipede/src/main.rs b/packages/centipede/src/main.rs index e7d74fc..a61a89f 100644 --- a/packages/centipede/src/main.rs +++ b/packages/centipede/src/main.rs @@ -101,7 +101,7 @@ fn main() -> Result<()> { thread::scope(|s| { { let shutdown = shutdown.clone(); - let mut worker = Worker::new( + let worker = Worker::new( router.worker(), control_message_sink.clone(), tun_dev @@ -111,53 +111,12 @@ fn main() -> Result<()> { ) .wrap_err("failed to create worker 0")?; - s.spawn(move || { - let mut events = mio::Events::with_capacity(1024); - loop { - if shutdown.load(Ordering::Relaxed) { - break; - } - - if let Ok(outgoing) = rx_outgoing_control.try_recv() { - let res = worker.send_control_message::>( - outgoing.from, - outgoing.to, - outgoing.message, - ); - - if let Err(e) = res { - println!( - "{:?}", - Report::new(InWorkerThread { - inner: e, - thread_number: 0 - }) - ); - - log::info!("shutting down due to error"); - shutdown.store(true, Ordering::Relaxed); - } - } - - if let Err(e) = worker.wait_and_handle(&mut events) { - println!( - "{:?}", - Report::new(InWorkerThread { - inner: e, - thread_number: 0 - }) - ); - - log::info!("shutting down due to error"); - shutdown.store(true, Ordering::Relaxed); - } - } - }); + s.spawn(move || worker_loop(worker, 0, shutdown, Some(rx_outgoing_control))); } for i in 1..opt.workers { let shutdown = shutdown.clone(); - let mut worker = Worker::new( + let worker = Worker::new( router.worker(), control_message_sink.clone(), tun_dev @@ -166,26 +125,8 @@ fn main() -> Result<()> { .wrap_err_with(|| format!("failed to get TUN queue {}", i))?, ) .wrap_err_with(|| format!("failed to create worker {}", i))?; - s.spawn(move || { - let mut events = mio::Events::with_capacity(1024); - loop { - if shutdown.load(Ordering::Relaxed) { - break; - } - if let Err(e) = worker.wait_and_handle(&mut events) { - println!( - "{:?}", - Report::new(InWorkerThread { - inner: e, - thread_number: i - }) - ); - - log::info!("shutting down due to error"); - shutdown.store(true, Ordering::Relaxed); - } - } - }); + + s.spawn(move || worker_loop(worker, i, shutdown, None)); } let router_configurator = router.configurator(); @@ -230,6 +171,54 @@ fn main() -> Result<()> { }) } +fn worker_loop( + mut worker: Worker, + thread_number: usize, + shutdown: Arc, + rx_outgoing_control: Option>, +) { + let mut events = mio::Events::with_capacity(1024); + loop { + if shutdown.load(Ordering::Relaxed) { + break; + } + + if let Some(Ok(outgoing)) = rx_outgoing_control.as_ref().map(mpsc::Receiver::try_recv) { + let res = worker.send_control_message::>( + outgoing.from, + outgoing.to, + outgoing.message, + ); + + if let Err(e) = res { + println!( + "{:?}", + Report::new(InWorkerThread { + inner: e, + thread_number + }) + ); + + log::info!("shutting down due to error"); + shutdown.store(true, Ordering::Relaxed); + } + } + + if let Err(e) = worker.wait_and_handle(&mut events) { + println!( + "{:?}", + Report::new(InWorkerThread { + inner: e, + thread_number + }) + ); + + log::info!("shutting down due to error"); + shutdown.store(true, Ordering::Relaxed); + } + } +} + #[derive(Debug, Error, Diagnostic)] #[error("worker thread {thread_number} failed")] struct InWorkerThread {