From 945f9ab11f0e3b7394da639ef6501f14935d289f Mon Sep 17 00:00:00 2001 From: gabriele-0201 Date: Mon, 3 Feb 2025 10:32:13 +0100 Subject: [PATCH] feat:propagates err in beatree offloaded sync work --- nomt/src/beatree/mod.rs | 13 +++++++------ nomt/src/beatree/ops/update/mod.rs | 18 +++++++++++++----- nomt/src/store/sync.rs | 2 +- 3 files changed, 21 insertions(+), 12 deletions(-) diff --git a/nomt/src/beatree/mod.rs b/nomt/src/beatree/mod.rs index 4ef8f175..ede607e1 100644 --- a/nomt/src/beatree/mod.rs +++ b/nomt/src/beatree/mod.rs @@ -230,7 +230,7 @@ impl Tree { sync: &Sync, shared: &Arc>, read_transaction_counter: &ReadTransactionCounter, - ) -> (SyncData, Index, Receiver<()>) { + ) -> (SyncData, Index, Receiver>) { // Take the shared lock. Briefly. let staged_changeset; let bbn_index; @@ -426,7 +426,7 @@ struct SharedSyncController { read_transaction_counter: ReadTransactionCounter, sync_data: Mutex>, bbn_index: Mutex>, - pre_swap_rx: Mutex>>, + pre_swap_rx: Mutex>>>, } impl SyncController { @@ -496,15 +496,16 @@ impl SyncController { /// /// Has to be called after the manifest is updated. Must be invoked by the sync /// thread. Blocking. - pub fn post_meta(&mut self) { + pub fn post_meta(&mut self) -> std::io::Result<()> { let pre_swap_rx = self.inner.pre_swap_rx.lock().take().unwrap(); - // UNWRAP: the offloaded non-critical sync work is infallible and may fail only if it - // panics. - let () = pre_swap_rx.recv().unwrap(); + // UNWRAP: the offloaded non-critical sync work is infallible + // and may fail only if it panics. + pre_swap_rx.recv().unwrap()?; let bbn_index = self.inner.bbn_index.lock().take().unwrap(); Tree::finish_sync(&self.inner.shared, bbn_index); + Ok(()) } } diff --git a/nomt/src/beatree/ops/update/mod.rs b/nomt/src/beatree/ops/update/mod.rs index 58393006..26e3a6fc 100644 --- a/nomt/src/beatree/ops/update/mod.rs +++ b/nomt/src/beatree/ops/update/mod.rs @@ -54,7 +54,7 @@ pub fn update( io_handle: IoHandle, thread_pool: ThreadPool, workers: usize, -) -> Result<(SyncData, Index, Receiver<()>)> { +) -> Result<(SyncData, Index, Receiver>)> { let leaf_reader = StoreReader::new(leaf_store.clone(), page_pool.clone()); let (leaf_writer, leaf_finisher) = leaf_store.start_sync(); let (bbn_writer, bbn_finisher) = bbn_store.start_sync(); @@ -99,10 +99,18 @@ pub fn update( let (tx, rx) = crossbeam_channel::bounded(1); thread_pool.execute(move || { - leaf_stage_outputs.post_io_work.run(&leaf_cache); - drop(branch_stage_outputs.post_io_drop); - leaf_cache.evict(); - let _ = tx.send(()); + let output_or_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| { + leaf_stage_outputs.post_io_work.run(&leaf_cache); + drop(branch_stage_outputs.post_io_drop); + leaf_cache.evict(); + })) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + anyhow::anyhow!("panic in beatree offloaded non-critical sync work: {:?}", e), + ) + }); + let _ = tx.send(output_or_panic); }); Ok(( diff --git a/nomt/src/store/sync.rs b/nomt/src/store/sync.rs index 471abbb0..00dcfbc8 100644 --- a/nomt/src/store/sync.rs +++ b/nomt/src/store/sync.rs @@ -86,7 +86,7 @@ impl Sync { } bitbox_sync.post_meta(shared.io_pool.make_handle())?; - beatree_sync.post_meta(); + beatree_sync.post_meta()?; if let Some(ref rollback) = rollback_sync { rollback.wait_post_meta()?;