diff --git a/be/cmake/thirdparty.cmake b/be/cmake/thirdparty.cmake index 78f28fe72dc4e8..41203a9006a400 100644 --- a/be/cmake/thirdparty.cmake +++ b/be/cmake/thirdparty.cmake @@ -139,6 +139,11 @@ if (NOT OS_MACOSX) add_thirdparty(aws-s2n LIBNAME "lib/libs2n.a") endif() +add_thirdparty(azure-core) +add_thirdparty(azure-identity) +add_thirdparty(azure-storage-blobs) +add_thirdparty(azure-storage-common) + add_thirdparty(minizip LIB64) add_thirdparty(simdjson LIB64) add_thirdparty(idn LIB64) diff --git a/be/src/agent/task_worker_pool.cpp b/be/src/agent/task_worker_pool.cpp index 31cfe19d4f0359..a776be08f79022 100644 --- a/be/src/agent/task_worker_pool.cpp +++ b/be/src/agent/task_worker_pool.cpp @@ -53,6 +53,7 @@ #include "io/fs/file_system.h" #include "io/fs/hdfs_file_system.h" #include "io/fs/local_file_system.h" +#include "io/fs/obj_storage_client.h" #include "io/fs/path.h" #include "io/fs/remote_file_system.h" #include "io/fs/s3_file_system.h" @@ -1378,23 +1379,8 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr if (!existed_fs) { // No such FS instance on BE - S3Conf s3_conf { - .bucket = param.s3_storage_param.bucket, - .prefix = param.s3_storage_param.root_path, - .client_conf = { - .endpoint = param.s3_storage_param.endpoint, - .region = param.s3_storage_param.region, - .ak = param.s3_storage_param.ak, - .sk = param.s3_storage_param.sk, - .token = param.s3_storage_param.token, - .max_connections = param.s3_storage_param.max_conn, - .request_timeout_ms = param.s3_storage_param.request_timeout_ms, - .connect_timeout_ms = param.s3_storage_param.conn_timeout_ms, - // When using cold heat separation in minio, user might use ip address directly, - // which needs enable use_virtual_addressing to true - .use_virtual_addressing = !param.s3_storage_param.use_path_style, - }}; - auto res = io::S3FileSystem::create(std::move(s3_conf), std::to_string(param.id)); + auto res = 
io::S3FileSystem::create(S3Conf::get_s3_conf(param.s3_storage_param), + std::to_string(param.id)); if (!res.has_value()) { st = std::move(res).error(); } else { @@ -1403,10 +1389,13 @@ void update_s3_resource(const TStorageResource& param, io::RemoteFileSystemSPtr } else { DCHECK_EQ(existed_fs->type(), io::FileSystemType::S3) << param.id << ' ' << param.name; auto client = static_cast(existed_fs.get())->client_holder(); + auto new_s3_conf = S3Conf::get_s3_conf(param.s3_storage_param); S3ClientConf conf { - .ak = param.s3_storage_param.ak, - .sk = param.s3_storage_param.sk, - .token = param.s3_storage_param.token, + .ak = std::move(new_s3_conf.client_conf.ak), + .sk = std::move(new_s3_conf.client_conf.sk), + .token = std::move(new_s3_conf.client_conf.token), + .bucket = std::move(new_s3_conf.client_conf.bucket), /* ObjClientHolder::reset() assigns conf.bucket to the live conf, so carry it over instead of leaving it empty */ + .provider = new_s3_conf.client_conf.provider, }; st = client->reset(conf); fs = std::move(existed_fs); diff --git a/be/src/cloud/cloud_meta_mgr.cpp b/be/src/cloud/cloud_meta_mgr.cpp index a14ec2b0497f24..d55c884a6c2b7d 100644 --- a/be/src/cloud/cloud_meta_mgr.cpp +++ b/be/src/cloud/cloud_meta_mgr.cpp @@ -46,6 +46,7 @@ #include "gen_cpp/Types_types.h" #include "gen_cpp/cloud.pb.h" #include "gen_cpp/olap_file.pb.h" +#include "io/fs/obj_storage_client.h" #include "olap/olap_common.h" #include "olap/rowset/rowset.h" #include "olap/rowset/rowset_factory.h" @@ -825,17 +826,7 @@ Status CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) { } auto add_obj_store = [&vault_infos](const auto& obj_store) { - vault_infos->emplace_back(obj_store.id(), - S3Conf { - .bucket = obj_store.bucket(), - .prefix = obj_store.prefix(), - .client_conf {.endpoint = obj_store.endpoint(), - .region = obj_store.region(), - .ak = obj_store.ak(), - .sk = obj_store.sk()}, - .sse_enabled = obj_store.sse_enabled(), - .provider = obj_store.provider(), - }, + vault_infos->emplace_back(obj_store.id(), S3Conf::get_s3_conf(obj_store), StorageVaultPB_PathFormat {}); }; @@ -853,7 +844,7 @@ Status
CloudMetaMgr::get_storage_vault_info(StorageVaultInfos* vault_infos) { resp.mutable_obj_info(i)->set_sk(resp.obj_info(i).sk().substr(0, 2) + "xxx"); } for (int i = 0; i < resp.storage_vault_size(); ++i) { - auto j = resp.mutable_storage_vault(i); + auto* j = resp.mutable_storage_vault(i); if (!j->has_obj_info()) continue; j->mutable_obj_info()->set_sk(j->obj_info().sk().substr(0, 2) + "xxx"); } diff --git a/be/src/io/fs/azure_obj_storage_client.cpp b/be/src/io/fs/azure_obj_storage_client.cpp new file mode 100644 index 00000000000000..9569bf9a8e82ff --- /dev/null +++ b/be/src/io/fs/azure_obj_storage_client.cpp @@ -0,0 +1,235 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#include "io/fs/azure_obj_storage_client.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/logging.h" +#include "common/status.h" +#include "io/fs/obj_storage_client.h" + +namespace { +std::string wrap_object_storage_path_msg(const doris::io::ObjectStoragePathOptions& opts) { + return fmt::format("bucket {}, key {}, prefix {}, path {}", opts.bucket, opts.key, opts.prefix, + opts.path.native()); +} +} // namespace + +namespace doris::io { + +// As Azure's doc said, the batch size is 256 +// You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id +// > Each batch request supports a maximum of 256 subrequests. +constexpr size_t BlobBatchMaxOperations = 256; + +template +ObjectStorageResponse do_azure_client_call(Func f, const ObjectStoragePathOptions& opts) { + try { + f(); + } catch (Azure::Core::RequestFailedException& e) { /* base class of StorageException: also turns non-storage request failures into an error response instead of letting them escape */ + auto msg = fmt::format("Azure request failed because {}, path msg {}", e.Message, + wrap_object_storage_path_msg(opts)); + LOG_WARNING(msg); + return {.status = convert_to_obj_response(Status::InternalError(std::move(msg))), + .http_code = static_cast(e.StatusCode), + .request_id = std::move(e.RequestId)}; + } + return {}; +} + +// Azure would do nothing +ObjectStorageUploadResponse AzureObjStorageClient::create_multipart_upload( + const ObjectStoragePathOptions& opts) { + return {}; +} + +ObjectStorageResponse AzureObjStorageClient::put_object(const ObjectStoragePathOptions& opts, + std::string_view stream) { + auto client = _client->GetBlockBlobClient(opts.key); + return do_azure_client_call( + [&]() { + client.UploadFrom(reinterpret_cast(stream.data()), stream.size()); + }, + opts); +} + +ObjectStorageUploadResponse AzureObjStorageClient::upload_part(const ObjectStoragePathOptions& opts, + std::string_view stream, + int part_num) { + auto client = _client->GetBlockBlobClient(opts.key); + try { +
Azure::Core::IO::MemoryBodyStream memory_body( + reinterpret_cast(stream.data()), stream.size()); + client.StageBlock(fmt::format("{:08d}", part_num), memory_body); /* block IDs must be valid Base64 and the same length for every block of a blob; 8 zero-padded digits satisfy both */ + } catch (Azure::Core::RequestFailedException& e) { /* base class of StorageException, so non-storage request failures are reported too */ + auto msg = fmt::format("Azure request failed because {}, path msg {}", e.Message, + wrap_object_storage_path_msg(opts)); + LOG_WARNING(msg); + // clang-format off + return { + .resp = { + .status = convert_to_obj_response( + Status::InternalError(std::move(msg))), + .http_code = static_cast(e.StatusCode), + .request_id = std::move(e.RequestId), + }, + }; + // clang-format on + } + return {}; +} + +ObjectStorageResponse AzureObjStorageClient::complete_multipart_upload( + const ObjectStoragePathOptions& opts, const ObjectCompleteMultiParts& completed_parts) { + auto client = _client->GetBlockBlobClient(opts.key); + const auto& block_ids = static_cast(completed_parts).block_ids; + std::vector string_block_ids; + std::ranges::transform(block_ids, std::back_inserter(string_block_ids), + [](int i) { return fmt::format("{:08d}", i); }); /* must produce exactly the IDs that upload_part staged */ + return do_azure_client_call([&]() { client.CommitBlockList(string_block_ids); }, opts); +} + +ObjectStorageHeadResponse AzureObjStorageClient::head_object(const ObjectStoragePathOptions& opts) { + try { + Azure::Storage::Blobs::Models::BlobProperties properties = + _client->GetBlockBlobClient(opts.key).GetProperties().Value; + return {.file_size = properties.BlobSize}; + } catch (Azure::Core::RequestFailedException& e) { /* base class of StorageException; StatusCode/Message/RequestId are declared on it */ + if (e.StatusCode == Azure::Core::Http::HttpStatusCode::NotFound) { + return ObjectStorageHeadResponse { + .resp = {.status = convert_to_obj_response(Status::NotFound("")), + .http_code = static_cast(e.StatusCode), + .request_id = std::move(e.RequestId)}, + }; + } + auto msg = fmt::format("Failed to head azure blob due to {}, path msg {}", e.Message, + wrap_object_storage_path_msg(opts)); + return ObjectStorageHeadResponse { + .resp = {.status = convert_to_obj_response( + Status::InternalError(std::move(msg))), + .http_code
= static_cast(e.StatusCode), + .request_id = std::move(e.RequestId)}, + }; + } +} + +ObjectStorageResponse AzureObjStorageClient::get_object(const ObjectStoragePathOptions& opts, + void* buffer, size_t offset, + size_t bytes_read, size_t* size_return) { + auto client = _client->GetBlockBlobClient(opts.key); + return do_azure_client_call( + [&]() { + Azure::Storage::Blobs::DownloadBlobToOptions download_opts; + download_opts.Range = Azure::Core::Http::HttpRange {static_cast<int64_t>(offset), static_cast<int64_t>(bytes_read)}; /* Range starts as an empty Nullable: assign a whole HttpRange rather than dereferencing it */ + auto resp = client.DownloadTo(reinterpret_cast(buffer), bytes_read, download_opts); + *size_return = static_cast<size_t>(resp.Value.ContentRange.Length.Value()); /* report how many bytes were actually downloaded */ + }, + opts); +} + +ObjectStorageResponse AzureObjStorageClient::list_objects(const ObjectStoragePathOptions& opts, + std::vector* files) { + auto get_file_file = [&](Azure::Storage::Blobs::ListBlobsPagedResponse& resp) { + std::ranges::transform(resp.Blobs, std::back_inserter(*files), [](auto&& blob_item) { + return FileInfo { + .file_name = blob_item.Name, .file_size = blob_item.BlobSize, .is_file = true}; + }); + }; + return do_azure_client_call( + [&]() { + Azure::Storage::Blobs::ListBlobsOptions list_opts; + list_opts.Prefix = opts.prefix; + auto resp = _client->ListBlobs(list_opts); + get_file_file(resp); + while (resp.NextPageToken.HasValue() && !resp.NextPageToken->empty()) { /* NextPageToken is a Nullable: check HasValue() before dereferencing */ + list_opts.ContinuationToken = resp.NextPageToken; + resp = _client->ListBlobs(list_opts); + get_file_file(resp); + } + }, + opts); +} + +// As Azure's doc said, the batch size is 256 +// You can find out the num in https://learn.microsoft.com/en-us/rest/api/storageservices/blob-batch?tabs=microsoft-entra-id +// > Each batch request supports a maximum of 256 subrequests.
+ObjectStorageResponse AzureObjStorageClient::delete_objects(const ObjectStoragePathOptions& opts, + std::vector objs) { + // TODO(ByteYue) : use range to adapt this code when compiler is ready + // auto chunkedView = objs | std::views::chunk(BlobBatchMaxOperations); + auto begin = std::begin(objs); + auto end = std::end(objs); + + while (begin != end) { + auto batch = _client->CreateBatch(); + auto chunkEnd = begin; + std::advance(chunkEnd, std::min(BlobBatchMaxOperations, + static_cast(std::distance(begin, end)))); + for (auto it = begin; it != chunkEnd; ++it) { + batch.DeleteBlob(*it); + } + begin = chunkEnd; + auto resp = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts); + if (resp.status.code != ErrorCode::OK) { + return resp; + } + } + return {}; +} + +ObjectStorageResponse AzureObjStorageClient::delete_object(const ObjectStoragePathOptions& opts) { + return do_azure_client_call([&]() { _client->DeleteBlob(opts.key); }, opts); +} + +ObjectStorageResponse AzureObjStorageClient::delete_objects_recursively( + const ObjectStoragePathOptions& opts) { + Azure::Storage::Blobs::ListBlobsOptions list_opts; + list_opts.Prefix = opts.prefix; + list_opts.PageSizeHint = BlobBatchMaxOperations; + auto resp = _client->ListBlobs(list_opts); + auto batch = _client->CreateBatch(); + for (auto&& blob_item : resp.Blobs) { + batch.DeleteBlob(blob_item.Name); + } + auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts); + if (response.status.code != ErrorCode::OK) { + return response; + } + while (resp.NextPageToken.HasValue() && !resp.NextPageToken->empty()) { /* NextPageToken is a Nullable: check HasValue() before dereferencing */ + batch = _client->CreateBatch(); + list_opts.ContinuationToken = resp.NextPageToken; + resp = _client->ListBlobs(list_opts); + for (auto&& blob_item : resp.Blobs) { + batch.DeleteBlob(blob_item.Name); + } + auto response = do_azure_client_call([&]() { _client->SubmitBatch(batch); }, opts); + if (response.status.code != ErrorCode::OK) { + return response; + } + } + return {}; +} +} // namespace doris::io \ No
newline at end of file diff --git a/be/src/io/fs/azure_obj_storage_client.h b/be/src/io/fs/azure_obj_storage_client.h new file mode 100644 index 00000000000000..a8c2db9d4dbbf6 --- /dev/null +++ b/be/src/io/fs/azure_obj_storage_client.h @@ -0,0 +1,63 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+ +#pragma once + +#include "io/fs/obj_storage_client.h" + +namespace Azure::Storage::Blobs { +class BlobContainerClient; +} // namespace Azure::Storage::Blobs + +namespace doris::io { + +struct AzureCompleteMultiParts : public ObjectCompleteMultiParts { + std::vector block_ids; +}; + +class ObjClientHolder; + +class AzureObjStorageClient final : public ObjStorageClient { +public: + AzureObjStorageClient(std::shared_ptr client) + : _client(std::move(client)) {} + ~AzureObjStorageClient() override = default; + ObjectStorageUploadResponse create_multipart_upload( + const ObjectStoragePathOptions& opts) override; + ObjectStorageResponse put_object(const ObjectStoragePathOptions& opts, + std::string_view stream) override; + ObjectStorageUploadResponse upload_part(const ObjectStoragePathOptions& opts, std::string_view, + int partNum) override; + ObjectStorageResponse complete_multipart_upload( + const ObjectStoragePathOptions& opts, + const ObjectCompleteMultiParts& completed_parts) override; + ObjectStorageHeadResponse head_object(const ObjectStoragePathOptions& opts) override; + ObjectStorageResponse get_object(const ObjectStoragePathOptions& opts, void* buffer, + size_t offset, size_t bytes_read, + size_t* size_return) override; + ObjectStorageResponse list_objects(const ObjectStoragePathOptions& opts, + std::vector* files) override; + ObjectStorageResponse delete_objects(const ObjectStoragePathOptions& opts, + std::vector objs) override; + ObjectStorageResponse delete_object(const ObjectStoragePathOptions& opts) override; + ObjectStorageResponse delete_objects_recursively(const ObjectStoragePathOptions& opts) override; + +private: + std::shared_ptr _client; +}; + +} // namespace doris::io \ No newline at end of file diff --git a/be/src/io/fs/obj_storage_client.h b/be/src/io/fs/obj_storage_client.h index 3ab0a8e2dea898..2a99bde80f1bf2 100644 --- a/be/src/io/fs/obj_storage_client.h +++ b/be/src/io/fs/obj_storage_client.h @@ -27,7 +27,8 @@ namespace io { // Names are 
in lexico order. enum class ObjStorageType : uint8_t { - AWS = 0, + UNKNOWN = 0, + AWS = 1, AZURE, BOS, COS, diff --git a/be/src/io/fs/s3_file_system.cpp b/be/src/io/fs/s3_file_system.cpp index 27aff992f4cd6b..648a80cda8c02f 100644 --- a/be/src/io/fs/s3_file_system.cpp +++ b/be/src/io/fs/s3_file_system.cpp @@ -79,12 +79,11 @@ ObjClientHolder::ObjClientHolder(S3ClientConf conf) : _conf(std::move(conf)) {} ObjClientHolder::~ObjClientHolder() = default; Status ObjClientHolder::init() { - auto client = S3ClientFactory::instance().create(_conf); - if (!client) { + _client = S3ClientFactory::instance().create(_conf); + if (!_client) { return Status::InternalError("failed to init s3 client with conf {}", _conf.to_string()); } - _client = std::make_shared(std::move(client)); return Status::OK(); } @@ -100,6 +99,7 @@ Status ObjClientHolder::reset(const S3ClientConf& conf) { reset_conf.ak = conf.ak; reset_conf.sk = conf.sk; reset_conf.token = conf.token; + reset_conf.bucket = conf.bucket; // Should check endpoint here? 
} @@ -112,7 +112,7 @@ Status ObjClientHolder::reset(const S3ClientConf& conf) { { std::lock_guard lock(_mtx); - _client = std::make_shared(std::move(client)); + _client = std::move(client); _conf = std::move(reset_conf); } diff --git a/be/src/util/s3_util.cpp b/be/src/util/s3_util.cpp index 6cf8e97962e515..e89366f3ab89b4 100644 --- a/be/src/util/s3_util.cpp +++ b/be/src/util/s3_util.cpp @@ -29,6 +29,7 @@ #include #include +#include #include #include #include @@ -40,6 +41,9 @@ #include "common/logging.h" #include "common/status.h" #include "common/sync_point.h" +#include "io/fs/azure_obj_storage_client.h" +#include "io/fs/obj_storage_client.h" +#include "io/fs/s3_obj_storage_client.h" #include "runtime/exec_env.h" #include "s3_uri.h" #include "vec/exec/scan/scanner_scheduler.h" @@ -138,7 +142,7 @@ S3ClientFactory::S3ClientFactory() { string S3ClientFactory::get_valid_ca_cert_path() { vector vec_ca_file_path = doris::split(config::ca_cert_file_paths, ";"); - vector::iterator it = vec_ca_file_path.begin(); + auto it = vec_ca_file_path.begin(); for (; it != vec_ca_file_path.end(); ++it) { if (std::filesystem::exists(*it)) { return *it; @@ -156,9 +160,7 @@ S3ClientFactory& S3ClientFactory::instance() { return ret; } -std::shared_ptr S3ClientFactory::create(const S3ClientConf& s3_conf) { - TEST_SYNC_POINT_RETURN_WITH_VALUE("s3_client_factory::create", - std::make_shared()); +std::shared_ptr S3ClientFactory::create(const S3ClientConf& s3_conf) { if (!is_s3_conf_valid(s3_conf)) { return nullptr; } @@ -172,6 +174,36 @@ std::shared_ptr S3ClientFactory::create(const S3ClientConf& s } } + auto obj_client = (s3_conf.provider == io::ObjStorageType::AZURE) + ? 
_create_azure_client(s3_conf) + : _create_s3_client(s3_conf); + + { + uint64_t hash = s3_conf.get_hash(); + std::lock_guard l(_lock); + _cache[hash] = obj_client; + } + return obj_client; +} + +std::shared_ptr S3ClientFactory::_create_azure_client( + const S3ClientConf& s3_conf) { + auto cred = + std::make_shared(s3_conf.ak, s3_conf.sk); + + const std::string container_name = s3_conf.bucket; + const std::string uri = fmt::format("{}://{}.blob.core.windows.net/{}", + config::s3_client_http_scheme, s3_conf.ak, container_name); + + auto containerClient = std::make_shared(uri, cred); + return std::make_shared(std::move(containerClient)); +} + +std::shared_ptr S3ClientFactory::_create_s3_client( + const S3ClientConf& s3_conf) { + TEST_SYNC_POINT_RETURN_WITH_VALUE( + "s3_client_factory::create", + std::make_shared(std::make_shared())); Aws::Client::ClientConfiguration aws_config = S3ClientFactory::getClientConfiguration(); aws_config.endpointOverride = s3_conf.endpoint; aws_config.region = s3_conf.region; @@ -231,11 +263,8 @@ std::shared_ptr S3ClientFactory::create(const S3ClientConf& s s3_conf.use_virtual_addressing); } - { - std::lock_guard l(_lock); - _cache[hash] = new_client; - } - return new_client; + auto obj_client = std::make_shared(std::move(new_client)); + return obj_client; } Status S3ClientFactory::convert_properties_to_s3_conf( @@ -293,4 +322,105 @@ Status S3ClientFactory::convert_properties_to_s3_conf( return Status::OK(); } +S3Conf S3Conf::get_s3_conf(const cloud::ObjectStoreInfoPB& info) { + S3Conf ret { + .bucket = info.bucket(), + .prefix = info.prefix(), + .client_conf { + .endpoint = info.endpoint(), + .region = info.region(), + .ak = info.ak(), + .sk = info.sk(), + .bucket = info.bucket(), + .provider = io::ObjStorageType::AWS, + }, + .sse_enabled = info.sse_enabled(), + }; + + io::ObjStorageType type = io::ObjStorageType::AWS; + switch (info.provider()) { + case cloud::ObjectStoreInfoPB_Provider_OSS: + type = io::ObjStorageType::OSS; + break; + 
case cloud::ObjectStoreInfoPB_Provider_S3: + type = io::ObjStorageType::AWS; + break; + case cloud::ObjectStoreInfoPB_Provider_COS: + type = io::ObjStorageType::COS; + break; + case cloud::ObjectStoreInfoPB_Provider_OBS: + type = io::ObjStorageType::OBS; + break; + case cloud::ObjectStoreInfoPB_Provider_BOS: + type = io::ObjStorageType::BOS; + break; + case cloud::ObjectStoreInfoPB_Provider_GCP: + type = io::ObjStorageType::GCP; + break; + case cloud::ObjectStoreInfoPB_Provider_AZURE: + type = io::ObjStorageType::AZURE; + break; + default: + LOG_FATAL("unknown provider type {}, info {}", info.provider(), ret.to_string()); + __builtin_unreachable(); + } + ret.client_conf.provider = type; + return ret; +} + +S3Conf S3Conf::get_s3_conf(const TS3StorageParam& param) { + S3Conf ret { + .bucket = param.bucket, + .prefix = param.root_path, + .client_conf = { + .endpoint = param.endpoint, + .region = param.region, + .ak = param.ak, + .sk = param.sk, + .token = param.token, + .bucket = param.bucket, + .provider = io::ObjStorageType::AWS, + .max_connections = param.max_conn, + .request_timeout_ms = param.request_timeout_ms, + .connect_timeout_ms = param.conn_timeout_ms, + // When using cold heat separation in minio, user might use ip address directly, + // which needs enable use_virtual_addressing to true + .use_virtual_addressing = !param.use_path_style, + }}; + io::ObjStorageType type = io::ObjStorageType::AWS; + switch (param.provider) { + case TObjStorageType::UNKNOWN: + LOG_INFO("Receive one legal storage resource, set provider type to aws, param detail {}", + ret.to_string()); + type = io::ObjStorageType::AWS; + break; + case TObjStorageType::AWS: + type = io::ObjStorageType::AWS; + break; + case TObjStorageType::AZURE: + type = io::ObjStorageType::AZURE; + break; + case TObjStorageType::BOS: + type = io::ObjStorageType::BOS; + break; + case TObjStorageType::COS: + type = io::ObjStorageType::COS; + break; + case TObjStorageType::OBS: + type = io::ObjStorageType::OBS; + 
break; + case TObjStorageType::OSS: + type = io::ObjStorageType::OSS; + break; + case TObjStorageType::GCP: + type = io::ObjStorageType::GCP; + break; + default: + LOG_FATAL("unknown provider type {}, info {}", param.provider, ret.to_string()); + __builtin_unreachable(); + } + ret.client_conf.provider = type; + return ret; +} + } // end namespace doris diff --git a/be/src/util/s3_util.h b/be/src/util/s3_util.h index 5dd68069759a42..1764b1b8b86a58 100644 --- a/be/src/util/s3_util.h +++ b/be/src/util/s3_util.h @@ -22,6 +22,7 @@ #include #include #include +#include #include #include @@ -32,14 +33,14 @@ #include #include "common/status.h" +#include "io/fs/obj_storage_client.h" #include "util/s3_rate_limiter.h" #include "vec/common/string_ref.h" -namespace Aws { -namespace S3 { +namespace Aws::S3 { class S3Client; -} // namespace S3 -} // namespace Aws +} // namespace Aws::S3 + namespace bvar { template class Adder; @@ -95,6 +96,9 @@ struct S3ClientConf { std::string ak; std::string sk; std::string token; + // For azure we'd better support the bucket at the first time init azure blob container client + std::string bucket; + io::ObjStorageType provider = io::ObjStorageType::AWS; int max_connections = -1; int request_timeout_ms = -1; int connect_timeout_ms = -1; @@ -107,6 +111,7 @@ struct S3ClientConf { hash_code ^= crc32_hash(token); hash_code ^= crc32_hash(endpoint); hash_code ^= crc32_hash(region); + hash_code ^= crc32_hash(bucket); hash_code ^= max_connections; hash_code ^= request_timeout_ms; hash_code ^= connect_timeout_ms; @@ -116,9 +121,9 @@ struct S3ClientConf { std::string to_string() const { return fmt::format( - "(ak={}, token={}, endpoint={}, region={}, max_connections={}, " + "(ak={}, token={}, endpoint={}, region={}, bucket={}, max_connections={}, " "request_timeout_ms={}, connect_timeout_ms={}, use_virtual_addressing={}", - ak, token, endpoint, region, max_connections, request_timeout_ms, + ak, token, endpoint, region, bucket, max_connections, 
request_timeout_ms, connect_timeout_ms, use_virtual_addressing); } }; @@ -129,7 +134,8 @@ struct S3Conf { S3ClientConf client_conf; bool sse_enabled = false; - cloud::ObjectStoreInfoPB::Provider provider; + static S3Conf get_s3_conf(const cloud::ObjectStoreInfoPB&); + static S3Conf get_s3_conf(const TS3StorageParam&); std::string to_string() const { return fmt::format("(bucket={}, prefix={}, client_conf={}, sse_enabled={})", bucket, prefix, @@ -143,7 +149,7 @@ class S3ClientFactory { static S3ClientFactory& instance(); - std::shared_ptr create(const S3ClientConf& s3_conf); + std::shared_ptr create(const S3ClientConf& s3_conf); static Status convert_properties_to_s3_conf(const std::map& prop, const S3URI& s3_uri, S3Conf* s3_conf); @@ -161,12 +167,14 @@ class S3ClientFactory { S3RateLimiterHolder* rate_limiter(S3RateLimitType type); private: + std::shared_ptr _create_s3_client(const S3ClientConf& s3_conf); + std::shared_ptr _create_azure_client(const S3ClientConf& s3_conf); S3ClientFactory(); static std::string get_valid_ca_cert_path(); Aws::SDKOptions _aws_options; std::mutex _lock; - std::unordered_map> _cache; + std::unordered_map> _cache; std::string _ca_cert_file_path; std::array, 2> _rate_limiters; }; diff --git a/be/test/io/fs/azure_test.cpp b/be/test/io/fs/azure_test.cpp new file mode 100644 index 00000000000000..f158cf0a7b4547 --- /dev/null +++ b/be/test/io/fs/azure_test.cpp @@ -0,0 +1,96 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. 
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include +#include + +#include +#include +#include +#include +#include +#include +#include +#include + +#include "common/config.h" + +namespace doris { + +std::string GetConnectionString() { + const static std::string ConnectionString = ""; + + if (!ConnectionString.empty()) { + return ConnectionString; + } + const static char* envConnectionString = std::getenv("AZURE_STORAGE_CONNECTION_STRING"); /* getenv returns nullptr when the variable is unset; building a std::string from nullptr is undefined behaviour */ + if (envConnectionString != nullptr && *envConnectionString != '\0') { + return envConnectionString; + } + throw std::runtime_error("Cannot find connection string."); +} + +TEST(AzureTest, Write) { + GTEST_SKIP() << "Skipping Azure test, because this test it to test the compile and linkage"; + using namespace Azure::Storage::Blobs; + + std::string accountName = config::test_s3_ak; + std::string accountKey = config::test_s3_sk; + + auto cred = + std::make_shared(accountName, accountKey); + + const std::string containerName = config::test_s3_bucket; + const std::string blobName = "sample-blob"; + const std::string blobContent = "Fuck Azure!"; + const std::string uri = + fmt::format("https://{}.blob.core.windows.net/{}", accountName, containerName); + + // auto containerClient = + // BlobContainerClient::CreateFromConnectionString(GetConnectionString(), containerName); + + auto containerClient = BlobContainerClient(uri, cred); + containerClient.CreateIfNotExists(); + + std::vector blockIds1; + + auto blockBlobContainer = containerClient.GetBlockBlobClient(blobName); + + // Azure::Storage::StorageException exception; + + BlockBlobClient blobClient =
containerClient.GetBlockBlobClient(blobName); + std::vector blockIds; + + std::vector buffer(blobContent.begin(), blobContent.end()); + auto aresp = blobClient.UploadFrom(buffer.data(), buffer.size()); + + Azure::Storage::Metadata blobMetadata = {{"key1", "value1"}, {"key2", "value2"}}; + blobClient.SetMetadata(blobMetadata); + + auto properties = blobClient.GetProperties().Value; + for (auto metadata : properties.Metadata) { + std::cout << metadata.first << ":" << metadata.second << std::endl; + } + // We know blob size is small, so it's safe to cast here. + buffer.resize(static_cast(properties.BlobSize)); + + blobClient.DownloadTo(buffer.data(), buffer.size()); + + std::cout << std::string(buffer.begin(), buffer.end()) << std::endl; +} + +} // namespace doris diff --git a/be/test/io/fs/s3_file_writer_test.cpp b/be/test/io/fs/s3_file_writer_test.cpp index 75a49d813a4cde..4f3594dc5f0144 100644 --- a/be/test/io/fs/s3_file_writer_test.cpp +++ b/be/test/io/fs/s3_file_writer_test.cpp @@ -263,7 +263,8 @@ static auto test_mock_callbacks = std::array { pair->first = mock_client->head_object(req); }}, MockCallback {"s3_client_factory::create", [](auto&& outcome) { - auto pair = try_any_cast_ret>(outcome); + auto pair = try_any_cast_ret>( + outcome); pair->second = true; }}}; diff --git a/gensrc/proto/cloud.proto b/gensrc/proto/cloud.proto index 99c8c2647f229a..e1c3c9be5ab876 100644 --- a/gensrc/proto/cloud.proto +++ b/gensrc/proto/cloud.proto @@ -172,12 +172,14 @@ message ObjectStoreInfoPB { // presigned url use // oss,aws,cos,obs,bos enum Provider { - OSS = 0; - S3 = 1; - COS = 2; - OBS = 3; - BOS = 4; - GCP = 5; + UNKONWN = -1; + OSS = 0; + S3 = 1; + COS = 2; + OBS = 3; + BOS = 4; + GCP = 5; + AZURE = 6; } optional int64 ctime = 1; optional int64 mtime = 2; diff --git a/gensrc/thrift/AgentService.thrift b/gensrc/thrift/AgentService.thrift index 104adca70fa130..cc5dc367915fa8 100644 --- a/gensrc/thrift/AgentService.thrift +++ b/gensrc/thrift/AgentService.thrift @@ -61,6 
+61,16 @@ enum TTabletType { TABLET_TYPE_MEMORY = 1 } +enum TObjStorageType { + UNKNOWN = 0, + AWS = 1, + AZURE = 2, + BOS = 3, + COS = 4, + OBS = 5, + OSS = 6, + GCP = 7 +} struct TS3StorageParam { 1: optional string endpoint @@ -74,6 +84,7 @@ struct TS3StorageParam { 9: optional string bucket 10: optional bool use_path_style = false 11: optional string token + 12: optional TObjStorageType provider } struct TStoragePolicy {