Skip to content

Commit

Permalink
feat:propagates err in beatree offloaded sync work
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriele-0201 committed Feb 3, 2025
1 parent 26559cc commit 9e64145
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 12 deletions.
13 changes: 7 additions & 6 deletions nomt/src/beatree/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@ impl Tree {
sync: &Sync,
shared: &Arc<RwLock<Shared>>,
read_transaction_counter: &ReadTransactionCounter,
) -> (SyncData, Index, Receiver<()>) {
) -> (SyncData, Index, Receiver<std::io::Result<()>>) {
// Take the shared lock. Briefly.
let staged_changeset;
let bbn_index;
Expand Down Expand Up @@ -426,7 +426,7 @@ struct SharedSyncController {
read_transaction_counter: ReadTransactionCounter,
sync_data: Mutex<Option<SyncData>>,
bbn_index: Mutex<Option<Index>>,
pre_swap_rx: Mutex<Option<Receiver<()>>>,
pre_swap_rx: Mutex<Option<Receiver<std::io::Result<()>>>>,
}

impl SyncController {
Expand Down Expand Up @@ -496,15 +496,16 @@ impl SyncController {
///
/// Has to be called after the manifest is updated. Must be invoked by the sync
/// thread. Blocking.
pub fn post_meta(&mut self) {
pub fn post_meta(&mut self) -> std::io::Result<()> {
let pre_swap_rx = self.inner.pre_swap_rx.lock().take().unwrap();

// UNWRAP: the offloaded non-critical sync work is infallible and may fail only if it
// panics.
let () = pre_swap_rx.recv().unwrap();
// UNWRAP: the offloaded non-critical sync work is infallible
// and may fail only if it panics.
pre_swap_rx.recv().unwrap()?;

let bbn_index = self.inner.bbn_index.lock().take().unwrap();
Tree::finish_sync(&self.inner.shared, bbn_index);
Ok(())
}
}

Expand Down
18 changes: 13 additions & 5 deletions nomt/src/beatree/ops/update/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn update(
io_handle: IoHandle,
thread_pool: ThreadPool,
workers: usize,
) -> Result<(SyncData, Index, Receiver<()>)> {
) -> Result<(SyncData, Index, Receiver<std::io::Result<()>>)> {
let leaf_reader = StoreReader::new(leaf_store.clone(), page_pool.clone());
let (leaf_writer, leaf_finisher) = leaf_store.start_sync();
let (bbn_writer, bbn_finisher) = bbn_store.start_sync();
Expand Down Expand Up @@ -99,10 +99,18 @@ pub fn update(

let (tx, rx) = crossbeam_channel::bounded(1);
thread_pool.execute(move || {
leaf_stage_outputs.post_io_work.run(&leaf_cache);
drop(branch_stage_outputs.post_io_drop);
leaf_cache.evict();
let _ = tx.send(());
let output_or_panic = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
leaf_stage_outputs.post_io_work.run(&leaf_cache);
drop(branch_stage_outputs.post_io_drop);
leaf_cache.evict();
}))
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
anyhow::anyhow!("panic in beatree offloaded non-critical sync work: {:?}", e),
)
});
let _ = tx.send(output_or_panic);
});

Ok((
Expand Down
2 changes: 1 addition & 1 deletion nomt/src/store/sync.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ impl Sync {
}

bitbox_sync.post_meta(shared.io_pool.make_handle())?;
beatree_sync.post_meta();
beatree_sync.post_meta()?;

if let Some(ref rollback) = rollback_sync {
rollback.wait_post_meta()?;
Expand Down

0 comments on commit 9e64145

Please sign in to comment.