feat: support python submissions #248

Merged · 102 commits · Feb 6, 2024

Commits
751c84c
Submitted python model successfully
Avinash-1394 Mar 17, 2023
7737000
Rebase and resolved conflicts
Avinash-1394 Mar 25, 2023
c1d1ab2
Execution successful but table not saved
Avinash-1394 Mar 26, 2023
fd6a135
Add incremental model support
Avinash-1394 Mar 27, 2023
ed72a5b
Merge branch 'main' into add-python-model-support
Avinash-1394 Mar 27, 2023
514ad5c
Merge branch 'main' into add-python-model-support
Avinash-1394 Apr 2, 2023
7913646
Fixed location and incremental model rerun
Avinash-1394 Apr 3, 2023
233ab98
Added docs
Avinash-1394 Apr 5, 2023
d2ad09e
Updated README
Avinash-1394 Apr 5, 2023
af38061
Merge branch 'main' into add-python-model-support
Avinash-1394 Apr 5, 2023
fed0dfa
fix: return empty if table does not exist in get_columns (#199)
Jrmyy Apr 5, 2023
f5c8167
fix: check for empty workgroup in profiles (#194)
nicor88 Apr 6, 2023
785ba5f
chore: add credits section (#201)
mattiamatrix Apr 6, 2023
ee8bf95
fix: enable database in policy to support cross-account queries (#200)
Jrmyy Apr 6, 2023
414c739
fix: glue column types (#196)
svdimchenko Apr 7, 2023
6ad67e7
docs: add nicor88 as a contributor for code (#205)
allcontributors[bot] Apr 7, 2023
1c35b6e
docs: add jessedobbelaere as a contributor for bug (#210)
allcontributors[bot] Apr 7, 2023
3150170
docs: add lemiffe as a contributor for design (#209)
allcontributors[bot] Apr 7, 2023
4074bc7
docs: add jessedobbelaere as a contributor for maintenance (#212)
allcontributors[bot] Apr 7, 2023
439bc75
docs: add Jrmyy as a contributor for maintenance (#207)
allcontributors[bot] Apr 7, 2023
3d31ab9
docs: add dbt-athena-logo (#211)
mattiamatrix Apr 8, 2023
02c21a3
docs: update readme (#204)
mattiamatrix Apr 8, 2023
8c850a9
docs: add Tomme as a contributor for maintenance (#214)
allcontributors[bot] Apr 8, 2023
0f50a01
docs: add mattiamatrix as a contributor for maintenance (#216)
allcontributors[bot] Apr 8, 2023
e73ef90
chore: Update pytest requirement from ~=7.2 to ~=7.3 (#218)
dependabot[bot] Apr 10, 2023
8f17ea9
chore: Update pyathena requirement from ~=2.23 to ~=2.24 (#217)
dependabot[bot] Apr 14, 2023
e461f0a
fix: broken drop view query (#221)
jessedobbelaere Apr 14, 2023
386ec47
fix: allow to set table location when output location is configured b…
juliansteger-sc Apr 14, 2023
067d66f
fix: reading README.md (#225)
CommonCrisis Apr 14, 2023
8e21fcb
Merge branch 'main' into add-python-model-support
Avinash-1394 Apr 20, 2023
1beb024
Added tests
Avinash-1394 Apr 21, 2023
84c9d50
Added more tests
Avinash-1394 Apr 21, 2023
355948a
Merge branch 'main' into add-python-model-support
Avinash-1394 Apr 21, 2023
0693fa9
Fixed mypy errors
Avinash-1394 Apr 21, 2023
eac0504
Formatting
Avinash-1394 Apr 21, 2023
589715e
Simplified patches
Avinash-1394 Apr 21, 2023
4c43d49
Fixed imports
Avinash-1394 Apr 21, 2023
de12fd2
Added docs to tests and added kwargs
Avinash-1394 Apr 23, 2023
8e1bf6f
Fixed whitespace
Avinash-1394 Apr 23, 2023
352e1f6
Fixed whitespace
Avinash-1394 Apr 23, 2023
25531c9
Added overrides to source and ref
Avinash-1394 Apr 23, 2023
f86159d
Added support for configuring table output
Avinash-1394 Apr 24, 2023
883519e
Renamed functions and fixed minor bugs
Avinash-1394 Apr 24, 2023
890b601
Fixed CI
Avinash-1394 Apr 24, 2023
ee9bfd8
Fix names in test
Avinash-1394 Apr 24, 2023
24cb7ff
Handle botocore client error
Avinash-1394 Apr 26, 2023
e1b19e4
Merge branch 'main' into support-python-submissions
Avinash-1394 Apr 30, 2023
08ec29f
Fix conflict
Avinash-1394 Apr 30, 2023
2720dfc
Added threading support
Avinash-1394 May 3, 2023
71cd4ee
Added tests for python job helper
Avinash-1394 May 5, 2023
e61c4d9
Fix flake8 tests
Avinash-1394 May 5, 2023
82296ab
Merge branch 'main' into support-python-submissions
Avinash-1394 May 5, 2023
edfec3f
Merge branch 'main' into support-python-submissions
Avinash-1394 May 5, 2023
2dc8afd
Added tests for listing and updating sessions
Avinash-1394 May 7, 2023
f281a06
Added docs to tests
Avinash-1394 May 8, 2023
22ce37b
Merge branch 'main' into support-python-submissions
Avinash-1394 May 8, 2023
10461ed
Merge branch 'main' into support-python-submissions
Avinash-1394 May 26, 2023
0038628
Split the class objects into separate modules. Handled threading dead…
Avinash-1394 May 28, 2023
3a33445
Merge branch 'main' into support-python-submissions
Jrmyy Jun 7, 2023
76c5c88
Extracted session_count and spark work group methods
Avinash-1394 Jun 10, 2023
1fd436e
fix: BatchDeletePartitions only accepts up to 25 partitions (#328)
juliansteger-sc Jun 9, 2023
0bdaa7e
feat: enable mypy pre-commit check (#329)
svdimchenko Jun 9, 2023
ed56cfe
chore: Update dbt-tests-adapter requirement from ~=1.5.0 to ~=1.5.1 (…
dependabot[bot] Jun 9, 2023
c24d65a
Fixed readme. Moved some defaults to constants.
Avinash-1394 Jun 11, 2023
49ba924
Merge branch 'main' into support-python-submissions
Avinash-1394 Jun 11, 2023
209e02c
Add more docs and functional tests.
Avinash-1394 Jun 14, 2023
d044c35
Merge branch 'main' into support-python-submissions
Avinash-1394 Jun 14, 2023
cad13db
Merge branch 'main' into support-python-submissions
Sep 26, 2023
f56c2e8
Used default logger
Sep 28, 2023
81d499a
Used default logger
Sep 28, 2023
213ee65
Fix isort
Sep 28, 2023
9aead54
Merge branch 'main' into support-python-submissions
Avinash-1394 Sep 28, 2023
a3da02b
Merge branch 'main' into support-python-submissions
nicor88 Oct 5, 2023
6c72e7d
Merge branch 'main' into support-python-submissions
Avinash-1394 Oct 6, 2023
ea38eee
Support additional arguments for create_table_as with dispatch. Call …
Avinash-1394 Oct 10, 2023
8d1848d
Merge branch 'main' into support-python-submissions
Avinash-1394 Oct 10, 2023
5060ae5
Provide language as argument instead of kwarg. Reduce diff in connect…
Avinash-1394 Oct 10, 2023
88e8e44
Restore diff in connections and fix functional test for constraint
Avinash-1394 Oct 10, 2023
855bd6d
Restore test_constraint
Avinash-1394 Oct 10, 2023
8153d20
Disable python functional test until spark work group is added and IA…
Avinash-1394 Oct 10, 2023
1effdbc
Reduce diff in connections
Avinash-1394 Oct 10, 2023
a49102a
Merge branch 'main' into support-python-submissions
Avinash-1394 Nov 23, 2023
9edb519
Break engine config into three variables
Avinash-1394 Nov 23, 2023
eb2d92d
Add method to create session from credentials. Supply boto3 config to…
Avinash-1394 Nov 23, 2023
6e16e45
Merge branch 'main' into support-python-submissions
Avinash-1394 Nov 29, 2023
c2efdb2
Merge branch 'main' into support-python-submissions
Avinash-1394 Jan 2, 2024
4dd9f94
feat: enable iceberg table format for athena spark
Dec 30, 2023
200a8fe
fix: empty code submissions
Jan 3, 2024
a24b1fe
Merge branch 'support-python-submissions' into support-python-submiss…
sankeerthnagapuri Jan 3, 2024
89d2d92
fix: remove references to obsolete config - spark_threads
Jan 10, 2024
9de42be
Merge branch 'support-python-submissions' of https://github.com/sanke…
Jan 10, 2024
217d751
fix: precommit
Jan 11, 2024
5368981
fix: 3.9 compatability
Jan 13, 2024
e1e80dc
chore: readme update config for spark iceberg model example
Jan 13, 2024
e3eb149
fix: test config and remove obselete functions' tests
Jan 15, 2024
7c45844
fix: precommit
Jan 15, 2024
e571f6a
fix: unit tests
Jan 17, 2024
c6fcf80
Merge pull request #1 from sankeerthnagapuri/support-python-submissions
Avinash-1394 Jan 17, 2024
65625ab
Merge branch 'main' into support-python-submissions
Avinash-1394 Jan 20, 2024
cb4a999
Comment out python submissions functional test
Avinash-1394 Jan 20, 2024
016f5f5
Merge branch 'main' into support-python-submissions
Avinash-1394 Feb 5, 2024
c256e9e
Merge branch 'main' into support-python-submissions
Avinash-1394 Feb 5, 2024
1 change: 1 addition & 0 deletions .env.example
@@ -7,3 +7,4 @@ DBT_TEST_ATHENA_DATABASE=
DBT_TEST_ATHENA_SCHEMA=
DBT_TEST_ATHENA_WORK_GROUP=
DBT_TEST_ATHENA_AWS_PROFILE_NAME=
DBT_TEST_ATHENA_SPARK_WORK_GROUP=
150 changes: 149 additions & 1 deletion README.md
@@ -63,7 +63,7 @@
- Supports two incremental update strategies: `insert_overwrite` and `append`
- Does **not** support the use of `unique_key`
- Supports [snapshots][snapshots]
- Does not support [Python models][python-models]
- Supports [Python models][python-models]

[seeds]: https://docs.getdbt.com/docs/building-a-dbt-project/seeds

@@ -132,6 +132,7 @@ A dbt profile can be configured to run against AWS Athena using the following co
| aws_profile_name | Profile to use from your AWS shared credentials file. | Optional | `my-profile` |
| work_group | Identifier of Athena workgroup | Optional | `my-custom-workgroup` |
| num_retries | Number of times to retry a failing query | Optional | `3` |
| spark_work_group | Identifier of Athena Spark workgroup | Optional | `my-spark-workgroup` |
| num_boto3_retries | Number of times to retry boto3 requests (e.g. deleting S3 files for materialized tables) | Optional | `5` |
| seed_s3_upload_args | Dictionary containing boto3 ExtraArgs when uploading to S3 | Optional | `{"ACL": "bucket-owner-full-control"}` |
| lf_tags_database | Default LF tags for new database if it's created by dbt | Optional | `tag_key: tag_value` |
@@ -151,8 +152,10 @@ athena:
region_name: eu-west-1
schema: dbt
database: awsdatacatalog
threads: 4
aws_profile_name: my-profile
work_group: my-workgroup
spark_work_group: my-spark-workgroup
seed_s3_upload_args:
  ACL: bucket-owner-full-control
```
@@ -546,6 +549,151 @@ You may find the following links useful to manage that:
* [terraform aws_lakeformation_resource_lf_tags](https://registry.terraform.io/providers/hashicorp/aws/latest/docs/resources/lakeformation_resource_lf_tags)
<!-- markdownlint-restore -->

## Python Models

The adapter supports Python models using [Athena Spark](https://docs.aws.amazon.com/athena/latest/ug/notebooks-spark.html).

### Setup

- A Spark-enabled workgroup created in Athena
- A Spark execution role granted access to Athena, Glue, and S3
- The Spark workgroup is added to the `~/.dbt/profiles.yml` file (via `spark_work_group`, see above) and the
  profile is referenced in `dbt_project.yml`

### Spark specific table configuration

- `timeout` (`default=43200`)
  - Timeout in seconds for each Python model execution. Defaults to 12 hours (43200 seconds).
- `spark_encryption` (`default=false`)
  - If set to true, encrypts data in transit between Spark nodes, as well as data at rest stored locally by Spark.
- `spark_cross_account_catalog` (`default=false`)
  - Spark can query a catalog in an external account if the consumer account is configured to access the
    producer catalog.
  - If set to true, `/` can be used as the Glue catalog separator,
    e.g. `999999999999/mydatabase.cloudfront_logs` (where `999999999999` is the external catalog ID).
- `spark_requester_pays` (`default=false`)
  - When an Amazon S3 bucket is configured as requester pays, the account of the user running the query is charged
    for data access and data transfer fees associated with the query.
  - If set to true, requester-pays S3 buckets are enabled in Athena for Spark.

### Spark notes

- A session is created for each unique engine configuration defined in the models that are part of the invocation.
- A session's idle timeout is set to 10 minutes. Within that window, if a new calculation (Spark Python model) is
  ready for execution and its engine configuration matches, the process reuses the same session.
- The number of Python models running at a time depends on `threads`. The number of sessions created for the entire
  run depends on the number of unique engine configurations and on the availability of sessions to maintain thread
  concurrency.
- For Iceberg tables, it is recommended to use the `table_properties` configuration to set `format_version` to 2,
  to maintain compatibility between Iceberg tables created by Trino and those created by Spark (see the sketch
  below).
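
A hedged sketch of that Iceberg recommendation, assuming the `table_properties` model config is passed through to
the created table (the `format_version` key follows the note above, not this PR's diff):

```python
def model(dbt, spark_session):
    dbt.config(
        materialized="table",
        table_type="iceberg",
        # format_version 2 keeps Spark-created Iceberg tables compatible
        # with tables created by Trino (assumption: passed through as-is)
        table_properties={"format_version": "2"},
    )

    data = [(1,), (2,), (3,), (4,)]

    return spark_session.createDataFrame(data, ["A"])
```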

### Example models

#### Simple pandas model

```python
import pandas as pd


def model(dbt, session):
    dbt.config(materialized="table")

    model_df = pd.DataFrame({"A": [1, 2, 3, 4]})

    return model_df
```

#### Simple spark

```python
def model(dbt, spark_session):
    dbt.config(materialized="table")

    data = [(1,), (2,), (3,), (4,)]

    df = spark_session.createDataFrame(data, ["A"])

    return df
```

#### Spark incremental

```python
def model(dbt, spark_session):
    dbt.config(materialized="incremental")
    df = dbt.ref("model")

    if dbt.is_incremental:
        max_from_this = (
            f"select max(run_date) from {dbt.this.schema}.{dbt.this.identifier}"
        )
        df = df.filter(df.run_date >= spark_session.sql(max_from_this).collect()[0][0])

    return df
```

#### Config spark model

```python
def model(dbt, spark_session):
    dbt.config(
        materialized="table",
        engine_config={
            "CoordinatorDpuSize": 1,
            "MaxConcurrentDpus": 3,
            "DefaultExecutorDpuSize": 1,
        },
        spark_encryption=True,
        spark_cross_account_catalog=True,
        spark_requester_pays=True,
        polling_interval=15,
        timeout=120,
    )

    data = [(1,), (2,), (3,), (4,)]

    df = spark_session.createDataFrame(data, ["A"])

    return df
```

#### Create a PySpark UDF using imported external Python files

```python
def model(dbt, spark_session):
    dbt.config(
        materialized="incremental",
        incremental_strategy="merge",
        unique_key="num",
    )
    sc = spark_session.sparkContext
    sc.addPyFile("s3://athena-dbt/test/file1.py")
    sc.addPyFile("s3://athena-dbt/test/file2.py")

    def func(iterator):
        from file2 import transform

        return [transform(i) for i in iterator]

    from pyspark.sql.functions import col, udf

    udf_with_import = udf(func)

    data = [(1, "a"), (2, "b"), (3, "c")]
    cols = ["num", "alpha"]
    df = spark_session.createDataFrame(data, cols)

    return df.withColumn("udf_test_col", udf_with_import(col("alpha")))
```

#### Known issues in python models

- Incremental models do not fully utilize Spark capabilities. They depend partially on existing SQL-based logic that
  runs on Trino.
- Snapshot materializations are not supported.
- Spark can only reference tables within the same catalog.

### Working example

seed file - employent_indicators_november_2022_csv_tables.csv
147 changes: 147 additions & 0 deletions dbt/adapters/athena/config.py
@@ -1,12 +1,159 @@
import importlib.metadata
from functools import lru_cache
from typing import Any, Dict

from botocore import config

from dbt.adapters.athena.constants import (
    DEFAULT_CALCULATION_TIMEOUT,
    DEFAULT_POLLING_INTERVAL,
    DEFAULT_SPARK_COORDINATOR_DPU_SIZE,
    DEFAULT_SPARK_EXECUTOR_DPU_SIZE,
    DEFAULT_SPARK_MAX_CONCURRENT_DPUS,
    DEFAULT_SPARK_PROPERTIES,
    LOGGER,
)


@lru_cache()
def get_boto3_config(num_retries: int) -> config.Config:
    return config.Config(
        user_agent_extra="dbt-athena-community/" + importlib.metadata.version("dbt-athena-community"),
        retries={"max_attempts": num_retries, "mode": "standard"},
    )


class AthenaSparkSessionConfig:
    """
    A helper class to manage Athena Spark Session Configuration.
    """

    def __init__(self, config: Dict[str, Any], **session_kwargs: Any) -> None:
        self.config = config
        self.session_kwargs = session_kwargs

    def set_timeout(self) -> int:
        """
        Get the timeout value.

        This function retrieves the timeout value from the parsed model's configuration. If the timeout value
        is not defined, it falls back to the default timeout value. If the retrieved timeout value is less than
        or equal to 0, a ValueError is raised, as the timeout must be a positive integer.

        Returns:
            int: The timeout value in seconds.

        Raises:
            TypeError: If the timeout value is not an integer.
            ValueError: If the timeout value is not a positive integer.
        """
        timeout = self.config.get("timeout", DEFAULT_CALCULATION_TIMEOUT)
        if not isinstance(timeout, int):
            raise TypeError("Timeout must be an integer")
        if timeout <= 0:
            raise ValueError("Timeout must be a positive integer")
        LOGGER.debug(f"Setting timeout: {timeout}")
        return timeout

    def get_polling_interval(self) -> Any:
        """
        Get the polling interval for the configuration.

        Looks up `polling_interval` in the model configuration first, then in the session kwargs,
        and finally falls back to DEFAULT_POLLING_INTERVAL.

        Returns:
            Any: The polling interval value.
        """
        try:
            return self.config["polling_interval"]
        except KeyError:
            try:
                return self.session_kwargs["polling_interval"]
            except KeyError:
                return DEFAULT_POLLING_INTERVAL

    def set_polling_interval(self) -> float:
        """
        Set the polling interval for the configuration.

        Returns:
            float: The polling interval value.

        Raises:
            ValueError: If the polling interval is not a positive number.
        """
        polling_interval = self.get_polling_interval()
        if not isinstance(polling_interval, (int, float)) or polling_interval <= 0:
            raise ValueError(f"Polling_interval must be a positive number. Got: {polling_interval}")
        LOGGER.debug(f"Setting polling_interval: {polling_interval}")
        return float(polling_interval)

    def set_engine_config(self) -> Dict[str, Any]:
        """Set the engine configuration.

        Returns:
            Dict[str, Any]: The engine configuration.

        Raises:
            TypeError: If the engine configuration is not of type dict.
            KeyError: If the keys of the engine configuration dictionary do not match the expected format.
        """
        table_type = self.config.get("table_type", "hive")
        spark_encryption = self.config.get("spark_encryption", False)
        spark_cross_account_catalog = self.config.get("spark_cross_account_catalog", False)
        spark_requester_pays = self.config.get("spark_requester_pays", False)

        default_spark_properties: Dict[str, str] = dict(
            **DEFAULT_SPARK_PROPERTIES.get(table_type)
            if table_type.lower() in ["iceberg", "hudi", "delta_lake"]
            else {},
            **DEFAULT_SPARK_PROPERTIES.get("spark_encryption") if spark_encryption else {},
            **DEFAULT_SPARK_PROPERTIES.get("spark_cross_account_catalog") if spark_cross_account_catalog else {},
            **DEFAULT_SPARK_PROPERTIES.get("spark_requester_pays") if spark_requester_pays else {},
        )

        default_engine_config = {
            "CoordinatorDpuSize": DEFAULT_SPARK_COORDINATOR_DPU_SIZE,
            "MaxConcurrentDpus": DEFAULT_SPARK_MAX_CONCURRENT_DPUS,
            "DefaultExecutorDpuSize": DEFAULT_SPARK_EXECUTOR_DPU_SIZE,
            "SparkProperties": default_spark_properties,
        }
        engine_config = self.config.get("engine_config", None)

        if engine_config:
            provided_spark_properties = engine_config.get("SparkProperties", None)
            if provided_spark_properties:
                default_spark_properties.update(provided_spark_properties)
                default_engine_config["SparkProperties"] = default_spark_properties
                engine_config.pop("SparkProperties")
            default_engine_config.update(engine_config)
        engine_config = default_engine_config

        if not isinstance(engine_config, dict):
            raise TypeError("Engine configuration has to be of type dict")

        expected_keys = {
            "CoordinatorDpuSize",
            "MaxConcurrentDpus",
            "DefaultExecutorDpuSize",
            "SparkProperties",
            "AdditionalConfigs",
        }

        if set(engine_config.keys()) - expected_keys:
            raise KeyError(
                f"The engine configuration keys provided do not match the expected athena engine keys: {expected_keys}"
            )

        if engine_config["MaxConcurrentDpus"] == 1:
            raise KeyError("The lowest value supported for MaxConcurrentDpus is 2")
        LOGGER.debug(f"Setting engine configuration: {engine_config}")
        return engine_config
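
For orientation, a hedged sketch of how this helper might be exercised (the class, function, and constants come
from the diff above; the configuration values are illustrative only, not from this PR):

```python
from dbt.adapters.athena.config import AthenaSparkSessionConfig, get_boto3_config

# Model-level config as dbt would parse it, plus session kwargs as a fallback.
spark_config = AthenaSparkSessionConfig(
    {"timeout": 600, "engine_config": {"MaxConcurrentDpus": 4}},
    polling_interval=10,
)

spark_config.set_timeout()           # 600 (model config beats DEFAULT_CALCULATION_TIMEOUT)
spark_config.set_polling_interval()  # 10.0 (no model value, so the session kwarg is used)
spark_config.set_engine_config()     # defaults merged with {"MaxConcurrentDpus": 4}

# Cached botocore Config carrying the adapter's user agent and retry policy.
boto3_config = get_boto3_config(num_retries=3)
```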