feat: automatically inject OL transport info into spark jobs #45326

Open · wants to merge 1 commit into main
27 changes: 27 additions & 0 deletions docs/apache-airflow-providers-openlineage/guides/user.rst
@@ -451,6 +451,33 @@ You can enable this automation by setting ``spark_inject_parent_job_info`` option
AIRFLOW__OPENLINEAGE__SPARK_INJECT_PARENT_JOB_INFO=true


Passing transport information to Spark jobs
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^

The OpenLineage integration can automatically inject Airflow's transport information into Spark application properties
for :ref:`supported Operators <supported_classes:openlineage>`.
This allows the Spark integration to send events to the same backend as the Airflow integration without manual configuration.
See `Scheduling from Airflow <https://openlineage.io/docs/integrations/spark/configuration/airflow>`_.

.. note::

If any of the ``spark.openlineage.transport*`` properties are manually specified in the Spark job configuration, the integration will refrain from injecting transport properties to ensure that manually provided values are preserved.

You can enable this automation by setting the ``spark_inject_transport_info`` option to ``true`` in Airflow configuration.

.. code-block:: ini

[openlineage]
transport = {"type": "http", "url": "http://example.com:5000", "endpoint": "api/v1/lineage"}
spark_inject_transport_info = true

The ``AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO`` environment variable is an equivalent.

.. code-block:: ini

AIRFLOW__OPENLINEAGE__SPARK_INJECT_TRANSPORT_INFO=true
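
For illustration, given the ``transport`` configuration above, the injected Spark properties would look
roughly like this (the ``timeoutInMillis`` value assumes the client's default timeout of 5 seconds):

.. code-block:: ini

    spark.openlineage.transport.type=http
    spark.openlineage.transport.url=http://example.com:5000
    spark.openlineage.transport.endpoint=api/v1/lineage
    # Derived as the client timeout in seconds * 1000; 5000 assumes the default of 5s.
    spark.openlineage.transport.timeoutInMillis=5000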


Troubleshooting
===============

7 changes: 5 additions & 2 deletions docs/exts/templates/openlineage.rst.jinja2
@@ -34,12 +34,15 @@ See :ref:`automatic injection of parent job information <options:spark_inject_parent_job_info>`
apache-airflow-providers-google
"""""""""""""""""""""""""""""""

-- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
-  - Parent Job Information
 - :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocCreateBatchOperator`
   - Parent Job Information
+  - Transport Information (only HTTP transport is supported for now (with api_key auth, if any))
 - :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocInstantiateInlineWorkflowTemplateOperator`
   - Parent Job Information
+  - Transport Information (only HTTP transport is supported for now (with api_key auth, if any))
+- :class:`~airflow.providers.google.cloud.operators.dataproc.DataprocSubmitJobOperator`
+  - Parent Job Information
+  - Transport Information (only HTTP transport is supported for now (with api_key auth, if any))


:class:`~airflow.providers.common.sql.operators.sql.SQLExecuteQueryOperator`
4 changes: 2 additions & 2 deletions generated/provider_dependencies.json
Original file line number Diff line number Diff line change
@@ -628,7 +628,7 @@
  "google": {
    "deps": [
      "PyOpenSSL>=23.0.0",
-      "apache-airflow-providers-common-compat>=1.3.0",
+      "apache-airflow-providers-common-compat>=1.4.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.9.0",
"asgiref>=3.5.2",
@@ -970,7 +970,7 @@
  },
  "openlineage": {
    "deps": [
-      "apache-airflow-providers-common-compat>=1.3.0",
+      "apache-airflow-providers-common-compat>=1.4.0",
"apache-airflow-providers-common-sql>=1.20.0",
"apache-airflow>=2.9.0",
"attrs>=22.2",
2 changes: 1 addition & 1 deletion providers/src/airflow/providers/common/compat/__init__.py
@@ -29,7 +29,7 @@

__all__ = ["__version__"]

-__version__ = "1.3.0"
+__version__ = "1.4.0"

if packaging.version.parse(packaging.version.parse(airflow_version).base_version) < packaging.version.parse(
"2.9.0"
providers/src/airflow/providers/common/compat/openlineage/utils/spark.py
@@ -23,11 +23,15 @@
log = logging.getLogger(__name__)

if TYPE_CHECKING:
-    from airflow.providers.openlineage.utils.spark import inject_parent_job_information_into_spark_properties
+    from airflow.providers.openlineage.utils.spark import (
+        inject_parent_job_information_into_spark_properties,
+        inject_transport_information_into_spark_properties,
+    )
else:
    try:
        from airflow.providers.openlineage.utils.spark import (
            inject_parent_job_information_into_spark_properties,
+            inject_transport_information_into_spark_properties,
        )
    except ImportError:
        try:
@@ -64,5 +68,63 @@ def inject_parent_job_information_into_spark_properties(properties: dict, context) -> dict:
        }
        return {**properties, **ol_parent_job_properties}

try:
    from airflow.providers.openlineage.plugins.listener import get_openlineage_listener
except ImportError:

    def inject_transport_information_into_spark_properties(properties: dict, context) -> dict:
        log.warning(
            "Could not import `airflow.providers.openlineage.plugins.listener`. "
            "Skipping the injection of OpenLineage transport information into Spark properties."
        )
        return properties

else:

    def inject_transport_information_into_spark_properties(properties: dict, context) -> dict:
        if any(str(key).startswith("spark.openlineage.transport") for key in properties):
            log.info(
                "Some OpenLineage properties with transport information are already present "
                "in Spark properties. Skipping the injection of OpenLineage "
                "transport information into Spark properties."
            )
            return properties

        transport = get_openlineage_listener().adapter.get_or_create_openlineage_client().transport
        if transport.kind != "http":
            log.info(
                "OpenLineage transport type `%s` does not support automatic "
                "injection of OpenLineage transport information into Spark properties.",
                transport.kind,
            )
            return properties

        transport_properties = {
            "spark.openlineage.transport.type": "http",
            "spark.openlineage.transport.url": transport.url,
            "spark.openlineage.transport.endpoint": transport.endpoint,
            # Timeout is converted to milliseconds, as required by the Spark integration.
            "spark.openlineage.transport.timeoutInMillis": str(int(transport.timeout * 1000)),
        }
        if transport.compression:
            transport_properties["spark.openlineage.transport.compression"] = str(
                transport.compression
            )

        if hasattr(transport.config.auth, "api_key") and transport.config.auth.get_bearer():
            transport_properties["spark.openlineage.transport.auth.type"] = "api_key"
            transport_properties["spark.openlineage.transport.auth.apiKey"] = (
                transport.config.auth.get_bearer()
            )

        if hasattr(transport.config, "custom_headers") and transport.config.custom_headers:
            for key, value in transport.config.custom_headers.items():
                transport_properties[f"spark.openlineage.transport.headers.{key}"] = value

        return {**properties, **transport_properties}

-__all__ = ["inject_parent_job_information_into_spark_properties"]
+__all__ = [
+    "inject_parent_job_information_into_spark_properties",
+    "inject_transport_information_into_spark_properties",
+]
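
As a usage sketch (not part of this diff), assuming Airflow with these providers installed and a
task ``context`` in scope, a caller can chain the two compat helpers the same way the Google
provider code further below does; the wrapper name ``prepare_spark_properties`` and its flag
arguments are hypothetical:

from airflow.providers.common.compat.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
    inject_transport_information_into_spark_properties,
)


def prepare_spark_properties(properties: dict, context, parent: bool, transport: bool) -> dict:
    """Hypothetical wrapper: each injector returns a new dict and backs off
    when matching spark.openlineage.* keys are already present."""
    if parent:
        properties = inject_parent_job_information_into_spark_properties(
            properties=properties, context=context
        )
    if transport:
        properties = inject_transport_information_into_spark_properties(
            properties=properties, context=context
        )
    return properties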
providers/src/airflow/providers/common/compat/provider.yaml
@@ -25,6 +25,7 @@ state: ready
source-date-epoch: 1731569875
# note that those versions are maintained by release manager - do not update them manually
versions:
- 1.4.0
- 1.3.0
- 1.2.2
- 1.2.1
58 changes: 45 additions & 13 deletions providers/src/airflow/providers/google/cloud/openlineage/utils.py
@@ -43,6 +43,7 @@
)
from airflow.providers.common.compat.openlineage.utils.spark import (
    inject_parent_job_information_into_spark_properties,
+    inject_transport_information_into_spark_properties,
)
from airflow.providers.google import __version__ as provider_version
from airflow.providers.google.cloud.hooks.gcs import _parse_gcs_url
@@ -453,7 +454,7 @@ def _replace_dataproc_job_properties(job: dict, job_type: str, new_properties: dict) -> dict:


def inject_openlineage_properties_into_dataproc_job(
-    job: dict, context: Context, inject_parent_job_info: bool
+    job: dict, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
) -> dict:
"""
Inject OpenLineage properties into Spark job definition.
@@ -466,18 +467,19 @@
- OpenLineage provider is not accessible.
- The job type is not supported.
- Automatic parent job information injection is disabled.
-    - Any OpenLineage properties with parent job information are already present
+    - Any OpenLineage properties with respective information are already present
in the Spark job definition.

Args:
job: The original Dataproc job definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
inject_transport_info: Flag indicating whether to inject transport information.

Returns:
The modified job definition with OpenLineage properties injected, if applicable.
"""
-    if not inject_parent_job_info:
+    if not inject_parent_job_info and not inject_transport_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return job

@@ -497,7 +499,17 @@

properties = job[job_type].get("properties", {})

-    properties = inject_parent_job_information_into_spark_properties(properties=properties, context=context)
+    if inject_parent_job_info:
+        log.debug("Injecting OpenLineage parent job information into Spark properties.")
+        properties = inject_parent_job_information_into_spark_properties(
+            properties=properties, context=context
+        )
+
+    if inject_transport_info:
+        log.debug("Injecting OpenLineage transport information into Spark properties.")
+        properties = inject_transport_information_into_spark_properties(
+            properties=properties, context=context
+        )

job_with_ol_config = _replace_dataproc_job_properties(
job=job, job_type=job_type, new_properties=properties
@@ -587,7 +599,7 @@ def _replace_dataproc_batch_properties(batch: dict | Batch, new_properties: dict) -> dict | Batch:


def inject_openlineage_properties_into_dataproc_batch(
-    batch: dict | Batch, context: Context, inject_parent_job_info: bool
+    batch: dict | Batch, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
) -> dict | Batch:
"""
Inject OpenLineage properties into Dataproc batch definition.
@@ -600,18 +612,19 @@ def inject_openlineage_properties_into_dataproc_batch(
- OpenLineage provider is not accessible.
- The batch type is not supported.
- Automatic parent job information injection is disabled.
-    - Any OpenLineage properties with parent job information are already present
+    - Any OpenLineage properties with respective information are already present
in the Spark job configuration.

Args:
batch: The original Dataproc batch definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
inject_transport_info: Flag indicating whether to inject transport information.

Returns:
The modified batch definition with OpenLineage properties injected, if applicable.
"""
-    if not inject_parent_job_info:
+    if not inject_parent_job_info and not inject_transport_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return batch

@@ -631,14 +644,24 @@

properties = _extract_dataproc_batch_properties(batch)

-    properties = inject_parent_job_information_into_spark_properties(properties=properties, context=context)
+    if inject_parent_job_info:
+        log.debug("Injecting OpenLineage parent job information into Spark properties.")
+        properties = inject_parent_job_information_into_spark_properties(
+            properties=properties, context=context
+        )
+
+    if inject_transport_info:
+        log.debug("Injecting OpenLineage transport information into Spark properties.")
+        properties = inject_transport_information_into_spark_properties(
+            properties=properties, context=context
+        )

batch_with_ol_config = _replace_dataproc_batch_properties(batch=batch, new_properties=properties)
return batch_with_ol_config
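
A usage sketch under stated assumptions: the batch may be a plain dict shaped like the Dataproc
Batch API resource, where ``runtime_config.properties`` carries the Spark properties, and
``context`` is assumed to be an Airflow task context available in the operator:

# Illustrative plain-dict Dataproc batch, before injection.
batch = {
    "pyspark_batch": {"main_python_file_uri": "gs://bucket/app.py"},  # hypothetical job file
    "runtime_config": {"properties": {"spark.executor.cores": "4"}},
}

batch = inject_openlineage_properties_into_dataproc_batch(
    batch=batch,
    context=context,  # assumed: Airflow task context in scope
    inject_parent_job_info=True,
    inject_transport_info=True,
)
# batch["runtime_config"]["properties"] now additionally holds the
# spark.openlineage.parent* and spark.openlineage.transport* keys,
# unless such keys were already present.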


def inject_openlineage_properties_into_dataproc_workflow_template(
-    template: dict, context: Context, inject_parent_job_info: bool
+    template: dict, context: Context, inject_parent_job_info: bool, inject_transport_info: bool
) -> dict:
"""
Inject OpenLineage properties into Spark jobs in Workflow Template.
@@ -658,11 +681,12 @@
template: The original Dataproc Workflow Template definition.
context: The Airflow context in which the job is running.
inject_parent_job_info: Flag indicating whether to inject parent job information.
inject_transport_info: Flag indicating whether to inject transport information.

Returns:
The modified Workflow Template definition with OpenLineage properties injected, if applicable.
"""
-    if not inject_parent_job_info:
+    if not inject_parent_job_info and not inject_transport_info:
log.debug("Automatic injection of OpenLineage information is disabled.")
return template

@@ -688,9 +712,17 @@

properties = single_job_definition[job_type].get("properties", {})

-        properties = inject_parent_job_information_into_spark_properties(
-            properties=properties, context=context
-        )
+        if inject_parent_job_info:
+            log.debug("Injecting OpenLineage parent job information into Spark properties.")
+            properties = inject_parent_job_information_into_spark_properties(
+                properties=properties, context=context
+            )
+
+        if inject_transport_info:
+            log.debug("Injecting OpenLineage transport information into Spark properties.")
+            properties = inject_transport_information_into_spark_properties(
+                properties=properties, context=context
+            )

job_with_ol_config = _replace_dataproc_job_properties(
job=single_job_definition, job_type=job_type, new_properties=properties