Add contrib/data_engineering example #57

Merged · 2 commits · merged on Jan 17, 2025
8 changes: 8 additions & 0 deletions contrib/data_engineering/.gitignore
@@ -0,0 +1,8 @@
.databricks/
build/
dist/
__pycache__/
*.egg-info
.venv/
**/explorations/**
!**/explorations/README.md
3 changes: 3 additions & 0 deletions contrib/data_engineering/.vscode/__builtins__.pyi
@@ -0,0 +1,3 @@
# Typings for Pylance in Visual Studio Code
# see https://github.com/microsoft/pyright/blob/main/docs/builtins.md
from databricks.sdk.runtime import *
7 changes: 7 additions & 0 deletions contrib/data_engineering/.vscode/extensions.json
@@ -0,0 +1,7 @@
{
  "recommendations": [
    "databricks.databricks",
    "ms-python.vscode-pylance",
    "redhat.vscode-yaml"
  ]
}
21 changes: 21 additions & 0 deletions contrib/data_engineering/.vscode/settings.json
@@ -0,0 +1,21 @@
{
  "python.analysis.stubPath": ".vscode",
  "databricks.python.envFile": "${workspaceFolder}/.env",
  "jupyter.interactiveWindow.cellMarker.codeRegex": "^# COMMAND ----------|^# Databricks notebook source|^(#\\s*%%|#\\s*\\<codecell\\>|#\\s*In\\[\\d*?\\]|#\\s*In\\[ \\])",
  "jupyter.interactiveWindow.cellMarker.default": "# COMMAND ----------",
  "python.testing.pytestArgs": [
    "."
  ],
  "python.testing.unittestEnabled": false,
  "python.testing.pytestEnabled": true,
  "python.analysis.extraPaths": ["assets/etl_pipeline"],
  "files.exclude": {
    "**/*.egg-info": true,
    "**/__pycache__": true,
    ".pytest_cache": true,
  },
  "[python]": {
    "editor.defaultFormatter": "ms-python.black-formatter",
    "editor.formatOnSave": true,
  },
}
96 changes: 96 additions & 0 deletions contrib/data_engineering/README.md
@@ -0,0 +1,96 @@
# data_engineering

The 'data_engineering' project was generated from the contrib/templates/data-engineering template.

Learn more about this template here:

https://github.com/databricks/bundle-examples/tree/main/contrib/templates/data-engineering

You can re-create this bundle by running the following commands:

```
$ databricks bundle init https://github.com/databricks/bundle-examples --template-dir contrib/templates/data-engineering
# (answer prompts, call the project data_engineering)
$ cd data_engineering
$ uv run add-asset
# (select etl-pipeline)
```

## Setup

1. Install the Databricks CLI from https://docs.databricks.com/dev-tools/cli/databricks-cli.html

2. Authenticate to your Databricks workspace, if you have not done so already:
```
$ databricks auth login
```

3. We recommend the UV package manager for installing project dependencies; it's a drop-in replacement for `pip`.
UV should be installed as a global package; see https://docs.astral.sh/uv/getting-started/installation/ for instructions.
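One common way to install it globally on macOS or Linux (a sketch; see the linked page for the authoritative, up-to-date instructions) is:
```
$ curl -LsSf https://astral.sh/uv/install.sh | sh
```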

4. Install all project dependencies:
```
$ uv sync
```

See the "Running unit tests" below for more on testing.

5. Optionally, install developer tools such as the Databricks extension for Visual Studio Code from
https://docs.databricks.com/dev-tools/vscode-ext.html, or the PyCharm plugin from
https://www.databricks.com/blog/announcing-pycharm-integration-databricks.

## Adding assets such as pipelines and jobs

By default, the data-engineering template does not include any assets.

1. To add an asset, run the `add-asset` script:
```
$ uv run add-asset
```

or, if you don't use UV, run:

```
$ export TYPE=etl-pipeline
$ databricks bundle init https://github.com/databricks/bundle-examples --template-dir contrib/templates/data-engineering/assets/$TYPE
```

2. Optionally, run all tests on serverless compute after adding an asset:
```
$ uv run test
```

## Deploying assets

1. To deploy a development copy of this project, type:
```
$ databricks bundle deploy --target dev
```
(Note that "dev" is the default target, so the `--target` parameter
is optional here.)

2. Similarly, to deploy a production copy, type:
```
$ databricks bundle deploy --target prod
```

3. Use the "summary" command to review everything that was deployed:
```
$ databricks bundle summary
```

4. To run a job or pipeline, use the "run" command:
```
$ databricks bundle run
```
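For example, once the etl-pipeline asset from this project has been added and deployed, its job can be triggered by passing the resource key from the job definition:
```
$ databricks bundle run etl_pipeline_job
```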

## Running unit tests

1. Run tests on a serverless environment using:
```
$ uv run test
```

2. Optionally, to run unit tests in a different environment, such as on a cluster,
refer to the Databricks Connect documentation at
https://docs.databricks.com/en/dev-tools/databricks-connect/python/install.html
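As one possible setup (a sketch, not part of this template): Databricks Connect reads the cluster to use from your environment or config profile, so tests can be pointed at an existing cluster by exporting its ID before running pytest. The cluster ID below is a placeholder.
```
$ export DATABRICKS_CLUSTER_ID=<your-cluster-id>
$ uv run pytest
```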
4 changes: 4 additions & 0 deletions contrib/data_engineering/assets/README.md
@@ -0,0 +1,4 @@
This folder is reserved for Databricks Asset Bundles definitions.

New jobs and pipelines should follow the conventions from the 'data-engineering' template.
See https://github.com/databricks/bundle-examples/blob/main/contrib/templates/data-engineering/README.md.
5 changes: 5 additions & 0 deletions contrib/data_engineering/assets/etl_pipeline/__init__.py
@@ -0,0 +1,5 @@
# This is the entry point for the etl_pipeline pipeline.
# It makes sure all transformations in the transformations directory are included.
import transformations

__all__ = ["transformations"]
19 changes: 19 additions & 0 deletions contrib/data_engineering/assets/etl_pipeline/etl_pipeline.job.yml
@@ -0,0 +1,19 @@
# The job that triggers etl_pipeline.
resources:
  jobs:
    etl_pipeline_job:
      name: etl_pipeline_job

      trigger:
        # Run this job every day, exactly one day from the last run; see https://docs.databricks.com/api/workspace/jobs/create#trigger
        periodic:
          interval: 1
          unit: DAYS

      email_notifications:
        on_failure: ${var.notifications}

      tasks:
        - task_key: refresh_pipeline
          pipeline_task:
            pipeline_id: ${resources.pipelines.etl_pipeline.id}
@@ -0,0 +1,12 @@
resources:
  pipelines:
    etl_pipeline:
      name: etl_pipeline
      serverless: true
      catalog: ${var.catalog}
      target: ${var.schema}
      libraries:
        - file:
            path: sources/${bundle.target}/*.py
        - file:
            path: __init__.py
@@ -0,0 +1,8 @@
import dlt
from pyspark.sql import DataFrame
from databricks.sdk.runtime import spark


@dlt.view(comment="Small set of taxis for development (uses LIMIT 10)")
def taxis() -> DataFrame:
    return spark.sql("SELECT * FROM samples.nyctaxi.trips LIMIT 10")
@@ -0,0 +1,8 @@
import dlt
from pyspark.sql import DataFrame
from databricks.sdk.runtime import spark


@dlt.view
def taxis() -> DataFrame:
    return spark.sql("SELECT * FROM samples.nyctaxi.trips")
@@ -0,0 +1,7 @@
from ..sources.dev.taxis import taxis
from ..transformations import taxi_stats


def test_taxi_stats():
    result = taxi_stats.filter_taxis(taxis())
    assert len(result.collect()) > 5
@@ -0,0 +1,9 @@
# __init__.py defines the 'transformations' Python package
import importlib
import pkgutil


# Import all modules in the package except those starting with '_', like '__init__.py'
for _, module_name, _ in pkgutil.iter_modules(__path__):
    if not module_name.startswith("_"):
        importlib.import_module(f"{__name__}.{module_name}")
@@ -0,0 +1,20 @@
import dlt
from pyspark.sql.functions import to_date, count
from pyspark.sql import DataFrame


@dlt.table(comment="Daily statistics of NYC Taxi trips")
def taxi_stats() -> DataFrame:
"""Read from the 'taxis' view from etl_pipeline/sources."""
taxis = dlt.read("taxis")

return filter_taxis(taxis)


def filter_taxis(taxis: DataFrame) -> DataFrame:
"""Group by date and calculate the number of trips."""
return (
taxis.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
.groupBy("pickup_date")
.agg(count("*").alias("number_of_trips"))
)
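For reference, a minimal additional unit test for `filter_taxis` could look like the sketch below. It assumes the file sits next to the existing test above (so the same relative import style applies) and uses the `spark` fixture provided by conftest.py; the data and the test name are illustrative only.
```
from datetime import datetime

from ..transformations import taxi_stats


def test_filter_taxis_counts_trips_per_day(spark):
    # Three trips across two pickup dates: two on Jan 1, one on Jan 2.
    df = spark.createDataFrame(
        [
            (datetime(2025, 1, 1, 8, 0),),
            (datetime(2025, 1, 1, 9, 30),),
            (datetime(2025, 1, 2, 7, 15),),
        ],
        ["tpep_pickup_datetime"],
    )

    result = taxi_stats.filter_taxis(df).collect()

    # Expect one row per pickup date, with trips counted per day.
    assert {(str(row["pickup_date"]), row["number_of_trips"]) for row in result} == {
        ("2025-01-01", 2),
        ("2025-01-02", 1),
    }
```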
40 changes: 40 additions & 0 deletions contrib/data_engineering/conftest.py
@@ -0,0 +1,40 @@
# conftest.py is used to configure pytest.
# This file is in the root since it affects all tests through this bundle.
# It makes sure all 'assets/*' directories are added to `sys.path` so that
# tests can import them.
import os
import sys
import dlt
import pathlib
import pytest
import warnings
from pyspark.sql import SparkSession
from databricks.connect import DatabricksSession

# Dynamically find and add all `assets/*` directories to `sys.path`
for path in pathlib.Path(pathlib.Path(__file__).parent / "assets").glob("*"):
    resolved_path = str(path.resolve())
    if resolved_path not in sys.path:
        sys.path.append(resolved_path)

# For older databricks-connect, work around issues importing SparkSession
# and errors when SPARK_REMOTE is set.
SparkSession.builder = DatabricksSession.builder
os.environ.pop("SPARK_REMOTE", None)

# Make dlt.views in 'sources/dev' available for tests
warnings.filterwarnings(
"ignore",
message="This is a stub that only contains the interfaces to Delta Live Tables.*",
category=UserWarning,
)
dlt.enable_local_execution()
dlt.view = lambda func=None, *args, **kwargs: func or (lambda f: f)


# Provide a 'spark' fixture for tests and make sure the session is eagerly initialized
@pytest.fixture(scope="session", autouse=True)
def spark() -> SparkSession:
    if hasattr(DatabricksSession.builder, "validateSession"):
        return DatabricksSession.builder.validateSession().getOrCreate()
    return DatabricksSession.builder.getOrCreate()
46 changes: 46 additions & 0 deletions contrib/data_engineering/databricks.yml
@@ -0,0 +1,46 @@
# This is a Databricks asset bundle definition for data_engineering.
# See https://docs.databricks.com/dev-tools/bundles/index.html for documentation.
bundle:
  name: data_engineering

include:
  - assets/*.yml
  - assets/*/*.yml

variables:
  catalog:
    description: The catalog to use
  schema:
    description: The schema to use
  notifications:
    description: The email addresses to use for failure notifications

targets:
  dev:
    # The default target uses 'mode: development' to create a development copy.
    # - Deployed resources get prefixed with '[dev my_user_name]'
    # - Any job schedules and triggers are paused by default.
    # See also https://docs.databricks.com/dev-tools/bundles/deployment-modes.html.
    mode: development
    default: true
    workspace:
      host: https://company.databricks.com
    variables:
      catalog: main
      schema: ${workspace.current_user.short_name}
      notifications: []
  prod:
    mode: production
    workspace:
      host: https://company.databricks.com
      # We explicitly specify /Workspace/Users/[email protected] to make sure we only have a single copy.
      root_path: /Workspace/Users/[email protected]/.bundle/${bundle.name}/${bundle.target}
    permissions:
      - user_name: [email protected]
        level: CAN_MANAGE
    run_as:
      user_name: [email protected]
    variables:
      catalog: main
      schema: default
      notifications: [[email protected]]
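The `catalog`, `schema`, and `notifications` variables above take their values from the target blocks, but they can also be overridden per CLI invocation; for example (a sketch, with an illustrative schema name):
```
$ databricks bundle deploy --target dev --var="schema=my_sandbox_schema"
```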
20 changes: 20 additions & 0 deletions contrib/data_engineering/pyproject.toml
@@ -0,0 +1,20 @@
[project]
name = "my_data_project"
version = "0.1.0"
description = "Databricks ETL pipeline project"
requires-python = "==3.10.*"
dependencies = [
"databricks-dlt",
"pytest",
"databricks-connect==15.1.*",
]

[project.scripts]
add-asset = "scripts.add_asset:main"
test = "scripts.test:main"

[tool.uv]
package = true

[tool.setuptools.packages.find]
include = ["scripts"]
46 changes: 46 additions & 0 deletions contrib/data_engineering/scripts/add_asset.py
@@ -0,0 +1,46 @@
#!/usr/bin/env python3
#
# add_asset.py is used to initialize a new asset from the data-engineering template.
#
import sys
import subprocess
from typing import Literal

VALID_ASSETS = ["etl-pipeline", "job", "ingest-pipeline"]
AssetType = Literal["etl-pipeline", "job", "ingest-pipeline"]


def init_bundle(asset_type: AssetType) -> None:
    cmd = f"databricks bundle init https://github.com/databricks/bundle-examples --template-dir contrib/templates/data-engineering/assets/{asset_type} " + " ".join(sys.argv[2:])
    subprocess.run(cmd, shell=True)


def show_menu() -> AssetType:
print("\nSelect asset type to initialize:")
for i, asset in enumerate(VALID_ASSETS, 1):
print(f"{i}. {asset}")

while True:
try:
choice = int(input("\nEnter number (1-3): "))
if 1 <= choice <= len(VALID_ASSETS):
return VALID_ASSETS[choice - 1]
print("Invalid choice. Please try again.")
except ValueError:
print("Please enter a number.")


def main():
    if len(sys.argv) > 1:
        asset_type = sys.argv[1]
        if asset_type not in VALID_ASSETS:
            print(f"Error: Asset type must be one of {VALID_ASSETS}")
            sys.exit(1)
    else:
        asset_type = show_menu()

    init_bundle(asset_type)


if __name__ == "__main__":
    main()
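Based on the argument handling in `main()` above, the asset type can also be passed directly to skip the interactive menu; any extra arguments are forwarded to `databricks bundle init`:
```
$ uv run add-asset etl-pipeline
```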