Using Amazon MWAA with Amazon EKS - Amazon Managed Workflows for Apache Airflow


Using Amazon MWAA with Amazon EKS

The following sample demonstrates how to use Amazon Managed Workflows for Apache Airflow (Amazon MWAA) with Amazon EKS.

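Version

  • You can use the code example on this page with Apache Airflow v1 in Python 3.7.

  • You can use the code example on this page with Apache Airflow v2 and above in Python.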

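Prerequisites

To use the example in this topic, you'll need the following:

Note

When you use an eksctl command, you can include --profile to specify a profile other than the default.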

Create a public key for Amazon EC2

Use the following command to create a public key from your private key pair.

ssh-keygen -y -f myprivatekey.pem > mypublickey.pub

To learn more, see Retrieving the public key for your key pair.

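Create a cluster

Use the following command to create the cluster. If you want a custom name for the cluster, or want to create it in a different Region, replace the name and Region values. You must create the cluster in the same Region where you create the Amazon MWAA environment. Replace the subnet values to match the subnets in the Amazon VPC network that you use for Amazon MWAA. Replace the value of ssh-public-key to match the key you use. You can use an existing Amazon EC2 key that is in the same Region, or create a new key in the same Region where you create your Amazon MWAA environment.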

eksctl create cluster \
  --name mwaa-eks \
  --region us-west-2 \
  --version 1.18 \
  --nodegroup-name linux-nodes \
  --nodes 3 \
  --nodes-min 1 \
  --nodes-max 4 \
  --with-oidc \
  --ssh-access \
  --ssh-public-key MyPublicKey \
  --managed \
  --vpc-public-subnets "subnet-11111111111111111,subnet-22222222222222222" \
  --vpc-private-subnets "subnet-33333333333333333,subnet-44444444444444444"
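If you script this step, passing the arguments as a list avoids shell line-continuation mistakes. The following is a minimal Python sketch, assuming eksctl is on your PATH; the function name build_create_cluster_args is hypothetical, and the values mirror the command above. It joins each set of subnet IDs into one comma-separated value with no spaces.

```python
import shlex


def build_create_cluster_args(name, region, public_subnets, private_subnets,
                              ssh_public_key, version="1.18"):
    """Return the eksctl argv for the cluster in this example (hypothetical helper)."""
    return [
        "eksctl", "create", "cluster",
        "--name", name,
        "--region", region,
        "--version", version,
        "--nodegroup-name", "linux-nodes",
        "--nodes", "3",
        "--nodes-min", "1",
        "--nodes-max", "4",
        "--with-oidc",
        "--ssh-access",
        "--ssh-public-key", ssh_public_key,
        "--managed",
        # Each subnet flag takes one comma-separated value with no spaces.
        "--vpc-public-subnets", ",".join(public_subnets),
        "--vpc-private-subnets", ",".join(private_subnets),
    ]


if __name__ == "__main__":
    args = build_create_cluster_args(
        "mwaa-eks", "us-west-2",
        ["subnet-11111111111111111", "subnet-22222222222222222"],
        ["subnet-33333333333333333", "subnet-44444444444444444"],
        "MyPublicKey",
    )
    # Dry run: print the shell-quoted command instead of running it.
    print(shlex.join(args))
```

To actually create the cluster, pass the list to subprocess.run(args, check=True) instead of printing it.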

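Cluster creation takes some time to complete. When it finishes, you can verify that the cluster was created successfully and set up the IAM OIDC provider by using the following command: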

eksctl utils associate-iam-oidc-provider \
  --region us-west-2 \
  --cluster mwaa-eks \
  --approve
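To confirm from a script that the association worked, one approach is to compare the cluster's OIDC issuer, from aws eks describe-cluster --name mwaa-eks --region us-west-2 --output json, with the provider ARNs returned by aws iam list-open-id-connect-providers. This is a minimal sketch; the helper name oidc_provider_registered is hypothetical, and it relies on IAM OIDC provider ARNs ending with the issuer URL minus its https:// scheme.

```python
import json


def oidc_provider_registered(describe_cluster_json: str, list_providers_json: str) -> bool:
    """True if an IAM OIDC provider exists for the cluster's issuer (hypothetical helper)."""
    cluster = json.loads(describe_cluster_json)["cluster"]
    issuer = cluster["identity"]["oidc"]["issuer"]
    # Provider ARNs end with "oidc-provider/" plus the issuer URL without "https://".
    suffix = "oidc-provider/" + issuer.split("://", 1)[-1]
    providers = json.loads(list_providers_json).get("OpenIDConnectProviderList", [])
    return any(p.get("Arn", "").endswith(suffix) for p in providers)
```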

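Create an mwaa namespace

After you confirm that the cluster was created successfully, use the following command to create a namespace for the pods.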

kubectl create namespace mwaa
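If you pick a name other than mwaa, Kubernetes requires it to be a valid RFC 1123 DNS label: at most 63 characters, only lowercase alphanumerics or "-", and starting and ending with an alphanumeric. A small pre-check sketch; is_valid_namespace is a hypothetical helper, not part of kubectl.

```python
import re

# RFC 1123 DNS label: lowercase alphanumerics or "-", alphanumeric at both ends.
_DNS1123_LABEL = re.compile(r"[a-z0-9]([-a-z0-9]*[a-z0-9])?")


def is_valid_namespace(name: str) -> bool:
    """Return True if name can be used as a Kubernetes namespace name."""
    return len(name) <= 63 and _DNS1123_LABEL.fullmatch(name) is not None
```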

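Create a role for the mwaa namespace

After you create the namespace, create a role and role binding for an Amazon MWAA user on Amazon EKS that can run pods in the mwaa namespace. If you used a different name for the namespace, replace mwaa in -n mwaa with the name that you used.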

cat << EOF | kubectl apply -f - -n mwaa
kind: Role
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role
rules:
  - apiGroups:
      - ""
      - "apps"
      - "batch"
      - "extensions"
    resources:
      - "jobs"
      - "pods"
      - "pods/attach"
      - "pods/exec"
      - "pods/log"
      - "pods/portforward"
      - "secrets"
      - "services"
    verbs:
      - "create"
      - "delete"
      - "describe"
      - "get"
      - "list"
      - "patch"
      - "update"
---
kind: RoleBinding
apiVersion: rbac.authorization.k8s.io/v1
metadata:
  name: mwaa-role-binding
subjects:
- kind: User
  name: mwaa-service
roleRef:
  kind: Role
  name: mwaa-role
  apiGroup: rbac.authorization.k8s.io
EOF
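If you use a namespace other than mwaa, generating the same Role and RoleBinding from code avoids editing the heredoc by hand. This Python sketch builds them as a Kubernetes List and prints JSON, which kubectl apply -f - also accepts. The function name mwaa_rbac_manifest is hypothetical; unlike the heredoc, it sets metadata.namespace explicitly and adds apiGroup to the User subject, as the Kubernetes RBAC documentation does for User subjects.

```python
import json


def mwaa_rbac_manifest(namespace="mwaa", user="mwaa-service"):
    """Build the Role and RoleBinding above as one Kubernetes List object."""
    role = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "Role",
        "metadata": {"name": "mwaa-role", "namespace": namespace},
        "rules": [{
            "apiGroups": ["", "apps", "batch", "extensions"],
            "resources": ["jobs", "pods", "pods/attach", "pods/exec", "pods/log",
                          "pods/portforward", "secrets", "services"],
            "verbs": ["create", "delete", "describe", "get", "list", "patch", "update"],
        }],
    }
    binding = {
        "apiVersion": "rbac.authorization.k8s.io/v1",
        "kind": "RoleBinding",
        "metadata": {"name": "mwaa-role-binding", "namespace": namespace},
        # The user name must match the --username in the EKS identity mapping.
        "subjects": [{"kind": "User", "name": user,
                      "apiGroup": "rbac.authorization.k8s.io"}],
        "roleRef": {"kind": "Role", "name": "mwaa-role",
                    "apiGroup": "rbac.authorization.k8s.io"},
    }
    return {"apiVersion": "v1", "kind": "List", "items": [role, binding]}


if __name__ == "__main__":
    print(json.dumps(mwaa_rbac_manifest(), indent=2))
```

For example, python make_rbac.py | kubectl apply -f - (the file name make_rbac.py is only an illustration).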

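Confirm that the new role can access the Amazon EKS cluster by running the following command. If you did not use mwaa, be sure to use the correct namespace name: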

kubectl get pods -n mwaa --as mwaa-service

You should see a message returned that says:

No resources found in mwaa namespace.

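Create and attach an IAM role for the Amazon EKS cluster

You must create an IAM role and then bind it to the Amazon EKS (k8s) cluster so that it can be used for authentication through IAM. The role is used only to log in to the cluster, and does not have any permissions for console or API calls.

Create a new role for the Amazon MWAA environment using the steps in Amazon MWAA execution role. However, instead of creating and attaching the policies described in that topic, attach the following policy: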

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "airflow:PublishMetrics",
      "Resource": "arn:aws:airflow:${MWAA_REGION}:${ACCOUNT_NUMBER}:environment/${MWAA_ENV_NAME}"
    },
    {
      "Effect": "Deny",
      "Action": "s3:ListAllMyBuckets",
      "Resource": [
        "arn:aws:s3:::${MWAA_S3_BUCKET}",
        "arn:aws:s3:::${MWAA_S3_BUCKET}/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "s3:GetObject*",
        "s3:GetBucket*",
        "s3:List*"
      ],
      "Resource": [
        "arn:aws:s3:::${MWAA_S3_BUCKET}",
        "arn:aws:s3:::${MWAA_S3_BUCKET}/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "logs:CreateLogStream",
        "logs:CreateLogGroup",
        "logs:PutLogEvents",
        "logs:GetLogEvents",
        "logs:GetLogRecord",
        "logs:GetLogGroupFields",
        "logs:GetQueryResults",
        "logs:DescribeLogGroups"
      ],
      "Resource": [
        "arn:aws:logs:${MWAA_REGION}:${ACCOUNT_NUMBER}:log-group:airflow-${MWAA_ENV_NAME}-*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": "cloudwatch:PutMetricData",
      "Resource": "*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "sqs:ChangeMessageVisibility",
        "sqs:DeleteMessage",
        "sqs:GetQueueAttributes",
        "sqs:GetQueueUrl",
        "sqs:ReceiveMessage",
        "sqs:SendMessage"
      ],
      "Resource": "arn:aws:sqs:${MWAA_REGION}:*:airflow-celery-*"
    },
    {
      "Effect": "Allow",
      "Action": [
        "kms:Decrypt",
        "kms:DescribeKey",
        "kms:GenerateDataKey*",
        "kms:Encrypt"
      ],
      "NotResource": "arn:aws:kms:*:${ACCOUNT_NUMBER}:key/*",
      "Condition": {
        "StringLike": {
          "kms:ViaService": [
            "sqs.${MWAA_REGION}.amazonaws.com"
          ]
        }
      }
    },
    {
      "Effect": "Allow",
      "Action": [
        "eks:DescribeCluster"
      ],
      "Resource": "arn:aws:eks:${MWAA_REGION}:${ACCOUNT_NUMBER}:cluster/${EKS_CLUSTER_NAME}"
    }
  ]
}
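The policy contains placeholders in shell-style ${NAME} syntax, so Python's string.Template can fill them. This minimal sketch also rejects leftover {NAME} tokens written without the leading $, which Template leaves untouched, and checks that the result still parses as JSON. The function name render_policy and the example values are assumptions, not part of Amazon MWAA.

```python
import json
import re
from string import Template

# Placeholders written as {NAME} (no leading "$") are not substituted by Template.
_LEFTOVER = re.compile(r"\{[A-Z][A-Z0-9_]*\}")


def render_policy(template_text: str, **values: str) -> str:
    """Fill ${NAME} placeholders in a policy template and sanity-check the result."""
    # substitute() raises KeyError if a ${NAME} placeholder has no value.
    rendered = Template(template_text).substitute(**values)
    leftover = _LEFTOVER.search(rendered)
    if leftover:
        raise ValueError(f"unfilled placeholder: {leftover.group(0)}")
    json.loads(rendered)  # raises json.JSONDecodeError (a ValueError) on invalid JSON
    return rendered
```

For example, render_policy(open("policy.json").read(), **os.environ) takes MWAA_REGION, ACCOUNT_NUMBER, and the other values from environment variables.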

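After you create the role, edit your Amazon MWAA environment to use the role you created as the environment's execution role. To change the role, edit the environment and select the role under Execution role in the Permissions section.

Known issues:

  • There is a known issue where role ARNs that include a subpath cannot authenticate with Amazon EKS. The workaround is to create the service role manually instead of using the one that Amazon MWAA creates for you. To learn more, see Roles with paths do not work when the path is included in their ARN in the aws-auth configmap.

  • If Amazon MWAA is not available in the list of services in IAM, choose an alternate service, such as Amazon EC2, when you create the role, and then update the role's trust policy to match the following: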

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": [
              "airflow-env.amazonaws.com",
              "airflow.amazonaws.com"
            ]
          },
          "Action": "sts:AssumeRole"
        }
      ]
    }

    To learn more, see How to use trust policies with IAM roles.
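After you edit the trust policy, you can check that both Amazon MWAA service principals are trusted. This is a minimal sketch; missing_mwaa_principals is a hypothetical helper that only looks at Allow statements granting sts:AssumeRole, and it does not evaluate conditions or wildcard actions such as sts:*.

```python
import json

MWAA_PRINCIPALS = {"airflow.amazonaws.com", "airflow-env.amazonaws.com"}


def _as_list(value):
    """IAM accepts a single string or a list in most fields; normalize to a list."""
    return [value] if isinstance(value, str) else list(value or [])


def missing_mwaa_principals(trust_policy_json: str) -> set:
    """Return the Amazon MWAA service principals that may not assume the role."""
    doc = json.loads(trust_policy_json)
    statements = doc.get("Statement", [])
    if isinstance(statements, dict):
        statements = [statements]
    trusted = set()
    for statement in statements:
        if statement.get("Effect") != "Allow":
            continue
        if "sts:AssumeRole" not in _as_list(statement.get("Action")):
            continue
        principal = statement.get("Principal")
        if isinstance(principal, dict):
            trusted.update(_as_list(principal.get("Service")))
    return MWAA_PRINCIPALS - trusted
```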

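Create the requirements.txt file

To use the sample code in this section, make sure you've added the dependencies for your Apache Airflow version, from the following options, to your requirements.txt. To learn more, see Installing Python dependencies.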

Apache Airflow v2
kubernetes
apache-airflow[cncf.kubernetes]==3.0.0

Apache Airflow v1
awscli
kubernetes==12.0.1

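Create an identity mapping for Amazon EKS

Use the ARN of the role you created in the following command to create an identity mapping for Amazon EKS. Change your-region to the Region where you created the environment. Replace the ARN for the role, and finally, replace mwaa-execution-role with your environment's execution role.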

eksctl create iamidentitymapping \
  --region your-region \
  --cluster mwaa-eks \
  --arn arn:aws:iam::111222333444:role/mwaa-execution-role \
  --username mwaa-service
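Related to the known issue with role paths, the Amazon EKS documentation states that a role ARN mapped in the aws-auth ConfigMap must not include a path (for example role/service-role/NAME). As an alternative to recreating the role, this sketch drops the path before building the eksctl arguments. The helper names strip_role_path and build_identity_mapping_args are hypothetical, and the account ID is the placeholder from the command above.

```python
def strip_role_path(role_arn: str) -> str:
    """Drop the IAM path from a role ARN: role/service-role/NAME -> role/NAME."""
    prefix, sep, resource = role_arn.partition(":role/")
    if not sep:
        raise ValueError(f"not an IAM role ARN: {role_arn}")
    role_name = resource.rsplit("/", 1)[-1]  # the last segment is the role name
    return f"{prefix}:role/{role_name}"


def build_identity_mapping_args(role_arn, region, cluster="mwaa-eks",
                                username="mwaa-service"):
    """Return the eksctl argv for the identity mapping, using a path-free ARN."""
    return ["eksctl", "create", "iamidentitymapping",
            "--region", region,
            "--cluster", cluster,
            "--arn", strip_role_path(role_arn),
            "--username", username]
```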

Create the kubeconfig

Use the following command to create the kubeconfig:

aws eks update-kubeconfig \
  --region us-west-2 \
  --kubeconfig ./kube_config.yaml \
  --name mwaa-eks \
  --alias aws

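If you used a specific profile when you ran update-kubeconfig, you need to remove the env: section that was added to the kube_config.yaml file so that it works correctly with Amazon MWAA. To do so, delete the following from the file, and then save it: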

env:
- name: AWS_PROFILE
  value: profile_name
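If you would rather edit the file programmatically, the sketch below removes AWS_PROFILE from each user's exec env list and drops env entirely when nothing else remains, which is slightly narrower than deleting the whole section. It assumes you have already parsed kube_config.yaml into a dict, for example with a YAML library such as PyYAML (yaml.safe_load, then yaml.safe_dump to write it back), which is not imported here; strip_aws_profile is a hypothetical name.

```python
def strip_aws_profile(kubeconfig: dict) -> dict:
    """Remove AWS_PROFILE from every user's exec env (in place) and drop empty env lists."""
    for user in kubeconfig.get("users", []):
        exec_cfg = (user.get("user") or {}).get("exec")
        if not exec_cfg or "env" not in exec_cfg:
            continue
        # Keep any other environment variables the exec plugin needs.
        env = [e for e in (exec_cfg["env"] or []) if e.get("name") != "AWS_PROFILE"]
        if env:
            exec_cfg["env"] = env
        else:
            del exec_cfg["env"]
    return kubeconfig
```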

Create a DAG

Use the following code example to create a Python file for the DAG, such as mwaa_pod_example.py.

Apache Airflow v2
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True
}

dag = DAG(
    'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

# Use a kube_config stored in the S3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    # Keep the finished pod so that you can inspect it with kubectl
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    # Airflow runs outside the EKS cluster, so connect through the kubeconfig file
    in_cluster=False,
    # Context name set by --alias aws when the kubeconfig was created
    cluster_context='aws'
)
Apache Airflow v1
"""
Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
Permission is hereby granted, free of charge, to any person obtaining a copy of
this software and associated documentation files (the "Software"), to deal in
the Software without restriction, including without limitation the rights to
use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of
the Software, and to permit persons to whom the Software is furnished to do so.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS
FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR
COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER
IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN
CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""
from airflow import DAG
from datetime import datetime
from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator

default_args = {
    'owner': 'aws',
    'depends_on_past': False,
    'start_date': datetime(2019, 2, 20),
    'provide_context': True
}

dag = DAG(
    'kubernetes_pod_example', default_args=default_args, schedule_interval=None)

# Use a kube_config stored in the S3 dags folder for now
kube_config_path = '/usr/local/airflow/dags/kube_config.yaml'

podRun = KubernetesPodOperator(
    namespace="mwaa",
    image="ubuntu:18.04",
    cmds=["bash"],
    arguments=["-c", "ls"],
    labels={"foo": "bar"},
    name="mwaa-pod-test",
    task_id="pod-task",
    get_logs=True,
    dag=dag,
    # Keep the finished pod so that you can inspect it with kubectl
    is_delete_operator_pod=False,
    config_file=kube_config_path,
    # Airflow runs outside the EKS cluster, so connect through the kubeconfig file
    in_cluster=False,
    # Context name set by --alias aws when the kubeconfig was created
    cluster_context='aws'
)

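Add the DAG and kube_config.yaml to the Amazon S3 bucket

Put the DAG you created and the kube_config.yaml file in the DAGs folder of the Amazon S3 bucket for your Amazon MWAA environment. You can use either the Amazon S3 console or the AWS Command Line Interface.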
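The DAG reads /usr/local/airflow/dags/kube_config.yaml, so the file has to sit in the environment's DAGs folder next to the DAG. This sketch maps an object key to the path the DAG will see, assuming the DAGs folder is the dags/ prefix in your bucket and that it is synced to /usr/local/airflow/dags, as the path in the DAG code implies. local_path_for is a hypothetical helper.

```python
import posixpath


def local_path_for(s3_key, dags_prefix="dags", local_root="/usr/local/airflow/dags"):
    """Map an object key under the DAGs folder to its path inside the environment."""
    prefix = dags_prefix.strip("/") + "/"
    if not s3_key.startswith(prefix):
        raise ValueError(f"{s3_key!r} is outside the DAGs folder {prefix!r}")
    return posixpath.join(local_root, s3_key[len(prefix):])
```

With these assumptions, uploading kube_config.yaml to the dags/ prefix yields exactly the kube_config_path used in the DAG.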

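Enable and trigger the example

In Apache Airflow, enable the example and then trigger it.

After it runs and completes successfully, use the following command to verify the pod: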

kubectl get pods -n mwaa

You should see output similar to the following:

NAME                                             READY   STATUS      RESTARTS   AGE
mwaa-pod-test-aa11bb22cc3344445555666677778888   0/1     Completed   0          2m23s

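You can then verify the output of the pod with the following command. Replace the name value with the value returned from the previous command: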

kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888
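To script these two checks, this sketch parses the default kubectl get pods table and returns the completed pods whose names start with mwaa-pod-test, which you can then pass to kubectl logs. It assumes the default column layout; kubectl get pods -n mwaa -o json gives structured output if you need something more robust. completed_pods is a hypothetical helper.

```python
def completed_pods(kubectl_output: str, prefix: str = "mwaa-pod-test") -> list:
    """Return names of pods that start with prefix and whose STATUS is Completed."""
    lines = kubectl_output.strip().splitlines()
    # No table (for example "No resources found ...") means no matching pods.
    if not lines or "NAME" not in lines[0].split():
        return []
    header = lines[0].split()
    name_col, status_col = header.index("NAME"), header.index("STATUS")
    result = []
    for line in lines[1:]:
        cols = line.split()
        if len(cols) <= max(name_col, status_col):
            continue  # skip malformed rows
        if cols[name_col].startswith(prefix) and cols[status_col] == "Completed":
            result.append(cols[name_col])
    return result
```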