Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
yapple committed Jun 30, 2022
1 parent 5245313 commit 725326d
Show file tree
Hide file tree
Showing 4 changed files with 121 additions and 119 deletions.
182 changes: 91 additions & 91 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,97 +81,97 @@ int DBImpl::IsFileDeletionsEnabled() const {
return !disable_delete_obsolete_files_;
}

Status DBImpl::UndoFakeFlush() {
mutex_.Lock();
autovector<ColumnFamilyData*> cfds;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
cfds.push_back(cfd);
}
mutex_.Unlock();
Status status;
for (auto cfd : cfds) {
auto iter = version_edits_.find(cfd->GetID());
if (iter == version_edits_.end()) continue;
VersionEdit* edit = &iter->second;
VersionEdit edit_del;
for (auto f : edit->GetNewFiles()) {
edit_del.DeleteFile(0, f.second.fd.GetNumber());
}
edit_del.set_check_point(true);
mutex_.Lock();
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit_del, &mutex_);
mutex_.Unlock();
if (!status.ok()) {
break;
}
}

mutex_.Lock();
for (auto cfd : cfds) {
cfd->Unref();
}
mutex_.Unlock();

return status;
}

Status DBImpl::FakeFlush(std::vector<std::string>& ret) {
Status status;
mutex_.Lock();
autovector<ColumnFamilyData*> cfds;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
}
cfd->Ref();
cfds.push_back(cfd);
}
version_edits_.clear();

for (auto cfd : cfds) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
version_edits_.insert({cfd->GetID(), edit});
}
for (auto cfd : cfds) {
auto iter = version_edits_.find(cfd->GetID());
int job_id = next_job_id_.fetch_add(1);
VersionEdit* edit = &iter->second;
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
edit->set_check_point(true);
if (status.ok()) {

status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
edit, &mutex_);
if (!status.ok()) {
break;
}
}
}
for (auto cfd : cfds) {
cfd->Unref();
}
mutex_.Unlock();
TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");

if (status.ok()) {
for (auto iter : version_edits_) {
VersionEdit* edit = &iter.second;
int cf_id = iter.first;
for (auto f : edit->GetNewFiles()) {
ret.push_back(MakeTableFileName("",
f.second.fd.GetNumber()));
}
}
}
return status;
}
//Status DBImpl::UndoFakeFlush() {
// mutex_.Lock();
// autovector<ColumnFamilyData*> cfds;
// for (auto cfd : *versions_->GetColumnFamilySet()) {
// if (cfd->IsDropped()) {
// continue;
// }
// cfd->Ref();
// cfds.push_back(cfd);
// }
// mutex_.Unlock();
// Status status;
// for (auto cfd : cfds) {
// auto iter = version_edits_.find(cfd->GetID());
// if (iter == version_edits_.end()) continue;
// VersionEdit* edit = &iter->second;
// VersionEdit edit_del;
// for (auto f : edit->GetNewFiles()) {
// edit_del.DeleteFile(0, f.second.fd.GetNumber());
// }
// edit_del.set_check_point(true);
// mutex_.Lock();
// status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
// &edit_del, &mutex_);
// mutex_.Unlock();
// if (!status.ok()) {
// break;
// }
// }
//
// mutex_.Lock();
// for (auto cfd : cfds) {
// cfd->Unref();
// }
// mutex_.Unlock();
//
// return status;
//}
//
//Status DBImpl::FakeFlush(std::vector<std::string>& ret) {
// Status status;
// mutex_.Lock();
// autovector<ColumnFamilyData*> cfds;
// for (auto cfd : *versions_->GetColumnFamilySet()) {
// if (cfd->IsDropped()) {
// continue;
// }
// cfd->Ref();
// cfds.push_back(cfd);
// }
// version_edits_.clear();
//
// for (auto cfd : cfds) {
// VersionEdit edit;
// edit.SetColumnFamily(cfd->GetID());
// version_edits_.insert({cfd->GetID(), edit});
// }
// for (auto cfd : cfds) {
// auto iter = version_edits_.find(cfd->GetID());
// int job_id = next_job_id_.fetch_add(1);
// VersionEdit* edit = &iter->second;
// status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
// edit->set_check_point(true);
// if (status.ok()) {
//
// status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
// edit, &mutex_);
// if (!status.ok()) {
// break;
// }
// }
// }
// for (auto cfd : cfds) {
// cfd->Unref();
// }
// mutex_.Unlock();
// TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
// TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");
//
// if (status.ok()) {
// for (auto iter : version_edits_) {
// VersionEdit* edit = &iter.second;
// int cf_id = iter.first;
// for (auto f : edit->GetNewFiles()) {
// ret.push_back(MakeTableFileName("",
// f.second.fd.GetNumber()));
// }
// }
// }
// return status;
//}
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size, bool flush_memtable) {
*manifest_file_size = 0;
Expand Down
6 changes: 3 additions & 3 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,9 +277,9 @@ class DBImpl : public DB {
virtual Status EnableFileDeletions(bool force) override;
virtual int IsFileDeletionsEnabled() const;

virtual Status FakeFlush(std::vector<std::string>&) override;

virtual Status UndoFakeFlush() override;
// virtual Status FakeFlush(std::vector<std::string>&) override;
//
// virtual Status UndoFakeFlush() override;

// All the returned filenames start with "/"
virtual Status GetLiveFiles(std::vector<std::string>&,
Expand Down
14 changes: 7 additions & 7 deletions include/rocksdb/db.h
Original file line number Diff line number Diff line change
Expand Up @@ -1043,13 +1043,13 @@ class DB {
virtual Status EnableFileDeletions(bool force = true) = 0;


virtual Status FakeFlush(std::vector<std::string>&){
return Status::NotSupported("FakeFlush not implemented");
}

virtual Status UndoFakeFlush(){
return Status::NotSupported("UndoFakeFlush not implemented");
}
// virtual Status FakeFlush(std::vector<std::string>&){
// return Status::NotSupported("FakeFlush not implemented");
// }
//
// virtual Status UndoFakeFlush(){
// return Status::NotSupported("UndoFakeFlush not implemented");
// }

// GetLiveFiles followed by GetSortedWalFiles can generate a lossless backup

Expand Down
38 changes: 20 additions & 18 deletions utilities/checkpoint/checkpoint_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -205,18 +205,20 @@ Status CheckpointImpl::CreateCustomCheckpoint(
}

// this will return live_files prefixed with "/"
if (db_options.check_point_fake_flush) {
s = db_->FakeFlush(fake_flush_files);
if (s.ok()) {
s = db_->GetLiveFiles(live_files, &manifest_file_size, false);
}
if (s.ok()) {
live_files.insert(live_files.end(), fake_flush_files.begin(),
fake_flush_files.end());
}
} else {
s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
}
// if (db_options.check_point_fake_flush) {
// s = db_->FakeFlush(fake_flush_files);
// if (s.ok()) {
// s = db_->GetLiveFiles(live_files, &manifest_file_size, false);
// }
// if (s.ok()) {
// live_files.insert(live_files.end(), fake_flush_files.begin(),
// fake_flush_files.end());
// }
// } else {
// s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);
// }

s = db_->GetLiveFiles(live_files, &manifest_file_size, flush_memtable);

if (s.ok() && db_options.allow_2pc) {
// If 2PC is enabled, we need to get minimum log number after the flush.
Expand Down Expand Up @@ -250,7 +252,8 @@ Status CheckpointImpl::CreateCustomCheckpoint(
db_->FlushWAL(false /* sync */);
}
// if we have more than one column family, we need to also get WAL files
if (s.ok() && !db_options.check_point_fake_flush ) {
// if (s.ok() && !db_options.check_point_fake_flush ) {
if (s.ok()) {
s = db_->GetSortedWalFiles(live_wal_files);
}
if (!s.ok()) {
Expand Down Expand Up @@ -335,11 +338,10 @@ Status CheckpointImpl::CreateCustomCheckpoint(
}
}
}

if(db_options.check_point_fake_flush){
// Write Manifest
db_->UndoFakeFlush();
}
// if(db_options.check_point_fake_flush){
// // Write Manifest
// db_->UndoFakeFlush();
// }

return s;
}
Expand Down

0 comments on commit 725326d

Please sign in to comment.