Skip to content

Commit

Permalink
Merge pull request #11 from sebadob/prepare-v0.10.0
Browse files Browse the repository at this point in the history
Prepare v0.10.0
  • Loading branch information
sebadob authored Apr 9, 2024
2 parents 395f7fd + c291478 commit 4b7e9db
Show file tree
Hide file tree
Showing 11 changed files with 57 additions and 21 deletions.
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
# Changelog

## v0.10.0

- core dependencies have been updated
- latest rust nightly clippy lints have been applied
- a very unlikely but possible channel panic in case of a conflict resolution has been fixed
- bump MSRV to 1.70.0

## 0.9.1

Typo corrections in documentation and removed now obsolete minimal-versions dependencies.
Expand Down
10 changes: 5 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
[package]
name = "redhac"
version = "0.9.1"
version = "0.10.0"
edition = "2021"
rust-version = "1.65.0"
rust-version = "1.70.0"
license = "Apache-2.0 OR MIT"
authors = ["Sebastian Dobe <[email protected]"]
categories = ["caching"]
Expand All @@ -15,7 +15,7 @@ repository = "https://github.com/sebadob/redhac"
anyhow = "1.0"
async-stream = "0.3"
bincode = "1"
cached = { version = "0.46", features = ["async", "async_tokio_rt_multi_thread"] }
cached = { version = "0.49.3", features = ["async", "async_tokio_rt_multi_thread"] }
chrono = { version = "0.4.31", default-features = false, features = ["clock", "serde", "std"] }
ctrlc = { version = "3", features = ["termination"] }
dotenvy = "0.15"
Expand All @@ -30,7 +30,7 @@ rand = "0.8"
serde = { version = "1", features = ["derive"] }
tokio = { version = "1.21.2", features = ["full"] }
tokio-stream = "0.1.5"
tonic = { version = "0.10.2", features = ["gzip", "tls", "tls-webpki-roots"] }
tonic = { version = "0.11.0", features = ["gzip", "tls", "tls-webpki-roots"] }
tower = "0.4"
tracing = "0.1"

Expand All @@ -46,4 +46,4 @@ tokio-test = "0.4"
tracing-subscriber = { version = "0.3", features = ["env-filter", "tracing"] }

[build-dependencies]
tonic-build = "0.10"
tonic-build = "0.11.0"
1 change: 1 addition & 0 deletions build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ fn main() {
// .build_client(true)
// .build_server(true)
// .out_dir("src/rpc/")
// .protoc_arg("--experimental_allow_proto3_optional")
// .compile(&["proto/cache.proto"], &["proto"])
// .expect("Failed to compile proto/cache.proto");
}
6 changes: 6 additions & 0 deletions examples/conflict_resolution_tests/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,12 @@ edition = "2021"
license = "Apache-2.0 OR MIT"
authors = ["Sebastian Dobe <[email protected]"]

[profile.dev]
panic = "abort"

[profile.release]
panic = "abort"

[dependencies]
anyhow = "1"
flume = "0.11"
Expand Down
2 changes: 1 addition & 1 deletion examples/ha_setup/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ async fn main() -> anyhow::Result<()> {
.await?;
info!("First cache node started");

// Now just sleep until we ctrl + c so we can start the other members and observe the behavior
// Now just sleep until we ctrl + c, so we can start the other members and observe the behavior
time::sleep(Duration::from_secs(6000)).await;

// Let's simulate a graceful shutdown
Expand Down
12 changes: 10 additions & 2 deletions justfile
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ build:
#!/usr/bin/env bash
set -euxo pipefail
# build as musl to make sure this works
cargo build --release --target x86_64-unknown-linux-musl
#cargo build --release --target x86_64-unknown-linux-musl
cargo build --release

# verifies the MSRV
Expand All @@ -43,7 +44,7 @@ msrv-verify:

# find's the new MSRV, if it needs a bump
msrv-find:
cargo msrv --min 1.65.0
cargo msrv --min 1.70.0


# verify thats everything is good
Expand Down Expand Up @@ -73,6 +74,13 @@ release: verfiy-is-clean
git push origin "v$TAG"


# dry-run publishing the latest version
publish-dry: verfiy-is-clean
#!/usr/bin/env bash
set -euxo pipefail
cargo publish --dry-run

# publishes the current version to cargo.io
publish: verfiy-is-clean
#!/usr/bin/env bash
Expand Down
15 changes: 12 additions & 3 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -356,12 +356,21 @@ async fn run_client(
});

debug!("Starting the Sending Stream to the Server");
tx_quorum
if let Err(err)= tx_quorum
.send_async(QuorumReq::UpdateServer {
server: server.clone(),
})
.await
.unwrap();
.await {
// can fail in case of a conflict resolution, if the other side has just shut down
error!("tx_quorum send error: {:?}", err);
callback_handle.abort();
time::sleep(Duration::from_millis(get_rand_between(
*RECONNECT_TIMEOUT_LOWER,
*RECONNECT_TIMEOUT_UPPER,
)))
.await;
continue;
}

// Sending Stream to the Server
// the ReceiverStream only accepts an mpsc channel
Expand Down
15 changes: 9 additions & 6 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,11 +277,12 @@
//! &mut cache_config_1,
//! // optional notification channel: `Option<mpsc::Sender<CacheNotify>>`
//! None,
//! // We need to overwrite the hostname so we can start all nodes on the same host for this
//! // We need to overwrite the hostname, so we can start all nodes on the same host for this
//! // example. Usually, this will be set to `None`
//! Some("127.0.0.1:7001".to_string()),
//! )
//! .await?;
//! time::sleep(Duration::from_millis(100)).await;
//! println!("First cache node started");
//!
//! // Mimic the other 2 cache members. This should usually not be done in the same code - only
Expand All @@ -295,6 +296,7 @@
//! Some("127.0.0.1:7002".to_string()),
//! )
//! .await?;
//! time::sleep(Duration::from_millis(100)).await;
//! println!("2nd cache node started");
//! // Now after the 2nd cache member has been started, we would already have quorum and a
//! // working cache layer. As long as there is no leader and / or quorum, the cache will not
Expand All @@ -309,6 +311,7 @@
//! Some("127.0.0.1:7003".to_string()),
//! )
//! .await?;
//! time::sleep(Duration::from_millis(100)).await;
//! println!("3rd cache node started");
//!
//! // For the sake of this example again, we need to wait until the cache is in a healthy
Expand Down Expand Up @@ -1004,12 +1007,12 @@ pub(crate) async fn insert_from_leader(
};

// double check, that we are really the leader
// this might get removed after enough testing, if it provides a performance benefit
if health_state.state != QuorumState::Leader && health_state.state != QuorumState::LeaderSwitch
{
let error = "Execution of 'insert_from_leader' is not allowed on a non-leader".to_string();
warn!("is_leader state: {:?}", health_state.state);
// TODO remove this panic after testing
let error = format!("Execution of 'insert_from_leader' is not allowed on a non-leader: {:?}", health_state.state);
error!("{}", error);
// TODO we once ended up here during conflict resolution -> try to reproduce
// rather panic than have an inconsistent state
panic!("{}", error);
}

Expand Down Expand Up @@ -1438,7 +1441,7 @@ pub async fn start_cluster(
if !h.contains(&hostname) {
ha_clients.push(h.trim().to_owned())
} else {
host_srv_addr = h.trim().to_owned();
h.trim().clone_into(&mut host_srv_addr);
}
});
info!(
Expand Down
4 changes: 2 additions & 2 deletions src/quorum.rs
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ pub(crate) async fn quorum_handler(

state = QuorumState::Follower;
health_state.state = QuorumState::Follower;
health_state.tx_leader = server.tx.clone();
health_state.tx_leader.clone_from(&server.tx);
}
}

Expand All @@ -294,7 +294,7 @@ pub(crate) async fn quorum_handler(

state = QuorumState::Follower;
health_state.state = QuorumState::Follower;
health_state.tx_leader = server.tx.clone();
health_state.tx_leader.clone_from(&server.tx);

tx_remote
.send_async(RpcRequest::LeaderReqAck {
Expand Down
5 changes: 3 additions & 2 deletions src/rpc/cache.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
// This file is @generated by prost-build.
#[allow(clippy::derive_partial_eq_without_eq)]
#[derive(Clone, PartialEq, ::prost::Message)]
pub struct Ack {
Expand Down Expand Up @@ -521,7 +522,7 @@ pub mod cache_server {
#[async_trait]
pub trait Cache: Send + Sync + 'static {
/// Server streaming response type for the StreamValues method.
type StreamValuesStream: futures_core::Stream<
type StreamValuesStream: tonic::codegen::tokio_stream::Stream<
Item = std::result::Result<super::Ack, tonic::Status>,
>
+ Send
Expand Down Expand Up @@ -634,7 +635,7 @@ pub mod cache_server {
) -> Self::Future {
let inner = Arc::clone(&self.0);
let fut = async move {
(*inner).stream_values(request).await
<T as Cache>::stream_values(&inner, request).await
};
Box::pin(fut)
}
Expand Down
1 change: 1 addition & 0 deletions src/rpc/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
#[allow(clippy::mixed_attributes_style)]
pub mod cache;

0 comments on commit 4b7e9db

Please sign in to comment.