refactor: Add anyhow error handling (#339)
* update rust dependencies

* add anyhow, refactor some functions

* wip
Mr-Leshiy authored Mar 25, 2024
1 parent 7f08eb1 commit 1708cbd
Showing 7 changed files with 41 additions and 41 deletions.
1 change: 1 addition & 0 deletions catalyst-gateway/Cargo.lock

Some generated files are not rendered by default.

5 changes: 5 additions & 0 deletions catalyst-gateway/Cargo.toml
@@ -44,6 +44,11 @@ tokio = "1"
dotenvy = "0.15"
local-ip-address = "0.5.7"
gethostname = "0.4.3"
hex = "0.4.3"
async-recursion = "1.0.5"
pallas = "0.23.0"
anyhow = "1.0.71"
cardano-chain-follower= { git = "https://github.com/input-output-hk/hermes.git", version="0.0.1" }

[workspace.lints.rust]
warnings = "deny"
26 changes: 7 additions & 19 deletions catalyst-gateway/bin/Cargo.toml
@@ -15,32 +15,25 @@ repository.workspace = true
workspace = true

[dependencies]

bb8 = { workspace = true }
bb8-postgres = { workspace = true }
tokio-postgres = { workspace = true, features = [
"with-chrono-0_4",
"with-serde_json-1",
"with-time-0_3",
] }

clap = { workspace = true, features = ["derive", "env"] }
tracing = { workspace = true, features = ["log"] }
tracing-subscriber = { workspace = true, features = ["fmt", "json", "time"] }

serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }

tokio = { workspace = true, features = ["rt", "macros", "rt-multi-thread"] }
thiserror = { workspace = true }

rust_decimal = { workspace = true, features = [
"serde-with-float",
"db-tokio-postgres",
] }

chrono = { workspace = true }

poem = { workspace = true, features = [
"embed",
"prometheus",
@@ -56,8 +49,6 @@ poem-openapi = { workspace = true, features = [
"chrono",
] }
poem-extensions = { workspace = true }

# Metrics - Poem
prometheus = { workspace = true }
cryptoxide = { workspace = true }
uuid = { workspace = true, features = ["v4", "serde"] }
@@ -68,13 +59,10 @@ panic-message = { workspace = true }
cpu-time = { workspace = true }
ulid = { workspace = true, features = ["serde", "uuid"] }
rust-embed = { workspace = true }
local-ip-address.workspace = true
gethostname.workspace = true

hex = "0.4.3"
async-recursion = "1.0.5"


cardano-chain-follower= { git = "https://github.com/input-output-hk/hermes.git", version="0.0.1"}

pallas = { version = "0.23.0" }
local-ip-address = { workspace = true }
gethostname = { workspace = true }
hex = { workspace = true }
async-recursion = { workspace = true }
pallas = { workspace = true }
cardano-chain-follower= { workspace = true }
anyhow = { workspace = true }
2 changes: 1 addition & 1 deletion catalyst-gateway/bin/src/cli.rs
@@ -46,7 +46,7 @@ impl Cli {
/// - Failed to initialize the logger with the specified log level.
/// - Failed to create a new `State` with the provided database URL.
/// - Failed to run the service on the specified address.
pub(crate) async fn exec(self) -> Result<(), Box<dyn std::error::Error>> {
pub(crate) async fn exec(self) -> anyhow::Result<()> {
match self {
Self::Run(settings) => {
logger::init(settings.log_level)?;
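A minimal sketch of why this signature change works, using hypothetical helpers (`read_config`, `parse_port`) rather than the crate's real ones: `anyhow::Error` implements `From` for any error type that is `std::error::Error + Send + Sync + 'static`, so `?` keeps propagating heterogeneous errors without boxing them into `Box<dyn std::error::Error>`.

```rust
use anyhow::Result;

// Hypothetical helpers with two different concrete error types.
fn read_config(path: &str) -> std::io::Result<String> {
    std::fs::read_to_string(path)
}

fn parse_port(s: &str) -> std::result::Result<u16, std::num::ParseIntError> {
    s.parse()
}

// Both error types convert into `anyhow::Error` through `?`; no boxing needed.
fn exec() -> Result<()> {
    let config = read_config("service.toml")?; // io::Error -> anyhow::Error
    let _port = parse_port(config.trim())?;    // ParseIntError -> anyhow::Error
    Ok(())
}

fn main() -> Result<()> {
    exec()
}
```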
29 changes: 18 additions & 11 deletions catalyst-gateway/bin/src/follower.rs
@@ -1,5 +1,5 @@
//! Logic for orchestrating followers
use std::{error::Error, path::PathBuf, str::FromStr, sync::Arc};
use std::{path::PathBuf, str::FromStr, sync::Arc};

/// Handler for follower tasks, allows for control over spawned follower threads
pub type ManageTasks = JoinHandle<()>;
@@ -30,7 +30,7 @@ const DATA_NOT_STALE: i64 = 1;
pub(crate) async fn start_followers(
configs: (Vec<NetworkMeta>, FollowerMeta), db: Arc<EventDB>, data_refresh_tick: u64,
check_config_tick: u64, machine_id: String,
) -> Result<(), Box<dyn Error>> {
) -> anyhow::Result<()> {
// spawn followers and obtain thread handlers for control and future cancellation
let follower_tasks = spawn_followers(
configs.clone(),
@@ -76,7 +76,7 @@ pub(crate) async fn start_followers(
)
.await?;
},
None => return Err("Config has been deleted...".into()),
None => return Err(anyhow::anyhow!("Config has been deleted...")),
}

Ok(())
@@ -86,7 +86,7 @@ pub(crate) async fn start_followers(
async fn spawn_followers(
configs: (Vec<NetworkMeta>, FollowerMeta), db: Arc<EventDB>, data_refresh_tick: u64,
machine_id: String,
) -> Result<Vec<ManageTasks>, Box<dyn Error>> {
) -> anyhow::Result<Vec<ManageTasks>> {
let snapshot_path = configs.1.mithril_snapshot_path;

let mut follower_tasks = Vec::new();
@@ -151,7 +151,7 @@ async fn spawn_followers(
/// it left off. If there was no previous follower, start indexing from genesis point.
async fn find_last_update_point(
db: Arc<EventDB>, network: &String,
) -> Result<(Option<SlotNumber>, Option<BlockHash>, Option<BlockTime>), Box<dyn Error>> {
) -> anyhow::Result<(Option<SlotNumber>, Option<BlockHash>, Option<BlockTime>)> {
let (slot_no, block_hash, last_updated) =
match db.last_updated_metadata(network.to_string()).await {
Ok((slot_no, block_hash, last_updated)) => {
@@ -176,11 +176,11 @@ async fn find_last_update_point(
async fn init_follower(
network: Network, relay: &str, start_from: (Option<SlotNumber>, Option<BlockHash>),
db: Arc<EventDB>, machine_id: MachineId, snapshot: &str,
) -> Result<ManageTasks, Box<dyn Error>> {
) -> anyhow::Result<ManageTasks> {
let mut follower = follower_connection(start_from, snapshot, network, relay).await?;

let genesis_values =
network_genesis_values(&network).ok_or("Obtaining genesis values failed")?;
let genesis_values = network_genesis_values(&network)
.ok_or(anyhow::anyhow!("Obtaining genesis values failed"))?;

let task = tokio::spawn(async move {
loop {
@@ -310,7 +310,7 @@ async fn init_follower(
async fn follower_connection(
start_from: (Option<SlotNumber>, Option<BlockHash>), snapshot: &str, network: Network,
relay: &str,
) -> Result<Follower, Box<dyn Error>> {
) -> anyhow::Result<Follower> {
let mut follower_cfg = if start_from.0.is_none() || start_from.1.is_none() {
// start from genesis, no previous followers, hence no starting points.
FollowerConfigBuilder::default()
@@ -320,8 +320,15 @@ async fn follower_connection(
// start from given point
FollowerConfigBuilder::default()
.follow_from(Point::new(
start_from.0.ok_or("Slot number not present")?.try_into()?,
hex::decode(start_from.1.ok_or("Block Hash not present")?)?,
start_from
.0
.ok_or(anyhow::anyhow!("Slot number not present"))?
.try_into()?,
hex::decode(
start_from
.1
.ok_or(anyhow::anyhow!("Block Hash not present"))?,
)?,
))
.mithril_snapshot_path(PathBuf::from(snapshot))
.build()
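A hedged sketch of the two conversion patterns this file now uses: ad-hoc errors built with `anyhow::anyhow!` instead of `"...".into()` into a boxed error, and `Option` values turned into results with `.ok_or(anyhow::anyhow!(...))?`. The `load_config`, `check_config`, and `slot_number` helpers below are illustrative, not the crate's real functions.

```rust
use anyhow::{anyhow, Result};

/// Illustrative lookup: `None` means the config row was deleted.
fn load_config(id: u64) -> Option<String> {
    (id == 1).then(|| "mainnet".to_string())
}

fn check_config(id: u64) -> Result<String> {
    match load_config(id) {
        Some(cfg) => Ok(cfg),
        // Before: `return Err("Config has been deleted...".into())` as Box<dyn Error>.
        // After: build an `anyhow::Error` directly.
        None => Err(anyhow!("Config has been deleted...")),
    }
}

fn slot_number(start_from: Option<u64>) -> Result<u64> {
    // `ok_or` turns the `Option` into a `Result` carrying an anyhow error,
    // mirroring `start_from.0.ok_or(anyhow::anyhow!("Slot number not present"))?`.
    start_from.ok_or(anyhow!("Slot number not present"))
}

fn main() -> Result<()> {
    println!("{}", check_config(1)?);
    println!("{}", slot_number(Some(42))?);
    Ok(())
}
```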
7 changes: 4 additions & 3 deletions catalyst-gateway/bin/src/logger.rs
@@ -1,6 +1,6 @@
//! Setup for logging for the service.
use clap::ValueEnum;
use tracing::{level_filters::LevelFilter, subscriber::SetGlobalDefaultError};
use tracing::level_filters::LevelFilter;
use tracing_subscriber::{
fmt::{format::FmtSpan, time},
FmtSubscriber,
@@ -45,7 +45,7 @@ impl From<LogLevel> for tracing::log::LevelFilter {
}

/// Initialize the tracing subscriber
pub(crate) fn init(log_level: LogLevel) -> Result<(), SetGlobalDefaultError> {
pub(crate) fn init(log_level: LogLevel) -> anyhow::Result<()> {
let subscriber = FmtSubscriber::builder()
.json()
.with_max_level(LevelFilter::from_level(log_level.into()))
@@ -64,5 +64,6 @@ pub(crate) fn init(log_level: LogLevel) -> Result<(), SetGlobalDefaultError> {
// Logging is globally disabled by default, so globally enable it to the required level.
tracing::log::set_max_level(log_level.into());

tracing::subscriber::set_global_default(subscriber)
tracing::subscriber::set_global_default(subscriber)?;
Ok(())
}
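The last hunk can no longer return `tracing::subscriber::set_global_default(subscriber)` directly, because the function's error type is now `anyhow::Error` rather than `SetGlobalDefaultError`; the `?` performs the conversion and an explicit `Ok(())` closes the function. A minimal sketch of the same shape, with a hypothetical `AlreadySetError` standing in for `SetGlobalDefaultError`:

```rust
use anyhow::Result;

/// Hypothetical stand-in for `tracing::subscriber::SetGlobalDefaultError`.
#[derive(Debug, thiserror::Error)]
#[error("a global default subscriber has already been set")]
struct AlreadySetError;

/// Hypothetical fallible setter, standing in for `set_global_default`.
fn set_global_default() -> std::result::Result<(), AlreadySetError> {
    Ok(())
}

fn init() -> Result<()> {
    // Returning `set_global_default()` directly would not type-check:
    // its error is `AlreadySetError`, not `anyhow::Error`. The `?` does
    // the conversion, and an explicit `Ok(())` closes the function.
    set_global_default()?;
    Ok(())
}

fn main() -> Result<()> {
    init()
}
```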
12 changes: 5 additions & 7 deletions catalyst-gateway/bin/src/util.rs
@@ -1,7 +1,5 @@
//! Block stream parsing and filtering utils
use std::error::Error;

use cryptoxide::{blake2b::Blake2b, digest::Digest};
use pallas::ledger::{
primitives::conway::{StakeCredential, VKeyWitness},
@@ -120,7 +118,7 @@ pub fn extract_stake_credentials_from_certs(
/// except for credentials (i.e. keys or scripts) which are 28-byte long (or 224 bits)
pub fn extract_hashed_witnesses(
witnesses: &[VKeyWitness],
) -> Result<Vec<(WitnessPubKey, WitnessHash)>, Box<dyn Error>> {
) -> anyhow::Result<Vec<(WitnessPubKey, WitnessHash)>> {
let mut hashed_witnesses = Vec::new();
for witness in witnesses {
let pub_key_bytes: [u8; 32] = witness.vkey.as_slice().try_into()?;
@@ -141,7 +139,7 @@ pub fn extract_hashed_witnesses(
/// to identify the correct stake credential key.
pub fn find_matching_stake_credential(
witnesses: &[(WitnessPubKey, WitnessHash)], stake_credentials: &[String],
) -> Result<(StakeCredentialKey, StakeCredentialHash), Box<dyn Error>> {
) -> anyhow::Result<(StakeCredentialKey, StakeCredentialHash)> {
stake_credentials
.iter()
.zip(witnesses.iter())
Expand All @@ -152,7 +150,7 @@ pub fn find_matching_stake_credential(
None
}
})
.ok_or(
"No stake credential from the certificates matches any of the witness pub keys".into(),
)
.ok_or(anyhow::anyhow!(
"No stake credential from the certificates matches any of the witness pub keys"
))
}
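A sketch of the slice-to-array conversion that `extract_hashed_witnesses` relies on: `TryFromSliceError` implements `std::error::Error`, so with an `anyhow::Result` return type the `try_into()?` needs no manual boxing. The `witness_pub_key_bytes` helper and the byte values below are made up for illustration.

```rust
use anyhow::Result;

/// Convert a witness verification key into a fixed 32-byte array,
/// failing with an anyhow error if the slice has the wrong length.
fn witness_pub_key_bytes(vkey: &[u8]) -> Result<[u8; 32]> {
    // `TryFromSliceError` converts into `anyhow::Error` through `?`.
    let bytes: [u8; 32] = vkey.try_into()?;
    Ok(bytes)
}

fn main() -> Result<()> {
    let ok = witness_pub_key_bytes(&[0u8; 32])?;
    assert_eq!(ok.len(), 32);

    // A 16-byte slice fails with "could not convert slice to array".
    assert!(witness_pub_key_bytes(&[0u8; 16]).is_err());
    Ok(())
}
```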
