Skip to content

Commit

Permalink
[feature](Cloud) Add azure's support to BE (apache#35670)
Browse files Browse the repository at this point in the history
As a follow-up to apache#35307, this PR links the Azure SDK into BE and
implements the ObjStorageClient interface for Azure.
  • Loading branch information
ByteYue authored Jun 11, 2024
1 parent 26e117d commit d443938
Show file tree
Hide file tree
Showing 13 changed files with 593 additions and 62 deletions.
5 changes: 5 additions & 0 deletions be/cmake/thirdparty.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,11 @@ if (NOT OS_MACOSX)
add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a")
endif()

add_thirdparty(azure-core)
add_thirdparty(azure-identity)
add_thirdparty(azure-storage-blobs)
add_thirdparty(azure-storage-common)

add_thirdparty(minizip LIB64)
add_thirdparty(simdjson LIB64)
add_thirdparty(idn LIB64)
Expand Down
28 changes: 8 additions & 20 deletions be/src/agent/task_worker_pool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
#include "io/fs/file_system.h"
#include "io/fs/hdfs_file_system.h"
#include "io/fs/local_file_system.h"
#include "io/fs/obj_storage_client.h"
#include "io/fs/path.h"
#include "io/fs/remote_file_system.h"
#include "io/fs/s3_file_system.h"
Expand Down Expand Up @@ -1378,23 +1379,8 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr

if (!existed_fs) {
// No such FS instance on BE
S3Conf s3_conf {
.bucket = param.s3_storage_param.bucket,
.prefix = param.s3_storage_param.root_path,
.client_conf = {
.endpoint = param.s3_storage_param.endpoint,
.region = param.s3_storage_param.region,
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
.max_connections = param.s3_storage_param.max_conn,
.request_timeout_ms = param.s3_storage_param.request_timeout_ms,
.connect_timeout_ms = param.s3_storage_param.conn_timeout_ms,
// When using cold heat separation in minio, user might use ip address directly,
// which needs enable use_virtual_addressing to true
.use_virtual_addressing = !param.s3_storage_param.use_path_style,
}};
auto res = io::S3FileSystem::create(std::move(s3_conf), std::to_string(param.id));
auto res = io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param),
std::to_string(param.id));
if (!res.has_value()) {
st = std::move(res).error();
} else {
Expand All @@ -1403,10 +1389,12 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr
} else {
DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name;
auto client = static_cast<io::S3FileSystem*>(existed_fs.get())->client_holder();
auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param);
S3ClientConf conf {
.ak = param.s3_storage_param.ak,
.sk = param.s3_storage_param.sk,
.token = param.s3_storage_param.token,
.ak = std::move(new_s3_conf.client_conf.ak),
.sk = std::move(new_s3_conf.client_conf.sk),
.token = std::move(new_s3_conf.client_conf.token),
.provider = new_s3_conf.client_conf.provider,
};
st = client->reset(conf);
fs = std::move(existed_fs);
Expand Down
15 changes: 3 additions & 12 deletions be/src/cloud/cloud_meta_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "gen_cpp/Types_types.h"
#include "gen_cpp/cloud.pb.h"
#include "gen_cpp/olap_file.pb.h"
#include "io/fs/obj_storage_client.h"
#include "olap/olap_common.h"
#include "olap/rowset/rowset.h"
#include "olap/rowset/rowset_factory.h"
Expand Down Expand Up @@ -825,17 +826,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
}

auto add_obj_store = [&vault_infos](const auto& obj_store) {
vault_infos->emplace_back(obj_store.id(),
S3Conf {
.bucket = obj_store.bucket(),
.prefix = obj_store.prefix(),
.client_conf {.endpoint = obj_store.endpoint(),
.region = obj_store.region(),
.ak = obj_store.ak(),
.sk = obj_store.sk()},
.sse_enabled = obj_store.sse_enabled(),
.provider = obj_store.provider(),
},
vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store),
StorageVaultPB_PathFormat {});
};

Expand All @@ -853,7 +844,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) {
resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx");
}
for (int i = 0; i < resp.storage_vault_size(); ++i) {
auto j = resp.mutable_storage_vault(i);
auto* j = resp.mutable_storage_vault(i);
if (!j->has_obj_info()) continue;
j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx");
}
Expand Down
235 changes: 235 additions & 0 deletions be/src/io/fs/azure_obj_storage_client.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,235 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "io/fs/azure_obj_storage_client.h"

#include <algorithm>
#include <azure/core/base64.hpp>
#include <azure/core/exception.hpp>
#include <azure/core/http/http.hpp>
#include <azure/core/io/body_stream.hpp>
#include <azure/storage/blobs.hpp>
#include <azure/storage/blobs/blob_client.hpp>
#include <azure/storage/blobs/blob_container_client.hpp>
#include <azure/storage/blobs/rest_client.hpp>
#include <azure/storage/common/storage_credential.hpp>
#include <azure/storage/common/storage_exception.hpp>
#include <cstdint>
#include <exception>
#include <iterator>
#include <string>
#include <vector>

#include "common/logging.h"
#include "common/status.h"
#include "io/fs/obj_storage_client.h"

namespace {
// Formats the bucket, key, prefix and path of `opts` into one human-readable
// fragment that is appended to error messages.
std::string wrap_object_storage_path_msg(const doris::io::ObjectStoragePathOptions& opts) {
    const auto& native_path = opts.path.native();
    return fmt::format("bucket {}, key {}, prefix {}, path {}", opts.bucket, opts.key, opts.prefix,
                       native_path);
}
} // namespace

namespace doris::io {

// Maximum number of sub-requests placed in a single Azure Blob Batch request.
// Azure's documentation states:
// > Each batch request supports a maximum of 256 subrequests.
// See https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id
constexpr size_t BlobBatchMaxOperations = 256;

// Invokes `f` (a callable wrapping one or more Azure SDK calls) and translates
// any exception it throws into an ObjectStorageResponse.
//
// @param f    callable taking no arguments; its return value is ignored.
// @param opts path options, used only to enrich the error message.
// @return a default-constructed response when `f` completes without throwing;
//         otherwise a response carrying an InternalError status and, for Azure
//         request failures, the HTTP status code and the Azure request id.
template <typename Func>
ObjectStorageResponse do_azure_client_call(Func f, const ObjectStoragePathOptions& opts) {
    try {
        f();
    } catch (Azure::Core::RequestFailedException& e) {
        // StorageException derives from RequestFailedException, so catching the
        // base also covers request failures that are not storage-service errors
        // (for example transport-level failures), which would otherwise escape.
        // Caught by non-const reference because RequestId is moved out below.
        auto msg = fmt::format("Azure request failed because {}, path msg {}", e.Message,
                               wrap_object_storage_path_msg(opts));
        LOG_WARNING(msg);
        return {.status = convert_to_obj_response(Status::InternalError<false>(std::move(msg))),
                .http_code = static_cast<int>(e.StatusCode),
                .request_id = std::move(e.RequestId)};
    } catch (const std::exception& e) {
        // Anything else thrown by the SDK or the callable: there is no HTTP code
        // or request id to report, so only the status is filled in.
        auto msg = fmt::format("Azure request failed because {}, path msg {}", e.what(),
                               wrap_object_storage_path_msg(opts));
        LOG_WARNING(msg);
        return {.status = convert_to_obj_response(Status::InternalError<false>(std::move(msg)))};
    }
    return {};
}

// No request is sent to Azure here: this client stages blocks directly in
// upload_part and commits them in complete_multipart_upload, so there is
// nothing to initiate. Always returns a default-constructed response.
ObjectStorageUploadResponse AzureObjStorageClient::create_multipart_upload(
        const ObjectStoragePathOptions& opts) {
    return ObjectStorageUploadResponse {};
}

// Uploads the bytes in `stream` as the block blob named `opts.key`.
// Failures reported by the SDK are converted by do_azure_client_call.
ObjectStorageResponse AzureObjStorageClient::put_object(const ObjectStoragePathOptions& opts,
                                                        std::string_view stream) {
    auto blob_client = _client->GetBlockBlobClient(opts.key);
    const auto* payload = reinterpret_cast<const uint8_t*>(stream.data());
    const size_t payload_size = stream.size();
    auto upload = [&blob_client, payload, payload_size]() {
        blob_client.UploadFrom(payload, payload_size);
    };
    return do_azure_client_call(upload, opts);
}

ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStoragePathOptions& opts,
std::string_view stream,
int part_num) {
auto client = _client->GetBlockBlobClient(opts.key);
try {
Azure::Core::IO::MemoryBodyStream memory_body(
reinterpret_cast<const uint8_t*>(stream.data()), stream.size());
client.StageBlock(std::to_string(part_num), memory_body);
} catch (Azure::Storage::StorageException& e) {
auto msg = fmt::format("Azure request failed because {}, path msg {}", e.Message,
wrap_object_storage_path_msg(opts));
LOG_WARNING(msg);
// clang-format off
return {
.resp = {
.status = convert_to_obj_response(
Status::InternalError<false>(std::move(msg))),
.http_code = static_cast<int>(e.StatusCode),
.request_id = std::move(e.RequestId),
},
};
// clang-format on
}
return {};
}

ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload(
const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) {
auto client = _client->GetBlockBlobClient(opts.key);
const auto& block_ids = static_cast<const AzureCompleteMultiParts&>(completed_parts).block_ids;
std::vector<std::string> string_block_ids;
std::ranges::transform(block_ids, std::back_inserter(string_block_ids),
[](int i) { return std::to_string(i); });
return do_azure_client_call([&]() { client.CommitBlockList(string_block_ids); }, opts);
}

// Fetches the properties of the blob `opts.key` and reports its size.
//
// @return on success, a response with `file_size` set to the blob size. On a
//         storage exception: a NotFound status (empty message) when the HTTP
//         status is 404, otherwise an InternalError status describing the
//         failure; both carry the HTTP status code and the Azure request id.
ObjectStorageHeadResponse AzureObjStorageClient::head_object(const ObjectStoragePathOptions& opts) {
    try {
        auto blob_client = _client->GetBlockBlobClient(opts.key);
        Azure::Storage::Blobs::Models::BlobProperties props = blob_client.GetProperties().Value;
        return {.file_size = props.BlobSize};
    } catch (Azure::Storage::StorageException& e) {
        const bool blob_missing = e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound;
        if (!blob_missing) {
            auto msg = fmt::format("Failed to head azure blob due to {}, path msg {}", e.Message,
                                   wrap_object_storage_path_msg(opts));
            return ObjectStorageHeadResponse {
                    .resp = {.status = convert_to_obj_response(
                                     Status::InternalError<false>(std::move(msg))),
                             .http_code = static_cast<int>(e.StatusCode),
                             .request_id = std::move(e.RequestId)},
            };
        }
        return ObjectStorageHeadResponse {
                .resp = {.status = convert_to_obj_response(Status::NotFound<false>("")),
                         .http_code = static_cast<int>(e.StatusCode),
                         .request_id = std::move(e.RequestId)},
        };
    }
}

// Downloads `bytes_read` bytes of the blob `opts.key`, starting at `offset`,
// into `buffer`.
//
// @param buffer      destination; must have room for at least `bytes_read` bytes.
// @param offset      byte offset in the blob at which to start reading.
// @param bytes_read  number of bytes requested.
// @param size_return if non-null, receives the length of the range the SDK
//                    reports as downloaded (0 if the SDK reports no length).
//                    Left untouched when the request fails.
// @return a default-constructed response on success, otherwise the error
//         response produced by do_azure_client_call.
ObjectStorageResponse AzureObjStorageClient::get_object(const ObjectStoragePathOptions& opts,
                                                        void* buffer, size_t offset,
                                                        size_t bytes_read, size_t* size_return) {
    auto client = _client->GetBlockBlobClient(opts.key);
    return do_azure_client_call(
            [&]() {
                Azure::Storage::Blobs::DownloadBlobToOptions download_opts;
                // DownloadBlobToOptions::Range is an Azure::Nullable that starts
                // out empty. It must be assigned a whole HttpRange; writing
                // through operator-> on the empty Nullable never marks it as set.
                Azure::Core::Http::HttpRange range;
                range.Offset = static_cast<int64_t>(offset);
                range.Length = static_cast<int64_t>(bytes_read);
                download_opts.Range = range;
                auto resp = client.DownloadTo(reinterpret_cast<uint8_t*>(buffer), bytes_read,
                                              download_opts);
                if (size_return != nullptr) {
                    const auto& downloaded = resp.Value.ContentRange.Length;
                    *size_return =
                            downloaded.HasValue() ? static_cast<size_t>(downloaded.Value()) : 0;
                }
            },
            opts);
}

// Lists every blob whose name starts with `opts.prefix` and appends one
// FileInfo (name, size, is_file = true) per blob to `*files`.
//
// All result pages are fetched. If a request fails part-way, the entries of
// the pages already fetched remain in `*files` and the error response from
// do_azure_client_call is returned.
ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePathOptions& opts,
                                                          std::vector<FileInfo>* files) {
    auto append_page = [&](const Azure::Storage::Blobs::ListBlobsPagedResponse& page) {
        std::ranges::transform(page.Blobs, std::back_inserter(*files), [](const auto& blob_item) {
            return FileInfo {
                    .file_name = blob_item.Name, .file_size = blob_item.BlobSize, .is_file = true};
        });
    };
    return do_azure_client_call(
            [&]() {
                Azure::Storage::Blobs::ListBlobsOptions list_opts;
                list_opts.Prefix = opts.prefix;
                auto resp = _client->ListBlobs(list_opts);
                append_page(resp);
                // NextPageToken is an Azure::Nullable and is empty on the last
                // page, so HasValue() must be checked before reading the token.
                while (resp.NextPageToken.HasValue() && !resp.NextPageToken.Value().empty()) {
                    list_opts.ContinuationToken = resp.NextPageToken;
                    resp = _client->ListBlobs(list_opts);
                    append_page(resp);
                }
            },
            opts);
}

// Deletes the blobs named in `objs`, submitting them in batches of at most
// BlobBatchMaxOperations sub-requests each. Stops at the first batch whose
// submission fails and returns that error; otherwise returns a
// default-constructed response.
ObjectStorageResponse AzureObjStorageClient::delete_objects(const ObjectStoragePathOptions& opts,
                                                            std::vector<std::string> objs) {
    // TODO(ByteYue) : use ranges to adapt this code when the compiler is ready
    // auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations);
    const size_t total = objs.size();
    size_t chunk_begin = 0;
    while (chunk_begin < total) {
        const size_t chunk_size = std::min(BlobBatchMaxOperations, total - chunk_begin);
        const size_t chunk_end = chunk_begin + chunk_size;
        auto batch = _client->CreateBatch();
        for (size_t i = chunk_begin; i < chunk_end; ++i) {
            batch.DeleteBlob(objs[i]);
        }
        chunk_begin = chunk_end;
        auto resp = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts);
        if (resp.status.code != ErrorCode::OK) {
            return resp;
        }
    }
    return {};
}

// Deletes the single blob named `opts.key`; SDK failures are converted by
// do_azure_client_call.
ObjectStorageResponse AzureObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) {
    auto remove_blob = [this, &opts]() { _client->DeleteBlob(opts.key); };
    return do_azure_client_call(remove_blob, opts);
}

// Deletes every blob whose name starts with `opts.prefix`.
//
// Blobs are listed page by page, with the page size hinted to
// BlobBatchMaxOperations so that each page fits in one batch, and each
// non-empty page is deleted with a single batch request. Both the listing and
// the batch submission run inside do_azure_client_call, so a failure of either
// is returned as an error response instead of escaping as an exception.
//
// @return a default-constructed response when all pages were processed,
//         otherwise the error response of the first failing request.
ObjectStorageResponse AzureObjStorageClient::delete_objects_recursively(
        const ObjectStoragePathOptions& opts) {
    Azure::Storage::Blobs::ListBlobsOptions list_opts;
    list_opts.Prefix = opts.prefix;
    list_opts.PageSizeHint = BlobBatchMaxOperations;

    bool has_more_pages = true;
    while (has_more_pages) {
        has_more_pages = false;
        auto response = do_azure_client_call(
                [&]() {
                    auto page = _client->ListBlobs(list_opts);
                    // Skip the submit when the page has no blobs: there is
                    // nothing to delete and an empty batch is not a useful request.
                    if (!page.Blobs.empty()) {
                        auto batch = _client->CreateBatch();
                        for (const auto& blob_item : page.Blobs) {
                            batch.DeleteBlob(blob_item.Name);
                        }
                        _client->SubmitBatch(batch);
                    }
                    // NextPageToken is an Azure::Nullable and is empty on the
                    // last page; check HasValue() before reading it.
                    if (page.NextPageToken.HasValue() && !page.NextPageToken.Value().empty()) {
                        list_opts.ContinuationToken = page.NextPageToken;
                        has_more_pages = true;
                    }
                },
                opts);
        if (response.status.code != ErrorCode::OK) {
            return response;
        }
    }
    return {};
}
} // namespace doris::io
63 changes: 63 additions & 0 deletions be/src/io/fs/azure_obj_storage_client.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#pragma once

#include "io/fs/obj_storage_client.h"

// Forward declaration only: this header names BlobContainerClient through a
// std::shared_ptr and does not include the Azure SDK headers itself.
namespace Azure::Storage::Blobs {
class BlobContainerClient;
} // namespace Azure::Storage::Blobs

namespace doris::io {

// Azure flavour of ObjectCompleteMultiParts: holds the integer ids of the
// blocks to commit, in commit order. AzureObjStorageClient's
// complete_multipart_upload casts its argument to this type.
struct AzureCompleteMultiParts : ObjectCompleteMultiParts {
    std::vector<int> block_ids;
};

class ObjClientHolder;

// ObjStorageClient implementation backed by an Azure Blob Storage container.
// Every operation is issued through the BlobContainerClient supplied at
// construction; object keys are used as blob names inside that container.
class AzureObjStorageClient final : public ObjStorageClient {
public:
    // `explicit` so that a shared_ptr to a container client is never silently
    // converted into a storage client.
    explicit AzureObjStorageClient(
            std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> client)
            : _client(std::move(client)) {}
    ~AzureObjStorageClient() override = default;

    // Sends no request; returns a default-constructed response.
    ObjectStorageUploadResponse create_multipart_upload(
            const ObjectStoragePathOptions& opts) override;
    // Uploads `stream` as the blob `opts.key`.
    ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts,
                                     std::string_view stream) override;
    // Stages `stream` as one block of the blob `opts.key`, identified by `part_num`.
    ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions& opts,
                                            std::string_view stream, int part_num) override;
    // Commits the staged blocks listed in `completed_parts`, which must be an
    // AzureCompleteMultiParts.
    ObjectStorageResponse complete_multipart_upload(
            const ObjectStoragePathOptions& opts,
            const ObjectCompleteMultiParts& completed_parts) override;
    // Reports the size of the blob `opts.key`, or a NotFound status on HTTP 404.
    ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override;
    // Reads `bytes_read` bytes starting at `offset` of the blob `opts.key` into `buffer`.
    ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer,
                                     size_t offset, size_t bytes_read,
                                     size_t* size_return) override;
    // Appends one FileInfo per blob under `opts.prefix` to `*files`.
    ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts,
                                       std::vector<FileInfo>* files) override;
    // Deletes the blobs named in `objs` using batch requests.
    ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts,
                                         std::vector<std::string> objs) override;
    // Deletes the single blob `opts.key`.
    ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) override;
    // Deletes every blob under `opts.prefix`.
    ObjectStorageResponse delete_objects_recursively(const ObjectStoragePathOptions& opts) override;

private:
    // Container client through which all requests are issued.
    std::shared_ptr<Azure::Storage::Blobs::BlobContainerClient> _client;
};

} // namespace doris::io
Loading

0 comments on commit d443938

Please sign in to comment.