Skip to content

Commit

Permalink
Merge pull request #35 from idky137/add_walletrpc
Browse files Browse the repository at this point in the history
Nym 2.0
  • Loading branch information
Oscar-Pepper authored Jul 18, 2024
2 parents 1cbd634 + ccaf2c8 commit 13c2f12
Show file tree
Hide file tree
Showing 17 changed files with 1,176 additions and 191 deletions.
4 changes: 2 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ [email protected]
Will eventually hold the rust implementations of the LightWallet Service and Darkside RPCs, along with the wallet-side and server-side Nym Service implementations.

# Zingo-ProxyD
Currently a lightweight gRPC server for testing and development. Zingo-ProxyD also has a basic nym server, currently only receives send_transaction commands send over the mixnet.
Currently a lightweight gRPC server for testing and development. Zingo-ProxyD also has a basic nym server, currently only receives send_transaction and get_lightd_info commands send over the mixnet.
This should not be used to run mainnet nodes in its current form as it lacks the queueing and error checking logic necessary.

Under the "nym_poc" feature flag Zingo-ProxyD can also act as a Nym powered proxy between zcash wallets and Zingo-ProxyD, capable of sending zcash transactions over the Nym Mixnet.
Expand Down Expand Up @@ -54,6 +54,6 @@ The walletside Nym implementations are moving to ease wallet integration but the
4) Copy nym address displayed
5) Run `$ cargo run --features "nym_poc" -- <nym address copied>`

From zingolib: [transactions send with this build will be sent over the mixnet]
From zingolib: [get_lightd_info and send_transaction commands sent with this build will be sent over the mixnet]
6) Run `$ cargo run --release --package zingo-cli -- --chain "testnet" --server "127.0.0.1:8088" --data-dir ~/wallets/testnet_wallet`

6 changes: 6 additions & 0 deletions rust-toolchain.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
[toolchain]
channel = "1.76.0"
components = ["rustfmt", "clippy"]

[profile]
minimal = true
201 changes: 164 additions & 37 deletions zingo-proxyd/src/nym_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,65 @@ use std::sync::{

use nym_sdk::mixnet::{MixnetMessageSender, ReconstructedMessage};
use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag;
use prost::Message;
use zcash_client_backend::proto::service::RawTransaction;
// use prost::Message;
// use zcash_client_backend::proto::service::RawTransaction;

use zingo_rpc::primitives::NymClient;
use zingo_rpc::{
jsonrpc::connector::test_node_and_return_uri,
primitives::{NymClient, ProxyClient},
queue::request::ZingoProxyRequest,
};

/// Wrapper struct for a Nym client.
pub struct NymServer(pub NymClient);
pub struct NymServer {
/// NymClient data
pub nym_client: NymClient,
/// Nym Address
pub nym_addr: String,
/// Represents the Online status of the gRPC server.
pub online: Arc<AtomicBool>,
}

impl NymServer {
/// Receives and decodes encoded gRPC messages sent over the nym mixnet, processes them, encodes the response.
/// The encoded response is sent back to the sender using a surb (single use reply block).
pub async fn serve(
mut self,
online: Arc<AtomicBool>,
) -> tokio::task::JoinHandle<Result<(), tonic::transport::Error>> {
pub async fn serve(mut self) -> tokio::task::JoinHandle<Result<(), tonic::transport::Error>> {
let mut request_in: Vec<ReconstructedMessage> = Vec::new();
tokio::task::spawn(async move {
// NOTE: This interval may need to be reduced or removed / moved once scale testing begins.
let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50));
while online.load(Ordering::SeqCst) {
while let Some(request_nym) = self.0 .0.wait_for_messages().await {
// NOTE: the following should be removed with the addition of the queue and worker pool.
let lwd_port = 8080;
let zebrad_port = 18232;
println!("@zingoproxyd[nym]: Launching temporary proxy client..");
let proxy_client = ProxyClient {
lightwalletd_uri: http::Uri::builder()
.scheme("http")
.authority(format!("localhost:{lwd_port}"))
.path_and_query("/")
.build()
.unwrap(),
zebrad_uri: http::Uri::builder()
.scheme("http")
.authority(format!("localhost:{zebrad_port}"))
.path_and_query("/")
.build()
.unwrap(),
// zebrad_uri: test_node_and_return_uri(
// &zebrad_port,
// Some("xxxxxx".to_string()),
// Some("xxxxxx".to_string()),
// )
// .await
// .unwrap(),
online: self.online.clone(),
};
while self.online.load(Ordering::SeqCst) {
// --- wait for request.
while let Some(request_nym) = self.nym_client.0.wait_for_messages().await {
if request_nym.is_empty() {
interval.tick().await;
if !online.load(Ordering::SeqCst) {
if !self.online.load(Ordering::SeqCst) {
println!("Nym server shutting down.");
return Ok(());
}
Expand All @@ -43,46 +78,138 @@ impl NymServer {
request_in = request_nym;
break;
}

// --- decode request
let request_vu8 = request_in
.first()
.map(|r| r.message.clone())
.ok_or_else(|| "No response received from the nym network".to_string())
.unwrap();

//print request for testing
println!(
"@zingoproxyd[nym]: request received: {:?}",
&request_vu8[..]
);
println!(
"@zingoproxyd[nym]: request length: {}",
&request_vu8[..].len()
);

let request = RawTransaction::decode(&request_vu8[..]).unwrap();
let response = NymClient::nym_send_transaction(&request).await.unwrap();
let mut response_vu8 = Vec::new();
response.encode(&mut response_vu8).unwrap();

//print response for testing
println!("@zingoproxyd[nym]: response sent: {:?}", &response_vu8[..]);
println!(
"@zingoproxyd[nym]: response length: {}",
&response_vu8[..].len()
);

// --- fetch recipient address
let return_recipient = AnonymousSenderTag::try_from_base58_string(
request_in[0].sender_tag.unwrap().to_base58_string(),
)
.unwrap();
self.0
.0
.send_reply(return_recipient, response_vu8)
// --- build ZingoProxyRequest
let zingo_proxy_request =
ZingoProxyRequest::new_from_nym(return_recipient, request_vu8.as_ref());

// print request for testing
// println!(
// "@zingoproxyd[nym][TEST]: ZingoProxyRequest recieved: {:?}.",
// zingo_proxy_request
// );

// --- process request
// NOTE: when the queue is added requests will not be processed here but by the queue!
let response = proxy_client
.process_nym_request(&zingo_proxy_request)
.await
.unwrap();

// print response for testing
// println!(
// "@zingoproxyd[nym][TEST]: Response sent: {:?}.",
// &response[..],
// );

// --- send response
self.nym_client
.0
.send_reply(return_recipient, response)
.await
.unwrap();
}
// Why print this?
println!("Nym server shutting down.");
Ok(())
})
}

/// Returns a new NymServer Inatanse
pub async fn new(nym_conf_path: &str, online: Arc<AtomicBool>) -> Self {
let nym_client = NymClient::nym_spawn(nym_conf_path).await;
let nym_addr = nym_client.0.nym_address().to_string();
NymServer {
nym_client,
nym_addr,
online,
}
}
}

// impl NymServer {
// /// Receives and decodes encoded gRPC messages sent over the nym mixnet, processes them, encodes the response.
// /// The encoded response is sent back to the sender using a surb (single use reply block).
// pub async fn serve(
// mut self,
// online: Arc<AtomicBool>,
// ) -> tokio::task::JoinHandle<Result<(), tonic::transport::Error>> {
// let mut request_in: Vec<ReconstructedMessage> = Vec::new();
// tokio::task::spawn(async move {
// // NOTE: This interval may need to be reduced or removed / moved once scale testing begins.
// let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50));
// while online.load(Ordering::SeqCst) {
// // --- wait for request.
// while let Some(request_nym) = self.0 .0.wait_for_messages().await {
// if request_nym.is_empty() {
// interval.tick().await;
// if !online.load(Ordering::SeqCst) {
// println!("Nym server shutting down.");
// return Ok(());
// }
// continue;
// }
// request_in = request_nym;
// break;
// }

// // --- decode request
// let request_vu8 = request_in
// .first()
// .map(|r| r.message.clone())
// .ok_or_else(|| "No response received from the nym network".to_string())
// .unwrap();

// // --- print request for testing
// println!(
// "@zingoproxyd[nym]: request received: {:?} - request length: {}",
// &request_vu8[..],
// &request_vu8[..].len()
// );

// // --- deserialize request
// let request = RawTransaction::decode(&request_vu8[..]).unwrap();

// // --- process request
// let response = NymClient::nym_send_transaction(&request).await.unwrap();

// // --- decode response
// let mut response_vu8 = Vec::new();
// response.encode(&mut response_vu8).unwrap();

// //print response for testing
// println!(
// "@zingoproxyd[nym]: response sent: {:?} - response length: {}",
// &response_vu8[..],
// &response_vu8[..].len()
// );

// // --- fetch recipient address
// let return_recipient = AnonymousSenderTag::try_from_base58_string(
// request_in[0].sender_tag.unwrap().to_base58_string(),
// )
// .unwrap();

// // --- send response
// self.0
// .0
// .send_reply(return_recipient, response_vu8)
// .await
// .unwrap();
// }
// println!("Nym server shutting down.");
// Ok(())
// })
// }
// }
28 changes: 21 additions & 7 deletions zingo-proxyd/src/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,9 @@
//! TODO: - Add ProxyServerError error type and rewrite functions to return <Result<(), ProxyServerError>>, propagating internal errors.
//! - Update spawn_server and nym_spawn to return <Result<(), GrpcServerError>> and <Result<(), NymServerError>> and use here.
use crate::{nym_server::NymServer, server::spawn_server};
use crate::{nym_server::NymServer, server::spawn_grpc_server};
use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient;
use zingo_rpc::primitives::NymClient;
use zingo_rpc::jsonrpc::connector::test_node_and_return_uri;

use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;
Expand All @@ -26,8 +26,18 @@ pub async fn spawn_proxy(
let nym_addr_out: Option<String>;

startup_message();
println!("@zingoproxyd: Launching Zingo-Proxy..\n@zingoproxyd: Launching gRPC Server..");
let proxy_handle = spawn_server(proxy_port, lwd_port, zebrad_port, online.clone()).await;
println!("@zingoproxyd: Launching Zingo-Proxy!\n@zingoproxyd: Checking connection with node..");
// TODO Add user and password fields.
let _zebrad_uri = test_node_and_return_uri(
zebrad_port,
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await
.unwrap();

println!("@zingoproxyd: Launching gRPC Server..");
let proxy_handle = spawn_grpc_server(proxy_port, lwd_port, zebrad_port, online.clone()).await;
handles.push(proxy_handle);

#[cfg(not(feature = "nym_poc"))]
Expand All @@ -42,10 +52,14 @@ pub async fn spawn_proxy(
#[cfg(not(feature = "nym_poc"))]
{
println!("@zingoproxyd[nym]: Launching Nym Server..");
let nym_server: NymServer = NymServer(NymClient::nym_spawn(nym_conf_path).await);
nym_addr_out = Some(nym_server.0 .0.nym_address().to_string());

let nym_proxy_handle = nym_server.serve(online).await;
// let nym_server: NymServer = NymServer(NymClient::nym_spawn(nym_conf_path).await);
// nym_addr_out = Some(nym_server.0 .0.nym_address().to_string());
// let nym_proxy_handle = nym_server.serve(online).await;
let nym_server = NymServer::new(nym_conf_path, online).await;
nym_addr_out = Some(nym_server.nym_addr.clone());
let nym_proxy_handle = nym_server.serve().await;

handles.push(nym_proxy_handle);
// TODO: Add wait_on_nym_startup(nym_addr_out, online.clone()) function to test nym server.
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
Expand Down
2 changes: 1 addition & 1 deletion zingo-proxyd/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ impl ProxyServer {
}

/// Spawns a gRPC server.
pub async fn spawn_server(
pub async fn spawn_grpc_server(
proxy_port: &u16,
lwd_port: &u16,
zebrad_port: &u16,
Expand Down
8 changes: 8 additions & 0 deletions zingo-rpc/src/blockcache/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ use std::io::{Cursor, Read};
use crate::jsonrpc::connector::JsonRpcConnectorError;

/// Parser Error Type.
///
/// TODO: Move this error and other Zingo-Proxy error types intoown errors mod.
#[derive(Debug, thiserror::Error)]
pub enum ParseError {
/// Io Error.
Expand All @@ -20,9 +22,15 @@ pub enum ParseError {
/// UTF-8 conversion error.
#[error("UTF-8 Error: {0}")]
Utf8Error(#[from] std::str::Utf8Error),
/// UTF-8 conversion error.
#[error("UTF-8 Conversion Error: {0}")]
FromUtf8Error(#[from] std::string::FromUtf8Error),
/// Hexadecimal parsing error.
#[error("Hex Parse Error: {0}")]
ParseIntError(#[from] std::num::ParseIntError),
/// Errors originating from prost decodings.
#[error("Prost Decode Error: {0}")]
ProstDecodeError(#[from] prost::DecodeError),
}

/// Used for decoding zcash blocks from a bytestring.
Expand Down
2 changes: 2 additions & 0 deletions zingo-rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,5 +7,7 @@ pub mod blockcache;
pub mod jsonrpc;
pub mod nym;
pub mod primitives;
pub mod queue;
pub mod rpc;
pub mod utils;
pub mod walletrpc;
1 change: 1 addition & 0 deletions zingo-rpc/src/nym.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
//! Backend Nym functionality.
pub mod client;
pub mod utils;
Loading

0 comments on commit 13c2f12

Please sign in to comment.