From 89efc0ed1d787daefccc24cfe6aeec665b6594ed Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 23 Feb 2022 23:06:53 +0000 Subject: [PATCH 1/4] Refactor `get_bookmark_for_table` --- pipelinewise/fastsync/commons/utils.py | 44 +++++++++++++++----------- 1 file changed, 25 insertions(+), 19 deletions(-) diff --git a/pipelinewise/fastsync/commons/utils.py b/pipelinewise/fastsync/commons/utils.py index 761101c97..592f487bd 100644 --- a/pipelinewise/fastsync/commons/utils.py +++ b/pipelinewise/fastsync/commons/utils.py @@ -5,7 +5,7 @@ import logging import datetime -from typing import Dict +from typing import Dict, Optional from pipelinewise.cli.utils import generate_random_string LOGGER = logging.getLogger(__name__) @@ -107,17 +107,13 @@ def get_tables_from_properties(properties: Dict) -> set: return tables -def get_bookmark_for_table(table, properties, db_engine, dbname=None): - """Get actual bookmark for a specific table used for LOG_BASED or INCREMENTAL - replications - """ - bookmark = {} - +def get_metadata_for_table( + table: str, properties: Dict, dbname: Optional[str] = None +) -> Dict: # Find table from properties and get bookmark based on replication method for stream in properties.get('streams', []): metadata = stream.get('metadata', []) table_name = stream.get('table_name', stream['stream']) - # Get table specific metadata i.e. replication method, replication key, etc. table_meta = next( ( @@ -129,8 +125,6 @@ def get_bookmark_for_table(table, properties, db_engine, dbname=None): ).get('metadata') db_name = table_meta.get('database-name') schema_name = table_meta.get('schema-name') - replication_method = table_meta.get('replication-method') - replication_key = table_meta.get('replication-key') fully_qualified_table_name = ( '{}.{}'.format(schema_name or db_name, table_name) @@ -141,17 +135,29 @@ def get_bookmark_for_table(table, properties, db_engine, dbname=None): if ( dbname is None or db_name == dbname ) and fully_qualified_table_name == table: - # Log based replication: get mysql binlog position - if replication_method == 'LOG_BASED': - bookmark = db_engine.fetch_current_log_pos() + return table_meta + return {} + - # Key based incremental replication: Get max replication key from source - elif replication_method == 'INCREMENTAL': - bookmark = db_engine.fetch_current_incremental_key_pos( - fully_qualified_table_name, replication_key - ) +def get_bookmark_for_table(table, properties, db_engine, dbname=None): + """Get actual bookmark for a specific table used for LOG_BASED or INCREMENTAL + replications + """ + bookmark = {} + + table_meta = get_metadata_for_table(table, properties, dbname=dbname) + replication_method = table_meta.get('replication-method') + replication_key = table_meta.get('replication-key') - break + # Log based replication: get mysql binlog position + if replication_method == 'LOG_BASED': + bookmark = db_engine.fetch_current_log_pos() + + # Key based incremental replication: Get max replication key from source + elif replication_method == 'INCREMENTAL': + bookmark = db_engine.fetch_current_incremental_key_pos( + table, replication_key + ) return bookmark From 924640e359645d756378e187d926bd9bed9935c8 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Wed, 23 Feb 2022 23:14:42 +0000 Subject: [PATCH 2/4] Implement ability to partition BigQuery tables --- pipelinewise/cli/config.py | 1 + pipelinewise/cli/pipelinewise.py | 6 ++++++ pipelinewise/cli/schemas/tap.json | 6 ++++++ 
.../fastsync/commons/target_bigquery.py | 4 ++++ pipelinewise/fastsync/mongodb_to_bigquery.py | 8 ++++++-- pipelinewise/fastsync/mysql_to_bigquery.py | 14 ++++++++++++-- pipelinewise/fastsync/postgres_to_bigquery.py | 17 ++++++++++++++--- pipelinewise/fastsync/s3_csv_to_bigquery.py | 7 ++++++- 8 files changed, 55 insertions(+), 8 deletions(-) diff --git a/pipelinewise/cli/config.py b/pipelinewise/cli/config.py index baae48b17..e53c5c355 100644 --- a/pipelinewise/cli/config.py +++ b/pipelinewise/cli/config.py @@ -325,6 +325,7 @@ def generate_selection(cls, tap: Dict) -> List[Dict]: # Add replication_key only if replication_method is INCREMENTAL 'replication_key': table.get('replication_key') if replication_method == 'INCREMENTAL' else None, + 'partition_key': table.get('partition_key'), } ) ) diff --git a/pipelinewise/cli/pipelinewise.py b/pipelinewise/cli/pipelinewise.py index 62db6173a..e5be39bdb 100644 --- a/pipelinewise/cli/pipelinewise.py +++ b/pipelinewise/cli/pipelinewise.py @@ -728,6 +728,12 @@ def make_default_selection(self, schema, selection_file): ]['metadata']['replication-key'] = tap_stream_sel[ 'replication_key' ] + if 'partition_key' in tap_stream_sel: + schema['streams'][stream_idx]['metadata'][ + stream_table_mdata_idx + ]['metadata']['partition-key'] = tap_stream_sel[ + 'partition_key' + ] else: self.logger.debug( 'Mark %s tap_stream_id as not selected', tap_stream_id diff --git a/pipelinewise/cli/schemas/tap.json b/pipelinewise/cli/schemas/tap.json index a437e4b8f..3635e2506 100644 --- a/pipelinewise/cli/schemas/tap.json +++ b/pipelinewise/cli/schemas/tap.json @@ -85,6 +85,9 @@ "LOG_BASED" ] }, + "partition_key": { + "type": "string" + }, "transformations": { "type": "array", "items": { @@ -113,6 +116,9 @@ "replication_key": { "type": "string" }, + "partition_key": { + "type": "string" + }, "transformations": { "type": "array", "items": { diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index 60bcdba61..6d3b3dd29 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -194,6 +194,7 @@ def create_table( primary_key: Optional[List[str]], is_temporary: bool = False, sort_columns=False, + partition_key: Optional[str] = None, ): table_dict = utils.tablename_to_dict(table_name) @@ -232,6 +233,9 @@ def create_table( f'CREATE OR REPLACE TABLE {target_schema}.{target_table} (' f'{",".join(columns)})' ) + if partition_key: + partition_key = partition_key.lower() + sql = sql + f' PARTITION BY DATE({partition_key})' if primary_key: primary_key = [c.lower() for c in primary_key] sql = sql + f' CLUSTER BY {",".join(primary_key)}' diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py index a4f27e954..d3c4f6932 100644 --- a/pipelinewise/fastsync/mongodb_to_bigquery.py +++ b/pipelinewise/fastsync/mongodb_to_bigquery.py @@ -50,9 +50,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery = FastSyncTargetBigquery(args.target, args.transform) tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) + dbname = args.tap.get('dbname') + partition_key = utils.get_metadata_for_table( + table, args.properties, dbname=dbname).get('partition_key') try: - dbname = args.tap.get('dbname') filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format( dbname, table, time.strftime('%Y%m%d-%H%M%S') ) @@ -99,7 +101,9 @@ def sync_table(table: str, args: Namespace) -> 
Union[bool, str]: bigquery.obfuscate_columns(target_schema, table) # Create target table and swap with the temp table in Bigquery - bigquery.create_table(target_schema, table, bigquery_columns, primary_key) + bigquery.create_table( + target_schema, table, bigquery_columns, primary_key, partition_key=partition_key + ) bigquery.swap_tables(target_schema, table) # Save bookmark to singer state file diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py index 751eeb83e..1d03401b6 100644 --- a/pipelinewise/fastsync/mysql_to_bigquery.py +++ b/pipelinewise/fastsync/mysql_to_bigquery.py @@ -71,6 +71,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery = FastSyncTargetBigquery(args.target, args.transform) tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) + partition_key = utils.get_metadata_for_table( + table, args.properties).get('partition_key') try: filename = 'pipelinewise_fastsync_{}_{}.csv'.format( @@ -111,7 +113,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Creating temp table in Bigquery bigquery.create_schema(target_schema) - bigquery.create_table(target_schema, table, bigquery_columns, primary_key, is_temporary=True) + bigquery.create_table( + target_schema, + table, + bigquery_columns, + primary_key, + is_temporary=True, + partition_key=partition_key, + ) # Load into Bigquery table bigquery.copy_to_table( @@ -130,7 +139,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery.obfuscate_columns(target_schema, table) # Create target table and swap with the temp table in Bigquery - bigquery.create_table(target_schema, table, bigquery_columns, primary_key) + bigquery.create_table( + target_schema, table, bigquery_columns, primary_key, partition_key=partition_key) bigquery.swap_tables(target_schema, table) # Save bookmark to singer state file diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py index ad839708b..f545f0d1e 100644 --- a/pipelinewise/fastsync/postgres_to_bigquery.py +++ b/pipelinewise/fastsync/postgres_to_bigquery.py @@ -70,9 +70,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery = FastSyncTargetBigquery(args.target, args.transform) tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) + dbname = args.tap.get('dbname') + partition_key = utils.get_metadata_for_table( + table, args.properties, dbname=dbname).get('partition_key') try: - dbname = args.tap.get('dbname') filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format( dbname, table, time.strftime('%Y%m%d-%H%M%S') ) @@ -113,7 +115,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Creating temp table in Bigquery bigquery.create_schema(target_schema) - bigquery.create_table(target_schema, table, bigquery_columns, primary_key, is_temporary=True) + bigquery.create_table( + target_schema, + table, + bigquery_columns, + primary_key, + is_temporary=True, + partition_key=partition_key, + ) # Load into Bigquery table bigquery.copy_to_table( @@ -132,7 +141,9 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery.obfuscate_columns(target_schema, table) # Create target table and swap with the temp table in Bigquery - bigquery.create_table(target_schema, table, bigquery_columns, primary_key) + bigquery.create_table( + target_schema, table, bigquery_columns, primary_key, partition_key=partition_key, + 
) bigquery.swap_tables(target_schema, table) # Save bookmark to singer state file diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py index 4dd6e0412..8f10caafb 100644 --- a/pipelinewise/fastsync/s3_csv_to_bigquery.py +++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py @@ -46,6 +46,8 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: bigquery = FastSyncTargetBigquery(args.target, args.transform) tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) + partition_key = utils.get_metadata_for_table( + table_name, args.properties).get('partition_key') try: filename = utils.gen_export_filename( @@ -75,6 +77,7 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: primary_key, is_temporary=True, sort_columns=True, + partition_key=partition_key, ) # Load into Bigquery table @@ -93,7 +96,9 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: bigquery.obfuscate_columns(target_schema, table_name) # Create target table and swap with the temp table in Bigquery - bigquery.create_table(target_schema, table_name, bigquery_columns, primary_key) + bigquery.create_table( + target_schema, table_name, bigquery_columns, primary_key, partition_key=partition_key, + ) bigquery.swap_tables(target_schema, table_name) # Get bookmark From 67ce7e8c68f1746ec3928284a94dcf6627cde416 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 24 Feb 2022 09:43:04 +0000 Subject: [PATCH 3/4] Use the same `partition_by` format as `dbt` --- pipelinewise/cli/config.py | 2 +- pipelinewise/cli/pipelinewise.py | 6 +- pipelinewise/cli/schemas/tap.json | 67 +++++++++++++++++-- .../fastsync/commons/target_bigquery.py | 35 ++++++++-- pipelinewise/fastsync/mongodb_to_bigquery.py | 6 +- pipelinewise/fastsync/mysql_to_bigquery.py | 8 +-- pipelinewise/fastsync/postgres_to_bigquery.py | 8 +-- pipelinewise/fastsync/s3_csv_to_bigquery.py | 8 +-- 8 files changed, 112 insertions(+), 28 deletions(-) diff --git a/pipelinewise/cli/config.py b/pipelinewise/cli/config.py index e53c5c355..0071687a7 100644 --- a/pipelinewise/cli/config.py +++ b/pipelinewise/cli/config.py @@ -325,7 +325,7 @@ def generate_selection(cls, tap: Dict) -> List[Dict]: # Add replication_key only if replication_method is INCREMENTAL 'replication_key': table.get('replication_key') if replication_method == 'INCREMENTAL' else None, - 'partition_key': table.get('partition_key'), + 'partition_by': table.get('partition_by'), } ) ) diff --git a/pipelinewise/cli/pipelinewise.py b/pipelinewise/cli/pipelinewise.py index e5be39bdb..d6b8fe4ca 100644 --- a/pipelinewise/cli/pipelinewise.py +++ b/pipelinewise/cli/pipelinewise.py @@ -728,11 +728,11 @@ def make_default_selection(self, schema, selection_file): ]['metadata']['replication-key'] = tap_stream_sel[ 'replication_key' ] - if 'partition_key' in tap_stream_sel: + if 'partition_by' in tap_stream_sel: schema['streams'][stream_idx]['metadata'][ stream_table_mdata_idx - ]['metadata']['partition-key'] = tap_stream_sel[ - 'partition_key' + ]['metadata']['partition-by'] = tap_stream_sel[ + 'partition_by' ] else: self.logger.debug( diff --git a/pipelinewise/cli/schemas/tap.json b/pipelinewise/cli/schemas/tap.json index 3635e2506..2b5dccb33 100644 --- a/pipelinewise/cli/schemas/tap.json +++ b/pipelinewise/cli/schemas/tap.json @@ -85,8 +85,8 @@ "LOG_BASED" ] }, - "partition_key": { - "type": "string" + "partition_by": { + "$ref": 
"#/definitions/partition_by" }, "transformations": { "type": "array", @@ -116,8 +116,8 @@ "replication_key": { "type": "string" }, - "partition_key": { - "type": "string" + "partition_by": { + "$ref": "#/definitions/partition_by" }, "transformations": { "type": "array", @@ -329,6 +329,65 @@ } } ] + }, + "partition_by": { + "type": "object", + "properties": { + "field": { + "type": "string" + }, + "data_type": { + "enum": ["timestamp", "date", "datetime", "int64"] + }, + "granularity":{ + "anyOf": [ + { + "properties": { + "data_type": { "const": "date" } + }, + "granularity": ["day", "month", "year"] + }, + { + "properties": { + "data_type": { "const": "datetime" } + }, + "granularity": ["hour", "day", "month", "year"] + }, + { + "properties": { + "data_type": { "const": "timestamp" } + }, + "granularity": ["hour", "day", "month", "year"] + } + ] + }, + "range": { + "type": "object", + "required": ["start", "end", "interval"], + "properties": { + "start": { + "type": "integer" + }, + "end": { + "type": "integer" + }, + "interval": { + "type": "integer" + } + } + } + }, + "anyOf": [ + { + "properties": { + "data_type": { "const": "int64" } + }, + "required": ["field", "data_type", "range"] + }, + { + "required": ["field", "data_type"] + } + ] } }, "type": "object", diff --git a/pipelinewise/fastsync/commons/target_bigquery.py b/pipelinewise/fastsync/commons/target_bigquery.py index 6d3b3dd29..fddd66e80 100644 --- a/pipelinewise/fastsync/commons/target_bigquery.py +++ b/pipelinewise/fastsync/commons/target_bigquery.py @@ -31,6 +31,32 @@ def safe_name(name, quotes=True): return removed_bad_chars +def get_partition_by_clause(partition_by: Dict) -> str: + """ + Generate BigQuery specific PARTITION BY clause from dictionary. + """ + field = partition_by['field'].lower() + data_type = partition_by['data_type'] + granularity = partition_by.get('granularity', 'day') + + if data_type == 'int64': + sub_clause = ( + f'RANGE_BUCKET({field}, ' + f'GENERATE_ARRAY({partition_by["range"]["start"]}, ' + f'{partition_by["range"]["end"]}, {partition_by["range"]["interval"]}))' + ) + elif data_type == 'date': + sub_clause = f'DATE({field})' + if granularity != 'day': + sub_clause = f'DATE_TRUNC({field}, {granularity})' + elif data_type == 'datetime': + sub_clause = f'DATETIME_TRUNC({field}, {granularity})' + elif data_type == 'timestamp': + sub_clause = f'TIMESTAMP_TRUNC({field}, {granularity})' + + return f' PARTITION BY {sub_clause})' + + # pylint: disable=missing-function-docstring,no-self-use,too-many-arguments class FastSyncTargetBigquery: """ @@ -194,7 +220,7 @@ def create_table( primary_key: Optional[List[str]], is_temporary: bool = False, sort_columns=False, - partition_key: Optional[str] = None, + partition_by: Optional[Dict] = None, ): table_dict = utils.tablename_to_dict(table_name) @@ -233,12 +259,11 @@ def create_table( f'CREATE OR REPLACE TABLE {target_schema}.{target_table} (' f'{",".join(columns)})' ) - if partition_key: - partition_key = partition_key.lower() - sql = sql + f' PARTITION BY DATE({partition_key})' + if partition_by: + sql = f'{sql} {get_partition_by_clause(partition_by)}' if primary_key: primary_key = [c.lower() for c in primary_key] - sql = sql + f' CLUSTER BY {",".join(primary_key)}' + sql = f'{sql} CLUSTER BY {",".join(primary_key)}' self.query(sql) diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py index d3c4f6932..81fc546cc 100644 --- a/pipelinewise/fastsync/mongodb_to_bigquery.py +++ 
b/pipelinewise/fastsync/mongodb_to_bigquery.py @@ -51,8 +51,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) dbname = args.tap.get('dbname') - partition_key = utils.get_metadata_for_table( - table, args.properties, dbname=dbname).get('partition_key') + partition_by = utils.get_metadata_for_table( + table, args.properties, dbname=dbname).get('partition-by') try: filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format( @@ -102,7 +102,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Create target table and swap with the temp table in Bigquery bigquery.create_table( - target_schema, table, bigquery_columns, primary_key, partition_key=partition_key + target_schema, table, bigquery_columns, primary_key, partition_by=partition_by ) bigquery.swap_tables(target_schema, table) diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py index 1d03401b6..0c25f5278 100644 --- a/pipelinewise/fastsync/mysql_to_bigquery.py +++ b/pipelinewise/fastsync/mysql_to_bigquery.py @@ -71,8 +71,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery = FastSyncTargetBigquery(args.target, args.transform) tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) - partition_key = utils.get_metadata_for_table( - table, args.properties).get('partition_key') + partition_by = utils.get_metadata_for_table( + table, args.properties).get('partition-by') try: filename = 'pipelinewise_fastsync_{}_{}.csv'.format( @@ -119,7 +119,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery_columns, primary_key, is_temporary=True, - partition_key=partition_key, + partition_by=partition_by, ) # Load into Bigquery table @@ -140,7 +140,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Create target table and swap with the temp table in Bigquery bigquery.create_table( - target_schema, table, bigquery_columns, primary_key, partition_key=partition_key) + target_schema, table, bigquery_columns, primary_key, partition_by=partition_by) bigquery.swap_tables(target_schema, table) # Save bookmark to singer state file diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py index f545f0d1e..c6a4661fc 100644 --- a/pipelinewise/fastsync/postgres_to_bigquery.py +++ b/pipelinewise/fastsync/postgres_to_bigquery.py @@ -71,8 +71,8 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) dbname = args.tap.get('dbname') - partition_key = utils.get_metadata_for_table( - table, args.properties, dbname=dbname).get('partition_key') + partition_by = utils.get_metadata_for_table( + table, args.properties, dbname=dbname).get('partition-by') try: filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format( @@ -121,7 +121,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery_columns, primary_key, is_temporary=True, - partition_key=partition_key, + partition_by=partition_by, ) # Load into Bigquery table @@ -142,7 +142,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Create target table and swap with the temp table in Bigquery bigquery.create_table( - target_schema, table, bigquery_columns, primary_key, partition_key=partition_key, + target_schema, table, bigquery_columns, 
primary_key, partition_by=partition_by, ) bigquery.swap_tables(target_schema, table) diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py index 8f10caafb..b39481458 100644 --- a/pipelinewise/fastsync/s3_csv_to_bigquery.py +++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py @@ -46,8 +46,8 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: bigquery = FastSyncTargetBigquery(args.target, args.transform) tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) - partition_key = utils.get_metadata_for_table( - table_name, args.properties).get('partition_key') + partition_by = utils.get_metadata_for_table( + table_name, args.properties).get('partition-by') try: filename = utils.gen_export_filename( @@ -77,7 +77,7 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: primary_key, is_temporary=True, sort_columns=True, - partition_key=partition_key, + partition_by=partition_by, ) # Load into Bigquery table @@ -97,7 +97,7 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: # Create target table and swap with the temp table in Bigquery bigquery.create_table( - target_schema, table_name, bigquery_columns, primary_key, partition_key=partition_key, + target_schema, table_name, bigquery_columns, primary_key, partition_by=partition_by, ) bigquery.swap_tables(target_schema, table_name) From d7e1b2a9d1078ff3f5bf7a6b81d23e4fbbe984b6 Mon Sep 17 00:00:00 2001 From: Judah Rand <17158624+judahrand@users.noreply.github.com> Date: Thu, 24 Feb 2022 10:37:59 +0000 Subject: [PATCH 4/4] Pass `partition_by` information through state --- pipelinewise/fastsync/commons/utils.py | 3 ++- pipelinewise/fastsync/mongodb_to_bigquery.py | 17 +++++++++++++---- pipelinewise/fastsync/mysql_to_bigquery.py | 11 +++++++---- pipelinewise/fastsync/postgres_to_bigquery.py | 10 ++++++---- pipelinewise/fastsync/s3_csv_to_bigquery.py | 16 +++++++++------- 5 files changed, 37 insertions(+), 20 deletions(-) diff --git a/pipelinewise/fastsync/commons/utils.py b/pipelinewise/fastsync/commons/utils.py index 592f487bd..d9183c6f1 100644 --- a/pipelinewise/fastsync/commons/utils.py +++ b/pipelinewise/fastsync/commons/utils.py @@ -158,7 +158,8 @@ def get_bookmark_for_table(table, properties, db_engine, dbname=None): bookmark = db_engine.fetch_current_incremental_key_pos( table, replication_key ) - + if 'partition-by' in table_meta: + bookmark['partition_by'] = table_meta['partition-by'] return bookmark diff --git a/pipelinewise/fastsync/mongodb_to_bigquery.py b/pipelinewise/fastsync/mongodb_to_bigquery.py index 81fc546cc..e55b77b3c 100644 --- a/pipelinewise/fastsync/mongodb_to_bigquery.py +++ b/pipelinewise/fastsync/mongodb_to_bigquery.py @@ -51,8 +51,6 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) dbname = args.tap.get('dbname') - partition_by = utils.get_metadata_for_table( - table, args.properties, dbname=dbname).get('partition-by') try: filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format( @@ -83,7 +81,14 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Creating temp table in Bigquery bigquery.create_schema(target_schema) - bigquery.create_table(target_schema, table, bigquery_columns, primary_key, is_temporary=True) + bigquery.create_table( + target_schema, + table, + bigquery_columns, + primary_key, + is_temporary=True, + 
partition_by=bookmark.get('partition_by'), + ) # Load into Bigquery table bigquery.copy_to_table( @@ -102,7 +107,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Create target table and swap with the temp table in Bigquery bigquery.create_table( - target_schema, table, bigquery_columns, primary_key, partition_by=partition_by + target_schema, + table, + bigquery_columns, + primary_key, + partition_by=bookmark.get('partition_by'), ) bigquery.swap_tables(target_schema, table) diff --git a/pipelinewise/fastsync/mysql_to_bigquery.py b/pipelinewise/fastsync/mysql_to_bigquery.py index 0c25f5278..ebe8bea00 100644 --- a/pipelinewise/fastsync/mysql_to_bigquery.py +++ b/pipelinewise/fastsync/mysql_to_bigquery.py @@ -71,8 +71,6 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery = FastSyncTargetBigquery(args.target, args.transform) tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) - partition_by = utils.get_metadata_for_table( - table, args.properties).get('partition-by') try: filename = 'pipelinewise_fastsync_{}_{}.csv'.format( @@ -119,7 +117,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery_columns, primary_key, is_temporary=True, - partition_by=partition_by, + partition_by=bookmark.get('partition_by'), ) # Load into Bigquery table @@ -140,7 +138,12 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Create target table and swap with the temp table in Bigquery bigquery.create_table( - target_schema, table, bigquery_columns, primary_key, partition_by=partition_by) + target_schema, + table, + bigquery_columns, + primary_key, + partition_by=bookmark.get('partition_by'), + ) bigquery.swap_tables(target_schema, table) # Save bookmark to singer state file diff --git a/pipelinewise/fastsync/postgres_to_bigquery.py b/pipelinewise/fastsync/postgres_to_bigquery.py index c6a4661fc..033247d9d 100644 --- a/pipelinewise/fastsync/postgres_to_bigquery.py +++ b/pipelinewise/fastsync/postgres_to_bigquery.py @@ -71,8 +71,6 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: tap_id = args.target.get('tap_id') archive_load_files = args.target.get('archive_load_files', False) dbname = args.tap.get('dbname') - partition_by = utils.get_metadata_for_table( - table, args.properties, dbname=dbname).get('partition-by') try: filename = 'pipelinewise_fastsync_{}_{}_{}.csv'.format( @@ -121,7 +119,7 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: bigquery_columns, primary_key, is_temporary=True, - partition_by=partition_by, + partition_by=bookmark.get('partition_by'), ) # Load into Bigquery table @@ -142,7 +140,11 @@ def sync_table(table: str, args: Namespace) -> Union[bool, str]: # Create target table and swap with the temp table in Bigquery bigquery.create_table( - target_schema, table, bigquery_columns, primary_key, partition_by=partition_by, + target_schema, + table, + bigquery_columns, + primary_key, + partition_by=bookmark.get('partition_by'), ) bigquery.swap_tables(target_schema, table) diff --git a/pipelinewise/fastsync/s3_csv_to_bigquery.py b/pipelinewise/fastsync/s3_csv_to_bigquery.py index b39481458..d922c20cd 100644 --- a/pipelinewise/fastsync/s3_csv_to_bigquery.py +++ b/pipelinewise/fastsync/s3_csv_to_bigquery.py @@ -46,8 +46,6 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: bigquery = FastSyncTargetBigquery(args.target, args.transform) tap_id = args.target.get('tap_id') archive_load_files = 
args.target.get('archive_load_files', False) - partition_by = utils.get_metadata_for_table( - table_name, args.properties).get('partition-by') try: filename = utils.gen_export_filename( @@ -68,6 +66,9 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: bigquery_columns = bigquery_types.get('columns', []) primary_key = bigquery_types.get('primary_key', []) + # Get bookmark + bookmark = utils.get_bookmark_for_table(table_name, args.properties, s3_csv) + # Creating temp table in Bigquery bigquery.create_schema(target_schema) bigquery.create_table( @@ -77,7 +78,7 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: primary_key, is_temporary=True, sort_columns=True, - partition_by=partition_by, + partition_by=bookmark.get('partition_by'), ) # Load into Bigquery table @@ -97,13 +98,14 @@ def sync_table(table_name: str, args: Namespace) -> Union[bool, str]: # Create target table and swap with the temp table in Bigquery bigquery.create_table( - target_schema, table_name, bigquery_columns, primary_key, partition_by=partition_by, + target_schema, + table_name, + bigquery_columns, + primary_key, + partition_by=bookmark.get('partition_by'), ) bigquery.swap_tables(target_schema, table_name) - # Get bookmark - bookmark = utils.get_bookmark_for_table(table_name, args.properties, s3_csv) - # Save bookmark to singer state file # Lock to ensure that only one process writes the same state file at a time LOCK.acquire()
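
The four patches above combine as follows: the tap YAML gains a dbt-style `partition_by` mapping (a `field`, a `data_type` of timestamp/date/datetime/int64, and an optional `granularity` or integer `range`), `make_default_selection` copies it into the stream metadata as `partition-by`, fastsync reads it back through `get_metadata_for_table` / `get_bookmark_for_table` (patch 4 carries it inside the bookmark dict as `partition_by` so both the temp-table and final-table `create_table` calls see it), and `FastSyncTargetBigquery.create_table` renders it as a PARTITION BY clause next to the existing CLUSTER BY. The sketch below is a standalone illustration of that last rendering step only, not code taken from the patches; the helper name and the example dataset, table, and column names are hypothetical.

    from typing import Dict, Optional


    def build_partition_clause(partition_by: Optional[Dict]) -> str:
        """Return a ' PARTITION BY ...' fragment, or '' when no partitioning is configured."""
        if not partition_by:
            return ''

        field = partition_by['field'].lower()
        data_type = partition_by['data_type']
        granularity = partition_by.get('granularity', 'day').upper()

        if data_type == 'int64':
            # Integer-range partitioning needs explicit start/end/interval bounds.
            rng = partition_by['range']
            expr = (
                f'RANGE_BUCKET({field}, '
                f'GENERATE_ARRAY({rng["start"]}, {rng["end"]}, {rng["interval"]}))'
            )
        elif data_type == 'date':
            # A DATE column partitions daily as-is; coarser granularities need DATE_TRUNC.
            expr = field if granularity == 'DAY' else f'DATE_TRUNC({field}, {granularity})'
        elif data_type == 'datetime':
            expr = f'DATETIME_TRUNC({field}, {granularity})'
        elif data_type == 'timestamp':
            expr = f'TIMESTAMP_TRUNC({field}, {granularity})'
        else:
            raise ValueError(f'Unsupported partition data_type: {data_type}')

        return f' PARTITION BY {expr}'


    if __name__ == '__main__':
        # Example matching the tap.json schema: timestamp column, monthly partitions.
        ddl = (
            'CREATE OR REPLACE TABLE analytics.orders (id INT64, created_at TIMESTAMP)'
            + build_partition_clause(
                {'field': 'created_at', 'data_type': 'timestamp', 'granularity': 'month'}
            )
            + ' CLUSTER BY id'
        )
        print(ddl)
        # -> CREATE OR REPLACE TABLE analytics.orders (id INT64, created_at TIMESTAMP)
        #    PARTITION BY TIMESTAMP_TRUNC(created_at, MONTH) CLUSTER BY id

Keeping the partitioning expression in one helper, as patch 3 does with `get_partition_by_clause`, keeps `create_table` free of per-data_type branching and mirrors how dbt users already declare `partition_by` in their model configs.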