diff --git a/deltacat/compute/compactor_v2/steps/merge.py b/deltacat/compute/compactor_v2/steps/merge.py
index 0e44e443..2d480e32 100644
--- a/deltacat/compute/compactor_v2/steps/merge.py
+++ b/deltacat/compute/compactor_v2/steps/merge.py
@@ -41,7 +41,10 @@
     interface as unimplemented_deltacat_storage,
 )
 from deltacat.compute.compactor_v2.utils.dedupe import drop_duplicates
-from deltacat.constants import BYTES_PER_GIBIBYTE
+from deltacat.constants import (
+    BYTES_PER_GIBIBYTE,
+    DW_LAST_UPDATED_COLUMN_NAME,
+)
 from deltacat.compute.compactor_v2.constants import (
     MERGE_TIME_IN_SECONDS,
     MERGE_SUCCESS_COUNT,
@@ -125,43 +128,46 @@ def _merge_tables(
     all_tables.append(table)
 
-    if not primary_keys or not can_drop_duplicates:
-        logger.info(
-            f"Not dropping duplicates for primary keys={primary_keys} "
-            f"and can_drop_duplicates={can_drop_duplicates}"
-        )
-        all_tables[incremental_idx] = _drop_delta_type_rows(
-            all_tables[incremental_idx], DeltaType.DELETE
-        )
-        # we need not drop duplicates
-        return pa.concat_tables(all_tables)
+    should_drop_duplicates = bool(primary_keys) and can_drop_duplicates
+    if should_drop_duplicates:
+        all_tables = generate_pk_hash_column(all_tables, primary_keys=primary_keys)
 
-    all_tables = generate_pk_hash_column(all_tables, primary_keys=primary_keys)
+        result_table_list = []
 
-    result_table_list = []
+        incremental_table = drop_duplicates(
+            all_tables[incremental_idx], on=sc._PK_HASH_STRING_COLUMN_NAME
+        )
 
-    incremental_table = drop_duplicates(
-        all_tables[incremental_idx], on=sc._PK_HASH_STRING_COLUMN_NAME
-    )
+        if compacted_table:
+            compacted_table = all_tables[0]
 
-    if compacted_table:
-        compacted_table = all_tables[0]
-
-        records_to_keep = pc.invert(
-            pc.is_in(
-                compacted_table[sc._PK_HASH_STRING_COLUMN_NAME],
-                incremental_table[sc._PK_HASH_STRING_COLUMN_NAME],
+            records_to_keep = pc.invert(
+                pc.is_in(
+                    compacted_table[sc._PK_HASH_STRING_COLUMN_NAME],
+                    incremental_table[sc._PK_HASH_STRING_COLUMN_NAME],
+                )
             )
-        )
 
-        result_table_list.append(compacted_table.filter(records_to_keep))
+            result_table_list.append(compacted_table.filter(records_to_keep))
 
-    incremental_table = _drop_delta_type_rows(incremental_table, DeltaType.DELETE)
-    result_table_list.append(incremental_table)
+        incremental_table = _drop_delta_type_rows(incremental_table, DeltaType.DELETE)
+        result_table_list.append(incremental_table)
 
-    final_table = pa.concat_tables(result_table_list)
-    final_table = final_table.drop([sc._PK_HASH_STRING_COLUMN_NAME])
+        final_table = pa.concat_tables(result_table_list)
+        final_table = final_table.drop([sc._PK_HASH_STRING_COLUMN_NAME])
+    else:
+        logger.info(
+            f"Not dropping duplicates for primary keys={primary_keys} "
+            f"and can_drop_duplicates={can_drop_duplicates}"
+        )
+        all_tables[incremental_idx] = _drop_delta_type_rows(
+            all_tables[incremental_idx], DeltaType.DELETE
+        )
+        final_table = pa.concat_tables(all_tables)
+    # TODO: Retrieve sort order policy from the table version metadata instead of hard-coding.
+    if DW_LAST_UPDATED_COLUMN_NAME in final_table.column_names:
+        final_table = final_table.sort_by([(DW_LAST_UPDATED_COLUMN_NAME, "descending")])
 
     return final_table
diff --git a/deltacat/constants.py b/deltacat/constants.py
index 99934c73..957d6252 100644
--- a/deltacat/constants.py
+++ b/deltacat/constants.py
@@ -53,3 +53,5 @@
 PYARROW_INFLATION_MULTIPLIER_ALL_COLUMNS = 6
 
 MEMORY_TO_HASH_BUCKET_COUNT_RATIO = 0.0512 * BYTES_PER_TEBIBYTE
+
+DW_LAST_UPDATED_COLUMN_NAME = "dw_last_updated"
diff --git a/deltacat/tests/compute/compact_partition_rebase_test_cases.py b/deltacat/tests/compute/compact_partition_rebase_test_cases.py
index 006fd61b..4306c0ab 100644
--- a/deltacat/tests/compute/compact_partition_rebase_test_cases.py
+++ b/deltacat/tests/compute/compact_partition_rebase_test_cases.py
@@ -1,4 +1,6 @@
+import datetime as dt
 import pyarrow as pa
+from deltacat.constants import DW_LAST_UPDATED_COLUMN_NAME
 from deltacat.tests.compute.test_util_common import (
     PartitionKey,
     PartitionKeyType,
@@ -65,20 +67,68 @@ class RebaseCompactionTestCaseParams(BaseCompactorTestCase):
             ],
             names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1"],
         ),
-        expected_terminal_compact_partition_result=pa.Table.from_arrays(
+        expected_terminal_compact_partition_result=pa.Table.from_arrays([], names=[]),
+        expected_terminal_exception=None,
+        expected_terminal_exception_message=None,
+        do_create_placement_group=False,
+        records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
+        hash_bucket_count=DEFAULT_HASH_BUCKET_COUNT,
+        read_kwargs_provider=None,
+        drop_duplicates=True,
+        skip_enabled_compact_partition_drivers=[CompactorVersion.V1],
+    ),
+    "2-rebase-sort": RebaseCompactionTestCaseParams(
+        primary_keys={"pk_col_1"},
+        sort_keys=[
+            SortKey.of(key_name="sk_col_1"),
+            SortKey.of(key_name="sk_col_2"),
+        ],
+        partition_keys=[PartitionKey.of("region_id", PartitionKeyType.INT)],
+        partition_values=["1"],
+        input_deltas=pa.Table.from_arrays(
             [
                 pa.array([str(i) for i in range(10)]),
-                pa.array([i for i in range(20, 30)]),
+                pa.array([i for i in range(0, 10)]),
                 pa.array(["foo"] * 10),
-                pa.array([i / 10 for i in range(40, 50)]),
+                pa.array([i / 10 for i in range(10, 20)]),
+                pa.array(dt.datetime(year, 1, 1) for year in range(2000, 2010)),
+            ],
+            names=[
+                "pk_col_1",
+                "sk_col_1",
+                "sk_col_2",
+                "col_1",
+                DW_LAST_UPDATED_COLUMN_NAME,
+            ],
+        ),
+        input_deltas_delta_type=DeltaType.UPSERT,
+        # dw_last_updated is in ascending order in the input table.
+        # Expect a descending sort on dw_last_updated for each hash bucket.
+        # Since there is only one hash bucket, the order of input rows should be reversed.
+        rebase_expected_compact_partition_result=pa.Table.from_arrays(
+            [
+                pa.array([str(i) for i in reversed(range(10))]),
+                pa.array([i for i in reversed(range(0, 10))]),
+                pa.array(["foo"] * 10),
+                pa.array([i / 10 for i in reversed(range(10, 20))]),
+                pa.array(
+                    dt.datetime(year, 1, 1) for year in reversed(range(2000, 2010))
+                ),
+            ],
+            names=[
+                "pk_col_1",
+                "sk_col_1",
+                "sk_col_2",
+                "col_1",
+                DW_LAST_UPDATED_COLUMN_NAME,
             ],
-            names=["pk_col_1", "sk_col_1", "sk_col_2", "col_1"],
         ),
+        expected_terminal_compact_partition_result=pa.Table.from_arrays([], names=[]),
         expected_terminal_exception=None,
         expected_terminal_exception_message=None,
         do_create_placement_group=False,
         records_per_compacted_file=DEFAULT_MAX_RECORDS_PER_FILE,
-        hash_bucket_count=DEFAULT_HASH_BUCKET_COUNT,
+        hash_bucket_count=1,
         read_kwargs_provider=None,
         drop_duplicates=True,
         skip_enabled_compact_partition_drivers=[CompactorVersion.V1],
diff --git a/deltacat/tests/compute/test_compact_partition_rebase.py b/deltacat/tests/compute/test_compact_partition_rebase.py
index aac5319f..fc577fea 100644
--- a/deltacat/tests/compute/test_compact_partition_rebase.py
+++ b/deltacat/tests/compute/test_compact_partition_rebase.py
@@ -5,6 +5,7 @@
 import boto3
 from boto3.resources.base import ServiceResource
 import pyarrow as pa
+from deltacat.constants import DW_LAST_UPDATED_COLUMN_NAME
 from deltacat.io.ray_plasma_object_store import RayPlasmaObjectStore
 from pytest_benchmark.fixture import BenchmarkFixture
@@ -274,16 +275,24 @@ def test_compact_partition_rebase_same_source_and_destination(
         compacted_delta_locator, storage_type=StorageType.LOCAL, **ds_mock_kwargs
     )
     actual_rebase_compacted_table = pa.concat_tables(tables)
-    # if no primary key is specified then sort by sort_key for consistent assertion
-    sorting_cols: List[Any] = (
-        [(val, "ascending") for val in primary_keys] if primary_keys else sort_keys
-    )
-    rebase_expected_compact_partition_result = (
-        rebase_expected_compact_partition_result.combine_chunks().sort_by(sorting_cols)
-    )
-    actual_rebase_compacted_table = (
-        actual_rebase_compacted_table.combine_chunks().sort_by(sorting_cols)
-    )
+    if DW_LAST_UPDATED_COLUMN_NAME in actual_rebase_compacted_table.column_names:
+        # If DW_LAST_UPDATED_COLUMN_NAME is present, don't sort the expected and actual tables;
+        # assert on row order as-is to validate the descending sort on the timestamp column.
+        pass
+    else:
+        # If DW_LAST_UPDATED_COLUMN_NAME is absent, sort by primary key for a consistent assertion,
+        # falling back to sort_key when no primary key is specified.
+        sorting_cols: List[Any] = (
+            [(val, "ascending") for val in primary_keys] if primary_keys else sort_keys
+        )
+        rebase_expected_compact_partition_result = (
+            rebase_expected_compact_partition_result.combine_chunks().sort_by(
+                sorting_cols
+            )
+        )
+        actual_rebase_compacted_table = (
+            actual_rebase_compacted_table.combine_chunks().sort_by(sorting_cols)
+        )
     assert actual_rebase_compacted_table.equals(
         rebase_expected_compact_partition_result
     ), f"{actual_rebase_compacted_table} does not match {rebase_expected_compact_partition_result}"
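
Not part of the diff: a minimal, self-contained sketch of the ordering behavior the merge step gains when the dw_last_updated column is present. The toy table and its values are made up for illustration; the constant and the sort_by call mirror the change above.

```python
# Sketch of the new final-table sort in _merge_tables (illustrative data only).
import datetime as dt

import pyarrow as pa

DW_LAST_UPDATED_COLUMN_NAME = "dw_last_updated"  # mirrors deltacat.constants

# Pretend this is the merged table before the final sort.
merged = pa.table(
    {
        "pk_col_1": ["a", "b", "c"],
        DW_LAST_UPDATED_COLUMN_NAME: [
            dt.datetime(2000, 1, 1),
            dt.datetime(2001, 1, 1),
            dt.datetime(2002, 1, 1),
        ],
    }
)

# Same guard and call as the merge step: most recently updated rows come first.
if DW_LAST_UPDATED_COLUMN_NAME in merged.column_names:
    merged = merged.sort_by([(DW_LAST_UPDATED_COLUMN_NAME, "descending")])

print(merged[DW_LAST_UPDATED_COLUMN_NAME])  # 2002-01-01, 2001-01-01, 2000-01-01
```

Because the sort runs per merge task, the "2-rebase-sort" test case pins hash_bucket_count=1 so that the whole expected table reflects a single descending ordering.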