diff --git a/Cargo.lock b/Cargo.lock index d07dae3..4699186 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2248,6 +2248,29 @@ dependencies = [ "windows-sys 0.59.0", ] +[[package]] +name = "lazy-regex" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d8e41c97e6bc7ecb552016274b99fbb5d035e8de288c582d9b933af6677bfda" +dependencies = [ + "lazy-regex-proc_macros", + "once_cell", + "regex", +] + +[[package]] +name = "lazy-regex-proc_macros" +version = "3.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76e1d8b05d672c53cb9c7b920bbba8783845ae4f0b076e02a3db1d02c81b4163" +dependencies = [ + "proc-macro2", + "quote", + "regex", + "syn 2.0.87", +] + [[package]] name = "lazy_static" version = "1.5.0" @@ -5397,6 +5420,7 @@ dependencies = [ "futures", "hex", "http 1.1.0", + "lazy-regex", "prost", "thiserror", "tokio", diff --git a/zaino-fetch/src/jsonrpc/connector.rs b/zaino-fetch/src/jsonrpc/connector.rs index ef91566..600ec70 100644 --- a/zaino-fetch/src/jsonrpc/connector.rs +++ b/zaino-fetch/src/jsonrpc/connector.rs @@ -135,8 +135,8 @@ impl JsonRpcConnector { } if !status.is_success() { return Err(JsonRpcConnectorError::new(format!( - "Error: Error status from node's rpc server: {}", - status + "Error: Error status from node's rpc server: {}, {}", + status, body_str ))); } @@ -190,7 +190,7 @@ impl JsonRpcConnector { &self, addresses: Vec, ) -> Result { - let params = vec![serde_json::to_value(addresses)?]; + let params = vec![serde_json::json!({ "addresses": addresses })]; self.send_request("getaddressbalance", params).await } diff --git a/zaino-serve/Cargo.toml b/zaino-serve/Cargo.toml index 9a8e132..352f361 100644 --- a/zaino-serve/Cargo.toml +++ b/zaino-serve/Cargo.toml @@ -48,6 +48,7 @@ tokio-stream = "0.1" futures = "0.3.30" async-stream = "0.3" crossbeam-channel = "0.5" +lazy-regex = "3.3" [build-dependencies] whoami = "1.5" diff --git a/zaino-serve/src/rpc/service.rs b/zaino-serve/src/rpc/service.rs index eda1cbb..47b8d95 100644 --- a/zaino-serve/src/rpc/service.rs +++ b/zaino-serve/src/rpc/service.rs @@ -1,5 +1,6 @@ //! Lightwallet service RPC implementations. +use futures::StreamExt; use hex::FromHex; use tokio::time::timeout; use tokio_stream::wrappers::ReceiverStream; @@ -24,6 +25,21 @@ use zaino_proto::proto::{ }, }; +/// T Address Regex +static TADDR_REGEX: lazy_regex::Lazy = + lazy_regex::lazy_regex!(r"^t[a-zA-Z0-9]{34}$"); + +/// Checks for valid t Address. +/// +/// Returns Some(taddress) if address is valid else none. +fn check_taddress(taddr: &str) -> Option<&str> { + if TADDR_REGEX.is_match(taddr) { + Some(taddr) + } else { + None + } +} + /// Stream of RawTransactions, output type of get_taddress_txids. pub struct RawTransactionStream { inner: ReceiverStream>, @@ -797,11 +813,10 @@ impl CompactTxStreamer for GrpcClient { }) } - /// This RPC has not been implemented as it is not currently used by zingolib. - /// If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer). + /// Returns the total balance for a list of taddrs fn get_taddress_balance<'life0, 'async_trait>( &'life0 self, - _request: tonic::Request, + request: tonic::Request, ) -> core::pin::Pin< Box< dyn core::future::Future< @@ -816,17 +831,39 @@ impl CompactTxStreamer for GrpcClient { { println!("[TEST] Received call of get_taddress_balance."); Box::pin(async { - Err(tonic::Status::unimplemented("get_taddress_balance not yet implemented. If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer).")) + let zebrad_client = JsonRpcConnector::new( + self.zebrad_uri.clone(), + Some("xxxxxx".to_string()), + Some("xxxxxx".to_string()), + ) + .await?; + let taddrs = request.into_inner().addresses; + if !taddrs.iter().all(|taddr| check_taddress(taddr).is_some()) { + return Err(tonic::Status::invalid_argument( + "Error: One or more invalid taddresses given.", + )); + } + let balance = zebrad_client.get_address_balance(taddrs).await?; + let checked_balance: i64 = match i64::try_from(balance.balance) { + Ok(balance) => balance, + Err(_) => { + return Err(tonic::Status::unknown( + "Error: Error converting balance from u64 to i64.", + )); + } + }; + Ok(tonic::Response::new(Balance { + value_zat: checked_balance, + })) }) } - /// This RPC has not been implemented as it is not currently used by zingolib. - /// If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer). + /// Returns the total balance for a list of taddrs #[must_use] #[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)] fn get_taddress_balance_stream<'life0, 'async_trait>( &'life0 self, - _request: tonic::Request>, + request: tonic::Request>, ) -> ::core::pin::Pin< Box< dyn ::core::future::Future, tonic::Status>> @@ -840,7 +877,96 @@ impl CompactTxStreamer for GrpcClient { { println!("[TEST] Received call of get_taddress_balance_stream."); Box::pin(async { - Err(tonic::Status::unimplemented("get_taddress_balance_stream not yet implemented. If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer).")) + let zebrad_client = JsonRpcConnector::new( + self.zebrad_uri.clone(), + Some("xxxxxx".to_string()), + Some("xxxxxx".to_string()), + ) + .await?; + let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::(32); + let fetcher_task_handle = tokio::spawn(async move { + let fetcher_timeout = timeout(std::time::Duration::from_secs(30), async { + let mut total_balance: u64 = 0; + loop { + match channel_rx.recv().await { + Some(taddr) => { + if check_taddress(taddr.as_str()).is_some() { + let balance = + zebrad_client.get_address_balance(vec![taddr]).await?; + total_balance += balance.balance; + } else { + return Err(tonic::Status::invalid_argument( + "Error: One or more invalid taddresses given.", + )); + } + } + None => { + return Ok(total_balance); + } + } + } + }) + .await; + match fetcher_timeout { + Ok(result) => result, + Err(_) => Err(tonic::Status::deadline_exceeded( + "Error: get_taddress_balance_stream request timed out.", + )), + } + }); + let addr_recv_timeout = timeout(std::time::Duration::from_secs(30), async { + let mut address_stream = request.into_inner(); + while let Some(address_result) = address_stream.next().await { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + let address = address_result.map_err(|e| { + tonic::Status::unknown(format!("Failed to read from stream: {}", e)) + })?; + if channel_tx.send(address.address).await.is_err() { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + return Err(tonic::Status::unknown( + "Error: Failed to send address to balance task.", + )); + } + } + drop(channel_tx); + Ok::<(), tonic::Status>(()) + }) + .await; + match addr_recv_timeout { + Ok(Ok(())) => {} + Ok(Err(e)) => { + fetcher_task_handle.abort(); + return Err(e); + } + Err(_) => { + fetcher_task_handle.abort(); + return Err(tonic::Status::deadline_exceeded( + "Error: get_taddress_balance_stream request timed out in address loop.", + )); + } + } + match fetcher_task_handle.await { + Ok(Ok(total_balance)) => { + let checked_balance: i64 = match i64::try_from(total_balance) { + Ok(balance) => balance, + Err(_) => { + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + return Err(tonic::Status::unknown( + "Error: Error converting balance from u64 to i64.", + )); + } + }; + Ok(tonic::Response::new(Balance { + value_zat: checked_balance, + })) + } + Ok(Err(e)) => Err(e), + // TODO: Hide server error from clients before release. Currently useful for dev purposes. + Err(e) => Err(tonic::Status::unknown(format!( + "Fetcher Task failed: {}", + e + ))), + } }) }