Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: bitbox propagates errors during begin_sync #775

Merged
merged 1 commit into from
Feb 7, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 45 additions & 46 deletions nomt/src/bitbox/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::{
io::{self, page_pool::FatPage, IoCommand, IoHandle, IoKind, PagePool, PAGE_SIZE},
page_cache::{Page, PageCache},
store::{BucketInfo, DirtyPage},
task::{join_task, spawn_task, TaskResult},
};

use self::{ht_file::HTOffsets, meta_map::MetaMap};
Expand All @@ -29,26 +30,18 @@ mod meta_map;
mod wal;
pub(crate) mod writeout;

/// An error that can happen during bitbox's sync.
#[derive(Debug)]
pub enum SyncError {
/// During assigning a bucket to a page, the allocator gave up, meaning that the occupancy rate
/// is too high.
BucketExhaustion,
/// An error occurred while writing to the WAL file.
WalWrite(std::io::Error),
}
/// During assigning a bucket to a page, the allocator gave up, meaning that the occupancy rate
/// is too high.
#[derive(fmt::Debug)]
pub struct BucketExhaustion;

impl fmt::Display for SyncError {
impl fmt::Display for BucketExhaustion {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
match self {
SyncError::BucketExhaustion => write!(f, "bucket exhaustion"),
SyncError::WalWrite(e) => write!(f, "wal write error: {}", e),
}
write!(f, "bucket exhaustion")
}
}

impl std::error::Error for SyncError {}
impl std::error::Error for BucketExhaustion {}

/// The index of a bucket within the map.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
Expand Down Expand Up @@ -189,7 +182,7 @@ impl DB {
Vec<(u64, Arc<FatPage>)>,
Vec<(PageId, Option<(Page, BucketIndex)>)>,
),
SyncError,
BucketExhaustion,
> {
wal_blob_builder.reset(sync_seqn);

Expand Down Expand Up @@ -224,15 +217,15 @@ impl DB {
BucketInfo::Known(bucket) => (false, bucket),
BucketInfo::FreshWithNoDependents => {
let bucket = allocate_bucket(&page_id, &mut meta_map, &self.shared.seed)
.ok_or(SyncError::BucketExhaustion)?;
.ok_or(BucketExhaustion)?;
(true, bucket)
}
BucketInfo::FreshOrDependent(maybe_bucket) => match maybe_bucket.get() {
Some(bucket) => (false, bucket),
None => {
let bucket =
allocate_bucket(&page_id, &mut meta_map, &self.shared.seed)
.ok_or(SyncError::BucketExhaustion)?;
.ok_or(BucketExhaustion)?;
// Propagate changes to dependents.
maybe_bucket.set(bucket);
(true, bucket)
Expand Down Expand Up @@ -302,20 +295,27 @@ impl DB {
pub struct SyncController {
db: DB,
/// The channel to send the result of the pre-meta sync errors. Option is to allow `take`.
pre_meta_result_tx: Option<Sender<Result<(), SyncError>>>,
pre_meta_result_tx: Option<Sender<TaskResult<std::io::Result<()>>>>,
/// he channel to receive the result of the pre-meta sync errors.
pre_meta_result_rx: Receiver<Result<(), SyncError>>,
pre_meta_result_rx: Receiver<TaskResult<std::io::Result<()>>>,
/// The channel to send the result of the begin_sync task. Option is to allow `take`.
begin_sync_result_tx: Option<Sender<TaskResult<Result<(), BucketExhaustion>>>>,
/// The channel to receive the result of the the begin_sync task.
begin_sync_result_rx: Receiver<TaskResult<Result<(), BucketExhaustion>>>,
/// The pages along with their page numbers to write out to the HT file.
ht_to_write: Arc<Mutex<Option<Vec<(u64, Arc<FatPage>)>>>>,
}

impl SyncController {
fn new(db: DB) -> Self {
let (pre_meta_result_tx, pre_meta_result_rx) = crossbeam_channel::bounded(1);
let (begin_sync_result_tx, begin_sync_result_rx) = crossbeam_channel::bounded(1);
Self {
db,
pre_meta_result_tx: Some(pre_meta_result_tx),
pre_meta_result_rx,
begin_sync_result_tx: Some(begin_sync_result_tx),
begin_sync_result_rx,
ht_to_write: Arc::new(Mutex::new(None)),
}
}
Expand All @@ -335,25 +335,17 @@ impl SyncController {
let wal_blob_builder = self.db.shared.wal_blob_builder.clone();
// UNWRAP: safe because begin_sync is called only once.
let pre_meta_result_tx = self.pre_meta_result_tx.take().unwrap();
self.db.shared.sync_tp.execute(move || {
let begin_sync_task = move || {
let mut wal_blob_builder = wal_blob_builder.lock();
let (ht_pages, cache_updates) = match bitbox.prepare_sync(

// if fails The sync coordinator will poison the database and all further commits will
// be rejected. Therefore, there is no need to perform cleanup.
let (ht_pages, cache_updates) = bitbox.prepare_sync(
sync_seqn,
&page_pool,
updated_pages,
&mut *wal_blob_builder,
) {
Ok(v) => v,
Err(SyncError::BucketExhaustion) => {
// Bail the commit.
//
// The sync coordinator will poison the database and all further commits will
// be rejected. Therefore, there is no need to perform cleanup.
let _ = pre_meta_result_tx.send(Err(SyncError::BucketExhaustion));
return;
}
Err(SyncError::WalWrite(_)) => unreachable!(),
};
)?;
drop(wal_blob_builder);

// Set the hash-table pages before spawning WAL writeout so they don't race with it.
Expand All @@ -364,31 +356,38 @@ impl SyncController {
// evict and drop old pages outside of the critical path.
page_cache.batch_update(cache_updates);
page_cache.evict();
});
Ok(())
};
// UNWRAP: safe because begin_sync is called only once.
let begin_sync_result_tx = self.begin_sync_result_tx.take().unwrap();
spawn_task(
&self.db.shared.sync_tp,
begin_sync_task,
begin_sync_result_tx,
);
}

fn spawn_wal_writeout(pre_meta_result_tx: Sender<Result<(), SyncError>>, bitbox: DB) {
fn spawn_wal_writeout(pre_meta_result_tx: Sender<TaskResult<std::io::Result<()>>>, bitbox: DB) {
let bitbox = bitbox.clone();
let tp = bitbox.shared.sync_tp.clone();
tp.execute(move || {
let wal_writeout_task = move || {
let wal_blob_builder = bitbox.shared.wal_blob_builder.lock();
let wal_slice = wal_blob_builder.as_slice();
let wal_result =
writeout::write_wal(&bitbox.shared.wal_fd, wal_slice).map_err(SyncError::WalWrite);
let _ = pre_meta_result_tx.send(wal_result);
});
writeout::write_wal(&bitbox.shared.wal_fd, wal_slice)
};

spawn_task(&tp, wal_writeout_task, pre_meta_result_tx);
}

/// Wait for the pre-meta operations to complete.
///
/// This includes WAL file to be written out.
///
/// Must be invoked by the sync thread. Blocking.
pub fn wait_pre_meta(&self) -> Result<(), SyncError> {
match self.pre_meta_result_rx.recv() {
Ok(wal_result) => wal_result,
Err(_) => panic!("unexpected hungup"),
}
pub fn wait_pre_meta(&self) -> anyhow::Result<()> {
join_task(&self.begin_sync_result_rx)?;
join_task(&self.pre_meta_result_rx)?;
Ok(())
}

/// Write out the HT pages and truncate the WAL file.
Expand Down