Skip to content

Commit

Permalink
Reapply "Export views" (ydb-platform#13509)
Browse files Browse the repository at this point in the history
This reverts commit e0f619d.
  • Loading branch information
jepett0 committed Jan 24, 2025
1 parent 066cc81 commit 8e8f603
Show file tree
Hide file tree
Showing 19 changed files with 675 additions and 103 deletions.
1 change: 1 addition & 0 deletions ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4329,6 +4329,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {

item.SourcePathId.OwnerId = rowset.GetValueOrDefault<Schema::ExportItems::SourceOwnerPathId>(selfId);
item.SourcePathId.LocalPathId = rowset.GetValue<Schema::ExportItems::SourcePathId>();
item.SourcePathType = rowset.GetValue<Schema::ExportItems::SourcePathType>();

item.State = static_cast<TExportInfo::EState>(rowset.GetValue<Schema::ExportItems::State>());
item.WaitTxId = rowset.GetValueOrDefault<Schema::ExportItems::BackupTxId>(InvalidTxId);
Expand Down
7 changes: 6 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T
NIceDb::TUpdate<Schema::ExportItems::SourcePathName>(item.SourcePathName),
NIceDb::TUpdate<Schema::ExportItems::SourceOwnerPathId>(item.SourcePathId.OwnerId),
NIceDb::TUpdate<Schema::ExportItems::SourcePathId>(item.SourcePathId.LocalPathId),
NIceDb::TUpdate<Schema::ExportItems::State>(static_cast<ui8>(item.State))
NIceDb::TUpdate<Schema::ExportItems::State>(static_cast<ui8>(item.State)),
NIceDb::TUpdate<Schema::ExportItems::SourcePathType>(item.SourcePathType)
);
}
}
Expand Down Expand Up @@ -231,6 +232,10 @@ void TSchemeShard::Handle(TEvExport::TEvListExportsRequest::TPtr& ev, const TAct
Execute(CreateTxListExports(ev), ctx);
}

void TSchemeShard::Handle(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxProgressExport(ev), ctx);
}

void TSchemeShard::ResumeExports(const TVector<ui64>& exportIds, const TActorContext& ctx) {
for (const ui64 id : exportIds) {
Execute(CreateTxProgressExport(id), ctx);
Expand Down
146 changes: 129 additions & 17 deletions ydb/core/tx/schemeshard/schemeshard_export__create.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#include "schemeshard_xxport__tx_base.h"
#include "schemeshard_xxport__helpers.h"
#include "schemeshard_export.h"
#include "schemeshard_export_flow_proposals.h"
#include "schemeshard_export_helpers.h"
#include "schemeshard_export.h"
#include "schemeshard_export_scheme_uploader.h"
#include "schemeshard_audit_log.h"
#include "schemeshard_impl.h"

Expand Down Expand Up @@ -203,7 +204,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
.IsResolved()
.NotDeleted()
.NotUnderDeleting()
.IsTable()
.IsSupportedInExports()
.FailOnRestrictedCreateInTempZone();

if (!checks) {
Expand All @@ -212,7 +213,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
}
}

exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId);
exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId, path->PathType);
exportInfo->PendingItems.push_back(itemIdx);
}

Expand All @@ -230,6 +231,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
ui64 Id;
TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr;
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr;
TEvPrivate::TEvExportSchemeUploadResult::TPtr SchemeUploadResult = nullptr;
TTxId CompletedTxId = InvalidTxId;

explicit TTxProgress(TSelf* self, ui64 id)
Expand All @@ -250,6 +252,12 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
{
}

explicit TTxProgress(TSelf* self, TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev)
: TXxport::TTxBase(self)
, SchemeUploadResult(ev)
{
}

explicit TTxProgress(TSelf* self, TTxId completedTxId)
: TXxport::TTxBase(self)
, CompletedTxId(completedTxId)
Expand All @@ -267,6 +275,8 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
OnAllocateResult(txc, ctx);
} else if (ModifyResult) {
OnModifyResult(txc, ctx);
} else if (SchemeUploadResult) {
OnSchemeUploadResult(txc, ctx);
} else if (CompletedTxId) {
OnNotifyResult(txc, ctx);
} else {
Expand All @@ -290,16 +300,22 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Send(Self->SelfId(), MkDirPropose(Self, txId, exportInfo));
}

void CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) {
bool CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) {
LOG_I("TExport::TTxProgress: CopyTables propose"
<< ": info# " << exportInfo->ToString()
<< ", txId# " << txId);

Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId);
Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo));
if (AnyOf(exportInfo->Items, [](const TExportInfo::TItem& item) {
return item.SourcePathType == NKikimrSchemeOp::EPathTypeTable;
})) {
Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo));
return true;
}
return false;
}

void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) {
void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId, const TActorContext& ctx) {
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
auto& item = exportInfo->Items.at(itemIdx);

Expand All @@ -311,7 +327,17 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
<< ", txId# " << txId);

Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx));
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx));
} else if (item.SourcePathType == NKikimrSchemeOp::EPathTypeView) {
Ydb::Export::ExportToS3Settings exportSettings;
Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings));
const auto databaseRoot = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements);

item.SchemeUploader = ctx.Register(CreateSchemeUploader(
Self->SelfId(), exportInfo->Id, itemIdx, item.SourcePathId, exportSettings, databaseRoot
));
}
}

bool CancelTransferring(TExportInfo::TPtr exportInfo, ui32 itemIdx) {
Expand Down Expand Up @@ -620,7 +646,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}
}

void OnAllocateResult(TTransactionContext&, const TActorContext&) {
void OnAllocateResult(TTransactionContext& txc, const TActorContext& ctx) {
Y_ABORT_UNLESS(AllocateResult);

const auto txId = TTxId(AllocateResult->Get()->TxIds.front());
Expand Down Expand Up @@ -651,13 +677,27 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
break;

case EState::CopyTables:
CopyTables(exportInfo, txId);
if (!CopyTables(exportInfo, txId)) {
// none of the items is a table
NIceDb::TNiceDb db(txc.DB);

for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
exportInfo->Items[itemIdx].State = EState::Transferring;
Self->PersistExportItemState(db, exportInfo, itemIdx);

AllocateTxId(exportInfo, itemIdx);
}

exportInfo->State = EState::Transferring;
Self->PersistExportState(db, exportInfo);
return;
}
break;

case EState::Transferring:
if (exportInfo->PendingItems) {
itemIdx = popPendingItemIdx(exportInfo->PendingItems);
TransferData(exportInfo, itemIdx, txId);
TransferData(exportInfo, itemIdx, txId, ctx);
} else {
return;
}
Expand Down Expand Up @@ -870,6 +910,71 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
SubscribeTx(txId);
}

void OnSchemeUploadResult(TTransactionContext& txc, const TActorContext& ctx) {
Y_ABORT_UNLESS(SchemeUploadResult);

LOG_D("TExport::TTxProgress: OnSchemeUploadResult"
<< ": success# " << SchemeUploadResult->Get()->Success
<< ", error# " << SchemeUploadResult->Get()->Error
);

const auto exportId = SchemeUploadResult->Get()->ExportId;
auto exportInfo = Self->Exports.Value(exportId, nullptr);
if (!exportInfo) {
LOG_E("TExport::TTxProgress: OnSchemeUploadResult received unknown export id"
<< ": id# " << exportId
);
return;
}

ui32 itemIdx = SchemeUploadResult->Get()->ItemIdx;
if (itemIdx >= exportInfo->Items.size()) {
LOG_E("TExport::TTxProgress: OnSchemeUploadResult item index out of range"
<< ": id# " << exportId
<< ", item index# " << itemIdx
<< ", number of items# " << exportInfo->Items.size()
);
return;
}

NIceDb::TNiceDb db(txc.DB);

auto& item = exportInfo->Items[itemIdx];
item.SchemeUploader = TActorId();

if (!SchemeUploadResult->Get()->Success) {
item.State = EState::Cancelled;
item.Issue = SchemeUploadResult->Get()->Error;
Self->PersistExportItemState(db, exportInfo, itemIdx);

if (!exportInfo->IsInProgress()) {
return;
}

Cancel(exportInfo, itemIdx, "unsuccessful scheme upload");

Self->PersistExportState(db, exportInfo);
return SendNotificationsIfFinished(exportInfo);
}

if (exportInfo->State == EState::Transferring) {
item.State = EState::Done;
Self->PersistExportItemState(db, exportInfo, itemIdx);

if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();

Self->PersistExportState(db, exportInfo);
SendNotificationsIfFinished(exportInfo);

if (exportInfo->IsFinished()) {
AuditLogExportEnd(*exportInfo.Get(), Self);
}
}
}
}

void OnNotifyResult(TTransactionContext& txc, const TActorContext&) {
Y_ABORT_UNLESS(CompletedTxId);
LOG_D("TExport::TTxProgress: OnNotifyResult"
Expand Down Expand Up @@ -948,15 +1053,18 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
item.State = EState::Done;
item.WaitTxId = InvalidTxId;

if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) {
item.Issue = *issue;
Cancel(exportInfo, itemIdx, "issues during backing up");
} else {
if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();
bool itemHasIssues = false;
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) {
item.Issue = *issue;
Cancel(exportInfo, itemIdx, "issues during backing up");
itemHasIssues = true;
}
}
if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();
}

Self->PersistExportItemState(db, exportInfo, itemIdx);
break;
Expand Down Expand Up @@ -1016,6 +1124,10 @@ ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvModifySche
return new TExport::TTxProgress(this, ev);
}

ITransaction* TSchemeShard::CreateTxProgressExport(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev) {
return new TExport::TTxProgress(this, ev);
}

ITransaction* TSchemeShard::CreateTxProgressExport(TTxId completedTxId) {
return new TExport::TTxProgress(this, completedTxId);
}
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose(

for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
const auto& item = exportInfo->Items.at(itemIdx);
if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) {
continue;
}

auto& desc = *copyTables.Add();
desc.SetSrcPath(item.SourcePathName);
Expand Down
Loading

0 comments on commit 8e8f603

Please sign in to comment.