Skip to content

Commit

Permalink
Add data cleanup logic in localDB
Browse files Browse the repository at this point in the history
  • Loading branch information
lex007in committed Jan 17, 2025
1 parent c347a32 commit 4861d06
Show file tree
Hide file tree
Showing 17 changed files with 654 additions and 3 deletions.
24 changes: 24 additions & 0 deletions ydb/core/base/tablet.h
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ struct TEvTablet {
// utilitary
EvCheckBlobstorageStatusResult = EvBoot + 3072,
EvResetTabletResult,
EvGcForStepAckRequest, // from executer to sys tablet
EvGcForStepAckResponse, // from sys tablet to executer

EvEnd
};
Expand Down Expand Up @@ -790,6 +792,28 @@ struct TEvTablet {
{}
};

// will send TEvGcForStepAckResponse when the requested Generation and Step are less
// than the actual garbage collected Generation and Step
struct TEvGcForStepAckRequest : public TEventLocal<TEvGcForStepAckRequest, EvGcForStepAckRequest> {
const ui32 Generation;
const ui32 Step;

TEvGcForStepAckRequest(ui32 generation, ui32 step)
: Generation(generation)
, Step(step)
{}
};

struct TEvGcForStepAckResponse : public TEventLocal<TEvGcForStepAckResponse, EvGcForStepAckResponse> {
const ui32 Generation;
const ui32 Step;

TEvGcForStepAckResponse(ui32 generation, ui32 step)
: Generation(generation)
, Step(step)
{}
};

struct TEvGetCounters : TEventPB<TEvGetCounters, NKikimrTabletBase::TEvGetCounters, EvGetCounters> {};
struct TEvGetCountersResponse : TEventPB<TEvGetCountersResponse, NKikimrTabletBase::TEvGetCountersResponse, EvGetCountersResponse> {};

Expand Down
19 changes: 19 additions & 0 deletions ydb/core/tablet/tablet_sys.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1285,6 +1285,15 @@ void TTablet::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev) {
}
break;
case NKikimrProto::OK:
if (GcForStepAckRequest) {
const auto& req = *GcForStepAckRequest->Get();
const ui32 gen = StateStorageInfo.KnownGeneration;
if (std::tie(req.Generation, req.Step) <= std::tie(gen, GcInFlyStep)) {
Send(GcForStepAckRequest->Sender, new TEvTablet::TEvGcForStepAckResponse(gen, GcInFlyStep));
GcForStepAckRequest = nullptr;
}
}
[[fallthrough]];
default: // silently ignore unrecognized errors (assume temporary)
if (GcInFly == 0 && GcNextStep != 0) {
GcLogChannel(std::exchange(GcNextStep, 0));
Expand All @@ -1295,6 +1304,16 @@ void TTablet::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev) {
CheckBlobStorageError();
}

void TTablet::Handle(TEvTablet::TEvGcForStepAckRequest::TPtr& ev) {
const auto& req = *ev->Get();
const ui32 gen = StateStorageInfo.KnownGeneration;
if (std::tie(req.Generation, req.Step) <= std::tie(gen, GcInFlyStep)) {
Send(ev->Sender, new TEvTablet::TEvGcForStepAckResponse(gen, GcInFlyStep));
} else {
GcForStepAckRequest = ev;
}
}

void TTablet::GcLogChannel(ui32 step) {
const ui64 tabletid = TabletID();
const ui32 gen = StateStorageInfo.KnownGeneration;
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tablet/tablet_sys.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,7 @@ class TTablet : public TActor<TTablet> {
ui32 GcInFly;
ui32 GcInFlyStep;
ui32 GcNextStep;
TEvTablet::TEvGcForStepAckRequest::TPtr GcForStepAckRequest;
TResourceProfilesPtr ResourceProfiles;
TSharedQuotaPtr TxCacheQuota;
THolder<NTracing::ITrace> IntrospectionTrace;
Expand Down Expand Up @@ -315,6 +316,8 @@ class TTablet : public TActor<TTablet> {
void Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev);
void Handle(TEvTablet::TEvPreCommit::TPtr &ev);

void Handle(TEvTablet::TEvGcForStepAckRequest::TPtr& ev);

void Handle(TEvTablet::TEvCommit::TPtr &ev);
bool HandleNext(TEvTablet::TEvCommit::TPtr &ev);
void Handle(TEvTablet::TEvAux::TPtr &ev);
Expand Down Expand Up @@ -533,6 +536,7 @@ class TTablet : public TActor<TTablet> {
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleByLeader);
hFunc(TEvents::TEvUndelivered, HandleByLeader);
hFunc(TEvBlobStorage::TEvCollectGarbageResult, Handle);
hFunc(TEvTablet::TEvGcForStepAckRequest, Handle);
}
}

Expand Down
42 changes: 40 additions & 2 deletions ydb/core/tablet_flat/flat_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -446,6 +446,7 @@ void TExecutor::Active(const TActorContext &ctx) {

CompactionLogic = THolder<TCompactionLogic>(new TCompactionLogic(MemTableMemoryConsumersCollection.Get(), Logger.Get(), Broker.Get(), this, loadedState->Comp,
Sprintf("tablet-%" PRIu64, Owner->TabletID())));
DataCleanupLogic = MakeHolder<TDataCleanupLogic>(static_cast<NActors::IActorOps*>(this), this, Owner, Logger.Get());
LogicRedo->InstallCounters(Counters.Get(), nullptr);

ResourceMetrics = MakeHolder<NMetrics::TResourceMetrics>(Owner->TabletID(), 0, Launcher);
Expand Down Expand Up @@ -729,6 +730,7 @@ TExecutorCaches TExecutor::CleanupState() {
Y_ABORT_UNLESS(!LogicAlter);
Y_ABORT_UNLESS(!CompactionLogic);
BorrowLogic.Destroy();
DataCleanupLogic.Destroy();

return caches;
}
Expand Down Expand Up @@ -912,6 +914,9 @@ void TExecutor::CheckCollectionBarrier(TIntrusivePtr<TBarrier> &barrier) {
Owner->CompletedLoansChanged(OwnerCtx());
}
}
if (DataCleanupLogic->NeedGC(TGCTime(Generation(), barrier->Step), TGCTime(Generation(), GcLogic->GetActiveGcBarrier()))) {
GcLogic->SendCollectGarbage(ActorContext());
}
}

barrier.Drop();
Expand Down Expand Up @@ -1327,6 +1332,14 @@ void TExecutor::Handle(TEvBlobStorage::TEvGetResult::TPtr& ev, const TActorConte
}
}

void TExecutor::Handle(TEvTablet::TEvGcForStepAckResponse::TPtr &ev) {
if (ev->Get()->Generation != Generation()) {
return;
}

DataCleanupLogic->OnGcForStepAckResponse(ev->Get()->Step, OwnerCtx());
}

void TExecutor::AdvancePendingPartSwitches() {
while (PendingPartSwitches && ApplyReadyPartSwitches()) {
if (Stats->IsFollower()) {
Expand All @@ -1338,6 +1351,11 @@ void TExecutor::AdvancePendingPartSwitches() {
if (PendingPartSwitches.empty()) {
PlanTransactionActivation();
MaybeRelaxRejectProbability();

// followers haven't DataCleanupLogic
if (DataCleanupLogic && DataCleanupLogic->NeedLogSnaphot()) {
MakeLogSnapshot();
}
}
}

Expand Down Expand Up @@ -2643,6 +2661,7 @@ void TExecutor::MakeLogSnapshot() {
BorrowLogic->SnapToLog(snap, *commit);
GcLogic->SnapToLog(snap, commit->Step);
LogicSnap->MakeSnap(snap, *commit, Logger.Get());
DataCleanupLogic->OnMakeLogSnapshot(Generation(), commit->Step, commit->GcDelta);

AttachLeaseCommit(commit.Get(), /* force */ true);
CommitManager->Commit(commit);
Expand Down Expand Up @@ -2936,7 +2955,8 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext
case ECommit::Snap:
LogicSnap->Confirm(msg->Step);

if (NeedFollowerSnapshot)
DataCleanupLogic->OnSnapshotCommited(step, ctx);
if (NeedFollowerSnapshot || DataCleanupLogic->NeedLogSnaphot())
MakeLogSnapshot();

break;
Expand Down Expand Up @@ -2973,6 +2993,8 @@ void TExecutor::Handle(TEvTablet::TEvCommitResult::TPtr &ev, const TActorContext

void TExecutor::Handle(TEvBlobStorage::TEvCollectGarbageResult::TPtr &ev) {
GcLogic->OnCollectGarbageResult(ev);
auto channel = ev->Get()->Channel;
DataCleanupLogic->OnCollectedGarbage(channel, GcLogic->GetCommitedGcBarrier(channel), OwnerCtx());
}

void TExecutor::Handle(TEvResourceBroker::TEvResourceAllocated::TPtr &ev) {
Expand Down Expand Up @@ -3488,6 +3510,8 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)

Y_ABORT_UNLESS(InFlyCompactionGcBarriers.emplace(commit->Step, ops->Barrier).second);

DataCleanupLogic->OnCompleteCompaction(Generation(), commit->Step, tableId, CompactionLogic->GetFinishedCompactionInfo(tableId), commit->GcDelta);

AttachLeaseCommit(commit.Get());
CommitManager->Commit(commit);

Expand Down Expand Up @@ -3519,7 +3543,7 @@ void TExecutor::Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled)

activeTransaction.Done();

if (LogicSnap->MayFlush(false)) {
if (LogicSnap->MayFlush(false) || DataCleanupLogic->NeedLogSnaphot()) {
MakeLogSnapshot();
}
}
Expand Down Expand Up @@ -3881,6 +3905,19 @@ bool TExecutor::CompactTables() {
}
}

void TExecutor::CleanupData() {
if (DataCleanupLogic->TryStartCleanup(GcLogic->GetCommitedGcBarriers())) {
if (Scheme().Tables.empty()) {
DataCleanupLogic->OnNoTables(OwnerCtx());
return;
}
for (const auto& [tableId, _] : Scheme().Tables) {
auto compactionId = CompactionLogic->PrepareForceCompaction(tableId);
DataCleanupLogic->OnCompactionPrepared(tableId, compactionId);
}
}
}

void TExecutor::Handle(NMemory::TEvMemTableRegistered::TPtr &ev) {
const auto *msg = ev->Get();

Expand Down Expand Up @@ -3945,6 +3982,7 @@ STFUNC(TExecutor::StateWork) {
HFunc(NBlockIO::TEvStat, Handle);
hFunc(NMemory::TEvMemTableRegistered, Handle);
hFunc(NMemory::TEvMemTableCompact, Handle);
hFunc(TEvTablet::TEvGcForStepAckResponse, Handle);
default:
break;
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tablet_flat/flat_executor.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#include "flat_exec_commit.h"
#include "flat_executor_misc.h"
#include "flat_executor_compaction_logic.h"
#include "flat_executor_data_cleanup_logic.h"
#include "flat_executor_gclogic.h"
#include "flat_bio_events.h"
#include "flat_bio_stats.h"
Expand Down Expand Up @@ -434,6 +435,7 @@ class TExecutor
THolder<TExecutorGCLogic> GcLogic;
THolder<TCompactionLogic> CompactionLogic;
THolder<TExecutorBorrowLogic> BorrowLogic;
THolder<TDataCleanupLogic> DataCleanupLogic;

TLoadBlobQueue PendingBlobQueue;

Expand Down Expand Up @@ -563,6 +565,7 @@ class TExecutor
void Handle(NBlockIO::TEvStat::TPtr &ev, const TActorContext &ctx);
void Handle(NOps::TEvResult *ops, TProdCompact *msg, bool cancelled);
void Handle(TEvBlobStorage::TEvGetResult::TPtr&, const TActorContext&);
void Handle(TEvTablet::TEvGcForStepAckResponse::TPtr &ev);

void UpdateUsedTabletMemory();
void UpdateCounters(const TActorContext &ctx);
Expand Down Expand Up @@ -637,6 +640,8 @@ class TExecutor
ui64 CompactTable(ui32 tableId) override;
bool CompactTables() override;

void CleanupData() override;

void Handle(NMemory::TEvMemTableRegistered::TPtr &ev);
void Handle(NMemory::TEvMemTableCompact::TPtr &ev);

Expand Down
Loading

0 comments on commit 4861d06

Please sign in to comment.