diff --git a/nomt/src/bitbox/mod.rs b/nomt/src/bitbox/mod.rs index 016ba3e3..72ea5c87 100644 --- a/nomt/src/bitbox/mod.rs +++ b/nomt/src/bitbox/mod.rs @@ -276,6 +276,10 @@ pub struct SyncController { wal_result_tx: Option>>, /// The channel to receive the result of the WAL writeout. wal_result_rx: Receiver>, + /// The channel to send the result of the begin_sync task. Option is to allow `take`. + begin_sync_result_tx: Option>>, + /// The channel to receive the result of the begin_sync task. + begin_sync_result_rx: Receiver>, /// The pages along with their page numbers to write out to the HT file. ht_to_write: Arc)>>>>, } @@ -283,10 +287,13 @@ pub struct SyncController { impl SyncController { fn new(db: DB) -> Self { let (wal_result_tx, wal_result_rx) = crossbeam_channel::bounded(1); + let (begin_sync_result_tx, begin_sync_result_rx) = crossbeam_channel::bounded(1); Self { db, wal_result_tx: Some(wal_result_tx), wal_result_rx, + begin_sync_result_tx: Some(begin_sync_result_tx), + begin_sync_result_rx, ht_to_write: Arc::new(Mutex::new(None)), } } @@ -306,7 +313,8 @@ impl SyncController { let wal_blob_builder = self.db.shared.wal_blob_builder.clone(); // UNWRAP: safe because begin_sync is called only once. let wal_result_tx = self.wal_result_tx.take().unwrap(); - self.db.shared.sync_tp.execute(move || { + + let begin_sync_task = move || { let mut wal_blob_builder = wal_blob_builder.lock(); let (ht_pages, cache_updates) = bitbox.prepare_sync(sync_seqn, &page_pool, updated_pages, &mut *wal_blob_builder); @@ -320,6 +328,20 @@ impl SyncController { // evict and drop old pages outside of the critical path. page_cache.batch_update(cache_updates); page_cache.evict(); + }; + + // UNWRAP: safe because begin_sync is called only once. 
+ let begin_sync_result_tx = self.begin_sync_result_tx.take().unwrap(); + self.db.shared.sync_tp.execute(move || { + let output_or_panic = + std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| begin_sync_task())) + .map_err(|e| { + std::io::Error::new( + std::io::ErrorKind::Other, + anyhow::anyhow!("panic in bitbox begin sync: {:?}", e), + ) + }); + let _ = begin_sync_result_tx.send(output_or_panic); }); } @@ -338,10 +360,19 @@ impl SyncController { /// /// Must be invoked by the sync thread. Blocking. pub fn wait_pre_meta(&self) -> std::io::Result<()> { - match self.wal_result_rx.recv() { - Ok(wal_result) => wal_result, - Err(_) => panic!("unexpected hungup"), - } + self.begin_sync_result_rx.recv().map_err(|recv_err| { + std::io::Error::new( + std::io::ErrorKind::Other, + anyhow::anyhow!("unexpected hungup of bitbox begin sync: {:?}", recv_err), + ) + })??; + self.wal_result_rx.recv().map_err(|recv_err| { + std::io::Error::new( + std::io::ErrorKind::Other, + anyhow::anyhow!("unexpected hungup of wal writeout: {:?}", recv_err), + ) + })??; + Ok(()) } /// Write out the HT pages and truncate the WAL file.