Skip to content

Commit

Permalink
further refinements
Browse files Browse the repository at this point in the history
  • Loading branch information
hansieodendaal committed Nov 7, 2024
1 parent f4e4304 commit 2f404b6
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,7 @@ impl WalletEventMonitor {
result = transaction_service_events.recv() => {
match result {
Ok(msg) => {
trace!(
target: LOG_TARGET,
"Wallet Event Monitor received wallet transaction service event {:?}",
msg
);
trace!(target: LOG_TARGET, "Wallet transaction service event {:?}", msg);
self.app_state_inner.write().await.add_event(EventListItem{
event_type: "TransactionEvent".to_string(),
desc: (*msg).to_string()
Expand Down Expand Up @@ -184,27 +180,17 @@ impl WalletEventMonitor {
}
},
Ok(_) = connectivity_status.changed() => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet connectivity status changed");
trace!(
target: LOG_TARGET,
"Wallet connectivity status changed to {:?}",
connectivity_status.borrow().clone()
);
self.trigger_peer_state_refresh().await;
},
// Ok(_) = software_update_notif.changed() => {
// trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet auto update status changed");
// let update = software_update_notif.borrow().as_ref().cloned();
// if let Some(update) = update {
// self.add_notification(format!(
// "Version {} of the {} is available: {} (sha: {})",
// update.version(),
// update.app(),
// update.download_url(),
// update.to_hash_hex()
// )).await;
// }
// },
result = network_events.recv() => {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received wallet connectivity event {:?}", msg
);
result = network_events.recv() => {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet connectivity event {:?}", msg);
match msg {
NetworkEvent::PeerConnected{..} |
NetworkEvent::PeerDisconnected{..} => {
Expand Down Expand Up @@ -244,14 +230,16 @@ impl WalletEventMonitor {
_ = base_node_changed.changed() => {
let peer = base_node_changed.borrow().as_ref().cloned();
if let Some(peer) = peer {
self.trigger_base_node_peer_refresh(peer.get_current_peer().clone()).await;
let current_peer = peer.get_current_peer().clone();
trace!(target: LOG_TARGET, "Base node changed to '{}'", current_peer.peer_id());
self.trigger_base_node_peer_refresh(current_peer).await;
self.trigger_balance_refresh();
}
}
result = base_node_events.recv() => {
match result {
Ok(msg) => {
trace!(target: LOG_TARGET, "Wallet Event Monitor received base node event {:?}", msg);
trace!(target: LOG_TARGET, "Base node event {:?}", msg);
if let BaseNodeEvent::BaseNodeStateChanged(state) = (*msg).clone() {
self.trigger_base_node_state_refresh(state).await;
}
Expand Down
2 changes: 1 addition & 1 deletion base_layer/contacts/src/contacts_service/handle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ impl Display for ContactsLivenessData {
f,
"Liveness event '{}' for contact {} ({}) {}",
self.message_type,
self.address,
self.address.to_hex(),
self.peer_id,
if let Some(time) = self.last_seen {
let local_time = DateTime::<Local>::from_naive_utc_and_offset(time, Local::now().offset().to_owned())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,19 @@
use std::{
fmt::Display,
sync::{atomic, atomic::AtomicUsize, Arc},
time::{Duration, Instant},
};

use tari_network::{identity::PeerId, Peer};

use crate::connectivity_service::WalletConnectivityError;

/// The selected peer is a current base node and an optional list of backup peers.
#[derive(Clone)]
#[derive(Clone, Debug)]
pub struct BaseNodePeerManager {
// The current base node that the wallet is connected to
current_peer_index: Arc<AtomicUsize>,
// The other base nodes that the wallet can connect to if the selected peer is not available
peer_list: Arc<Vec<Peer>>,
local_last_connection_attempt: Option<Instant>,
}

impl BaseNodePeerManager {
Expand All @@ -57,7 +55,6 @@ impl BaseNodePeerManager {
Ok(Self {
current_peer_index: Arc::new(AtomicUsize::new(preferred_peer_index)),
peer_list: Arc::new(peer_list),
local_last_connection_attempt: None,
})
}

Expand All @@ -66,13 +63,6 @@ impl BaseNodePeerManager {
self.get_current_peer().peer_id()
}

pub fn select_next_peer_if_attempted(&mut self) -> &Peer {
if self.time_since_last_connection_attempt().is_some() {
self.select_next_peer();
}
self.get_current_peer()
}

/// Get the current peer.
pub fn get_current_peer(&self) -> &Peer {
self.peer_list
Expand All @@ -84,10 +74,6 @@ impl BaseNodePeerManager {
/// Changes to the next peer in the list, returning that peer
pub fn select_next_peer(&mut self) -> &Peer {
self.set_current_peer_index((self.current_peer_index() + 1) % self.peer_list.len());
if self.peer_list.len() > 1 {
// Reset the last attempt since we've moved onto another peer
self.local_last_connection_attempt = None;
}
&self.peer_list[self.current_peer_index()]
}

Expand All @@ -100,16 +86,6 @@ impl BaseNodePeerManager {
(self.current_peer_index(), &self.peer_list)
}

/// Set the last connection attempt stats
pub fn set_last_connection_attempt(&mut self) {
self.local_last_connection_attempt = Some(Instant::now());
}

/// Get the last connection attempt for the current peer
pub fn time_since_last_connection_attempt(&self) -> Option<Duration> {
self.local_last_connection_attempt.as_ref().map(|t| t.elapsed())
}

fn set_current_peer_index(&self, index: usize) {
self.current_peer_index.store(index, atomic::Ordering::SeqCst);
}
Expand All @@ -121,15 +97,10 @@ impl BaseNodePeerManager {

impl Display for BaseNodePeerManager {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
let last_connection_attempt = match self.time_since_last_connection_attempt() {
Some(stats) => format!("{:?}", stats.as_secs()),
None => "Never".to_string(),
};
write!(
f,
"BaseNodePeerManager {{ current index: {}, last attempt (s): {}, peer list: {} entries }}",
"BaseNodePeerManager {{ current index: {}, peer list: {} entries }}",
self.current_peer_index(),
last_connection_attempt,
self.peer_list.len()
)
}
Expand Down
36 changes: 9 additions & 27 deletions base_layer/wallet/src/connectivity_service/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ pub struct WalletConnectivityService {
current_pool: Option<ClientPoolContainer>,
online_status_watch: Watch<OnlineStatus>,
pending_requests: Vec<ReplyOneshot>,
last_attempted_peer: Option<PeerId>,
}

struct ClientPoolContainer {
Expand Down Expand Up @@ -100,6 +101,7 @@ impl WalletConnectivityService {
current_pool: None,
pending_requests: Vec::new(),
online_status_watch,
last_attempted_peer: None,
}
}

Expand Down Expand Up @@ -297,21 +299,14 @@ impl WalletConnectivityService {
}

async fn setup_base_node_connection(&mut self, mut peer_manager: BaseNodePeerManager) {
let mut peer = peer_manager.select_next_peer_if_attempted().clone();
let mut peer = if self.last_attempted_peer.is_some() {
peer_manager.select_next_peer().clone()
} else {
peer_manager.get_current_peer().clone()
};

loop {
self.set_online_status(OnlineStatus::Connecting);
let maybe_last_attempt = peer_manager.time_since_last_connection_attempt();

debug!(
target: LOG_TARGET,
"Attempting to connect to base node peer '{}'... (last attempt {:?})",
peer,
maybe_last_attempt
);

peer_manager.set_last_connection_attempt();

match self.try_setup_rpc_pool(&peer).await {
Ok(true) => {
self.base_node_watch.send(Some(peer_manager.clone()));
Expand All @@ -328,20 +323,9 @@ impl WalletConnectivityService {
target: LOG_TARGET,
"The peer has changed while connecting. Attempting to connect to new base node."
);

// NOTE: we do not strictly need to update our local copy of BaseNodePeerManager since state is
// atomically shared. However, since None is a possibility (although in practice
// it should never be) we handle that here.
peer_manager = match self.get_base_node_peer_manager() {
Some(pm) => pm,
None => {
warn!(target: LOG_TARGET, "⚠️ NEVER HAPPEN: Base node peer manager set to None while connecting");
return;
},
};
self.disconnect_base_node(peer.peer_id()).await;
self.set_online_status(OnlineStatus::Offline);
continue;
return;
},
Err(WalletConnectivityError::DialError(DialError::Aborted)) => {
debug!(target: LOG_TARGET, "Dial was cancelled.");
Expand All @@ -364,9 +348,6 @@ impl WalletConnectivityService {
CONNECTIVITY_WAIT.as_secs()
);
time::sleep(CONNECTIVITY_WAIT).await;
} else {
// Ensure that all services are aware of the next peer being attempted
self.base_node_watch.mark_changed();
}
peer = next_peer;
}
Expand All @@ -380,6 +361,7 @@ impl WalletConnectivityService {
}

async fn try_setup_rpc_pool(&mut self, peer: &Peer) -> Result<bool, WalletConnectivityError> {
self.last_attempted_peer = Some(peer.peer_id());
let peer_id = peer.peer_id();
let dial_wait = self
.network_handle
Expand Down

0 comments on commit 2f404b6

Please sign in to comment.