Amazon Managed Workflows for Apache Airflow のクイックスタートチュートリアル - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Managed Workflows for Apache Airflow のクイックスタートチュートリアル

このクイックスタートチュートリアルでは、 テンプレートを使用して、Amazon VPC インフラストラクチャ、 dagsフォルダを含む Amazon S3 バケット、および Amazon Managed Workflows for Apache Airflow 環境を同時に AWS CloudFormation 作成します。

このチュートリアルでは、次の作業を行いました。

このチュートリアルでは、DAG を Amazon S3 にアップロードし、Apache Airflow で DAG を実行し、 でログを表示する 3 つの AWS Command Line Interface (AWS CLI) コマンドについて説明します CloudWatch。最後に、Apache Airflow 開発チーム用の IAM ポリシーを作成する手順を詳しく説明します。

注記

このページの AWS CloudFormation テンプレートは、 で利用可能な最新バージョンの Apache Airflow 用の Amazon Managed Workflows for Apache Airflow 環境を作成します AWS CloudFormation。利用可能な最新バージョンは Apache Airflow v2.8.1 です。

このページの AWS CloudFormation テンプレートでは、以下が作成されます。

  • VPC インフラストラクチャ。このテンプレートは インターネット経由のパブリックルーティング を使用しています。それは WebserverAccessMode: PUBLIC_ONLY 内の Apache Airflow ウェブサーバー に パブリックネットワークアクセスモード を使用します。

  • Amazon S3 バケット。このテンプレートは、dags フォルダー付きの Amazon S3 バケットを作成します。Amazon MWAA 用の Amazon S3 バケットの作成 で定義されているように、[バケットバージョニング] を有効にして、[すべてのパブリックアクセスをブロック] するように設定されています。

  • Amazon MWAA 環境。このテンプレートは、Amazon S3 バケットの dagsフォルダに関連付けられている Amazon MWAA 環境、Amazon MWAA が使用する AWS のサービスに対するアクセス許可を持つ実行ロール、および で定義されているように、 AWS 所有キー を使用した暗号化のデフォルトを作成しますAmazon MWAA 環境を作成する

  • CloudWatch ログに記録します。このテンプレートは、 で定義されているように、Apache Airflow ログを CloudWatch 「INFO」レベルで有効にし、Airflow スケジューラロググループ Airflow ウェブサーバーロググループ Airflow ワーカーロググループ Airflow DAG 処理ロググループ および Airflow タスクロググループ に対して有効にしますAmazon での Airflow ログの表示 CloudWatch

このチュートリアルでは、以下のタスクを完了します。

  • DAG をアップロードして実行します。最新の Amazon MWAA がサポートする Apache Airflow バージョン用の Apache Airflow のチュートリアル DAG を Amazon S3 にアップロードし、DAG の追加と更新 で定義されているように Apache Airflow UI で実行します。

  • ログの表示。で定義されているように、 CloudWatch ログで Airflow ウェブサーバーロググループを表示しますAmazon での Airflow ログの表示 CloudWatch

  • アクセスコントロールポリシーを作成します。Amazon MWAA 環境へのアクセス で定義されているように、Apache Airflow 開発チーム用にアクセスコントロールポリシーを IAM で作成します。

前提条件

AWS Command Line Interface (AWS CLI) は、コマンドラインシェルでコマンドを使用して AWS サービスとやり取りするためのオープンソースツールです。このページのステップを完了するには、以下のものが必要です。

ステップ 1: AWS CloudFormation テンプレートをローカルに保存する

  • 次のテンプレートの内容をコピーし、mwaa_public_network.yml としてローカルに保存します。「テンプレートをダウンロードする」 こともできます。

    AWSTemplateFormatVersion: "2010-09-09" Parameters: EnvironmentName: Description: An environment name that is prefixed to resource names Type: String Default: MWAAEnvironment VpcCIDR: Description: The IP range (CIDR notation) for this VPC Type: String Default: 10.192.0.0/16 PublicSubnet1CIDR: Description: The IP range (CIDR notation) for the public subnet in the first Availability Zone Type: String Default: 10.192.10.0/24 PublicSubnet2CIDR: Description: The IP range (CIDR notation) for the public subnet in the second Availability Zone Type: String Default: 10.192.11.0/24 PrivateSubnet1CIDR: Description: The IP range (CIDR notation) for the private subnet in the first Availability Zone Type: String Default: 10.192.20.0/24 PrivateSubnet2CIDR: Description: The IP range (CIDR notation) for the private subnet in the second Availability Zone Type: String Default: 10.192.21.0/24 MaxWorkerNodes: Description: The maximum number of workers that can run in the environment Type: Number Default: 2 DagProcessingLogs: Description: Log level for DagProcessing Type: String Default: INFO SchedulerLogsLevel: Description: Log level for SchedulerLogs Type: String Default: INFO TaskLogsLevel: Description: Log level for TaskLogs Type: String Default: INFO WorkerLogsLevel: Description: Log level for WorkerLogs Type: String Default: INFO WebserverLogsLevel: Description: Log level for WebserverLogs Type: String Default: INFO Resources: ##################################################################################################################### # CREATE VPC ##################################################################################################################### VPC: Type: AWS::EC2::VPC Properties: CidrBlock: !Ref VpcCIDR EnableDnsSupport: true EnableDnsHostnames: true Tags: - Key: Name Value: MWAAEnvironment InternetGateway: Type: AWS::EC2::InternetGateway Properties: Tags: - Key: Name Value: MWAAEnvironment InternetGatewayAttachment: Type: AWS::EC2::VPCGatewayAttachment Properties: InternetGatewayId: !Ref InternetGateway VpcId: !Ref VPC PublicSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PublicSubnet1CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ1) PublicSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PublicSubnet2CIDR MapPublicIpOnLaunch: true Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Subnet (AZ2) PrivateSubnet1: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 0, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet1CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ1) PrivateSubnet2: Type: AWS::EC2::Subnet Properties: VpcId: !Ref VPC AvailabilityZone: !Select [ 1, !GetAZs '' ] CidrBlock: !Ref PrivateSubnet2CIDR MapPublicIpOnLaunch: false Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Subnet (AZ2) NatGateway1EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway2EIP: Type: AWS::EC2::EIP DependsOn: InternetGatewayAttachment Properties: Domain: vpc NatGateway1: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway1EIP.AllocationId SubnetId: !Ref PublicSubnet1 NatGateway2: Type: AWS::EC2::NatGateway Properties: AllocationId: !GetAtt NatGateway2EIP.AllocationId SubnetId: !Ref PublicSubnet2 PublicRouteTable: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Public Routes DefaultPublicRoute: Type: AWS::EC2::Route DependsOn: InternetGatewayAttachment Properties: RouteTableId: !Ref PublicRouteTable DestinationCidrBlock: 0.0.0.0/0 GatewayId: !Ref InternetGateway PublicSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet1 PublicSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PublicRouteTable SubnetId: !Ref PublicSubnet2 PrivateRouteTable1: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ1) DefaultPrivateRoute1: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable1 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway1 PrivateSubnet1RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable1 SubnetId: !Ref PrivateSubnet1 PrivateRouteTable2: Type: AWS::EC2::RouteTable Properties: VpcId: !Ref VPC Tags: - Key: Name Value: !Sub ${EnvironmentName} Private Routes (AZ2) DefaultPrivateRoute2: Type: AWS::EC2::Route Properties: RouteTableId: !Ref PrivateRouteTable2 DestinationCidrBlock: 0.0.0.0/0 NatGatewayId: !Ref NatGateway2 PrivateSubnet2RouteTableAssociation: Type: AWS::EC2::SubnetRouteTableAssociation Properties: RouteTableId: !Ref PrivateRouteTable2 SubnetId: !Ref PrivateSubnet2 SecurityGroup: Type: AWS::EC2::SecurityGroup Properties: GroupName: "mwaa-security-group" GroupDescription: "Security group with a self-referencing inbound rule." VpcId: !Ref VPC SecurityGroupIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" SourceSecurityGroupId: !Ref SecurityGroup EnvironmentBucket: Type: AWS::S3::Bucket Properties: VersioningConfiguration: Status: Enabled PublicAccessBlockConfiguration: BlockPublicAcls: true BlockPublicPolicy: true IgnorePublicAcls: true RestrictPublicBuckets: true ##################################################################################################################### # CREATE MWAA ##################################################################################################################### MwaaEnvironment: Type: AWS::MWAA::Environment DependsOn: MwaaExecutionPolicy Properties: Name: !Sub "${AWS::StackName}-MwaaEnvironment" SourceBucketArn: !GetAtt EnvironmentBucket.Arn ExecutionRoleArn: !GetAtt MwaaExecutionRole.Arn DagS3Path: dags NetworkConfiguration: SecurityGroupIds: - !GetAtt SecurityGroup.GroupId SubnetIds: - !Ref PrivateSubnet1 - !Ref PrivateSubnet2 WebserverAccessMode: PUBLIC_ONLY MaxWorkers: !Ref MaxWorkerNodes LoggingConfiguration: DagProcessingLogs: LogLevel: !Ref DagProcessingLogs Enabled: true SchedulerLogs: LogLevel: !Ref SchedulerLogsLevel Enabled: true TaskLogs: LogLevel: !Ref TaskLogsLevel Enabled: true WorkerLogs: LogLevel: !Ref WorkerLogsLevel Enabled: true WebserverLogs: LogLevel: !Ref WebserverLogsLevel Enabled: true SecurityGroup: Type: AWS::EC2::SecurityGroup Properties: VpcId: !Ref VPC GroupDescription: !Sub "Security Group for Amazon MWAA Environment ${AWS::StackName}-MwaaEnvironment" GroupName: !Sub "airflow-security-group-${AWS::StackName}-MwaaEnvironment" SecurityGroupIngress: Type: AWS::EC2::SecurityGroupIngress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" SourceSecurityGroupId: !Ref SecurityGroup SecurityGroupEgress: Type: AWS::EC2::SecurityGroupEgress Properties: GroupId: !Ref SecurityGroup IpProtocol: "-1" CidrIp: "0.0.0.0/0" MwaaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - airflow-env.amazonaws.com - airflow.amazonaws.com Action: - "sts:AssumeRole" Path: "/service-role/" MwaaExecutionPolicy: DependsOn: EnvironmentBucket Type: AWS::IAM::ManagedPolicy Properties: Roles: - !Ref MwaaExecutionRole PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: airflow:PublishMetrics Resource: - !Sub "arn:aws:airflow:${AWS::Region}:${AWS::AccountId}:environment/${EnvironmentName}" - Effect: Deny Action: s3:ListAllMyBuckets Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - "s3:GetObject*" - "s3:GetBucket*" - "s3:List*" Resource: - !Sub "${EnvironmentBucket.Arn}" - !Sub "${EnvironmentBucket.Arn}/*" - Effect: Allow Action: - logs:DescribeLogGroups Resource: "*" - Effect: Allow Action: - logs:CreateLogStream - logs:CreateLogGroup - logs:PutLogEvents - logs:GetLogEvents - logs:GetLogRecord - logs:GetLogGroupFields - logs:GetQueryResults - logs:DescribeLogGroups Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:airflow-${AWS::StackName}*" - Effect: Allow Action: cloudwatch:PutMetricData Resource: "*" - Effect: Allow Action: - sqs:ChangeMessageVisibility - sqs:DeleteMessage - sqs:GetQueueAttributes - sqs:GetQueueUrl - sqs:ReceiveMessage - sqs:SendMessage Resource: - !Sub "arn:aws:sqs:${AWS::Region}:*:airflow-celery-*" - Effect: Allow Action: - kms:Decrypt - kms:DescribeKey - "kms:GenerateDataKey*" - kms:Encrypt NotResource: !Sub "arn:aws:kms:*:${AWS::AccountId}:key/*" Condition: StringLike: "kms:ViaService": - !Sub "sqs.${AWS::Region}.amazonaws.com" Outputs: VPC: Description: A reference to the created VPC Value: !Ref VPC PublicSubnets: Description: A list of the public subnets Value: !Join [ ",", [ !Ref PublicSubnet1, !Ref PublicSubnet2 ]] PrivateSubnets: Description: A list of the private subnets Value: !Join [ ",", [ !Ref PrivateSubnet1, !Ref PrivateSubnet2 ]] PublicSubnet1: Description: A reference to the public subnet in the 1st Availability Zone Value: !Ref PublicSubnet1 PublicSubnet2: Description: A reference to the public subnet in the 2nd Availability Zone Value: !Ref PublicSubnet2 PrivateSubnet1: Description: A reference to the private subnet in the 1st Availability Zone Value: !Ref PrivateSubnet1 PrivateSubnet2: Description: A reference to the private subnet in the 2nd Availability Zone Value: !Ref PrivateSubnet2 SecurityGroupIngress: Description: Security group with self-referencing inbound rule Value: !Ref SecurityGroupIngress MwaaApacheAirflowUI: Description: MWAA Environment Value: !Sub "https://${MwaaEnvironment.WebserverUrl}"

ステップ 2: を使用してスタックを作成する AWS CLI

  1. コマンドプロンプトで、mwaa_public_network.yml が保存されているディレクトリに移動します。例:

    cd mwaaproject
  2. AWS CLI を使用してスタックを作成するには、「aws cloudformation create-stack」コマンドを使用します。

    aws cloudformation create-stack --stack-name mwaa-environment-public-network --template-body file://mwaa_public_network.yml --capabilities CAPABILITY_IAM
    注記

    Amazon VPC インフラストラクチャ、Amazon S3 バケット、Amazon MWAA 環境の作成には 30 分以上かかります。

ステップ 3: DAG を Amazon S3 にアップロードし、Apache Airflow UI で実行する

  1. サポートされている最新の Apache Airflow バージョン」の tutorial.py ファイルの内容をコピーし、tutorial.py という名前でローカルに保存します。

  2. コマンドプロンプトで、tutorial.py が保存されているディレクトリに移動します。例:

    cd mwaaproject
  3. 以下のコマンドを使って、Amazon S3 バケットをすべてリストアップします

    aws s3 ls
  4. 以下のコマンドを使用して、ご使用の環境の Amazon S3 バケット内のファイルとフォルダを一覧表示します。

    aws s3 ls s3://YOUR_S3_BUCKET_NAME
  5. 以下のスクリプトを使用して、tutorial.py ファイルを dags フォルダにアップロードしてください。YOUR_S3_BUCKET_NAME のサンプル値に置き換えてください。

    aws s3 cp tutorial.py s3://YOUR_S3_BUCKET_NAME/dags/
  6. Amazon MWAA コンソールで「環境ページ」を開きます。

  7. 環境を選択します。

  8. [Airflow UI を開く] を選択します。

  9. Apache Airflow UI で、使用可能な DAG のリストから [チュートリアル] 用の DAG を選択します。

  10. DAG の詳細ページで、DAG 名の横にある [DAG の一時停止/一時停止解除] トグルを選択して DAG の一時停止を解除します。

  11. [DAG をトリガー] を選択します。

ステップ 4: Logs で CloudWatch ログを表示する

AWS CloudFormation スタックで有効化されたすべての Apache Airflow ログについて、 CloudWatch コンソールで Apache Airflow ログを表示できます。次のセクションでは、「Airflow ウェブサーバーロググループ」のログを表示する方法を示します。

  1. Amazon MWAA コンソールで「環境ページ」を開きます。

  2. 環境を選択します。

  3. [モニタリング] ペインで [Airflow ウェブサーバーのロググループ] を選択します。

  4. [ログストリーム]webserver_console_ip ログを選択します。

次のステップ