Lambda 関数を使用して DAG を呼び出す - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Lambda 関数を使用して DAG を呼び出す

次のコード例では、「AWS Lambda」関数を使用して Apache Airflow CLI トークンを取得し、Amazon MWAA 環境で有向非巡回グラフ (DAG) を呼び出します。

Version

  • このページのコード例は、「Python 3.10」で Apache Airflow v2 以上と共に使用可能です。

前提条件

コードサンプルを使用するには、以下が必要です。

注記

Lambda 関数と Amazon MWAA 環境が同じ VPC にある場合は、このコードをプライベートネットワークで使用できます。この構成では、Lambda 関数の実行ロールに、Amazon Elastic Compute Cloud (Amazon EC2) CreateNetworkInterface API オペレーションを呼び出すアクセス許可が必要です。このアクセス許可は、 AWSLambdaVPCAccessExecutionRole AWS 管理ポリシーを使用して付与できます。

アクセス許可

このページのコード例を使用するには、Amazon MWAA 環境の実行ロールが airflow:CreateCliToken アクションを実行するためのアクセス権が必要です。このアクセス許可は、 AmazonMWAAAirflowCliAccess AWS 管理ポリシーを使用して指定できます。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ] }

詳細については、「Apache Airflow CLI ポリシー: AmazonMWAAAirflowCli アクセス」を参照してください。

依存関係

  • このコード例を Apache Airflow v2 で使用する場合、追加の依存関係は必要ありません。このコードでは、ご使用の環境にある「Apache Airflow v2 のベースインストール」を使用します。

コード例

  1. https://console.aws.amazon.com/lambda/ で AWS Lambda コンソールを開きます。

  2. 関数リストから Lambda 関数を選択します。

  3. 関数ページで次のコードをコピーし、以下をリソース名に置き換えます。

    • YOUR_ENVIRONMENT_NAME – Amazon MWAA 環境の名前。

    • YOUR_DAG_NAME — 呼び出したい DAG の名前。

    import boto3 import http.client import base64 import ast mwaa_env_name = 'YOUR_ENVIRONMENT_NAME' dag_name = 'YOUR_DAG_NAME' mwaa_cli_command = 'dags trigger' ​ client = boto3.client('mwaa') ​ def lambda_handler(event, context): # get web token mwaa_cli_token = client.create_cli_token( Name=mwaa_env_name ) conn = http.client.HTTPSConnection(mwaa_cli_token['WebServerHostname']) payload = mwaa_cli_command + " " + dag_name headers = { 'Authorization': 'Bearer ' + mwaa_cli_token['CliToken'], 'Content-Type': 'text/plain' } conn.request("POST", "/aws_mwaa/cli", payload, headers) res = conn.getresponse() data = res.read() dict_str = data.decode("UTF-8") mydata = ast.literal_eval(dict_str) return base64.b64decode(mydata['stdout'])
  4. [デプロイ] を選択します。

  5. [テスト] を選択し、Lambda コンソールを使用して関数を呼び出します。

  6. Lambda が DAG を正常に呼び出したことを確認するには、Amazon MWAA コンソールを使用して環境の Apache Airflow UI に移動し、次の操作を行います。

    1. [DAG] ページの DAG のリストから新しいターゲット DAG を見つけます。

    2. [最終実行] で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における invoke_dag の最新のタイムスタンプとほぼ一致する必要があります。

    3. [最近のタスク] で、前回の実行が成功したことを確認します。