MWAA아마존과 아마존을 함께 사용하기 EKS - Amazon Managed Workflows for Apache Airflow

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

MWAA아마존과 아마존을 함께 사용하기 EKS

다음 샘플은 Amazon에서 Apache Airflow용 Amazon 관리형 워크플로를 사용하는 방법을 보여줍니다. EKS

버전

  • 이 페이지의 샘플 코드는 Python 3.7Apache Airflow v1과 함께 사용할 수 있습니다.

  • 이 페이지의 코드 예제를 Python 3.10의 아파치 에어플로우 v2와 함께 사용할 수 있습니다.

사전 조건

이 항목의 예제를 사용하려면 다음이 필요합니다.

참고

eksctl 명령을 사용할 때 --profile를 포함하여 기본값 이외의 프로필을 지정할 수 있습니다.

Amazon용 퍼블릭 키 생성 EC2

다음 명령을 사용하여 프라이빗 키 페어에서 퍼블릭 키를 생성합니다.

ssh-keygen -y -f myprivatekey.pem > mypublickey.pub

자세한 내용은 키 페어에 대한 퍼블릭 키 검색을 참조하십시오.

클러스터 생성

다음 명령을 사용하여 클러스터를 생성합니다. 클러스터에 사용자 지정 이름을 지정하거나 다른 리전에 생성하려면 이름 및 리전 값을 바꿉니다. Amazon MWAA 환경을 생성한 지역과 동일한 지역에 클러스터를 생성해야 합니다. Amazon에서 사용하는 Amazon VPC 네트워크의 서브넷과 일치하도록 서브넷 값을 바꾸십시오. MWAA ssh-public-key의 값을 사용하는 키와 일치하도록 바꿉니다. 동일한 지역에 있는 Amazon의 기존 키를 사용하거나 Amazon EC2 MWAA 환경을 만든 동일한 지역에서 새 키를 생성할 수 있습니다.

eksctl create cluster \ --name mwaa-eks \ --region us-west-2 \ --version 1.18 \ --nodegroup-name linux-nodes \ --nodes 3 \ --nodes-min 1 \ --nodes-max 4 \ --with-oidc \ --ssh-access \ --ssh-public-key MyPublicKey \ --managed \ --vpc-public-subnets "subnet-11111111111111111, subnet-2222222222222222222" \ --vpc-private-subnets "subnet-33333333333333333, subnet-44444444444444444"

클러스터 생성을 완료하는 데 시간이 걸립니다. 완료되면 다음 명령을 사용하여 클러스터가 성공적으로 생성되었고 IAM OIDC 공급자가 구성되어 있는지 확인할 수 있습니다.

eksctl utils associate-iam-oidc-provider \ --region us-west-2 \ --cluster mwaa-eks \ --approve

mwaa 네임스페이스 생성

클러스터가 성공적으로 생성되었는지 확인한 후 다음 명령을 사용하여 포드에 대한 네임스페이스를 생성합니다.

kubectl create namespace mwaa

mwaa 네임스페이스에 대한 역할 생성

네임스페이스를 생성한 후 네임스페이스에서 포드를 실행할 수 EKS 있는 Amazon MWAA 사용자를 위한 역할 및 역할 바인딩을 생성합니다. MWAA 네임스페이스에 다른 이름을 사용한 경우 -n mwaa의 mwaa를 사용한 이름으로 바꿉니다.

cat << EOF | kubectl apply -f - -n mwaa kind: Role apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role rules: - apiGroups: - "" - "apps" - "batch" - "extensions" resources: - "jobs" - "pods" - "pods/attach" - "pods/exec" - "pods/log" - "pods/portforward" - "secrets" - "services" verbs: - "create" - "delete" - "describe" - "get" - "list" - "patch" - "update" --- kind: RoleBinding apiVersion: rbac.authorization.k8s.io/v1 metadata: name: mwaa-role-binding subjects: - kind: User name: mwaa-service roleRef: kind: Role name: mwaa-role apiGroup: rbac.authorization.k8s.io EOF

다음 명령을 실행하여 새 역할이 Amazon EKS 클러스터에 액세스할 수 있는지 확인합니다. 사용하지 않았다면 반드시 올바른 이름을 사용하세요.mwaa:

kubectl get pods -n mwaa --as mwaa-service

또한 다음과 같은 메시지가 표시됩니다.

No resources found in mwaa namespace.

Amazon EKS 클러스터용 IAM 역할 생성 및 연결

IAM역할을 생성한 다음 Amazon EKS (k8s) 클러스터에 바인딩해야 인증에 사용할 수 있습니다. IAM 역할은 클러스터에 로그인하는 데만 사용되며 콘솔 또는 API 호출에 대한 권한은 없습니다.

의 단계를 사용하여 Amazon MWAA 환경을 위한 새 역할을 생성합니다Amazon MWAA 실행 역할. 하지만 해당 항목에 설명된 정책을 생성하고 첨부하는 대신 다음 정책을 첨부하십시오.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "airflow:PublishMetrics", "Resource": "arn:aws:airflow:${MWAA_REGION}:${ACCOUNT_NUMBER}:environment/${MWAA_ENV_NAME}" }, { "Effect": "Deny", "Action": "s3:ListAllMyBuckets", "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "s3:GetObject*", "s3:GetBucket*", "s3:List*" ], "Resource": [ "arn:aws:s3:::{MWAA_S3_BUCKET}", "arn:aws:s3:::{MWAA_S3_BUCKET}/*" ] }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults", "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:${MWAA_REGION}:${ACCOUNT_NUMBER}:log-group:airflow-${MWAA_ENV_NAME}-*" ] }, { "Effect": "Allow", "Action": "cloudwatch:PutMetricData", "Resource": "*" }, { "Effect": "Allow", "Action": [ "sqs:ChangeMessageVisibility", "sqs:DeleteMessage", "sqs:GetQueueAttributes", "sqs:GetQueueUrl", "sqs:ReceiveMessage", "sqs:SendMessage" ], "Resource": "arn:aws:sqs:${MWAA_REGION}:*:airflow-celery-*" }, { "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:DescribeKey", "kms:GenerateDataKey*", "kms:Encrypt" ], "NotResource": "arn:aws:kms:*:${ACCOUNT_NUMBER}:key/*", "Condition": { "StringLike": { "kms:ViaService": [ "sqs.${MWAA_REGION}.amazonaws.com" ] } } }, { "Effect": "Allow", "Action": [ "eks:DescribeCluster" ], "Resource": "arn:aws:eks:${MWAA_REGION}:${ACCOUNT_NUMBER}:cluster/${EKS_CLUSTER_NAME}" } ] }

역할을 생성한 후에는 생성한 역할을 MWAA 환경의 실행 역할로 사용하도록 Amazon 환경을 편집하십시오. 역할을 변경하려면 사용할 환경을 편집합니다. 권한에서 실행 역할을 선택합니다.

알려진 문제:

  • Amazon에서 인증할 수 없는 하위 ARNs 경로와 관련된 역할과 관련된 알려진 문제가 있습니다. EKS 이에 대한 해결 방법은 Amazon에서 직접 만든 서비스 역할을 사용하는 대신 서비스 역할을 수동으로 생성하는 MWAA 것입니다. 자세한 내용은 aws-auth configmap에서 경로가 포함된 경우 경로가 있는 역할이 작동하지 않음을 참조하십시오. ARN

  • Amazon MWAA 서비스 목록을 사용할 수 없는 경우 EC2 Amazon과 같은 대체 서비스 정책을 선택한 다음 다음과 일치하도록 역할의 신뢰 정책을 업데이트해야 합니다. IAM

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": [ "airflow-env.amazonaws.com", "airflow.amazonaws.com" ] }, "Action": "sts:AssumeRole" } ] }

    자세한 내용은 IAM역할과 함께 신뢰 정책을 사용하는 방법을 참조하십시오.

requirements.txt 파일 생성

이 섹션의 샘플 코드를 사용하려면 다음 데이터베이스 옵션 중 하나를 requirements.txt에 추가했는지 확인합니다. 자세한 내용은 Python 종속성 설치을 참조하십시오.

Apache Airflow v2
kubernetes apache-airflow[cncf.kubernetes]==3.0.0
Apache Airflow v1
awscli kubernetes==12.0.1

Amazon용 아이덴티티 매핑 생성 EKS

다음 명령에서 생성한 역할에 를 사용하여 Amazon에 대한 ID 매핑을 생성합니다EKS. ARN 지역 변경 your-region 환경을 만든 지역으로 역할을 대체하고 마지막으로 교체하십시오. ARN mwaa-execution-role 환경의 실행 역할과 함께 사용하세요.

eksctl create iamidentitymapping \ --region your-region \ --cluster mwaa-eks \ --arn arn:aws:iam::111222333444:role/mwaa-execution-role \ --username mwaa-service

kubeconfig 생성

다음 명령을 실행해 kubeconfig을 생성합니다.

aws eks update-kubeconfig \ --region us-west-2 \ --kubeconfig ./kube_config.yaml \ --name mwaa-eks \ --alias aws

실행할 update-kubeconfig 때 특정 프로필을 사용한 경우 kube_config.yaml 파일에 추가된 env: 섹션을 제거하여 Amazon에서 제대로 작동하도록 해야 합니다. MWAA 이렇게 하려면 파일에서 다음을 삭제한 다음 저장합니다.

env: - name: AWS_PROFILE value: profile_name

생성하기 DAG

다음 코드 예제를 사용하여 Python 파일 (예:) mwaa_pod_example.py 을 생성합니다DAG.

Apache Airflow v2
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )
Apache Airflow v1
""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. """ from airflow import DAG from datetime import datetime from airflow.contrib.operators.kubernetes_pod_operator import KubernetesPodOperator default_args = { 'owner': 'aws', 'depends_on_past': False, 'start_date': datetime(2019, 2, 20), 'provide_context': True } dag = DAG( 'kubernetes_pod_example', default_args=default_args, schedule_interval=None) #use a kube_config stored in s3 dags folder for now kube_config_path = '/usr/local/airflow/dags/kube_config.yaml' podRun = KubernetesPodOperator( namespace="mwaa", image="ubuntu:18.04", cmds=["bash"], arguments=["-c", "ls"], labels={"foo": "bar"}, name="mwaa-pod-test", task_id="pod-task", get_logs=True, dag=dag, is_delete_operator_pod=False, config_file=kube_config_path, in_cluster=False, cluster_context='aws' )

DAG및 를 Amazon S3 kube_config.yaml 버킷에 추가합니다.

DAG생성한 kube_config.yaml 파일과 파일을 Amazon MWAA 환경용 Amazon S3 버킷에 넣습니다. Amazon S3 콘솔 또는 AWS Command Line Interface를 사용하여 파일을 버킷에 넣을 수 있습니다.

예제 활성화 및 트리거

Apache Airflow에서 예제를 활성화한 다음 트리거합니다.

성공적으로 실행되고 완료되면 다음 명령을 사용하여 포드를 확인합니다.

kubectl get pods -n mwaa

다음과 유사한 출력 화면이 표시되어야 합니다.

NAME READY STATUS RESTARTS AGE mwaa-pod-test-aa11bb22cc3344445555666677778888 0/1 Completed 0 2m23s

이후 다음 명령을 사용하여 포드의 출력을 확인할 수 있습니다. 이름 값을 이전 명령에서 반환된 값으로 바꿉니다.

kubectl logs -n mwaa mwaa-pod-test-aa11bb22cc3344445555666677778888