From 8e8f603485cf2032f249f2065b0bf2207f809436 Mon Sep 17 00:00:00 2001 From: Daniil Demin Date: Tue, 21 Jan 2025 15:53:58 +0000 Subject: [PATCH] Reapply "Export views" (#13509) This reverts commit e0f619d4043dd00d032f2bbe9d5dbc1ad9389d60. --- ydb/core/tx/schemeshard/schemeshard__init.cpp | 1 + .../tx/schemeshard/schemeshard_export.cpp | 7 +- .../schemeshard_export__create.cpp | 146 +++++++- .../schemeshard_export_flow_proposals.cpp | 3 + .../schemeshard_export_scheme_uploader.cpp | 317 ++++++++++++++++++ .../schemeshard_export_scheme_uploader.h | 16 + ...eshard_export_scheme_uploader_fallback.cpp | 39 +++ ydb/core/tx/schemeshard/schemeshard_impl.cpp | 10 + ydb/core/tx/schemeshard/schemeshard_impl.h | 2 + .../tx/schemeshard/schemeshard_info_types.h | 5 +- ydb/core/tx/schemeshard/schemeshard_path.cpp | 14 + ydb/core/tx/schemeshard/schemeshard_path.h | 1 + ydb/core/tx/schemeshard/schemeshard_private.h | 20 ++ ydb/core/tx/schemeshard/schemeshard_schema.h | 4 +- ydb/core/tx/schemeshard/ya.make | 3 + ydb/library/backup/backup.cpp | 77 +---- .../lib/ydb_cli/dump/util/view_utils.cpp | 86 ++++- ydb/public/lib/ydb_cli/dump/util/view_utils.h | 19 +- .../flat_schemeshard.schema | 8 +- 19 files changed, 675 insertions(+), 103 deletions(-) create mode 100644 ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp create mode 100644 ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h create mode 100644 ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp diff --git a/ydb/core/tx/schemeshard/schemeshard__init.cpp b/ydb/core/tx/schemeshard/schemeshard__init.cpp index 2ec3ee1acd75..938215b4e957 100644 --- a/ydb/core/tx/schemeshard/schemeshard__init.cpp +++ b/ydb/core/tx/schemeshard/schemeshard__init.cpp @@ -4329,6 +4329,7 @@ struct TSchemeShard::TTxInit : public TTransactionBase { item.SourcePathId.OwnerId = rowset.GetValueOrDefault(selfId); item.SourcePathId.LocalPathId = rowset.GetValue(); + item.SourcePathType = rowset.GetValue(); item.State = static_cast(rowset.GetValue()); item.WaitTxId = rowset.GetValueOrDefault(InvalidTxId); diff --git a/ydb/core/tx/schemeshard/schemeshard_export.cpp b/ydb/core/tx/schemeshard/schemeshard_export.cpp index ea6f881ee39a..cba13774cc96 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export.cpp @@ -170,7 +170,8 @@ void TSchemeShard::PersistCreateExport(NIceDb::TNiceDb& db, const TExportInfo::T NIceDb::TUpdate(item.SourcePathName), NIceDb::TUpdate(item.SourcePathId.OwnerId), NIceDb::TUpdate(item.SourcePathId.LocalPathId), - NIceDb::TUpdate(static_cast(item.State)) + NIceDb::TUpdate(static_cast(item.State)), + NIceDb::TUpdate(item.SourcePathType) ); } } @@ -231,6 +232,10 @@ void TSchemeShard::Handle(TEvExport::TEvListExportsRequest::TPtr& ev, const TAct Execute(CreateTxListExports(ev), ctx); } +void TSchemeShard::Handle(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev, const TActorContext& ctx) { + Execute(CreateTxProgressExport(ev), ctx); +} + void TSchemeShard::ResumeExports(const TVector& exportIds, const TActorContext& ctx) { for (const ui64 id : exportIds) { Execute(CreateTxProgressExport(id), ctx); diff --git a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp index 7f63a60a7970..4705dfe6b638 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export__create.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export__create.cpp @@ -1,8 +1,9 @@ #include "schemeshard_xxport__tx_base.h" #include "schemeshard_xxport__helpers.h" +#include "schemeshard_export.h" #include "schemeshard_export_flow_proposals.h" #include "schemeshard_export_helpers.h" -#include "schemeshard_export.h" +#include "schemeshard_export_scheme_uploader.h" #include "schemeshard_audit_log.h" #include "schemeshard_impl.h" @@ -203,7 +204,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { .IsResolved() .NotDeleted() .NotUnderDeleting() - .IsTable() + .IsSupportedInExports() .FailOnRestrictedCreateInTempZone(); if (!checks) { @@ -212,7 +213,7 @@ struct TSchemeShard::TExport::TTxCreate: public TSchemeShard::TXxport::TTxBase { } } - exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId); + exportInfo->Items.emplace_back(item.source_path(), path.Base()->PathId, path->PathType); exportInfo->PendingItems.push_back(itemIdx); } @@ -230,6 +231,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase ui64 Id; TEvTxAllocatorClient::TEvAllocateResult::TPtr AllocateResult = nullptr; TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr ModifyResult = nullptr; + TEvPrivate::TEvExportSchemeUploadResult::TPtr SchemeUploadResult = nullptr; TTxId CompletedTxId = InvalidTxId; explicit TTxProgress(TSelf* self, ui64 id) @@ -250,6 +252,12 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase { } + explicit TTxProgress(TSelf* self, TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev) + : TXxport::TTxBase(self) + , SchemeUploadResult(ev) + { + } + explicit TTxProgress(TSelf* self, TTxId completedTxId) : TXxport::TTxBase(self) , CompletedTxId(completedTxId) @@ -267,6 +275,8 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase OnAllocateResult(txc, ctx); } else if (ModifyResult) { OnModifyResult(txc, ctx); + } else if (SchemeUploadResult) { + OnSchemeUploadResult(txc, ctx); } else if (CompletedTxId) { OnNotifyResult(txc, ctx); } else { @@ -290,16 +300,22 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase Send(Self->SelfId(), MkDirPropose(Self, txId, exportInfo)); } - void CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) { + bool CopyTables(TExportInfo::TPtr exportInfo, TTxId txId) { LOG_I("TExport::TTxProgress: CopyTables propose" << ": info# " << exportInfo->ToString() << ", txId# " << txId); Y_ABORT_UNLESS(exportInfo->WaitTxId == InvalidTxId); - Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo)); + if (AnyOf(exportInfo->Items, [](const TExportInfo::TItem& item) { + return item.SourcePathType == NKikimrSchemeOp::EPathTypeTable; + })) { + Send(Self->SelfId(), CopyTablesPropose(Self, txId, exportInfo)); + return true; + } + return false; } - void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId) { + void TransferData(TExportInfo::TPtr exportInfo, ui32 itemIdx, TTxId txId, const TActorContext& ctx) { Y_ABORT_UNLESS(itemIdx < exportInfo->Items.size()); auto& item = exportInfo->Items.at(itemIdx); @@ -311,7 +327,17 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase << ", txId# " << txId); Y_ABORT_UNLESS(item.WaitTxId == InvalidTxId); - Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx)); + if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { + Send(Self->SelfId(), BackupPropose(Self, txId, exportInfo, itemIdx)); + } else if (item.SourcePathType == NKikimrSchemeOp::EPathTypeView) { + Ydb::Export::ExportToS3Settings exportSettings; + Y_ABORT_UNLESS(exportSettings.ParseFromString(exportInfo->Settings)); + const auto databaseRoot = TStringBuilder() << '/' << JoinSeq('/', Self->RootPathElements); + + item.SchemeUploader = ctx.Register(CreateSchemeUploader( + Self->SelfId(), exportInfo->Id, itemIdx, item.SourcePathId, exportSettings, databaseRoot + )); + } } bool CancelTransferring(TExportInfo::TPtr exportInfo, ui32 itemIdx) { @@ -620,7 +646,7 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase } } - void OnAllocateResult(TTransactionContext&, const TActorContext&) { + void OnAllocateResult(TTransactionContext& txc, const TActorContext& ctx) { Y_ABORT_UNLESS(AllocateResult); const auto txId = TTxId(AllocateResult->Get()->TxIds.front()); @@ -651,13 +677,27 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase break; case EState::CopyTables: - CopyTables(exportInfo, txId); + if (!CopyTables(exportInfo, txId)) { + // none of the items is a table + NIceDb::TNiceDb db(txc.DB); + + for (ui32 itemIdx : xrange(exportInfo->Items.size())) { + exportInfo->Items[itemIdx].State = EState::Transferring; + Self->PersistExportItemState(db, exportInfo, itemIdx); + + AllocateTxId(exportInfo, itemIdx); + } + + exportInfo->State = EState::Transferring; + Self->PersistExportState(db, exportInfo); + return; + } break; case EState::Transferring: if (exportInfo->PendingItems) { itemIdx = popPendingItemIdx(exportInfo->PendingItems); - TransferData(exportInfo, itemIdx, txId); + TransferData(exportInfo, itemIdx, txId, ctx); } else { return; } @@ -870,6 +910,71 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase SubscribeTx(txId); } + void OnSchemeUploadResult(TTransactionContext& txc, const TActorContext& ctx) { + Y_ABORT_UNLESS(SchemeUploadResult); + + LOG_D("TExport::TTxProgress: OnSchemeUploadResult" + << ": success# " << SchemeUploadResult->Get()->Success + << ", error# " << SchemeUploadResult->Get()->Error + ); + + const auto exportId = SchemeUploadResult->Get()->ExportId; + auto exportInfo = Self->Exports.Value(exportId, nullptr); + if (!exportInfo) { + LOG_E("TExport::TTxProgress: OnSchemeUploadResult received unknown export id" + << ": id# " << exportId + ); + return; + } + + ui32 itemIdx = SchemeUploadResult->Get()->ItemIdx; + if (itemIdx >= exportInfo->Items.size()) { + LOG_E("TExport::TTxProgress: OnSchemeUploadResult item index out of range" + << ": id# " << exportId + << ", item index# " << itemIdx + << ", number of items# " << exportInfo->Items.size() + ); + return; + } + + NIceDb::TNiceDb db(txc.DB); + + auto& item = exportInfo->Items[itemIdx]; + item.SchemeUploader = TActorId(); + + if (!SchemeUploadResult->Get()->Success) { + item.State = EState::Cancelled; + item.Issue = SchemeUploadResult->Get()->Error; + Self->PersistExportItemState(db, exportInfo, itemIdx); + + if (!exportInfo->IsInProgress()) { + return; + } + + Cancel(exportInfo, itemIdx, "unsuccessful scheme upload"); + + Self->PersistExportState(db, exportInfo); + return SendNotificationsIfFinished(exportInfo); + } + + if (exportInfo->State == EState::Transferring) { + item.State = EState::Done; + Self->PersistExportItemState(db, exportInfo, itemIdx); + + if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { + exportInfo->State = EState::Done; + exportInfo->EndTime = TAppData::TimeProvider->Now(); + + Self->PersistExportState(db, exportInfo); + SendNotificationsIfFinished(exportInfo); + + if (exportInfo->IsFinished()) { + AuditLogExportEnd(*exportInfo.Get(), Self); + } + } + } + } + void OnNotifyResult(TTransactionContext& txc, const TActorContext&) { Y_ABORT_UNLESS(CompletedTxId); LOG_D("TExport::TTxProgress: OnNotifyResult" @@ -948,15 +1053,18 @@ struct TSchemeShard::TExport::TTxProgress: public TSchemeShard::TXxport::TTxBase item.State = EState::Done; item.WaitTxId = InvalidTxId; - if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) { - item.Issue = *issue; - Cancel(exportInfo, itemIdx, "issues during backing up"); - } else { - if (AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { - exportInfo->State = EState::Done; - exportInfo->EndTime = TAppData::TimeProvider->Now(); + bool itemHasIssues = false; + if (item.SourcePathType == NKikimrSchemeOp::EPathTypeTable) { + if (const auto issue = GetIssues(ItemPathId(Self, exportInfo, itemIdx), txId)) { + item.Issue = *issue; + Cancel(exportInfo, itemIdx, "issues during backing up"); + itemHasIssues = true; } } + if (!itemHasIssues && AllOf(exportInfo->Items, &TExportInfo::TItem::IsDone)) { + exportInfo->State = EState::Done; + exportInfo->EndTime = TAppData::TimeProvider->Now(); + } Self->PersistExportItemState(db, exportInfo, itemIdx); break; @@ -1016,6 +1124,10 @@ ITransaction* TSchemeShard::CreateTxProgressExport(TEvSchemeShard::TEvModifySche return new TExport::TTxProgress(this, ev); } +ITransaction* TSchemeShard::CreateTxProgressExport(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev) { + return new TExport::TTxProgress(this, ev); +} + ITransaction* TSchemeShard::CreateTxProgressExport(TTxId completedTxId) { return new TExport::TTxProgress(this, completedTxId); } diff --git a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp index c403bfc128ba..74eb0c0c4cde 100644 --- a/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_export_flow_proposals.cpp @@ -59,6 +59,9 @@ THolder CopyTablesPropose( for (ui32 itemIdx : xrange(exportInfo->Items.size())) { const auto& item = exportInfo->Items.at(itemIdx); + if (item.SourcePathType != NKikimrSchemeOp::EPathTypeTable) { + continue; + } auto& desc = *copyTables.Add(); desc.SetSrcPath(item.SourcePathName); diff --git a/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp new file mode 100644 index 000000000000..90bd2e48e3f1 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.cpp @@ -0,0 +1,317 @@ +#include "schemeshard.h" +#include "schemeshard_export_scheme_uploader.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace NKikimr::NSchemeShard { + +class TSchemeUploader: public TActorBootstrapped { + + using TS3ExternalStorageConfig = NWrappers::NExternalStorage::TS3ExternalStorageConfig; + using TEvExternalStorage = NWrappers::TEvExternalStorage; + using TPutObjectResult = Aws::Utils::Outcome; + + static TString BuildViewScheme(const TString& path, const NKikimrSchemeOp::TViewDescription& viewDescription, const TString& backupRoot, TString& error) { + NYql::TIssues issues; + auto scheme = NYdb::NDump::BuildCreateViewQuery(viewDescription.GetName(), path, viewDescription.GetQueryText(), backupRoot, issues); + if (!scheme) { + error = issues.ToString(); + } + return scheme; + } + + bool BuildSchemeToUpload(const NKikimrScheme::TEvDescribeSchemeResult& describeResult, TString& error) { + const auto pathType = describeResult.GetPathDescription().GetSelf().GetPathType(); + switch (pathType) { + case NKikimrSchemeOp::EPathTypeView: { + Scheme = BuildViewScheme(describeResult.GetPath(), describeResult.GetPathDescription().GetViewDescription(), DatabaseRoot, error); + return !Scheme.empty(); + } + default: + error = TStringBuilder() << "unsupported path type: " << pathType; + return false; + } + } + + void HandleSchemeDescription(TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev) { + const auto& describeResult = ev->Get()->GetRecord(); + + LOG_D("HandleSchemeDescription TEvSchemeShard::TEvDescribeSchemeResult" + << ", self: " << this->SelfId() + << ", status: " << describeResult.GetStatus() + ); + + auto reportError = [&](const TString& error) { + Send(SchemeShard, new TEvPrivate::TEvExportSchemeUploadResult( + ExportId, ItemIdx, false, error + )); + }; + + if (describeResult.GetStatus() != TEvSchemeShard::EStatus::StatusSuccess) { + reportError(describeResult.GetReason()); + return; + } + + TString error; + if (!BuildSchemeToUpload(describeResult, error)) { + reportError(error); + return; + } + + if (auto permissions = NDataShard::GenYdbPermissions(describeResult.GetPathDescription())) { + google::protobuf::TextFormat::PrintToString(permissions.GetRef(), &Permissions); + } else { + reportError("cannot infer permissions"); + return; + } + + Restart(); + } + + void Restart() { + if (Attempt) { + this->Send(std::exchange(StorageOperator, TActorId()), new TEvents::TEvPoisonPill()); + } + + StorageOperator = this->RegisterWithSameMailbox( + NWrappers::CreateS3Wrapper(ExternalStorageConfig->ConstructStorageOperator()) + ); + + if (!SchemeUploaded) { + UploadScheme(); + } else if (!PermissionsUploaded) { + UploadPermissions(); + } + } + + void UploadScheme() { + Y_ABORT_UNLESS(!SchemeUploaded); + + if (!Scheme) { + return Finish(false, "cannot infer scheme"); + } + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Sprintf("%s/create_view.sql", DestinationPrefix.c_str())); + + this->Send(StorageOperator, new TEvExternalStorage::TEvPutObjectRequest(request, TString(Scheme))); + this->Become(&TThis::StateUploadScheme); + } + + void HandleSchemePutResponse(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandleSchemePutResponse TEvExternalStorage::TEvPutObjectResponse" + << ", self: " << this->SelfId() + << ", result: " << result + ); + + if (!CheckResult(result, TStringBuf("PutObject (scheme)"))) { + return; + } + SchemeUploaded = true; + UploadPermissions(); + } + + void UploadPermissions() { + Y_ABORT_UNLESS(!PermissionsUploaded); + + if (!Permissions) { + return Finish(false, "cannot infer permissions"); + } + auto request = Aws::S3::Model::PutObjectRequest() + .WithKey(Sprintf("%s/permissions.pb", DestinationPrefix.c_str())); + + this->Send(StorageOperator, new TEvExternalStorage::TEvPutObjectRequest(request, TString(Permissions))); + this->Become(&TThis::StateUploadPermissions); + } + + void HandlePermissionsPutResponse(TEvExternalStorage::TEvPutObjectResponse::TPtr& ev) { + const auto& result = ev->Get()->Result; + + LOG_D("HandlePermissionsPutResponse TEvExternalStorage::TEvPutObjectResponse" + << ", self: " << this->SelfId() + << ", result: " << result + ); + + if (!CheckResult(result, TStringBuf("PutObject (permissions)"))) { + return; + } + PermissionsUploaded = true; + Finish(); + } + + bool CheckResult(const TPutObjectResult& result, const TStringBuf marker) { + if (result.IsSuccess()) { + return true; + } + + LOG_E("Error at '" << marker << "'" + << ", self: " << this->SelfId() + << ", error: " << result + ); + + RetryOrFinish(result.GetError()); + return false; + } + + static bool ShouldRetry(const Aws::S3::S3Error& error) { + if (error.ShouldRetry()) { + return true; + } + return error.GetExceptionName() == "TooManyRequests"; + } + + void Retry() { + Delay = Min(Delay * ++Attempt, MaxDelay); + const TDuration random = TDuration::FromValue(TAppData::RandomProvider->GenRand64() % Delay.MicroSeconds()); + this->Schedule(Delay + random, new TEvents::TEvWakeup()); + } + + void RetryOrFinish(const Aws::S3::S3Error& error) { + if (Attempt < Retries && ShouldRetry(error)) { + Retry(); + } else { + Finish(false, TStringBuilder() << "S3 error: " << error.GetMessage()); + } + } + + void Finish(bool success = true, const TString& error = TString()) { + LOG_I("Finish" + << ", self: " << this->SelfId() + << ", success: " << success + << ", error: " << error + ); + + Send(SchemeShard, new TEvPrivate::TEvExportSchemeUploadResult(ExportId, ItemIdx, success, error)); + PassAway(); + } + + void PassAway() override { + this->Send(StorageOperator, new TEvents::TEvPoisonPill()); + IActor::PassAway(); + } + +public: + + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { + return NKikimrServices::TActivity::EXPORT_S3_UPLOADER_ACTOR; + } + + TSchemeUploader( + TActorId schemeShard, + ui64 exportId, + ui32 itemIdx, + TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, + const TString& databaseRoot + ) + : SchemeShard(schemeShard) + , ExportId(exportId) + , ItemIdx(itemIdx) + , SourcePathId(sourcePathId) + , ExternalStorageConfig(new TS3ExternalStorageConfig(settings)) + , Retries(settings.number_of_retries()) + , DatabaseRoot(databaseRoot) + { + if (itemIdx < ui32(settings.items_size())) { + DestinationPrefix = settings.items(itemIdx).destination_prefix(); + } + } + + void Bootstrap() { + if (!DestinationPrefix) { + Finish(false, TStringBuilder() << "cannot determine destination prefix, item index: " << ItemIdx << " out of range"); + return; + } + if (!Scheme || !Permissions) { + Send(SchemeShard, new TEvSchemeShard::TEvDescribeScheme(SourcePathId)); + this->Become(&TThis::StateDescribe); + return; + } + if (!SchemeUploaded) { + UploadScheme(); + return; + } + if (!PermissionsUploaded) { + UploadPermissions(); + return; + } + Finish(); + } + + STATEFN(StateBase) { + switch (ev->GetTypeRewrite()) { + sFunc(TEvents::TEvWakeup, Bootstrap); + sFunc(TEvents::TEvPoisonPill, PassAway); + } + } + + STATEFN(StateDescribe) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSchemeShard::TEvDescribeSchemeResult, HandleSchemeDescription); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadScheme) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandleSchemePutResponse); + default: + return StateBase(ev); + } + } + + STATEFN(StateUploadPermissions) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvExternalStorage::TEvPutObjectResponse, HandlePermissionsPutResponse); + default: + return StateBase(ev); + } + } + +private: + + TActorId SchemeShard; + + ui64 ExportId; + ui32 ItemIdx; + TPathId SourcePathId; + + NWrappers::IExternalStorageConfig::TPtr ExternalStorageConfig; + TString DestinationPrefix; + + ui32 Attempt = 0; + const ui32 Retries; + + TString DatabaseRoot; + + TActorId StorageOperator; + + TDuration Delay = TDuration::Minutes(1); + static constexpr TDuration MaxDelay = TDuration::Minutes(10); + + TString Scheme; + bool SchemeUploaded = false; + + TString Permissions; + bool PermissionsUploaded = false; + +}; // TSchemeUploader + +IActor* CreateSchemeUploader(TActorId schemeShard, ui64 exportId, ui32 itemIdx, TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, const TString& databaseRoot +) { + return new TSchemeUploader(schemeShard, exportId, itemIdx, sourcePathId, settings, databaseRoot); +} + +} // NSchemeShard::NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h new file mode 100644 index 000000000000..2262ab4cf674 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader.h @@ -0,0 +1,16 @@ +#pragma once + +#include +#include + +namespace Ydb::Export { + class ExportToS3Settings; +} + +namespace NKikimr::NSchemeShard { + +NActors::IActor* CreateSchemeUploader(NActors::TActorId schemeShard, ui64 exportId, ui32 itemIdx, TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, const TString& databaseRoot +); + +} diff --git a/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp new file mode 100644 index 000000000000..3afbc69d61c5 --- /dev/null +++ b/ydb/core/tx/schemeshard/schemeshard_export_scheme_uploader_fallback.cpp @@ -0,0 +1,39 @@ +#include "schemeshard_export_scheme_uploader.h" + +#include +#include + +using namespace NActors; + +namespace NKikimr::NSchemeShard { + +class TSchemeUploaderFallback: public TActorBootstrapped { +public: + explicit TSchemeUploaderFallback(TActorId schemeShard, ui64 exportId, ui32 itemIdx) + : SchemeShard(schemeShard) + , ExportId(exportId) + , ItemIdx(itemIdx) + { + } + + void Bootstrap() { + Send(SchemeShard, new TEvPrivate::TEvExportSchemeUploadResult(ExportId, ItemIdx, false, + "Exports to S3 are disabled" + )); + PassAway(); + } + +private: + TActorId SchemeShard; + ui64 ExportId; + ui32 ItemIdx; +}; + + +IActor* CreateSchemeUploader(TActorId schemeShard, ui64 exportId, ui32 itemIdx, TPathId sourcePathId, + const Ydb::Export::ExportToS3Settings& settings, const TString& databaseRoot +) { + return new TSchemeUploaderFallback(schemeShard, exportId, itemIdx); +} + +} // NSchemeShard::NKikimr diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.cpp b/ydb/core/tx/schemeshard/schemeshard_impl.cpp index 5f3ec61c1559..0806a2f6b9c0 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_impl.cpp @@ -4539,6 +4539,15 @@ void TSchemeShard::Die(const TActorContext &ctx) { if (TabletMigrator) { ctx.Send(TabletMigrator, new TEvents::TEvPoisonPill()); } + for (const auto& [id, exportInfo] : Exports) { + if (!exportInfo->IsDone()) { + for (const auto& item : exportInfo->Items) { + if (item.SchemeUploader != TActorId()) { + ctx.Send(item.SchemeUploader, new TEvents::TEvPoisonPill()); + } + } + } + } IndexBuildPipes.Shutdown(ctx); CdcStreamScanPipes.Shutdown(ctx); @@ -4827,6 +4836,7 @@ void TSchemeShard::StateWork(STFUNC_SIG) { HFuncTraced(TEvExport::TEvListExportsRequest, Handle); // } // NExport HFuncTraced(NBackground::TEvListRequest, Handle); + HFuncTraced(TEvPrivate::TEvExportSchemeUploadResult, Handle); // namespace NImport { HFuncTraced(TEvImport::TEvCreateImportRequest, Handle); diff --git a/ydb/core/tx/schemeshard/schemeshard_impl.h b/ydb/core/tx/schemeshard/schemeshard_impl.h index 52a1d89ce367..64bae8c63f23 100644 --- a/ydb/core/tx/schemeshard/schemeshard_impl.h +++ b/ydb/core/tx/schemeshard/schemeshard_impl.h @@ -1236,6 +1236,7 @@ class TSchemeShard NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvTxAllocatorClient::TEvAllocateResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvSchemeShard::TEvModifySchemeTransactionResult::TPtr& ev); NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TTxId completedTxId); + NTabletFlatExecutor::ITransaction* CreateTxProgressExport(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev); void Handle(TEvExport::TEvCreateExportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvExport::TEvGetExportRequest::TPtr& ev, const TActorContext& ctx); @@ -1243,6 +1244,7 @@ class TSchemeShard void Handle(TEvExport::TEvForgetExportRequest::TPtr& ev, const TActorContext& ctx); void Handle(TEvExport::TEvListExportsRequest::TPtr& ev, const TActorContext& ctx); void Handle(TAutoPtr>& ev, const TActorContext& ctx); + void Handle(TEvPrivate::TEvExportSchemeUploadResult::TPtr& ev, const TActorContext& ctx); void ResumeExports(const TVector& exportIds, const TActorContext& ctx); // } // NExport diff --git a/ydb/core/tx/schemeshard/schemeshard_info_types.h b/ydb/core/tx/schemeshard/schemeshard_info_types.h index 2bfc69f91dbb..be85a45af7be 100644 --- a/ydb/core/tx/schemeshard/schemeshard_info_types.h +++ b/ydb/core/tx/schemeshard/schemeshard_info_types.h @@ -2683,17 +2683,20 @@ struct TExportInfo: public TSimpleRefCount { TString SourcePathName; TPathId SourcePathId; + NKikimrSchemeOp::EPathType SourcePathType; EState State = EState::Waiting; ESubState SubState = ESubState::AllocateTxId; TTxId WaitTxId = InvalidTxId; + TActorId SchemeUploader; TString Issue; TItem() = default; - explicit TItem(const TString& sourcePathName, const TPathId sourcePathId) + explicit TItem(const TString& sourcePathName, const TPathId sourcePathId, NKikimrSchemeOp::EPathType sourcePathType) : SourcePathName(sourcePathName) , SourcePathId(sourcePathId) + , SourcePathType(sourcePathType) { } diff --git a/ydb/core/tx/schemeshard/schemeshard_path.cpp b/ydb/core/tx/schemeshard/schemeshard_path.cpp index cfed8c7e2215..0202ec3d6463 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.cpp +++ b/ydb/core/tx/schemeshard/schemeshard_path.cpp @@ -922,6 +922,20 @@ const TPath::TChecker& TPath::TChecker::IsBackupCollection(EStatus status) const << " (" << BasicPathInfo(Path.Base()) << ")"); } +const TPath::TChecker& TPath::TChecker::IsSupportedInExports(EStatus status) const { + if (Failed) { + return *this; + } + + if (Path.Base()->IsTable() || Path.Base()->IsView()) { + return *this; + } + + return Fail(status, TStringBuilder() << "path type is not supported in exports" + << " (" << BasicPathInfo(Path.Base()) << ")" + ); +} + const TPath::TChecker& TPath::TChecker::PathShardsLimit(ui64 delta, EStatus status) const { if (Failed) { return *this; diff --git a/ydb/core/tx/schemeshard/schemeshard_path.h b/ydb/core/tx/schemeshard/schemeshard_path.h index 81e3ee13d09b..4e017c51b05a 100644 --- a/ydb/core/tx/schemeshard/schemeshard_path.h +++ b/ydb/core/tx/schemeshard/schemeshard_path.h @@ -104,6 +104,7 @@ class TPath { const TChecker& FailOnRestrictedCreateInTempZone(bool allowCreateInTemporaryDir = false, EStatus status = EStatus::StatusPreconditionFailed) const; const TChecker& IsResourcePool(EStatus status = EStatus::StatusNameConflict) const; const TChecker& IsBackupCollection(EStatus status = EStatus::StatusNameConflict) const; + const TChecker& IsSupportedInExports(EStatus status = EStatus::StatusNameConflict) const; }; public: diff --git a/ydb/core/tx/schemeshard/schemeshard_private.h b/ydb/core/tx/schemeshard/schemeshard_private.h index 2ba95e78b0c8..a85a3c4f6748 100644 --- a/ydb/core/tx/schemeshard/schemeshard_private.h +++ b/ydb/core/tx/schemeshard/schemeshard_private.h @@ -15,6 +15,7 @@ namespace TEvPrivate { EvRunConditionalErase, EvIndexBuildBilling, EvImportSchemeReady, + EvExportSchemeUploadResult, EvServerlessStorageBilling, EvCleanDroppedPaths, EvCleanDroppedSubDomains, @@ -95,6 +96,25 @@ namespace TEvPrivate { {} }; + struct TEvExportSchemeUploadResult: public TEventLocal { + ui64 ExportId; + ui32 ItemIdx; + bool Success; + TString Error; + + TEvExportSchemeUploadResult(ui64 id, ui32 itemIdx, bool success, const TString& error) + : ExportId(id) + , ItemIdx(itemIdx) + , Success(success) + , Error(error) + {} + + void SetError(const TString& error) { + Success = false; + Error = error; + } + }; + struct TEvServerlessStorageBilling: public TEventLocal { TEvServerlessStorageBilling() {} diff --git a/ydb/core/tx/schemeshard/schemeshard_schema.h b/ydb/core/tx/schemeshard/schemeshard_schema.h index 5fb719e771e0..c08e2a6472e8 100644 --- a/ydb/core/tx/schemeshard/schemeshard_schema.h +++ b/ydb/core/tx/schemeshard/schemeshard_schema.h @@ -1209,6 +1209,7 @@ struct Schema : NIceDb::Schema { struct BackupTxId : Column<6, NScheme::NTypeIds::Uint64> { using Type = TTxId; }; struct Issue : Column<7, NScheme::NTypeIds::Utf8> {}; struct SourceOwnerPathId : Column<8, NScheme::NTypeIds::Uint64> { using Type = TOwnerId; }; + struct SourcePathType : Column<9, NScheme::NTypeIds::Uint32> { using Type = NKikimrSchemeOp::EPathType; static constexpr Type Default = NKikimrSchemeOp::EPathTypeTable; }; using TKey = TableKey; using TColumns = TableColumns< @@ -1219,7 +1220,8 @@ struct Schema : NIceDb::Schema { State, BackupTxId, Issue, - SourceOwnerPathId + SourceOwnerPathId, + SourcePathType >; }; diff --git a/ydb/core/tx/schemeshard/ya.make b/ydb/core/tx/schemeshard/ya.make index 5dd8b1c9c700..182a7f2f1a0f 100644 --- a/ydb/core/tx/schemeshard/ya.make +++ b/ydb/core/tx/schemeshard/ya.make @@ -308,6 +308,7 @@ PEERDIR( ydb/library/login ydb/library/login/protos ydb/library/protobuf_printer + ydb/public/lib/ydb_cli/dump/util yql/essentials/minikql yql/essentials/providers/common/proto ydb/services/bg_tasks @@ -319,10 +320,12 @@ YQL_LAST_ABI_VERSION() IF (OS_WINDOWS) SRCS( + schemeshard_export_scheme_uploader_fallback.cpp schemeshard_import_scheme_getter_fallback.cpp ) ELSE() SRCS( + schemeshard_export_scheme_uploader.cpp schemeshard_import_scheme_getter.cpp ) ENDIF() diff --git a/ydb/library/backup/backup.cpp b/ydb/library/backup/backup.cpp index 86489ff290b8..ef43516c7172 100644 --- a/ydb/library/backup/backup.cpp +++ b/ydb/library/backup/backup.cpp @@ -552,68 +552,6 @@ NView::TViewDescription DescribeView(NView::TViewClient& client, const TString& return status.GetViewDescription(); } -struct TViewQuerySplit { - TString ContextRecreation; - TString Select; -}; - -TViewQuerySplit SplitViewQuery(TStringInput query) { - // to do: make the implementation more versatile - TViewQuerySplit split; - - TString line; - while (query.ReadLine(line)) { - (line.StartsWith("--") || line.StartsWith("PRAGMA ") - ? split.ContextRecreation - : split.Select - ) += line; - } - - return split; -} - -void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& accumulatedIssues) { - NYql::TIssues subIssues; - if (!NDump::ValidateViewQuery(query, subIssues)) { - NYql::TIssue restorabilityIssue( - TStringBuilder() << "Restorability of the view: " << dbPath.Quote() - << " storing the following query:\n" - << query - << "\ncannot be guaranteed. For more information, please consult the 'ydb tools dump' documentation." - ); - restorabilityIssue.Severity = NYql::TSeverityIds::S_WARNING; - for (const auto& subIssue : subIssues) { - restorabilityIssue.AddSubIssue(MakeIntrusive(subIssue)); - } - accumulatedIssues.AddIssue(std::move(restorabilityIssue)); - } -} - -TString BuildCreateViewQuery(TStringBuf name, const NView::TViewDescription& description, TStringBuf backupRoot) { - auto queryText = TString{description.GetQueryText()}; - auto [contextRecreation, select] = SplitViewQuery(queryText); - - const TString query = std::format( - "-- backup root: \"{}\"\n" - "{}\n" - "CREATE VIEW IF NOT EXISTS `{}` WITH (security_invoker = TRUE) AS\n" - " {};\n", - backupRoot.data(), - contextRecreation.data(), - name.data(), - select.data() - ); - - TString formattedQuery; - TString errors; - Y_ENSURE(NSQLFormat::SqlFormatSimple( - query, - formattedQuery, - errors - ), errors); - return formattedQuery; -} - } /*! @@ -637,17 +575,22 @@ void BackupView(TDriver driver, const TString& dbBackupRoot, const TString& dbPa NView::TViewClient client(driver); auto viewDescription = DescribeView(client, dbPath); - ValidateViewQuery(TString{viewDescription.GetQueryText()}, dbPath, issues); + ValidateViewQuery(viewDescription.GetQueryText(), dbPath, issues); const auto fsPath = fsBackupDir.Child(NDump::NFiles::CreateView().FileName); LOG_D("Write view creation query to " << fsPath.GetPath().Quote()); - TFileOutput output(fsPath); - output << BuildCreateViewQuery( + const auto creationQuery = NDump::BuildCreateViewQuery( TFsPath(dbPathRelativeToBackupRoot).GetName(), - viewDescription, - dbBackupRoot + dbPath, + viewDescription.GetQueryText(), + dbBackupRoot, + issues ); + Y_ENSURE(creationQuery, issues.ToString()); + + TFileOutput output(fsPath); + output << creationQuery; BackupPermissions(driver, dbBackupRoot, dbPathRelativeToBackupRoot, fsBackupDir); } diff --git a/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp b/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp index ac5d51f62e76..4c61632a87b2 100644 --- a/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp +++ b/ydb/public/lib/ydb_cli/dump/util/view_utils.cpp @@ -11,6 +11,8 @@ #include #include +#include + using namespace NSQLv1Generated; namespace { @@ -103,15 +105,6 @@ void VisitAllFields(const NProtoBuf::Message& msg, TTokenCollector& callback) { } } -bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues) { - google::protobuf::Arena arena; - NSQLTranslation::TTranslationSettings settings; - settings.Arena = &arena; - - auto formatter = NSQLFormat::MakeSqlFormatter(settings); - return formatter->Format(query, formattedQuery, issues); -} - struct TTableRefValidator { // returns true if the message is not a table ref and we need to dive deeper to find it @@ -184,10 +177,6 @@ bool SqlToProtoAst(const TString& query, TRule_sql_query& queryProto, NYql::TIss return true; } -} - -namespace NYdb::NDump { - bool ValidateViewQuery(const TString& query, NYql::TIssues& issues) { TRule_sql_query queryProto; if (!SqlToProtoAst(query, queryProto, issues)) { @@ -196,6 +185,10 @@ bool ValidateViewQuery(const TString& query, NYql::TIssues& issues) { return ValidateTableRefs(queryProto, issues); } +} + +namespace NYdb::NDump { + TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot) { if (backupRoot == restoreRoot) { return TString(path); @@ -231,4 +224,71 @@ bool RewriteTableRefs(TString& query, TStringBuf backupRoot, TStringBuf restoreR return true; } +TViewQuerySplit SplitViewQuery(TStringInput query) { + // to do: make the implementation more versatile + TViewQuerySplit split; + + TString line; + while (query.ReadLine(line)) { + (line.StartsWith("--") || line.StartsWith("PRAGMA ") + ? split.ContextRecreation + : split.Select + ) += line; + } + + return split; +} + +void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& issues) { + NYql::TIssues subIssues; + if (!::ValidateViewQuery(query, subIssues)) { + NYql::TIssue restorabilityIssue( + TStringBuilder() << "Restorability of the view: " << dbPath.Quote() + << " storing the following query:\n" + << query + << "\ncannot be guaranteed. For more information, please refer to the 'ydb tools dump' documentation." + ); + restorabilityIssue.Severity = NYql::TSeverityIds::S_WARNING; + for (const auto& subIssue : subIssues) { + restorabilityIssue.AddSubIssue(MakeIntrusive(subIssue)); + } + issues.AddIssue(std::move(restorabilityIssue)); + } +} + +bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues) { + google::protobuf::Arena arena; + NSQLTranslation::TTranslationSettings settings; + settings.Arena = &arena; + + auto formatter = NSQLFormat::MakeSqlFormatter(settings); + return formatter->Format(query, formattedQuery, issues); +} + +TString BuildCreateViewQuery( + const TString& name, const TString& dbPath, const TString& viewQuery, const TString& backupRoot, + NYql::TIssues& issues +) { + auto [contextRecreation, select] = SplitViewQuery(viewQuery); + + const TString creationQuery = std::format( + "-- backup root: \"{}\"\n" + "{}\n" + "CREATE VIEW IF NOT EXISTS `{}` WITH (security_invoker = TRUE) AS\n" + " {};\n", + backupRoot.data(), + contextRecreation.data(), + name.data(), + select.data() + ); + + ValidateViewQuery(creationQuery, dbPath, issues); + + TString formattedQuery; + if (!Format(creationQuery, formattedQuery, issues)) { + return ""; + } + return formattedQuery; +} + } diff --git a/ydb/public/lib/ydb_cli/dump/util/view_utils.h b/ydb/public/lib/ydb_cli/dump/util/view_utils.h index 6e65ca7c78ab..2b87a5022e5b 100644 --- a/ydb/public/lib/ydb_cli/dump/util/view_utils.h +++ b/ydb/public/lib/ydb_cli/dump/util/view_utils.h @@ -2,10 +2,25 @@ namespace NYdb::NDump { -bool ValidateViewQuery(const TString& query, NYql::TIssues& issues); - TString RewriteAbsolutePath(TStringBuf path, TStringBuf backupRoot, TStringBuf restoreRoot); bool RewriteTableRefs(TString& scheme, TStringBuf backupRoot, TStringBuf restoreRoot, NYql::TIssues& issues); +struct TViewQuerySplit { + TString ContextRecreation; + TString Select; +}; + +TViewQuerySplit SplitViewQuery(TStringInput query); + +// returns void, because the validation is non-blocking +void ValidateViewQuery(const TString& query, const TString& dbPath, NYql::TIssues& issues); + +bool Format(const TString& query, TString& formattedQuery, NYql::TIssues& issues); + +TString BuildCreateViewQuery( + const TString& name, const TString& dbPath, const TString& viewQuery, const TString& backupRoot, + NYql::TIssues& issues +); + } diff --git a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema index 9ec4b87cd28b..caf96b227559 100644 --- a/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema +++ b/ydb/tests/functional/scheme_tests/canondata/tablet_scheme_tests.TestTabletSchemes.test_tablet_schemes_flat_schemeshard_/flat_schemeshard.schema @@ -3205,6 +3205,11 @@ "ColumnId": 8, "ColumnName": "SourceOwnerPathId", "ColumnType": "Uint64" + }, + { + "ColumnId": 9, + "ColumnName": "SourcePathType", + "ColumnType": "Uint32" } ], "ColumnsDropped": [], @@ -3218,7 +3223,8 @@ 5, 6, 7, - 8 + 8, + 9 ], "RoomID": 0, "Codec": 0,