Utilizzo di Amazon MWAA con Amazon EKS - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Utilizzo di Amazon MWAA con Amazon EKS

L'esempio seguente dimostra come utilizzare Amazon Managed Workflows per Apache Airflow con Amazon EKS.

Versione

  • Il codice di esempio in questa pagina può essere utilizzato conApache Airflow v1nelPython 3.7.

  • Puoi usare l'esempio di codice in questa pagina conApache Airflow v2 e versioni successivenelPython 3.10.

Prerequisiti

Per utilizzare l'esempio in questo argomento, avrai bisogno di quanto segue:

Nota

Quando si utilizza uneksctlcomando, puoi includere un--profileper specificare un profilo diverso da quello predefinito.

Crea una chiave pubblica per Amazon EC2

Usa il comando seguente per creare una chiave pubblica dalla tua coppia di chiavi private.

ssh-keygen -y -f myprivatekey.pem > mypublickey.pub

Per saperne di più, consultaRecupero della chiave pubblica per la coppia di chiavi.

Crea il cluster

Usa il comando seguente per creare il cluster. Se desideri un nome personalizzato per il cluster o per crearlo in una regione diversa, sostituisci i valori del nome e della regione. Devi creare il cluster nella stessa regione in cui crei l'ambiente Amazon MWAA. Sostituisci i valori delle sottoreti in modo che corrispondano alle sottoreti della tua rete Amazon VPC che utilizzi per Amazon MWAA. Sostituisci il valore perssh-public-keyin base alla chiave utilizzata. Puoi utilizzare una chiave esistente di Amazon EC2 che si trova nella stessa regione o crearne una nuova nella stessa regione in cui hai creato il tuo ambiente Amazon MWAA.

eksctl create cluster \ --name mwaa-eks \ --region us-west-2 \ --version 1.18 \ --nodegroup-name linux-nodes \ --nodes 3 \ --nodes-min 1 \ --nodes-max 4 \ --with-oidc \ --ssh-access \ --ssh-public-key MyPublicKey \ --managed \ --vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \ --vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"

Il completamento della creazione del cluster richiede del tempo. Una volta completato, puoi verificare che il cluster sia stato creato correttamente e che il provider IAM OIDC sia configurato utilizzando il seguente comando:

eksctl utils associate-iam-oidc-provider \ --region us-west-2 \ --cluster mwaa-eks \ --approve

Creare unmwaanamespace

Dopo aver confermato che il cluster è stato creato correttamente, usa il seguente comando per creare un namespace per i pod.

kubectl create namespace mwaa

Crea un ruolo permwaanamespace

Dopo aver creato il namespace, crea un ruolo e un'associazione di ruoli per un utente Amazon MWAA su EKS in grado di eseguire i pod in uno spazio dei nomi MWAA. Se hai usato un nome diverso per il namespace, sostituisci mwaa in-n mwaacon il nome che hai usato.

cat << EOF | kubectl apply -f - -n mwaa kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role rules: - apiGroups: - "" - "apps" - "batch" - "extensions" resources: - "jobs" - "pods" - "pods/attach" - "pods/exec" - "pods/log" - "pods/portforward" - "secrets" - "services" verbs: - "create" - "delete" - "describe" - "get" - "list" - "patch" - "update" --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role-binding subjects: - kind: User name: mwaa-service roleRef: kind: Role name: mwaa-role apiGroup: rbac.authorization.k8s.io EOF

Conferma che il nuovo ruolo possa accedere al cluster Amazon EKS eseguendo il seguente comando. Assicurati di usare il nome corretto se non l'hai usatomwaa:

kubectl get pods -n mwaa --as mwaa-service

Dovresti vedere un messaggio restituito che dice:

No resources found in mwaa namespace.

Crea e associa un ruolo IAM per il cluster Amazon EKS

Devi creare un ruolo IAM e quindi associarlo al cluster Amazon EKS (k8s) in modo che possa essere utilizzato per l'autenticazione tramite IAM. Il ruolo viene utilizzato solo per accedere al cluster e non dispone di autorizzazioni per la console o le chiamate API.

Crea un nuovo ruolo per l'ambiente Amazon MWAA seguendo la procedura descritta inRuolo di esecuzione di Amazon MWAA. Tuttavia, anziché creare e allegare le politiche descritte in quell'argomento, allega la seguente politica:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "airflow:PublishMetrics", "Resource": "arn:aws:airflow:${MWAA_REGION}:${ACCOUNT_NUMBER}:environment/${MWAA_ENV_NAME}" }, { "Effect": "Deny", "Action": "s3:ListAllMyBuckets", "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject*", "s3:GetBucket*", "s3:List*" ], "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults", "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:${MWAA_REGION}:${ACCOUNT_NUMBER}:log-group:airflow-${MWAA_ENV_NAME}-*" ] }, { "Effect": "Allow", "Action": "cloudwatch:PutMetricData", "Resource": "*" }, { "Effect": "Allow", "Action": [ "sqs:ChangeMessageVisibility", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": "arn:aws:sqs:${MWAA_REGION}:*:airflow-celery-*" }, { "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:DescribeKey", "kms:GenerateDataKey*", "kms:Encrypt" ], "NotResource": "arn:aws:kms:*:${ACCOUNT_NUMBER}:key/*", "Condition": { "StringLike": { "kms:ViaService": [ "sqs.${MWAA_REGION}.amazonaws.com" ] } } }, { "Effect": "Allow", "Action": [ "eks:DescribeCluster" ], "Resource": "arn:aws:eks:${MWAA_REGION}:${ACCOUNT_NUMBER}:cluster/${EKS_CLUSTER_NAME}" } ] }

Dopo aver creato il ruolo, modifica il tuo ambiente Amazon MWAA per utilizzare il ruolo che hai creato come ruolo di esecuzione per l'ambiente. Per cambiare il ruolo, modifica l'ambiente da utilizzare. Seleziona il ruolo di esecuzione inAutorizzazioni.

Problemi noti:

  • Esiste un problema noto relativo agli ARN di ruolo con percorsi secondari che non sono in grado di autenticarsi con Amazon EKS. La soluzione alternativa consiste nel creare il ruolo del servizio manualmente anziché utilizzare quello creato da Amazon MWAA stesso. Per saperne di più, consultaI ruoli con percorsi non funzionano quando il percorso è incluso nel loro ARN nella configmap di aws-auth

  • Se l'elenco dei servizi Amazon MWAA non è disponibile in IAM, devi scegliere una politica di servizio alternativa, come Amazon EC2, e quindi aggiornare la politica di fiducia del ruolo in modo che corrisponda a quanto segue:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "airflow-env.amazonaws.com", "airflow.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

    Per saperne di più, consultaCome utilizzare le politiche di fiducia con i ruoli IAM.

Creare il file requirements.txt

Per utilizzare il codice di esempio in questa sezione, assicurati di aver aggiunto una delle seguenti opzioni di database al tuorequirements.txt. Per ulteriori informazioni, consulta Installazione delle dipendenze in Python.

Apache Airflow v2
kubernetes apache-airflow[cncf.kubernetes]==3.0.0
Apache Airflow v1
awscli kubernetes==12.0.1

Crea una mappatura dell'identità per Amazon EKS

Usa l'ARN per il ruolo che hai creato nel comando seguente per creare una mappatura dell'identità per Amazon EKS. Cambia la regionela tua regionenella regione in cui hai creato l'ambiente. Sostituisci l'ARN per il ruolo e, infine, sostituiscimwaa-execution-rolecon il ruolo di esecuzione del tuo ambiente.

eksctl create iamidentitymapping \ --region your-region \ --cluster mwaa-eks \ --arn arn:aws:iam::111222333444:role/mwaa-execution-role \ --username mwaa-service

Creazione del kubeconfig

Utilizzate il comando seguente per crearekubeconfig:

aws eks update-kubeconfig \ --region us-west-2 \ --kubeconfig ./kube_config.yaml \ --name mwaa-eks \ --alias aws

Se hai utilizzato un profilo specifico durante la corsaupdate-kubeconfigè necessario rimuovere ilenv:sezione aggiunta al file kube_config.yaml in modo che funzioni correttamente con Amazon MWAA. A tale scopo, elimina quanto segue dal file e quindi salvalo:

env: - name: AWS_PROFILE value: profile_name

Crea un DAG

Usa il seguente esempio di codice per creare un file Python, ad esempiomwaa_pod_example.pyper il DAG.

Apache Airflow v2
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )
Apache Airflow v1
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )

Aggiungi il DAG ekube_config.yamlal bucket Amazon S3

Inserisci il DAG che hai creato e ilkube_config.yamlfile nel bucket Amazon S3 per l'ambiente Amazon MWAA. Puoi inserire file nel tuo bucket utilizzando la console Amazon S3 o ilAWS Command Line Interface.

Abilita e attiva l'esempio

In Apache Airflow, abilitate l'esempio e quindi attivatelo.

Dopo l'esecuzione e il completamento con successo, usa il seguente comando per verificare il pod:

kubectl get pods -n mwaa

Verrà visualizzato un output simile al seguente:

NAME READY STATUS RESTARTS AGE mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s

È quindi possibile verificare l'output del pod con il seguente comando. Sostituisci il valore del nome con il valore restituito dal comando precedente:

kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888