Skip to content

Commit

Permalink
Merge branch 'main' into yuval/disable-tracing-with-use-txn
Browse files Browse the repository at this point in the history
  • Loading branch information
Yuval-Ariel authored Jan 29, 2024
2 parents c5fbf43 + 94e7a4f commit 6597cf0
Show file tree
Hide file tree
Showing 5 changed files with 75 additions and 35 deletions.
1 change: 1 addition & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ db_stress has been updated as well to take into account that some memtables do n
* stress test: Fix TestIterateAgainstExpected not supporting 0 iterations. TestIterateAgainstExpected was not designed to support value of 0 in FLAGS_num_iterations.
RocksDB has a value of 10 by default and we've added the option to randomize the values from 0 to 100 in https://github.com/speedb-io/speedb/commit/434692a63318036a3995a53001337f18bf467903
* Add more checks for using db_stress with --enable_speedb_features=true
* Proactive Flushes: Have the initiator return a correct answer when it was requested to initiate a flush (#812).
* stress test: Adding a trace file by default in PR https://github.com/speedb-io/speedb/pull/797 has revealed some incompatibilities between the trace file and several configurations (more details in https://github.com/speedb-io/speedb/issues/813). Keep the trace file and remove the IsDone assertion.

### Miscellaneous
Expand Down
12 changes: 7 additions & 5 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -462,9 +462,9 @@ class DBImpl : public DB {

// flush initiated by the write buffer manager to free some space
bool InitiateMemoryManagerFlushRequest(size_t min_size_to_flush);
bool InitiateMemoryManagerFlushRequestAtomicFlush(
size_t InitiateMemoryManagerFlushRequestAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options);
bool InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options);

virtual SequenceNumber GetLatestSequenceNumber() const override;
Expand Down Expand Up @@ -1995,15 +1995,17 @@ class DBImpl : public DB {
// Force current memtable contents to be flushed.
Status FlushMemTable(ColumnFamilyData* cfd, const FlushOptions& options,
FlushReason flush_reason,
bool entered_write_thread = false);
bool entered_write_thread = false,
size_t* num_flushes_initiated = nullptr);

// Atomic-flush memtables from qualified CFs among `provided_candidate_cfds`
// (if non-empty) or among all column families and atomically record the
// result to the MANIFEST.
Status AtomicFlushMemTables(
const FlushOptions& options, FlushReason flush_reason,
const autovector<ColumnFamilyData*>& provided_candidate_cfds = {},
bool entered_write_thread = false);
bool entered_write_thread = false,
size_t* num_flushes_initiated = nullptr);

// Wait until flushing this column family won't stall writes
Status WaitUntilFlushWouldNotStallWrites(ColumnFamilyData* cfd,
Expand Down Expand Up @@ -2156,7 +2158,7 @@ class DBImpl : public DB {
void GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
FlushReason flush_reason, FlushRequest* req);

void SchedulePendingFlush(const FlushRequest& req);
bool SchedulePendingFlush(const FlushRequest& req);

void SchedulePendingCompaction(ColumnFamilyData* cfd);
void SchedulePendingPurge(std::string fname, std::string dir_to_sync,
Expand Down
88 changes: 62 additions & 26 deletions db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2333,7 +2333,12 @@ void DBImpl::GenerateFlushRequest(const autovector<ColumnFamilyData*>& cfds,
Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
const FlushOptions& flush_options,
FlushReason flush_reason,
bool entered_write_thread) {
bool entered_write_thread,
size_t* num_flushes_initiated) {
if (num_flushes_initiated != nullptr) {
*num_flushes_initiated = 0U;
}

// This method should not be called if atomic_flush is true.
assert(!immutable_db_options_.atomic_flush);
if (!flush_options.wait && write_controller_->IsStopped()) {
Expand Down Expand Up @@ -2447,7 +2452,10 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
}
}
for (const auto& req : flush_reqs) {
SchedulePendingFlush(req);
bool pushed_req = SchedulePendingFlush(req);
if (pushed_req && (num_flushes_initiated != nullptr)) {
++(*num_flushes_initiated);
}
}
MaybeScheduleFlushOrCompaction();
}
Expand Down Expand Up @@ -2486,8 +2494,13 @@ Status DBImpl::FlushMemTable(ColumnFamilyData* cfd,
Status DBImpl::AtomicFlushMemTables(
const FlushOptions& flush_options, FlushReason flush_reason,
const autovector<ColumnFamilyData*>& provided_candidate_cfds,
bool entered_write_thread) {
bool entered_write_thread, size_t* num_flushes_initiated) {
assert(immutable_db_options_.atomic_flush);

if (num_flushes_initiated != nullptr) {
*num_flushes_initiated = 0U;
}

if (!flush_options.wait && write_controller_->IsStopped()) {
std::ostringstream oss;
oss << "Writes have been stopped, thus unable to perform manual flush. "
Expand Down Expand Up @@ -2598,7 +2611,10 @@ Status DBImpl::AtomicFlushMemTables(
}
}
GenerateFlushRequest(cfds, flush_reason, &flush_req);
SchedulePendingFlush(flush_req);
bool pushed_req = SchedulePendingFlush(flush_req);
if (pushed_req && (num_flushes_initiated != nullptr)) {
++(*num_flushes_initiated);
}
MaybeScheduleFlushOrCompaction();
}

Expand Down Expand Up @@ -3014,14 +3030,17 @@ ColumnFamilyData* DBImpl::PickCompactionFromQueue(
return cfd;
}

void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
bool DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
mutex_.AssertHeld();
if (reject_new_background_jobs_) {
return;
return false;
}
if (flush_req.cfd_to_max_mem_id_to_persist.empty()) {
return;
return false;
}

bool pushed_req = false;

if (!immutable_db_options_.atomic_flush) {
// For the non-atomic flush case, we never schedule multiple column
// families in the same flush request.
Expand All @@ -3035,6 +3054,7 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
cfd->set_queued_for_flush(true);
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
pushed_req = true;
}
} else {
for (auto& iter : flush_req.cfd_to_max_mem_id_to_persist) {
Expand All @@ -3043,7 +3063,10 @@ void DBImpl::SchedulePendingFlush(const FlushRequest& flush_req) {
}
++unscheduled_flushes_;
flush_queue_.push_back(flush_req);
pushed_req = true;
}

return pushed_req;
}

void DBImpl::SchedulePendingCompaction(ColumnFamilyData* cfd) {
Expand Down Expand Up @@ -3273,11 +3296,6 @@ Status DBImpl::BackgroundFlush(bool* made_progress, JobContext* job_context,
bg_job_limits.max_compactions, bg_flush_scheduled_,
bg_compaction_scheduled_);
}
*reason = bg_flush_args[0].flush_reason_;
if (write_buffer_manager_) {
write_buffer_manager_->FlushStarted(
*reason == FlushReason::kWriteBufferManagerInitiated);
}

status = FlushMemTablesToOutputFiles(bg_flush_args, made_progress,
job_context, log_buffer, thread_pri);
Expand Down Expand Up @@ -3325,6 +3343,12 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
Status s =
BackgroundFlush(&made_progress, &job_context, &log_buffer, &reason,
&flush_rescheduled_to_retain_udt, thread_pri);

if (write_buffer_manager_) {
write_buffer_manager_->FlushStarted(
reason == FlushReason::kWriteBufferManagerInitiated);
}

if (s.IsTryAgain() && flush_rescheduled_to_retain_udt) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
Expand Down Expand Up @@ -4351,16 +4375,20 @@ bool DBImpl::InitiateMemoryManagerFlushRequest(size_t min_size_to_flush) {
flush_options.allow_write_stall = true;
flush_options.wait = false;

size_t num_flushes_initiated = 0U;
if (immutable_db_options_.atomic_flush) {
return InitiateMemoryManagerFlushRequestAtomicFlush(min_size_to_flush,
flush_options);
num_flushes_initiated = InitiateMemoryManagerFlushRequestAtomicFlush(
min_size_to_flush, flush_options);
} else {
return InitiateMemoryManagerFlushRequestNonAtomicFlush(min_size_to_flush,
flush_options);
num_flushes_initiated = InitiateMemoryManagerFlushRequestNonAtomicFlush(
min_size_to_flush, flush_options);
}

// TODO - Have Proactive Flushes handle num_flushes_initiated > 1
return (num_flushes_initiated > 0U);
}

bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
size_t DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options) {
assert(immutable_db_options_.atomic_flush);

Expand All @@ -4370,7 +4398,7 @@ bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(

SelectColumnFamiliesForAtomicFlush(&cfds);
if (cfds.empty()) {
return false;
return 0U;
}

// min_size_to_flush may be 0.
Expand All @@ -4391,7 +4419,7 @@ bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(
}
}
if (total_size_to_flush < min_size_to_flush) {
return false;
return 0U;
}
}
}
Expand All @@ -4404,17 +4432,21 @@ bool DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush(

TEST_SYNC_POINT(
"DBImpl::InitiateMemoryManagerFlushRequestAtomicFlush::BeforeFlush");
size_t num_flushes_initiated = 0U;
auto s = AtomicFlushMemTables(
flush_options, FlushReason::kWriteBufferManagerInitiated, cfds);
flush_options, FlushReason::kWriteBufferManagerInitiated, cfds,
false /* entered_write_thread */, &num_flushes_initiated);

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"write buffer manager initiated Atomic flush finished, status: %s",
s.ToString().c_str());
return s.ok();

assert(s.ok() || (num_flushes_initiated == 0));
return num_flushes_initiated;
}

bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
size_t min_size_to_flush, const FlushOptions& flush_options) {
assert(immutable_db_options_.atomic_flush == false);

Expand Down Expand Up @@ -4456,7 +4488,7 @@ bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(
}

if (cfd_to_flush == nullptr) {
return false;
return 0U;
}

orig_cfd_to_flush = cfd_to_flush;
Expand Down Expand Up @@ -4503,15 +4535,19 @@ bool DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush(

TEST_SYNC_POINT(
"DBImpl::InitiateMemoryManagerFlushRequestNonAtomicFlush::BeforeFlush");
auto s = FlushMemTable(cfd_to_flush, flush_options,
FlushReason::kWriteBufferManagerInitiated);
size_t num_flushes_initiated = 0U;

auto s = FlushMemTable(
cfd_to_flush, flush_options, FlushReason::kWriteBufferManagerInitiated,
false /* entered_write_thread */, &num_flushes_initiated);

ROCKS_LOG_INFO(
immutable_db_options_.info_log,
"[%s] write buffer manager initialize flush finished, status: %s\n",
cfd_to_flush->GetName().c_str(), s.ToString().c_str());

return s.ok();
assert(s.ok() || (num_flushes_initiated == 0));
return num_flushes_initiated;
}

} // namespace ROCKSDB_NAMESPACE
6 changes: 3 additions & 3 deletions memtable/write_buffer_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -809,9 +809,9 @@ void WriteBufferManager::FlushEnded(bool /* wbm_initiated */) {
// the WBM will not be aware of the number of running flushes at the time
// it is enabled. The counter will become valid once all of the flushes
// that were running when it was enabled will have completed.
if (num_running_flushes_ > 0U) {
--num_running_flushes_;
}
assert(num_running_flushes_ > 0U);
--num_running_flushes_;

size_t curr_memory_used = memory_usage();
RecalcFlushInitiationSize();
ReevaluateNeedForMoreFlushesLockHeld(curr_memory_used);
Expand Down
3 changes: 2 additions & 1 deletion memtable/write_buffer_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -905,7 +905,8 @@ TEST_P(WriteBufferManagerFlushInitiationTest, DISABLED_FlushInitiationSteps) {
DeregisterInitiator(initiator_id);
}

TEST_P(WriteBufferManagerFlushInitiationTest, RegisteringLate) {
// TODO - The test is flaky. Investigate why and either fix it or remove it
TEST_P(WriteBufferManagerFlushInitiationTest, DISABLED_RegisteringLate) {
// Reach the 1st step, but no registered initiators
wbm_->ReserveMem(flush_step_size_);
IncNumFlushesToInitiate();
Expand Down

0 comments on commit 6597cf0

Please sign in to comment.