Skip to content

Commit

Permalink
feat: bitbox propagates errors during begin_sync
Browse files Browse the repository at this point in the history
  • Loading branch information
gabriele-0201 committed Feb 3, 2025
1 parent 6a72c78 commit 75c8ce1
Showing 1 changed file with 36 additions and 5 deletions.
41 changes: 36 additions & 5 deletions nomt/src/bitbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -276,17 +276,24 @@ pub struct SyncController {
wal_result_tx: Option<Sender<std::io::Result<()>>>,
/// The channel to receive the result of the WAL writeout.
wal_result_rx: Receiver<std::io::Result<()>>,
/// The channel to send the result of the begin_sync task. Option is to allow `take`.
begin_sync_result_tx: Option<Sender<std::io::Result<()>>>,
/// The channel to receive the result of the the begin_sync task.
begin_sync_result_rx: Receiver<std::io::Result<()>>,
/// The pages along with their page numbers to write out to the HT file.
ht_to_write: Arc<Mutex<Option<Vec<(u64, Arc<FatPage>)>>>>,
}

impl SyncController {
fn new(db: DB) -> Self {
let (wal_result_tx, wal_result_rx) = crossbeam_channel::bounded(1);
let (begin_sync_result_tx, begin_sync_result_rx) = crossbeam_channel::bounded(1);
Self {
db,
wal_result_tx: Some(wal_result_tx),
wal_result_rx,
begin_sync_result_tx: Some(begin_sync_result_tx),
begin_sync_result_rx,
ht_to_write: Arc::new(Mutex::new(None)),
}
}
Expand All @@ -306,7 +313,8 @@ impl SyncController {
let wal_blob_builder = self.db.shared.wal_blob_builder.clone();
// UNWRAP: safe because begin_sync is called only once.
let wal_result_tx = self.wal_result_tx.take().unwrap();
self.db.shared.sync_tp.execute(move || {

let begin_sync_task = move || {
let mut wal_blob_builder = wal_blob_builder.lock();
let (ht_pages, cache_updates) =
bitbox.prepare_sync(sync_seqn, &page_pool, updated_pages, &mut *wal_blob_builder);
Expand All @@ -320,6 +328,20 @@ impl SyncController {
// evict and drop old pages outside of the critical path.
page_cache.batch_update(cache_updates);
page_cache.evict();
};

// UNWRAP: safe because begin_sync is called only once.
let begin_sync_result_tx = self.begin_sync_result_tx.take().unwrap();
self.db.shared.sync_tp.execute(move || {
let output_or_panic =
std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| begin_sync_task()))
.map_err(|e| {
std::io::Error::new(
std::io::ErrorKind::Other,
anyhow::anyhow!("panic in bitbox begin sync: {:?}", e),
)
});
let _ = begin_sync_result_tx.send(output_or_panic);
});
}

Expand All @@ -338,10 +360,19 @@ impl SyncController {
///
/// Must be invoked by the sync thread. Blocking.
pub fn wait_pre_meta(&self) -> std::io::Result<()> {
match self.wal_result_rx.recv() {
Ok(wal_result) => wal_result,
Err(_) => panic!("unexpected hungup"),
}
self.begin_sync_result_rx.recv().map_err(|recv_err| {
std::io::Error::new(
std::io::ErrorKind::Other,
anyhow::anyhow!("unexpected hungup of bitbox begin sync: {:?}", recv_err),
)
})??;
self.wal_result_rx.recv().map_err(|recv_err| {
std::io::Error::new(
std::io::ErrorKind::Other,
anyhow::anyhow!("unexpected hungup of wal writeout: {:?}", recv_err),
)
})??;
Ok(())
}

/// Write out the HT pages and truncate the WAL file.
Expand Down

0 comments on commit 75c8ce1

Please sign in to comment.