CDC-to-Kafka 3.3.0 (#25)
* CDC-to-Kafka 3.3.0

* Get row counts without need of VIEW DATABASE STATE permissions

* Fixes from testing, before upgrading schema registry impl

* Add comments and fix things flagged by grammar inspection
woodlee authored Jun 2, 2023
1 parent 7b3f91e commit f42f18e
Showing 15 changed files with 864 additions and 317 deletions.
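
On the second commit bullet above (getting row counts without VIEW DATABASE STATE): purely as an illustrative sketch, and not necessarily the query this commit actually uses, approximate row counts can be read from the sys.partitions catalog view, which is governed by ordinary metadata visibility on the table, rather than from the sys.dm_db_partition_stats DMV that needs the VIEW DATABASE STATE permission. The connection object, query text, and function name below are hypothetical:

import pyodbc

# Illustrative only: approximate row count from a catalog view instead of a DMV
# that requires VIEW DATABASE STATE. Assumes `conn` is an open pyodbc connection
# with metadata visibility (e.g. SELECT) on the target table.
ROW_COUNT_QUERY = """
    SELECT SUM(p.rows)
    FROM sys.partitions AS p
    JOIN sys.tables AS t ON p.object_id = t.object_id
    JOIN sys.schemas AS s ON t.schema_id = s.schema_id
    WHERE s.name = ? AND t.name = ? AND p.index_id IN (0, 1)
"""

def get_approximate_row_count(conn: pyodbc.Connection, schema_name: str, table_name: str) -> int:
    cursor = conn.cursor()
    cursor.execute(ROW_COUNT_QUERY, schema_name, table_name)
    row = cursor.fetchone()
    cursor.close()
    return int(row[0] or 0)
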
12 changes: 7 additions & 5 deletions cdc_kafka/avro_from_sql.py
@@ -51,7 +51,7 @@ def get_cdc_metadata_fields_avro_schemas(table_fq_name: str, source_field_names:
 # need not be updated. We align with that by making the Avro value schema for all captured fields nullable (which also
 # helps with maintaining future Avro schema compatibility).
 def avro_schema_from_sql_type(source_field_name: str, sql_type_name: str, decimal_precision: int,
-                              decimal_scale: int, make_nullable: bool) -> Dict[str, Any]:
+                              decimal_scale: int, make_nullable: bool, force_avro_long: bool) -> Dict[str, Any]:
     if sql_type_name in ('decimal', 'numeric', 'money', 'smallmoney'):
         if (not decimal_precision) or decimal_scale is None:
             raise Exception(f"Field '{source_field_name}': For SQL decimal, money, or numeric types, the scale and "
@@ -70,14 +70,16 @@ def avro_schema_from_sql_type(source_field_name: str, sql_type_name: str, decima
         avro_type = "double"
     elif sql_type_name == 'real':
         avro_type = "float"
+    elif sql_type_name in ('int', 'smallint', 'tinyint'):
+        avro_type = "long" if force_avro_long else "int"
+    # For date and time we don't respect force_avro_long since the underlying type being `int` for these logical
+    # types is spelled out in the Avro spec:
     elif sql_type_name == 'date':
         avro_type = {"type": "int", "logicalType": "date"}
-    elif sql_type_name in ('int', 'smallint', 'tinyint'):
-        avro_type = "int"
-    elif sql_type_name in ('datetime', 'datetime2', 'datetimeoffset', 'smalldatetime', 'xml') + SQL_STRING_TYPES:
-        avro_type = "string"
     elif sql_type_name == 'time':
         avro_type = {"type": "int", "logicalType": "time-millis"}
+    elif sql_type_name in ('datetime', 'datetime2', 'datetimeoffset', 'smalldatetime', 'xml') + SQL_STRING_TYPES:
+        avro_type = "string"
     elif sql_type_name == 'uniqueidentifier':
         avro_type = {"type": "string", "logicalType": "uuid"}
     elif sql_type_name in ('binary', 'image', 'varbinary', 'rowversion'):
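As a minimal usage sketch of the change above (hypothetical field names and argument values, not code from the repo), the new force_avro_long flag only affects plain SQL integer types; the date and time logical types keep their spec-mandated int base type:

from cdc_kafka.avro_from_sql import avro_schema_from_sql_type

# Integer columns widen to Avro "long" when the flag is set:
avro_schema_from_sql_type('row_id', 'int', 0, 0, make_nullable=False, force_avro_long=True)
# ...and keep the narrower Avro "int" when it is not:
avro_schema_from_sql_type('row_id', 'int', 0, 0, make_nullable=False, force_avro_long=False)
# date/time ignore the flag, staying e.g. {"type": "int", "logicalType": "date"}:
avro_schema_from_sql_type('created_date', 'date', 0, 0, make_nullable=False, force_avro_long=True)
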
391 changes: 391 additions & 0 deletions cdc_kafka/build_startup_state.py

Large diffs are not rendered by default.

14 changes: 4 additions & 10 deletions cdc_kafka/constants.py
@@ -10,14 +10,17 @@
 SLOW_TABLE_PROGRESS_HEARTBEAT_INTERVAL = datetime.timedelta(minutes=3)
 DB_CLOCK_SYNC_INTERVAL = datetime.timedelta(minutes=5)
 
+SMALL_TABLE_THRESHOLD = 5_000_000
+MAX_AGE_TO_PRESUME_ADDED_COL_IS_NULL_SECONDS = 3600
+
 SQL_QUERY_TIMEOUT_SECONDS = 30
 SQL_QUERY_INTER_RETRY_INTERVAL_SECONDS = 1
 SQL_QUERY_RETRIES = 2
 
 WATERMARK_STABILITY_CHECK_DELAY_SECS = 10
 KAFKA_REQUEST_TIMEOUT_SECS = 15
 KAFKA_FULL_FLUSH_TIMEOUT_SECS = 30
-KAFKA_CONFIG_RELOAD_DELAY_SECS = 5
+KAFKA_CONFIG_RELOAD_DELAY_SECS = 3
 
 # General
 
@@ -69,19 +72,10 @@
 
 # Metadata column names and positions
 
-OPERATION_POS = 0
 OPERATION_NAME = '__operation'
-
-EVENT_TIME_POS = 1
 EVENT_TIME_NAME = '__event_time'
-
-LSN_POS = 2
 LSN_NAME = '__log_lsn'
-
-SEQVAL_POS = 3
 SEQVAL_NAME = '__log_seqval'
-
-UPDATED_FIELDS_POS = 4
 UPDATED_FIELDS_NAME = '__updated_fields'
 
 DB_LSN_COL_NAME = '__$start_lsn'
17 changes: 17 additions & 0 deletions cdc_kafka/helpers.py
@@ -2,8 +2,25 @@
 
 import confluent_kafka
 
+from . import constants
+
 
 # Helper function for loggers working with Kafka messages
 def format_coordinates(msg: confluent_kafka.Message) -> str:
     return f'{msg.topic()}:{msg.partition()}@{msg.offset()}, ' \
           f'time {datetime.datetime.fromtimestamp(msg.timestamp()[1] / 1000)}'
+
+
+def get_fq_change_table_name(capture_instance_name: str) -> str:
+    assert '.' not in capture_instance_name
+    capture_instance_name = capture_instance_name.strip(' []')
+    return f'{constants.CDC_DB_SCHEMA_NAME}.{capture_instance_name}_CT'
+
+
+def get_capture_instance_name(change_table_name: str) -> str:
+    change_table_name = change_table_name.replace('[', '')
+    change_table_name = change_table_name.replace(']', '')
+    if change_table_name.startswith(constants.CDC_DB_SCHEMA_NAME + '.'):
+        change_table_name = change_table_name.replace(constants.CDC_DB_SCHEMA_NAME + '.', '')
+    assert change_table_name.endswith('_CT')
+    return change_table_name[:-3]
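
The two new helpers are effectively inverses of each other. A small usage sketch follows; the literal 'cdc' schema prefix is an assumption here, since the functions actually read constants.CDC_DB_SCHEMA_NAME:

from cdc_kafka import helpers

# Assuming constants.CDC_DB_SCHEMA_NAME == 'cdc' (SQL Server's usual CDC schema):
helpers.get_fq_change_table_name('dbo_Orders')              # -> 'cdc.dbo_Orders_CT'
helpers.get_fq_change_table_name('[dbo_Orders]')            # brackets stripped -> 'cdc.dbo_Orders_CT'
helpers.get_capture_instance_name('cdc.dbo_Orders_CT')      # -> 'dbo_Orders'
helpers.get_capture_instance_name('[cdc].[dbo_Orders_CT]')  # -> 'dbo_Orders'
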
15 changes: 13 additions & 2 deletions cdc_kafka/kafka.py
@@ -78,7 +78,6 @@ def __init__(self, metrics_accumulator: 'accumulator.AccumulatorAbstract', boots
         self._avro_decoders: Dict[int, Callable] = dict()
         self._schema_ids_to_names: Dict[int, str] = dict()
         self._delivery_callbacks: Dict[str, List[Callable]] = collections.defaultdict(list)
-        self._delivery_callbacks_finalized: bool = False
         self._global_produce_sequence_nbr: int = 0
         self._cluster_metadata: Optional[confluent_kafka.admin.ClusterMetadata] = None
         self._last_full_flush_time: datetime.datetime = datetime.datetime.utcnow()
@@ -92,7 +91,7 @@ def __init__(self, metrics_accumulator: 'accumulator.AccumulatorAbstract', boots
     def __enter__(self) -> 'KafkaClient':
         return self
 
-    def __exit__(self, exc_type, value, traceback) -> None:
+    def __exit__(self, *args) -> None:
         logger.info("Cleaning up Kafka resources...")
         self._consumer.close()
         self.flush(final=True)
@@ -349,6 +348,16 @@ def register_schemas(self, topic_name: str, key_schema: Dict[str, Any], value_sc
                          key_schema_compatibility_level: str = constants.DEFAULT_KEY_SCHEMA_COMPATIBILITY_LEVEL,
                          value_schema_compatibility_level: str = constants.DEFAULT_VALUE_SCHEMA_COMPATIBILITY_LEVEL) \
             -> Tuple[int, int]:
+        # TODO: it turns out that if you try to re-register a schema that was previously registered but later superseded
+        # (e.g. in the case of adding and then later deleting a column), the schema registry will accept that and return
+        # you the previously-registered schema ID without updating the `latest` version associated with the registry
+        # subject, or verifying that the change is Avro-compatible. It seems like the way to handle this, per
+        # https://github.com/confluentinc/schema-registry/issues/1685, would be to detect the condition and delete the
+        # subject-version-number of that schema before re-registering it. Since subject-version deletion is not
+        # available in the `CachedSchemaRegistryClient` we use here--and since this is a rare case--I'm explicitly
+        # choosing to punt on it for the moment. The Confluent lib does now have a newer `SchemaRegistryClient` class
+        # which supports subject-version deletion, but changing this code to use it appears to be a non-trivial task.
+
         key_schema = confluent_kafka.avro.loads(json.dumps(key_schema))
         value_schema = confluent_kafka.avro.loads(json.dumps(value_schema))
 
@@ -359,6 +368,7 @@ def register_schemas(self, topic_name: str, key_schema: Dict[str, Any], value_sc
         if (current_key_schema is None or current_key_schema != key_schema) and not self._disable_writing:
             logger.info('Key schema for subject %s does not exist or is outdated; registering now.', key_subject)
             key_schema_id = self._schema_registry.register(key_subject, key_schema)
+            logger.debug('Schema registered for subject %s: %s', key_subject, key_schema)
             if current_key_schema is None:
                 time.sleep(constants.KAFKA_CONFIG_RELOAD_DELAY_SECS)
                 self._schema_registry.update_compatibility(key_schema_compatibility_level, key_subject)
@@ -368,6 +378,7 @@ def register_schemas(self, topic_name: str, key_schema: Dict[str, Any], value_sc
         if (current_value_schema is None or current_value_schema != value_schema) and not self._disable_writing:
             logger.info('Value schema for subject %s does not exist or is outdated; registering now.', value_subject)
             value_schema_id = self._schema_registry.register(value_subject, value_schema)
+            logger.debug('Schema registered for subject %s: %s', value_subject, value_schema)
             if current_value_schema is None:
                 time.sleep(constants.KAFKA_CONFIG_RELOAD_DELAY_SECS)
                 self._schema_registry.update_compatibility(value_schema_compatibility_level, value_subject)
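
Regarding the TODO added in register_schemas() above: the workaround it describes, deleting the superseded subject version before re-registering, is not implemented in this commit. As a hedged sketch only, the Schema Registry REST API exposes that operation directly via DELETE /subjects/{subject}/versions/{version}; the URL, subject name, and helper function below are placeholders, not part of the repo:

import requests

# Sketch only: delete a specific version of a Schema Registry subject so that a
# schema which was registered earlier and later superseded can be re-registered
# as the new `latest` version. Placeholder registry URL and subject name.
SCHEMA_REGISTRY_URL = 'http://localhost:8081'

def delete_subject_version(subject: str, version: int) -> None:
    resp = requests.delete(f'{SCHEMA_REGISTRY_URL}/subjects/{subject}/versions/{version}')
    resp.raise_for_status()

# e.g. delete_subject_version('my_topic-value', 3) before re-registering the old schema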