diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc
index cda47f65a2..d5998332a1 100644
--- a/db/db_filesnapshot.cc
+++ b/db/db_filesnapshot.cc
@@ -81,6 +81,121 @@ int DBImpl::IsFileDeletionsEnabled() const {
   return !disable_delete_obsolete_files_;
 }
 
+// Logs, for every live column family that has an entry in version_edits_, a
+// checkpoint-flagged edit that records DeleteFile(0, <number>) for each file
+// listed in that entry's GetNewFiles().
+// Returns the first non-OK status from LogAndApply(), otherwise OK.
+Status DBImpl::UndoFakeFlush() {
+  mutex_.Lock();
+  autovector cfds;
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    if (cfd->IsDropped()) {
+      continue;
+    }
+    // Referenced under the mutex here; the matching Unref() is at the end of
+    // this function.
+    cfd->Ref();
+    cfds.push_back(cfd);
+  }
+  mutex_.Unlock();
+
+  Status status;
+  for (auto cfd : cfds) {
+    auto iter = version_edits_.find(cfd->GetID());
+    if (iter == version_edits_.end()) {
+      continue;
+    }
+    VersionEdit edit_del;
+    // Tag the edit with its column family, as FakeFlush() does for the edit
+    // being undone.
+    edit_del.SetColumnFamily(cfd->GetID());
+    for (const auto& f : iter->second.GetNewFiles()) {
+      edit_del.DeleteFile(0, f.second.fd.GetNumber());
+    }
+    edit_del.set_check_point(true);
+    mutex_.Lock();
+    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
+                                    &edit_del, &mutex_);
+    mutex_.Unlock();
+    if (!status.ok()) {
+      break;
+    }
+  }
+
+  mutex_.Lock();
+  for (auto cfd : cfds) {
+    cfd->Unref();
+  }
+  mutex_.Unlock();
+
+  return status;
+}
+
+// For every live column family, passes cfd->mem() to
+// WriteLevel0TableForRecovery() and logs the resulting edit flagged with
+// set_check_point(true). The edits are kept in version_edits_ for
+// UndoFakeFlush(). On success, appends MakeTableFileName("", <number>) for
+// each file in those edits to `ret`.
+// Returns the first non-OK status encountered, otherwise OK.
+Status DBImpl::FakeFlush(std::vector& ret) {
+  Status status;
+  mutex_.Lock();
+  autovector cfds;
+  for (auto cfd : *versions_->GetColumnFamilySet()) {
+    if (cfd->IsDropped()) {
+      continue;
+    }
+    cfd->Ref();
+    cfds.push_back(cfd);
+  }
+  mutex_.Unlock();
+
+  // Discard edits left over from a previous call before recording new ones.
+  version_edits_.clear();
+  for (auto cfd : cfds) {
+    VersionEdit edit;
+    edit.SetColumnFamily(cfd->GetID());
+    version_edits_.insert({cfd->GetID(), edit});
+  }
+
+  for (auto cfd : cfds) {
+    auto iter = version_edits_.find(cfd->GetID());
+    int job_id = next_job_id_.fetch_add(1);
+    VersionEdit* edit = &iter->second;
+    // NOTE(review): mutex_ is not held at this point; confirm that
+    // WriteLevel0TableForRecovery() does not require the caller to hold it.
+    status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
+    edit->set_check_point(true);
+    if (!status.ok()) {
+      // Stop at the first failure; carrying on would let a later column
+      // family overwrite `status` and hide the error.
+      break;
+    }
+    mutex_.Lock();
+    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
+                                    edit, &mutex_);
+    mutex_.Unlock();
+    if (!status.ok()) {
+      break;
+    }
+  }
+
+  mutex_.Lock();
+  for (auto cfd : cfds) {
+    cfd->Unref();
+  }
+  mutex_.Unlock();
+
+  if (status.ok()) {
+    for (auto& entry : version_edits_) {
+      for (const auto& f : entry.second.GetNewFiles()) {
+        ret.push_back(MakeTableFileName("", f.second.fd.GetNumber()));
+      }
+    }
+  }
+  return status;
+}
+
 Status DBImpl::GetLiveFiles(std::vector& ret,
                             uint64_t* manifest_file_size, bool flush_memtable) {
   *manifest_file_size = 0;
diff --git a/db/db_impl.h b/db/db_impl.h
index 66c51a6c3f..8791b033bd 100644
--- a/db/db_impl.h
+++ b/db/db_impl.h
@@ -276,6 +276,15 @@ class DBImpl : public DB {
   virtual Status DisableFileDeletions() override;
   virtual Status EnableFileDeletions(bool force) override;
   virtual int IsFileDeletionsEnabled() const;
+
+  // Logs checkpoint-flagged edits for newly written table files and, on
+  // success, appends the names of those files to the vector.
+  virtual Status FakeFlush(std::vector&) override;
+
+  // Logs checkpoint-flagged edits that delete the files recorded by the most
+  // recent FakeFlush().
+  virtual Status UndoFakeFlush() override;
+
   // All the returned filenames start with "/"
   virtual Status GetLiveFiles(std::vector&,
                               uint64_t* manifest_file_size,
@@ -1827,6 +1836,10 @@ class DBImpl : public DB {
 
   ErrorHandler error_handler_;
 
+  // Edits recorded by the most recent FakeFlush(), keyed by column family ID
+  // and read back by UndoFakeFlush().
+  std::unordered_map version_edits_;
+
   // Conditional variable to coordinate installation of atomic flush results.
   // With atomic flush, each bg thread installs the result of flushing multiple
   // column families, and different threads can flush different column
diff --git a/db/version_builder.cc b/db/version_builder.cc
index 3e260ab95b..f47d277602 100644
--- a/db/version_builder.cc
+++ b/db/version_builder.cc
@@ -877,7 +877,14 @@ bool VersionBuilder::CheckConsistencyForNumLevels() {
   return rep_->CheckConsistencyForNumLevels();
 }
 
-void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
+void VersionBuilder::Apply(VersionEdit* edit) {
+  // Edits flagged via set_check_point(true) are skipped here so that they do
+  // not change the state this builder is accumulating.
+  if (edit->check_point()) {
+    return;
+  }
+  rep_->Apply(edit);
+}
 
 void VersionBuilder::SaveTo(VersionStorageInfo* vstorage,
                             double maintainer_job_ratio) {
diff --git a/db/version_edit.h b/db/version_edit.h
index 8e1a41a44d..f47c995d5b 100644
--- a/db/version_edit.h
+++ b/db/version_edit.h
@@ -411,6 +411,11 @@ class VersionEdit {
   std::string DebugString(bool hex_key = false) const;
   std::string DebugJSON(int edit_num, bool hex_key = false) const;
 
+  // Checkpoint flag: VersionBuilder::Apply() ignores edits for which
+  // check_point() returns true.
+  bool check_point() const { return for_checkpoint_; }
+  void set_check_point(bool b) { for_checkpoint_ = b; }
+
  private:
   friend class VersionSet;
   friend class Version;
@@ -450,6 +455,9 @@ class VersionEdit {
   bool is_column_family_add_;
   std::string column_family_name_;
 
+  // Set through set_check_point(). Default-initialised so that check_point()
+  // never reads an indeterminate value on an edit that was never flagged.
+  bool for_checkpoint_ = false;
   bool is_open_db_;
   bool is_in_atomic_group_;
   uint32_t remaining_entries_;
diff --git a/include/rocksdb/db.h b/include/rocksdb/db.h
index c6272e2d09..6d261c4dda 100644
--- a/include/rocksdb/db.h
+++ b/include/rocksdb/db.h
@@ -1042,6 +1042,19 @@ class DB {
   // threads call EnableFileDeletions()
   virtual Status EnableFileDeletions(bool force = true) = 0;
 
+  // Called by CheckpointImpl when DBOptions::check_point_fake_flush is set.
+  // On success DBImpl appends the names of the table files it wrote to the
+  // vector. The base-class implementation returns Status::NotSupported.
+  virtual Status FakeFlush(std::vector&) {
+    return Status::NotSupported("FakeFlush not implemented");
+  }
+
+  // Counterpart of FakeFlush(), called by CheckpointImpl before it returns.
+  // The base-class implementation returns Status::NotSupported.
+  virtual Status UndoFakeFlush() {
+    return Status::NotSupported("UndoFakeFlush not implemented");
+  }
+
   // GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup
 
   // Retrieve the list of all files in the database. The files are
diff --git a/include/rocksdb/options.h b/include/rocksdb/options.h
index 110f5ec52a..07ab6d0e74 100644
--- a/include/rocksdb/options.h
+++ b/include/rocksdb/options.h
@@ -993,6 +993,12 @@ struct DBOptions {
   // transaction is encountered in the WAL
   bool allow_2pc = false;
 
+  // If true, CheckpointImpl::CreateCustomCheckpoint() calls DB::FakeFlush()
+  // followed by DB::GetLiveFiles() with flush_memtable == false, and calls
+  // DB::UndoFakeFlush() before returning.
+  // Default: true
+  bool check_point_fake_flush = true;
+
   // A global cache for table-level rows.
   // Default: nullptr (disabled)
   // Not supported in ROCKSDB_LITE mode!
diff --git a/utilities/checkpoint/checkpoint_impl.cc b/utilities/checkpoint/checkpoint_impl.cc
index 5f7981622c..a097a0606d 100644
--- a/utilities/checkpoint/checkpoint_impl.cc
+++ b/utilities/checkpoint/checkpoint_impl.cc
@@ -204,7 +204,18 @@ Status CheckpointImpl::CreateCustomCheckpoint(
   }
 
   // this will return live_files prefixed with "/"
-  s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
+  if (db_options.check_point_fake_flush) {
+    s = db_->FakeFlush(live_files);
+    if (s.ok()) {
+      // Only continue when FakeFlush() succeeded, so that its error is not
+      // overwritten by the status of GetLiveFiles().
+      // NOTE(review): confirm GetLiveFiles() appends to live_files rather than
+      // resetting it; otherwise the names added by FakeFlush() are lost.
+      s = db_->GetLiveFiles(live_files, &manifest_file_size, false);
+    }
+  } else {
+    s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
+  }
 
   if (s.ok() && db_options.allow_2pc) {
     // If 2PC is enabled, we need to get minimum log number after the flush.
@@ -324,6 +335,15 @@ Status CheckpointImpl::CreateCustomCheckpoint(
     }
   }
 
+  if (db_options.check_point_fake_flush) {
+    // Log the edits that undo FakeFlush(). Its status is surfaced unless an
+    // earlier error is already being returned.
+    Status undo_status = db_->UndoFakeFlush();
+    if (s.ok()) {
+      s = undo_status;
+    }
+  }
+
   return s;
 }
 