Menggunakan Amazon MWAA dengan Amazon EKS - Amazon Managed Workflows for Apache Airflow (MWAA)

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan Amazon MWAA dengan Amazon EKS

Contoh berikut menunjukkan cara menggunakan Alur Kerja Terkelola Amazon untuk Apache Airflow dengan Amazon EKS.

Versi

  • Contoh kode pada halaman ini dapat digunakan denganApache aliran udara v1diPython 3.7.

  • Anda dapat menggunakan contoh kode di halaman ini denganApache Airflow v2 dan di atasnyadiPython 3.10.

Prasyarat

Untuk menggunakan contoh dalam topik ini, Anda memerlukan yang berikut ini:

catatan

Bila Anda menggunakaneksctlperintah, Anda dapat menyertakan--profileuntuk menentukan profil selain default.

Membuat kunci publik untuk Amazon EC2

Gunakan perintah berikut untuk membuat kunci publik dari pasangan kunci pribadi Anda.

ssh-keygen -y -f myprivatekey.pem > mypublickey.pub

Untuk mempelajari lebih lanjut, lihatMengambil kunci publik untuk pasangan kunci Anda.

Buat klaster

Gunakan perintah berikut untuk membuat cluster. Jika Anda ingin nama kustom untuk klaster atau membuatnya di Wilayah yang berbeda, ganti nama dan nilai Region. Anda harus membuat klaster di Wilayah yang sama tempat Anda membuat lingkungan Amazon MWAA. Ganti nilai subnet agar sesuai dengan subnet di jaringan Amazon VPC yang Anda gunakan untuk Amazon MWAA. Ganti nilai untukssh-public-keyuntuk mencocokkan kunci yang Anda gunakan. Anda dapat menggunakan kunci yang ada dari Amazon EC2 yang berada di Wilayah yang sama, atau membuat kunci baru di Wilayah yang sama tempat Anda membuat lingkungan Amazon MWAA Anda.

eksctl create cluster \ --name mwaa-eks \ --region us-west-2 \ --version 1.18 \ --nodegroup-name linux-nodes \ --nodes 3 \ --nodes-min 1 \ --nodes-max 4 \ --with-oidc \ --ssh-access \ --ssh-public-key MyPublicKey \ --managed \ --vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \ --vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"

Butuh beberapa waktu untuk menyelesaikan pembuatan cluster. Setelah selesai, Anda dapat memverifikasi bahwa klaster telah berhasil dibuat dan memiliki IAM OIDC Provider dikonfigurasi dengan menggunakan perintah berikut:

eksctl utils associate-iam-oidc-provider \ --region us-west-2 \ --cluster mwaa-eks \ --approve

Buatmwaanamespace

Setelah mengonfirmasi bahwa klaster berhasil dibuat, gunakan perintah berikut untuk membuat namespace untuk Pod.

kubectl create namespace mwaa

Buat peran untukmwaanamespace

Setelah Anda membuat namespace, buat peran dan role-binding untuk pengguna Amazon MWAA di EKS yang dapat menjalankan Pod dalam namespace MWAA. Jika Anda menggunakan nama yang berbeda untuk namespace, ganti mwaa di-n mwaadengan nama yang Anda gunakan.

cat << EOF | kubectl apply -f - -n mwaa kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role rules: - apiGroups: - "" - "apps" - "batch" - "extensions" resources: - "jobs" - "pods" - "pods/attach" - "pods/exec" - "pods/log" - "pods/portforward" - "secrets" - "services" verbs: - "create" - "delete" - "describe" - "get" - "list" - "patch" - "update" --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role-binding subjects: - kind: User name: mwaa-service roleRef: kind: Role name: mwaa-role apiGroup: rbac.authorization.k8s.io EOF

Konfirmasikan bahwa peran baru dapat mengakses klaster Amazon EKS dengan menjalankan perintah berikut. Pastikan untuk menggunakan nama yang benar jika Anda tidak menggunakannyamwaa:

kubectl get pods -n mwaa --as mwaa-service

Anda akan melihat pesan yang ditampilkan yang bertuliskan:

No resources found in mwaa namespace.

Membuat dan melampirkan peran IAM untuk klaster Amazon EKS

Anda harus membuat peran IAM dan kemudian mengikatnya ke klaster Amazon EKS (k8s) sehingga dapat digunakan untuk otentikasi melalui IAM. Peran ini hanya digunakan untuk masuk ke klaster, dan tidak memiliki izin apa pun untuk panggilan konsol atau API.

Buat peran baru untuk lingkungan Amazon MWAA menggunakan langkah-langkahPeran eksekusi Amazon MWAA. Namun, alih-alih membuat dan melampirkan kebijakan yang dijelaskan dalam topik tersebut, lampirkan kebijakan berikut:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "airflow:PublishMetrics", "Resource": "arn:aws:airflow:${MWAA_REGION}:${ACCOUNT_NUMBER}:environment/${MWAA_ENV_NAME}" }, { "Effect": "Deny", "Action": "s3:ListAllMyBuckets", "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject*", "s3:GetBucket*", "s3:List*" ], "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults", "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:${MWAA_REGION}:${ACCOUNT_NUMBER}:log-group:airflow-${MWAA_ENV_NAME}-*" ] }, { "Effect": "Allow", "Action": "cloudwatch:PutMetricData", "Resource": "*" }, { "Effect": "Allow", "Action": [ "sqs:ChangeMessageVisibility", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": "arn:aws:sqs:${MWAA_REGION}:*:airflow-celery-*" }, { "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:DescribeKey", "kms:GenerateDataKey*", "kms:Encrypt" ], "NotResource": "arn:aws:kms:*:${ACCOUNT_NUMBER}:key/*", "Condition": { "StringLike": { "kms:ViaService": [ "sqs.${MWAA_REGION}.amazonaws.com" ] } } }, { "Effect": "Allow", "Action": [ "eks:DescribeCluster" ], "Resource": "arn:aws:eks:${MWAA_REGION}:${ACCOUNT_NUMBER}:cluster/${EKS_CLUSTER_NAME}" } ] }

Setelah Anda membuat peran, edit lingkungan Amazon MWAA Anda untuk menggunakan peran yang Anda buat sebagai peran eksekusi untuk lingkungan. Untuk mengubah peran, edit lingkungan yang akan digunakan. Anda memilih peran eksekusi di bawahIzin.

Masalah yang diketahui:

  • Ada masalah yang diketahui dengan ARN peran dengan subpath yang tidak dapat mengautentikasi dengan Amazon EKS. Solusi untuk ini adalah membuat peran layanan secara manual daripada menggunakan yang dibuat oleh Amazon MWAA itu sendiri. Untuk mempelajari lebih lanjut, lihatPeran dengan jalur tidak berfungsi saat jalur disertakan dalam ARN mereka di configmap aws-auth

  • Jika daftar layanan Amazon MWAA tidak tersedia di IAM, Anda harus memilih kebijakan layanan alternatif, seperti Amazon EC2, dan kemudian memperbarui kebijakan kepercayaan peran agar sesuai dengan yang berikut:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "airflow-env.amazonaws.com", "airflow.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

    Untuk mempelajari lebih lanjut, lihatCara menggunakan kebijakan kepercayaan dengan peran IAM.

Buat file requirements.txt

Untuk menggunakan kode contoh di bagian ini, pastikan Anda telah menambahkan salah satu opsi database berikut kerequirements.txt. Untuk mempelajari selengkapnya, lihat Menginstal dependensi Python.

Apache Airflow v2
kubernetes apache-airflow[cncf.kubernetes]==3.0.0
Apache Airflow v1
awscli kubernetes==12.0.1

Membuat pemetaan identitas untuk Amazon EKS

Gunakan ARN untuk peran yang Anda buat dalam perintah berikut untuk membuat pemetaan identitas untuk Amazon EKS. Ubah Wilayahwilayah-Andake Wilayah tempat Anda menciptakan lingkungan. Ganti ARN untuk peran, dan akhirnya, gantimwaa-execution-roledengan peran eksekusi lingkungan Anda.

eksctl create iamidentitymapping \ --region your-region \ --cluster mwaa-eks \ --arn arn:aws:iam::111222333444:role/mwaa-execution-role \ --username mwaa-service

Buatkubeconfig

Gunakan perintah berikut untuk membuatkubeconfig:

aws eks update-kubeconfig \ --region us-west-2 \ --kubeconfig ./kube_config.yaml \ --name mwaa-eks \ --alias aws

Jika Anda menggunakan profil tertentu saat Anda berlariupdate-kubeconfigAnda perlu menghapusenv:bagian ditambahkan ke file kube_config.yaml sehingga bekerja dengan benar dengan Amazon MWAA. Untuk melakukannya, hapus yang berikut dari file dan kemudian simpan:

env: - name: AWS_PROFILE value: profile_name

Buat DAG

Gunakan contoh kode berikut untuk membuat file Python, sepertimwaa_pod_example.pyuntuk DAG.

Apache Airflow v2
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )
Apache Airflow v1
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )

Tambahkan DAG dankube_config.yamlke bucket Amazon S3

Letakkan DAG yang Anda buat dankube_config.yamlfile ke bucket Amazon S3 untuk lingkungan Amazon MWAA. Anda dapat memasukkan file ke bucket Anda menggunakan konsol Amazon S3 atauAWS Command Line Interface.

Aktifkan dan picu contoh

Di Apache Airflow, aktifkan contoh dan kemudian picu.

Setelah berhasil berjalan dan selesai, gunakan perintah berikut untuk memverifikasi pod:

kubectl get pods -n mwaa

Anda akan melihat output yang serupa dengan yang berikut:

NAME READY STATUS RESTARTS AGE mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s

Anda kemudian dapat memverifikasi output dari Pod dengan perintah berikut. Ganti nilai nama dengan nilai yang dikembalikan dari perintah sebelumnya:

kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888