トラブルシューティング Apache Airflow v2 における DAG、オペレータ、接続、その他の問題 - Amazon Managed Workflows for Apache Airflow

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

トラブルシューティング Apache Airflow v2 における DAG、オペレータ、接続、その他の問題

このページのトピックでは、Apache Airflow v2 Python の依存関係、カスタムプラグイン、DAGs、オペレータ、接続、タスク、およびウェブサーバが Amazon でホストされている Apache Airflow ワークフロー環境で発生する可能性のある問題の解決方法について説明します。

接続

以下のトピックでは、Apache Airflow の接続を使用する際や別の AWS データベースを使用する際に受け取る可能性のあるエラーについて説明しています。

Secrets Manager に接続できません。

次のステップを推奨します。

  1. Apache Airflow 接続と変数のシークレットキーを作成する方法を「AWS Secrets Manager シークレットを使用した Apache Airflow 接続の設定」 で学習できます。

  2. test-variable で ApacheAirflow 変数 (Apache Airflow 変数 AWS Secrets Manager の でのシークレットキーの使用) のシークレットキーを使用する方法を学習します。

  3. Apache Airflow 接続 AWS Secrets Manager に でシークレットキーを使用する で ApacheAirflow 接続 (myconn) のシークレットキーを使用する方法を学習します。

secretsmanager:ResourceTag/<tag-key> シークレットマネージャの条件またはリソース制限は、実行ロールポリシーでどのように設定しますか。

注記

Apache Airflow バージョン 2.0 やそれ以前のバージョンに適用されます。

現時点では、Apache Airflow の既知の問題のため、環境の実行ロールで条件キーやその他のリソース制限を使用して Secrets Manager secrets へのアクセスを制限することはできません。

Snowflake に接続できません。

次のステップを推奨します。

  1. GitHub の「aws-mwaa-local-runner」を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。

  2. ご使用の環境に適した requirements.txt に次のエントリを追加します。

    apache-airflow-providers-snowflake==1.3.0
  3. 以下のインポートを DAG に追加する:

    from airflow.providers.snowflake.operators.snowflake import SnowflakeOperator

Apache Airflow 接続オブジェクトに、次のキーバリューのペアが含まれていることを確認します。

  1. 接続 ID: snowflake_conn

  2. コーンタイプ: スノーフレーク

  3. ホスト: <my account> <my region if not us-west-2>.snowflakecomputing.com

  4. スキーマ: <my schema>

  5. ログイン: <my user name>

  6. パスワード: ********

  7. ポート: <port, if any>

  8. エキストラ:

    { "account": "<my account>", "warehouse": "<my warehouse>", "database": "<my database>", "region": "<my region if not using us-west-2 otherwise omit this line>" }

例:

>>> import json >>> from airflow.models.connection import Connection >>> myconn = Connection( ... conn_id='snowflake_conn', ... conn_type='Snowflake', ... host='YOUR_ACCOUNT.YOUR_REGION.snowflakecomputing.com', ... schema='YOUR_SCHEMA' ... login='YOUR_USERNAME', ... password='YOUR_PASSWORD', ... port='YOUR_PORT' ... extra=json.dumps(dict(account='YOUR_ACCOUNT', warehouse='YOUR_WAREHOUSE', database='YOUR_DB_OPTION', region='YOUR_REGION')), ... )

Airflow UI に接続が表示されない

Apache Airflow は Apache Airflow UI に接続テンプレートを用意しています。接続タイプに関係なく、接続 URI 文字列を生成します。Apache Airflow UI に接続テンプレートがない場合は、HTTP 接続テンプレートを使用するなど、代替の接続テンプレートを使用して接続 URI 文字列を生成できます。

次のステップを推奨します。

  1. Amazon MWAA が Apache Airflow UI で提供している接続タイプについては、「Amazon MWAA 環境にインストールされている Apache エアフロープロバイダーパッケージ」 を参照してください。

  2. Apache Airflow CLI コマンドリファレンス のCLI でApache Airflow接続を作成するコマンドを確認します。

  3. 接続タイプの概要 Amazon MWAA上のApache Airflow UI では使用できない接続タイプのために、Apache Airflow UI で接続テンプレートを交換して使用する方法を学習します。

ウェブサーバ

次のトピックでは、Amazon MWAA 上の Apache Airflow ウェブサーバーで発生する可能性のあるエラーについて説明します。

ウェブサーバーにアクセスすると、5xx エラーが表示されます。

次のステップを推奨します。

  1. Apache Airflow 構成オプションを確認してください。Apache Airflow 構成オプションとして指定したキーと値のペア(例: AWS Secrets Manager )が正しく設定されていることを確認してください。詳細については、「Secrets Manager に接続できません。」を参照してください。

  2. requirements.txt をチェックしてください。Apache Airflow バージョンと互換性のある、requirements.txt にリストされている Airflow の「extras」パッケージおよびその他のライブラリを確認してください。

  3. requirements.txt ファイルに Python の依存関係を指定する方法については、「requirements.txt での Python 依存関係の管理」を参照してください。

「スケジューラーは実行されていないようです」というエラー表示があります。

スケジューラが実行されていないように見える場合や、最後の「ハートビート」が数時間前に受信された場合は、DAGs が Apache Airflow に表示されず、新しいタスクがスケジューリングされない可能性があります。

次のステップを推奨します。

  1. VPC セキュリティグループがポート 5432 へのインバウンドアクセスを許可していることを確認します。これは、ご使用の環境の Amazon Aurora PostgreSQL メタデータデータベースに接続するために必要のポートです。このルールを追加したら、 Amazon MWAA を数分間待つとエラーは消えるはずです。詳細については、「Amazon MWAA の VPC のセキュリティ」を参照してください。

    注記
    • Aurora PostgreSQL メタデータベースは 「Amazon MWAAサービスアーキテクチャ」 の一部であり、AWS アカウント では表示されません。

    • データベース関連のエラーは通常、スケジューラー障害の症状であり、根本的な原因ではありません。

  2. スケジューラーが動作していない場合は、「依存関係のインストールの失敗」「スケジューラーの過負荷」 など、さまざまな要因が原因である可能性があります。CloudWatch Logs で対応するロググループを表示して、DAGs、プラグイン、および要件が正しく機能していることを確認します。詳細については、「Amazon Managed Workflows for Apache Airflow のモニタリングとメトリクス」を参照してください。

タスク

次のトピックでは、Apache Airflow ログを表示した際に発生する可能性があるエラーについて説明します。

タスクが行き詰まっいるか、完了していません。

Apache Airflow タスクが「行き詰まっている」か、完了していない場合は、次のステップを実行することをお勧めします。

  1. 多数の DAG が定義されている可能性があります。DAG の数を減らし、環境の更新 (ログレベルの変更など) を実行して強制的にリセットしてください。

    1. Airflow は DAG が有効かどうかに関係なく解析します。環境の 50% を超える容量を使用していると、Apache Airflow スケジューラに負荷がかかり始める可能性があります。これにより、CloudWatch メトリクスの合計解析時間が長くなったり、CloudWatCloudWatch Logs の DAG 処理時間が長くることがあります。Apache Airflow の設定を最適化する方法は他にもありますが、このガイドの対象範囲には含まれていません。

    2. ご使用の環境のパフォーマンスを調整するために推奨するベスト・プラクティスの詳細については、「Amazon での Apache Airflow のパフォーマンスチューニング MWAA」 を参照してください。

  2. キューには、多数のタスクがある可能性があります。これは通常、「なし」状態のタスクが多数発生しているか、CloudWatch でキューに入れられたタスクや保留中のタスクが多数発生している場合に発生します。これは以下のような原因で起こりうる:

    1. 実行するタスクの数が環境の実行能力を超えている場合や、自動スケーリング前にキューに入れられたタスクの数が多い場合は、タスクを検出して追加のワーカー.をデプロイする時間があります。

    2. 実行するタスクの数が、実行可能な環境よりも多い場合は、DAG が同時に実行するタスクの数を減らすか、Apache Airflow ワーカーの最小数を増やすことをお勧めします。

    3. 自動スケーリングで追加のワーカーを検出してデプロイする時間ができる前に、大量のタスクがキューに入れられている場合は、タスクの配備をずらすか、最小 Apache Airflow ワーカスレッドを増やすことをお勧めします。

    4. AWS Command Line Interface (AWS CLI) の 「update-environment」 コマンドを使用して、環境で実行されるワーカーの最小数または最大数を変更できます。

      aws mwaa update-environment --name MyEnvironmentName --min-workers 2 --max-workers 10
    5. ご使用の環境のパフォーマンスを調整するために推奨するベスト・プラクティスの詳細については、「Amazon での Apache Airflow のパフォーマンスチューニング MWAA」 を参照してください。

  3. タスクログとして表示され、それ以上の指示なしに Apache Airflow で停止したタスクが、実行中に削除された可能性があります。これは以下のような原因で起こりうる:

    1. もし、1)現在のタスクが現在の環境のキャパシティを超え、2)数分間、タスクが実行されなかったり、キューに入れられなかったりした後、3)新しいタスクがキューに入れられたりする瞬間があります。

    2. Amazon MWAA 自動スケーリングは、最初のシナリオに応じてワーカーを追加します。2 番目のシナリオでは、追加のワーカーが削除されます。キューに入っているタスクの中には、ワーカーが削除される過程にあり、コンテナが削除された時点で終了するものもあります。

    3. 環境内の最小ワーカー数を増やすことをお勧めします。もう 1 つの選択肢は、DAG とタスクの時間を調整して、これらの状況が発生しないようにすることです

    4. また、最小ワーカー数を環境内の最大ワーカー数と同じようにに設定して、オートスケーリングを効果的に無効にすることもできます。AWS Command Line Interface (AWS CLI) の 「update-environment」 コマンドを使用して、ワーカーの最小数と最大数を同じに設定して自動スケーリングを無効にします

      aws mwaa update-environment --name MyEnvironmentName --min-workers 5 --max-workers 5
    5. ご使用の環境のパフォーマンスを調整するために推奨するベスト・プラクティスの詳細については、「Amazon での Apache Airflow のパフォーマンスチューニング MWAA」 を参照してください。

  4. タスクが「実行中」状態のままになっている場合は、タスクをクリアしたり、成功または失敗のマークを付けることもできます。これにより、環境の自動スケーリングコンポーネントは、環境で実行されているワーカー数をスケールダウンできます。次の図は、取り残されたタスクの例です。

    これは課題が立ち消えになっているイメージです。
    1. 取り残されたタスクの円を選択し、クリアを選択します(図を参照)。これにより Amazon MWAA はワーカーをスケールダウンできます。そうしない場合、Amazon MWAA はどの DAG が有効か無効かを判断できず、キューにタスクが残っている場合はスケールダウンできません。

      Apache Airflow アクション
  5. Apache Airflow タスクライフサイクルの詳細については、Apache Airflow リファレンスガイドの 「概念」 を参照してください。

CLI

次のトピックでは、AWS Command Line Interface で Airflow CLI コマンドを実行したときに発生する可能性があるエラーについて説明します。

CLI で DAG をトリガーすると「503」エラーが表示されます

Airflow CLI は Apache Airflow ウェブサーバー上でで実行され、同時実行性が制限されています。通常、 CLI コマンドを最大 4 つ同時に実行できます。

dags backfill Apache エアフロー CLI コマンドが失敗した原因は? 回避策はありますか?

注記

以下は Apache Airflow v2.0.2 の環境にのみ適用されます。

他の Apache Airflow CLI コマンドと同様に、backfill コマンドはすべての DAGs を処理する前に、CLI 操作がどの DAGs に適用されているかに関係なく、すべての DAGs をローカルに解決します。Apache Airflow v2.0.2 を使用する Amazon MWAA 環境では、CLI コマンドの実行時にプラグインと要件がウェブサーバにインストールされていないため、解決操作が失敗し、backfill 操作が呼び出されません。環境に要件やプラグインがない場合、backfill 操作は成功します。

backfill CLI コマンドを実行できるようにするには、bash 演算子で呼び出すことをお勧めします。bash オペレータでは、backfill はワーカーから起動されます。これにより、必要な要件とプラグインがすべて使用可能になり、インストール後、 DAG が正常に解析できるようになります。次の例は、BashOperator を実行するための backfill 付き DAG を作成する方法を示しています。

from airflow import DAG from airflow.operators.bash_operator import BashOperator from airflow.utils.dates import days_ago with DAG(dag_id="backfill_dag", schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: cli_command = BashOperator( task_id="bash_command", bash_command="airflow dags backfill my_dag_id" )

演算子

次のトピックでは、Operators を使用するときに受け取る可能性のあるエラーについて説明します。

S3-Transform オペレータの使用中に PermissionError: [Errno 13] Permission denied エラーが発生しました。

S3Transform オペレータを使用してシェルスクリプトを実行しようとして PermissionError: [Errno 13] Permission denied エラーが発生する場合は、次の手順を実行することをお勧めします。次のステップは、既存の plugins.zip ファイルがあることを前提としています。新しい plugins.zip を作成する場合は、「カスタムプラグインのインストール」 を参照してください。

  1. GitHub の「aws-mwaa-local-runner」を使用して、DAG、カスタムプラグイン、Python の依存関係をローカルでテストします。

  2. 「変換」スクリプトを作成します。

    #!/bin/bash cp $1 $2
  3. (オプション) macOS と Linux ユーザーは、次のコマンドを実行して、スクリプトが実行可能であることを確認しなければならないことがあります。

    chmod 777 transform_test.sh
  4. スクリプトを plugins.zip に追加します。

    zip plugins.zip transform_test.sh
  5. 「plugins.zip を Amazon S3 にアップロードする」 のステップに従ってください。

  6. 「Amazon MWAA コンソールのplugins.zip バージョンの指定」 のステップに従ってください。

  7. 以下の DAG を作成します。

    from airflow import DAG from airflow.providers.amazon.aws.operators.s3_file_transform import S3FileTransformOperator from airflow.utils.dates import days_ago import os DAG_ID = os.path.basename(__file__).replace(".py", "") with DAG (dag_id=DAG_ID, schedule_interval=None, catchup=False, start_date=days_ago(1)) as dag: file_transform = S3FileTransformOperator( task_id='file_transform', transform_script='/usr/local/airflow/plugins/transform_test.sh', source_s3_key='s3://YOUR_S3_BUCKET/files/input.txt', dest_s3_key='s3://YOUR_S3_BUCKET/files/output.txt' )
  8. 「Amazon S3 への DAG コードのアップロード」 のステップに従ってください。