Amazon EKS での Amazon MWAA の使用 - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon EKS での Amazon MWAA の使用

以下のサンプルは、Amazon EKS で Apache Airflow 用の Amazon マネージドワークフローを使用する方法を示しています。

バージョン

  • このページのサンプルコードは「Python 3.7」の Apache Airflow v1 で使用できます。

  • このページのコード例は、「Python 3.10」で Apache Airflow v2 以上と共に使用可能です。

前提条件

このトピックの例を使用するには、以下が必要です。

注記

eksctl コマンドを使用するときに、--profile を含めたデフォルト以外のプロファイルを指定できます。

Amazon EC2 用のパブリックキーを作成します

以下のコマンドを使用して、プライベートキーからパブリックキーを作成します。

ssh-keygen -y -f myprivatekey.pem > mypublickey.pub

詳しくは、「キーペアのパブリックキーを取得する」 を参照してください。

クラスターを作成します

次のコマンドを使用してクラスターを作成します。クラスターにカスタム名を付けたい場合や、別のリージョンで作成したい場合は、名前とリージョンの値を置き換えてください。  クラスターは、Amazon MWAA 環境を作成したのと同じリージョン内で作成する必要があります。  サブネットの値を、Amazon MWAA に使用している Amazon VPC ネットワークのサブネットと一致する ように置き換えます。ssh-public-key の値は、使用するキーと一致するように置き換えてください。同じリージョン内にある Amazon EC2 の既存のキーを使用するか、Amazon MWAA 環境を作成したのと同じリージョンで新しいキーを作成できます。

eksctl create cluster \ --name mwaa-eks \ --region us-west-2 \ --version 1.18 \ --nodegroup-name linux-nodes \ --nodes 3 \ --nodes-min 1 \ --nodes-max 4 \ --with-oidc \ --ssh-access \ --ssh-public-key MyPublicKey \ --managed \ --vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \ --vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"

クラスターの作成が完了するまで。少し時間がかかります。完了後、以下のコマンドを使用して、クラスターが正常に作成され、IAM OIDC プロバイダーが設定されていることを確認できます。

eksctl utils associate-iam-oidc-provider \ --region us-west-2 \ --cluster mwaa-eks \ --approve

mwaa 名前空間を作成します。

クラスターが正常に作成されたことを確認できたあと、以下のコマンドを使用してポッドの名前空間を作成します。

kubectl create namespace mwaa

mwaa 名前空間のロールを作成します。

名前空間を作成した後、MWAA 名前空間でポッドを実行できる EKS 上の Amazon MWAA ユーザー用のロールとロールバインディングを作成します。名前空間に別の名前を使用した場合は、-n mwaa の mwaa を使用した名前に置き換えます。

cat << EOF | kubectl apply -f - -n mwaa kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role rules: - apiGroups: - "" - "apps" - "batch" - "extensions" resources: - "jobs" - "pods" - "pods/attach" - "pods/exec" - "pods/log" - "pods/portforward" - "secrets" - "services" verbs: - "create" - "delete" - "describe" - "get" - "list" - "patch" - "update" --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role-binding subjects: - kind: User name: mwaa-service roleRef: kind: Role name: mwaa-role apiGroup: rbac.authorization.k8s.io EOF

次のコマンドを実行して、新しいロールが Amazon EKS クラスターにアクセスできることを確認します。  mwaa を使用していない場合は、必ず正しい名前を使用してください。

kubectl get pods -n mwaa --as mwaa-service

以下のメッセージが返ってくるはずだ:

No resources found in mwaa namespace.

Amazon EKS クラスタの IAM ロールを作成してアタッチする

IAM による認証に使用できるように、IAM ロールを作成して Amazon EKS (k8s) クラスターにバインドする必要があります。  ロールはクラスターへのログインにのみ使用され、コンソールや API コールの権限はありません。 

Amazon MWAA 実行ロール」 のステップを使用して Amazon MWAA 環境用の新しいロールを作成します。ただし、このトピックで説明されているポリシーを作成して追加せずに、次のポリシーを追加します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "airflow:PublishMetrics", "Resource": "arn:aws:airflow:${MWAA_REGION}:${ACCOUNT_NUMBER}:environment/${MWAA_ENV_NAME}" }, { "Effect": "Deny", "Action": "s3:ListAllMyBuckets", "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject*", "s3:GetBucket*", "s3:List*" ], "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults", "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:${MWAA_REGION}:${ACCOUNT_NUMBER}:log-group:airflow-${MWAA_ENV_NAME}-*" ] }, { "Effect": "Allow", "Action": "cloudwatch:PutMetricData", "Resource": "*" }, { "Effect": "Allow", "Action": [ "sqs:ChangeMessageVisibility", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": "arn:aws:sqs:${MWAA_REGION}:*:airflow-celery-*" }, { "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:DescribeKey", "kms:GenerateDataKey*", "kms:Encrypt" ], "NotResource": "arn:aws:kms:*:${ACCOUNT_NUMBER}:key/*", "Condition": { "StringLike": { "kms:ViaService": [ "sqs.${MWAA_REGION}.amazonaws.com" ] } } }, { "Effect": "Allow", "Action": [ "eks:DescribeCluster" ], "Resource": "arn:aws:eks:${MWAA_REGION}:${ACCOUNT_NUMBER}:cluster/${EKS_CLUSTER_NAME}" } ] }

ロールを作成し、環境の実行ロールとして使用するように Amazon MWAA 環境を編集します。ロールを変更するには、使用する環境を編集します。[権限] で実行ロールを選択します。

既知の問題:

  • Amazon EKS での認証に失敗するサブパスを持つロール ARN には既知の問題があります。この問題の回避策は、Amazon MWAA 自体が作成したサービスロールを使用するのではなく、手動でサービスロールを作成することです。  詳しくは、「aws-auth configmapのARNにパスが含まれている場合、パスを持つロールは機能しない」 を参照してください。

  • Amazon MWAA サービスの一覧が IAM で利用できない場合は、Amazon EC2 などの代替サービスポリシーを選択し、ロールの信頼ポリシーを次のように更新する必要があります。 

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "airflow-env.amazonaws.com", "airflow.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

    詳細については、「IAM ロールで信頼ポリシーを使用する方法」 を参照してください。

要件.txt ファイルを作成します

このセクションのサンプルコードを使用するには、以下のデータベースオプションのいずれかを requirements.txt に追加していることを確認してください。詳細については、「Python 依存関係のインストール」を参照してください。

Apache Airflow v2
kubernetes apache-airflow[cncf.kubernetes]==3.0.0
Apache Airflow v1
awscli kubernetes==12.0.1

Amazon EKS 用のアイデンティティマッピングを作成します。

次のコマンドで作成したロールの ARN を使用して、Amazon EKS の ID マッピングを作成します。リージョン your-region を、環境を作成したリージョンに変更します。ロールの ARN を置き換え、最後に mwaa-execution-role ご使用の環境の実行ロールに置き換えます。

eksctl create iamidentitymapping \ --region your-region \ --cluster mwaa-eks \ --arn arn:aws:iam::111222333444:role/mwaa-execution-role \ --username mwaa-service

kubeconfig の作成

kubeconfig を作成するには以下のコマンドを使用する:

aws eks update-kubeconfig \ --region us-west-2 \ --kubeconfig ./kube_config.yaml \ --name mwaa-eks \ --alias aws

実行時に特定のプロファイルを使用した場合は、update-kubeconfig kube_config.yaml env: ファイルに追加されたセクションを削除して Amazon MWAA で正しく動作させる必要があります。そのためには、以下をファイルから削除して保存します。

env: - name: AWS_PROFILE value: profile_name

DAG を作成する

次のコード例を使用して、DAG の mwaa_pod_example.py などの Python ファイルを作成します。

Apache Airflow v2
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )
Apache Airflow v1
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )

DAG と kube_config.yaml を Amazon S3 バケットに追加します

作成した DAG と kube_config.yaml ファイルを Amazon MWAA 環境の Amazon S3 バケットに配置します。Amazon S3 コンソールまたは AWS Command Line Interface を使用して、ファイルをバケットに入れることができます。

サンプルを有効にして、トリガーしてください。

Apache Airflow で、サンプルを有効にしてからトリガーします。 

実行して正常に完了したら、次のコマンドを使用して、ポッドを確認します。

kubectl get pods -n mwaa

次のような出力が表示されます。

NAME READY STATUS RESTARTS AGE mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s

次に、以下のコマンドを使用して、ポッドの出力を確認できます。名前の値を、前のコマンドで返された値に置き換えます。

kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888