diff --git a/CHANGELOG.md b/CHANGELOG.md index 48b5548..55dc5f0 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,12 @@ # Changelog +## v0.10.0 + +- core dependencies have been updated +- latest rust nightly clippy lints have been applied +- a very unlikely but possible channel panic in case of a conflict resolution has been fixed +- bump MSRV to 1.70.0 + ## 0.9.1 Typo corrections in documentation and removed now obsolete minimal-versions dependencies. diff --git a/Cargo.toml b/Cargo.toml index e5dbb56..82c7ec4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,8 @@ [package] name = "redhac" -version = "0.9.1" +version = "0.10.0" edition = "2021" -rust-version = "1.65.0" +rust-version = "1.70.0" license = "Apache-2.0 OR MIT" authors = ["Sebastian Dobe anyhow::Result<()> { .await?; info!("First cache node started"); - // Now just sleep until we ctrl + c so we can start the other members and observe the behavior + // Now just sleep until we ctrl + c, so we can start the other members and observe the behavior time::sleep(Duration::from_secs(6000)).await; // Let's simulate a graceful shutdown diff --git a/justfile b/justfile index 3ae0a2e..effdb8f 100644 --- a/justfile +++ b/justfile @@ -33,7 +33,8 @@ build: #!/usr/bin/env bash set -euxo pipefail # build as musl to make sure this works - cargo build --release --target x86_64-unknown-linux-musl + #cargo build --release --target x86_64-unknown-linux-musl + cargo build --release # verifies the MSRV @@ -43,7 +44,7 @@ msrv-verify: # find's the new MSRV, if it needs a bump msrv-find: - cargo msrv --min 1.65.0 + cargo msrv --min 1.70.0 # verify thats everything is good @@ -73,6 +74,13 @@ release: verfiy-is-clean git push origin "v$TAG" +# dry-run publishing the latest version +publish-dry: verfiy-is-clean + #!/usr/bin/env bash + set -euxo pipefail + cargo publish --dry-run + + # publishes the current version to cargo.io publish: verfiy-is-clean #!/usr/bin/env bash diff --git a/src/client.rs b/src/client.rs index 183b7de..d1f0297 100644 --- a/src/client.rs +++ b/src/client.rs @@ -356,12 +356,21 @@ async fn run_client( }); debug!("Starting the Sending Stream to the Server"); - tx_quorum + if let Err(err)= tx_quorum .send_async(QuorumReq::UpdateServer { server: server.clone(), }) - .await - .unwrap(); + .await { + // can fail in case of a conflict resolution, if the other side has just shut down + error!("tx_quorum send error: {:?}", err); + callback_handle.abort(); + time::sleep(Duration::from_millis(get_rand_between( + *RECONNECT_TIMEOUT_LOWER, + *RECONNECT_TIMEOUT_UPPER, + ))) + .await; + continue; + } // Sending Stream to the Server // the ReceiverStream only accepts an mpsc channel diff --git a/src/lib.rs b/src/lib.rs index 44df03d..21732ea 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -277,11 +277,12 @@ //! &mut cache_config_1, //! // optional notification channel: `Option>` //! None, -//! // We need to overwrite the hostname so we can start all nodes on the same host for this +//! // We need to overwrite the hostname, so we can start all nodes on the same host for this //! // example. Usually, this will be set to `None` //! Some("127.0.0.1:7001".to_string()), //! ) //! .await?; +//! time::sleep(Duration::from_millis(100)).await; //! println!("First cache node started"); //! //! // Mimic the other 2 cache members. This should usually not be done in the same code - only @@ -295,6 +296,7 @@ //! Some("127.0.0.1:7002".to_string()), //! ) //! .await?; +//! time::sleep(Duration::from_millis(100)).await; //! println!("2nd cache node started"); //! // Now after the 2nd cache member has been started, we would already have quorum and a //! // working cache layer. As long as there is no leader and / or quorum, the cache will not @@ -309,6 +311,7 @@ //! Some("127.0.0.1:7003".to_string()), //! ) //! .await?; +//! time::sleep(Duration::from_millis(100)).await; //! println!("3rd cache node started"); //! //! // For the sake of this example again, we need to wait until the cache is in a healthy @@ -1004,12 +1007,12 @@ pub(crate) async fn insert_from_leader( }; // double check, that we are really the leader - // this might get removed after enough testing, if it provides a performance benefit if health_state.state != QuorumState::Leader && health_state.state != QuorumState::LeaderSwitch { - let error = "Execution of 'insert_from_leader' is not allowed on a non-leader".to_string(); - warn!("is_leader state: {:?}", health_state.state); - // TODO remove this panic after testing + let error = format!("Execution of 'insert_from_leader' is not allowed on a non-leader: {:?}", health_state.state); + error!("{}", error); + // TODO we once ended up here during conflict resolution -> try to reproduce + // rather panic than have an inconsistent state panic!("{}", error); } @@ -1438,7 +1441,7 @@ pub async fn start_cluster( if !h.contains(&hostname) { ha_clients.push(h.trim().to_owned()) } else { - host_srv_addr = h.trim().to_owned(); + h.trim().clone_into(&mut host_srv_addr); } }); info!( diff --git a/src/quorum.rs b/src/quorum.rs index 914391b..7e9725d 100644 --- a/src/quorum.rs +++ b/src/quorum.rs @@ -270,7 +270,7 @@ pub(crate) async fn quorum_handler( state = QuorumState::Follower; health_state.state = QuorumState::Follower; - health_state.tx_leader = server.tx.clone(); + health_state.tx_leader.clone_from(&server.tx); } } @@ -294,7 +294,7 @@ pub(crate) async fn quorum_handler( state = QuorumState::Follower; health_state.state = QuorumState::Follower; - health_state.tx_leader = server.tx.clone(); + health_state.tx_leader.clone_from(&server.tx); tx_remote .send_async(RpcRequest::LeaderReqAck { diff --git a/src/rpc/cache.rs b/src/rpc/cache.rs index e8d20e8..1a88d8e 100644 --- a/src/rpc/cache.rs +++ b/src/rpc/cache.rs @@ -1,3 +1,4 @@ +// This file is @generated by prost-build. #[allow(clippy::derive_partial_eq_without_eq)] #[derive(Clone, PartialEq, ::prost::Message)] pub struct Ack { @@ -521,7 +522,7 @@ pub mod cache_server { #[async_trait] pub trait Cache: Send + Sync + 'static { /// Server streaming response type for the StreamValues method. - type StreamValuesStream: futures_core::Stream< + type StreamValuesStream: tonic::codegen::tokio_stream::Stream< Item = std::result::Result, > + Send @@ -634,7 +635,7 @@ pub mod cache_server { ) -> Self::Future { let inner = Arc::clone(&self.0); let fut = async move { - (*inner).stream_values(request).await + ::stream_values(&inner, request).await }; Box::pin(fut) } diff --git a/src/rpc/mod.rs b/src/rpc/mod.rs index a5c08fd..32f90b3 100644 --- a/src/rpc/mod.rs +++ b/src/rpc/mod.rs @@ -1 +1,2 @@ +#[allow(clippy::mixed_attributes_style)] pub mod cache;