Skip to content

Commit

Permalink
support fake flush for check_point
Browse files Browse the repository at this point in the history
  • Loading branch information
yapple committed Jun 27, 2022
1 parent 9121d1d commit 8bb1226
Show file tree
Hide file tree
Showing 8 changed files with 130 additions and 2 deletions.
91 changes: 91 additions & 0 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,97 @@ int DBImpl::IsFileDeletionsEnabled() const {
return !disable_delete_obsolete_files_;
}

Status DBImpl::UndoFakeFlush() {
mutex_.Lock();
autovector<ColumnFamilyData*> cfds;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
cfds.push_back(cfd);
}
mutex_.Unlock();
Status status;
for (auto cfd : cfds) {
auto iter = version_edits_.find(cfd->GetID());
if (iter == version_edits_.end()) continue;
VersionEdit* edit = &iter->second;
VersionEdit edit_del;
for (auto f : edit->GetNewFiles()) {
edit_del.DeleteFile(0, f.second.fd.GetNumber());
}
edit_del.set_check_point(true);
mutex_.Lock();
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit_del, &mutex_);
mutex_.Unlock();
if (!status.ok()) {
break;
}
}

mutex_.Lock();
for (auto cfd : cfds) {
cfd->Unref();
}
mutex_.Unlock();

return status;
}

Status DBImpl::FakeFlush(std::vector<std::string>& ret) {
Status status;
mutex_.Lock();
autovector<ColumnFamilyData*> cfds;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
cfds.push_back(cfd);
}
mutex_.Unlock();
version_edits_.clear();

for (auto cfd : cfds) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
version_edits_.insert({cfd->GetID(), edit});
}
for (auto cfd : cfds) {
auto iter = version_edits_.find(cfd->GetID());
int job_id = next_job_id_.fetch_add(1);
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
edit->set_check_point(true);
if (status.ok()) {
mutex_.Lock();
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
edit, &mutex_);
mutex_.Unlock();
if (!status.ok()) {
break;
}
}
}
mutex_.Lock();
for (auto cfd : cfds) {
cfd->Unref();
}
mutex_.Unlock();

if (status.ok()) {
for (auto iter : version_edits_) {
VersionEdit* edit = &iter.second;
int cf_id = iter.first;
for (auto f : edit->GetNewFiles()) {
ret.push_back(MakeTableFileName("", f.second.fd.GetNumber()));
}
}
}
return status;
}
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size, bool flush_memtable) {
*manifest_file_size = 0;
Expand Down
7 changes: 7 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ class DBImpl : public DB {
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual int IsFileDeletionsEnabled() const;

virtual Status FakeFlush(std::vector<std::string>&) override;

virtual Status UndoFakeFlush() override;

// All the returned filenames start with "/"
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
Expand Down Expand Up @@ -1827,6 +1832,8 @@ class DBImpl : public DB {

ErrorHandler error_handler_;

std::unordered_map<int, VersionEdit> version_edits_;

// Conditional variable to coordinate installation of atomic flush results.
// With atomic flush, each bg thread installs the result of flushing multiple
// column families, and different threads can flush different column
Expand Down
6 changes: 5 additions & 1 deletion db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,11 @@ bool VersionBuilder::CheckConsistencyForNumLevels() {
return rep_->CheckConsistencyForNumLevels();
}

void VersionBuilder::Apply(VersionEdit* edit) { rep_->Apply(edit); }
void VersionBuilder::Apply(VersionEdit* edit) {
// not change the current state
if(edit->check_point()) return;
rep_->Apply(edit);
}

void VersionBuilder::SaveTo(VersionStorageInfo* vstorage,
double maintainer_job_ratio) {
Expand Down
4 changes: 4 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ class VersionEdit {
std::string DebugString(bool hex_key = false) const;
std::string DebugJSON(int edit_num, bool hex_key = false) const;

bool check_point() { return for_checkpoint_; }
void set_check_point(bool b) { for_checkpoint_ = b; }

private:
friend class VersionSet;
friend class Version;
Expand Down Expand Up @@ -450,6 +453,7 @@ class VersionEdit {
bool is_column_family_add_;
std::string column_family_name_;

bool for_checkpoint_;
bool is_open_db_;
bool is_in_atomic_group_;
uint32_t remaining_entries_;
Expand Down
7 changes: 7 additions & 0 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1042,6 +1042,11 @@ class DB {
// threads call EnableFileDeletions()
virtual Status EnableFileDeletions(bool force = true) = 0;


virtual Status FakeFlush(std::vector<std::string>&) = 0;

virtual Status UndoFakeFlush() = 0;

// GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup

// Retrieve the list of all files in the database. The files are
Expand All @@ -1058,6 +1063,8 @@ class DB {
// you still need to call GetSortedWalFiles after GetLiveFiles to compensate
// for new data that arrived to already-flushed column families while other
// column families were flushing


virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
bool flush_memtable = true) = 0;
Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/options.h
Original file line number Diff line number Diff line change
Expand Up @@ -993,6 +993,8 @@ struct DBOptions {
// transaction is encountered in the WAL
bool allow_2pc = false;

bool check_point_fake_flush = true;

// A global cache for table-level rows.
// Default: nullptr (disabled)
// Not supported in ROCKSDB_LITE mode!
Expand Down
12 changes: 11 additions & 1 deletion utilities/checkpoint/checkpoint_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,12 @@ Status CheckpointImpl::CreateCustomCheckpoint(
}

// this will return live_files prefixed with "/"
s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
if(db_options.check_point_fake_flush){
s = db_->FakeFlush(live_files);
s = db_->GetLiveFiles(live_files, &manifest_file_size, false);
}else{
s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
}

if (s.ok() && db_options.allow_2pc) {
// If 2PC is enabled, we need to get minimum log number after the flush.
Expand Down Expand Up @@ -324,6 +329,11 @@ Status CheckpointImpl::CreateCustomCheckpoint(
}
}

if(db_options.check_point_fake_flush){
// Write Manifest
db_->UndoFakeFlush();
}

return s;
}

Expand Down
3 changes: 3 additions & 0 deletions utilities/document/document_db.cc
Original file line number Diff line number Diff line change
Expand Up @@ -732,6 +732,9 @@ class DocumentDBImpl : public DocumentDB {
}
delete primary_key_column_family_;
}
virtual Status FakeFlush(std::vector<std::string>&) override;

virtual Status UndoFakeFlush() override;

virtual Status CreateIndex(const WriteOptions& write_options,
const IndexDescriptor& index) override {
Expand Down

0 comments on commit 8bb1226

Please sign in to comment.