Menggunakan Amazon MWAA dengan Amazon EKS - Amazon Managed Workflows for Apache Airflow (MWAA)

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menggunakan Amazon MWAA dengan Amazon EKS

Contoh berikut menunjukkan cara menggunakan Alur Kerja Terkelola Amazon untuk Apache Airflow dengan Amazon. EKS

Versi

Prasyarat

Untuk menggunakan contoh dalam topik ini, Anda memerlukan yang berikut:

catatan

Bila Anda menggunakan eksctl perintah, Anda dapat menyertakan --profile untuk menentukan profil selain default.

Buat kunci publik untuk Amazon EC2

Gunakan perintah berikut untuk membuat kunci publik dari private key pair Anda.

ssh-keygen -y -f myprivatekey.pem > mypublickey.pub

Untuk mempelajari lebih lanjut, lihat Mengambil kunci publik untuk key pair Anda.

Buat cluster

Gunakan perintah berikut untuk membuat cluster. Jika Anda menginginkan nama khusus untuk klaster atau membuatnya di Wilayah yang berbeda, ganti nama dan nilai Wilayah. Anda harus membuat cluster di Wilayah yang sama tempat Anda membuat MWAA lingkungan Amazon. Ganti nilai subnet agar sesuai dengan subnet di VPC jaringan Amazon yang Anda gunakan untuk Amazon. MWAA Ganti nilai untuk ssh-public-key mencocokkan kunci yang Anda gunakan. Anda dapat menggunakan kunci yang ada dari Amazon EC2 yang berada di Wilayah yang sama, atau membuat kunci baru di Wilayah yang sama tempat Anda membuat MWAA lingkungan Amazon.

eksctl create cluster \ --name mwaa-eks \ --region us-west-2 \ --version 1.18 \ --nodegroup-name linux-nodes \ --nodes 3 \ --nodes-min 1 \ --nodes-max 4 \ --with-oidc \ --ssh-access \ --ssh-public-key MyPublicKey \ --managed \ --vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \ --vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"

Butuh beberapa waktu untuk menyelesaikan pembuatan cluster. Setelah selesai, Anda dapat memverifikasi bahwa cluster berhasil dibuat dan IAM OIDC Penyedia dikonfigurasi dengan menggunakan perintah berikut:

eksctl utils associate-iam-oidc-provider \ --region us-west-2 \ --cluster mwaa-eks \ --approve

Buat mwaa namespace

Setelah mengonfirmasi bahwa cluster berhasil dibuat, gunakan perintah berikut untuk membuat namespace untuk pod.

kubectl create namespace mwaa

Buat peran untuk mwaa namespace

Setelah membuat namespace, buat role dan role-binding untuk MWAA pengguna Amazon EKS yang dapat menjalankan pod di namespace. MWAA Jika Anda menggunakan nama yang berbeda untuk namespace, ganti mwaa in -n mwaa dengan nama yang Anda gunakan.

cat << EOF | kubectl apply -f - -n mwaa kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role rules: - apiGroups: - "" - "apps" - "batch" - "extensions" resources: - "jobs" - "pods" - "pods/attach" - "pods/exec" - "pods/log" - "pods/portforward" - "secrets" - "services" verbs: - "create" - "delete" - "describe" - "get" - "list" - "patch" - "update" --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role-binding subjects: - kind: User name: mwaa-service roleRef: kind: Role name: mwaa-role apiGroup: rbac.authorization.k8s.io EOF

Konfirmasikan bahwa peran baru dapat mengakses EKS cluster Amazon dengan menjalankan perintah berikut. Pastikan untuk menggunakan nama yang benar jika Anda tidak menggunakan mwaa:

kubectl get pods -n mwaa --as mwaa-service

Anda akan melihat pesan yang dikembalikan yang mengatakan:

No resources found in mwaa namespace.

Buat dan lampirkan IAM peran untuk EKS klaster Amazon

Anda harus membuat IAM peran dan kemudian mengikatnya ke cluster Amazon EKS (k8s) sehingga dapat digunakan untuk otentikasi melalui. IAM Peran ini hanya digunakan untuk masuk ke cluster, dan tidak memiliki izin apa pun untuk konsol atau API panggilan.

Buat peran baru untuk MWAA lingkungan Amazon menggunakan langkah-langkah diPeran MWAA eksekusi Amazon. Namun, alih-alih membuat dan melampirkan kebijakan yang dijelaskan dalam topik itu, lampirkan kebijakan berikut:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "airflow:PublishMetrics", "Resource": "arn:aws:airflow:${MWAA_REGION}:${ACCOUNT_NUMBER}:environment/${MWAA_ENV_NAME}" }, { "Effect": "Deny", "Action": "s3:ListAllMyBuckets", "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject*", "s3:GetBucket*", "s3:List*" ], "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults", "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:${MWAA_REGION}:${ACCOUNT_NUMBER}:log-group:airflow-${MWAA_ENV_NAME}-*" ] }, { "Effect": "Allow", "Action": "cloudwatch:PutMetricData", "Resource": "*" }, { "Effect": "Allow", "Action": [ "sqs:ChangeMessageVisibility", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": "arn:aws:sqs:${MWAA_REGION}:*:airflow-celery-*" }, { "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:DescribeKey", "kms:GenerateDataKey*", "kms:Encrypt" ], "NotResource": "arn:aws:kms:*:${ACCOUNT_NUMBER}:key/*", "Condition": { "StringLike": { "kms:ViaService": [ "sqs.${MWAA_REGION}.amazonaws.com" ] } } }, { "Effect": "Allow", "Action": [ "eks:DescribeCluster" ], "Resource": "arn:aws:eks:${MWAA_REGION}:${ACCOUNT_NUMBER}:cluster/${EKS_CLUSTER_NAME}" } ] }

Setelah Anda membuat peran, edit MWAA lingkungan Amazon Anda untuk menggunakan peran yang Anda buat sebagai peran eksekusi untuk lingkungan. Untuk mengubah peran, edit lingkungan yang akan digunakan. Anda memilih peran eksekusi di bawah Izin.

Masalah yang diketahui:

  • Ada masalah yang diketahui dengan peran ARNs dengan subpath yang tidak dapat diautentikasi dengan Amazon. EKS Solusi untuk ini adalah membuat peran layanan secara manual daripada menggunakan yang dibuat oleh Amazon itu sendiri. MWAA Untuk mempelajari lebih lanjut, lihat Peran dengan jalur tidak berfungsi saat jalur disertakan di configmap ARN aws-auth

  • Jika daftar MWAA layanan Amazon tidak tersedia, IAM Anda harus memilih kebijakan layanan alternatif, seperti AmazonEC2, lalu memperbarui kebijakan kepercayaan peran agar sesuai dengan yang berikut:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "airflow-env.amazonaws.com", "airflow.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

    Untuk mempelajari lebih lanjut, lihat Cara menggunakan kebijakan kepercayaan dengan IAM peran.

Buat file requirements.txt

Untuk menggunakan kode sampel di bagian ini, pastikan Anda telah menambahkan salah satu opsi database berikut ke Andarequirements.txt. Untuk mempelajari selengkapnya, lihat Menginstal dependensi Python.

Apache Airflow v2
kubernetes apache-airflow[cncf.kubernetes]==3.0.0
Apache Airflow v1
awscli kubernetes==12.0.1

Membuat pemetaan identitas untuk Amazon EKS

Gunakan ARN untuk peran yang Anda buat dalam perintah berikut untuk membuat pemetaan identitas untuk AmazonEKS. Ubah Wilayah your-region ke Wilayah tempat Anda menciptakan lingkungan. Ganti ARN untuk peran, dan akhirnya, ganti mwaa-execution-role dengan peran eksekusi lingkungan Anda.

eksctl create iamidentitymapping \ --region your-region \ --cluster mwaa-eks \ --arn arn:aws:iam::111222333444:role/mwaa-execution-role \ --username mwaa-service

Buat kubeconfig

Gunakan perintah berikut untuk membuatkubeconfig:

aws eks update-kubeconfig \ --region us-west-2 \ --kubeconfig ./kube_config.yaml \ --name mwaa-eks \ --alias aws

Jika Anda menggunakan profil tertentu saat menjalankan, update-kubeconfig Anda perlu menghapus env: bagian yang ditambahkan ke file kube_config.yaml sehingga berfungsi dengan benar dengan Amazon. MWAA Untuk melakukannya, hapus yang berikut dari file dan kemudian simpan:

env: - name: AWS_PROFILE value: profile_name

Buat DAG

Gunakan contoh kode berikut untuk membuat file Python, seperti mwaa_pod_example.py untuk file. DAG

Apache Airflow v2
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )
Apache Airflow v1
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )

Tambahkan DAG dan kube_config.yaml ke bucket Amazon S3

Masukkan file yang DAG Anda buat dan kube_config.yaml file ke dalam bucket Amazon S3 untuk lingkungan AmazonMWAA. Anda dapat memasukkan file ke dalam bucket menggunakan konsol Amazon S3 atau. AWS Command Line Interface

Aktifkan dan picu contoh

Di Apache Airflow, aktifkan contoh dan kemudian picu.

Setelah berjalan dan selesai dengan sukses, gunakan perintah berikut untuk memverifikasi pod:

kubectl get pods -n mwaa

Anda akan melihat output yang serupa dengan yang berikut:

NAME READY STATUS RESTARTS AGE mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s

Anda kemudian dapat memverifikasi output pod dengan perintah berikut. Ganti nilai nama dengan nilai yang dikembalikan dari perintah sebelumnya:

kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888