Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.
Conexión a Amazon ECS mediante el ECSOperator
En el tema se describe cómo puede usar el ECSOperator
para conectarse a un contenedor de Amazon Elastic Container Service (Amazon ECS) desde Amazon MWAA. En los siguientes pasos, añadirá los permisos necesarios a la función de ejecución de su entorno, utilizará una plantilla de AWS CloudFormation para crear un clúster de Fargate de Amazon ECS y, por último, creará y cargará un DAG que se conecte a su nuevo clúster.
Versión
-
Puede usar el código de ejemplo de esta página con Apache Airflow v2 y versiones posteriores en Python 3.10
.
Requisitos previos
Para usar el código de muestra de esta página, necesitará lo siguiente:
Permisos
-
El rol de ejecución de su entorno necesita permiso para ejecutar tareas en Amazon ECS. Puede adjuntar la política Amazonecs_FullAccess
gestionada por AWS a su rol de ejecución o crear y adjuntar la política siguiente a su rol de ejecución. { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "ecs:RunTask", "ecs:DescribeTasks" ], "Resource": "*" }, { "Action": "iam:PassRole", "Effect": "Allow", "Resource": [ "*" ], "Condition": { "StringLike": { "iam:PassedToService": "ecs-tasks.amazonaws.com" } } } ] }
-
Además de añadir los permisos necesarios para ejecutar tareas en Amazon ECS, también debe modificar la declaración de política de CloudWatch Logs en su rol de ejecución de Amazon MWAA para permitir el acceso al grupo de registro de tareas de Amazon ECS, como se muestra a continuación. El grupo de registro de Amazon ECS se crea mediante la plantilla de AWS CloudFormation en Creación de un clúster de Amazon ECS..
{ "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults" ], "Resource": [ "arn:aws:logs:
region
:account-id
:log-group:airflow-environment-name
-*", "arn:aws:logs:*:*:log-group:ecs-mwaa-group
:*" ] }
Para más información sobre el rol de ejecución de Amazon MWAA y sobre cómo adjuntar una política, consulte Rol de ejecución.
Creación de un clúster de Amazon ECS.
Con la siguiente plantilla de AWS CloudFormation, creará un clúster de Fargate de Amazon ECS para usarlo con su flujo de trabajo de Amazon MWAA. Para más información, consulte Creación de una definición de tarea en la Guía para desarrolladores de Amazon Elastic Container Service.
-
Cree un archivo JSON con el siguiente contenido y guárdelo como
ecs-mwaa-cfn.json
.{ "AWSTemplateFormatVersion": "2010-09-09", "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.", "Parameters": { "VpcId": { "Type": "AWS::EC2::VPC::Id", "Description": "Select a VPC that allows instances access to ECR, as used with MWAA." }, "SubnetIds": { "Type": "List<AWS::EC2::Subnet::Id>", "Description": "Select at two private subnets in your selected VPC, as used with MWAA." }, "SecurityGroups": { "Type": "List<AWS::EC2::SecurityGroup::Id>", "Description": "Select at least one security group in your selected VPC, as used with MWAA." } }, "Resources": { "Cluster": { "Type": "AWS::ECS::Cluster", "Properties": { "ClusterName": { "Fn::Sub": "${AWS::StackName}-cluster" } } }, "LogGroup": { "Type": "AWS::Logs::LogGroup", "Properties": { "LogGroupName": { "Ref": "AWS::StackName" }, "RetentionInDays": 30 } }, "ExecutionRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "ecs-tasks.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }, "ManagedPolicyArns": [ "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy" ] } }, "TaskDefinition": { "Type": "AWS::ECS::TaskDefinition", "Properties": { "Family": { "Fn::Sub": "${AWS::StackName}-task" }, "Cpu": 2048, "Memory": 4096, "NetworkMode": "awsvpc", "ExecutionRoleArn": { "Ref": "ExecutionRole" }, "ContainerDefinitions": [ { "Name": { "Fn::Sub": "${AWS::StackName}-container" }, "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest", "PortMappings": [ { "Protocol": "tcp", "ContainerPort": 8080, "HostPort": 8080 } ], "LogConfiguration": { "LogDriver": "awslogs", "Options": { "awslogs-region": { "Ref": "AWS::Region" }, "awslogs-group": { "Ref": "LogGroup" }, "awslogs-stream-prefix": "ecs" } } } ], "RequiresCompatibilities": [ "FARGATE" ] } }, "Service": { "Type": "AWS::ECS::Service", "Properties": { "ServiceName": { "Fn::Sub": "${AWS::StackName}-service" }, "Cluster": { "Ref": "Cluster" }, "TaskDefinition": { "Ref": "TaskDefinition" }, "DesiredCount": 1, "LaunchType": "FARGATE", "PlatformVersion": "1.3.0", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "Subnets": { "Ref": "SubnetIds" }, "SecurityGroups": { "Ref": "SecurityGroups" } } } } } } }
-
Para crear una pila nueva, utilice el comando de AWS CLI siguiente en el símbolo del sistema. Debe reemplazar los valores
SecurityGroups
ySubnetIds
por los valores de los grupos de seguridad y las subredes de su entorno de Amazon MWAA.$
aws cloudformation create-stack \ --stack-name
my-ecs-stack
--template-body file://ecs-mwaa-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group
\ ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1
\\,your-mwaa-subnet-1
\ --capabilities CAPABILITY_IAMTambién puede utilizar este intérprete de comandos: El script recupera los valores necesarios para los grupos de seguridad y las subredes de su entorno mediante el comando AWS CLI de
get-environment
y, a continuación, crea la pila en consecuencia. Para ejecutar el script, siga los pasos que se detallan a continuación.-
Copie y guarde el script como
ecs-stack-helper.sh
en el mismo directorio que su plantilla de AWS CloudFormation.#!/bin/bash joinByString() { local separator="$1" shift local first="$1" shift printf "%s" "$first" "${@/#/$separator}" } response=$(aws mwaa get-environment --name $1) securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]') subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]')) aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \ ParameterKey=SubnetIds,ParameterValue=$subnetIds \ --capabilities CAPABILITY_IAM
-
Ejecute el script mediante los comandos siguientes. Reemplace el
environment-name
y elstack-name
con su información.$
chmod +x ecs-stack-helper.sh
$
./ecs-stack-helper.bash
environment-name
stack-name
Si se realiza correctamente, aparecerá el siguiente resultado en el que se muestra el ID de su nueva pila de AWS CloudFormation.
{ "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9" }
-
Una vez que haya completado su pila de AWS CloudFormation y AWS haya aprovisionado sus recursos de Amazon ECS, estará listo para crear y cargar su DAG.
Código de ejemplo
-
Abra un símbolo del sistema y vaya hasta el directorio en el que está almacenado el código DAG. Por ejemplo:
cd dags
-
Copie el contenido del siguiente código de ejemplo y guárdelo localmente como
mwaa-ecs-operator.py
; a continuación, cargue su nuevo DAG a Amazon S3.from http import client from airflow import DAG from airflow.providers.amazon.aws.operators.ecs import ECSOperator from airflow.utils.dates import days_ago import boto3 CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information. CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information. LAUNCH_TYPE="FARGATE" with DAG( dag_id = "ecs_fargate_dag", schedule_interval=None, catchup=False, start_date=days_ago(1) ) as dag: client=boto3.client('ecs') services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE) service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns']) ecs_operator_task = ECSOperator( task_id = "ecs_operator_task", dag=dag, cluster=CLUSTER_NAME, task_definition=service['services'][0]['taskDefinition'], launch_type=LAUNCH_TYPE, overrides={ "containerOverrides":[ { "name":CONTAINER_NAME, "command":["ls", "-l", "/"], }, ], }, network_configuration=service['services'][0]['networkConfiguration'], awslogs_group="mwaa-ecs-zero", awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}", )
nota
En el ejemplo del DAG, para
awslogs_group
, puede que tenga que modificar el grupo de registro con el nombre de su grupo de registro de tareas de Amazon ECS. El ejemplo asume un grupo de registro denominado “mwaa-ecs-zero
”. Paraawslogs_stream_prefix
, utilice el prefijo del flujo de registro de tareas de Amazon ECS. El ejemplo asume un prefijo de flujo de registro,ecs
. -
Ejecute el siguiente comando de AWS CLI para copiar el DAG en el bucket de su entorno y, a continuación, active el DAG mediante la interfaz de usuario de Apache Airflow.
$
aws s3 cp
your-dag
.py s3://your-environment-bucket
/dags/ -
Si se ejecuta correctamente, verá un resultado similar al siguiente en los registros de tareas del
ecs_operator_task
en elecs_fargate_dag
DAG:[2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task - Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides: {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]} . . . [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935 [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52 [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 7 Jun 13 18:51 bin -> usr/bin [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 2 root root 4096 Apr 9 2019 boot [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 5 root root 340 Jul 19 17:54 dev [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 1 root root 4096 Jul 19 17:54 etc [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 home [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 7 Jun 13 18:51 lib -> usr/lib [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 9 Jun 13 18:51 lib64 -> usr/lib64 [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Jun 13 18:51 local [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 media [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 mnt [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 opt [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root 0 Jul 19 17:54 proc [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\- 2 root root 4096 Apr 9 2019 root [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Jun 13 18:52 run [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx 1 root root 8 Jun 13 18:51 sbin -> usr/sbin [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 2 root root 4096 Apr 9 2019 srv [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 13 root root 0 Jul 19 17:54 sys [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt 2 root root 4096 Jun 13 18:51 tmp [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 13 root root 4096 Jun 13 18:51 usr [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x 18 root root 4096 Jun 13 18:52 var . . . [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed