翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon MWAA での Apache Airflow のパフォーマンス調整
このページでは、「Amazon MWAA での Apache Airflow 構成オプションの使用」を使用して Apache Airflow 環境向けの Amazon マネージドワークフローのパフォーマンスを調整するために推奨するベストプラクティスについて説明します。
Apache Airflow 構成オプションの追加
以下の手順では、Airflow 構成オプションを環境に追加するステップを説明します。
-
Amazon MWAA コンソールで「環境ページ」を開きます。
-
環境を選択します。
-
[編集] を選択します。
-
[次へ] をクリックします。
-
Airflow 設定オプションペインで [カスタム設定を追加] を選択します。
-
ドロップダウンリストから構成を選択して値を入力するか、カスタム構成を入力して値を入力します。
-
追加する設定ごとに [カスタム設定を追加] を選択します。
-
[保存] を選択します。
詳細については、「Amazon MWAA での Apache Airflow 構成オプションの使用」を参照してください。
Apache Airflow スケジューラー
Apache Airflow スケジューラーは Apache Airflow のコアコンポーネントです。スケジューラーに問題があると、DAG の解析やタスクのスケジュール設定ができなくなる可能性があります。Apache Airflow スケジューラーの調整の詳細については、Apache Airflow ドキュメンテーションウェブサイトの「スケジューラーのパフォーマンスの微調整」を参照してください。
パラメータ
このセクションでは、Apache Airflow スケジューラーで使用できる構成オプションとそのユースケースについて説明します。
- Apache Airflow v2
-
バージョン |
設定オプション |
デフォルト |
[Description] (説明) |
ユースケース |
v2
|
「celery.sync_parallelism」
|
1
|
Celery エグゼキュターがタスクの状態を同期するために使用するプロセスの数です。
|
このオプションを使用すると、Celery エグゼキュターが使用するプロセスを制限することでキューの競合を防ぐことができます。デフォルトでは、タスクログを Logs 1 に配信する際にエラーが発生しないように値が設定されています。 CloudWatch この値を 0 に設定すると最大数のプロセスを使用することになりますが、タスクログの配信時にエラーが発生する可能性があります。
|
v2
|
「scheduler.processor_poll_interval」
|
1
|
スケジューラーの「ループ」で DAG ファイルが連続して処理されるまでに待機する秒数。
|
このオプションを使用すると、DAG 解析結果の取得、タスクの検索とキューへの追加、エグゼキューターでのキュー内のタスクの実行が完了した後に、スケジューラーがスリープする時間を長くすることで、スケジューラーの CPU 使用率を解放できます。この値を増やすと、Apache Airflow v2 の場合は scheduler.parsing_processes で、Apache Airflow v1 の場合は scheduler.max_threads で、環境で実行されるスケジューラーのスレッド数が消費されます。これにより、スケジューラーが DAG を解析する容量が減り、DAG がウェブサーバーに表示されるまでにかかる時間が長くなる可能性があります。
|
v2
|
「scheduler.max_dagruns_to_create_per_loop」
|
10
|
スケジューラー「ループ」DagRunsごとに作成する DAG の最大数。
|
このオプションを使用すると、スケジューラーの「ループ」の最大数を減らすことで、リソースを解放してタスクのスケジューリングに充てることができます。DagRuns
|
v2
|
「scheduler.parsing_processes」
|
2
|
スケジューラーが DAG をスケジュールするために並列して実行できるスレッドの数。
|
このオプションを使用すると、スケジューラーが DAG を解析するために並列して実行するプロセスの数を減らすことで、リソースを解放できます。DAG の解析がタスクスケジューリングに影響している場合は、この数を低く抑えることをお勧めします。環境の vCPU 数よりも小さい値を指定する必要があります。詳細については、「制限」を参照してください。
|
制限
このセクションでは、スケジューラーのデフォルトパラメータを調整する際に考慮すべき制限について説明します。
- scheduler.parsing_processes、scheduler.max_threads
-
1 つの環境クラスの vCPU ごとに 2 つのスレッドを使用できます。環境クラスのスケジューラー用に少なくとも 1 つのスレッドを予約する必要があります。タスクのスケジュールが遅れていることに気付いた場合は、「環境クラス」を増やす必要があるかもしれません。たとえば、大規模な環境では、スケジューラ用に 4 vCPU の Fargate コンテナインスタンスがあります。つまり、他のプロセスに使用できるスレッドの 7
合計は最大数になります。つまり、2 つのスレッドに 4 つの vCPUs を乗算した値から、スケジューラー自体の 1 を引いた値です。scheduler.max_threads
および scheduler.parsing_processes
で指定する値は、環境クラスで使用できるスレッド数 (以下を参照) を超えてはなりません。
-
mw1.small — 他のプロセスの 1
スレッド数を超えてはいけません。残りのスレッドはスケジューラー用に予約されています。
-
mw1.medium — 他のプロセスの 3
スレッド数を超えてはいけません。残りのスレッドはスケジューラー用に予約されています。
-
mw1.large — 他のプロセスの 7
スレッド数を超えてはいけません。残りのスレッドはスケジューラー用に予約されています。
DAG フォルダー
Apache Airflow スケジューラーは、環境内の DAG フォルダーを継続的にスキャンします。含まれている plugins.zip
ファイル、または「airflow」インポートステートメントを含む Python (.py
) ファイル。生成されたすべての Python DAG オブジェクトは、そのファイルに格納され、スケジューラによって処理され、スケジュールする必要があるタスクがある場合はそれが決定されます。DagBagDAG ファイルの解析は、そのファイルに実行可能な DAG オブジェクトが含まれているかどうかに関係なく行われます。
パラメータ
このセクションでは、DAGs フォルダーで使用できる構成オプションとそのユースケースについて説明します。
- Apache Airflow v2
-
バージョン |
設定オプション |
デフォルト |
[Description] (説明) |
ユースケース |
v2
|
「scheduler.dag_dir_list_interval」
|
300 秒
|
DAGs フォルダーをスキャンして新しいファイルを探す必要のある秒数。
|
このオプションを使用すると、DAGs フォルダーを解析する秒数を増やすことでリソースを解放できます。DAGs フォルダーに大量のファイルがあることが原因で、total_parse_time metrics で解析時間が長くなる場合は、この値を増やすことをお勧めします。
|
v2
|
「scheduler.min_file_process_interval」
|
30 秒
|
スケジューラーが DAG を解析して DAG への更新が反映されるまでの秒数。
|
このオプションを使用して、DAG を解析する前にスケジューラーが待機する秒数を増やすことで、リソースを解放できます。たとえば、30 の値を指定すると、DAG ファイルは 30 秒ごとに解析されます。お使いの環境の CPU 使用率を下げるには、この数値を高くしておくことをお勧めします。
|
DAG ファイル
Apache Airflow スケジューラーループの一部として、個々の DAG ファイルが解析されて DAG Python オブジェクトが抽出されます。Apache Airflow v2 以降では、スケジューラーは同時に最大数の「解析プロセス」を解析します。同じファイルが再び解析される前に、scheduler.min_file_process_interval
で指定した秒数が経過する必要があります。
パラメータ
このセクションでは、Apache Airflow DAG ファイルで使用できる構成オプションとそのユースケースについて説明します。
- Apache Airflow v2
-
バージョン |
設定オプション |
デフォルト |
[Description] (説明) |
ユースケース |
v2
|
「core.dag_file_processor_timeout」
|
50 秒
|
DAG DagFileProcessorファイルの処理がタイムアウトするまでの秒数。
|
このオプションを使用すると、タイムアウトまでにかかる時間を増やすことでリソースを解放できます。DagFileProcessorDAG 処理ログにタイムアウトが発生し、実行可能な DAG が読み込まれなくなる場合は、この値を増やすことをお勧めします。
|
v2
|
「core.dagbag_import_timeout」
|
30 秒
|
Python ファイルをインポートするまでの秒数がタイムアウトします。
|
このオプションを使用すると、Python ファイルをインポートして DAG オブジェクトを抽出する際にスケジューラーがタイムアウトになるまでにかかる時間を増やすことで、リソースを解放できます。このオプションはスケジューラーの「ループ」の一部として処理され、core.dag_file_processor_timeout で指定されている値よりも小さい値が含まれている必要があります。
|
v2
|
「core.min_serialized_dag_update_interval」
|
30
|
データベース内のシリアル化された DAG が更新されるまでの最小秒数。
|
このオプションを使用すると、データベース内のシリアル化された DAG が更新されるまでの秒数を増やすことで、リソースを解放できます。DAG の数が多い場合、または複雑な DAG がある場合は、この値を増やすことをおすすめします。この値を増やすと、DAG がシリアル化されるときのスケジューラーとデータベースへの負荷が軽減されます。
|
v2
|
「core.min_serialized_dag_fetch_interval」
|
10
|
シリアル化された DAG がに既に読み込まれているときに、データベースから再フェッチされる秒数。 DagBag
|
このオプションを使用すると、シリアル化された DAG を再フェッチする秒数を増やすことでリソースを解放できます。データベースの「書き込み」速度を下げるには、この値を core.min_serialized_dag_update_interval で指定した値より大きくする必要があります。この値を増やすと、DAG がシリアル化される際のウェブサーバーとデータベースの負荷が軽減されます。
|
タスク
Apache Airflow スケジューラーとワーカーはどちらもタスクのキューイングとデキューに関与します。スケジューラーは、解析済みのスケジュール設定が完了したタスクを [なし] ステータスから [スケジュール済み] ステータスに移行します。同じく Fargate のスケジューラーコンテナーで実行されているエグゼキューターは、それらのタスクをキューに入れ、そのステータスを [キューで待機中] に設定します。ワーカーにキャパシティがあると、キューからタスクを取り出してステータスを [実行中] に設定し、その後、タスクが成功したか失敗したかに基づいてステータスを [成功] または [失敗] に変更します。
パラメータ
このセクションでは、Apache Airflow タスクで使用できる構成オプションとそのユースケースについて説明します。
Amazon MWAA がオーバーライドするデフォルトの構成オプションは赤
でマークされています。
- Apache Airflow v2
-
バージョン |
設定オプション |
デフォルト |
[Description] (説明) |
ユースケース |
v2
|
「コア・パラレリズム」
|
10000
|
「実行中」のステータスを持つことができるタスクインスタンスの最大数。
|
このオプションを使用すると、同時に実行できるタスクインスタンスの数を増やすことでリソースを解放できます。指定する値は、使用可能なワーカーの数にワーカーのタスク密度を「掛けた」ものであるべきです。この値を変更するのは、多数のタスクが「実行中」または「キューで待機中」の状態で停止している場合のみにすることをおすすめします。
|
v2
|
「core.dag_concurrency」
|
10000
|
各 DAG で同時に実行できるタスクインスタンスの数。
|
このオプションを使用すると、同時に実行できるタスクインスタンスの数を増やすことでリソースを解放できます。たとえば、10 個の並列タスクを含む 100 個の DAG があり、すべての DAG を同時に実行したい場合、利用可能なワーカーの数に celery.worker_concurrency でのワーカータスクの密度を「掛け」、それを DAG の数 (100 など) で割って最大並列処理を計算できます。
|
v2
|
「core.execute_tasks_new_python_interpreter」
|
True
|
Apache Airflow が親プロセスをフォークするか、新しい Python プロセスを作成してタスクを実行するかを決定します。 |
True に設定すると、Apache Airflow はプラグインに加えた変更を、タスクを実行するために作成された新しい Python プロセスとして認識します。
|
v2
|
「celery.worker_concurrency」
|
該当なし
|
Amazon MWAA は、このオプションの Airflow ベースインストールをオーバーライドして、自動スケーリングコンポーネントの一部としてワーカーをスケーリングします。
|
このオプションに指定された値はすべて無視されます。 |
v2
|
「celery.worker_autoscale」
|
mw1.small - 5,0
mw1.medium - 10,0
mw1.large - 20,0
mw1.xlarge-40,0
サイズ 1.2xLarge-80,0
|
ワーカー向けのタスク同時実行性。
|
このオプションを使用すると、ワーカーの minimum 、maximum のタスク同時実行を減らすことでリソースを解放できます。ワーカーは、そのための十分なリソースがあるかどうかに関係なく、構成された maximum までの同時タスク受け入れます。十分なリソースがない状態でタスクをスケジュールすると、タスクはすぐに失敗します。リソースを大量に消費するタスクの場合は、タスクあたりの容量を増やすために値をデフォルトより小さくしてこの値を変更することをお勧めします。
|