Skip to content

Commit

Permalink
Resolve comments
Browse files Browse the repository at this point in the history
  • Loading branch information
mapleFU committed Jan 13, 2025
1 parent 7aee7dd commit c5b1fb1
Show file tree
Hide file tree
Showing 8 changed files with 20 additions and 22 deletions.
2 changes: 1 addition & 1 deletion cpp/src/parquet/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -159,8 +159,8 @@ set(PARQUET_SRCS
arrow/schema_internal.cc
arrow/writer.cc
bloom_filter.cc
bloom_filter_reader.cc
bloom_filter_builder.cc
bloom_filter_reader.cc
column_reader.cc
column_scanner.cc
column_writer.cc
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/arrow/arrow_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5543,7 +5543,7 @@ auto encode_double = [](double value) {

} // namespace

class ParquetIndexRoundTripTest {
class TestingWithPageIndex {
public:
void WriteFile(const std::shared_ptr<WriterProperties>& writer_properties,
const std::shared_ptr<::arrow::Table>& table) {
Expand Down Expand Up @@ -5572,7 +5572,7 @@ class ParquetIndexRoundTripTest {
};

class ParquetPageIndexRoundTripTest : public ::testing::Test,
public ParquetIndexRoundTripTest {
public TestingWithPageIndex {
public:
void ReadPageIndexes(int expect_num_row_groups, int expect_num_pages,
const std::set<int>& expect_columns_without_index = {}) {
Expand Down Expand Up @@ -5878,7 +5878,7 @@ TEST_F(ParquetPageIndexRoundTripTest, EnablePerColumn) {
}

class ParquetBloomFilterRoundTripTest : public ::testing::Test,
public ParquetIndexRoundTripTest {
public TestingWithPageIndex {
public:
void ReadBloomFilters(int expect_num_row_groups,
const std::set<int>& expect_columns_without_filter = {}) {
Expand Down
2 changes: 2 additions & 0 deletions cpp/src/parquet/bloom_filter.h
Original file line number Diff line number Diff line change
Expand Up @@ -120,10 +120,12 @@ class PARQUET_EXPORT BloomFilter {
uint64_t Hash(const FLBA& value, uint32_t type_len) const {
return Hash(&value, type_len);
}

/// Compute hash for Int96 value by using its plain encoding result.
///
/// @param value the value to hash.
uint64_t Hash(const Int96& value) const { return Hash(&value); }

/// Compute hash for std::string_view value by using its plain encoding result.
///
/// @param value the value to hash.
Expand Down
18 changes: 9 additions & 9 deletions cpp/src/parquet/bloom_filter_builder.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {
/// Make sure column ordinal is not out of bound and the builder is in good state.
void CheckState(int32_t column_ordinal) const {
if (finished_) {
throw ParquetException("Cannot call WriteTo() twice on BloomFilterBuilder.");
throw ParquetException("BloomFilterBuilder is already finished.");
}
if (column_ordinal < 0 || column_ordinal >= schema_->num_columns()) {
throw ParquetException("Invalid column ordinal: ", column_ordinal);
Expand All @@ -84,8 +84,7 @@ class BloomFilterBuilderImpl : public BloomFilterBuilder {

void BloomFilterBuilderImpl::AppendRowGroup() {
if (finished_) {
throw ParquetException(
"Cannot call AppendRowGroup() to BloomFilterBuilder::WriteTo is called");
throw ParquetException("Cannot append to a finished BloomFilterBuilder");
}
file_bloom_filters_.emplace_back(std::make_unique<RowGroupBloomFilters>());
}
Expand All @@ -112,11 +111,10 @@ BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordin
bloom_filter_options.ndv, bloom_filter_options.fpp));
auto insert_result = row_group_bloom_filter.emplace(
column_ordinal, std::move(block_split_bloom_filter));
DCHECK(insert_result.second);
iter = insert_result.first;
}
if (iter->second == nullptr) {
throw ParquetException("Bloom filter state is invalid for column ",
throw ParquetException("Bloom filter should not be null for column ",
column_descr->path());
}
return iter->second.get();
Expand All @@ -125,7 +123,7 @@ BloomFilter* BloomFilterBuilderImpl::GetOrCreateBloomFilter(int32_t column_ordin
void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
BloomFilterLocation* location) {
if (finished_) {
throw ParquetException("Cannot call WriteTo() multiple times.");
throw ParquetException("Cannot write a finished BloomFilterBuilder");
}
finished_ = true;

Expand All @@ -143,17 +141,19 @@ void BloomFilterBuilderImpl::WriteTo(::arrow::io::OutputStream* sink,
// serialize bloom filter in ascending order of column id
for (auto& [column_id, filter] : row_group_bloom_filters) {
if (ARROW_PREDICT_FALSE(filter == nullptr)) {
throw ParquetException("Bloom filter state is invalid for column ", column_id);
throw ParquetException("Bloom filter is null for column ", column_id);
}
if (ARROW_PREDICT_FALSE(column_id < 0 || column_id >= num_columns)) {
throw ParquetException("Invalid column ordinal when serailizing: ", column_id);
throw ParquetException("Invalid column ordinal when serializing bloom filter: ",
column_id);
}
PARQUET_ASSIGN_OR_THROW(int64_t offset, sink->Tell());
// TODO(GH-43138): Estimate the quality of the bloom filter before writing it.
filter->WriteTo(sink);
PARQUET_ASSIGN_OR_THROW(int64_t pos, sink->Tell());
if (pos - offset > std::numeric_limits<int32_t>::max()) {
throw ParquetException("Bloom filter is too large to be serialized, size: ",
pos - offset);
pos - offset, " for column ", column_id);
}
locations[column_id] = IndexLocation{offset, static_cast<int32_t>(pos - offset)};
}
Expand Down
6 changes: 3 additions & 3 deletions cpp/src/parquet/bloom_filter_reader_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ TEST(BloomFilterReader, FileNotHaveBloomFilter) {
}

// <c1:BYTE_ARRAY, c2:BYTE_ARRAY>, c1 has bloom filter.
TEST(BloomFilterBuilderTest, BasicRoundTrip) {
TEST(BloomFilterBuilder, BasicRoundTrip) {
SchemaDescriptor schema;
schema::NodePtr root = schema::GroupNode::Make(
"schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::ByteArray("c2")});
Expand Down Expand Up @@ -152,7 +152,7 @@ TEST(BloomFilterBuilderTest, BasicRoundTrip) {
}
}

TEST(BloomFilterBuilderTest, InvalidOperations) {
TEST(BloomFilterBuilder, InvalidOperations) {
SchemaDescriptor schema;
schema::NodePtr root = schema::GroupNode::Make(
"schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::Boolean("c2")});
Expand Down Expand Up @@ -196,7 +196,7 @@ TEST(BloomFilterBuilderTest, InvalidOperations) {
::testing::HasSubstr("Cannot call WriteTo() multiple times")));
}

TEST(BloomFilterBuilderTest, GetOrCreate) {
TEST(BloomFilterBuilder, GetOrCreate) {
SchemaDescriptor schema;
schema::NodePtr root = schema::GroupNode::Make(
"schema", Repetition::REPEATED, {schema::ByteArray("c1"), schema::Boolean("c2")});
Expand Down
4 changes: 2 additions & 2 deletions cpp/src/parquet/column_writer_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1765,7 +1765,7 @@ TestBloomFilterWriter<TestType>::BuildWriterWithBloomFilter(
this->sink_, column_properties.compression(), this->metadata_.get());
builder_ =
internal::BloomFilterBuilder::Make(&this->schema_, this->writer_properties_.get());
// Initial RowGroup
// Initialize RowGroup
builder_->AppendRowGroup();
bloom_filter_ = builder_->GetOrCreateBloomFilter(0);
std::shared_ptr<ColumnWriter> writer =
Expand All @@ -1791,7 +1791,7 @@ TYPED_TEST(TestBloomFilterWriter, Basic) {
writer->WriteBatch(this->values_.size(), nullptr, nullptr, this->values_ptr_);
writer->Close();

// Read all rows so we are sure that also the non-dictionary pages are read correctly
// Make sure that column values are read correctly
this->SetupValuesOut(SMALL_SIZE);
this->ReadColumnFully();
ASSERT_EQ(SMALL_SIZE, this->values_read_);
Expand Down
1 change: 0 additions & 1 deletion cpp/src/parquet/metadata.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1976,7 +1976,6 @@ class FileMetaDataBuilder::FileMetaDataBuilderImpl {
auto& column = row_group_metadata.columns[column_id];
auto& column_metadata = column.meta_data;
column_metadata.__set_bloom_filter_offset(bloom_filter_location.offset);
// bloom_filter_length is added by Parquet format 2.10.0
column_metadata.__set_bloom_filter_length(bloom_filter_location.length);
}
}
Expand Down
3 changes: 0 additions & 3 deletions cpp/src/parquet/metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -529,9 +529,6 @@ struct PageIndexLocation {
/// \brief Public struct for location to all bloom filters in a parquet file.
struct BloomFilterLocation {
/// Row group bloom filter index locations which uses row group ordinal as the key.
///
/// Note: Before Parquet 2.10, the bloom filter index only have "offset". But here
/// we use "IndexLocation" with length to support the future extension.
FileBloomFilterLocation bloom_filter_location;
};

Expand Down

0 comments on commit c5b1fb1

Please sign in to comment.