Skip to content

Commit

Permalink
Refactor get_bookmark_for_table
Browse files (browse the repository at this point in the history)
  • Loading branch information
judahrand committed Feb 23, 2022
1 parent 6569a4e commit 89efc0e
Showing 1 changed file with 25 additions and 19 deletions.
44 changes: 25 additions & 19 deletions pipelinewise/fastsync/commons/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import logging
import datetime

from typing import Dict
from typing import Dict, Optional
from pipelinewise.cli.utils import generate_random_string

LOGGER = logging.getLogger(__name__)
Expand Down Expand Up @@ -107,17 +107,13 @@ def get_tables_from_properties(properties: Dict) -> set:
return tables


def get_bookmark_for_table(table, properties, db_engine, dbname=None):
"""Get actual bookmark for a specific table used for LOG_BASED or INCREMENTAL
replications
"""
bookmark = {}

def get_metadata_for_table(
table: str, properties: Dict, dbname: Optional[str] = None
) -> Dict:
# Find table from properties and get bookmark based on replication method
for stream in properties.get('streams', []):
metadata = stream.get('metadata', [])
table_name = stream.get('table_name', stream['stream'])

# Get table specific metadata i.e. replication method, replication key, etc.
table_meta = next(
(
Expand All @@ -129,8 +125,6 @@ def get_bookmark_for_table(table, properties, db_engine, dbname=None):
).get('metadata')
db_name = table_meta.get('database-name')
schema_name = table_meta.get('schema-name')
replication_method = table_meta.get('replication-method')
replication_key = table_meta.get('replication-key')

fully_qualified_table_name = (
'{}.{}'.format(schema_name or db_name, table_name)
Expand All @@ -141,17 +135,29 @@ def get_bookmark_for_table(table, properties, db_engine, dbname=None):
if (
dbname is None or db_name == dbname
) and fully_qualified_table_name == table:
# Log based replication: get mysql binlog position
if replication_method == 'LOG_BASED':
bookmark = db_engine.fetch_current_log_pos()
return table_meta
return {}


# Key based incremental replication: Get max replication key from source
elif replication_method == 'INCREMENTAL':
bookmark = db_engine.fetch_current_incremental_key_pos(
fully_qualified_table_name, replication_key
)
def get_bookmark_for_table(table, properties, db_engine, dbname=None):
    """Get actual bookmark for a specific table used for LOG_BASED or INCREMENTAL
    replications

    The table's metadata is looked up with ``get_metadata_for_table`` and the
    bookmark is then fetched from ``db_engine`` according to the table's
    ``replication-method``.

    Args:
        table: Name of the table to look up; forwarded unchanged to
            ``get_metadata_for_table`` and, for INCREMENTAL replication,
            to the engine.
        properties: Properties dictionary forwarded to
            ``get_metadata_for_table``.
        db_engine: Object exposing ``fetch_current_log_pos()`` and
            ``fetch_current_incremental_key_pos(table, replication_key)``.
        dbname: Optional database name forwarded to
            ``get_metadata_for_table``.

    Returns:
        The value returned by the matching ``db_engine`` call, or an empty
        dict when the replication method is neither LOG_BASED nor
        INCREMENTAL (including when no metadata is found for the table).
    """
    bookmark = {}

    table_meta = get_metadata_for_table(table, properties, dbname=dbname)
    replication_method = table_meta.get('replication-method')
    replication_key = table_meta.get('replication-key')

    # Log based replication: get mysql binlog position
    if replication_method == 'LOG_BASED':
        bookmark = db_engine.fetch_current_log_pos()

    # Key based incremental replication: Get max replication key from source
    elif replication_method == 'INCREMENTAL':
        bookmark = db_engine.fetch_current_incremental_key_pos(
            table, replication_key
        )

    return bookmark

Expand Down

0 comments on commit 89efc0e

Please sign in to comment.