Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement Address balance rpcs #97

Merged
merged 4 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 24 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions zaino-fetch/src/jsonrpc/connector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -135,8 +135,8 @@ impl JsonRpcConnector {
}
if !status.is_success() {
return Err(JsonRpcConnectorError::new(format!(
"Error: Error status from node's rpc server: {}",
status
"Error: Error status from node's rpc server: {}, {}",
status, body_str
)));
}

Expand Down Expand Up @@ -190,7 +190,7 @@ impl JsonRpcConnector {
&self,
addresses: Vec<String>,
) -> Result<GetBalanceResponse, JsonRpcConnectorError> {
let params = vec![serde_json::to_value(addresses)?];
let params = vec![serde_json::json!({ "addresses": addresses })];
self.send_request("getaddressbalance", params).await
}

Expand Down
1 change: 1 addition & 0 deletions zaino-serve/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ tokio-stream = "0.1"
futures = "0.3.30"
async-stream = "0.3"
crossbeam-channel = "0.5"
lazy-regex = "3.3"

[build-dependencies]
whoami = "1.5"
142 changes: 134 additions & 8 deletions zaino-serve/src/rpc/service.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Lightwallet service RPC implementations.

use futures::StreamExt;
use hex::FromHex;
use tokio::time::timeout;
use tokio_stream::wrappers::ReceiverStream;
Expand All @@ -24,6 +25,21 @@ use zaino_proto::proto::{
},
};

/// T Address Regex
static TADDR_REGEX: lazy_regex::Lazy<lazy_regex::regex::Regex> =
lazy_regex::lazy_regex!(r"^t[a-zA-Z0-9]{34}$");

/// Checks for valid t Address.
///
/// Returns Some(taddress) if address is valid else none.
fn check_taddress(taddr: &str) -> Option<&str> {
if TADDR_REGEX.is_match(taddr) {
Some(taddr)
} else {
None
}
}

/// Stream of RawTransactions, output type of get_taddress_txids.
pub struct RawTransactionStream {
inner: ReceiverStream<Result<RawTransaction, tonic::Status>>,
Expand Down Expand Up @@ -797,11 +813,10 @@ impl CompactTxStreamer for GrpcClient {
})
}

/// This RPC has not been implemented as it is not currently used by zingolib.
/// If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer).
/// Returns the total balance for a list of taddrs
fn get_taddress_balance<'life0, 'async_trait>(
&'life0 self,
_request: tonic::Request<AddressList>,
request: tonic::Request<AddressList>,
) -> core::pin::Pin<
Box<
dyn core::future::Future<
Expand All @@ -816,17 +831,39 @@ impl CompactTxStreamer for GrpcClient {
{
println!("[TEST] Received call of get_taddress_balance.");
Box::pin(async {
Err(tonic::Status::unimplemented("get_taddress_balance not yet implemented. If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer)."))
let zebrad_client = JsonRpcConnector::new(
self.zebrad_uri.clone(),
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await?;
let taddrs = request.into_inner().addresses;
if !taddrs.iter().all(|taddr| check_taddress(taddr).is_some()) {
Oscar-Pepper marked this conversation as resolved.
Show resolved Hide resolved
return Err(tonic::Status::invalid_argument(
"Error: One or more invalid taddresses given.",
));
}
let balance = zebrad_client.get_address_balance(taddrs).await?;
let checked_balance: i64 = match i64::try_from(balance.balance) {
Ok(balance) => balance,
Err(_) => {
return Err(tonic::Status::unknown(
"Error: Error converting balance from u64 to i64.",
));
}
};
Ok(tonic::Response::new(Balance {
value_zat: checked_balance,
}))
})
}

/// This RPC has not been implemented as it is not currently used by zingolib.
/// If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer).
/// Returns the total balance for a list of taddrs
#[must_use]
#[allow(clippy::type_complexity, clippy::type_repetition_in_bounds)]
fn get_taddress_balance_stream<'life0, 'async_trait>(
&'life0 self,
_request: tonic::Request<tonic::Streaming<Address>>,
request: tonic::Request<tonic::Streaming<Address>>,
) -> ::core::pin::Pin<
Box<
dyn ::core::future::Future<Output = Result<tonic::Response<Balance>, tonic::Status>>
Expand All @@ -840,7 +877,96 @@ impl CompactTxStreamer for GrpcClient {
{
println!("[TEST] Received call of get_taddress_balance_stream.");
Box::pin(async {
Err(tonic::Status::unimplemented("get_taddress_balance_stream not yet implemented. If you require this RPC please open an issue or PR at the Zingo-Indexer github (https://github.com/zingolabs/zingo-indexer)."))
let zebrad_client = JsonRpcConnector::new(
self.zebrad_uri.clone(),
Some("xxxxxx".to_string()),
Some("xxxxxx".to_string()),
)
.await?;
let (channel_tx, mut channel_rx) = tokio::sync::mpsc::channel::<String>(32);
let fetcher_task_handle = tokio::spawn(async move {
let fetcher_timeout = timeout(std::time::Duration::from_secs(30), async {
Oscar-Pepper marked this conversation as resolved.
Show resolved Hide resolved
let mut total_balance: u64 = 0;
loop {
match channel_rx.recv().await {
Some(taddr) => {
if check_taddress(taddr.as_str()).is_some() {
let balance =
zebrad_client.get_address_balance(vec![taddr]).await?;
total_balance += balance.balance;
} else {
return Err(tonic::Status::invalid_argument(
"Error: One or more invalid taddresses given.",
));
}
}
None => {
return Ok(total_balance);
}
}
}
})
.await;
match fetcher_timeout {
Ok(result) => result,
Err(_) => Err(tonic::Status::deadline_exceeded(
"Error: get_taddress_balance_stream request timed out.",
)),
}
});
let addr_recv_timeout = timeout(std::time::Duration::from_secs(30), async {
let mut address_stream = request.into_inner();
while let Some(address_result) = address_stream.next().await {
// TODO: Hide server error from clients before release. Currently useful for dev purposes.
let address = address_result.map_err(|e| {
tonic::Status::unknown(format!("Failed to read from stream: {}", e))
})?;
if channel_tx.send(address.address).await.is_err() {
// TODO: Hide server error from clients before release. Currently useful for dev purposes.
return Err(tonic::Status::unknown(
"Error: Failed to send address to balance task.",
));
}
}
drop(channel_tx);
Ok::<(), tonic::Status>(())
})
.await;
match addr_recv_timeout {
Ok(Ok(())) => {}
Ok(Err(e)) => {
fetcher_task_handle.abort();
Oscar-Pepper marked this conversation as resolved.
Show resolved Hide resolved
return Err(e);
}
Err(_) => {
fetcher_task_handle.abort();
return Err(tonic::Status::deadline_exceeded(
"Error: get_taddress_balance_stream request timed out in address loop.",
));
}
}
match fetcher_task_handle.await {
Ok(Ok(total_balance)) => {
let checked_balance: i64 = match i64::try_from(total_balance) {
Ok(balance) => balance,
Err(_) => {
// TODO: Hide server error from clients before release. Currently useful for dev purposes.
return Err(tonic::Status::unknown(
"Error: Error converting balance from u64 to i64.",
));
}
};
Ok(tonic::Response::new(Balance {
value_zat: checked_balance,
}))
}
Ok(Err(e)) => Err(e),
// TODO: Hide server error from clients before release. Currently useful for dev purposes.
Err(e) => Err(tonic::Status::unknown(format!(
"Fetcher Task failed: {}",
e
))),
}
})
}

Expand Down
Loading