Skip to content

Commit

Permalink
fix(paxos): re-commit logs even when there are no checkpoints (#1639)
Browse files Browse the repository at this point in the history
Technically the old code was correct too because `accepted_logs` would
always have something, but this is a bit clearer I think.
  • Loading branch information
shadaj authored Jan 10, 2025
1 parent b968f5b commit 0bc253f
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 9 deletions.
5 changes: 3 additions & 2 deletions hydro_test/src/cluster/paxos.rs
Original file line number Diff line number Diff line change
Expand Up @@ -485,8 +485,9 @@ fn recommit_after_leader_election<'a, P: PaxosPayload>(
) {
let p_p1b_max_checkpoint = accepted_logs
.clone()
.map(q!(|(checkpoint, _log)| checkpoint))
.max();
.filter_map(q!(|(checkpoint, _log)| checkpoint))
.max()
.into_singleton();
let p_p1b_highest_entries_and_count = accepted_logs
.map(q!(|(_checkpoint, log)| log))
.flatten_unordered() // Convert HashMap log back to stream
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -788,15 +788,30 @@ expression: built.ir()
},
),
Tee {
inner: <tee 27>: Reduce {
f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < core :: option :: Option < usize > , core :: option :: Option < usize > , () > ({ use hydro_lang :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
input: Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (checkpoint , _log) | checkpoint }),
input: Tee {
inner: <tee 20>,
inner: <tee 27>: Chain(
Map {
f: stageleft :: runtime_support :: fn1_type_hint :: < usize , core :: option :: Option < usize > > ({ use hydro_lang :: __staged :: optional :: * ; | v | Some (v) }),
input: Reduce {
f: stageleft :: runtime_support :: fn2_borrow_mut_type_hint :: < usize , usize , () > ({ use hydro_lang :: __staged :: stream :: * ; | curr , new | { if new > * curr { * curr = new ; } } }),
input: FilterMap {
f: stageleft :: runtime_support :: fn1_type_hint :: < (core :: option :: Option < usize > , std :: collections :: hash_map :: HashMap < usize , hydro_test :: cluster :: paxos :: LogValue < hydro_test :: cluster :: paxos_kv :: KvPayload < u32 , (hydro_lang :: location :: cluster :: cluster_id :: ClusterId < hydro_test :: cluster :: paxos_bench :: Client > , u32) > > >) , core :: option :: Option < usize > > ({ use crate :: __staged :: cluster :: paxos :: * ; | (checkpoint , _log) | checkpoint }),
input: Tee {
inner: <tee 20>,
},
},
},
},
},
Persist(
Source {
source: Iter(
[:: std :: option :: Option :: None],
),
location_kind: Cluster(
0,
),
},
),
),
},
),
},
Expand Down

0 comments on commit 0bc253f

Please sign in to comment.