-
Notifications
You must be signed in to change notification settings - Fork 901
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Vectorized hash grouping by a single text column #7586
base: main
Are you sure you want to change the base?
Changes from 26 commits
9390673
a68572f
4aeecac
979e59b
3311861
ef106b7
89fd4ad
192ec2c
37b4cca
16d4908
e4b79ef
5b08c99
c4fc939
8cf8c69
8d45d71
2bac2c4
dc188e6
b63c057
17d4124
cfbf9d4
7cd0424
dddd0ce
1919e38
1529ed9
c7efeb0
d0e2431
621500d
04e2776
18f51f5
09d1036
679c56d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1 @@ | ||
Implements: #7586 Vectorized aggregation with grouping by a single text column. |
Original file line number | Diff line number | Diff line change | ||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
@@ -0,0 +1,137 @@ | ||||||||||||||||
/* | ||||||||||||||||
* This file and its contents are licensed under the Timescale License. | ||||||||||||||||
* Please see the included NOTICE for copyright information and | ||||||||||||||||
* LICENSE-TIMESCALE for a copy of the license. | ||||||||||||||||
*/ | ||||||||||||||||
|
||||||||||||||||
/* | ||||||||||||||||
* Implementation of column hashing for a single text column. | ||||||||||||||||
*/ | ||||||||||||||||
|
||||||||||||||||
#include <postgres.h> | ||||||||||||||||
|
||||||||||||||||
#include <common/hashfn.h> | ||||||||||||||||
|
||||||||||||||||
#include "compression/arrow_c_data_interface.h" | ||||||||||||||||
#include "nodes/decompress_chunk/compressed_batch.h" | ||||||||||||||||
#include "nodes/vector_agg/exec.h" | ||||||||||||||||
#include "nodes/vector_agg/grouping_policy_hash.h" | ||||||||||||||||
#include "template_helper.h" | ||||||||||||||||
|
||||||||||||||||
#include "batch_hashing_params.h" | ||||||||||||||||
|
||||||||||||||||
#include "umash_fingerprint_key.h" | ||||||||||||||||
|
||||||||||||||||
/*
 * Per-strategy names for the single text column hashing strategy.
 * NOTE(review): presumably these are the template parameters read by
 * hash_strategy_impl.c, which is included at the end of this file -- confirm
 * against that template.
 */
#define EXPLAIN_NAME "single text" | ||||||||||||||||
#define KEY_VARIANT single_text | ||||||||||||||||
#define OUTPUT_KEY_TYPE BytesView | ||||||||||||||||
|
||||||||||||||||
static void | ||||||||||||||||
single_text_key_hashing_init(HashingStrategy *hashing) | ||||||||||||||||
{ | ||||||||||||||||
hashing->umash_params = umash_key_hashing_init(); | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
typedef struct BytesView | ||||||||||||||||
{ | ||||||||||||||||
const uint8 *data; | ||||||||||||||||
uint32 len; | ||||||||||||||||
} BytesView; | ||||||||||||||||
Comment on lines
+35
to
+39
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Instead of defining a new type, can we use StringInfo and There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Well, it's not a complex type, and StringInfo has some unrelated things, so I'd keep it as is. |
||||||||||||||||
|
||||||||||||||||
static BytesView | ||||||||||||||||
get_bytes_view(CompressedColumnValues *column_values, int arrow_row) | ||||||||||||||||
{ | ||||||||||||||||
const uint32 start = ((uint32 *) column_values->buffers[1])[arrow_row]; | ||||||||||||||||
const int32 value_bytes = ((uint32 *) column_values->buffers[1])[arrow_row + 1] - start; | ||||||||||||||||
Assert(value_bytes >= 0); | ||||||||||||||||
|
||||||||||||||||
return (BytesView){ .len = value_bytes, .data = &((uint8 *) column_values->buffers[2])[start] }; | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
static pg_attribute_always_inline void | ||||||||||||||||
single_text_key_hashing_get_key(BatchHashingParams params, int row, void *restrict output_key_ptr, | ||||||||||||||||
void *restrict hash_table_key_ptr, bool *restrict valid) | ||||||||||||||||
{ | ||||||||||||||||
Assert(params.policy->num_grouping_columns == 1); | ||||||||||||||||
|
||||||||||||||||
BytesView *restrict output_key = (BytesView *) output_key_ptr; | ||||||||||||||||
HASH_TABLE_KEY_TYPE *restrict hash_table_key = (HASH_TABLE_KEY_TYPE *) hash_table_key_ptr; | ||||||||||||||||
|
||||||||||||||||
if (unlikely(params.single_grouping_column.decompression_type == DT_Scalar)) | ||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Question for my own understanding (not asking for any changes for this PR): This is deep into vector aggregation, so I would expect that this function only gets passed an arrow array. But now we need to check for different non-array formats, including impossible cases (e.g, DT_Iterator). If we only passed in arrays, these checks would not be necessary. The arrow array format already supports everything we need. Even scalar/segmentby values can be represented by arrow arrays (e.g., run-end encoded). Now we need this extra code to check for different formats/cases everywhere we reach into the data. Some of them shouldn't even be possible here. IMO, the API to retrieve a value should be something:
This function can easily check the encoding of the array (dict, runend, etc.) to retrieve the value requested. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, I have already regretted supporting the scalar values throughout aggregation. Technically it should perform better, because it avoids creating e.g. arrow array with the same constant value for every row, and sometimes we perform the computations in a different way for scalar values. But the implementation complexity might be a little too high. Maybe I should look into removing this at least for the key column, and always materializing them into arrow arrays. I'm going to consider this after we merge the multi-column aggregation. The external interface might still turn out to be more complex than what you suggest, and closer to the current |
||||||||||||||||
{ | ||||||||||||||||
*valid = !*params.single_grouping_column.output_isnull; | ||||||||||||||||
if (*valid) | ||||||||||||||||
{ | ||||||||||||||||
output_key->len = VARSIZE_ANY_EXHDR(*params.single_grouping_column.output_value); | ||||||||||||||||
output_key->data = | ||||||||||||||||
(const uint8 *) VARDATA_ANY(*params.single_grouping_column.output_value); | ||||||||||||||||
} | ||||||||||||||||
else | ||||||||||||||||
{ | ||||||||||||||||
output_key->len = 0; | ||||||||||||||||
output_key->data = NULL; | ||||||||||||||||
} | ||||||||||||||||
} | ||||||||||||||||
else if (params.single_grouping_column.decompression_type == DT_ArrowText) | ||||||||||||||||
{ | ||||||||||||||||
*output_key = get_bytes_view(¶ms.single_grouping_column, row); | ||||||||||||||||
*valid = arrow_row_is_valid(params.single_grouping_column.buffers[0], row); | ||||||||||||||||
} | ||||||||||||||||
else if (params.single_grouping_column.decompression_type == DT_ArrowTextDict) | ||||||||||||||||
{ | ||||||||||||||||
const int16 index = ((int16 *) params.single_grouping_column.buffers[3])[row]; | ||||||||||||||||
*output_key = get_bytes_view(¶ms.single_grouping_column, index); | ||||||||||||||||
*valid = arrow_row_is_valid(params.single_grouping_column.buffers[0], row); | ||||||||||||||||
} | ||||||||||||||||
else | ||||||||||||||||
{ | ||||||||||||||||
pg_unreachable(); | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
DEBUG_PRINT("%p consider key row %d key index %d is %d bytes: ", | ||||||||||||||||
params.policy, | ||||||||||||||||
row, | ||||||||||||||||
params.policy->last_used_key_index + 1, | ||||||||||||||||
output_key->len); | ||||||||||||||||
for (size_t i = 0; i < output_key->len; i++) | ||||||||||||||||
{ | ||||||||||||||||
DEBUG_PRINT("%.2x.", output_key->data[i]); | ||||||||||||||||
} | ||||||||||||||||
DEBUG_PRINT("\n"); | ||||||||||||||||
|
||||||||||||||||
const struct umash_fp fp = umash_fprint(params.policy->hashing.umash_params, | ||||||||||||||||
/* seed = */ ~0ULL, | ||||||||||||||||
output_key->data, | ||||||||||||||||
output_key->len); | ||||||||||||||||
*hash_table_key = umash_fingerprint_get_key(fp); | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
static pg_attribute_always_inline void | ||||||||||||||||
single_text_key_hashing_store_new(GroupingPolicyHash *restrict policy, uint32 new_key_index, | ||||||||||||||||
BytesView output_key) | ||||||||||||||||
{ | ||||||||||||||||
const int total_bytes = output_key.len + VARHDRSZ; | ||||||||||||||||
text *restrict stored = (text *) MemoryContextAlloc(policy->hashing.key_body_mctx, total_bytes); | ||||||||||||||||
SET_VARSIZE(stored, total_bytes); | ||||||||||||||||
memcpy(VARDATA(stored), output_key.data, output_key.len); | ||||||||||||||||
Comment on lines
+113
to
+116
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Suggest making use of PostgreSQL builtin function:
Suggested change
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think it's better to have something that can be inlined here, because it's a part of a hot loop that builds the hash table. |
||||||||||||||||
output_key.data = (uint8 *) VARDATA(stored); | ||||||||||||||||
akuzm marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||
policy->hashing.output_keys[new_key_index] = PointerGetDatum(stored); | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
/* | ||||||||||||||||
* We use the standard single-key key output functions. | ||||||||||||||||
*/ | ||||||||||||||||
static void | ||||||||||||||||
single_text_emit_key(GroupingPolicyHash *policy, uint32 current_key, | ||||||||||||||||
TupleTableSlot *aggregated_slot) | ||||||||||||||||
{ | ||||||||||||||||
return hash_strategy_output_key_single_emit(policy, current_key, aggregated_slot); | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
static void | ||||||||||||||||
single_text_key_hashing_prepare_for_batch(GroupingPolicyHash *policy, | ||||||||||||||||
DecompressBatchState *batch_state) | ||||||||||||||||
{ | ||||||||||||||||
} | ||||||||||||||||
|
||||||||||||||||
#include "hash_strategy_impl.c" |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,45 @@ | ||
/* | ||
* This file and its contents are licensed under the Timescale License. | ||
* Please see the included NOTICE for copyright information and | ||
* LICENSE-TIMESCALE for a copy of the license. | ||
*/ | ||
#pragma once | ||
|
||
/* | ||
* Helpers to use the umash fingerprint as a hash table key in our hashing | ||
* strategies for vectorized grouping. | ||
*/ | ||
|
||
#include "import/umash.h" | ||
|
||
/*
 * Hash table key built from a umash fingerprint by
 * umash_fingerprint_get_key(): 'hash' receives the low 32 bits of the first
 * fingerprint word and 'rest' receives the second fingerprint word.
 *
 * The struct is packed so that the hash table entry fits into 16
 * bytes with the uint32 key index that goes before.
 */
struct umash_fingerprint_key | ||
{ | ||
uint32 hash; | ||
uint64 rest; | ||
} pg_attribute_packed(); | ||
|
||
/*
 * Hash table key type and accessors for hashing strategies that key the
 * table by a umash fingerprint.
 *
 * KEY_HASH(X)      the 32-bit 'hash' member of key X.
 * KEY_EQUAL(a, b)  true when both members of keys a and b match.
 *
 * Arguments are parenthesized so that any expression yielding a key, e.g. a
 * dereferenced pointer, can be passed safely.
 */
#define HASH_TABLE_KEY_TYPE struct umash_fingerprint_key
#define KEY_HASH(X) ((X).hash)
#define KEY_EQUAL(a, b) ((a).hash == (b).hash && (a).rest == (b).rest)
|
||
static inline struct umash_fingerprint_key | ||
umash_fingerprint_get_key(struct umash_fp fp) | ||
{ | ||
const struct umash_fingerprint_key key = { | ||
.hash = fp.hash[0] & (~(uint32) 0), | ||
.rest = fp.hash[1], | ||
}; | ||
return key; | ||
} | ||
|
||
static inline struct umash_params * | ||
umash_key_hashing_init() | ||
{ | ||
struct umash_params *params = palloc0(sizeof(struct umash_params)); | ||
umash_params_derive(params, 0xabcdef1234567890ull, NULL); | ||
return params; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why do we need to skip these tests on Windows? Is it also because of UMASH?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Right, I didn't get it to compile there, so decided to disable for now.