Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Flink fake flush #7

Open
wants to merge 26 commits into
base: flink-terark-1.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ trace_analyzer_test
java/out
java/target
java/*.log
java/include/org_rocksdb_*.h
java/include/org_terarkdb_*.h

.idea/
*.iml
Expand Down
15 changes: 8 additions & 7 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ endif
endif

ifeq (,$(shell $(CXX) -fsyntax-only -faligned-new -xc++ /dev/null 2>&1))
CXXFLAGS += -ljemalloc -faligned-new -DHAVE_ALIGNED_NEW
CXXFLAGS += -faligned-new -DHAVE_ALIGNED_NEW
endif

ifeq (,$(shell $(CXX) -fsyntax-only -maltivec -xc /dev/null 2>&1))
Expand Down Expand Up @@ -1809,7 +1809,7 @@ rocksdbjavastatic: $(java_static_all_libobjects)
cd java;jar -cf target/$(ROCKSDB_JAR) HISTORY*.md
jar -uf java/target/$(ROCKSDB_JAR) HISTORY*.md
cd java/target;jar -uf $(ROCKSDB_JAR) $(ROCKSDBJNILIB)
cd java/target/classes;jar -uf ../$(ROCKSDB_JAR) org/rocksdb/*.class org/rocksdb/util/*.class
cd java/target/classes;jar -uf ../$(ROCKSDB_JAR) org/terarkdb/*.class org/terarkdb/util/*.class
cd java/target/apidocs;jar -cf ../$(ROCKSDB_JAVADOCS_JAR) *
cd java/src/main/java;jar -cf ../../../target/$(ROCKSDB_SOURCES_JAR) org
openssl sha1 java/target/$(ROCKSDB_JAR) | sed 's/.*= \([0-9a-f]*\)/\1/' > java/target/$(ROCKSDB_JAR).sha1
Expand All @@ -1825,7 +1825,7 @@ rocksdbjavastaticrelease: rocksdbjavastatic
cd java;jar -cf target/$(ROCKSDB_JAR_ALL) HISTORY*.md
jar -uf java/target/$(ROCKSDB_JAR_ALL) HISTORY*.md
cd java/target;jar -uf $(ROCKSDB_JAR_ALL) libterarkdbjni-*.so libterarkdbjni-*.jnilib libterarkdbjni-win64.dll
cd java/target/classes;jar -uf ../$(ROCKSDB_JAR_ALL) org/rocksdb/*.class org/rocksdb/util/*.class
cd java/target/classes;jar -uf ../$(ROCKSDB_JAR_ALL) org/terarkdb/*.class org/terarkdb/util/*.class

frocksdbjavastaticrelease: rocksdbjavastaticrelease
# update license
Expand Down Expand Up @@ -1879,12 +1879,13 @@ frocksdbjavastaticrelease: rocksdbjavastaticrelease
rocksdbjavastaticreleasedocker: rocksdbjavastatic rocksdbjavastaticdockerx86_64
cd java;jar -cf target/$(ROCKSDB_JAR_ALL) HISTORY*.md
cd java/target;jar -uf $(ROCKSDB_JAR_ALL) libterarkdbjni-*.so libterarkdbjni-*.jnilib
cd java/target/classes;jar -uf ../$(ROCKSDB_JAR_ALL) org/rocksdb/*.class org/rocksdb/util/*.class
cd java/target/classes;jar -uf ../$(ROCKSDB_JAR_ALL) org/terarkdb/*.class org/terarkdb/util/*.class

fterark:rocksdbjavastatic
cd java;jar -cf target/$(ROCKSDB_JAR_ALL) HISTORY*.md
cd java/target;jar -uf $(ROCKSDB_JAR_ALL) libterarkdbjni-*.so libterarkdbjni-*
cd java/target/classes;jar -uf ../$(ROCKSDB_JAR_ALL) org/rocksdb/*.class org/rocksdb/util/*.class
cd java/target;jar -uf $(ROCKSDB_JAR_ALL) libterarkdbjni-*
#cd java/target;jar -uf $(ROCKSDB_JAR_ALL) libterarkdbjni-*.so libterarkdbjni-*
cd java/target/classes;jar -uf ../$(ROCKSDB_JAR_ALL) org/terarkdb/*.class org/terarkdb/util/*.class

# update apache license
mkdir -p java/target/META-INF
Expand Down Expand Up @@ -2013,7 +2014,7 @@ rocksdbjava: $(java_all_libobjects)
$(AM_V_at)$(CXX) $(CXXFLAGS) -I./java/. $(JAVA_INCLUDE) -shared -fPIC -o ./java/target/$(ROCKSDBJNILIB) $(JNI_NATIVE_SOURCES) $(java_all_libobjects) $(JAVA_LDFLAGS) $(COVERAGEFLAGS)
$(AM_V_at)cd java;jar -cf target/$(ROCKSDB_JAR) HISTORY*.md
$(AM_V_at)cd java/target;jar -uf $(ROCKSDB_JAR) $(ROCKSDBJNILIB)
$(AM_V_at)cd java/target/classes;jar -uf ../$(ROCKSDB_JAR) org/rocksdb/*.class org/rocksdb/util/*.class
$(AM_V_at)cd java/target/classes;jar -uf ../$(ROCKSDB_JAR) org/terarkdb/*.class org/terarkdb/util/*.class

frocksdbjava: rocksdbjava
# update license
Expand Down
5 changes: 5 additions & 0 deletions db/builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,11 @@ Status BuildTable(
std::vector<TableProperties>* table_properties_vec, int level,
double compaction_load, const uint64_t creation_time,
const uint64_t oldest_key_time, Env::WriteLifeTimeHint write_hint) {

if(reason == TableFileCreationReason::kRecovery){
TEST_SYNC_POINT("DBImpl::FakeFlush:2");
}

assert((column_family_id ==
TablePropertiesCollectorFactory::Context::kUnknownColumnFamily) ==
column_family_name.empty());
Expand Down
8 changes: 8 additions & 0 deletions db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -419,6 +419,14 @@ class ColumnFamilyData {

Directory* GetDataDir(size_t path_id) const;

// Force-disable automatic background compaction for this column family by
// flipping the mutable option in place.
// NOTE(review): this bypasses SetOptions() and takes no lock itself —
// presumably the caller holds the DB mutex (FakeFlush acquires it);
// confirm before calling from any other path.
void disableAutoCompaction() {
mutable_cf_options_.disable_auto_compactions = true;
}

// Re-enable automatic background compaction for this column family
// (counterpart to disableAutoCompaction(); called by FakeFlush on exit).
// NOTE(review): writes the mutable option directly without SetOptions()
// or an explicit lock — assumes the DB mutex is held; verify at call sites.
void enableAutoCompaction() {
mutable_cf_options_.disable_auto_compactions = false;
}

private:
friend class ColumnFamilySet;
ColumnFamilyData(uint32_t id, const std::string& name,
Expand Down
1 change: 1 addition & 0 deletions db/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,7 @@ struct CompactionJob::SubcompactionState {
output->blobs.reserve(blob_map.size());
for (auto& pair : blob_map) {
auto meta = pair.second.first;
if (meta->prop.num_entries == 0) continue;
uint64_t total_bytes = meta->fd.GetFileSize();
uint64_t ref_bytes =
total_bytes * pair.second.second / meta->prop.num_entries;
Expand Down
155 changes: 155 additions & 0 deletions db/db_filesnapshot.cc
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,161 @@ int DBImpl::IsFileDeletionsEnabled() const {
return !disable_delete_obsolete_files_;
}

// Discards the checkpoint ("fake flush") marker state by writing one empty
// VersionEdit through VersionSet::LogAndApply with new_descriptor_log ==
// true, forcing a fresh MANIFEST that supersedes the fake-flush edits.
// Only the first live column family is used to carry the empty edit.
//
// Returns the LogAndApply status, or OK when no live column family exists.
Status DBImpl::UndoFakeFlush() {
  mutex_.Lock();
  autovector<ColumnFamilyData*> cfds;
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    cfd->Ref();
    cfds.push_back(cfd);
  }
  Status status;
  if (!cfds.empty()) {
    ColumnFamilyData* cfd = cfds[0];
    VersionEdit edit_del;
    status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                    &edit_del, &mutex_, nullptr, true);
  }
  for (auto cfd : cfds) {
    // RocksDB contract: Unref() returns true when this was the last
    // reference (e.g. the family was dropped while we held it), in which
    // case the caller must delete the ColumnFamilyData to avoid a leak.
    if (cfd->Unref()) {
      delete cfd;
    }
  }
  mutex_.Unlock();
  return status;
}

// "Fake flush": persists the current mutable and immutable memtable
// contents as L0 SST files via WriteLevel0TableForRecovery WITHOUT
// installing them into the live version — each VersionEdit is tagged as a
// checkpoint edit, which VersionBuilder::Apply skips. The generated file
// names (relative, no dbname prefix) are appended to `ret` so a
// Flink-style checkpoint can copy them; UndoFakeFlush() later discards
// the marker edits.
//
// Auto compaction is disabled on every live column family for the
// duration of the operation and re-enabled before returning.
//
// Fixes over the previous revision: catch exceptions by const reference,
// correct printf format specifiers for size_t arguments, size_t loop
// index, no copy of the (int, VersionEdit) pairs in the result loop,
// removed unused locals, and delete column families whose last reference
// is dropped by Unref().
Status DBImpl::FakeFlush(std::vector<std::string>& ret) {
  std::unordered_map<int, VersionEdit> version_edits;
  Status status;
  mutex_.Lock();
  autovector<ColumnFamilyData*> cfds;
  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "[fake-flush] start compaction_queue size: %zu garbage_queue size: %zu",
      compaction_queue_.size(), garbage_collection_queue_.size());
  for (auto cfd : *versions_->GetColumnFamilySet()) {
    if (cfd->IsDropped()) {
      continue;
    }
    cfd->Ref();
    // Keep background compaction from racing with the checkpoint edits.
    cfd->disableAutoCompaction();
    cfds.push_back(cfd);
  }
  mutex_.Unlock();
  version_edits.clear();
  for (auto cfd : cfds) {
    VersionEdit edit;
    edit.SetColumnFamily(cfd->GetID());
    version_edits.insert({cfd->GetID(), edit});
    // We don't need WaitUntilFlushWouldNotStallWrites() because no real
    // flush is performed; just wait (bounded to ~10s) for pending real
    // flushes so PickMemtablesToFlush below sees a stable immutable list.
    int cnt = 0;
    while (cfd->imm()->IsFlushPending()) {
      ROCKS_LOG_INFO(
          immutable_db_options_.info_log,
          "[%s] isFlushPending, NumNotFlushed: %d, HasFlushRequested: %d",
          cfd->GetName().c_str(), cfd->imm()->NumNotFlushed(),
          cfd->imm()->HasFlushRequested());
      env_->SleepForMicroseconds(1000000);
      cnt++;
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[%s] CheckPoint Waiting flush pending cnt: %d",
                     cfd->GetName().c_str(), cnt);
      if (cnt > 10) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] CheckPoint break MaybeScheduleFlushOrCompaction",
                       cfd->GetName().c_str());
        break;
      }
    }  // while imm isFlushPending

    if (cfd->mem()->ApproximateMemoryUsage() >
        cfd->GetLatestCFOptions().write_buffer_size * 0.2) {
      // Best effort: convert a large mutable memtable to an immutable one
      // first. The result is deliberately ignored — failure only means the
      // data goes through WriteLevel0TableForRecovery below instead.
      FlushMemTable({cfd}, FlushOptions(), FlushReason::kFakeFlush);
    }
  }
  mutex_.Lock();
  for (auto cfd : cfds) {
    auto iter = version_edits.find(cfd->GetID());
    int job_id = next_job_id_.fetch_add(1);
    VersionEdit* edit = &iter->second;
    autovector<MemTable*> mems;
    cfd->imm()->PickMemtablesToFlush(nullptr, &mems);
    // Write every picked immutable memtable out as an L0 table.
    for (size_t i = 0; status.ok() && i < mems.size(); i++) {
      auto& m = mems[i];
      m->Ref();
      try {
        status = WriteLevel0TableForRecovery(job_id, cfd, m, edit);
      } catch (const std::exception&) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] [WriteLevel0TableForRecovery]"
                       " memory usage:%" PRIu64 "",
                       cfd->GetName().c_str(),
                       static_cast<uint64_t>(m->ApproximateMemoryUsage()));
        status = Status::Corruption(
            "WriteLevel0TableForRecovery immutmemtable Corruption");
      }
      m->Unref();
    }
    // Also write the current mutable memtable if it holds any entries.
    if (status.ok() && cfd->mem()->num_entries() > 0) {
      auto m = cfd->mem();
      m->Ref();
      try {
        TEST_SYNC_POINT("DBImpl::FakeFlush:1");
        status = WriteLevel0TableForRecovery(job_id, cfd, m, edit);
      } catch (const std::exception&) {
        ROCKS_LOG_WARN(immutable_db_options_.info_log,
                       "[%s] [WriteLevel0TableForRecovery]"
                       " memory usage:%" PRIu64 "",
                       cfd->GetName().c_str(),
                       static_cast<uint64_t>(m->ApproximateMemoryUsage()));
        status = Status::Corruption(
            "WriteLevel0TableForRecovery memtable Corruption");
      }
      m->Unref();
    }
    // Tag the edit as a checkpoint edit: it is logged to the MANIFEST but
    // VersionBuilder::Apply skips it, so the live LSM state is unchanged.
    edit->set_check_point(true);
    if (status.ok()) {
      status = versions_->LogAndApply(cfd, *cfd->GetLatestMutableCFOptions(),
                                      edit, &mutex_);
    }

    if (!mems.empty()) {
      // Put the picked memtables back; nothing was really flushed.
      cfd->imm()->RollbackMemtableFlush(mems, 0, status);
      ROCKS_LOG_INFO(immutable_db_options_.info_log,
                     "[%s] [WriteLevel0TableForRecovery] immut table size:%zu",
                     cfd->GetName().c_str(), mems.size());
    }
    if (!status.ok()) {
      break;
    }
  }
  ROCKS_LOG_INFO(
      immutable_db_options_.info_log,
      "[fake-flush] end compaction_queue size: %zu garbage_queue size: %zu",
      compaction_queue_.size(), garbage_collection_queue_.size());
  for (auto cfd : cfds) {
    cfd->enableAutoCompaction();
    // RocksDB contract: when Unref() returns true the caller must delete.
    if (cfd->Unref()) {
      delete cfd;
    }
  }
  mutex_.Unlock();
  TEST_SYNC_POINT("DBImpl::GetLiveFiles:1");
  TEST_SYNC_POINT("DBImpl::GetLiveFiles:2");

  if (status.ok()) {
    // Report every newly written SST file name to the caller.
    for (auto& iter : version_edits) {
      VersionEdit* edit = &iter.second;
      for (auto& f : edit->GetNewFiles()) {
        ret.push_back(MakeTableFileName("", f.second.fd.GetNumber()));
      }
    }
  }
  return status;
}
Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
uint64_t* manifest_file_size, bool flush_memtable) {
*manifest_file_size = 0;
Expand Down
29 changes: 17 additions & 12 deletions db/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1057,24 +1057,29 @@ void DBImpl::ScheduleTtlGC() {
return v == port::kMaxUint64 ? 0 : v;
};
ROCKS_LOG_BUFFER(
&log_buffer_info,
&log_buffer_debug,
"SST #%" PRIu64 " ttl schedule debug info @L%d , property: (%" PRIu64
" , %" PRIu64 ") now: %" PRIu64 " deleted key: %" PRIu64,
meta->fd.GetNumber(), l, max_to_zero(meta->prop.earliest_time_begin_compact),
max_to_zero(meta->prop.latest_time_end_compact), now, meta->prop.num_deletions);
if( meta->prop.num_deletions > meta->prop.num_entries * 0.2 ){
meta->marked_for_compaction = FileMetaData::kMarkedFromRangeDeletion;
ROCKS_LOG_BUFFER(
&log_buffer_info,
"SST #%" PRIu64 " ttl schedule debug info @L%d , property: (%" PRIu64
" , %" PRIu64 ") now: %" PRIu64 " deleted key: %" PRIu64 " marked as range_deletion",
meta->fd.GetNumber(), l, max_to_zero(meta->prop.earliest_time_begin_compact),
max_to_zero(meta->prop.latest_time_end_compact), now, meta->prop.num_deletions);
}
++total_count;
bool marked =
!!(meta->marked_for_compaction & FileMetaData::kMarkedFromTTL);
!!(meta->marked_for_compaction & (FileMetaData::kMarkedFromTTL | FileMetaData::kMarkedFromRangeDeletion));
++total_count;
old_mark_count += marked;
if (!marked &&
meta->prop.num_deletions > meta->prop.num_entries * 0.2) {
meta->marked_for_compaction |= FileMetaData::kMarkedFromRangeDeletion;
ROCKS_LOG_BUFFER(&log_buffer_info,
"SST #%" PRIu64
" ttl schedule debug info @L%d , property: (%" PRIu64
" , %" PRIu64 ") now: %" PRIu64
" deleted key: %" PRIu64 " marked as range_deletion",
meta->fd.GetNumber(), l,
max_to_zero(meta->prop.earliest_time_begin_compact),
max_to_zero(meta->prop.latest_time_end_compact), now,
meta->prop.num_deletions);
marked = true;
}
TEST_SYNC_POINT("DBImpl:Exist-SST");
if (!marked && has_ttl &&
should_marked_for_compacted(
Expand Down
5 changes: 5 additions & 0 deletions db/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,11 @@ class DBImpl : public DB {
virtual Status DisableFileDeletions() override;
virtual Status EnableFileDeletions(bool force) override;
virtual int IsFileDeletionsEnabled() const;

virtual Status FakeFlush(std::vector<std::string>&) override;

virtual Status UndoFakeFlush() override;

// All the returned filenames start with "/"
virtual Status GetLiveFiles(std::vector<std::string>&,
uint64_t* manifest_file_size,
Expand Down
1 change: 1 addition & 0 deletions db/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1407,6 +1407,7 @@ Status DBImpl::ScheduleFlushes(WriteContext* context) {
SchedulePendingFlush(flush_req_vec, FlushReason::kWriteBufferFull);
MaybeScheduleFlushOrCompaction();
}
TEST_SYNC_POINT("DBImpl::BufferFull");
return status;
}

Expand Down
2 changes: 2 additions & 0 deletions db/flush_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ const char* GetFlushReasonString(FlushReason flush_reason) {
return "Error Recovery";
case FlushReason::kInstallTimeout:
return "Install Timeout";
case FlushReason::kFakeFlush:
return "Fake Flush";
default:
return "Invalid";
}
Expand Down
6 changes: 5 additions & 1 deletion db/version_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -877,7 +877,11 @@ bool VersionBuilder::CheckConsistencyForNumLevels() {
return rep_->CheckConsistencyForNumLevels();
}

void VersionBuilder::Apply(VersionEdit* edit) {
  // Checkpoint ("fake flush") edits must not mutate the builder's current
  // state; everything else is applied normally.
  if (!edit->check_point()) {
    rep_->Apply(edit);
  }
}

void VersionBuilder::SaveTo(VersionStorageInfo* vstorage,
double maintainer_job_ratio) {
Expand Down
1 change: 1 addition & 0 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ void VersionEdit::Clear() {
is_open_db_ = false;
is_in_atomic_group_ = false;
remaining_entries_ = 0;
for_checkpoint_ = false;
}

bool VersionEdit::EncodeTo(std::string* dst) const {
Expand Down
4 changes: 4 additions & 0 deletions db/version_edit.h
Original file line number Diff line number Diff line change
Expand Up @@ -411,6 +411,9 @@ class VersionEdit {
std::string DebugString(bool hex_key = false) const;
std::string DebugJSON(int edit_num, bool hex_key = false) const;

// True when this edit only records checkpoint ("fake flush") files; such
// edits are written to the MANIFEST but skipped by VersionBuilder::Apply,
// so they do not change the live LSM state.
bool check_point() const { return for_checkpoint_; }
void set_check_point(bool b) { for_checkpoint_ = b; }

private:
friend class VersionSet;
friend class Version;
Expand Down Expand Up @@ -450,6 +453,7 @@ class VersionEdit {
bool is_column_family_add_;
std::string column_family_name_;

bool for_checkpoint_;
bool is_open_db_;
bool is_in_atomic_group_;
uint32_t remaining_entries_;
Expand Down
4 changes: 1 addition & 3 deletions db/version_set.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3192,9 +3192,7 @@ Status VersionSet::ProcessManifestWrites(std::deque<ManifestWriter>& writers,
Status s;

assert(pending_manifest_file_number_ == 0);
if (!descriptor_log_ ||
manifest_file_size_ > db_options_->max_manifest_file_size ||
manifest_edit_count_ > db_options_->max_manifest_edit_count) {
if (!descriptor_log_) {
pending_manifest_file_number_ = NewFileNumber();
batch_edits.back()->SetNextFile(next_file_number_.load());
new_descriptor_log = true;
Expand Down
Loading