さまざまな Amazon MWAA 環境での DAG の呼び出し - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

さまざまな Amazon MWAA 環境での DAG の呼び出し

次のコード例では、Apache Airflow CLI トークンを作成します。次に、このコードは、ある Amazon MWAA 環境の有向非巡回グラフ (DAG) を使用して、別の Amazon MWAA 環境の DAG を呼び出します。

バージョン

  • このページのコード例は、「Python 3.10」で Apache Airflow v2 以上と共に使用可能です。

前提条件

このページのコード例を使用するには、以下のものが必要である:

  • [パブリックネットワーク] のウェブサーバーにアクセスできる 2 つの「Amazon MWAA 環境」(現在の環境を含む)。

  • ターゲット環境の Amazon Simple Storage Service (Amazon S3) バケットに、サンプルの DAG。

許可

このページのコード例を使用するには、環境の実行ロールに Apache Airflow CLI トークンを作成する権限が必要です。この権限を付与するためには、AWS のマネージドポリシー AmazonMWAAAirflowCliAccess をアタッチすることができます。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "airflow:CreateCliToken" ], "Resource": "*" } ]

詳細については、「Apache Airflow CLI ポリシー: AmazonMWAAAirflowCli アクセス」を参照してください。

依存関係

  • このコード例を Apache Airflow v2 で使用する場合、追加の依存関係は必要ありません。このコードでは、ご使用の環境にある「Apache Airflow v2 のベースインストール」を使用します。

コード例

次のコード例は、現在の環境で DAG を使用して別の環境で DAG を呼び出していると想定しています。

  1. ターミナルで、DAG コードが保存されているディレクトリに移動します。例:

    cd dags
  2. 次のコード例の内容をコピーし、invoke_dag.py という名前でローカルに保存します。以下の値をお客様の情報に置き換えます。

    • your-new-environment-name — DAG を起動する他の環境の名前。

    • your-target-dag-id — 起動する他の環境の DAG の ID。

    from airflow.decorators import dag, task import boto3 from datetime import datetime, timedelta import os, requests DAG_ID = os.path.basename(__file__).replace(".py", "") @task() def invoke_dag_task(**kwargs): client = boto3.client('mwaa') token = client.create_cli_token(Name='your-new-environment-name') url = f"https://{token['WebServerHostname']}/aws_mwaa/cli" body = 'dags trigger your-target-dag-id' headers = { 'Authorization' : 'Bearer ' + token['CliToken'], 'Content-Type': 'text/plain' } requests.post(url, data=body, headers=headers) @dag( dag_id=DAG_ID, schedule_interval=None, start_date=datetime(2022, 1, 1), dagrun_timeout=timedelta(minutes=60), catchup=False ) def invoke_dag(): t = invoke_dag_task() invoke_dag_test = invoke_dag()
  3. 以下の AWS CLI コマンドを実行して、DAG を環境のバケットにコピーし、次に Apache Airflow UI を使用して DAG をトリガーします。

    $ aws s3 cp your-dag.py s3://your-environment-bucket/dags/
  4. DAG が正常に実行されると、invoke_dag_task のタスクログに以下のような出力が表示されます。

    [2022-01-01, 12:00:00 PDT] {{python.py:152}} INFO - Done. Returned value was: None
    [2022-01-01, 12:00:00 PDT] {{taskinstance.py:1280}} INFO - Marking task as SUCCESS. dag_id=invoke_dag, task_id=invoke_dag_task, execution_date=20220101T120000, start_date=20220101T120000, end_date=20220101T120000
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:154}} INFO - Task exited with return code 0
    [2022-01-01, 12:00:00 PDT] {{local_task_job.py:264}} INFO - 0 downstream tasks scheduled from follow-on schedule check

    DAG が正常に呼び出されたことを確認するには、新しい環境の Apache Airflow UI に移動し、次の操作を行います。

    1. [DAG] ページの DAG のリストから新しいターゲット DAG を見つけます。

    2. [最終実行] で、最新の DAG 実行のタイムスタンプを確認します。このタイムスタンプは、他の環境における invoke_dag の最新のタイムスタンプとほぼ一致する必要があります。

    3. [最近のタスク] で、前回の実行が成功したことを確認します。