Menghubungkan ke Amazon ECS menggunakanECSOperator - Amazon Managed Workflows for Apache Airflow (MWAA)

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Menghubungkan ke Amazon ECS menggunakanECSOperator

Topik ini menjelaskan bagaimana Anda dapat menggunakanECSOperatoruntuk menyambung ke wadah Amazon Elastic Container Service (Amazon ECS) dari Amazon MWAA. Dalam langkah-langkah berikut, Anda akan menambahkan izin yang diperlukan untuk peran eksekusi lingkungan Anda, menggunakanAWS CloudFormationtemplate untuk membuat klaster Amazon ECS Fargate, dan akhirnya membuat dan mengunggah DAG yang terhubung ke klaster baru Anda.

Versi

  • Anda dapat menggunakan contoh kode di halaman ini denganApache Airflow v2 dan di atasnyadiPython 3.10.

Prasyarat

Untuk menggunakan kode contoh di halaman ini, Anda memerlukan yang berikut ini:

Izin

  • Peran eksekusi untuk lingkungan Anda memerlukan izin untuk menjalankan tugas di Amazon ECS. Anda dapat melampirkanAmazonecs_FullAccess AWS-kebijakan yang dikelola untuk peran eksekusi Anda, atau buat dan lampirkan kebijakan berikut ke peran eksekusi Anda.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "ecs:RunTask", "ecs:DescribeTasks" ], "Resource": "*" }, { "Action": "iam:PassRole", "Effect": "Allow", "Resource": [ "*" ], "Condition": { "StringLike": { "iam:PassedToService": "ecs-tasks.amazonaws.com" } } } ] }
  • Selain menambahkan premisi yang diperlukan untuk menjalankan tugas di Amazon ECS, Anda juga harus memodifikasiCloudWatchPernyataan kebijakan log dalam peran eksekusi Amazon MWAA Anda untuk memungkinkan akses ke grup log tugas Amazon ECS seperti yang ditunjukkan di bawah ini. Grup log Amazon ECS dibuat olehAWS CloudFormationtemplate diMembuat klaster Amazon ECS.

    { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:airflow-environment-name-*", "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*" ] }

Untuk informasi selengkapnya tentang peran eksekusi Amazon MWAA, dan cara melampirkan kebijakan, lihatPeran eksekusi.

Membuat klaster Amazon ECS

Menggunakan berikut iniAWS CloudFormationtemplate, Anda akan membangun klaster Amazon ECS Fargate untuk digunakan dengan alur kerja Amazon MWAA Anda. Untuk informasi lebih lanjut, lihatMembuat definisi tugasdi dalamPanduan Pengembang Layanan Kontainer Amazon Elastic.

  1. Buat file JSON dengan kode berikut dan simpan sebagaiecs-mwaa-cfn.json.

    { "AWSTemplateFormatVersion": "2010-09-09", "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.", "Parameters": { "VpcId": { "Type": "AWS::EC2::VPC::Id", "Description": "Select a VPC that allows instances access to ECR, as used with MWAA." }, "SubnetIds": { "Type": "List<AWS::EC2::Subnet::Id>", "Description": "Select at two private subnets in your selected VPC, as used with MWAA." }, "SecurityGroups": { "Type": "List<AWS::EC2::SecurityGroup::Id>", "Description": "Select at least one security group in your selected VPC, as used with MWAA." } }, "Resources": { "Cluster": { "Type": "AWS::ECS::Cluster", "Properties": { "ClusterName": { "Fn::Sub": "${AWS::StackName}-cluster" } } }, "LogGroup": { "Type": "AWS::Logs::LogGroup", "Properties": { "LogGroupName": { "Ref": "AWS::StackName" }, "RetentionInDays": 30 } }, "ExecutionRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "ecs-tasks.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }, "ManagedPolicyArns": [ "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy" ] } }, "TaskDefinition": { "Type": "AWS::ECS::TaskDefinition", "Properties": { "Family": { "Fn::Sub": "${AWS::StackName}-task" }, "Cpu": 2048, "Memory": 4096, "NetworkMode": "awsvpc", "ExecutionRoleArn": { "Ref": "ExecutionRole" }, "ContainerDefinitions": [ { "Name": { "Fn::Sub": "${AWS::StackName}-container" }, "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest", "PortMappings": [ { "Protocol": "tcp", "ContainerPort": 8080, "HostPort": 8080 } ], "LogConfiguration": { "LogDriver": "awslogs", "Options": { "awslogs-region": { "Ref": "AWS::Region" }, "awslogs-group": { "Ref": "LogGroup" }, "awslogs-stream-prefix": "ecs" } } } ], "RequiresCompatibilities": [ "FARGATE" ] } }, "Service": { "Type": "AWS::ECS::Service", "Properties": { "ServiceName": { "Fn::Sub": "${AWS::StackName}-service" }, "Cluster": { "Ref": "Cluster" }, "TaskDefinition": { "Ref": "TaskDefinition" }, "DesiredCount": 1, "LaunchType": "FARGATE", "PlatformVersion": "1.3.0", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "Subnets": { "Ref": "SubnetIds" }, "SecurityGroups": { "Ref": "SecurityGroups" } } } } } } }
  2. Di prompt perintah Anda, gunakan yang berikutAWS CLIperintah untuk membuat tumpukan baru. Anda harus mengganti nilainyaSecurityGroupsdanSubnetIdsdengan nilai untuk grup keamanan dan subnet lingkungan Amazon MWAA Anda.

    $ aws cloudformation create-stack \ --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \ ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \ --capabilities CAPABILITY_IAM

    Atau, Anda dapat menggunakan skrip shell berikut. Skrip mengambil nilai yang diperlukan untuk grup keamanan lingkungan Anda, dan subnet menggunakanget-environment AWS CLIperintah, kemudian menciptakan stack sesuai. Untuk menjalankan skrip, lakukan hal berikut.

    1. Salin, dan simpan skrip sebagaiecs-stack-helper.shdi direktori yang sama denganAWS CloudFormationTemplat.

      #!/bin/bash joinByString() { local separator="$1" shift local first="$1" shift printf "%s" "$first" "${@/#/$separator}" } response=$(aws mwaa get-environment --name $1) securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]') subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]')) aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \ ParameterKey=SubnetIds,ParameterValue=$subnetIds \ --capabilities CAPABILITY_IAM
    2. Jalankan skrip menggunakan perintah berikut. Gantienvironment-namedanstack-namedengan informasi Anda.

      $ chmod +x ecs-stack-helper.sh $ ./ecs-stack-helper.bash environment-name stack-name

    Jika berhasil, Anda akan melihat output berikut menampilkan baru AndaAWS CloudFormationtumpukan ID.

    {
        "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
    }

Setelah AndaAWS CloudFormationstack selesai danAWStelah menyediakan sumber daya Amazon ECS Anda, Anda siap untuk membuat dan mengunggah DAG Anda.

Sampel kode

  1. Buka prompt perintah, dan arahkan ke direktori tempat kode DAG Anda disimpan. Misalnya:

    cd dags
  2. Salin isi contoh kode berikut dan simpan secara lokal sebagaimwaa-ecs-operator.py, lalu unggah DAG baru Anda ke Amazon S3.

    from http import client from airflow import DAG from airflow.providers.amazon.aws.operators.ecs import ECSOperator from airflow.utils.dates import days_ago import boto3 CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information. CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information. LAUNCH_TYPE="FARGATE" with DAG( dag_id = "ecs_fargate_dag", schedule_interval=None, catchup=False, start_date=days_ago(1) ) as dag: client=boto3.client('ecs') services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE) service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns']) ecs_operator_task = ECSOperator( task_id = "ecs_operator_task", dag=dag, cluster=CLUSTER_NAME, task_definition=service['services'][0]['taskDefinition'], launch_type=LAUNCH_TYPE, overrides={ "containerOverrides":[ { "name":CONTAINER_NAME, "command":["ls", "-l", "/"], }, ], }, network_configuration=service['services'][0]['networkConfiguration'], awslogs_group="mwaa-ecs-zero", awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}", )
    catatan

    Dalam contoh DAG, untukawslogs_group, Anda mungkin perlu memodifikasi grup log dengan nama untuk grup log tugas Amazon ECS Anda. Contoh mengasumsikan grup log bernamamwaa-ecs-zero. Untukawslogs_stream_prefix, gunakan awalan aliran log tugas Amazon ECS. Contoh mengasumsikan awalan aliran log,ecs.

  3. Jalankan yang berikutAWS CLIperintah untuk menyalin DAG ke bucket lingkungan Anda, lalu memicu DAG menggunakan UI Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Jika berhasil, Anda akan melihat output yang mirip dengan berikut ini di log tugas untukecs_operator_taskdi dalamecs_fargate_dagHARI:

    [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
    Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
    [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
    {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
    .
    .
    .
    [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
    [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
    .
    .
    .
    [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed