Skip to content

Commit

Permalink
1. catch the WriteLevel0TableForRecovery
Browse files Browse the repository at this point in the history
2. Ref the MemTable

3. WaitUnitFlushWouldNotStallWrites

4. Wait FlushPending finished
  • Loading branch information
yapple committed Jul 11, 2022
1 parent 0a0708c commit 5957e3b
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 12 deletions.
5 changes: 5 additions & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ Status BuildTable(
std::vector<TableProperties>* table_properties_vec, int level,
double compaction_load, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) {

if(reason == TableFileCreationReason::kRecovery){
TEST_SYNC_POINT("DBImpl::FakeFlush:2");
}

assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
Expand Down
58 changes: 50 additions & 8 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -117,31 +117,64 @@ Status DBImpl::FakeFlush(std::vector<std::string>& ret) {
cfd->Ref();
cfds.push_back(cfd);
}
mutex_.Unlock();
version_edits.clear();

auto need_force_flush = [](ColumnFamilyData* cfd) {
return cfd->ioptions()->atomic_flush_group == nullptr &&
cfd->imm()->HasFlushRequested();
};
for (auto cfd : cfds) {
VersionEdit edit;
edit.SetColumnFamily(cfd->GetID());
version_edits.insert({cfd->GetID(), edit});
bool flush_needed = true;
status = WaitUntilFlushWouldNotStallWrites(cfd, &flush_needed);
if(!status.ok()){
break;
}
while(cfd->imm()->IsFlushPending() || need_force_flush(cfd)){
bg_cv_.TimedWait(1000);
}
}
mutex_.Lock();
for (auto cfd : cfds) {
auto iter = version_edits.find(cfd->GetID());
int job_id = next_job_id_.fetch_add(1);
VersionEdit* edit = &iter->second;
autovector<MemTable*> mems;
cfd->imm()->PickMemtablesToFlush(nullptr,&mems);
if (!mems.empty()) {
cfd->imm()->RollbackMemtableFlush(mems, 0, status);
for (int i = 0; status.ok() && i < mems.size(); i++) {
auto& m = mems[i];
m->Ref();
try {
status = WriteLevel0TableForRecovery(job_id, cfd, m, edit);
} catch (std::exception e) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
" memory usage:%" PRIu64 "",
cfd->GetName().c_str(), m->ApproximateMemoryUsage());
status = Status::Corruption(
"WriteLevel0TableForRecovery immutmemtable Corruption");
}
m->Unref();
}
if (status.ok()) {
for (auto& m : mems) {
auto m = cfd->mem();
m->Ref();
try {
TEST_SYNC_POINT("DBImpl::FakeFlush:1");
status = WriteLevel0TableForRecovery(job_id, cfd, m, edit);
if (!status.ok()) break;
} catch (std::exception e) {
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery]"
" memory usage:%" PRIu64 "",
cfd->GetName().c_str(), m->ApproximateMemoryUsage());
status = Status::Corruption(
"WriteLevel0TableForRecovery memtable Corruption");
}
m->Unref();
}
if(status.ok()){
status = WriteLevel0TableForRecovery(job_id, cfd, cfd->mem(), edit);
}
// TODO wangyi
edit->set_check_point(true);
if (status.ok()) {

Expand All @@ -151,6 +184,15 @@ Status DBImpl::FakeFlush(std::vector<std::string>& ret) {
break;
}
}

if (!mems.empty()) {
cfd->imm()->RollbackMemtableFlush(mems, 0, status);
ROCKS_LOG_DEBUG(immutable_db_options_.info_log,
"[%s] [WriteLevel0TableForRecovery] immut table size:%d",
cfd->GetName().c_str(), mems.size());
// we should apply the immut table to Manifest
}

}
for (auto cfd : cfds) {
cfd->Unref();
Expand Down
55 changes: 51 additions & 4 deletions utilities/checkpoint/checkpoint_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,53 @@ class CheckpointTest : public testing::Test {
return result;
}
};
TEST_F(CheckpointTest, FakeFlush) {
TERARKDB_NAMESPACE::SyncPoint::GetInstance()->LoadDependency(
{{"DBImpl::FakeFlush:1", "CheckpointTest::Flush1"},
{"CheckpointTest::Flush2","DBImpl::FakeFlush:2" }});
TERARKDB_NAMESPACE::SyncPoint::GetInstance()->EnableProcessing();
Options options;
DB* snapshotDB;
ReadOptions roptions;
std::string result;
Checkpoint* checkpoint;

options = CurrentOptions();
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
// Create a database
Status s;
options.create_if_missing = true;
options.write_buffer_size = 256 << 20;
ASSERT_OK(DB::Open(options, dbname_, &db_));
std::string key = std::string("foo");

for(int i = 0;i < 256;i++){
std::string k = key;
k.push_back('a' + i);
ASSERT_OK(Put(key, "v1"));
}
ASSERT_OK(Checkpoint::Create(db_, &checkpoint));
TERARKDB_NAMESPACE::port::Thread t1([&]() {
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
std::cout << "snapshot finished" << std::endl;
});
TERARKDB_NAMESPACE::port::Thread t2([&]() {
TEST_SYNC_POINT("CheckpointTest::Flush1");
std::cout << "flush started" << std::endl;
ASSERT_OK(Flush());
std::cout << "flush finished" << std::endl;
TEST_SYNC_POINT("CheckpointTest::Flush2");
});
t1.join();
t2.join();
delete db_;
db_ = nullptr;
ASSERT_OK(DestroyDB(dbname_, options));
delete checkpoint;
}

TEST_F(CheckpointTest, GetSnapshotLinkAndFlush) {
for (uint64_t log_size_for_flush : {1000000}) {
Options options;
Expand Down Expand Up @@ -320,10 +367,10 @@ TEST_F(CheckpointTest, RepeatWriteSnapShot){
db_->Put(wo,k,value);
}
ASSERT_OK(checkpoint->CreateCheckpoint(snapshot_name_));
options.create_if_missing = false;
ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB));
delete snapshotDB;
snapshotDB = nullptr;
// options.create_if_missing = false;
// ASSERT_OK(DB::Open(options, snapshot_name_, &snapshotDB));
// delete snapshotDB;
// snapshotDB = nullptr;
ASSERT_OK(DestroyDB(snapshot_name_, options));
}

Expand Down

0 comments on commit 5957e3b

Please sign in to comment.