Tutoriel de démarrage rapide pour Amazon Managed Workflows pour Apache Airflow - Amazon Managed Workflows for Apache Airflow

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Tutoriel de démarrage rapide pour Amazon Managed Workflows pour Apache Airflow

Ce didacticiel de démarrage rapide utilise un AWS CloudFormation modèle qui crée simultanément l'infrastructure Amazon VPC, un compartiment Amazon S3 avec un dags dossier et un environnement Amazon Managed Workflows pour Apache Airflow.

Dans ce tutoriel

Ce didacticiel vous présente trois AWS Command Line Interface (AWS CLI) commandes pour télécharger un DAG sur Amazon S3, exécuter le DAG dans Apache Airflow et afficher les connexions. CloudWatch Il se termine en vous expliquant les étapes de création d'une politique IAM pour une équipe de développement d'Apache Airflow.

Note

Le AWS CloudFormation modèle de cette page crée un environnement Amazon Managed Workflows pour Apache Airflow pour la dernière version d'Apache Airflow disponible dans. AWS CloudFormation La dernière version disponible est Apache Airflow v2.8.1.

Le AWS CloudFormation modèle de cette page crée les éléments suivants :

  • Infrastructure VPC. Le modèle utiliseRoutage public sur Internet. Il utilise le Mode d'accès au réseau public pour le serveur Web Apache Airflow dansWebserverAccessMode: PUBLIC_ONLY.

  • Compartiment Amazon S3. Le modèle crée un compartiment Amazon S3 avec un dags dossier. Il est configuré pour bloquer tout accès public, avec le contrôle de version des compartiments activé, comme défini dansCréez un compartiment Amazon S3 pour Amazon S3 pour Amazon Amazon S3 pour Amazon S3.

  • Environnement Amazon MWAA. Le modèle crée un environnement Amazon MWAA associé au dags dossier du compartiment Amazon S3, un rôle d'exécution autorisant les AWS services utilisés par Amazon MWAA et l'environnement par défaut pour le chiffrement à l'aide d'une cléAWS détenue, comme défini dans. Création d'un environnement Amazon MWAA

  • CloudWatch Journaux. Le modèle permet à Apache Airflow de se connecter CloudWatch au niveau « INFO » et au-delà pour le groupe de journaux du planificateur Airflow, le groupe de journaux du serveur Web Airflow, le groupe de journaux de travail Airflow, le groupe de journaux de traitement Airflow DAG et le groupe de journaux de tâches Airflow, tels que définis dans. Afficher les journaux Airflow sur Amazon CloudWatch

Dans ce didacticiel, vous allez effectuer les tâches suivantes :

  • Téléchargez et exécutez un DAG. Téléchargez le didacticiel DAG d'Apache Airflow pour la dernière version d'Apache Airflow compatible avec Amazon MWAA sur Amazon S3, puis exécutez-le dans l'interface utilisateur d'Apache Airflow, comme défini dans. Ajout ou mise à jour des DAG

  • Afficher les journaux. Affichez le groupe de journaux du serveur Web Airflow dans CloudWatch Logs, tel que défini dansAfficher les journaux Airflow sur Amazon CloudWatch.

  • Créez une politique de contrôle d'accès. Créez une politique de contrôle d'accès dans IAM pour votre équipe de développement Apache Airflow, comme défini dans. Accès à un environnement Amazon MWAA

Prérequis

The AWS Command Line Interface (AWS CLI) est un outil open source qui vous permet d'interagir avec les AWS services à l'aide de commandes dans votre shell de ligne de commande. Pour effectuer les étapes indiquées sur cette page, vous avez besoin des éléments suivants :

Étape 1 : enregistrer le AWS CloudFormation modèle localement

  • Copiez le contenu du modèle suivant et enregistrez-le localement sousmwaa-public-network.yml. Vous pouvez également télécharger le modèle.

    AWSTemplateFormatVersion: "2010-09-09" Parameters: EnvironmentName: Description: An environment name that is prefixed to resource names Type: String Default: MWAAEnvironment VpcCIDR: Description: The IP range (CIDR notation) for this VPC Type: String Default: 10.192.0.0/16 PublicSubnet1CIDR: Description: The IP range (CIDR notation) for the public subnet in the first Availability Zone Type: String Default: 10.192.10.0/24 PublicSubnet2CIDR: Description: The IP range (CIDR notation) for the public subnet in the second Availability Zone Type: String Default: 10.192.11.0/24 PrivateSubnet1CIDR: Description: The IP range (CIDR notation) for the private subnet in the first Availability Zone Type: String Default: 10.192.20.0/24 PrivateSubnet2CIDR: Description: The IP range (CIDR notation) for the private subnet in the second Availability Zone Type: String Default: 10.192.21.0/24 MaxWorkerNodes: Description: The maximum number of workers that can run in the environment Type: Number Default: 2 DagProcessingLogs: Description: Log level for DagProcessing Type: String Default: INFO SchedulerLogsLevel: Description: Log level for SchedulerLogs Type: String Default: INFO TaskLogsLevel: Description: Log level for TaskLogs Type: String Default: INFO WorkerLogsLevel: Description: Log level for WorkerLogs Type: String Default: INFO WebserverLogsLevel: Description: Log level for WebserverLogs Type: String Default: INFO Resources: ##################################################################################################################### # CREATE VPC ##################################################################################################################### VPC: Type: AWS::EC2::VPC Properties: CidrBlock: !Ref VpcCIDR EnableDnsSupport: true EnableDnsHostnames: true Tags: - Key: Name Value: MWAAEnvironment InternetGateway: Type: AWS::EC2::InternetGateway Properties: Tags: - Key: Name Value: MWAAEnvironment InternetGatewayAttachment: Type: AWS::EC2::VPCGatewayAttachment Properties: InternetGatewayId: !Ref InternetGateway VpcId: !Ref VPC PublicSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PublicSubnet1CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ1) PublicSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PublicSubnet2CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ2) PrivateSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet1CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ1) PrivateSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet2CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ2) NatGateway1EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway2EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway1: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway1EIP.AllocationId SubnetId: !Ref PublicSubnet1 NatGateway2: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway2EIP.AllocationId SubnetId: !Ref PublicSubnet2 PublicRouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Routes DefaultPublicRoute: Type: AWS::EC2::Route DependsOn: InternetGatewayAttachment Properties: RouteTableId: !Ref PublicRouteTable DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref InternetGateway PublicSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet1 PublicSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet2 PrivateRouteTable1: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ1) DefaultPrivateRoute1: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable1 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway1 PrivateSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable1 SubnetId: !Ref PrivateSubnet1 PrivateRouteTable2: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ2) DefaultPrivateRoute2: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable2 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway2 PrivateSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable2 SubnetId: !Ref PrivateSubnet2 SecurityGroup: Type: AWS::EC2::SecurityGroup Properties: GroupName: "mwaa-security-group" GroupDescription: "Security group with a self-referencing inbound rule." VpcId: !Ref VPC SecurityGroupIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" SourceSecurityGroupId: !Ref SecurityGroup EnvironmentBucket: Type: AWS::S3::Bucket Properties: VersioningConfiguration: Status: Enabled PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true ##################################################################################################################### # CREATE MWAA ##################################################################################################################### MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true SecurityGroup: Type: AWS::EC2::SecurityGroup Properties: VpcId: !Ref VPC GroupDescription: !Sub "Security Group for Amazon MWAA Environment ${AWS::StackName}-MwaaEnvironment" GroupName: !Sub "airflow-security-group-${AWS::StackName}-MwaaEnvironment" SecurityGroupIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" SourceSecurityGroupId: !Ref SecurityGroup SecurityGroupEgress: Type: AWS::EC2::SecurityGroupEgress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" CidrIp: "0.0.0.0/0" MwaaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - airflow-env.amazonaws.com - airflow.amazonaws.com Action: - "sts:AssumeRole" Path: "/service-role/" MwaaExecutionPolicy: DependsOn: EnvironmentBucket Type: AWS::IAM::ManagedPolicy Properties: Roles: - !Ref MwaaExecutionRole PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: airflow:PublishMetrics Resource: - !Sub "arn:aws:airflow:${AWS::Region}:${AWS::AccountId}:environment/${EnvironmentName}" - Effect: Deny Action: s3:ListAllMyBuckets Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - "s3:GetObject*" - "s3:GetBucket*" - "s3:List*" Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - logs:DescribeLogGroups Resource: "*" - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents - logs:GetLogEvents - logs:GetLogRecord - logs:GetLogGroupFields - logs:GetQueryResults - logs:DescribeLogGroups Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:airflow-${AWS::StackName}*" - Effect: Allow Action: cloudwatch:PutMetricData Resource: "*" - Effect: Allow Action: - sqs:ChangeMessageVisibility - sqs:DeleteMessage - sqs:GetQueueAttributes - sqs:GetQueueUrl - sqs:ReceiveMessage - sqs:SendMessage Resource: - !Sub "arn:aws:sqs:${AWS::Region}:*:airflow-celery-*" - Effect: Allow Action: - kms:Decrypt - kms:DescribeKey - "kms:GenerateDataKey*" - kms:Encrypt NotResource: !Sub "arn:aws:kms:*:${AWS::AccountId}:key/*" Condition: StringLike: "kms:ViaService": - !Sub "sqs.${AWS::Region}.amazonaws.com" Outputs: VPC: Description: A reference to the created VPC Value: !Ref VPC PublicSubnets: Description: A list of the public subnets Value: !Join [ ",", [ !Ref PublicSubnet1, !Ref PublicSubnet2 ]] PrivateSubnets: Description: A list of the private subnets Value: !Join [ ",", [ !Ref PrivateSubnet1, !Ref PrivateSubnet2 ]] PublicSubnet1: Description: A reference to the public subnet in the 1st Availability Zone Value: !Ref PublicSubnet1 PublicSubnet2: Description: A reference to the public subnet in the 2nd Availability Zone Value: !Ref PublicSubnet2 PrivateSubnet1: Description: A reference to the private subnet in the 1st Availability Zone Value: !Ref PrivateSubnet1 PrivateSubnet2: Description: A reference to the private subnet in the 2nd Availability Zone Value: !Ref PrivateSubnet2 SecurityGroupIngress: Description: Security group with self-referencing inbound rule Value: !Ref SecurityGroupIngress MwaaApacheAirflowUI: Description: MWAA Environment Value: !Sub "https://${MwaaEnvironment.WebserverUrl}"

Deuxième étape : créer la pile à l'aide du AWS CLI

  1. Dans votre invite de commande, accédez au répertoire dans lequel mwaa-public-network.yml est stocké. Par exemple :

    cd mwaaproject
  2. Utilisez la aws cloudformation create-stackcommande pour créer la pile à l'aide du AWS CLI.

    aws cloudformation create-stack --stack-name mwaa-environment-public-network --template-body file://mwaa-public-network.yml --capabilities CAPABILITY_IAM
    Note

    La création de l'infrastructure Amazon VPC, du compartiment Amazon S3 et de l'environnement Amazon MWAA prend plus de 30 minutes.

Troisième étape : télécharger un DAG sur Amazon S3 et l'exécuter dans l'interface utilisateur d'Apache Airflow

  1. Copiez le contenu du tutorial.py fichier correspondant à la dernière version prise en charge d'Apache Airflow et enregistrez-le localement soustutorial.py.

  2. Dans votre invite de commande, accédez au répertoire dans lequel tutorial.py est stocké. Par exemple :

    cd mwaaproject
  3. Utilisez la commande suivante pour répertorier tous vos compartiments Amazon S3.

    aws s3 ls
  4. Utilisez la commande suivante pour répertorier les fichiers et les dossiers du compartiment Amazon S3 de votre environnement.

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  5. Utilisez le script suivant pour télécharger le tutorial.py fichier dags dans votre dossier. Remplacez la valeur d'échantillon par YOUR_S3_BUCKET_NAME.

    aws s3 cp tutorial.py s3://YOUR_S3_BUCKET_NAME/dags/
  6. Ouvrez la page Environnements sur la console Amazon MWAA.

  7. Choisissez un environnement.

  8. Choisissez Open Airflow UI.

  9. Dans l'interface utilisateur d'Apache Airflow, dans la liste des DAG disponibles, choisissez le DAG du didacticiel.

  10. Sur la page des détails du DAG, cliquez sur le bouton Pause/Annuler le DAG à côté du nom de votre DAG pour le remettre en pause.

  11. Choisissez Trigger DAG.

Quatrième étape : Afficher les journaux dans CloudWatch Logs

Vous pouvez consulter les journaux Apache Airflow dans la CloudWatch console pour tous les journaux Apache Airflow activés par la AWS CloudFormation pile. La section suivante explique comment afficher les journaux du groupe de journaux du serveur Web Airflow.

  1. Ouvrez la page Environnements sur la console Amazon MWAA.

  2. Choisissez un environnement.

  3. Choisissez le groupe de journaux du serveur Web Airflow dans le volet de surveillance.

  4. Choisissez le webserver_console_ip log in Log streams.

Quelle est la prochaine étape ?