Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Revert "Export views" #13509

Merged
merged 1 commit into from
Jan 17, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion ydb/core/tx/schemeshard/schemeshard__init.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4326,7 +4326,6 @@ struct TSchemeShard::TTxInit : public TTransactionBase<TSchemeShard> {

item.SourcePathId.OwnerId = rowset.GetValueOrDefault<Schema::ExportItems::SourceOwnerPathId>(selfId);
item.SourcePathId.LocalPathId = rowset.GetValue<Schema::ExportItems::SourcePathId>();
item.SourcePathType = rowset.GetValue<Schema::ExportItems::SourcePathType>();

item.State = static_cast<TExportInfo::EState>(rowset.GetValue<Schema::ExportItems::State>());
item.WaitTxId = rowset.GetValueOrDefault<Schema::ExportItems::BackupTxId>(InvalidTxId);
Expand Down
7 changes: 1 addition & 6 deletions ydb/core/tx/schemeshard/schemeshard_export.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -170,8 +170,7 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T
NIceDb::TUpdate<Schema::ExportItems::SourcePathName>(item.SourcePathName),
NIceDb::TUpdate<Schema::ExportItems::SourceOwnerPathId>(item.SourcePathId.OwnerId),
NIceDb::TUpdate<Schema::ExportItems::SourcePathId>(item.SourcePathId.LocalPathId),
NIceDb::TUpdate<Schema::ExportItems::State>(static_cast<ui8>(item.State)),
NIceDb::TUpdate<Schema::ExportItems::SourcePathType>(item.SourcePathType)
NIceDb::TUpdate<Schema::ExportItems::State>(static_cast<ui8>(item.State))
);
}
}
Expand Down Expand Up @@ -232,10 +231,6 @@ void TSchemeShard::Handle(TEvExport::TEvListExportsRequest::TPtr& ev, const TAct
Execute(CreateTxListExports(ev), ctx);
}

void TSchemeShard::Handle(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev, const TActorContext& ctx) {
Execute(CreateTxProgressExport(ev), ctx);
}

void TSchemeShard::ResumeExports(const TVector<ui64>& exportIds, const TActorContext& ctx) {
for (const ui64 id : exportIds) {
Execute(CreateTxProgressExport(id), ctx);
Expand Down
146 changes: 17 additions & 129 deletions ydb/core/tx/schemeshard/schemeshard_export__create.cpp
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
#include "schemeshard_xxport__tx_base.h"
#include "schemeshard_xxport__helpers.h"
#include "schemeshard_export.h"
#include "schemeshard_export_flow_proposals.h"
#include "schemeshard_export_helpers.h"
#include "schemeshard_export_scheme_uploader.h"
#include "schemeshard_export.h"
#include "schemeshard_audit_log.h"
#include "schemeshard_impl.h"

Expand Down Expand Up @@ -204,7 +203,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
.IsResolved()
.NotDeleted()
.NotUnderDeleting()
.IsSupportedInExports()
.IsTable()
.FailOnRestrictedCreateInTempZone();

if (!checks) {
Expand All @@ -213,7 +212,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase {
}
}

exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId, path->PathType);
exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId);
exportInfo->PendingItems.push_back(itemIdx);
}

Expand All @@ -231,7 +230,6 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
ui64 Id;
TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr;
TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr;
TEvPrivate::TEvExportSchemeUploadResult::TPtr SchemeUploadResult = nullptr;
TTxId CompletedTxId = InvalidTxId;

explicit TTxProgress(TSelf* self, ui64 id)
Expand All @@ -252,12 +250,6 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
{
}

explicit TTxProgress(TSelf* self, TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev)
: TXxport::TTxBase(self)
, SchemeUploadResult(ev)
{
}

explicit TTxProgress(TSelf* self, TTxId completedTxId)
: TXxport::TTxBase(self)
, CompletedTxId(completedTxId)
Expand All @@ -275,8 +267,6 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
OnAllocateResult(txc, ctx);
} else if (ModifyResult) {
OnModifyResult(txc, ctx);
} else if (SchemeUploadResult) {
OnSchemeUploadResult(txc, ctx);
} else if (CompletedTxId) {
OnNotifyResult(txc, ctx);
} else {
Expand All @@ -300,22 +290,16 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
Send(Self->SelfId(), MkDirPropose(Self, txId, exportInfo));
}

bool CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) {
void CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) {
LOG_I("TExport::TTxProgress: CopyTables propose"
<< ": info# " << exportInfo->ToString()
<< ", txId# " << txId);

Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId);
if (AnyOf(exportInfo->Items, [](const TExportInfo::TItem& item) {
return item.SourcePathType == NKikimrSchemeOp::EPathTypeTable;
})) {
Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo));
return true;
}
return false;
Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo));
}

void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId, const TActorContext& ctx) {
void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) {
Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size());
auto& item = exportInfo->Items.at(itemIdx);

Expand All @@ -327,17 +311,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
<< ", txId# " << txId);

Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId);
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx));
} else if (item.SourcePathType == NKikimrSchemeOp::EPathTypeView) {
Ydb::Export::ExportToS3Settings exportSettings;
Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings));
const auto databaseRoot = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements);

item.SchemeUploader = ctx.Register(CreateSchemeUploader(
Self->SelfId(), exportInfo->Id, itemIdx, item.SourcePathId, exportSettings, databaseRoot
));
}
Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx));
}

bool CancelTransferring(TExportInfo::TPtr exportInfo, ui32 itemIdx) {
Expand Down Expand Up @@ -646,7 +620,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
}
}

void OnAllocateResult(TTransactionContext& txc, const TActorContext& ctx) {
void OnAllocateResult(TTransactionContext&, const TActorContext&) {
Y_ABORT_UNLESS(AllocateResult);

const auto txId = TTxId(AllocateResult->Get()->TxIds.front());
Expand Down Expand Up @@ -677,27 +651,13 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
break;

case EState::CopyTables:
if (!CopyTables(exportInfo, txId)) {
// none of the items is a table
NIceDb::TNiceDb db(txc.DB);

for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
exportInfo->Items[itemIdx].State = EState::Transferring;
Self->PersistExportItemState(db, exportInfo, itemIdx);

AllocateTxId(exportInfo, itemIdx);
}

exportInfo->State = EState::Transferring;
Self->PersistExportState(db, exportInfo);
return;
}
CopyTables(exportInfo, txId);
break;

case EState::Transferring:
if (exportInfo->PendingItems) {
itemIdx = popPendingItemIdx(exportInfo->PendingItems);
TransferData(exportInfo, itemIdx, txId, ctx);
TransferData(exportInfo, itemIdx, txId);
} else {
return;
}
Expand Down Expand Up @@ -910,71 +870,6 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
SubscribeTx(txId);
}

void OnSchemeUploadResult(TTransactionContext& txc, const TActorContext& ctx) {
Y_ABORT_UNLESS(SchemeUploadResult);

LOG_D("TExport::TTxProgress: OnSchemeUploadResult"
<< ": success# " << SchemeUploadResult->Get()->Success
<< ", error# " << SchemeUploadResult->Get()->Error
);

const auto exportId = SchemeUploadResult->Get()->ExportId;
auto exportInfo = Self->Exports.Value(exportId, nullptr);
if (!exportInfo) {
LOG_E("TExport::TTxProgress: OnSchemeUploadResult received unknown export id"
<< ": id# " << exportId
);
return;
}

ui32 itemIdx = SchemeUploadResult->Get()->ItemIdx;
if (itemIdx >= exportInfo->Items.size()) {
LOG_E("TExport::TTxProgress: OnSchemeUploadResult item index out of range"
<< ": id# " << exportId
<< ", item index# " << itemIdx
<< ", number of items# " << exportInfo->Items.size()
);
return;
}

NIceDb::TNiceDb db(txc.DB);

auto& item = exportInfo->Items[itemIdx];
item.SchemeUploader = TActorId();

if (!SchemeUploadResult->Get()->Success) {
item.State = EState::Cancelled;
item.Issue = SchemeUploadResult->Get()->Error;
Self->PersistExportItemState(db, exportInfo, itemIdx);

if (!exportInfo->IsInProgress()) {
return;
}

Cancel(exportInfo, itemIdx, "unsuccessful scheme upload");

Self->PersistExportState(db, exportInfo);
return SendNotificationsIfFinished(exportInfo);
}

if (exportInfo->State == EState::Transferring) {
item.State = EState::Done;
Self->PersistExportItemState(db, exportInfo, itemIdx);

if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();

Self->PersistExportState(db, exportInfo);
SendNotificationsIfFinished(exportInfo);

if (exportInfo->IsFinished()) {
AuditLogExportEnd(*exportInfo.Get(), Self);
}
}
}
}

void OnNotifyResult(TTransactionContext& txc, const TActorContext&) {
Y_ABORT_UNLESS(CompletedTxId);
LOG_D("TExport::TTxProgress: OnNotifyResult"
Expand Down Expand Up @@ -1053,18 +948,15 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase
item.State = EState::Done;
item.WaitTxId = InvalidTxId;

bool itemHasIssues = false;
if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) {
if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) {
item.Issue = *issue;
Cancel(exportInfo, itemIdx, "issues during backing up");
itemHasIssues = true;
if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) {
item.Issue = *issue;
Cancel(exportInfo, itemIdx, "issues during backing up");
} else {
if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();
}
}
if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) {
exportInfo->State = EState::Done;
exportInfo->EndTime = TAppData::TimeProvider->Now();
}

Self->PersistExportItemState(db, exportInfo, itemIdx);
break;
Expand Down Expand Up @@ -1124,10 +1016,6 @@ ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvModifySche
return new TExport::TTxProgress(this, ev);
}

ITransaction* TSchemeShard::CreateTxProgressExport(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev) {
return new TExport::TTxProgress(this, ev);
}

ITransaction* TSchemeShard::CreateTxProgressExport(TTxId completedTxId) {
return new TExport::TTxProgress(this, completedTxId);
}
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,9 +59,6 @@ THolder<TEvSchemeShard::TEvModifySchemeTransaction> CopyTablesPropose(

for (ui32 itemIdx : xrange(exportInfo->Items.size())) {
const auto& item = exportInfo->Items.at(itemIdx);
if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) {
continue;
}

auto& desc = *copyTables.Add();
desc.SetSrcPath(item.SourcePathName);
Expand Down
Loading
Loading