From 725326dbfdd0392d9e23e95643a05da46f8daef5 Mon Sep 17 00:00:00 2001 From: "wangyi.ywq" Date: Fri, 1 Jul 2022 01:18:48 +0800 Subject: [PATCH] WIP --- db/db_filesnapshot.cc | 182 ++++++++++++------------ db/db_impl.h | 6 +- include/rocksdb/db.h | 14 +- utilities/checkpoint/checkpoint_impl.cc | 38 ++--- 4 files changed, 121 insertions(+), 119 deletions(-) diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index a57f34d732..772db5a5d1 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -81,97 +81,97 @@ int DBImpl::IsFileDeletionsEnabled() const { return !disable_delete_obsolete_files_; } -Status DBImpl::UndoFakeFlush() { - mutex_.Lock(); - autovector cfds; - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } - cfd->Ref(); - cfds.push_back(cfd); - } - mutex_.Unlock(); - Status status; - for (auto cfd : cfds) { - auto iter = version_edits_.find(cfd->GetID()); - if (iter == version_edits_.end()) continue; - VersionEdit* edit = &iter->second; - VersionEdit edit_del; - for (auto f : edit->GetNewFiles()) { - edit_del.DeleteFile(0, f.second.fd.GetNumber()); - } - edit_del.set_check_point(true); - mutex_.Lock(); - status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), - &edit_del, &mutex_); - mutex_.Unlock(); - if (!status.ok()) { - break; - } - } - - mutex_.Lock(); - for (auto cfd : cfds) { - cfd->Unref(); - } - mutex_.Unlock(); - - return status; -} - -Status DBImpl::FakeFlush(std::vector& ret) { - Status status; - mutex_.Lock(); - autovector cfds; - for (auto cfd : *versions_->GetColumnFamilySet()) { - if (cfd->IsDropped()) { - continue; - } - cfd->Ref(); - cfds.push_back(cfd); - } - version_edits_.clear(); - - for (auto cfd : cfds) { - VersionEdit edit; - edit.SetColumnFamily(cfd->GetID()); - version_edits_.insert({cfd->GetID(), edit}); - } - for (auto cfd : cfds) { - auto iter = version_edits_.find(cfd->GetID()); - int job_id = next_job_id_.fetch_add(1); - VersionEdit* edit = &iter->second; - status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit); - edit->set_check_point(true); - if (status.ok()) { - - status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), - edit, &mutex_); - if (!status.ok()) { - break; - } - } - } - for (auto cfd : cfds) { - cfd->Unref(); - } - mutex_.Unlock(); - TEST_SYNC_POINT("DBImpl::GetLiveFiles:1"); - TEST_SYNC_POINT("DBImpl::GetLiveFiles:2"); - - if (status.ok()) { - for (auto iter : version_edits_) { - VersionEdit* edit = &iter.second; - int cf_id = iter.first; - for (auto f : edit->GetNewFiles()) { - ret.push_back(MakeTableFileName("", - f.second.fd.GetNumber())); - } - } - } - return status; -} +//Status DBImpl::UndoFakeFlush() { +// mutex_.Lock(); +// autovector cfds; +// for (auto cfd : *versions_->GetColumnFamilySet()) { +// if (cfd->IsDropped()) { +// continue; +// } +// cfd->Ref(); +// cfds.push_back(cfd); +// } +// mutex_.Unlock(); +// Status status; +// for (auto cfd : cfds) { +// auto iter = version_edits_.find(cfd->GetID()); +// if (iter == version_edits_.end()) continue; +// VersionEdit* edit = &iter->second; +// VersionEdit edit_del; +// for (auto f : edit->GetNewFiles()) { +// edit_del.DeleteFile(0, f.second.fd.GetNumber()); +// } +// edit_del.set_check_point(true); +// mutex_.Lock(); +// status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), +// &edit_del, &mutex_); +// mutex_.Unlock(); +// if (!status.ok()) { +// break; +// } +// } +// +// mutex_.Lock(); +// for (auto cfd : cfds) { +// cfd->Unref(); +// } +// mutex_.Unlock(); +// +// return status; +//} +// +//Status DBImpl::FakeFlush(std::vector& ret) { +// Status status; +// mutex_.Lock(); +// autovector cfds; +// for (auto cfd : *versions_->GetColumnFamilySet()) { +// if (cfd->IsDropped()) { +// continue; +// } +// cfd->Ref(); +// cfds.push_back(cfd); +// } +// version_edits_.clear(); +// +// for (auto cfd : cfds) { +// VersionEdit edit; +// edit.SetColumnFamily(cfd->GetID()); +// version_edits_.insert({cfd->GetID(), edit}); +// } +// for (auto cfd : cfds) { +// auto iter = version_edits_.find(cfd->GetID()); +// int job_id = next_job_id_.fetch_add(1); +// VersionEdit* edit = &iter->second; +// status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit); +// edit->set_check_point(true); +// if (status.ok()) { +// +// status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(), +// edit, &mutex_); +// if (!status.ok()) { +// break; +// } +// } +// } +// for (auto cfd : cfds) { +// cfd->Unref(); +// } +// mutex_.Unlock(); +// TEST_SYNC_POINT("DBImpl::GetLiveFiles:1"); +// TEST_SYNC_POINT("DBImpl::GetLiveFiles:2"); +// +// if (status.ok()) { +// for (auto iter : version_edits_) { +// VersionEdit* edit = &iter.second; +// int cf_id = iter.first; +// for (auto f : edit->GetNewFiles()) { +// ret.push_back(MakeTableFileName("", +// f.second.fd.GetNumber())); +// } +// } +// } +// return status; +//} Status DBImpl::GetLiveFiles(std::vector& ret, uint64_t* manifest_file_size, bool flush_memtable) { *manifest_file_size = 0; diff --git a/db/db_impl.h b/db/db_impl.h index 8791b033bd..603383bb2b 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -277,9 +277,9 @@ class DBImpl : public DB { virtual Status EnableFileDeletions(bool force) override; virtual int IsFileDeletionsEnabled() const; - virtual Status FakeFlush(std::vector&) override; - - virtual Status UndoFakeFlush() override; +// virtual Status FakeFlush(std::vector&) override; +// +// virtual Status UndoFakeFlush() override; // All the returned filenames start with "/" virtual Status GetLiveFiles(std::vector&, diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h index 6d261c4dda..d38c853b63 100644 --- a/include/rocksdb/db.h +++ b/include/rocksdb/db.h @@ -1043,13 +1043,13 @@ class DB { virtual Status EnableFileDeletions(bool force = true) = 0; - virtual Status FakeFlush(std::vector&){ - return Status::NotSupported("FakeFlush not implemented"); - } - - virtual Status UndoFakeFlush(){ - return Status::NotSupported("UndoFakeFlush not implemented"); - } +// virtual Status FakeFlush(std::vector&){ +// return Status::NotSupported("FakeFlush not implemented"); +// } +// +// virtual Status UndoFakeFlush(){ +// return Status::NotSupported("UndoFakeFlush not implemented"); +// } // GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc index bf65668eb6..2fdced3f49 100644 --- a/utilities/checkpoint/checkpoint_impl.cc +++ b/utilities/checkpoint/checkpoint_impl.cc @@ -205,18 +205,20 @@ Status CheckpointImpl::CreateCustomCheckpoint( } // this will return live_files prefixed with "/" - if (db_options.check_point_fake_flush) { - s = db_->FakeFlush(fake_flush_files); - if (s.ok()) { - s = db_->GetLiveFiles(live_files, &manifest_file_size, false); - } - if (s.ok()) { - live_files.insert(live_files.end(), fake_flush_files.begin(), - fake_flush_files.end()); - } - } else { - s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable); - } +// if (db_options.check_point_fake_flush) { +// s = db_->FakeFlush(fake_flush_files); +// if (s.ok()) { +// s = db_->GetLiveFiles(live_files, &manifest_file_size, false); +// } +// if (s.ok()) { +// live_files.insert(live_files.end(), fake_flush_files.begin(), +// fake_flush_files.end()); +// } +// } else { +// s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable); +// } + + s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable); if (s.ok() && db_options.allow_2pc) { // If 2PC is enabled, we need to get minimum log number after the flush. @@ -250,7 +252,8 @@ Status CheckpointImpl::CreateCustomCheckpoint( db_->FlushWAL(false /* sync */); } // if we have more than one column family, we need to also get WAL files - if (s.ok() && !db_options.check_point_fake_flush ) { +// if (s.ok() && !db_options.check_point_fake_flush ) { + if (s.ok()) { s = db_->GetSortedWalFiles(live_wal_files); } if (!s.ok()) { @@ -335,11 +338,10 @@ Status CheckpointImpl::CreateCustomCheckpoint( } } } - - if(db_options.check_point_fake_flush){ - // Write Manifest - db_->UndoFakeFlush(); - } +// if(db_options.check_point_fake_flush){ +// // Write Manifest +// db_->UndoFakeFlush(); +// } return s; }