Skip to content

Commit

Permalink
fix flush stall
Browse files Browse the repository at this point in the history
  • Loading branch information
yapple committed Jul 5, 2022
1 parent 19ad1b8 commit 72f512f
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 38 deletions.
29 changes: 7 additions & 22 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -91,37 +91,22 @@ Status DBImpl::UndoFakeFlush() {
cfd->Ref();
cfds.push_back(cfd);
}
mutex_.Unlock();
Status status;
for (auto cfd : cfds) {
auto iter = version_edits_.find(cfd->GetID());
if (iter == version_edits_.end()) continue;
VersionEdit* edit = &iter->second;
if (cfds.size() > 0) {
auto& cfd = cfds[0];
VersionEdit edit_del;
// for (auto f : edit->GetNewFiles()) {
// edit_del.DeleteFile(0, f.second.fd.GetNumber());
// }
// edit_del.set_check_point(true);
mutex_.Lock();
status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
&edit_del, &mutex_, nullptr, true);
mutex_.Unlock();
if (!status.ok()) {
break;
}
}

mutex_.Lock();
for (auto cfd : cfds) {
cfd->Unref();
}
mutex_.Unlock();
ReleaseFileNumberFromPendingOutputs(pending_output_elem_);
return status;
}

Status DBImpl::FakeFlush(std::vector<std::string>& ret) {
pending_output_elem_ = CaptureCurrentFileNumberInPendingOutputs();
std::unordered_map<int, VersionEdit> version_edits;
Status status;
mutex_.Lock();
autovector<ColumnFamilyData*> cfds;
Expand All @@ -132,15 +117,15 @@ Status DBImpl::FakeFlush(std::vector<std::string>& ret) {
cfd->Ref();
cfds.push_back(cfd);
}
version_edits_.clear();
version_edits.clear();

for (auto cfd : cfds) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
version_edits_.insert({cfd->GetID(), edit});
version_edits.insert({cfd->GetID(), edit});
}
for (auto cfd : cfds) {
auto iter = version_edits_.find(cfd->GetID());
auto iter = version_edits.find(cfd->GetID());
int job_id = next_job_id_.fetch_add(1);
VersionEdit* edit = &iter->second;
autovector<MemTable*> mems;
Expand Down Expand Up @@ -175,7 +160,7 @@ Status DBImpl::FakeFlush(std::vector<std::string>& ret) {
TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");

if (status.ok()) {
for (auto iter : version_edits_) {
for (auto iter : version_edits) {
VersionEdit* edit = &iter.second;
int cf_id = iter.first;
for (auto f : edit->GetNewFiles()) {
Expand Down
3 changes: 0 additions & 3 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1832,9 +1832,6 @@ class DBImpl : public DB {

ErrorHandler error_handler_;

std::unordered_map<int, VersionEdit> version_edits_;
std::list<uint64_t>::iterator pending_output_elem_;

// Conditional variable to coordinate installation of atomic flush results.
// With atomic flush, each bg thread installs the result of flushing multiple
// column families, and different threads can flush different column
Expand Down
5 changes: 5 additions & 0 deletions utilities/checkpoint/checkpoint_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,11 @@ Status CheckpointImpl::CreateCustomCheckpoint(
db_->UndoFakeFlush();
}

if (!s.ok()) {
ROCKS_LOG_INFO(db_options.info_log, "CheckPoint Failed %",
s.ToString().c_str());
}

return s;
}

Expand Down
27 changes: 14 additions & 13 deletions utilities/checkpoint/checkpoint_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ class CheckpointTest : public testing::Test {
};

TEST_F(CheckpointTest, RepeatGetSnapshotLink) {
for (uint64_t log_size_for_flush : {0, 1000000}) {
for (uint64_t log_size_for_flush : {1000000}) {
Options options;
DB* snapshotDB;
ReadOptions roptions;
Expand All @@ -252,39 +252,40 @@ TEST_F(CheckpointTest, RepeatGetSnapshotLink) {
std::string key = std::string("foo");
ASSERT_OK(Put(key, "v1"));
// Take a snapshot
for (int i = 0; i < 10; i++) {
for (int i = 0; i < 100; i++) {
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
std::string snap_shot = snapshot_name_;
snap_shot.push_back(('a' + i));
for(int k = 0;k<i;k++){
snap_shot.push_back('a');
}
std::cout << snap_shot << " " << i << std::endl;
ASSERT_OK(checkpoint->CreateCheckpoint(snap_shot, log_size_for_flush));
ASSERT_OK(Put(key, "v1"));
ASSERT_EQ("v1", Get(key));
ASSERT_OK(Put(key, "v2"));
ASSERT_EQ("v2", Get(key));
ASSERT_OK(Flush());
ASSERT_EQ("v1", Get(key));
ASSERT_EQ("v2", Get(key));
ASSERT_OK(Put(key, "v1"));
// Open snapshot and verify contents while DB is running
options.create_if_missing = false;
ASSERT_OK(DB::Open(options, snap_shot, &snapshotDB));
ASSERT_OK(snapshotDB->Get(roptions, key, &result));
ASSERT_EQ("v1", result);
delete snapshotDB;
snapshotDB = nullptr;
// ASSERT_OK(DestroyDB(snap_shot, options));
}

delete db_;
db_ = nullptr;

// Destroy original DB
ASSERT_OK(DestroyDB(dbname_, options));

// Open snapshot and verify contents
options.create_if_missing = false;
dbname_ = snapshot_name_;
ASSERT_OK(DB::Open(options, dbname_, &db_));
ASSERT_EQ("v1", Get(key));
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
delete checkpoint;
// ASSERT_OK(DestroyDB(dbname_, options));

delete checkpoint;
// Restore DB name
dbname_ = test::PerThreadDBPath(env_, "db_test");
}
Expand Down

0 comments on commit 72f512f

Please sign in to comment.