diff --git a/pipelinewise/fastsync/commons/utils.py b/pipelinewise/fastsync/commons/utils.py index 761101c97..592f487bd 100644 --- a/pipelinewise/fastsync/commons/utils.py +++ b/pipelinewise/fastsync/commons/utils.py @@ -5,7 +5,7 @@ import logging import datetime -from typing import Dict +from typing import Dict, Optional from pipelinewise.cli.utils import generate_random_string LOGGER = logging.getLogger(__name__) @@ -107,17 +107,13 @@ def get_tables_from_properties(properties: Dict) -> set: return tables -def get_bookmark_for_table(table, properties, db_engine, dbname=None): - """Get actual bookmark for a specific table used for LOG_BASED or INCREMENTAL - replications - """ - bookmark = {} - +def get_metadata_for_table( + table: str, properties: Dict, dbname: Optional[str] = None +) -> Dict: # Find table from properties and get bookmark based on replication method for stream in properties.get('streams', []): metadata = stream.get('metadata', []) table_name = stream.get('table_name', stream['stream']) - # Get table specific metadata i.e. replication method, replication key, etc. table_meta = next( ( @@ -129,8 +125,6 @@ def get_bookmark_for_table(table, properties, db_engine, dbname=None): ).get('metadata') db_name = table_meta.get('database-name') schema_name = table_meta.get('schema-name') - replication_method = table_meta.get('replication-method') - replication_key = table_meta.get('replication-key') fully_qualified_table_name = ( '{}.{}'.format(schema_name or db_name, table_name) @@ -141,17 +135,29 @@ def get_bookmark_for_table(table, properties, db_engine, dbname=None): if ( dbname is None or db_name == dbname ) and fully_qualified_table_name == table: - # Log based replication: get mysql binlog position - if replication_method == 'LOG_BASED': - bookmark = db_engine.fetch_current_log_pos() + return table_meta + return {} + - # Key based incremental replication: Get max replication key from source - elif replication_method == 'INCREMENTAL': - bookmark = db_engine.fetch_current_incremental_key_pos( - fully_qualified_table_name, replication_key - ) +def get_bookmark_for_table(table, properties, db_engine, dbname=None): + """Get actual bookmark for a specific table used for LOG_BASED or INCREMENTAL + replications + """ + bookmark = {} + + table_meta = get_metadata_for_table(table, properties, dbname=dbname) + replication_method = table_meta.get('replication-method') + replication_key = table_meta.get('replication-key') - break + # Log based replication: get mysql binlog position + if replication_method == 'LOG_BASED': + bookmark = db_engine.fetch_current_log_pos() + + # Key based incremental replication: Get max replication key from source + elif replication_method == 'INCREMENTAL': + bookmark = db_engine.fetch_current_incremental_key_pos( + table, replication_key + ) return bookmark