Skip to content

Commit

Permalink
YT-23616: Make the implementation of the CanonizeYPath method common
Browse files: browse the repository at this point in the history
commit_hash:7f3ecc44b4299acc4fc7b0f463eceac61d0b0156
  • Loading branch information
hdnpth committed Jan 15, 2025
1 parent 0395c50 commit 8dff5c1
Show file tree
Hide file tree
Showing 10 changed files with 36 additions and 47 deletions.
2 changes: 1 addition & 1 deletion yt/cpp/mapreduce/client/client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,7 @@ void TClientBase::Concatenate(

TRichYPath TClientBase::CanonizeYPath(const TRichYPath& path)
{
return NRawClient::CanonizeYPath(ClientRetryPolicy_->CreatePolicyForGenericRequest(), Context_, path);
return NRawClient::CanonizeYPath(RawClient_, path);
}

TVector<TTableColumnarStatistics> TClientBase::GetTableColumnarStatistics(
Expand Down
34 changes: 17 additions & 17 deletions yt/cpp/mapreduce/client/operation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,8 @@ TSimpleOperationIo CreateSimpleOperationIo(
structuredJob,
preparer,
options,
CanonizeStructuredTableList(preparer.GetContext(), GetStructuredInputs(spec)),
CanonizeStructuredTableList(preparer.GetContext(), GetStructuredOutputs(spec)),
CanonizeStructuredTableList(preparer.GetClient()->GetRawClient(), GetStructuredInputs(spec)),
CanonizeStructuredTableList(preparer.GetClient()->GetRawClient(), GetStructuredOutputs(spec)),
hints,
nodeReaderFormat,
GetColumnsUsedInOperation(spec));
Expand All @@ -303,8 +303,8 @@ TSimpleOperationIo CreateSimpleOperationIo(
}
};

auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetInputs());
auto outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), spec.GetOutputs());
auto inputs = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), spec.GetInputs());
auto outputs = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), spec.GetOutputs());

VerifyHasElements(inputs, "input");
VerifyHasElements(outputs, "output");
Expand Down Expand Up @@ -1632,9 +1632,9 @@ void ExecuteMapReduce(
TMapReduceOperationSpec spec = spec_;

TMapReduceOperationIo operationIo;
auto structuredInputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredInputs());
auto structuredMapOutputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredMapOutputs());
auto structuredOutputs = CanonizeStructuredTableList(preparer->GetContext(), spec.GetStructuredOutputs());
auto structuredInputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredInputs());
auto structuredMapOutputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredMapOutputs());
auto structuredOutputs = CanonizeStructuredTableList(preparer->GetClient()->GetRawClient(), spec.GetStructuredOutputs());

const bool inferOutputSchema = options.InferOutputSchema_.GetOrElse(preparer->GetContext().Config->InferTableSchema);

Expand Down Expand Up @@ -1898,9 +1898,9 @@ void ExecuteRawMapReduce(
YT_LOG_DEBUG("Starting raw map-reduce operation (PreparationId: %v)",
preparer->GetPreparationId());
TMapReduceOperationIo operationIo;
operationIo.Inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetInputs());
operationIo.MapOutputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetMapOutputs());
operationIo.Outputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.GetOutputs());
operationIo.Inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetInputs());
operationIo.MapOutputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetMapOutputs());
operationIo.Outputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.GetOutputs());

VerifyHasElements(operationIo.Inputs, "inputs");
VerifyHasElements(operationIo.Outputs, "outputs");
Expand Down Expand Up @@ -1947,8 +1947,8 @@ void ExecuteSort(
{
YT_LOG_DEBUG("Starting sort operation (PreparationId: %v)",
preparer->GetPreparationId());
auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_);
auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_);

if (options.CreateOutputTables_) {
CheckInputTablesExist(*preparer, inputs);
Expand Down Expand Up @@ -1996,8 +1996,8 @@ void ExecuteMerge(
{
YT_LOG_DEBUG("Starting merge operation (PreparationId: %v)",
preparer->GetPreparationId());
auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_);
auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_);

if (options.CreateOutputTables_) {
CheckInputTablesExist(*preparer, inputs);
Expand Down Expand Up @@ -2046,7 +2046,7 @@ void ExecuteErase(
{
YT_LOG_DEBUG("Starting erase operation (PreparationId: %v)",
preparer->GetPreparationId());
auto tablePath = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.TablePath_);
auto tablePath = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.TablePath_);

TNode specNode = BuildYsonNodeFluently()
.BeginMap()
Expand Down Expand Up @@ -2082,8 +2082,8 @@ void ExecuteRemoteCopy(
{
YT_LOG_DEBUG("Starting remote copy operation (PreparationId: %v)",
preparer->GetPreparationId());
auto inputs = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer->GetContext(), spec.Inputs_);
auto output = NRawClient::CanonizeYPath(nullptr, preparer->GetContext(), spec.Output_);
auto inputs = NRawClient::CanonizeYPaths(preparer->GetClient()->GetRawClient(), spec.Inputs_);
auto output = NRawClient::CanonizeYPath(preparer->GetClient()->GetRawClient(), spec.Output_);

if (options.CreateOutputTables_) {
CreateOutputTable(*preparer, output);
Expand Down
2 changes: 1 addition & 1 deletion yt/cpp/mapreduce/client/operation_preparer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -394,7 +394,7 @@ TJobPreparer::TJobPreparer(
{
CreateStorage();
auto cypressFileList = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, OperationPreparer_.GetContext(), spec.Files_);
auto cypressFileList = NRawClient::CanonizeYPaths(RawClient_, spec.Files_);
for (const auto& file : cypressFileList) {
UseFileInCypress(file);
Expand Down
2 changes: 1 addition & 1 deletion yt/cpp/mapreduce/client/py_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ TStructuredJobTableList NodeToStructuredTablePaths(const TNode& node, const TOpe
paths.emplace_back(inputNode.AsString());
}
}
paths = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, preparer.GetContext(), paths);
paths = NRawClient::CanonizeYPaths(preparer.GetClient()->GetRawClient(), paths);
TStructuredJobTableList result(intermediateTableCount, TStructuredJobTable::Intermediate(TUnspecifiedTableStructure()));
for (const auto& path : paths) {
result.emplace_back(TStructuredJobTable{TUnspecifiedTableStructure(), path});
Expand Down
4 changes: 1 addition & 3 deletions yt/cpp/mapreduce/client/skiff.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -280,8 +280,6 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema) {

NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TTransactionId& transactionId,
ENodeReaderFormat nodeReaderFormat,
const TVector<TRichYPath>& tablePaths,
Expand All @@ -306,7 +304,7 @@ NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(

auto nodes = NRawClient::BatchTransform(
rawClient,
NRawClient::CanonizeYPaths(clientRetryPolicy->CreatePolicyForGenericRequest(), context, tablePaths),
NRawClient::CanonizeYPaths(rawClient, tablePaths),
[&] (IRawBatchRequestPtr batch, const TRichYPath& path) {
auto getOptions = TGetOptions()
.AttributeFilter(
Expand Down
2 changes: 0 additions & 2 deletions yt/cpp/mapreduce/client/skiff.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,8 +60,6 @@ TFormat CreateSkiffFormat(const NSkiff::TSkiffSchemaPtr& schema);

NSkiff::TSkiffSchemaPtr CreateSkiffSchemaIfNecessary(
const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TTransactionId& transactionId,
ENodeReaderFormat nodeReaderFormat,
const TVector<TRichYPath>& tablePaths,
Expand Down
10 changes: 2 additions & 8 deletions yt/cpp/mapreduce/client/structured_table_formats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,6 @@ namespace NDetail {

NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
const IRawClientPtr& rawClient,
const TClientContext& context,
const IClientRetryPolicyPtr& clientRetryPolicy,
const TTransactionId& transactionId,
const TVector<TRichYPath>& tables,
const TOperationOptions& options,
Expand All @@ -137,8 +135,6 @@ NSkiff::TSkiffSchemaPtr TryCreateSkiffSchema(
}
return CreateSkiffSchemaIfNecessary(
rawClient,
context,
clientRetryPolicy,
transactionId,
nodeReaderFormat,
tables,
Expand Down Expand Up @@ -214,14 +210,14 @@ TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTableP
return result;
}

TStructuredJobTableList CanonizeStructuredTableList(const TClientContext& context, const TVector<TStructuredTablePath>& tableList)
TStructuredJobTableList CanonizeStructuredTableList(const IRawClientPtr& rawClient, const TVector<TStructuredTablePath>& tableList)
{
TVector<TRichYPath> toCanonize;
toCanonize.reserve(tableList.size());
for (const auto& table : tableList) {
toCanonize.emplace_back(table.RichYPath);
}
const auto canonized = NRawClient::CanonizeYPaths(/* retryPolicy */ nullptr, context, toCanonize);
const auto canonized = NRawClient::CanonizeYPaths(rawClient, toCanonize);
Y_ABORT_UNLESS(canonized.size() == tableList.size());

TStructuredJobTableList result;
Expand Down Expand Up @@ -437,8 +433,6 @@ std::pair<TFormat, TMaybe<TSmallJobFile>> TFormatBuilder::CreateNodeFormat(
}
skiffSchema = TryCreateSkiffSchema(
RawClient_,
Context_,
ClientRetryPolicy_,
TransactionId_,
tableList,
OperationOptions_,
Expand Down
2 changes: 1 addition & 1 deletion yt/cpp/mapreduce/client/structured_table_formats.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ using TStructuredJobTableList = TVector<TStructuredJobTable>;
TString JobTablePathString(const TStructuredJobTable& jobTable);
TStructuredJobTableList ToStructuredJobTableList(const TVector<TStructuredTablePath>& tableList);

TStructuredJobTableList CanonizeStructuredTableList(const TClientContext& context, const TVector<TStructuredTablePath>& tableList);
TStructuredJobTableList CanonizeStructuredTableList(const IRawClientPtr& rawClient, const TVector<TStructuredTablePath>& tableList);
TVector<TRichYPath> GetPathList(
const TStructuredJobTableList& tableList,
const TMaybe<TVector<TTableSchema>>& schemaInferenceResult,
Expand Down
19 changes: 10 additions & 9 deletions yt/cpp/mapreduce/raw_client/raw_requests.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -253,25 +253,26 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node)
}

TRichYPath CanonizeYPath(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
const IRawClientPtr& rawClient,
const TRichYPath& path)
{
return CanonizeYPaths(retryPolicy, context, {path}).front();
return CanonizeYPaths(rawClient, {path}).front();
}

TVector<TRichYPath> CanonizeYPaths(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
const IRawClientPtr& rawClient,
const TVector<TRichYPath>& paths)
{
THttpRawBatchRequest batch(context, retryPolicy);
auto batch = rawClient->CreateRawBatchRequest();

TVector<NThreading::TFuture<TRichYPath>> futures;
futures.reserve(paths.size());
for (int i = 0; i < static_cast<int>(paths.size()); ++i) {
futures.push_back(batch.CanonizeYPath(paths[i]));
for (const auto& path : paths) {
futures.push_back(batch->CanonizeYPath(path));
}
batch.ExecuteBatch();

batch->ExecuteBatch();

TVector<TRichYPath> result;
result.reserve(futures.size());
for (auto& future : futures) {
Expand Down
6 changes: 2 additions & 4 deletions yt/cpp/mapreduce/raw_client/raw_requests.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,11 @@ TCheckPermissionResponse ParseCheckPermissionResponse(const TNode& node);
////////////////////////////////////////////////////////////////////////////////

TRichYPath CanonizeYPath(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
const IRawClientPtr& rawClient,
const TRichYPath& path);

TVector<TRichYPath> CanonizeYPaths(
const IRequestRetryPolicyPtr& retryPolicy,
const TClientContext& context,
const IRawClientPtr& rawClient,
const TVector<TRichYPath>& paths);

NHttpClient::IHttpResponsePtr SkyShareTable(
Expand Down

0 comments on commit 8dff5c1

Please sign in to comment.