diff --git a/pipelinewise/cli/config.py b/pipelinewise/cli/config.py
index baae48b17..e53c5c355 100644
--- a/pipelinewise/cli/config.py
+++ b/pipelinewise/cli/config.py
@@ -325,6 +325,7 @@ def generate_selection(cls, tap: Dict) -> List[Dict]:
                             # Add replication_key only if replication_method is INCREMENTAL
                             'replication_key': table.get('replication_key')
                             if replication_method == 'INCREMENTAL'
                             else None,
+                            'partition_key': table.get('partition_key'),
                         }
                     )
                 )
diff --git a/pipelinewise/cli/pipelinewise.py b/pipelinewise/cli/pipelinewise.py
index 62db6173a..e5be39bdb 100644
--- a/pipelinewise/cli/pipelinewise.py
+++ b/pipelinewise/cli/pipelinewise.py
@@ -728,6 +728,12 @@ def make_default_selection(self, schema, selection_file):
                     ]['metadata']['replication-key'] = tap_stream_sel[
                         'replication_key'
                     ]
+                    if 'partition_key' in tap_stream_sel:
+                        schema['streams'][stream_idx]['metadata'][
+                            stream_table_mdata_idx
+                        ]['metadata']['partition-key'] = tap_stream_sel[
+                            'partition_key'
+                        ]
                 else:
                     self.logger.debug(
                         'Mark %s tap_stream_id as not selected', tap_stream_id
diff --git a/pipelinewise/cli/schemas/tap.json b/pipelinewise/cli/schemas/tap.json
index a437e4b8f..3635e2506 100644
--- a/pipelinewise/cli/schemas/tap.json
+++ b/pipelinewise/cli/schemas/tap.json
@@ -85,6 +85,9 @@
                         "LOG_BASED"
                     ]
                 },
+                "partition_key": {
+                    "type": "string"
+                },
                 "transformations": {
                     "type": "array",
                     "items": {
@@ -113,6 +116,9 @@
                 "replication_key": {
                     "type": "string"
                 },
+                "partition_key": {
+                    "type": "string"
+                },
                 "transformations": {
                     "type": "array",
                     "items": {
diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py
index 60bcdba61..6d3b3dd29 100644
--- a/pipelinewise/fastsync/commons/target_bigquery.py
+++ b/pipelinewise/fastsync/commons/target_bigquery.py
@@ -194,6 +194,7 @@ def create_table(
         primary_key: Optional[List[str]],
         is_temporary: bool = False,
         sort_columns=False,
+        partition_key: Optional[str] = None,
     ):

         table_dict = utils.tablename_to_dict(table_name)
@@ -232,6 +233,9 @@ def create_table(
             f'CREATE OR REPLACE TABLE {target_schema}.{target_table} ('
             f'{",".join(columns)})'
         )
+        if partition_key:
+            partition_key = partition_key.lower()
+            sql = sql + f' PARTITION BY DATE({partition_key})'
         if primary_key:
             primary_key = [c.lower() for c in primary_key]
             sql = sql + f' CLUSTER BY {",".join(primary_key)}'
diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py
index a4f27e954..d3c4f6932 100644
--- a/pipelinewise/fastsync/mongodb_to_bigquery.py
+++ b/pipelinewise/fastsync/mongodb_to_bigquery.py
@@ -50,9 +50,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
     bigquery = FastSyncTargetBigquery(args.target, args.transform)
     tap_id = args.target.get('tap_id')
     archive_load_files = args.target.get('archive_load_files', False)
+    dbname = args.tap.get('dbname')
+    partition_key = utils.get_metadata_for_table(
+        table, args.properties, dbname=dbname).get('partition_key')

     try:
-        dbname = args.tap.get('dbname')
         filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format(
             dbname, table, time.strftime('%Y%m%d-%H%M%S')
         )
@@ -99,7 +101,9 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
         bigquery.obfuscate_columns(target_schema, table)

         # Create target table and swap with the temp table in Bigquery
-        bigquery.create_table(target_schema, table, bigquery_columns, primary_key)
+        bigquery.create_table(
+            target_schema, table, bigquery_columns, primary_key, partition_key=partition_key
+        )
         bigquery.swap_tables(target_schema, table)

         # Save bookmark to singer state file
diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py
index 751eeb83e..1d03401b6 100644
--- a/pipelinewise/fastsync/mysql_to_bigquery.py
+++ b/pipelinewise/fastsync/mysql_to_bigquery.py
@@ -71,6 +71,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
     bigquery = FastSyncTargetBigquery(args.target, args.transform)
     tap_id = args.target.get('tap_id')
     archive_load_files = args.target.get('archive_load_files', False)
+    partition_key = utils.get_metadata_for_table(
+        table, args.properties).get('partition_key')

     try:
         filename = 'pipelinewise_fastsync_{}_{}.csv'.format(
@@ -111,7 +113,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:

         # Creating temp table in Bigquery
         bigquery.create_schema(target_schema)
-        bigquery.create_table(target_schema, table, bigquery_columns, primary_key, is_temporary=True)
+        bigquery.create_table(
+            target_schema,
+            table,
+            bigquery_columns,
+            primary_key,
+            is_temporary=True,
+            partition_key=partition_key,
+        )

         # Load into Bigquery table
         bigquery.copy_to_table(
@@ -130,7 +139,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
         bigquery.obfuscate_columns(target_schema, table)

         # Create target table and swap with the temp table in Bigquery
-        bigquery.create_table(target_schema, table, bigquery_columns, primary_key)
+        bigquery.create_table(
+            target_schema, table, bigquery_columns, primary_key, partition_key=partition_key)
         bigquery.swap_tables(target_schema, table)

         # Save bookmark to singer state file
diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py
index ad839708b..f545f0d1e 100644
--- a/pipelinewise/fastsync/postgres_to_bigquery.py
+++ b/pipelinewise/fastsync/postgres_to_bigquery.py
@@ -70,9 +70,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
     bigquery = FastSyncTargetBigquery(args.target, args.transform)
     tap_id = args.target.get('tap_id')
     archive_load_files = args.target.get('archive_load_files', False)
+    dbname = args.tap.get('dbname')
+    partition_key = utils.get_metadata_for_table(
+        table, args.properties, dbname=dbname).get('partition_key')

     try:
-        dbname = args.tap.get('dbname')
         filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format(
             dbname, table, time.strftime('%Y%m%d-%H%M%S')
         )
@@ -113,7 +115,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:

         # Creating temp table in Bigquery
         bigquery.create_schema(target_schema)
-        bigquery.create_table(target_schema, table, bigquery_columns, primary_key, is_temporary=True)
+        bigquery.create_table(
+            target_schema,
+            table,
+            bigquery_columns,
+            primary_key,
+            is_temporary=True,
+            partition_key=partition_key,
+        )

         # Load into Bigquery table
         bigquery.copy_to_table(
@@ -132,7 +141,9 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]:
         bigquery.obfuscate_columns(target_schema, table)

         # Create target table and swap with the temp table in Bigquery
-        bigquery.create_table(target_schema, table, bigquery_columns, primary_key)
+        bigquery.create_table(
+            target_schema, table, bigquery_columns, primary_key, partition_key=partition_key,
+        )
         bigquery.swap_tables(target_schema, table)

         # Save bookmark to singer state file
diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py
index 4dd6e0412..8f10caafb 100644
--- a/pipelinewise/fastsync/s3_csv_to_bigquery.py
+++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py
@@ -46,6 +46,8 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]:
     bigquery = FastSyncTargetBigquery(args.target, args.transform)
     tap_id = args.target.get('tap_id')
     archive_load_files = args.target.get('archive_load_files', False)
+    partition_key = utils.get_metadata_for_table(
+        table_name, args.properties).get('partition_key')

     try:
         filename = utils.gen_export_filename(
@@ -75,6 +77,7 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]:
             primary_key,
             is_temporary=True,
             sort_columns=True,
+            partition_key=partition_key,
         )

         # Load into Bigquery table
@@ -93,7 +96,9 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]:
         bigquery.obfuscate_columns(target_schema, table_name)

         # Create target table and swap with the temp table in Bigquery
-        bigquery.create_table(target_schema, table_name, bigquery_columns, primary_key)
+        bigquery.create_table(
+            target_schema, table_name, bigquery_columns, primary_key, partition_key=partition_key,
+        )
         bigquery.swap_tables(target_schema, table_name)

         # Get bookmark