Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parquet File Metadata caching implementation #586

Open
wants to merge 7 commits into
base: project-antalya-24.12.2
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -914,8 +914,10 @@ The server successfully detected this situation and will download merged part fr
M(MemoryWorkerRun, "Number of runs done by MemoryWorker in background", ValueType::Number) \
M(MemoryWorkerRunElapsedMicroseconds, "Total time spent by MemoryWorker for background work", ValueType::Microseconds) \
\
M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \

M(ParquetFetchWaitTimeMicroseconds, "Time of waiting fetching parquet data", ValueType::Microseconds) \
\
M(ParquetMetaDataCacheHits, "Number of times a Parquet file metadata lookup was served from the in-memory metadata cache.", ValueType::Number) \
M(ParquetMetaDataCacheMisses, "Number of times a Parquet file metadata lookup missed the in-memory metadata cache and the metadata was read from the file.", ValueType::Number) \

#ifdef APPLY_FOR_EXTERNAL_EVENTS
#define APPLY_FOR_EVENTS(M) APPLY_FOR_BUILTIN_EVENTS(M) APPLY_FOR_EXTERNAL_EVENTS(M)
Expand Down
2 changes: 1 addition & 1 deletion src/Core/FormatFactorySettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -1251,7 +1251,7 @@ Set the quoting rule for identifiers in SHOW CREATE query
DECLARE(IdentifierQuotingStyle, show_create_query_identifier_quoting_style, IdentifierQuotingStyle::Backticks, R"(
Set the quoting style for identifiers in SHOW CREATE query
)", 0) \

DECLARE(Bool, input_format_parquet_use_metadata_cache, false, R"(Enable parquet file metadata caching)", 0) \
// End of FORMAT_FACTORY_SETTINGS

#define OBSOLETE_FORMAT_SETTINGS(M, ALIAS) \
Expand Down
5 changes: 2 additions & 3 deletions src/Core/ServerSettings.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,9 +212,8 @@ namespace DB
DECLARE(UInt64, threadpool_writer_queue_size, 1000000, "Number of tasks which is possible to push into background pool for write requests to object storages", 0) \
DECLARE(UInt64, iceberg_catalog_threadpool_pool_size, 50, "Size of background pool for iceberg catalog", 0) \
DECLARE(UInt64, iceberg_catalog_threadpool_queue_size, 1000000, "Number of tasks which is possible to push into iceberg catalog pool", 0) \
DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \


DECLARE(UInt32, allow_feature_tier, 0, "0 - All feature tiers allowed (experimental, beta, production). 1 - Only beta and production feature tiers allowed. 2 - Only production feature tier allowed", 0) \
DECLARE(UInt64, input_format_parquet_metadata_cache_max_size, 500000000, "Maximum size of parquet file metadata cache", 0) \
// clang-format on

/// If you add a setting which can be updated at runtime, please update 'changeable_settings' map in dumpToSystemServerSettingsColumns below
Expand Down
3 changes: 3 additions & 0 deletions src/Processors/Formats/IInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ class IInputFormat : public SourceWithKeyCondition

void needOnlyCount() { need_only_count = true; }

/// Set additional info/key/id related to underlying storage of the ReadBuffer.
/// Called by storage sources (e.g. StorageObjectStorageSource, which passes
/// "path:etag") so the format can key per-object caches; the default
/// implementation ignores the information.
virtual void setStorageRelatedUniqueKey(const ServerSettings & /* server_settings */, const Settings & /*settings*/, const String & /*key*/) {}

protected:
ReadBuffer & getReadBuffer() const { chassert(in); return *in; }

Expand Down
85 changes: 84 additions & 1 deletion src/Processors/Formats/Impl/ParquetBlockInputFormat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,9 @@

#if USE_PARQUET

#include <Core/Settings.h>
#include <Core/ServerSettings.h>
#include <Common/ProfileEvents.h>
#include <Common/logger_useful.h>
#include <Common/ThreadPool.h>
#include <Formats/FormatFactory.h>
Expand Down Expand Up @@ -33,6 +36,8 @@
namespace ProfileEvents
{
extern const Event ParquetFetchWaitTimeMicroseconds;
extern const Event ParquetMetaDataCacheHits;
extern const Event ParquetMetaDataCacheMisses;
}

namespace CurrentMetrics
Expand All @@ -49,6 +54,16 @@ namespace CurrentMetrics
namespace DB
{

namespace Setting
{
extern const SettingsBool input_format_parquet_use_metadata_cache;
}

namespace ServerSetting
{
extern const ServerSettingsUInt64 input_format_parquet_metadata_cache_max_size;
}

namespace ErrorCodes
{
extern const int BAD_ARGUMENTS;
Expand Down Expand Up @@ -484,6 +499,58 @@ static std::vector<Range> getHyperrectangleForRowGroup(const parquet::FileMetaDa
return hyperrectangle;
}

/// Construct the process-wide Parquet metadata cache with the given capacity
/// in bytes (taken from the input_format_parquet_metadata_cache_max_size
/// server setting). Only invoked once, from instance().
ParquetFileMetaDataCache::ParquetFileMetaDataCache(UInt64 max_size_bytes)
: CacheBase(max_size_bytes) {}

/// Singleton accessor (Meyers singleton).
/// NOTE: the cache is sized by the FIRST call only; max_size_bytes passed on
/// later calls is ignored because the function-local static is constructed
/// once. This is harmless as long as the value always comes from the same
/// server setting (constant for the process lifetime) — TODO confirm no
/// caller passes a per-query value.
ParquetFileMetaDataCache * ParquetFileMetaDataCache::instance(UInt64 max_size_bytes)
{
static ParquetFileMetaDataCache instance(max_size_bytes);
return &instance;
}

/// Read the Parquet footer metadata directly from the underlying file,
/// bypassing the in-memory cache. Lazily creates the Arrow file adapter first.
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::readMetadataFromFile()
{
createArrowFileIfNotCreated();
return parquet::ReadMetaData(arrow_file);
}

/// Return the file's Parquet metadata, consulting the process-wide in-memory
/// cache when it is enabled and a storage key is available.
std::shared_ptr<parquet::FileMetaData> ParquetBlockInputFormat::getFileMetaData()
{
/// The in-memory cache is only wired up for remote files: local file paths
/// never get a storage key assigned. A user may still set
/// input_format_parquet_use_metadata_cache=1 for a local read, so besides
/// the flag we must also check that metadata_cache.key is non-empty.
if (!metadata_cache.use_cache || metadata_cache.key.empty())
return readMetadataFromFile();

auto load_from_file = [this]
{
return readMetadataFromFile();
};

auto * cache = ParquetFileMetaDataCache::instance(metadata_cache.max_size_bytes);
auto [parquet_file_metadata, loaded] = cache->getOrSet(metadata_cache.key, load_from_file);

/// getOrSet() reports whether the value had to be produced by the load
/// function (a miss) or was already present (a hit).
ProfileEvents::increment(loaded ? ProfileEvents::ParquetMetaDataCacheMisses : ProfileEvents::ParquetMetaDataCacheHits);

return parquet_file_metadata;
}

/// Lazily construct the Arrow file adapter over the input ReadBuffer;
/// subsequent calls are no-ops.
void ParquetBlockInputFormat::createArrowFileIfNotCreated()
{
if (!arrow_file)
{
// Create arrow file adapter.
// TODO: Make the adapter do prefetching on IO threads, based on the full set of ranges that
// we'll need to read (which we know in advance). Use max_download_threads for that.
arrow_file = asArrowFile(*in, format_settings, is_stopped, "Parquet", PARQUET_MAGIC_BYTES, /* avoid_buffering */ true);
}
}

ParquetBlockInputFormat::ParquetBlockInputFormat(
ReadBuffer & buf,
const Block & header_,
Expand Down Expand Up @@ -528,7 +595,7 @@ void ParquetBlockInputFormat::initializeIfNeeded()
if (is_stopped)
return;

metadata = parquet::ReadMetaData(arrow_file);
metadata = getFileMetaData();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what will happen if the object metadata was put in the cache, but later on that object was deleted from S3?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It'll fail on S3::getObjectInfo call that is made prior to reaching this code.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

SETTINGS input_format_parquet_use_metadata_cache = 1, input_format_parquet_filter_push_down = 1, log_comment = 'abc', remote_filesystem_read_prefetch = 0

Query id: 2d36d362-894b-4ad5-a075-6960bca9de88


Elapsed: 148.109 sec. 

Received exception from server (version 24.12.2):
Code: 499. DB::Exception: Received from localhost:9000. DB::Exception: Failed to get object info: No response body.. HTTP response code: 404: while reading adir/test0.parquet: The table structure cannot be extracted from a Parquet format file. You can specify the structure manually. (S3_ERROR)

const bool prefetch_group = supportPrefetch();

std::shared_ptr<arrow::Schema> schema;
Expand Down Expand Up @@ -594,6 +661,8 @@ void ParquetBlockInputFormat::initializeIfNeeded()
filtering_columns = parquet_bloom_filter_condition->getFilteringColumnKeys();
}

bool has_row_groups_to_read = false;

for (int row_group = 0; row_group < num_row_groups; ++row_group)
{
if (skip_row_groups.contains(row_group))
Expand Down Expand Up @@ -628,9 +697,23 @@ void ParquetBlockInputFormat::initializeIfNeeded()
row_group_batches.back().total_bytes_compressed += row_group_size;
auto rows = adaptive_chunk_size(row_group);
row_group_batches.back().adaptive_chunk_size = rows ? rows : format_settings.parquet.max_block_size;

has_row_groups_to_read = true;
}

if (has_row_groups_to_read)
{
createArrowFileIfNotCreated();
}
}

/// Record the storage-unique identifier of the underlying object (the caller,
/// StorageObjectStorageSource, passes "path:etag") together with the cache
/// settings in effect, so getFileMetaData() can use the in-memory cache.
void ParquetBlockInputFormat::setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_)
{
metadata_cache.key = key_;
metadata_cache.use_cache = settings[Setting::input_format_parquet_use_metadata_cache];
metadata_cache.max_size_bytes = server_settings[ServerSetting::input_format_parquet_metadata_cache_max_size];
}

void ParquetBlockInputFormat::initializeRowGroupBatchReader(size_t row_group_batch_idx)
{
const bool row_group_prefetch = supportPrefetch();
Expand Down
24 changes: 24 additions & 0 deletions src/Processors/Formats/Impl/ParquetBlockInputFormat.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "config.h"
#if USE_PARQUET

#include <Common/CacheBase.h>
#include <Processors/Formats/IInputFormat.h>
#include <Processors/Formats/ISchemaReader.h>
#include <Formats/FormatSettings.h>
Expand Down Expand Up @@ -72,6 +73,8 @@ class ParquetBlockInputFormat : public IInputFormat

size_t getApproxBytesReadForChunk() const override { return previous_approx_bytes_read_for_chunk; }

void setStorageRelatedUniqueKey(const ServerSettings & server_settings, const Settings & settings, const String & key_) override;

private:
Chunk read() override;

Expand All @@ -90,6 +93,11 @@ class ParquetBlockInputFormat : public IInputFormat

void threadFunction(size_t row_group_batch_idx);

void createArrowFileIfNotCreated();
std::shared_ptr<parquet::FileMetaData> readMetadataFromFile();

std::shared_ptr<parquet::FileMetaData> getFileMetaData();

inline bool supportPrefetch() const;

// Data layout in the file:
Expand Down Expand Up @@ -338,6 +346,12 @@ class ParquetBlockInputFormat : public IInputFormat
std::exception_ptr background_exception = nullptr;
std::atomic<int> is_stopped{0};
bool is_initialized = false;
/// State for the in-memory Parquet metadata cache, filled in by
/// setStorageRelatedUniqueKey().
struct Cache
{
// Storage-unique object id ("path:etag" for object storages); stays empty
// for sources without a storage key (e.g. local files), which bypasses
// the cache.
String key;
// Value of input_format_parquet_use_metadata_cache for this query.
bool use_cache = false;
// Value of server setting input_format_parquet_metadata_cache_max_size.
UInt64 max_size_bytes{0};
} metadata_cache;
};

class ParquetSchemaReader : public ISchemaReader
Expand All @@ -356,6 +370,16 @@ class ParquetSchemaReader : public ISchemaReader
std::shared_ptr<parquet::FileMetaData> metadata;
};

/// Process-wide in-memory cache of deserialized Parquet footers
/// (parquet::FileMetaData), keyed by a storage-unique string.
/// Built on CacheBase; capacity (bytes) comes from the
/// input_format_parquet_metadata_cache_max_size server setting.
class ParquetFileMetaDataCache : public CacheBase<String, parquet::FileMetaData>
{
public:
static ParquetFileMetaDataCache * instance(UInt64 max_size_bytes);
/// No-op that hides CacheBase::clear(), so cached footers survive generic
/// cache-clearing paths — NOTE(review): confirm this is intentional and
/// not an unimplemented stub.
void clear() {}

private:
/// Private: construct only through instance().
ParquetFileMetaDataCache(UInt64 max_size_bytes);
};

}

#endif
3 changes: 3 additions & 0 deletions src/Storages/ObjectStorage/StorageObjectStorageSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,9 @@ StorageObjectStorageSource::ReaderHolder StorageObjectStorageSource::createReade
if (need_only_count)
input_format->needOnlyCount();

if (!object_info->getPath().empty())
input_format->setStorageRelatedUniqueKey(context_->getServerSettings(), context_->getSettingsRef(), object_info->getPath() + ":" + object_info->metadata->etag);

builder.init(Pipe(input_format));

if (auto transformer = configuration->getSchemaTransformer(object_info->getPath()))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
10
10
10
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
-- Tags: no-parallel, no-fasttest

-- Test of the in-memory Parquet file metadata cache
-- (input_format_parquet_use_metadata_cache): the first S3 read populates the
-- cache, the second one should be served from it, verified via the
-- ParquetMetaDataCacheHits profile event in the query log.

DROP TABLE IF EXISTS t_parquet_03262;

CREATE TABLE t_parquet_03262 (a UInt64)
ENGINE = S3(s3_conn, filename = 'test_03262_{_partition_id}', format = Parquet)
PARTITION BY a;

-- One partition per value => 10 Parquet files on S3.
INSERT INTO t_parquet_03262 SELECT number FROM numbers(10) SETTINGS s3_truncate_on_insert=1;

-- First read: fills the metadata cache.
SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache=1;

-- Second read: expected to hit the cache; tagged with log_comment so the
-- query_log lookup below can find it.
SELECT COUNT(*)
FROM s3(s3_conn, filename = 'test_03262_*', format = Parquet)
SETTINGS input_format_parquet_use_metadata_cache=1, log_comment='test_03262_parquet_metadata_cache';

SYSTEM FLUSH LOGS;

-- Verify the second query actually hit the metadata cache.
SELECT ProfileEvents['ParquetMetaDataCacheHits']
FROM system.query_log
where log_comment = 'test_03262_parquet_metadata_cache'
AND type = 'QueryFinish'
ORDER BY event_time desc
LIMIT 1;

DROP TABLE t_parquet_03262;
Loading