Connessione ad Amazon ECS tramiteECSOperator - Amazon Managed Workflows for Apache Airflow

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Connessione ad Amazon ECS tramiteECSOperator

L'argomento descrive come è possibile utilizzareECSOperatorper connettersi a un container Amazon Elastic Container Service (Amazon ECS) da Amazon MWAA. Nei passaggi seguenti, aggiungerai le autorizzazioni richieste al ruolo di esecuzione del tuo ambiente, usa unAWS CloudFormationmodello per creare un cluster Amazon ECS Fargate e infine creare e caricare un DAG che si connette al nuovo cluster.

Versione

  • Puoi usare l'esempio di codice in questa pagina conApache Airflow v2 e versioni successivenelPython 3.10.

Prerequisiti

Per utilizzare il codice di esempio in questa pagina, avrai bisogno di quanto segue:

Autorizzazioni

  • Il ruolo di esecuzione per il tuo ambiente richiede l'autorizzazione per eseguire attività in Amazon ECS. Puoi allegare ilAmazonECS_FullAccess AWS-policy gestita al tuo ruolo di esecuzione oppure crea e allega la seguente politica al tuo ruolo di esecuzione.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "VisualEditor0", "Effect": "Allow", "Action": [ "ecs:RunTask", "ecs:DescribeTasks" ], "Resource": "*" }, { "Action": "iam:PassRole", "Effect": "Allow", "Resource": [ "*" ], "Condition": { "StringLike": { "iam:PassedToService": "ecs-tasks.amazonaws.com" } } } ] }
  • Oltre ad aggiungere le autorizzazioni necessarie per eseguire attività in Amazon ECS, devi anche modificare ilCloudWatchRegistra la dichiarazione politica nel tuo ruolo di esecuzione di Amazon MWAA per consentire l'accesso al gruppo di log delle attività di Amazon ECS, come illustrato di seguito. Il gruppo di log Amazon ECS è stato creato daAWS CloudFormationmodello inCrea un cluster Amazon ECS.

    { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:CreateLogGroup", "logs:PutLogEvents", "logs:GetLogEvents", "logs:GetLogRecord", "logs:GetLogGroupFields", "logs:GetQueryResults" ], "Resource": [ "arn:aws:logs:region:account-id:log-group:airflow-environment-name-*", "arn:aws:logs:*:*:log-group:ecs-mwaa-group:*" ] }

Per ulteriori informazioni sul ruolo di esecuzione di Amazon MWAA e su come allegare una policy, consultaRuolo di esecuzione.

Crea un cluster Amazon ECS

Utilizzando quanto segueAWS CloudFormationmodello, creerai un cluster Amazon ECS Fargate da utilizzare con il tuo flusso di lavoro Amazon MWAA. Per ulteriori informazioni, vedereCreazione di una definizione di attivitànelGuida per gli sviluppatori di Amazon Elastic Container Service.

  1. Crea un file JSON con il seguente codice e salvalo comeecs-mwaa-cfn.json.

    { "AWSTemplateFormatVersion": "2010-09-09", "Description": "This template deploys an ECS Fargate cluster with an Amazon Linux image as a test for MWAA.", "Parameters": { "VpcId": { "Type": "AWS::EC2::VPC::Id", "Description": "Select a VPC that allows instances access to ECR, as used with MWAA." }, "SubnetIds": { "Type": "List<AWS::EC2::Subnet::Id>", "Description": "Select at two private subnets in your selected VPC, as used with MWAA." }, "SecurityGroups": { "Type": "List<AWS::EC2::SecurityGroup::Id>", "Description": "Select at least one security group in your selected VPC, as used with MWAA." } }, "Resources": { "Cluster": { "Type": "AWS::ECS::Cluster", "Properties": { "ClusterName": { "Fn::Sub": "${AWS::StackName}-cluster" } } }, "LogGroup": { "Type": "AWS::Logs::LogGroup", "Properties": { "LogGroupName": { "Ref": "AWS::StackName" }, "RetentionInDays": 30 } }, "ExecutionRole": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Effect": "Allow", "Principal": { "Service": "ecs-tasks.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }, "ManagedPolicyArns": [ "arn:aws:iam::aws:policy/service-role/AmazonECSTaskExecutionRolePolicy" ] } }, "TaskDefinition": { "Type": "AWS::ECS::TaskDefinition", "Properties": { "Family": { "Fn::Sub": "${AWS::StackName}-task" }, "Cpu": 2048, "Memory": 4096, "NetworkMode": "awsvpc", "ExecutionRoleArn": { "Ref": "ExecutionRole" }, "ContainerDefinitions": [ { "Name": { "Fn::Sub": "${AWS::StackName}-container" }, "Image": "137112412989.dkr.ecr.us-east-1.amazonaws.com/amazonlinux:latest", "PortMappings": [ { "Protocol": "tcp", "ContainerPort": 8080, "HostPort": 8080 } ], "LogConfiguration": { "LogDriver": "awslogs", "Options": { "awslogs-region": { "Ref": "AWS::Region" }, "awslogs-group": { "Ref": "LogGroup" }, "awslogs-stream-prefix": "ecs" } } } ], "RequiresCompatibilities": [ "FARGATE" ] } }, "Service": { "Type": "AWS::ECS::Service", "Properties": { "ServiceName": { "Fn::Sub": "${AWS::StackName}-service" }, "Cluster": { "Ref": "Cluster" }, "TaskDefinition": { "Ref": "TaskDefinition" }, "DesiredCount": 1, "LaunchType": "FARGATE", "PlatformVersion": "1.3.0", "NetworkConfiguration": { "AwsvpcConfiguration": { "AssignPublicIp": "ENABLED", "Subnets": { "Ref": "SubnetIds" }, "SecurityGroups": { "Ref": "SecurityGroups" } } } } } } }
  2. Nel prompt dei comandi, usa quanto segueAWS CLIcomando per creare un nuovo stack. È necessario sostituire i valoriSecurityGroupseSubnetIdscon valori per i gruppi di sicurezza e le sottoreti del tuo ambiente Amazon MWAA.

    $ aws cloudformation create-stack \ --stack-name my-ecs-stack --template-body file://ecs-mwaa-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=your-mwaa-security-group \ ParameterKey=SubnetIds,ParameterValue=your-mwaa-subnet-1\\,your-mwaa-subnet-1 \ --capabilities CAPABILITY_IAM

    In alternativa, puoi usare il seguente script di shell. Lo script recupera i valori richiesti per i gruppi di sicurezza e le sottoreti dell'ambiente utilizzando ilget-environment AWS CLIcomando, quindi crea lo stack di conseguenza. Per eseguire lo script, effettuate le seguenti operazioni.

    1. Copiare e salvare lo script con nomeecs-stack-helper.shnella stessa cartella della tuaAWS CloudFormationmodello.

      #!/bin/bash joinByString() { local separator="$1" shift local first="$1" shift printf "%s" "$first" "${@/#/$separator}" } response=$(aws mwaa get-environment --name $1) securityGroupId=$(echo "$response" | jq -r '.Environment.NetworkConfiguration.SecurityGroupIds[]') subnetIds=$(joinByString '\,' $(echo "$response" | jq -r '.Environment.NetworkConfiguration.SubnetIds[]')) aws cloudformation create-stack --stack-name $2 --template-body file://ecs-cfn.json \ --parameters ParameterKey=SecurityGroups,ParameterValue=$securityGroupId \ ParameterKey=SubnetIds,ParameterValue=$subnetIds \ --capabilities CAPABILITY_IAM
    2. Esegui lo script utilizzando i seguenti comandi. Sostituiscienvironment-nameestack-namecon i tuoi dati.

      $ chmod +x ecs-stack-helper.sh $ ./ecs-stack-helper.bash environment-name stack-name

    In caso di successo, vedrai il seguente output che mostra il tuo nuovoAWS CloudFormationID dello stack.

    {
        "StackId": "arn:aws:cloudformation:us-west-2:123456789012:stack/my-ecs-stack/123456e7-8ab9-01cd-b2fb-36cce63786c9"
    }

Dopo il tuoAWS CloudFormationlo stack è completato eAWSha fornito le tue risorse Amazon ECS, sei pronto per creare e caricare il tuo DAG.

Esempio di codice

  1. Apri un prompt dei comandi e vai alla directory in cui è archiviato il codice DAG. Ad esempio:

    cd dags
  2. Copia il contenuto del seguente esempio di codice e salvalo localmente comemwaa-ecs-operator.py, quindi carica il tuo nuovo DAG su Amazon S3.

    from http import client from airflow import DAG from airflow.providers.amazon.aws.operators.ecs import ECSOperator from airflow.utils.dates import days_ago import boto3 CLUSTER_NAME="mwaa-ecs-test-cluster" #Replace value for CLUSTER_NAME with your information. CONTAINER_NAME="mwaa-ecs-test-container" #Replace value for CONTAINER_NAME with your information. LAUNCH_TYPE="FARGATE" with DAG( dag_id = "ecs_fargate_dag", schedule_interval=None, catchup=False, start_date=days_ago(1) ) as dag: client=boto3.client('ecs') services=client.list_services(cluster=CLUSTER_NAME,launchType=LAUNCH_TYPE) service=client.describe_services(cluster=CLUSTER_NAME,services=services['serviceArns']) ecs_operator_task = ECSOperator( task_id = "ecs_operator_task", dag=dag, cluster=CLUSTER_NAME, task_definition=service['services'][0]['taskDefinition'], launch_type=LAUNCH_TYPE, overrides={ "containerOverrides":[ { "name":CONTAINER_NAME, "command":["ls", "-l", "/"], }, ], }, network_configuration=service['services'][0]['networkConfiguration'], awslogs_group="mwaa-ecs-zero", awslogs_stream_prefix=f"ecs/{CONTAINER_NAME}", )
    Nota

    Nell'esempio DAG, perawslogs_group, potrebbe essere necessario modificare il gruppo di log con il nome del gruppo di log delle attività Amazon ECS. L'esempio presuppone un gruppo di log denominatomwaa-ecs-zero. Perawslogs_stream_prefix, utilizza il prefisso del flusso di log dei task di Amazon ECS. L'esempio presuppone un prefisso del flusso di log,ecs.

  3. Esegui quanto segueAWS CLIcomando per copiare il DAG nel bucket dell'ambiente, quindi attivare il DAG utilizzando l'interfaccia utente di Apache Airflow.

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. In caso di successo, vedrai un output simile al seguente nei registri delle attività perecs_operator_tasknelecs_fargate_dagGIORNO:

    [2022-01-01, 12:00:00 UTC] {{ecs.py:300}} INFO - Running ECS Task -
    Task definition: arn:aws:ecs:us-west-2:123456789012:task-definition/mwaa-ecs-test-task:1 - on cluster mwaa-ecs-test-cluster
    [2022-01-01, 12:00:00 UTC] {{ecs-operator-test.py:302}} INFO - ECSOperator overrides:
    {'containerOverrides': [{'name': 'mwaa-ecs-test-container', 'command': ['ls', '-l', '/']}]}
    .
    .
    .
    [2022-01-01, 12:00:00 UTC] {{ecs.py:379}} INFO - ECS task ID is: e012340b5e1b43c6a757cf012c635935
    [2022-01-01, 12:00:00 UTC] {{ecs.py:313}} INFO - Starting ECS Task Log Fetcher
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] total 52
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 bin -> usr/bin
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x   2 root root 4096 Apr  9  2019 boot
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   5 root root  340 Jul 19 17:54 dev
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   1 root root 4096 Jul 19 17:54 etc
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 home
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    7 Jun 13 18:51 lib -> usr/lib
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    9 Jun 13 18:51 lib64 -> usr/lib64
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:51 local
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 media
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 mnt
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 opt
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x 103 root root    0 Jul 19 17:54 proc
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-x-\-\-   2 root root 4096 Apr  9  2019 root
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Jun 13 18:52 run
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] lrwxrwxrwx   1 root root    8 Jun 13 18:51 sbin -> usr/sbin
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x   2 root root 4096 Apr  9  2019 srv
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] dr-xr-xr-x  13 root root    0 Jul 19 17:54 sys
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxrwxrwt   2 root root 4096 Jun 13 18:51 tmp
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  13 root root 4096 Jun 13 18:51 usr
    [2022-01-01, 12:00:00 UTC] {{ecs.py:119}} INFO - [2022-07-19, 17:54:03 UTC] drwxr-xr-x  18 root root 4096 Jun 13 18:52 var
    .
    .
    .
    [2022-01-01, 12:00:00 UTC] {{ecs.py:328}} INFO - ECS Task has been successfully executed