Skip to content

Commit

Permalink
Snapshot Support (#14)
Browse files Browse the repository at this point in the history
* Snapshot strategies implemented and passing tests
  • Loading branch information
arosychuk authored Feb 25, 2022
1 parent 331ac29 commit c38bfc9
Show file tree
Hide file tree
Showing 6 changed files with 413 additions and 20 deletions.
43 changes: 43 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,51 @@ Your [dbt](https://www.getdbt.com/) adapter for [Vertica](https://www.vertica.co

Uses [vertica-python](https://github.com/vertica/vertica-python) to connect to Vertica database.

## Supported Features

### dbt Core Features

The table below lists the dbt features that the current Vertica adapter supports. Support keeps changing as dbt adds new functionality and as the dbt-vertica driver improves.

| dbt Core Features | Supported |
| ------------------------------------------------- | ----------- |
| Table Materializations | Yes |
| Ephemeral Materializations | Yes |
| View Materializations | Yes |
| Incremental Materializations - Append | Untested |
| Incremental Materializations - Insert + Overwrite | Yes |
| Incremental Materializations - Merge | Yes |
| Snapshots - Timestamp | Passes Test |
| Snapshots - Check Cols | Passes Test |
| Seeds | Yes |
| Tests | Yes |
| Documentation | Yes |
| External Tables | Untested |

* **Yes** - Supported, and tests pass.
* **No** - Not supported or implemented.
* **Untested** - May support out of the box, though hasn't been tested.
* **Passes Test** - The tests have passed, though the feature hasn't been tested in a production-like environment.

### Vertica Features

The table below lists the Vertica features that the current Vertica adapter supports. Support keeps changing as dbt adds new functionality and as the dbt-vertica driver improves.

| Vertica Features | Supported |
| --------------------- | --------- |
| Create/Drop Schema | Yes |
| Analyze Statistics | No |
| Purge Delete Vectors | No |
| Projection Management | No |
| Primary/Unique Keys | No |
| Other DDLs | No |

## Changes

### 1.0.2
- Added support for snapshot timestamp with passing tests
- Added support for snapshot check cols with passing tests

### 1.0.1

- Fixed the Incremental method implementation (was buggy/incomplete)
Expand Down
8 changes: 3 additions & 5 deletions dbt/include/vertica/macros/materializations/incremental.sql
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@

{% macro vertica___validate_get_incremental_strategy(config) %}
{% macro vertica__validate_get_incremental_strategy(config) %}
{#-- Find and validate the incremental strategy #}
{%- set strategy = config.get("incremental_strategy", default="merge") -%}

Expand Down Expand Up @@ -77,7 +76,7 @@
{% set tmp_relation = make_temp_relation(this) %}

{#-- Validate early so we don't run SQL if the strategy is invalid --#}
{% set strategy = vertica___validate_get_incremental_strategy(config) %}
{% set strategy = vertica__validate_get_incremental_strategy(config) %}

-- setup
{{ run_hooks(pre_hooks, inside_transaction=False) }}
Expand Down Expand Up @@ -124,5 +123,4 @@

{{ return({'relations': [target_relation]}) }}

{%- endmaterialization %}

{%- endmaterialization %}
98 changes: 98 additions & 0 deletions dbt/include/vertica/macros/materializations/snapshots/snapshot.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
{% materialization snapshot, adapter='vertica' %}
{#
  Snapshot materialization for the Vertica adapter.

  On the first run (target relation does not exist yet) the snapshot table is
  created from the model's compiled SQL. On later runs a staging relation is
  built, the target's columns are widened/extended to match it, and the staged
  rows are merged into the target. Returns {'relations': [target_relation]}.
#}
{%- set config = model['config'] -%}

{%- set target_table = model.get('alias', model.get('name')) -%}

{%- set strategy_name = config.get('strategy') -%}
{# NOTE(review): unique_key is assigned here but not referenced again in this materialization. #}
{%- set unique_key = config.get('unique_key') %}

{# Make sure the schema that will hold the snapshot exists. #}
{% if not adapter.check_schema_exists(model.database, model.schema) %}
{% do create_schema(model.database, model.schema) %}
{% endif %}

{# Yields an "already exists" flag together with the relation object for the target. #}
{% set target_relation_exists, target_relation = get_or_create_relation(
database=model.database,
schema=model.schema,
identifier=target_table,
type='table') -%}

{# A snapshot target must be a table; anything else is reported as a wrong relation type. #}
{%- if not target_relation.is_table -%}
{% do exceptions.relation_wrong_type(target_relation, 'table') %}
{%- endif -%}


{{ run_hooks(pre_hooks, inside_transaction=False) }}

{{ run_hooks(pre_hooks, inside_transaction=True) }}

{# Resolve the strategy macro by name and let it describe how changes are detected. #}
{% set strategy_macro = strategy_dispatch(strategy_name) %}
{% set strategy = strategy_macro(model, "snapshotted_data", "source_data", config, target_relation_exists) %}

{% if not target_relation_exists %}

{# First run: build the initial snapshot table. #}
{# NOTE(review): this branch reads model['compiled_sql'] while the else branch reads `sql`; presumably the same text - confirm. #}
{% set build_sql = build_snapshot_table(strategy, model['compiled_sql']) %}
{% set final_sql = create_table_as(False, target_relation, build_sql) %}

{% else %}

{{ adapter.valid_snapshot_target(target_relation) }}

{% set staging_table = build_snapshot_staging_table(strategy, sql, target_relation) %}

-- this may no-op if the database does not require column expansion
{% do adapter.expand_target_column_types(from_relation=staging_table,
to_relation=target_relation) %}

{# Columns present in staging but missing from the target, excluding the staging-only bookkeeping columns (both casings). #}
{% set missing_columns = adapter.get_missing_columns(staging_table, target_relation)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{% do create_columns(target_relation, missing_columns) %}

{# Columns to insert: every staging column except the same bookkeeping columns. #}
{% set source_columns = adapter.get_columns_in_relation(staging_table)
| rejectattr('name', 'equalto', 'dbt_change_type')
| rejectattr('name', 'equalto', 'DBT_CHANGE_TYPE')
| rejectattr('name', 'equalto', 'dbt_unique_key')
| rejectattr('name', 'equalto', 'DBT_UNIQUE_KEY')
| list %}

{% set quoted_source_columns = [] %}
{% for column in source_columns %}
{% do quoted_source_columns.append(adapter.quote(column.name)) %}
{% endfor %}

{% set final_sql = snapshot_merge_sql(
target = target_relation,
source = staging_table,
insert_cols = quoted_source_columns
)
%}

{% endif %}

{# Execute whichever statement was prepared above (create-table-as or merge). #}
{% call statement('main') %}
{{ final_sql }}
{% endcall %}

{% do persist_docs(target_relation, model) %}

{# Indexes are only created together with a freshly created snapshot table. #}
{% if not target_relation_exists %}
{% do create_indexes(target_relation) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=True) }}

{{ adapter.commit() }}

{# staging_table is only defined on the merge path; post_snapshot is handed it after the commit. #}
{% if staging_table is defined %}
{% do post_snapshot(staging_table) %}
{% endif %}

{{ run_hooks(post_hooks, inside_transaction=False) }}

{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{% macro snapshot_merge_sql(target, source, insert_cols) -%}
    {#-
      Entry point for the snapshot merge statement.

      target      -- relation that receives the merged rows
      source      -- staging relation holding the rows to merge
      insert_cols -- list of column names to insert; when empty, the quoted
                     column names of `target` are used instead

      Collapses the column list into a comma-separated string and renders the
      adapter-dispatched `snapshot_merge_sql` implementation with it.
    -#}
    {%- if not insert_cols -%}
        {%- set target_column_names = vertica__get_columns_in_relation(target) | map(attribute="name") -%}
        {%- set insert_cols_csv = get_quoted_csv(target_column_names) -%}
    {%- else -%}
        {%- set insert_cols_csv = insert_cols | join(', ') -%}
    {%- endif -%}
    {{ adapter.dispatch('snapshot_merge_sql')(target, source, insert_cols_csv) }}
{%- endmacro %}

{% macro vertica__snapshot_merge_sql(target, source, insert_cols_csv) -%}
    {#-
      Render the MERGE statement that applies a snapshot staging relation to
      the snapshot target.

      target          -- relation being merged into (aliased DBT_INTERNAL_DEST)
      source          -- staging relation (aliased DBT_INTERNAL_SOURCE)
      insert_cols_csv -- comma-separated list of column names to insert

      Rows are matched on dbt_scd_id. A matched row that is still open
      (dbt_valid_to is null) and staged as 'update' or 'delete' gets its
      dbt_valid_to set from the staged row; unmatched rows staged as 'insert'
      are inserted.

      The VALUES expressions are derived from insert_cols_csv itself, so the
      i-th value always feeds the i-th insert column. Building VALUES from the
      target relation's column list instead can disagree with the insert list
      in both order and membership.
      NOTE(review): the list is split on ','; a quoted identifier that itself
      contains a comma would be split incorrectly.
    -#}
    {%- set value_exprs = [] -%}
    {%- for column in insert_cols_csv.split(',') -%}
        {%- if column | trim -%}
            {%- do value_exprs.append('DBT_INTERNAL_SOURCE.' ~ (column | trim)) -%}
        {%- endif -%}
    {%- endfor %}

    merge into {{ target }} as DBT_INTERNAL_DEST
    using {{ source }} as DBT_INTERNAL_SOURCE
    on DBT_INTERNAL_SOURCE.dbt_scd_id = DBT_INTERNAL_DEST.dbt_scd_id

    when matched
     and DBT_INTERNAL_DEST.dbt_valid_to is null
     and DBT_INTERNAL_SOURCE.dbt_change_type in ('update', 'delete')
        then update
        set dbt_valid_to = DBT_INTERNAL_SOURCE.dbt_valid_to

    when not matched
     and DBT_INTERNAL_SOURCE.dbt_change_type = 'insert'
        then insert ({{ insert_cols_csv }})
        values ({{ value_exprs | join(', ') }})

{% endmacro %}
184 changes: 184 additions & 0 deletions dbt/include/vertica/macros/materializations/snapshots/strategies.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
{#
Dispatch strategies by name, optionally qualified to a package
#}
{% macro strategy_dispatch(name) -%}
    {#-
      Look up a snapshot strategy macro by name and return it.

      `name` is either a bare strategy name ("timestamp") or a
      package-qualified one ("my_package.my_strategy"). The macro looked up is
      `snapshot_<name>_strategy`, searched in the named package's context or,
      for a bare name, in the current context. A compiler error is raised when
      the package or the macro cannot be found.
    -#}
    {% set original_name = name %}
    {% set package_name = none %}
    {% if '.' in name %}
        {% set package_name, name = name.split(".", 1) %}
    {% endif %}

    {#- An explicitly named package must be reachable from the context. -#}
    {% if package_name is not none and package_name not in context %}
        {% set error_msg %}
            Could not find package '{{package_name}}', called with '{{original_name}}'
        {% endset %}
        {{ exceptions.raise_compiler_error(error_msg | trim) }}
    {% endif %}

    {% set package_context = context if package_name is none else context[package_name] %}
    {%- set strategy_macro_name = 'snapshot_' ~ name ~ '_strategy' -%}

    {% if strategy_macro_name in package_context %}
        {{ return(package_context[strategy_macro_name]) }}
    {% endif %}

    {% set error_msg %}
        The specified strategy macro '{{name}}' was not found in package '{{ package_name }}'
    {% endset %}
    {{ exceptions.raise_compiler_error(error_msg | trim) }}
{%- endmacro %}


{#
Create SCD Hash SQL fields cross-db
#}
{% macro snapshot_hash_arguments(args) -%}
    {#- Render the adapter-dispatched ('dbt' namespace) hash expression for the given list of SQL expressions. -#}
    {%- set hash_macro = adapter.dispatch('snapshot_hash_arguments', 'dbt') -%}
    {{ hash_macro(args) }}
{%- endmacro %}

{% macro default__snapshot_hash_arguments(args) -%}
    {#-
      Render an md5(...) expression over the given SQL expressions.
      Each argument is cast to varchar with NULL replaced by the empty string,
      and the pieces are concatenated with a '|' separator.
    -#}
    {%- set hashed_terms = [] -%}
    {%- for arg in args -%}
        {%- do hashed_terms.append("coalesce(cast(" ~ arg ~ " as varchar ), '')") -%}
    {%- endfor -%}
    md5({{ (hashed_terms | join("\n || '|' || ")) ~ "\n" }})
{%- endmacro %}


{#
Get the current time cross-db
#}
{% macro snapshot_get_time() -%}
    {#- Render the adapter-dispatched ('dbt' namespace) expression for the current time. -#}
    {%- set time_macro = adapter.dispatch('snapshot_get_time', 'dbt') -%}
    {{ time_macro() }}
{%- endmacro %}

{% macro default__snapshot_get_time() -%}
    {#- Default implementation: whatever the current_timestamp() macro renders. -#}
    {%- set now_expr = current_timestamp() -%}
    {{ now_expr }}
{%- endmacro %}


{#
Core strategy definitions
#}
{% macro snapshot_timestamp_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
    {#-
      Build the strategy description for `timestamp` snapshots.

      Reads `unique_key`, `updated_at` and (optionally, default false)
      `invalidate_hard_deletes` from `config` and returns a dict with the keys
      unique_key, updated_at, row_changed, scd_id and invalidate_hard_deletes.
      `snapshotted_rel` and `current_rel` are the aliases used to qualify
      columns in the row_changed predicate.
    -#}
    {% set key_expr = config['unique_key'] %}
    {% set updated_at_expr = config['updated_at'] %}

    {#/*
        A row counts as changed when its source timestamp is newer than the
        snapshot's dbt_valid_from. The dbt-created dbt_valid_from column is
        compared rather than the user-supplied updated_at column, because the
        latter may be absent from historical snapshot data when the strategy
        was switched from `check` to `timestamp`.

        See https://github.com/dbt-labs/dbt-core/issues/2350
    */ #}
    {% set changed_predicate = "(" ~ snapshotted_rel ~ ".dbt_valid_from < " ~ current_rel ~ "." ~ updated_at_expr ~ ")" %}

    {% set strategy = {
        "unique_key": key_expr,
        "updated_at": updated_at_expr,
        "row_changed": changed_predicate,
        "scd_id": snapshot_hash_arguments([key_expr, updated_at_expr]),
        "invalidate_hard_deletes": config.get('invalidate_hard_deletes', false)
    } %}
    {% do return(strategy) %}
{% endmacro %}


{% macro snapshot_string_as_time(timestamp) -%}
    {#- Render the adapter-dispatched expression that turns a timestamp string into a SQL time value. -#}
    {%- set cast_macro = adapter.dispatch('snapshot_string_as_time') -%}
    {{ cast_macro(timestamp) }}
{%- endmacro %}

{% macro vertica__snapshot_string_as_time(timestamp) %}
    {#- Wrap the given value in a quoted literal cast to timestamptz and return the resulting SQL text. -#}
    {{ return("('" ~ timestamp ~ "'::timestamptz)") }}
{% endmacro %}


{% macro snapshot_check_all_get_existing_columns(node, target_exists) -%}
    {#-
      Work out which columns to compare for check_cols = 'all'.

      Returns a two-element list [column_added, columns]:
        * when the snapshot target does not exist yet: [false, <columns of the model query>]
        * otherwise: column_added is true when the model query has at least
          one column that the existing target lacks, and columns holds the
          query columns that are also present in the target.
    -#}
    {%- set query_columns = get_columns_in_query(node['compiled_sql']) -%}
    {%- if target_exists -%}
        {#- Existing table: compare its columns against the query's to detect schema changes. -#}
        {%- set snapshot_name = node.get('alias', node.get('name')) -%}
        {%- set snapshot_relation = adapter.get_relation(database=node.database, schema=node.schema, identifier=snapshot_name) -%}
        {%- set existing_cols = get_columns_in_query('select * from ' ~ snapshot_relation) -%}
        {%- set shared_cols = query_columns | select('in', existing_cols) | list -%}
        {#- Any query column that was filtered out above is a newly added column. -#}
        {%- set column_added = (shared_cols | length) != (query_columns | length) -%}
        {{ return([column_added, shared_cols]) }}
    {%- endif -%}
    {#- No table yet: every column the query produces is used. -#}
    {{ return([false, query_columns]) }}
{%- endmacro %}


{% macro snapshot_check_strategy(node, snapshotted_rel, current_rel, config, target_exists) %}
{#
  Build the strategy description for `check` snapshots.

  Reads `check_cols`, `unique_key`, and optionally `updated_at` and
  `invalidate_hard_deletes` from `config`. Returns a dict with the keys
  unique_key, updated_at, row_changed, scd_id and invalidate_hard_deletes.
  `snapshotted_rel` and `current_rel` are the aliases used to qualify columns
  in the row_changed predicate. Raises a compiler error when the start time
  cannot be read from the database or when `check_cols` is neither 'all' nor a
  non-empty iterable.
#}
{% set check_cols_config = config['check_cols'] %}
{% set primary_key = config['unique_key'] %}
{% set invalidate_hard_deletes = config.get('invalidate_hard_deletes', false) %}

{# Query used to read the snapshot start time from the database. #}
{% set select_current_time -%}
select {{ snapshot_get_time() }} as snapshot_start
{%- endset %}

{#-- don't access the column by name, to avoid dealing with casing issues on snowflake #}
{%- set now = run_query(select_current_time)[0][0] -%}
{% if now is none or now is undefined -%}
{%- do exceptions.raise_compiler_error('Could not get a snapshot start time from the database') -%}
{%- endif %}
{# A configured updated_at wins; otherwise the start time read above is used as a time literal. #}
{% set updated_at = config.get('updated_at', snapshot_string_as_time(now)) %}

{% set column_added = false %}

{# 'all' compares every shared column; otherwise check_cols must be a non-empty iterable. #}
{% if check_cols_config == 'all' %}
{% set column_added, check_cols = snapshot_check_all_get_existing_columns(node, target_exists) %}
{% elif check_cols_config is iterable and (check_cols_config | length) > 0 %}
{% set check_cols = check_cols_config %}
{% else %}
{% do exceptions.raise_compiler_error("Invalid value for 'check_cols': " ~ check_cols_config) %}
{% endif %}

{#
  Predicate that is TRUE when a column was added, and otherwise true when any
  checked column differs between the two relations or is NULL on exactly one
  side.
#}
{%- set row_changed_expr -%}
(
{%- if column_added -%}
TRUE
{%- else -%}
{%- for col in check_cols -%}
{{ snapshotted_rel }}.{{ col }} != {{ current_rel }}.{{ col }}
or
(
(({{ snapshotted_rel }}.{{ col }} is null) and not ({{ current_rel }}.{{ col }} is null))
or
((not {{ snapshotted_rel }}.{{ col }} is null) and ({{ current_rel }}.{{ col }} is null))
)
{%- if not loop.last %} or {% endif -%}
{%- endfor -%}
{%- endif -%}
)
{%- endset %}

{# The row identifier hashes the unique key together with the updated_at expression. #}
{% set scd_id_expr = snapshot_hash_arguments([primary_key, updated_at]) %}

{% do return({
"unique_key": primary_key,
"updated_at": updated_at,
"row_changed": row_changed_expr,
"scd_id": scd_id_expr,
"invalidate_hard_deletes": invalidate_hard_deletes
}) %}
{% endmacro %}
Loading

0 comments on commit c38bfc9

Please sign in to comment.