added dispatcher
idky137 committed Aug 2, 2024
1 parent 3df3b8a commit b52664b
Showing 5 changed files with 172 additions and 7 deletions.
40 changes: 40 additions & 0 deletions zingo-rpc/src/server.rs
@@ -1,6 +1,46 @@
//! Zingo-Proxy client server.
use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag;
use std::sync::{atomic::AtomicBool, Arc};
use tokio::sync::mpsc;

use self::{
dispatcher::NymDispatcher,
ingestor::{NymIngestor, TcpIngestor},
request::ZingoProxyRequest,
worker::WorkerPool,
};

pub mod dispatcher;
pub mod error;
pub mod ingestor;
pub mod request;
pub mod worker;

/// Bounded queue, implemented over a tokio mpsc channel.
pub struct Queue<T> {
/// Maximum length of the queue.
max_size: usize,
/// Queue sender.
queue_tx: mpsc::Sender<T>,
/// Queue receiver.
queue_rx: mpsc::Receiver<T>,
}
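
Note: no constructor for Queue is included in this commit. A minimal sketch of how one could wrap a bounded tokio mpsc channel, using only the fields defined above (the new method itself is hypothetical, shown for illustration):

impl<T> Queue<T> {
    /// Hypothetical constructor, for illustration only: builds a bounded
    /// tokio mpsc channel; `queue_tx` can be cloned out to producers while
    /// `queue_rx` belongs to the single consumer.
    fn new(max_size: usize) -> Self {
        let (queue_tx, queue_rx) = mpsc::channel(max_size);
        Queue {
            max_size,
            queue_tx,
            queue_rx,
        }
    }
}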

/// LightWallet server capable of servicing clients over both HTTP and the Nym Mixnet.
pub struct Server {
/// Listens for incoming gRPC requests over HTTP.
tcp_ingestor: TcpIngestor,
/// Listens for incoming gRPC requests over Nym Mixnet.
nym_ingestor: NymIngestor,
/// Sends gRPC responses over Nym Mixnet.
nym_dispatcher: NymDispatcher,
/// Dynamically sized pool of workers.
worker_pool: WorkerPool,
/// Request queue.
request_queue: Queue<ZingoProxyRequest>,
/// Nym response queue.
nym_response_queue: Queue<(Vec<u8>, AnonymousSenderTag)>,
/// Represents the Online status of the Server.
pub online: Arc<AtomicBool>,
}
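
Note: the server wiring itself is not part of this commit. Purely as an illustration of the data flow the field comments describe (ingestors push ZingoProxyRequests onto the request queue, workers drain it, and nym responses are queued for the dispatcher), here is a rough sketch of the Nym path using only the spawn signatures visible in this change; the function name, channel sizes, and error boxing are assumptions, and imports are as at the top of server.rs:

// Illustrative sketch only, not part of this commit.
async fn wire_nym_path(
    nym_conf_path: &str,
    online: Arc<AtomicBool>,
) -> Result<(NymIngestor, NymDispatcher), Box<dyn std::error::Error>> {
    // Requests flow from the ingestors to the worker pool; the receiver
    // would be handed to the WorkerPool (not shown in this commit).
    let (request_tx, _request_rx) = mpsc::channel::<ZingoProxyRequest>(1024);
    // Nym responses flow from the workers to the dispatcher, which also keeps
    // a sender clone so it can requeue failed sends.
    let (response_tx, response_rx) = mpsc::channel::<(Vec<u8>, AnonymousSenderTag)>(1024);
    let nym_ingestor =
        NymIngestor::spawn(nym_conf_path, request_tx.clone(), online.clone()).await?;
    let nym_dispatcher =
        NymDispatcher::spawn(nym_conf_path, response_rx, response_tx.clone(), online.clone())
            .await?;
    Ok((nym_ingestor, nym_dispatcher))
}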
106 changes: 106 additions & 0 deletions zingo-rpc/src/server/dispatcher.rs
@@ -0,0 +1,106 @@
//! Holds the server dispatcher (replier) implementations.
use nym_sdk::mixnet::MixnetMessageSender;
use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag;
use std::sync::{
atomic::{AtomicBool, Ordering},
Arc,
};
use tokio::sync::mpsc;

use crate::{
nym::{client::NymClient, error::NymError},
server::error::DispatcherError,
};

/// Status of the dispatcher.
#[derive(Debug, Clone)]
pub enum DispatcherStatus {
/// On hold, due to blockcache / node error.
Inactive,
/// Listening for new requests.
Listening,
}

/// Sends gRPC responses over Nym Mixnet.
pub struct NymDispatcher {
/// Nym Client
dispatcher: NymClient,
/// Used to receive responses from the queue.
response_queue: mpsc::Receiver<(Vec<u8>, AnonymousSenderTag)>,
/// Used to requeue responses that failed to send.
response_requeue: mpsc::Sender<(Vec<u8>, AnonymousSenderTag)>,
/// Represents the Online status of the gRPC server.
online: Arc<AtomicBool>,
/// Current status of the dispatcher.
status: DispatcherStatus,
}

impl NymDispatcher {
/// Creates a Nym dispatcher.
pub async fn spawn(
nym_conf_path: &str,
response_queue: mpsc::Receiver<(Vec<u8>, AnonymousSenderTag)>,
response_requeue: mpsc::Sender<(Vec<u8>, AnonymousSenderTag)>,
online: Arc<AtomicBool>,
) -> Result<Self, DispatcherError> {
let client = NymClient::spawn(&format!("{}/dispatcher", nym_conf_path)).await?;
Ok(NymDispatcher {
dispatcher: client,
response_queue,
response_requeue,
online,
status: DispatcherStatus::Inactive,
})
}

/// Starts Nym service.
pub async fn serve(mut self) -> tokio::task::JoinHandle<Result<(), DispatcherError>> {
tokio::task::spawn(async move {
// NOTE: This interval may need to be reduced or removed / moved once scale testing begins.
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50));
// TODO Check self.status and wait on server / node if on hold.
self.status = DispatcherStatus::Listening;
loop {
tokio::select! {
_ = interval.tick() => {
if !self.check_online() {
println!("Nym dispatcher shutting down.");
return Ok(());
}
}
incoming = self.response_queue.recv() => {
match incoming {
Some(response) => {
if !self.check_online() {
println!("Nym dispatcher shutting down.");
return Ok(());
}
if let Err(nym_e) = self.dispatcher
.client
.send_reply(response.1, response.0.clone())
.await.map_err(NymError::from) {
// TODO: Convert to use try_send().
if let Err(e) = self.response_requeue.send(response).await {
eprintln!("Failed to send response over nym: {}\nAnd failed to requeue response: {}\nFatal error! Restarting nym dispatcher.", nym_e, e);
// TODO: Handle error. Restart nym dispatcher.
}
eprintln!("Failed to send response over nym: {}\nResponse requeued, restarting nym dispatcher.", nym_e);
// TODO: Handle error. Restart nym dispatcher.
}
}
None => {
println!("Response queue closed, nym dispatcher shutting down.");
return Ok(());
}
}
}
}
}
})
}

fn check_online(&self) -> bool {
self.online.load(Ordering::SeqCst)
}
}
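
Note: a usage sketch for the dispatcher, based on the spawn signature above; the config path, channel size, and surrounding async context are assumptions, and imports are as at the top of dispatcher.rs:

// Illustrative usage only, not part of this commit.
async fn run_dispatcher() -> Result<(), DispatcherError> {
    // Channel carrying (response bytes, reply tag) pairs to the dispatcher.
    let (response_tx, response_rx) = mpsc::channel(100);
    let online = Arc::new(AtomicBool::new(true));
    let dispatcher =
        NymDispatcher::spawn("/tmp/nym_conf", response_rx, response_tx.clone(), online.clone())
            .await?;
    // serve() consumes the dispatcher and returns a JoinHandle running the reply loop.
    let serve_handle = dispatcher.serve().await;
    serve_handle.await.expect("dispatcher task panicked")
}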
8 changes: 8 additions & 0 deletions zingo-rpc/src/server/error.rs
@@ -36,6 +36,14 @@ pub enum IngestorError {
QueuePushError(#[from] TrySendError<ZingoProxyRequest>),
}

/// Zingo-Proxy dispatcher errors.
#[derive(Debug, thiserror::Error)]
pub enum DispatcherError {
/// Nym based errors.
#[error("Nym error: {0}")]
NymError(#[from] NymError),
}

/// Zingo-Proxy worker errors.
#[derive(Debug, thiserror::Error)]
pub enum WorkerError {
11 changes: 6 additions & 5 deletions zingo-rpc/src/server/ingestor.rs
@@ -15,8 +15,6 @@ use crate::{
};

/// Status of the worker.
///
/// TODO: Add duration to each variant.
#[derive(Debug, Clone)]
pub enum IngestorStatus {
/// On hold, due to blockcache / node error.
@@ -25,7 +23,7 @@ pub enum IngestorStatus {
Listening,
}

/// Configuration data for gRPC server.
/// Listens for incoming gRPC requests over HTTP.
pub struct TcpIngestor {
/// Tcp Listener.
ingestor: TcpListener,
@@ -75,6 +73,7 @@ impl TcpIngestor {
println!("Tcp ingestor shutting down.");
return Ok(());
}
// TODO: Convert to use try_send().
if let Err(e) = self.queue.send(ZingoProxyRequest::new_from_grpc(stream)).await {
// TODO:: Return queue full tonic status over tcpstream and close (that TcpStream..).
eprintln!("Failed to send connection: {}", e);
@@ -101,7 +100,7 @@ impl TcpIngestor {
}
}

/// Wrapper struct for a Nym client.
/// Listens for incoming gRPC requests over Nym Mixnet.
pub struct NymIngestor {
/// Nym Client
ingestor: NymClient,
@@ -120,7 +119,7 @@ impl NymIngestor {
queue: mpsc::Sender<ZingoProxyRequest>,
online: Arc<AtomicBool>,
) -> Result<Self, IngestorError> {
let listener = NymClient::spawn(nym_conf_path).await?;
let listener = NymClient::spawn(&format!("{}/ingestor", nym_conf_path)).await?;
Ok(NymIngestor {
ingestor: listener,
queue,
@@ -165,12 +164,14 @@ impl NymIngestor {
// TODO: Handle RequestError here.
let zingo_proxy_request =
ZingoProxyRequest::new_from_nym(return_recipient, request_vu8.as_ref())?;
// TODO: Convert to use try_send().
if let Err(e) = self.queue.send(zingo_proxy_request).await {
// TODO: Return queue full tonic status over nym mixnet.
eprintln!("Failed to send connection: {}", e);
}
}
None => {
// TODO: Error in nym client, handle error here (restart ingestor?).
eprintln!("Failed to receive message from Nym network.");
if !self.online.load(Ordering::SeqCst) {
println!("Nym ingestor shutting down.");
14 changes: 12 additions & 2 deletions zingo-rpc/src/server/worker.rs
@@ -100,7 +100,7 @@ pub struct Worker {
grpc_client: GrpcClient,
/// Workers current status.
status: WorkerStatus,
/// Represents the Online status of the gRPC server.
/// Represents the Online status of the Worker.
pub online: Arc<AtomicBool>,
}

@@ -133,7 +133,7 @@ impl Worker {

/// Starts queue worker service routine.
///
/// TODO: Add requeue on error.
/// TODO: Add requeue logic for node errors.
pub async fn serve(mut self) -> tokio::task::JoinHandle<Result<(), WorkerError>> {
tokio::task::spawn(async move {
// NOTE: This interval may need to be reduced or removed / moved once scale testing begins.
@@ -218,3 +218,13 @@ impl Worker {
self.online.load(Ordering::SeqCst)
}
}

/// Dynamically sized pool of workers.
pub struct WorkerPool {
/// Maximum number of concurrent workers allowed.
max_size: usize,
/// Workers currently in the pool.
workers: Vec<Worker>,
/// Represents the Online status of the WorkerPool.
pub online: Arc<AtomicBool>,
}
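
Note: no WorkerPool implementation is included in this commit. A trivial sketch of a helper it might expose, using only the fields defined above (the method is hypothetical):

impl WorkerPool {
    /// Hypothetical helper, for illustration only: whether the pool has
    /// reached its configured maximum number of workers.
    fn is_full(&self) -> bool {
        self.workers.len() >= self.max_size
    }
}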
