feat: add remote table materialization #8

Open
wants to merge 5 commits into
base: dap-main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/dbclient.py
@@ -107,6 +107,7 @@ def __init__(self, credentials: ClickHouseCredentials):
"distributed_table": {},
"distributed_incremental": {},
"general": {},
"remote_table": {},
}
if (
not credentials.allow_automatic_deduplication
3 changes: 1 addition & 2 deletions dbt/adapters/clickhouse/impl.py
@@ -117,8 +117,7 @@ def convert_time_type(cls, agate_table: "agate.Table", col_idx: int) -> str:
@available.parse(lambda *a, **k: {})
def get_clickhouse_cluster_name(self):
conn = self.connections.get_if_exists()
if conn.credentials.cluster:
return f'"{conn.credentials.cluster}"'
@Magicbeanbuyer (Collaborator) commented on Jan 13, 2025:

ALTER TABLE _admin_portal_aggregation.usage_aggregation_full_period_blue_base
        ON CLUSTER "fal"
        DROP PARTITION tuple(toYYYYMM(toDate('2025-01-12 03:24:00')))

also works :D

return conn.credentials.cluster

@available.parse(lambda *a, **k: {})
def get_clickhouse_local_suffix(self):
1 change: 1 addition & 0 deletions dbt/adapters/clickhouse/relation.py
@@ -44,6 +44,7 @@ class ClickHouseRelation(BaseRelation):
quote_character: str = '`'
can_exchange: bool = False
can_on_cluster: bool = False
remote_cluster: Optional[str] = None

def __post_init__(self):
if self.database != self.schema and self.database:
2 changes: 1 addition & 1 deletion dbt/include/clickhouse/macros/adapters.sql
@@ -32,7 +32,7 @@
db.engine as db_engine,
{%- if adapter.get_clickhouse_cluster_name() -%}
count(distinct _shard_num) > 1 as is_on_cluster
from clusterAllReplicas({{ adapter.get_clickhouse_cluster_name() }}, system.tables) as t
from clusterAllReplicas('{{ adapter.get_clickhouse_cluster_name() }}', system.tables) as t
Collaborator commented:

All three variants work

select * from cluster(default, system.one);
select * from cluster('default', system.one);
select * from cluster("default", system.one);

Collaborator (Author) replied:

I was worried about passing reserved names as the first parameter.

join system.databases as db on t.database = db.name
where schema = '{{ schema_relation.schema }}'
group by name, schema, type, db_engine
@@ -86,24 +86,30 @@

{% endmaterialization %}

{% macro create_distributed_table(relation, local_relation) %}
{%- set cluster = adapter.get_clickhouse_cluster_name() -%}
{% if cluster is none %}
{% do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') %}
{% endif %}
{% macro create_distributed_table(relation, local_relation, sql=none) %}
{% if adapter.get_clickhouse_cluster_name() is none %}
{% do exceptions.raise_compiler_error('Cluster name should be defined for using distributed materializations, current is None') %}
{% endif %}

{%- set cluster = cluster[1:-1] -%}
{%- set sharding = config.get('sharding_key') -%}

create or replace table {{ relation }} {{ on_cluster_clause(relation) }} as {{ local_relation }}
ENGINE = Distributed('{{ cluster}}', '{{ local_relation.schema }}', '{{ local_relation.name }}'
{%- if sharding is not none and sharding.strip() != '' -%}
, {{ sharding }}
{%- else %}
, rand()
{% endif -%}
)
{% endmacro %}
{%- set remote_cluster = local_relation.remote_cluster or adapter.get_clickhouse_cluster_name() -%}
{%- set sharding = config.get('sharding_key') -%}
{%- set reference = "as " ~ local_relation -%}
{%- if sql -%}
Collaborator commented:

What is the use case for passing SQL when creating a distributed table?

Collaborator (Author) replied:

You don't rely on an existing local relation for the DDL query; the SQL lets the macro derive the column names and types directly.
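
For illustration, roughly the DDL the updated macro would emit when `sql` is passed (the schema, table, and column names here are invented for the example; the column list comes from adapter.get_column_schema_from_query(sql) rather than from an existing local relation):

  create or replace table `my_db`.`my_model`
    (key1 UInt64, key2 Int64)
  engine = Distributed('test_shard', 'my_db', 'my_model_local', rand())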

{%- set col_list = [] -%}
{%- for col in adapter.get_column_schema_from_query(sql) -%}
{%- do col_list.append(col.name + ' ' + col.data_type) -%}
{%- endfor -%}
{%- set reference = "(" ~ (col_list | join(', ')) ~ ")" -%}
{%- endif -%}
create or replace table {{ relation }} {{ on_cluster_clause(relation) }} {{ reference }}
engine = Distributed('{{ remote_cluster }}', '{{ local_relation.schema }}', '{{ local_relation.name }}'
{%- if sharding is not none and sharding.strip() != '' -%}
, {{ sharding }}
{%- else %}
, rand()
{% endif -%}
)
{% endmacro %}

{% macro create_empty_table_from_relation(relation, source_relation, sql=none) -%}
{%- set sql_header = config.get('sql_header', none) -%}
39 changes: 39 additions & 0 deletions dbt/include/clickhouse/macros/materializations/remote_table.sql
@@ -0,0 +1,39 @@
{% materialization remote_table, adapter='clickhouse', supported_languages=['python', 'sql'] -%}
{%- set remote_config = config.require('remote_config') -%}

{%- set remote_cluster = remote_config.get('cluster') -%}
{%- set remote_schema = remote_config.get('local_db_prefix') + this.schema -%}
{%- set remote_identifier = this.identifier + remote_config.get('local_suffix') -%}

{%- set target_relation = this.incorporate(type='table') -%}
{%- set remote_relation = target_relation.incorporate(path={"identifier": remote_identifier, "schema": remote_schema}, remote_cluster=remote_cluster) -%}
{%- set existing_relation = load_cached_relation(this) -%}

{{ run_hooks(pre_hooks, inside_transaction=False) }}
{{ run_hooks(pre_hooks, inside_transaction=True) }}

{%- set column_changes = none -%}
{%- if existing_relation -%}
{%- set column_changes = adapter.check_incremental_schema_changes('ignore', existing_relation, sql) -%}
{%- endif -%}

{% call statement('main') %}
{%- if column_changes or existing_relation is none or should_full_refresh() -%}
{{ create_distributed_table(target_relation, remote_relation, sql) }}
{%- else -%}
{{ log("no-op run: distributed table exists with correct schema.", info=true) }}
select true;
{%- endif -%}
{% endcall %}

{% set should_revoke = should_revoke(target_relation, full_refresh_mode) %}
{% set grant_config = config.get('grants') %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
{% do persist_docs(target_relation, model) %}

{{ run_hooks(post_hooks, inside_transaction=True) }}
{{ adapter.commit() }}
{{ run_hooks(post_hooks, inside_transaction=False) }}
{{ return({'relations': [target_relation]}) }}

{% endmaterialization %}
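
For orientation, a minimal sketch of how the configuration maps to relations, assuming a model my_model in schema my_schema with remote_config={'cluster': 'test_shard', 'local_db_prefix': '_', 'local_suffix': '_local'} and sharding_key='key1' (names mirror the integration test below and are otherwise illustrative): the materialization resolves the remote relation to _my_schema.my_model_local on test_shard and delegates to create_distributed_table, producing roughly:

  create or replace table my_schema.my_model
    (key1 UInt64, key2 Int64)
  engine = Distributed('test_shard', '_my_schema', 'my_model_local', key1)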
91 changes: 91 additions & 0 deletions tests/integration/adapter/remote_table/test_remote_table.py
@@ -0,0 +1,91 @@
import pytest
from dbt.tests.util import run_dbt, run_dbt_and_capture, get_connection

model = """
{{
config(
materialized='remote_table',
remote_config={'cluster': 'test_shard', 'local_db_prefix': '_', 'local_suffix': '_local'},
sharding_key='key1',
)
}}
select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
"""


class TestRemoteTableRemoteConfig:
    @pytest.fixture(scope="class")
    def models(self):
        return {
            "remote_table.sql": model,
        }

    @pytest.fixture(scope="class")
    def init_local_table(self, project):
        project.run_sql(f"create database if not exists _{project.test_schema} on cluster test_shard")
        project.run_sql(f"""
            create table _{project.test_schema}.remote_table_local on cluster test_shard
            (key1 UInt64, key2 Int64)
            engine=MergeTree order by key1
        """)
        return

    def test_with_remote_configuration(self, project, init_local_table):
        # the created distributed table should point to a local table as defined in the model's `remote_config`
        run_dbt()
Collaborator commented:

  create or replace table `dbt_clickhouse_3961_test_remote_table_1736978435032`.`remote_table` 
   (key1 UInt64, key2 Int64)
  engine = Distributed('test_shard', 'dbt_clickhouse_3961_test_remote_table_1736978435032', 'remote_table_local'
    , rand()
  )

I copied this DDL from test debugging, and I have two questions.

  1. What is the use case for creating a Distributed table on test_shard when the base table is also on test_shard? Wouldn't this simply overwrite the Distributed table created by the distributed_incremental materialization?
  2. I didn't dig deeper, but on_cluster_clause returned nothing in this case. I believe this query should have an on cluster test_shard clause; otherwise the Distributed table will only be available on one shard.
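
For reference, the statement the second question is asking for would look roughly like the captured DDL above with the cluster clause added (a sketch of the expected output, not what the macro currently emits):

  create or replace table `dbt_clickhouse_3961_test_remote_table_1736978435032`.`remote_table`
      on cluster test_shard
      (key1 UInt64, key2 Int64)
  engine = Distributed('test_shard', 'dbt_clickhouse_3961_test_remote_table_1736978435032', 'remote_table_local'
    , rand()
  )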


        project.run_sql(f"""
            insert into {project.test_schema}.remote_table
            select toUInt64(number) as key1, toInt64(-number) as key2 from numbers(10)
        """)

        self._assert_is_distributed_table(project)
        self._assert_correct_engine(project)
        self._assert_correct_data(project)

        # rerun (should be no-op)
        _, log_output = run_dbt_and_capture()
        assert "no-op run" in log_output

    @staticmethod
    def _assert_is_distributed_table(project):
        # check correct table creation on current host
        result = project.run_sql(
            f"select engine from system.tables where name='remote_table'",
            fetch="one"
        )
        assert result is not None
        assert result[0] == "Distributed"

    @staticmethod
    def _assert_correct_engine(project):
        # assert correct engine parameters
        result = project.run_sql(f"select create_table_query from system.tables where name='remote_table'", fetch="one")
        assert f"Distributed('test_shard', '_{project.test_schema}', 'remote_table_local', key1)" in result[0]

    @staticmethod
    def _assert_correct_data(project):
        # query remote data from distributed table
        result = project.run_sql("select count(*) as num_rows from remote_table", fetch="one")
        assert result[0] == 10


class TestRemoteTableRemoteConfigReplicatedDB(TestRemoteTableRemoteConfig):
    @pytest.fixture(scope="class")
    def test_config(self, test_config):
        test_config["db_engine"] = "Replicated('/clickhouse/databases/{uuid}', '{shard}', '{replica}')"
        return test_config

    @pytest.fixture(scope="class")
    def init_local_table(self, project):
        schema_name = f"_{project.test_schema}"
        with get_connection(project.adapter):
            relation = project.adapter.Relation.create(database=project.database, schema=schema_name)
            project.adapter.create_schema(relation)
            project.created_schemas.append(schema_name)

        project.run_sql(f"""
            create table _{project.test_schema}.remote_table_local
            (key1 UInt64, key2 Int64)
            engine=MergeTree order by key1
        """)
2 changes: 2 additions & 0 deletions tests/integration/test_config.xml
@@ -7,6 +7,8 @@
<replica from_env="REPLICA_NUM" />
<server_index from_env="SERVER_INDEX" />
</macros>
<default_replica_path>/clickhouse/tables/{uuid}/{shard}</default_replica_path>
<default_replica_name>{replica}</default_replica_name>
<remote_servers>
<test_shard>
<shard>