Como conectar-se ao Amazon ECS usando o ECSOperator - Amazon Managed Workflows for Apache Airflow

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Como conectar-se ao Amazon ECS usando o ECSOperator

O tópico descreve como você pode usar ECSOperator para se conectar a um contêiner do Amazon Elastic Container Service (Amazon ECS) do Amazon MWAA. Nas etapas a seguir, você adicionará as permissões necessárias ao perfil de execução do seu ambiente, usará um modelo AWS CloudFormation para criar um cluster Amazon ECS Fargate e, por fim, criará e carregará um DAG que se conecta ao seu novo cluster.

Versão

Pré-requisitos

Para usar o código de amostra nesta página, você precisará do seguinte:

Permissões

  • O perfil de execução do seu ambiente precisa de permissão para executar tarefas no Amazon ECS. Você pode anexar a política gerenciada por AWS AmazonECS_FullAccess ao seu perfil de execução ou criar e anexar a seguinte política ao seu perfil de execução.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "ecs:RunTask", "ecs:DescribeTasks" ], "Resource": "*" }, { "Action": "iam:PassRole", "Effect": "Allow", "Resource": [ "*" ], "Condition": { "StringLike": { "iam:PassedToService": "ecs-tasks.amazonaws.com" } } } ] }
  • Além de adicionar as permissões necessárias para executar tarefas no Amazon ECS, você também deve modificar a declaração de política do CloudWatch Logs em seu perfil de execução do Amazon MWAA para permitir acesso ao grupo de logs de tarefas do Amazon ECS, conforme mostrado a seguir. O grupo de logs do Amazon ECS é criado pelo modelo AWS CloudFormation em Crie um cluster do Amazon ECS.

    { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:airflow-environment-name-*", "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*" ] }

Para obter mais informações sobre o perfil de execução do Amazon MWAA e como anexar uma política, consulte Perfil de execução.

Crie um cluster do Amazon ECS

Ao usar o modelo AWS CloudFormation a seguir, você criará um cluster Amazon ECS Fargate para usar com seu fluxo de trabalho do Amazon MWAA. Para obter mais informações, consulte Como criar uma definição de tarefa no Guia do desenvolvedor do Amazon Elastic Container Service.

  1. Crie um arquivo JSON com o seguinte conteúdo e salve-o como ecs-mwaa-cfn.json.

    { "AWSTemplateFormatVersion": "2010-09-09", "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.", "Parameters": { "VpcId": { "Type": "AWS::EC2::VPC::Id", "Description": "Select a VPC that allows instances access to ECR, as used with MWAA." }, "SubnetIds": { "Type": "List<AWS::EC2::Subnet::Id>", "Description": "Select at two private subnets in your selected VPC, as used with MWAA." }, "SecurityGroups": { "Type": "List<AWS::EC2::SecurityGroup::Id>", "Description": "Select at least one security group in your selected VPC, as used with MWAA." } }, "Resources": { "Cluster": { "Type": "AWS::ECS::Cluster", "Properties": { "ClusterName": { "Fn::Sub": "${AWS::StackName}-cluster" } } }, "LogGroup": { "Type": "AWS::Logs::LogGroup", "Properties": { "LogGroupName": { "Ref": "AWS::StackName" }, "RetentionInDays": 30 } }, "ExecutionRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "ecs-tasks.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }, "ManagedPolicyArns": [ "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy" ] } }, "TaskDefinition": { "Type": "AWS::ECS::TaskDefinition", "Properties": { "Family": { "Fn::Sub": "${AWS::StackName}-task" }, "Cpu": 2048, "Memory": 4096, "NetworkMode": "awsvpc", "ExecutionRoleArn": { "Ref": "ExecutionRole" }, "ContainerDefinitions": [ { "Name": { "Fn::Sub": "${AWS::StackName}-container" }, "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest", "PortMappings": [ { "Protocol": "tcp", "ContainerPort": 8080, "HostPort": 8080 } ], "LogConfiguration": { "LogDriver": "awslogs", "Options": { "awslogs-region": { "Ref": "AWS::Region" }, "awslogs-group": { "Ref": "LogGroup" }, "awslogs-stream-prefix": "ecs" } } } ], "RequiresCompatibilities": [ "FARGATE" ] } }, "Service": { "Type": "AWS::ECS::Service", "Properties": { "ServiceName": { "Fn::Sub": "${AWS::StackName}-service" }, "Cluster": { "Ref": "Cluster" }, "TaskDefinition": { "Ref": "TaskDefinition" }, "DesiredCount": 1, "LaunchType": "FARGATE", "PlatformVersion": "1.3.0", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "Subnets": { "Ref": "SubnetIds" }, "SecurityGroups": { "Ref": "SecurityGroups" } } } } } } }
  2. Em seu prompt de comando, use o seguinte comando AWS CLI para criar uma nova pilha. Você deve substituir os valores SecurityGroups e SubnetIds por valores para os grupos de segurança e sub-redes do seu ambiente do Amazon MWAA.

    $ aws cloudformation create-stack \ --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \ ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \ --capabilities CAPABILITY_IAM

    Como alternativa, você pode usar o seguinte script shell. O script recupera os valores necessários para os grupos de segurança e sub-redes do seu ambiente usando o comando get-environment AWS CLI e, em seguida, cria a pilha adequadamente. Para executar o script.

    1. Copie e salve o script como ecs-stack-helper.sh no mesmo diretório do seu modelo AWS CloudFormation.

      #!/bin/bash joinByString() { local separator="$1" shift local first="$1" shift printf "%s" "$first" "${@/#/$separator}" } response=$(aws mwaa get-environment --name $1) securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]') subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]')) aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \ ParameterKey=SubnetIds,ParameterValue=$subnetIds \ --capabilities CAPABILITY_IAM
    2. Execute o script usando os comandos a seguir. Substitua environment-name e stack-name por suas informações.

      $ chmod +x ecs-stack-helper.sh $ ./ecs-stack-helper.bash environment-name stack-name

    Se tiver êxito, você verá o seguinte resultado exibindo sua nova ID de pilha AWS CloudFormation.

    {
        "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
    }

Depois que sua pilha AWS CloudFormation estiver concluída e AWS provisionar seus recursos do Amazon ECS, você estará pronto para criar e carregar seu DAG.

Exemplo de código

  1. Abra um prompt de comando e navegue até o diretório em que seu código DAG está armazenado. Por exemplo:

    cd dags
  2. Copie o conteúdo da amostra de código a seguir e salve localmente como mwaa-ecs-operator.py e, em seguida, faça o upload de seu novo DAG para o Amazon S3.

    from http import client from airflow import DAG from airflow.providers.amazon.aws.operators.ecs import ECSOperator from airflow.utils.dates import days_ago import boto3 CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information. CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information. LAUNCH_TYPE="FARGATE" with DAG( dag_id = "ecs_fargate_dag", schedule_interval=None, catchup=False, start_date=days_ago(1) ) as dag: client=boto3.client('ecs') services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE) service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns']) ecs_operator_task = ECSOperator( task_id = "ecs_operator_task", dag=dag, cluster=CLUSTER_NAME, task_definition=service['services'][0]['taskDefinition'], launch_type=LAUNCH_TYPE, overrides={ "containerOverrides":[ { "name":CONTAINER_NAME, "command":["ls", "-l", "/"], }, ], }, network_configuration=service['services'][0]['networkConfiguration'], awslogs_group="mwaa-ecs-zero", awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}", )
    nota

    No exemplo do DAG, para awslogs_group, talvez você precise modificar o grupo de logs com o nome do seu grupo de logs de tarefas do Amazon ECS. O exemplo pressupõe um grupo de logs chamado mwaa-ecs-zero. Para awslogs_stream_prefix, use o prefixo do fluxo de logs de tarefas do Amazon ECS. O exemplo pressupõe um prefixo de fluxo de log, ecs.

  3. Execute o seguinte comando AWS CLI para copiar o DAG para o bucket do seu ambiente e, em seguida, acionar o DAG usando a IU do Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. Se tiver êxito, você verá uma saída semelhante à seguinte nos logs de tarefas para ecs_operator_task no DAG ecs_fargate_dag:

    [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
    Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
    [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
    {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
    .
    .
    .
    [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
    [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
    .
    .
    .
    [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed