
There is a bug in connecting to EKS using the airflow.providers.amazon.aws.operators.eks library in China. #45368

Closed
1 of 2 tasks
QiaoLiar opened this issue Jan 3, 2025 · 4 comments · Fixed by #45469 or #45725
Labels
area:providers good first issue kind:bug This is a clearly a bug provider:amazon-aws AWS/Amazon - related issues

Comments

@QiaoLiar

QiaoLiar commented Jan 3, 2025

Apache Airflow Provider(s)

amazon

Versions of Apache Airflow Providers

9.2.0

Even though I'm using the latest version, I think this bug exists in all historical versions.

Apache Airflow version

2.10.1

Operating System

Amazon Linux 2023

Deployment

Amazon (AWS) MWAA

Deployment details

This bug can be reproduced reliably without any customization.

What happened

My DAG script, which connects to an EKS cluster using EksPodOperator (from airflow.providers.amazon.aws.operators.eks), returned a 401 Unauthorized error.

script:

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.providers.amazon.aws.operators.eks import EksPodOperator
from kubernetes.client import models as k8s

from datetime import datetime

DEFAULT_ARGS = {
    'owner': 'XXX',
}


with DAG(
        'test2_eks_pod_operator_poc',
        default_args=DEFAULT_ARGS,
        schedule_interval=None,  # trigger manually for now
        start_date=datetime(2024, 4, 28),
        catchup=False,
        tags=['examples']
) as dag:

    start = DummyOperator(task_id='start', retries=2)
    end = DummyOperator(task_id='end', retries=2)

    test2_eks_pod_operator = EksPodOperator(
        task_id='test2_eks_pod_operator',
        region='cn-north-1',
        cluster_name='eks-cluster',
        namespace='mwaa',
        service_account_name='default',
        pod_name='eks_pod_operator_poc',
        image='amazon/aws-cli:latest',
        image_pull_policy='IfNotPresent',
        node_selector={
            'type': 'app'
        },
        tolerations=[
            k8s.V1Toleration(
                effect='NoSchedule',
                key='type',
                operator='Equal',
                value='app'
            )
        ],
        cmds=['/bin/bash', '-c'],
        arguments=['echo "hello world"'],
        is_delete_operator_pod=True,
    )

    start >> test2_eks_pod_operator >> end

error log:

*** Reading remote log from Cloudwatch log_group: airflow-mwaa-Task log_stream: dag_id=test2_eks_pod_operator_poc/run_id=manual__2024-12-31T08_47_12.225701+00_00/task_id=test2_eks_pod_operator/attempt=1.log.
[2024-12-31, 08:47:15 UTC] {local_task_job_runner.py:123} ▶ Pre task execution logs
[2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 'aws_default'
[2024-12-31, 08:47:16 UTC] {baseoperator.py:405} WARNING - EksPodOperator.execute cannot be called outside TaskInstance!
[2024-12-31, 08:47:16 UTC] {pod.py:1133} INFO - Building pod eks-pod-operator-poc-ylyc6uh3 with labels: {'dag_id': 'test2_eks_pod_operator_poc', 'task_id': 'test2_eks_pod_operator', 'run_id': 'manual__2024-12-31T084712.2257010000-a8af56277', 'kubernetes_pod_operator': 'True', 'try_number': '1'}
[2024-12-31, 08:47:16 UTC] {base.py:84} INFO - Retrieving connection 'kubernetes_default'
[2024-12-31, 08:47:19 UTC] {taskinstance.py:3310} ERROR - Task failed with exception
Traceback (most recent call last):
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 767, in _execute_task
    result = _execute_callable(context=context, **execute_callable_kwargs)
             ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/taskinstance.py", line 733, in _execute_callable
    return ExecutionCallableRunner(
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/utils/operator_helpers.py", line 252, in run
    return self.func(*args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/amazon/aws/operators/eks.py", line 1103, in execute
    return super().execute(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/models/baseoperator.py", line 406, in wrapper
    return func(self, *args, **kwargs)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 593, in execute
    return self.execute_sync(context)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 603, in execute_sync
    self.pod = self.get_or_create_pod(  # must set `self.pod` for `on_kill`
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 561, in get_or_create_pod
    pod = self.find_pod(self.namespace or pod_request_obj.metadata.namespace, context=context)
          ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/airflow/providers/cncf/kubernetes/operators/pod.py", line 534, in find_pod
    pod_list = self.client.list_namespaced_pod(
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15823, in list_namespaced_pod
    return self.list_namespaced_pod_with_http_info(namespace, **kwargs)  # noqa: E501
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api/core_v1_api.py", line 15942, in list_namespaced_pod_with_http_info
    return self.api_client.call_api(
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 348, in call_api
    return self.__call_api(resource_path, method,
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 180, in __call_api
    response_data = self.request(
                    ^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/api_client.py", line 373, in request
    return self.rest_client.GET(url,
           ^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 244, in GET
    return self.request("GET", url,
           ^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/airflow/.local/lib/python3.11/site-packages/kubernetes/client/rest.py", line 238, in request
    raise ApiException(http_resp=r)
kubernetes.client.exceptions.ApiException: (401)
Reason: Unauthorized
HTTP response headers: HTTPHeaderDict({'Audit-Id': '6bab071a-ed5b-41b9-9df7-d76d7247ebcd', 'Cache-Control': 'no-cache, private', 'Content-Type': 'application/json', 'Date': 'Tue, 31 Dec 2024 08:47:19 GMT', 'Content-Length': '129'})
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Unauthorized","reason":"Unauthorized","code":401}


[2024-12-31, 08:47:19 UTC] {taskinstance.py:1225} INFO - Marking task as FAILED. dag_id=test2_eks_pod_operator_poc, task_id=test2_eks_pod_operator, run_id=manual__2024-12-31T08:47:12.225701+00:00, execution_date=20241231T084712, start_date=20241231T084715, end_date=20241231T084719
[2024-12-31, 08:47:19 UTC] {taskinstance.py:340} ▶ Post task execution logs

What you think should happen instead

Here's the investigation I've done:

Examining the source code for EksPodOperator shows that this class automatically generates a kube_config file if no external kube_config file is specified during initialization:

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/operators/eks.html
        # There is no need to manage the kube_config file, as it will be generated automatically.
        # All Kubernetes parameters (except config_file) are also valid for the EksPodOperator.

There seems to be a problem with this auto-generated kube_config file, so I printed its contents while debugging and examined the source code that generates them:

[docs]    def execute(self, context: Context):
        eks_hook = EksHook(
            aws_conn_id=self.aws_conn_id,
            region_name=self.region,
        )
        with eks_hook.generate_config_file(
            eks_cluster_name=self.cluster_name, pod_namespace=self.namespace
        ) as self.config_file:
            return super().execute(context)

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html

        cluster_config = {
            "apiVersion": "v1",
            "kind": "Config",
            "clusters": [
                {
                    "cluster": {"server": cluster_ep, "certificate-authority-data": cluster_cert},
                    "name": eks_cluster_name,
                }
            ],
            "contexts": [
                {
                    "context": {
                        "cluster": eks_cluster_name,
                        "namespace": pod_namespace,
                        "user": _POD_USERNAME,
                    },
                    "name": _CONTEXT_NAME,
                }
            ],
            "current-context": _CONTEXT_NAME,
            "preferences": {},
            "users": [
                {
                    "name": _POD_USERNAME,
                    "user": {
                        "exec": {
                            "apiVersion": AUTHENTICATION_API_VERSION,
                            "command": "sh",
                            "args": [
                                "-c",
                                COMMAND.format(
                                    python_executable=python_executable,
                                    eks_cluster_name=eks_cluster_name,
                                    args=args,
                                ),
                            ],
                            "interactiveMode": "Never",
                        }
                    },
                }
            ],
        }

Here it executes a shell command. Searching for the COMMAND variable shows the exact command that runs; you can see it fetches the EKS token:

COMMAND = """
            output=$({python_executable} -m airflow.providers.amazon.aws.utils.eks_get_token \
                --cluster-name {eks_cluster_name} {args} 2>&1)

            if [ $? -ne 0 ]; then
                echo "Error running the script"
                exit 1
            fi

            expiration_timestamp=$(echo "$output" | grep -oP 'expirationTimestamp: \\K[^,]+')
            token=$(echo "$output" | grep -oP 'token: \\K[^,]+')

            json_string=$(printf '{{"kind": "ExecCredential","apiVersion": \
                "client.authentication.k8s.io/v1alpha1","spec": {{}},"status": \
                {{"expirationTimestamp": "%s","token": "%s"}}}}' "$expiration_timestamp" "$token")
            echo $json_string
            """

https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/utils/eks_get_token.html

[docs]def main():
    parser = get_parser()
    args = parser.parse_args()
    eks_hook = EksHook(aws_conn_id=args.aws_conn_id, region_name=args.region_name)
    access_token = eks_hook.fetch_access_token_for_cluster(args.cluster_name)
    access_token_expiration = get_expiration_time()
    print(f"expirationTimestamp: {access_token_expiration}, token: {access_token}")

The access_token comes from eks_hook.fetch_access_token_for_cluster(args.cluster_name). Check out the implementation of that method: https://airflow.apache.org/docs/apache-airflow-providers-amazon/stable/_modules/airflow/providers/amazon/aws/hooks/eks.html

def fetch_access_token_for_cluster(self, eks_cluster_name: str) -> str:
        session = self.get_session()
        service_id = self.conn.meta.service_model.service_id
        sts_url = (
            f"https://sts.{session.region_name}.amazonaws.com/?Action=GetCallerIdentity&Version=2011-06-15"
        )

The STS address here points to the global endpoint, not the China STS service endpoint, so the EKS token obtained cannot be used in China. The sts_url that should be used in China is f"https://sts.{session.region_name}.amazonaws.com.cn/?Action=GetCallerIdentity&Version=2011-06-15".
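The partition-dependent suffix could be sketched as below. The simple region-prefix check is illustrative only; a robust fix should use botocore's endpoint resolution rather than hard-coding suffixes:

```python
def sts_url_for_region(region_name: str) -> str:
    """Build the STS GetCallerIdentity URL with a partition-aware
    domain suffix. AWS China regions (cn-north-1, cn-northwest-1)
    live under amazonaws.com.cn. Illustrative sketch only; prefer
    botocore's endpoint resolver in real code."""
    suffix = "amazonaws.com.cn" if region_name.startswith("cn-") else "amazonaws.com"
    return (
        f"https://sts.{region_name}.{suffix}"
        "/?Action=GetCallerIdentity&Version=2011-06-15"
    )


print(sts_url_for_region("cn-north-1"))
print(sts_url_for_region("us-east-1"))
```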

How to reproduce

You can easily reproduce this using the DAG script above, provided you use the credentials of an AWS China account.

Anything else

no

Are you willing to submit PR?

  • Yes I am willing to submit a PR!

Code of Conduct

@QiaoLiar QiaoLiar added area:providers kind:bug This is a clearly a bug needs-triage label for new issues that we didn't triage yet labels Jan 3, 2025

boring-cyborg bot commented Jan 3, 2025

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

@dosubot dosubot bot added the provider:amazon-aws AWS/Amazon - related issues label Jan 3, 2025
@potiuk potiuk added good first issue and removed needs-triage label for new issues that we didn't triage yet labels Jan 3, 2025
@willdanckwerts

Hi there, is there any easy way to replicate this issue without holding an AWS China account?

@QiaoLiar
Author

QiaoLiar commented Jan 7, 2025

Hi there, is there any easy way to replicate this issue without holding an AWS China account?

No, this bug is only triggered when using AWS credentials based in China.

@vincbeck
Contributor

vincbeck commented Jan 9, 2025

Re-opening it because we are reverting the fix (because it introduced another bug, more harmful). Revert: #45526
