ECSOperator를 사용하여 Amazon ECS에 연결 - Amazon Managed Workflows for Apache Airflow

ECSOperator를 사용하여 Amazon ECS에 연결

이 항목에서는 ECSOperator를 사용하여 Amazon MWAA에서 Amazon Elastic Container Service(Amazon ECS) 컨테이너에 연결하기 위한 샘플 코드를 설명합니다. 다음 단계에서는 환경의 실행 역할에 필요한 권한을 추가하고, AWS CloudFormation 템플릿을 사용하여 Amazon ECS Fargate 클러스터를 생성하고, 마지막으로 새 클러스터에 연결되는 DAG를 생성 및 업로드합니다.

버전

  • 이 페이지의 코드 예제는 Python 3.10Apache Airflow v2에서 사용할 수 있습니다.

필수 조건

이 페이지의 이 샘플 코드를 사용하려면 다음 항목이 필요합니다.

권한

  • 환경의 실행 역할에는 Amazon ECS에서 작업을 실행할 수 있는 권한이 필요합니다. AWSAmazonECS_FullAccess 관리형 정책을 실행 역할에 연결하거나 다음 정책을 생성하여 실행 역할에 연결할 수 있습니다.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "ecs:RunTask", "ecs:DescribeTasks" ], "Resource": "*" }, { "Action": "iam:PassRole", "Effect": "Allow", "Resource": [ "*" ], "Condition": { "StringLike": { "iam:PassedToService": "ecs-tasks.amazonaws.com" } } } ] }
  • Amazon ECS에서 작업을 실행하는 데 필요한 권한을 추가하는 것 외에도 다음과 같이 Amazon MWAA 실행 역할에서 CloudWatch Logs 정책 설명을 수정하여 Amazon ECS 작업 로그 그룹에 대한 액세스를 허용해야 합니다. Amazon ECS 로그 그룹은 Amazon ECS 클러스터 생성의 AWS CloudFormation 템플릿에 의해 생성됩니다.

    { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:airflow-environment-name-*", "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*" ] }

Amazon MWAA 실행 역할 및 정책 연결 방법에 대한 자세한 내용은 실행 역할 섹션을 참조하십시오.

Amazon ECS 클러스터 생성

다음 AWS CloudFormation 템플릿을 사용하여 Amazon MWAA 워크플로에 사용할 Amazon ECS Fargate 클러스터를 구축합니다. 자세한 내용을 알아보려면 Amazon Elastic Container Service 개발자 안내서작업 정의 생성을 참조하십시오.

  1. 다음 코드로 JSON 파일을 생성하고 ecs-mwaa-cfn.json으로 저장합니다.

    { "AWSTemplateFormatVersion": "2010-09-09", "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.", "Parameters": { "VpcId": { "Type": "AWS::EC2::VPC::Id", "Description": "Select a VPC that allows instances access to ECR, as used with MWAA." }, "SubnetIds": { "Type": "List<AWS::EC2::Subnet::Id>", "Description": "Select at two private subnets in your selected VPC, as used with MWAA." }, "SecurityGroups": { "Type": "List<AWS::EC2::SecurityGroup::Id>", "Description": "Select at least one security group in your selected VPC, as used with MWAA." } }, "Resources": { "Cluster": { "Type": "AWS::ECS::Cluster", "Properties": { "ClusterName": { "Fn::Sub": "${AWS::StackName}-cluster" } } }, "LogGroup": { "Type": "AWS::Logs::LogGroup", "Properties": { "LogGroupName": { "Ref": "AWS::StackName" }, "RetentionInDays": 30 } }, "ExecutionRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "ecs-tasks.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }, "ManagedPolicyArns": [ "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy" ] } }, "TaskDefinition": { "Type": "AWS::ECS::TaskDefinition", "Properties": { "Family": { "Fn::Sub": "${AWS::StackName}-task" }, "Cpu": 2048, "Memory": 4096, "NetworkMode": "awsvpc", "ExecutionRoleArn": { "Ref": "ExecutionRole" }, "ContainerDefinitions": [ { "Name": { "Fn::Sub": "${AWS::StackName}-container" }, "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest", "PortMappings": [ { "Protocol": "tcp", "ContainerPort": 8080, "HostPort": 8080 } ], "LogConfiguration": { "LogDriver": "awslogs", "Options": { "awslogs-region": { "Ref": "AWS::Region" }, "awslogs-group": { "Ref": "LogGroup" }, "awslogs-stream-prefix": "ecs" } } } ], "RequiresCompatibilities": [ "FARGATE" ] } }, "Service": { "Type": "AWS::ECS::Service", "Properties": { "ServiceName": { "Fn::Sub": "${AWS::StackName}-service" }, "Cluster": { "Ref": "Cluster" }, "TaskDefinition": { "Ref": "TaskDefinition" }, "DesiredCount": 1, "LaunchType": "FARGATE", "PlatformVersion": "1.3.0", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "Subnets": { "Ref": "SubnetIds" }, "SecurityGroups": { "Ref": "SecurityGroups" } } } } } } }
  2. 명령 프롬프트에서 다음 AWS CLI 명령을 사용하여 새 스택을 생성합니다. SecurityGroups 값과 SubnetIds 값을 Amazon MWAA 환경의 보안 그룹 및 서브넷의 값으로 바꿔야 합니다.

    $ aws cloudformation create-stack \ --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \ ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \ --capabilities CAPABILITY_IAM

    또는 다음 쉘 스크립트를 사용할 수 있습니다. 스크립트는 get-environment AWS CLI 명령을 사용하여 환경의 보안 그룹 및 서브넷에 필요한 값을 검색한 다음 그에 따라 스택을 생성합니다. 스크립트를 실행하려면 다음을 수행합니다.

    1. 스크립트를 복사하여 AWS CloudFormation 템플릿과 동일한 디렉터리에 ecs-stack-helper.sh로 저장합니다.

      #!/bin/bash joinByString() { local separator="$1" shift local first="$1" shift printf "%s" "$first" "${@/#/$separator}" } response=$(aws mwaa get-environment --name $1) securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]') subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]')) aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \ ParameterKey=SubnetIds,ParameterValue=$subnetIds \ --capabilities CAPABILITY_IAM
    2. 다음 명령을 사용하여 스크립트를 실행합니다. environment-namestack-name을(를) 자신의 정보로 대체합니다.

      $ chmod +x ecs-stack-helper.sh $ ./ecs-stack-helper.bash environment-name stack-name

    성공하면 새 AWS CloudFormation 스택 ID를 표시하는 다음 출력이 표시됩니다.

    {
        "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
    }

AWS CloudFormation 스택이 완성되고 AWS가 Amazon ECS 리소스를 프로비저닝한 후에는 DAG를 생성하고 업로드할 준비가 된 것입니다.

코드 샘플

  1. 명령 프롬프트를 열고 DAG 코드가 저장된 디렉터리로 이동합니다. 예:

    cd dags
  2. 다음 코드 샘플의 내용을 복사하여 로컬에 mwaa-ecs-operator.py로 저장한 다음 새 DAG를 Amazon S3에 업로드합니다.

    from http import client from airflow import DAG from airflow.providers.amazon.aws.operators.ecs import ECSOperator from airflow.utils.dates import days_ago import boto3 CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information. CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information. LAUNCH_TYPE="FARGATE" with DAG( dag_id = "ecs_fargate_dag", schedule_interval=None, catchup=False, start_date=days_ago(1) ) as dag: client=boto3.client('ecs') services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE) service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns']) ecs_operator_task = ECSOperator( task_id = "ecs_operator_task", dag=dag, cluster=CLUSTER_NAME, task_definition=service['services'][0]['taskDefinition'], launch_type=LAUNCH_TYPE, overrides={ "containerOverrides":[ { "name":CONTAINER_NAME, "command":["ls", "-l", "/"], }, ], }, network_configuration=service['services'][0]['networkConfiguration'], awslogs_group="mwaa-ecs-zero", awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}", )
    참고

    예제 DAG에서는 awslogs_group의 경우 Amazon ECS 작업 로그 그룹의 이름을 사용하여 로그 그룹을 수정해야 할 수 있습니다. 예제에서는 mwaa-ecs-zero이라는 이름의 로그 그룹을 가정합니다. awslogs_stream_prefix의 경우 Amazon ECS 작업 로그 스트림 접두사를 사용합니다. 이 예제에서는 로그 스트림 접두사, ecs를 가정합니다.

  3. 다음 AWS CLI 명령을 실행하여 DAG를 환경 버킷에 복사한 다음 Apache Airflow UI를 사용하여 DAG를 트리거합니다.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. 성공하면 ecs_fargate_dag DAG의 ecs_operator_task에 대한 작업 로그에 다음과 비슷한 출력이 표시됩니다.

    [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
    Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
    [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
    {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
    .
    .
    .
    [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
    [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
    .
    .
    .
    [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed