diff --git a/README.md b/README.md index 2225b1b..e69464d 100644 --- a/README.md +++ b/README.md @@ -13,7 +13,7 @@ zingodisclosure@proton.me Will eventually hold the rust implementations of the LightWallet Service and Darkside RPCs, along with the wallet-side and server-side Nym Service implementations. # Zingo-ProxyD -Currently a lightweight gRPC server for testing and development. Zingo-ProxyD also has a basic nym server, currently only receives send_transaction commands send over the mixnet. +Currently a lightweight gRPC server for testing and development. Zingo-ProxyD also has a basic nym server, currently only receives send_transaction and get_lightd_info commands send over the mixnet. This should not be used to run mainnet nodes in its current form as it lacks the queueing and error checking logic necessary. Under the "nym_poc" feature flag Zingo-ProxyD can also act as a Nym powered proxy between zcash wallets and Zingo-ProxyD, capable of sending zcash transactions over the Nym Mixnet. @@ -54,6 +54,6 @@ The walletside Nym implementations are moving to ease wallet integration but the 4) Copy nym address displayed 5) Run `$ cargo run --features "nym_poc" -- ` -From zingolib: [transactions send with this build will be sent over the mixnet] +From zingolib: [get_lightd_info and send_transaction commands sent with this build will be sent over the mixnet] 6) Run `$ cargo run --release --package zingo-cli -- --chain "testnet" --server "127.0.0.1:8088" --data-dir ~/wallets/testnet_wallet` diff --git a/rust-toolchain.toml b/rust-toolchain.toml new file mode 100644 index 0000000..ad43674 --- /dev/null +++ b/rust-toolchain.toml @@ -0,0 +1,6 @@ +[toolchain] +channel = "1.76.0" +components = ["rustfmt", "clippy"] + +[profile] +minimal = true diff --git a/zingo-proxyd/src/nym_server.rs b/zingo-proxyd/src/nym_server.rs index 8773938..29c036f 100644 --- a/zingo-proxyd/src/nym_server.rs +++ b/zingo-proxyd/src/nym_server.rs @@ -11,30 +11,65 @@ use std::sync::{ use nym_sdk::mixnet::{MixnetMessageSender, ReconstructedMessage}; use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag; -use prost::Message; -use zcash_client_backend::proto::service::RawTransaction; +// use prost::Message; +// use zcash_client_backend::proto::service::RawTransaction; -use zingo_rpc::primitives::NymClient; +use zingo_rpc::{ + jsonrpc::connector::test_node_and_return_uri, + primitives::{NymClient, ProxyClient}, + queue::request::ZingoProxyRequest, +}; /// Wrapper struct for a Nym client. -pub struct NymServer(pub NymClient); +pub struct NymServer { + /// NymClient data + pub nym_client: NymClient, + /// Nym Address + pub nym_addr: String, + /// Represents the Online status of the gRPC server. + pub online: Arc, +} impl NymServer { /// Receives and decodes encoded gRPC messages sent over the nym mixnet, processes them, encodes the response. /// The encoded response is sent back to the sender using a surb (single use reply block). - pub async fn serve( - mut self, - online: Arc, - ) -> tokio::task::JoinHandle> { + pub async fn serve(mut self) -> tokio::task::JoinHandle> { let mut request_in: Vec = Vec::new(); tokio::task::spawn(async move { // NOTE: This interval may need to be reduced or removed / moved once scale testing begins. let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50)); - while online.load(Ordering::SeqCst) { - while let Some(request_nym) = self.0 .0.wait_for_messages().await { + // NOTE: the following should be removed with the addition of the queue and worker pool. + let lwd_port = 8080; + let zebrad_port = 18232; + println!("@zingoproxyd[nym]: Launching temporary proxy client.."); + let proxy_client = ProxyClient { + lightwalletd_uri: http::Uri::builder() + .scheme("http") + .authority(format!("localhost:{lwd_port}")) + .path_and_query("/") + .build() + .unwrap(), + zebrad_uri: http::Uri::builder() + .scheme("http") + .authority(format!("localhost:{zebrad_port}")) + .path_and_query("/") + .build() + .unwrap(), + // zebrad_uri: test_node_and_return_uri( + // &zebrad_port, + // Some("xxxxxx".to_string()), + // Some("xxxxxx".to_string()), + // ) + // .await + // .unwrap(), + online: self.online.clone(), + }; + while self.online.load(Ordering::SeqCst) { + // --- wait for request. + while let Some(request_nym) = self.nym_client.0.wait_for_messages().await { if request_nym.is_empty() { interval.tick().await; - if !online.load(Ordering::SeqCst) { + if !self.online.load(Ordering::SeqCst) { println!("Nym server shutting down."); return Ok(()); } @@ -43,46 +78,138 @@ impl NymServer { request_in = request_nym; break; } + + // --- decode request let request_vu8 = request_in .first() .map(|r| r.message.clone()) .ok_or_else(|| "No response received from the nym network".to_string()) .unwrap(); - - //print request for testing - println!( - "@zingoproxyd[nym]: request received: {:?}", - &request_vu8[..] - ); - println!( - "@zingoproxyd[nym]: request length: {}", - &request_vu8[..].len() - ); - - let request = RawTransaction::decode(&request_vu8[..]).unwrap(); - let response = NymClient::nym_send_transaction(&request).await.unwrap(); - let mut response_vu8 = Vec::new(); - response.encode(&mut response_vu8).unwrap(); - - //print response for testing - println!("@zingoproxyd[nym]: response sent: {:?}", &response_vu8[..]); - println!( - "@zingoproxyd[nym]: response length: {}", - &response_vu8[..].len() - ); - + // --- fetch recipient address let return_recipient = AnonymousSenderTag::try_from_base58_string( request_in[0].sender_tag.unwrap().to_base58_string(), ) .unwrap(); - self.0 - .0 - .send_reply(return_recipient, response_vu8) + // --- build ZingoProxyRequest + let zingo_proxy_request = + ZingoProxyRequest::new_from_nym(return_recipient, request_vu8.as_ref()); + + // print request for testing + // println!( + // "@zingoproxyd[nym][TEST]: ZingoProxyRequest recieved: {:?}.", + // zingo_proxy_request + // ); + + // --- process request + // NOTE: when the queue is added requests will not be processed here but by the queue! + let response = proxy_client + .process_nym_request(&zingo_proxy_request) + .await + .unwrap(); + + // print response for testing + // println!( + // "@zingoproxyd[nym][TEST]: Response sent: {:?}.", + // &response[..], + // ); + + // --- send response + self.nym_client + .0 + .send_reply(return_recipient, response) .await .unwrap(); } + // Why print this? println!("Nym server shutting down."); Ok(()) }) } + + /// Returns a new NymServer Inatanse + pub async fn new(nym_conf_path: &str, online: Arc) -> Self { + let nym_client = NymClient::nym_spawn(nym_conf_path).await; + let nym_addr = nym_client.0.nym_address().to_string(); + NymServer { + nym_client, + nym_addr, + online, + } + } } + +// impl NymServer { +// /// Receives and decodes encoded gRPC messages sent over the nym mixnet, processes them, encodes the response. +// /// The encoded response is sent back to the sender using a surb (single use reply block). +// pub async fn serve( +// mut self, +// online: Arc, +// ) -> tokio::task::JoinHandle> { +// let mut request_in: Vec = Vec::new(); +// tokio::task::spawn(async move { +// // NOTE: This interval may need to be reduced or removed / moved once scale testing begins. +// let mut interval = tokio::time::interval(tokio::time::Duration::from_millis(50)); +// while online.load(Ordering::SeqCst) { +// // --- wait for request. +// while let Some(request_nym) = self.0 .0.wait_for_messages().await { +// if request_nym.is_empty() { +// interval.tick().await; +// if !online.load(Ordering::SeqCst) { +// println!("Nym server shutting down."); +// return Ok(()); +// } +// continue; +// } +// request_in = request_nym; +// break; +// } + +// // --- decode request +// let request_vu8 = request_in +// .first() +// .map(|r| r.message.clone()) +// .ok_or_else(|| "No response received from the nym network".to_string()) +// .unwrap(); + +// // --- print request for testing +// println!( +// "@zingoproxyd[nym]: request received: {:?} - request length: {}", +// &request_vu8[..], +// &request_vu8[..].len() +// ); + +// // --- deserialize request +// let request = RawTransaction::decode(&request_vu8[..]).unwrap(); + +// // --- process request +// let response = NymClient::nym_send_transaction(&request).await.unwrap(); + +// // --- decode response +// let mut response_vu8 = Vec::new(); +// response.encode(&mut response_vu8).unwrap(); + +// //print response for testing +// println!( +// "@zingoproxyd[nym]: response sent: {:?} - response length: {}", +// &response_vu8[..], +// &response_vu8[..].len() +// ); + +// // --- fetch recipient address +// let return_recipient = AnonymousSenderTag::try_from_base58_string( +// request_in[0].sender_tag.unwrap().to_base58_string(), +// ) +// .unwrap(); + +// // --- send response +// self.0 +// .0 +// .send_reply(return_recipient, response_vu8) +// .await +// .unwrap(); +// } +// println!("Nym server shutting down."); +// Ok(()) +// }) +// } +// } diff --git a/zingo-proxyd/src/proxy.rs b/zingo-proxyd/src/proxy.rs index 3c5bc25..7235f7a 100644 --- a/zingo-proxyd/src/proxy.rs +++ b/zingo-proxyd/src/proxy.rs @@ -3,9 +3,9 @@ //! TODO: - Add ProxyServerError error type and rewrite functions to return >, propagating internal errors. //! - Update spawn_server and nym_spawn to return > and > and use here. -use crate::{nym_server::NymServer, server::spawn_server}; +use crate::{nym_server::NymServer, server::spawn_grpc_server}; use zcash_client_backend::proto::service::compact_tx_streamer_client::CompactTxStreamerClient; -use zingo_rpc::primitives::NymClient; +use zingo_rpc::jsonrpc::connector::test_node_and_return_uri; use std::sync::atomic::{AtomicBool, Ordering}; use std::sync::Arc; @@ -26,8 +26,18 @@ pub async fn spawn_proxy( let nym_addr_out: Option; startup_message(); - println!("@zingoproxyd: Launching Zingo-Proxy..\n@zingoproxyd: Launching gRPC Server.."); - let proxy_handle = spawn_server(proxy_port, lwd_port, zebrad_port, online.clone()).await; + println!("@zingoproxyd: Launching Zingo-Proxy!\n@zingoproxyd: Checking connection with node.."); + // TODO Add user and password fields. + let _zebrad_uri = test_node_and_return_uri( + zebrad_port, + Some("xxxxxx".to_string()), + Some("xxxxxx".to_string()), + ) + .await + .unwrap(); + + println!("@zingoproxyd: Launching gRPC Server.."); + let proxy_handle = spawn_grpc_server(proxy_port, lwd_port, zebrad_port, online.clone()).await; handles.push(proxy_handle); #[cfg(not(feature = "nym_poc"))] @@ -42,10 +52,14 @@ pub async fn spawn_proxy( #[cfg(not(feature = "nym_poc"))] { println!("@zingoproxyd[nym]: Launching Nym Server.."); - let nym_server: NymServer = NymServer(NymClient::nym_spawn(nym_conf_path).await); - nym_addr_out = Some(nym_server.0 .0.nym_address().to_string()); - let nym_proxy_handle = nym_server.serve(online).await; + // let nym_server: NymServer = NymServer(NymClient::nym_spawn(nym_conf_path).await); + // nym_addr_out = Some(nym_server.0 .0.nym_address().to_string()); + // let nym_proxy_handle = nym_server.serve(online).await; + let nym_server = NymServer::new(nym_conf_path, online).await; + nym_addr_out = Some(nym_server.nym_addr.clone()); + let nym_proxy_handle = nym_server.serve().await; + handles.push(nym_proxy_handle); // TODO: Add wait_on_nym_startup(nym_addr_out, online.clone()) function to test nym server. tokio::time::sleep(tokio::time::Duration::from_secs(2)).await; diff --git a/zingo-proxyd/src/server.rs b/zingo-proxyd/src/server.rs index a52ae80..d4598bf 100644 --- a/zingo-proxyd/src/server.rs +++ b/zingo-proxyd/src/server.rs @@ -73,7 +73,7 @@ impl ProxyServer { } /// Spawns a gRPC server. -pub async fn spawn_server( +pub async fn spawn_grpc_server( proxy_port: &u16, lwd_port: &u16, zebrad_port: &u16, diff --git a/zingo-rpc/src/blockcache/utils.rs b/zingo-rpc/src/blockcache/utils.rs index d8169a3..04d1f2d 100644 --- a/zingo-rpc/src/blockcache/utils.rs +++ b/zingo-rpc/src/blockcache/utils.rs @@ -6,6 +6,8 @@ use std::io::{Cursor, Read}; use crate::jsonrpc::connector::JsonRpcConnectorError; /// Parser Error Type. +/// +/// TODO: Move this error and other Zingo-Proxy error types intoown errors mod. #[derive(Debug, thiserror::Error)] pub enum ParseError { /// Io Error. @@ -20,9 +22,15 @@ pub enum ParseError { /// UTF-8 conversion error. #[error("UTF-8 Error: {0}")] Utf8Error(#[from] std::str::Utf8Error), + /// UTF-8 conversion error. + #[error("UTF-8 Conversion Error: {0}")] + FromUtf8Error(#[from] std::string::FromUtf8Error), /// Hexadecimal parsing error. #[error("Hex Parse Error: {0}")] ParseIntError(#[from] std::num::ParseIntError), + /// Errors originating from prost decodings. + #[error("Prost Decode Error: {0}")] + ProstDecodeError(#[from] prost::DecodeError), } /// Used for decoding zcash blocks from a bytestring. diff --git a/zingo-rpc/src/lib.rs b/zingo-rpc/src/lib.rs index 195644d..ba73e50 100644 --- a/zingo-rpc/src/lib.rs +++ b/zingo-rpc/src/lib.rs @@ -7,5 +7,7 @@ pub mod blockcache; pub mod jsonrpc; pub mod nym; pub mod primitives; +pub mod queue; pub mod rpc; pub mod utils; +pub mod walletrpc; diff --git a/zingo-rpc/src/nym.rs b/zingo-rpc/src/nym.rs index c2c3c6e..8bfcd24 100644 --- a/zingo-rpc/src/nym.rs +++ b/zingo-rpc/src/nym.rs @@ -1,3 +1,4 @@ //! Backend Nym functionality. +pub mod client; pub mod utils; diff --git a/zingo-rpc/src/nym/client.rs b/zingo-rpc/src/nym/client.rs new file mode 100644 index 0000000..c7d613a --- /dev/null +++ b/zingo-rpc/src/nym/client.rs @@ -0,0 +1,64 @@ +//! Nym client functionality. +//! +//! TODO: - Add NymClientError error type and rewrite functions to return >. + +use nym_sdk::mixnet::{ + MixnetClientBuilder, MixnetMessageSender, Recipient, ReconstructedMessage, StoragePaths, +}; +use std::path::PathBuf; + +use crate::primitives::NymClient; + +impl NymClient { + /// Spawns a nym client and connects to the mixnet. + pub async fn nym_spawn(str_path: &str) -> Self { + //nym_bin_common::logging::setup_logging(); + let client = MixnetClientBuilder::new_with_default_storage( + StoragePaths::new_from_dir(PathBuf::from(str_path)).unwrap(), + ) + .await + .unwrap() + .build() + .unwrap() + .connect_to_mixnet() + .await + .unwrap(); + + let nym_addr = client.nym_address().to_string(); + println!("@zingoproxyd[nym]: Nym server listening on: {nym_addr}."); + + Self(client) + } + + /// Forwards an encoded gRPC request over the nym mixnet to the nym address specified and waits for the response. + /// + /// TODO: Add timout for waiting for response. + pub async fn nym_forward( + &mut self, + recipient_address: &str, + message: Vec, + ) -> Result, Box> { + let recipient: Recipient = + Recipient::try_from_base58_string(recipient_address.to_string()).unwrap(); + self.0.send_plain_message(recipient, message).await.unwrap(); + + let mut nym_response: Vec = Vec::new(); + while let Some(response_in) = self.0.wait_for_messages().await { + if response_in.is_empty() { + continue; + } + nym_response = response_in; + break; + } + let response_out = nym_response + .first() + .map(|r| r.message.clone()) + .ok_or_else(|| "No response received from the nym network".to_string())?; + Ok(response_out) + } + + /// Closes the nym client. + pub async fn nym_close(self) { + self.0.disconnect().await; + } +} diff --git a/zingo-rpc/src/nym/utils.rs b/zingo-rpc/src/nym/utils.rs index 6c4df59..61e88b8 100644 --- a/zingo-rpc/src/nym/utils.rs +++ b/zingo-rpc/src/nym/utils.rs @@ -1,78 +1,37 @@ //! Utility functions for Nym-Proxy -//! -//! TODO: - Add NymClientError error type and rewrite functions to return >. -use nym_sdk::mixnet::{ - MixnetClientBuilder, MixnetMessageSender, Recipient, ReconstructedMessage, StoragePaths, -}; -use std::path::PathBuf; +use std::io::Cursor; +use zcash_encoding::CompactSize; -use crate::primitives::NymClient; +use crate::blockcache::utils::{read_bytes, ParseError}; -impl NymClient { - /// Spawns a nym client and connects to the mixnet. - pub async fn nym_spawn(str_path: &str) -> Self { - //nym_bin_common::logging::setup_logging(); - let client = MixnetClientBuilder::new_with_default_storage( - StoragePaths::new_from_dir(PathBuf::from(str_path)).unwrap(), - ) - .await - .unwrap() - .build() - .unwrap() - .connect_to_mixnet() - .await - .unwrap(); - - let nym_addr = client.nym_address().to_string(); - println!("@zingoproxyd[nym]: Nym server listening on: {nym_addr}."); - - Self(client) - } - - /// Forwards an encoded gRPC request over the nym mixnet to the nym address specified and waits for the response. - pub async fn nym_forward( - &mut self, - recipient_address: &str, - message: Vec, - ) -> Result, Box> { - let recipient: Recipient = - Recipient::try_from_base58_string(recipient_address.to_string()).unwrap(); - self.0.send_plain_message(recipient, message).await.unwrap(); - - let mut nym_response: Vec = Vec::new(); - while let Some(response_in) = self.0.wait_for_messages().await { - if response_in.is_empty() { - continue; - } - nym_response = response_in; - break; - } - let response_out = nym_response - .first() - .map(|r| r.message.clone()) - .ok_or_else(|| "No response received from the nym network".to_string())?; - Ok(response_out) - } - - /// Closes the nym client. - pub async fn nym_close(self) { - self.0.disconnect().await; - } +/// Reads a RPC method name from a Vec and returns this as a string along with the remaining data in the input. +fn read_nym_method(data: &[u8]) -> Result<(String, &[u8]), ParseError> { + let mut cursor = Cursor::new(data); + let method_len = CompactSize::read(&mut cursor)? as usize; + let method = String::from_utf8(read_bytes(&mut cursor, method_len, "failed to read")?)?; + Ok((method, &data[cursor.position() as usize..])) } -/// Serialises gRPC request to a buffer. -pub async fn serialize_request( - request: &T, -) -> Result, Box> { - let mut buf = Vec::new(); - request.encode(&mut buf)?; - Ok(buf) +/// Check the body of the request is the correct length. +fn check_nym_body(data: &[u8]) -> Result<&[u8], ParseError> { + let mut cursor = Cursor::new(data); + let body_len = CompactSize::read(&mut cursor)? as usize; + if &body_len != &data[cursor.position() as usize..].len() { + return Err(ParseError::InvalidData( + "Incorrect request body size read.".to_string(), + )); + }; + Ok(&data[cursor.position() as usize..]) } -/// Decodes gRPC request from a buffer -pub async fn deserialize_response( - data: &[u8], -) -> Result> { - T::decode(data).map_err(Into::into) +/// Extracts metadata from a NymRequest. +/// +/// Returns [ID, Method, RequestData]. +pub fn read_nym_request_data(data: &[u8]) -> Result<(u64, String, &[u8]), ParseError> { + let mut cursor = Cursor::new(data); + let id = CompactSize::read(&mut cursor)?; + let (method, data) = read_nym_method(&data[cursor.position() as usize..])?; + let body = check_nym_body(data)?; + Ok((id, method, body)) } diff --git a/zingo-rpc/src/queue.rs b/zingo-rpc/src/queue.rs new file mode 100644 index 0000000..0ebea07 --- /dev/null +++ b/zingo-rpc/src/queue.rs @@ -0,0 +1,3 @@ +//! Zingo-Proxy request queue. + +pub mod request; diff --git a/zingo-rpc/src/queue/request.rs b/zingo-rpc/src/queue/request.rs new file mode 100644 index 0000000..e510f11 --- /dev/null +++ b/zingo-rpc/src/queue/request.rs @@ -0,0 +1,215 @@ +//! Request types. + +use std::time::SystemTime; + +use nym_sphinx_anonymous_replies::requests::AnonymousSenderTag; +use tonic::metadata::MetadataMap; + +use crate::nym::utils::read_nym_request_data; + +/// Zingo-Proxy request errors. +#[derive(Debug, thiserror::Error)] +pub enum RequestError { + /// Errors originating from incorrect enum types being called. + #[error("Incorrect variant")] + IncorrectVariant, + /// System time errors. + #[error("System time error: {0}")] + SystemTimeError(#[from] std::time::SystemTimeError), +} + +/// Requests queuing metadata. +#[derive(Debug)] +struct QueueData { + // / Exclusive request id. + // request_id: u64, // TODO: implement with request queue (implement exlusive request_id generator in queue object). + /// Time which the request was received. + time_received: SystemTime, + /// Number of times the request has been requeued. + requeue_attempts: u32, +} + +impl QueueData { + /// Returns a new instance of QueueData. + fn new() -> Self { + QueueData { + time_received: SystemTime::now(), + requeue_attempts: 0, + } + } + + /// Increases the requeue attempts for the request. + pub fn increase_requeues(&mut self) { + self.requeue_attempts += 1; + } + + /// Returns the duration sunce the request was received. + fn duration(&self) -> Result { + self.time_received.elapsed().map_err(RequestError::from) + } + + /// Returns the number of times the request has been requeued. + fn requeues(&self) -> u32 { + self.requeue_attempts + } +} + +/// Requests metadata either contains a return address for nym requests or a tonic MetaDataMap for gRPC requests. +#[derive(Debug, Clone)] +pub enum RequestMetaData { + /// Return address for Nym requests. + AnonSendrTag(AnonymousSenderTag), + /// Metadata for gRPC requests. + MetaDataMap(MetadataMap), +} + +impl TryFrom for AnonymousSenderTag { + type Error = RequestError; + + fn try_from(value: RequestMetaData) -> Result { + match value { + RequestMetaData::AnonSendrTag(tag) => Ok(tag), + _ => Err(RequestError::IncorrectVariant), + } + } +} + +impl TryFrom for MetadataMap { + type Error = RequestError; + + fn try_from(value: RequestMetaData) -> Result { + match value { + RequestMetaData::MetaDataMap(map) => Ok(map), + _ => Err(RequestError::IncorrectVariant), + } + } +} + +/// Nym request data. +#[derive(Debug)] +struct NymRequest { + id: u64, + method: String, + metadata: RequestMetaData, + body: Vec, +} + +/// Grpc request data. +/// TODO: Convert incoming gRPC calls to GrpcRequest before adding to queue (implement with request queue). +#[derive(Debug)] +struct GrpcRequest { + id: u64, + method: String, + metadata: RequestMetaData, + body: Vec, +} + +/// Requests originating from the Nym server. +#[derive(Debug)] +pub struct NymServerRequest { + queuedata: QueueData, + request: NymRequest, +} + +/// Requests originating from the gRPC server. +#[derive(Debug)] +pub struct GrpcServerRequest { + queuedata: QueueData, + request: GrpcRequest, +} + +/// Zingo-Proxy request, used by request queue. +#[derive(Debug)] +pub enum ZingoProxyRequest { + /// Requests originating from the Nym server. + NymServerRequest(NymServerRequest), + /// Requests originating from the gRPC server. + GrpcServerRequest(GrpcServerRequest), +} + +impl ZingoProxyRequest { + /// Creates a ZingoProxyRequest from an encoded gRPC service call, recieved by the Nym server. + pub fn new_from_nym(metadata: AnonymousSenderTag, bytes: &[u8]) -> Self { + let (id, method, body) = read_nym_request_data(bytes).unwrap(); + ZingoProxyRequest::NymServerRequest(NymServerRequest { + queuedata: QueueData::new(), + request: NymRequest { + id, + method, + metadata: RequestMetaData::AnonSendrTag(metadata), + body: body.to_vec(), + }, + }) + } + + /// Creates a ZingoProxyRequest from a gRPC service call, recieved by the gRPC server. + /// + /// TODO: implement proper functionality along with queue. + pub fn new_from_grpc(metadata: MetadataMap, bytes: &[u8]) -> Self { + ZingoProxyRequest::GrpcServerRequest(GrpcServerRequest { + queuedata: QueueData::new(), + request: GrpcRequest { + id: 0, // TODO + method: "TODO".to_string(), // TODO + metadata: RequestMetaData::MetaDataMap(metadata), + body: bytes.to_vec(), + }, + }) + } + + /// Increases the requeue attempts for the request. + pub fn increase_requeues(&mut self) { + match self { + ZingoProxyRequest::NymServerRequest(ref mut req) => req.queuedata.increase_requeues(), + ZingoProxyRequest::GrpcServerRequest(ref mut req) => req.queuedata.increase_requeues(), + } + } + + /// Returns the duration sunce the request was received. + pub fn duration(&self) -> Result { + match self { + ZingoProxyRequest::NymServerRequest(ref req) => req.queuedata.duration(), + ZingoProxyRequest::GrpcServerRequest(ref req) => req.queuedata.duration(), + } + } + + /// Returns the number of times the request has been requeued. + pub fn requeues(&self) -> u32 { + match self { + ZingoProxyRequest::NymServerRequest(ref req) => req.queuedata.requeues(), + ZingoProxyRequest::GrpcServerRequest(ref req) => req.queuedata.requeues(), + } + } + + /// Returns the client assigned id for this request, only used to construct response. + pub fn client_id(&self) -> u64 { + match self { + ZingoProxyRequest::NymServerRequest(ref req) => req.request.id, + ZingoProxyRequest::GrpcServerRequest(ref req) => req.request.id, + } + } + + /// Returns the RPC being called by the request. + pub fn method(&self) -> String { + match self { + ZingoProxyRequest::NymServerRequest(ref req) => req.request.method.clone(), + ZingoProxyRequest::GrpcServerRequest(ref req) => req.request.method.clone(), + } + } + + /// Returns request metadata including sender data. + pub fn metadata(&self) -> RequestMetaData { + match self { + ZingoProxyRequest::NymServerRequest(ref req) => req.request.metadata.clone(), + ZingoProxyRequest::GrpcServerRequest(ref req) => req.request.metadata.clone(), + } + } + + /// Returns the number of times the request has been requeued. + pub fn body(&self) -> Vec { + match self { + ZingoProxyRequest::NymServerRequest(ref req) => req.request.body.clone(), + ZingoProxyRequest::GrpcServerRequest(ref req) => req.request.body.clone(), + } + } +} diff --git a/zingo-rpc/src/rpc/nymservice.rs b/zingo-rpc/src/rpc/nymservice.rs index 1aece94..fd8cde0 100644 --- a/zingo-rpc/src/rpc/nymservice.rs +++ b/zingo-rpc/src/rpc/nymservice.rs @@ -1,46 +1,80 @@ //! Lightwallet service RPC Nym implementations. -use std::sync::Arc; +use prost::Message; +use zcash_client_backend::proto::service::compact_tx_streamer_server::CompactTxStreamer; -use http::Uri; -use tonic::Request; -use zcash_client_backend::proto::service::{RawTransaction, SendResponse}; -use zingo_netutils::GrpcConnector; +use crate::{primitives::ProxyClient, queue::request::ZingoProxyRequest}; -use crate::primitives::NymClient; +impl ProxyClient { + /// Processes gRPC requests coming from the nym server. + pub async fn process_nym_request( + &self, + request: &ZingoProxyRequest, + ) -> Result, tonic::Status> { + match request { + ZingoProxyRequest::NymServerRequest(_) => match request.method().as_str() { + "GetLightdInfo" => match prost::Message::decode(&request.body()[..]) { + Ok(input) => { + let tonic_request = tonic::Request::new(input); + let tonic_response = self.get_lightd_info(tonic_request) + .await?.into_inner(); -impl NymClient { - /// Forwards the recieved send_transaction request on to a Lightwalletd and returns the response. - pub async fn nym_send_transaction( - request: &RawTransaction, - ) -> Result> { - // TODO: Expose zproxy_port to point to actual zproxy listen port. - let zproxy_port = 8080; - let zproxy_uri = Uri::builder() - .scheme("http") - .authority(format!("localhost:{zproxy_port}")) - .path_and_query("/") - .build() - .unwrap(); - let _lwd_uri_main = Uri::builder() - .scheme("https") - .authority("eu.lightwalletd.com:443") - .path_and_query("/") - .build() - .unwrap(); - // replace zproxy_uri with lwd_uri_main to connect to mainnet: - let client = Arc::new(GrpcConnector::new(zproxy_uri)); - - let mut cmp_client = client - .get_client() - .await - .map_err(|e| format!("Error getting client: {:?}", e))?; - let grpc_request = Request::new(request.clone()); - - let response = cmp_client - .send_transaction(grpc_request) - .await - .map_err(|e| format!("Send Error: {}", e))?; - Ok(response.into_inner()) + let mut response_vec = Vec::new(); + tonic_response.encode(&mut response_vec).map_err(|e| { + tonic::Status::internal(format!( + "Failed to encode response: {}", + e + )) + })?; + Ok(response_vec) + } + Err(e) => Err(tonic::Status::internal(format!( + "Failed to decode request: {}", + e + ))), + }, + "SendTransaction" => match prost::Message::decode(&request.body()[..]) { + Ok(input) => { + let tonic_request = tonic::Request::new(input); + let tonic_response = self.send_transaction(tonic_request) + .await?.into_inner(); + let mut response_vec = Vec::new(); + tonic_response.encode(&mut response_vec).map_err(|e| { + tonic::Status::internal(format!( + "Failed to encode response: {}", + e + )) + })?; + Ok(response_vec) + } + Err(e) => Err(tonic::Status::internal(format!( + "Failed to decode request: {}", + e + ))), + }, + "get_latest_block" | + "get_block" | + "get_block_nullifiers" | + "get_block_range" | + "get_block_range_nullifiers" | + "get_transaction" | + "send_transaction" | + "get_taddress_txids" | + "get_taddress_balance" | + "get_taddress_balance_stream" | + "get_mempool_tx" | + "get_mempool_stream" | + "get_tree_state" | + "get_latest_tree_state" | + "get_subtree_roots" | + "get_address_utxos" | + "get_address_utxos_stream" | + "ping" => { + Err(tonic::Status::unimplemented("RPC not yet implemented over nym. If you require this RPC please open an issue or PR at the Zingo-Proxy github (https://github.com/zingolabs/zingo-proxy).")) + }, + _ => Err(tonic::Status::invalid_argument("Incorrect Method String")), + }, + _ => Err(tonic::Status::invalid_argument("Incorrect Request Type")), + } } } diff --git a/zingo-rpc/src/rpc/nymwalletservice.rs b/zingo-rpc/src/rpc/nymwalletservice.rs index 0be38ed..23595fe 100644 --- a/zingo-rpc/src/rpc/nymwalletservice.rs +++ b/zingo-rpc/src/rpc/nymwalletservice.rs @@ -16,8 +16,8 @@ use zcash_client_backend::proto::{ use crate::{ define_grpc_passthrough, - nym::utils::{deserialize_response, serialize_request}, primitives::{NymClient, ProxyClient}, + walletrpc::utils::{deserialize_response, serialize_request, write_nym_request_data}, }; #[async_trait] @@ -57,9 +57,8 @@ impl CompactTxStreamer for ProxyClient { &self, request: Request, ) -> Result, Status> { - println!("@zingoproxyd[nym]: Received call of send_transaction."); - - //serialize RawTransaction + println!("@zingoproxyd[nym_poc]: Received call of send_transaction."); + // -- serialize RawTransaction let serialized_request = match serialize_request(&request.into_inner()).await { Ok(data) => data, Err(e) => { @@ -69,39 +68,31 @@ impl CompactTxStreamer for ProxyClient { ))) } }; - - //print request for testing: - println!( - "@zingoproxyd[nym][TEST]: Request sent: {:?}.", - serialized_request - ); - println!( - "@zingoproxyd[nym][TEST]: Request length: {}.", - serialized_request.len() - ); - - // -- forward request over nym + // -- create ZingoProxyRequest + let nym_request = match write_nym_request_data( + 0, + "SendTransaction".to_string(), + serialized_request.as_ref(), + ) { + Ok(data) => data, + Err(e) => { + return Err(Status::internal(format!( + "Failed to write nym request data: {}", + e + ))) + } + }; + // -- forward request over nym and wait for response let args: Vec = env::args().collect(); let recipient_address: String = args[1].clone(); let nym_conf_path = "/tmp/nym_client"; let mut client = NymClient::nym_spawn(nym_conf_path).await; let response_data = client - .nym_forward(recipient_address.as_str(), serialized_request) + .nym_forward(recipient_address.as_str(), nym_request) .await .unwrap(); client.nym_close().await; - - //print response for testing - println!( - "@zingoproxyd[nym][TEST]: Response received: {:?}.", - response_data - ); - println!( - "@zingoproxyd[nym][TEST]: Response length: {}.", - response_data.len() - ); - - //deserialize SendResponse + // -- deserialize SendResponse let response: SendResponse = match deserialize_response(response_data.as_slice()).await { Ok(res) => res, Err(e) => { @@ -111,7 +102,6 @@ impl CompactTxStreamer for ProxyClient { ))) } }; - Ok(Response::new(response)) } @@ -196,12 +186,63 @@ impl CompactTxStreamer for ProxyClient { ) -> tonic::Streaming ); - define_grpc_passthrough!( - fn get_lightd_info( - &self, - request: tonic::Request, - ) -> LightdInfo - ); + // define_grpc_passthrough!( + // fn get_lightd_info( + // &self, + // request: tonic::Request, + // ) -> LightdInfo + // ); + async fn get_lightd_info( + &self, + request: Request, + ) -> Result, Status> { + println!("@zingoproxyd[nym_poc]: Received call of get_lightd_info."); + // -- serialize Empty + let serialized_request = match serialize_request(&request.into_inner()).await { + Ok(data) => data, + Err(e) => { + return Err(Status::internal(format!( + "Failed to serialize request: {}", + e + ))) + } + }; + // -- create ZingoProxyRequest + let nym_request = match write_nym_request_data( + 0, + "GetLightdInfo".to_string(), + serialized_request.as_ref(), + ) { + Ok(data) => data, + Err(e) => { + return Err(Status::internal(format!( + "Failed to write nym request data: {}", + e + ))) + } + }; + // -- forward request over nym and wait for response + let args: Vec = env::args().collect(); + let recipient_address: String = args[1].clone(); + let nym_conf_path = "/tmp/nym_client"; + let mut client = NymClient::nym_spawn(nym_conf_path).await; + let response_data = client + .nym_forward(recipient_address.as_str(), nym_request) + .await + .unwrap(); + client.nym_close().await; + // -- deserialize LightdInfo + let response: LightdInfo = match deserialize_response(response_data.as_slice()).await { + Ok(res) => res, + Err(e) => { + return Err(Status::internal(format!( + "Failed to decode response: {}", + e + ))) + } + }; + Ok(Response::new(response)) + } define_grpc_passthrough!( fn ping( diff --git a/zingo-rpc/src/walletrpc.rs b/zingo-rpc/src/walletrpc.rs new file mode 100644 index 0000000..3755a45 --- /dev/null +++ b/zingo-rpc/src/walletrpc.rs @@ -0,0 +1,4 @@ +//! Walletside nym gRPC implementations. + +pub mod service; +pub mod utils; diff --git a/zingo-rpc/src/walletrpc/service.rs b/zingo-rpc/src/walletrpc/service.rs new file mode 100644 index 0000000..989d86d --- /dev/null +++ b/zingo-rpc/src/walletrpc/service.rs @@ -0,0 +1,471 @@ +//! Wrapper implementation of LibRustZCash's CompactTXStreamerClient that also holds nym-enabled implementations. +//! +//! NOTE: Currently only send_transaction has been implemented over nym. + +use http::Uri; +use http_body::Body; +use zcash_client_backend::proto::compact_formats::{CompactBlock, CompactTx}; +use zcash_client_backend::proto::service::{ + compact_tx_streamer_client::CompactTxStreamerClient, RawTransaction, SendResponse, +}; +use zcash_client_backend::proto::service::{ + Address, AddressList, Balance, BlockId, BlockRange, ChainSpec, Duration, Empty, Exclude, + GetAddressUtxosArg, GetAddressUtxosReply, GetAddressUtxosReplyList, GetSubtreeRootsArg, + LightdInfo, PingResponse, SubtreeRoot, TransparentAddressBlockFilter, TreeState, TxFilter, +}; + +use bytes::Bytes; +use std::error::Error as StdError; +use tonic::{self, codec::CompressionEncoding, Status}; +use tonic::{service::interceptor::InterceptedService, transport::Endpoint}; + +use crate::{ + primitives::NymClient, + walletrpc::utils::{deserialize_response, serialize_request}, +}; + +use super::utils::write_nym_request_data; + +/// Wrapper struct for the Nym enabled CompactTxStreamerClient. +#[derive(Debug, Clone)] +pub struct NymTxStreamerClient { + /// LibRustZcash's CompactTxStreamerClient. + pub compact_tx_streamer_client: CompactTxStreamerClient, +} + +impl NymTxStreamerClient { + /// Attempt to create a new client by connecting to a given endpoint. + pub async fn connect(dst: D) -> Result + where + D: TryInto, + D::Error: Into> + Send + Sync + 'static, + >::Error: std::error::Error, + { + let client = CompactTxStreamerClient::connect(dst).await?; + Ok(Self { + compact_tx_streamer_client: client, + }) + } +} + +impl NymTxStreamerClient +where + T: tonic::client::GrpcService + Send + 'static, + T::Error: + std::error::Error + Into> + Send + Sync, + T::ResponseBody: Body + Send + 'static, + ::Error: + std::error::Error + Into> + Send + Sync, +{ + /// Creates a new gRPC clientwith the provided [`GrpcService`]. + pub fn new(inner: T) -> Self { + Self { + compact_tx_streamer_client: CompactTxStreamerClient::new(inner), + } + } + + /// Creates a new gRPC client with the provided [`GrpcService`] and `Uri`. + /// + /// The provided Uri will use only the scheme and authority parts as the path_and_query portion will be set for each method. + pub fn with_origin(inner: T, origin: Uri) -> Self { + Self { + compact_tx_streamer_client: CompactTxStreamerClient::with_origin(inner, origin), + } + } + + /// Creates a new service with interceptor middleware. + pub fn with_interceptor( + inner: T, + interceptor: F, + ) -> CompactTxStreamerClient> + where + F: tonic::service::Interceptor, + T::ResponseBody: Default, + T: tonic::codegen::Service< + http::Request, + Response = http::Response< + >::ResponseBody, + >, + >, + >>::Error: + Into> + Send + Sync + 'static, + >, + >>::Error: std::error::Error, + { + CompactTxStreamerClient::new(InterceptedService::new(inner, interceptor)) + } + + /// Compress requests with the given encoding. + /// + /// This requires the server to support it otherwise it might respond with an error. + #[must_use] + pub fn send_compressed(self, encoding: CompressionEncoding) -> Self { + Self { + compact_tx_streamer_client: CompactTxStreamerClient::send_compressed( + self.compact_tx_streamer_client, + encoding, + ), + } + } + + /// Enable decompressing responses. + #[must_use] + pub fn accept_compressed(self, encoding: CompressionEncoding) -> Self { + Self { + compact_tx_streamer_client: CompactTxStreamerClient::accept_compressed( + self.compact_tx_streamer_client, + encoding, + ), + } + } + + /// Limits the maximum size of a decoded message. + /// + /// Default: `4MB` + #[must_use] + pub fn max_decoding_message_size(self, limit: usize) -> Self { + Self { + compact_tx_streamer_client: CompactTxStreamerClient::max_decoding_message_size( + self.compact_tx_streamer_client, + limit, + ), + } + } + + /// Limits the maximum size of an encoded message. + /// + /// Default: `usize::MAX` + #[must_use] + pub fn max_encoding_message_size(self, limit: usize) -> Self { + Self { + compact_tx_streamer_client: CompactTxStreamerClient::max_encoding_message_size( + self.compact_tx_streamer_client, + limit, + ), + } + } + + /// Return the height of the tip of the best chain. + pub async fn get_latest_block( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::get_latest_block(&mut self.compact_tx_streamer_client, request) + .await + } + + /// Return the compact block corresponding to the given block identifier. + pub async fn get_block( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::get_block(&mut self.compact_tx_streamer_client, request).await + } + + /// Same as GetBlock except actions contain only nullifiers. + pub async fn get_block_nullifiers( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::get_block_nullifiers(&mut self.compact_tx_streamer_client, request) + .await + } + + /// Return a list of consecutive compact blocks + pub async fn get_block_range( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result>, tonic::Status> + { + CompactTxStreamerClient::get_block_range(&mut self.compact_tx_streamer_client, request) + .await + } + + /// Same as GetBlockRange except actions contain only nullifiers. + pub async fn get_block_range_nullifiers( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result>, tonic::Status> + { + CompactTxStreamerClient::get_block_range_nullifiers( + &mut self.compact_tx_streamer_client, + request, + ) + .await + } + + /// Return the requested full (not compact) transaction (as from zcashd). + pub async fn get_transaction( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::get_transaction(&mut self.compact_tx_streamer_client, request) + .await + } + + /// Submit the given transaction to the Zcash network. + /// + /// If nym_addr is provided, the transaction is encoded and sent over the Nym mixnet. + pub async fn send_transaction( + &mut self, + request: impl tonic::IntoRequest, + nym_addr: Option<&str>, + ) -> std::result::Result, Status> { + match nym_addr { + Some(addr) => { + match nym_sphinx_addressing::clients::Recipient::try_from_base58_string(addr) { + Ok(_recipient) => { + let serialized_request = + match serialize_request(&request.into_request().into_inner()).await { + Ok(data) => data, + Err(e) => { + return Err(Status::internal(format!( + "Failed to serialize request: {}", + e + ))) + } + }; + let nym_request = match write_nym_request_data( + 0, + "SendTransaction".to_string(), + serialized_request.as_ref(), + ) { + Ok(data) => data, + Err(e) => { + return Err(Status::internal(format!( + "Failed to write nym request data: {}", + e + ))) + } + }; + let nym_conf_path = "/tmp/nym_client"; + let mut client = NymClient::nym_spawn(nym_conf_path).await; + let response_data = client.nym_forward(addr, nym_request).await.unwrap(); + client.nym_close().await; + let response: SendResponse = + match deserialize_response(response_data.as_slice()).await { + Ok(res) => res, + Err(e) => { + return Err(Status::internal(format!( + "Failed to decode response: {}", + e + ))) + } + }; + Ok(tonic::Response::new(response)) + } + Err(e) => { + return Err(Status::invalid_argument(format!( + "Failed to parse nym address: {}", + e + ))); + } + } + } + None => { + CompactTxStreamerClient::send_transaction( + &mut self.compact_tx_streamer_client, + request, + ) + .await + } + } + } + + /// Return the txids corresponding to the given t-address within the given block range. + pub async fn get_taddress_txids( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result>, tonic::Status> + { + CompactTxStreamerClient::get_taddress_txids(&mut self.compact_tx_streamer_client, request) + .await + } + + /// Return the balance corresponding to the given t-address. + pub async fn get_taddress_balance( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::get_taddress_balance(&mut self.compact_tx_streamer_client, request) + .await + } + + /// Return the balance corresponding to the given t-address. + /// + /// TODO: Doc comment is ambiguous, add correct information. + pub async fn get_taddress_balance_stream( + &mut self, + request: impl tonic::IntoStreamingRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::get_taddress_balance_stream( + &mut self.compact_tx_streamer_client, + request, + ) + .await + } + + /// Return the compact transactions currently in the mempool; the results + /// can be a few seconds out of date. If the Exclude list is empty, return + /// all transactions; otherwise return all *except* those in the Exclude list + /// (if any); this allows the client to avoid receiving transactions that it + /// already has (from an earlier call to this rpc). The transaction IDs in the + /// Exclude list can be shortened to any number of bytes to make the request + /// more bandwidth-efficient; if two or more transactions in the mempool + /// match a shortened txid, they are all sent (none is excluded). Transactions + /// in the exclude list that don't exist in the mempool are ignored. + pub async fn get_mempool_tx( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result>, tonic::Status> + { + CompactTxStreamerClient::get_mempool_tx(&mut self.compact_tx_streamer_client, request).await + } + + /// Return a stream of current Mempool transactions. This will keep the output stream open while + /// there are mempool transactions. It will close the returned stream when a new block is mined. + pub async fn get_mempool_stream( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result>, tonic::Status> + { + CompactTxStreamerClient::get_mempool_stream(&mut self.compact_tx_streamer_client, request) + .await + } + + /// GetTreeState returns the note commitment tree state corresponding to the given block. + /// See section 3.7 of the Zcash protocol specification. It returns several other useful + /// values also (even though they can be obtained using GetBlock). + /// The block can be specified by either height or hash. + pub async fn get_tree_state( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::get_tree_state(&mut self.compact_tx_streamer_client, request).await + } + + /// Returns the note commitment tree state. + /// + /// TODO: Doc comment is ambiguous, add correct information. + pub async fn get_latest_tree_state( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::get_latest_tree_state( + &mut self.compact_tx_streamer_client, + request, + ) + .await + } + + /// Returns a stream of information about roots of subtrees of the Sapling and Orchard + /// note commitment trees. + pub async fn get_subtree_roots( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result>, tonic::Status> + { + CompactTxStreamerClient::get_subtree_roots(&mut self.compact_tx_streamer_client, request) + .await + } + + /// Returns utxos belonging to the given address. + pub async fn get_address_utxos( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::get_address_utxos(&mut self.compact_tx_streamer_client, request) + .await + } + + /// Returns stream of utxos belonging to the given address. + pub async fn get_address_utxos_stream( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result< + tonic::Response>, + tonic::Status, + > { + CompactTxStreamerClient::get_address_utxos_stream( + &mut self.compact_tx_streamer_client, + request, + ) + .await + } + + /// Return information about this lightwalletd instance and the blockchain. + pub async fn get_lightd_info( + &mut self, + request: impl tonic::IntoRequest, + nym_addr: Option<&str>, + ) -> std::result::Result, tonic::Status> { + match nym_addr { + Some(addr) => { + match nym_sphinx_addressing::clients::Recipient::try_from_base58_string(addr) { + Ok(_recipient) => { + let serialized_request = + match serialize_request(&request.into_request().into_inner()).await { + Ok(data) => data, + Err(e) => { + return Err(Status::internal(format!( + "Failed to serialize request: {}", + e + ))) + } + }; + let nym_request = match write_nym_request_data( + 0, + "GetLightdInfo".to_string(), + serialized_request.as_ref(), + ) { + Ok(data) => data, + Err(e) => { + return Err(Status::internal(format!( + "Failed to write nym request data: {}", + e + ))) + } + }; + let nym_conf_path = "/tmp/nym_client"; + let mut client = NymClient::nym_spawn(nym_conf_path).await; + let response_data = client.nym_forward(addr, nym_request).await.unwrap(); + client.nym_close().await; + let response: LightdInfo = + match deserialize_response(response_data.as_slice()).await { + Ok(res) => res, + Err(e) => { + return Err(Status::internal(format!( + "Failed to decode response: {}", + e + ))) + } + }; + Ok(tonic::Response::new(response)) + } + Err(e) => { + return Err(Status::invalid_argument(format!( + "Failed to parse nym address: {}", + e + ))); + } + } + } + None => { + CompactTxStreamerClient::get_lightd_info( + &mut self.compact_tx_streamer_client, + request, + ) + .await + } + } + } + + /// Testing-only, requires lightwalletd --ping-very-insecure (do not enable in production). + pub async fn ping( + &mut self, + request: impl tonic::IntoRequest, + ) -> std::result::Result, tonic::Status> { + CompactTxStreamerClient::ping(&mut self.compact_tx_streamer_client, request).await + } +} diff --git a/zingo-rpc/src/walletrpc/utils.rs b/zingo-rpc/src/walletrpc/utils.rs new file mode 100644 index 0000000..f08c609 --- /dev/null +++ b/zingo-rpc/src/walletrpc/utils.rs @@ -0,0 +1,36 @@ +//! Utility functions for wallet side nym code. + +use zcash_encoding::CompactSize; + +use crate::blockcache::utils::ParseError; + +/// Serialises gRPC request to a buffer. +pub async fn serialize_request( + request: &T, +) -> Result, Box> { + let mut buf = Vec::new(); + request.encode(&mut buf)?; + Ok(buf) +} + +/// Decodes gRPC request from a buffer +pub async fn deserialize_response( + data: &[u8], +) -> Result { + T::decode(data).map_err(ParseError::from) +} + +/// Prepends an encoded tonic request with metadata required by the Nym server. +/// +/// Encodes the request ID as a Zcash CompactSize [u64]. +/// Encodes the RPC method String into a Vec prepended by a Zcash CompactSize indicating its length in bytes. +pub fn write_nym_request_data(id: u64, method: String, body: &[u8]) -> Result, ParseError> { + let method_bytes = method.into_bytes(); + let mut buffer = Vec::new(); + CompactSize::write(&mut buffer, id as usize)?; + CompactSize::write(&mut buffer, method_bytes.len())?; + buffer.extend(method_bytes); + CompactSize::write(&mut buffer, body.len())?; + buffer.extend(body); + Ok(buffer) +}