Skip to content

Commit

Permalink
feat: automatically inject OL transport info into spark jobs
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda committed Jan 2, 2025
1 parent 1f0bd6d commit 78b57e4
Show file tree
Hide file tree
Showing 14 changed files with 1,669 additions and 219 deletions.
23 changes: 23 additions & 0 deletions docs/apache-airflow-providers-openlineage/guides/user.rst
Original file line number Diff line number Diff line change
Expand Up @@ -451,6 +451,29 @@ You can enable this automation by setting ``spark_inject_parent_job_info`` optio
AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true
Passing transport information to Spark jobs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

OpenLineage integration can automatically inject Airflow's transport information into Spark application properties,
for :ref:`supported Operators <supported_classes:openlineage>`.
It allows Spark integration to send events to the same backend as Airflow integration without manual configuration.
See `Scheduling from Airflow <https://openlineage.io/docs/integrations/spark/configuration/airflow>`_.

.. note::

If any of the ``spark.openlineage.transport*`` properties are manually specified in the Spark job configuration, the integration will refrain from injecting transport properties to ensure that manually provided values are preserved.

You can enable this automation by setting ``spark_inject_transport_info`` option to ``true`` in Airflow configuration.

.. code-block:: ini
[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_transport_info = true
``AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO`` environment variable is an equivalent.

.. code-block:: ini
AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
Troubleshooting
===============

Expand Down
7 changes: 5 additions & 2 deletions docs/exts/templates/openlineage.rst.jinja2
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,15 @@ See :ref:`automatic injection of parent job information <options:spark_inject_pa
apache-airflow-providers-google
"""""""""""""""""""""""""""""""

- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
- Parent Job Information
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`
- Parent Job Information
- Transport Information (only HTTP transport is supported for now (with api_key auth, if any))
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`
- Parent Job Information
- Transport Information (only HTTP transport is supported for now (with api_key auth, if any))
- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
- Parent Job Information
- Transport Information (only HTTP transport is supported for now (with api_key auth, if any))


:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
Expand Down
4 changes: 2 additions & 2 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
Expand Up @@ -631,7 +631,7 @@
"google": {
"deps": [
"PyOpenSSL>=23.0.0",
"apache-airflow-providers-common-compat>=1.3.0",
"apache-airflow-providers-common-compat>=1.4.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.9.0",
"asgiref>=3.5.2",
Expand Down Expand Up @@ -974,7 +974,7 @@
},
"openlineage": {
"deps": [
"apache-airflow-providers-common-compat>=1.3.0",
"apache-airflow-providers-common-compat>=1.4.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.9.0",
"attrs>=22.2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,15 @@
log = logging.getLogger(__name__)

if TYPE_CHECKING:
from airflow.providers.openlineage.utils.spark import inject_parent_job_information_into_spark_properties
from airflow.providers.openlineage.utils.spark import (
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_spark_properties,
)
else:
try:
from airflow.providers.openlineage.utils.spark import (
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_spark_properties,
)
except ImportError:
try:
Expand Down Expand Up @@ -64,5 +68,64 @@ def inject_parent_job_information_into_spark_properties(properties: dict, contex
}
return {**properties, **ol_parent_job_properties}

try:
from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
except ImportError:

def inject_transport_information_into_spark_properties(properties: dict, context) -> dict:
log.warning(
"Could not import `airflow.providers.openlineage.plugins.listener`."
"Skipping the injection of OpenLineage transport information into Spark properties."
)
return properties

else:

def inject_transport_information_into_spark_properties(properties: dict, context) -> dict:
if any(str(key).startswith("spark.openlineage.transport") for key in properties):
log.info(
"Some OpenLineage properties with transport information are already present "
"in Spark properties. Skipping the injection of OpenLineage "
"transport information into Spark properties."
)
return properties

transport = get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
if transport.kind != "http":
log.info(
"OpenLineage transport type `%s` does not support automatic "
"injection of OpenLineage transport information into Spark properties.",
transport.kind,
)
return {}

transport_properties = {
"spark.openlineage.transport.type": "http",
"spark.openlineage.transport.url": transport.url,
"spark.openlineage.transport.endpoint": transport.endpoint,
"spark.openlineage.transport.timeoutInMillis": int(
transport.timeout * 1000 # convert to milliseconds, as required by Spark integration
),
}
if transport.compression:
transport_properties["spark.openlineage.transport.compression"] = str(
transport.compression
)

if hasattr(transport.config.auth, "api_key") and transport.config.auth.get_bearer():
transport_properties["spark.openlineage.transport.auth.type"] = "api_key"
transport_properties["spark.openlineage.transport.auth.apiKey"] = (
transport.config.auth.get_bearer()
)

if hasattr(transport.config, "custom_headers") and transport.config.custom_headers:
for key, value in transport.config.custom_headers.items():
transport_properties[f"spark.openlineage.transport.headers.{key}"] = value

return {**properties, **transport_properties}


__all__ = ["inject_parent_job_information_into_spark_properties"]
__all__ = [
"inject_parent_job_information_into_spark_properties",
"inject_transport_information_into_spark_properties",
]
58 changes: 45 additions & 13 deletions providers/src/airflow/providers/google/cloud/openlineage/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
)
from airflow.providers.common.compat.openlineage.utils.spark import (
inject_parent_job_information_into_spark_properties,
inject_transport_information_into_spark_properties,
)
from airflow.providers.google import __version__ as provider_version
from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
Expand Down Expand Up @@ -440,7 +441,7 @@ def _replace_dataproc_job_properties(job: dict, job_type: str, new_properties: d


def inject_openlineage_properties_into_dataproc_job(
job: dict, context: Context, inject_parent_job_info: bool
job: dict, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
) -> dict:
"""
Inject OpenLineage properties into Spark job definition.
Expand All @@ -453,18 +454,19 @@ def inject_openlineage_properties_into_dataproc_job(
- OpenLineage provider is not accessible.
- The job type is not supported.
- Automatic parent job information injection is disabled.
- Any OpenLineage properties with parent job information are already present
- Any OpenLineage properties with respective information are already present
in the Spark job definition.
Args:
job: The original Dataproc job definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
inject_transport_info: Flag indicating whether to inject transport information.
Returns:
The modified job definition with OpenLineage properties injected, if applicable.
"""
if not inject_parent_job_info:
if not inject_parent_job_info and not inject_transport_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return job

Expand All @@ -484,7 +486,17 @@ def inject_openlineage_properties_into_dataproc_job(

properties = job[job_type].get("properties", {})

properties = inject_parent_job_information_into_spark_properties(properties=properties, context=context)
if inject_parent_job_info:
log.debug("Injecting OpenLineage parent job information into Spark properties.")
properties = inject_parent_job_information_into_spark_properties(
properties=properties, context=context
)

if inject_transport_info:
log.debug("Injecting OpenLineage transport information into Spark properties.")
properties = inject_transport_information_into_spark_properties(
properties=properties, context=context
)

job_with_ol_config = _replace_dataproc_job_properties(
job=job, job_type=job_type, new_properties=properties
Expand Down Expand Up @@ -574,7 +586,7 @@ def _replace_dataproc_batch_properties(batch: dict | Batch, new_properties: dict


def inject_openlineage_properties_into_dataproc_batch(
batch: dict | Batch, context: Context, inject_parent_job_info: bool
batch: dict | Batch, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
) -> dict | Batch:
"""
Inject OpenLineage properties into Dataproc batch definition.
Expand All @@ -587,18 +599,19 @@ def inject_openlineage_properties_into_dataproc_batch(
- OpenLineage provider is not accessible.
- The batch type is not supported.
- Automatic parent job information injection is disabled.
- Any OpenLineage properties with parent job information are already present
- Any OpenLineage properties with respective information are already present
in the Spark job configuration.
Args:
batch: The original Dataproc batch definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
inject_transport_info: Flag indicating whether to inject transport information.
Returns:
The modified batch definition with OpenLineage properties injected, if applicable.
"""
if not inject_parent_job_info:
if not inject_parent_job_info and not inject_transport_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return batch

Expand All @@ -618,14 +631,24 @@ def inject_openlineage_properties_into_dataproc_batch(

properties = _extract_dataproc_batch_properties(batch)

properties = inject_parent_job_information_into_spark_properties(properties=properties, context=context)
if inject_parent_job_info:
log.debug("Injecting OpenLineage parent job information into Spark properties.")
properties = inject_parent_job_information_into_spark_properties(
properties=properties, context=context
)

if inject_transport_info:
log.debug("Injecting OpenLineage transport information into Spark properties.")
properties = inject_transport_information_into_spark_properties(
properties=properties, context=context
)

batch_with_ol_config = _replace_dataproc_batch_properties(batch=batch, new_properties=properties)
return batch_with_ol_config


def inject_openlineage_properties_into_dataproc_workflow_template(
template: dict, context: Context, inject_parent_job_info: bool
template: dict, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
) -> dict:
"""
Inject OpenLineage properties into Spark jobs in Workflow Template.
Expand All @@ -645,11 +668,12 @@ def inject_openlineage_properties_into_dataproc_workflow_template(
template: The original Dataproc Workflow Template definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
inject_transport_info: Flag indicating whether to inject transport information.
Returns:
The modified Workflow Template definition with OpenLineage properties injected, if applicable.
"""
if not inject_parent_job_info:
if not inject_parent_job_info and not inject_transport_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return template

Expand All @@ -675,9 +699,17 @@ def inject_openlineage_properties_into_dataproc_workflow_template(

properties = single_job_definition[job_type].get("properties", {})

properties = inject_parent_job_information_into_spark_properties(
properties=properties, context=context
)
if inject_parent_job_info:
log.debug("Injecting OpenLineage parent job information into Spark properties.")
properties = inject_parent_job_information_into_spark_properties(
properties=properties, context=context
)

if inject_transport_info:
log.debug("Injecting OpenLineage transport information into Spark properties.")
properties = inject_transport_information_into_spark_properties(
properties=properties, context=context
)

job_with_ol_config = _replace_dataproc_job_properties(
job=single_job_definition, job_type=job_type, new_properties=properties
Expand Down
25 changes: 21 additions & 4 deletions providers/src/airflow/providers/google/cloud/operators/dataproc.py
Original file line number Diff line number Diff line change
Expand Up @@ -1829,6 +1829,9 @@ def __init__(
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -1849,17 +1852,19 @@ def __init__(
self.cancel_on_kill = cancel_on_kill
self.operation_name: str | None = None
self.openlineage_inject_parent_job_info = openlineage_inject_parent_job_info
self.openlineage_inject_transport_info = openlineage_inject_transport_info

def execute(self, context: Context):
self.log.info("Instantiating Inline Template")
hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
project_id = self.project_id or hook.project_id
if self.openlineage_inject_parent_job_info:
if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info:
self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
self.template = inject_openlineage_properties_into_dataproc_workflow_template(
template=self.template,
context=context,
inject_parent_job_info=self.openlineage_inject_parent_job_info,
inject_transport_info=self.openlineage_inject_transport_info,
)

operation = hook.instantiate_inline_workflow_template(
Expand Down Expand Up @@ -1982,6 +1987,9 @@ def __init__(
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
**kwargs,
) -> None:
super().__init__(**kwargs)
Expand All @@ -2004,14 +2012,18 @@ def __init__(
self.job_id: str | None = None
self.wait_timeout = wait_timeout
self.openlineage_inject_parent_job_info = openlineage_inject_parent_job_info
self.openlineage_inject_transport_info = openlineage_inject_transport_info

def execute(self, context: Context):
self.log.info("Submitting job")
self.hook = DataprocHook(gcp_conn_id=self.gcp_conn_id, impersonation_chain=self.impersonation_chain)
if self.openlineage_inject_parent_job_info:
if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info:
self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
self.job = inject_openlineage_properties_into_dataproc_job(
job=self.job, context=context, inject_parent_job_info=self.openlineage_inject_parent_job_info
job=self.job,
context=context,
inject_parent_job_info=self.openlineage_inject_parent_job_info,
inject_transport_info=self.openlineage_inject_transport_info,
)
job_object = self.hook.submit_job(
project_id=self.project_id,
Expand Down Expand Up @@ -2442,6 +2454,9 @@ def __init__(
openlineage_inject_parent_job_info: bool = conf.getboolean(
"openlineage", "spark_inject_parent_job_info", fallback=False
),
openlineage_inject_transport_info: bool = conf.getboolean(
"openlineage", "spark_inject_transport_info", fallback=False
),
**kwargs,
):
super().__init__(**kwargs)
Expand All @@ -2464,6 +2479,7 @@ def __init__(
self.deferrable = deferrable
self.polling_interval_seconds = polling_interval_seconds
self.openlineage_inject_parent_job_info = openlineage_inject_parent_job_info
self.openlineage_inject_transport_info = openlineage_inject_transport_info

def execute(self, context: Context):
if self.asynchronous and self.deferrable:
Expand All @@ -2486,12 +2502,13 @@ def execute(self, context: Context):
else:
self.log.info("Starting batch. The batch ID will be generated since it was not provided.")

if self.openlineage_inject_parent_job_info:
if self.openlineage_inject_parent_job_info or self.openlineage_inject_transport_info:
self.log.info("Automatic injection of OpenLineage information into Spark properties is enabled.")
self.batch = inject_openlineage_properties_into_dataproc_batch(
batch=self.batch,
context=context,
inject_parent_job_info=self.openlineage_inject_parent_job_info,
inject_transport_info=self.openlineage_inject_transport_info,
)

try:
Expand Down
Loading

0 comments on commit 78b57e4

Please sign in to comment.