Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
チェックポイント
チェックポイントは、アプリケーションの状態をフォールトトレラントに保つための Flink のメカニズムです。このメカニズムにより、ジョブが失敗した場合に Flink はオペレーターの状態を回復でき、アプリケーションには障害のない実行と同じセマンティクスが与えられます。Apache Flink 用 Managed Serviceでは、アプリケーションの状態は RocksDB に保存されます。RocksDB は組み込みのキー/バリューストアで、動作状態をディスク上に保持します。チェックポイントを取得すると、その状態は Amazon S3 にもアップロードされるため、ディスクが失われた場合でも、チェックポイントを使用してアプリケーションの状態を復元できます。
詳細については、「状態スナップショットの仕組み
チェックポイントステージ
Flink のチェックポインティングオペレーターサブタスクには、主に 5 つのステージがあります。
Waiting 「Start Delay」— Flink はストリームに挿入されたチェックポイントバリアを使用するため、このステージの時間はオペレータがチェックポイントバリアに到達するのを待つ時間です。
アライメント「Alignment Duration」 — この段階では、サブタスクは1つのバリアに達しましたが、他の入力ストリームからのバリアを待っています。
同期チェックポイント 「同期時間」 — この段階は、サブタスクが実際にオペレータの状態のスナップショットを撮り、サブタスク上の他のすべてのアクティビティをブロックする段階です。
非同期チェックポイント 「非同期時間」 — この段階の大部分は、Amazon S3 に状態をアップロードするサブタスクです。この段階では、サブタスクはブロックされなくなり、レコードを処理できるようになります。
承認 — 通常、これは短い段階であり、単に に確認を送信 JobManager し、コミットメッセージ (Kafka シンクなど) を実行するサブタスクです。
これらの各段階(確認は除く)は、Flink WebUIから入手できるチェックポイントの期間メトリックに対応しており、チェックポイントが長くなる原因の特定に役立ちます。
チェックポイントで利用できる各メトリックの正確な定義を確認するには、「履歴タブ
調査中
長いチェックポイント期間を調査する場合、決定すべき最も重要なのはチェックポイントのボトルネック、つまりどのオペレーターとサブタスクがチェックポイントに最も時間がかかっているのか、そのサブタスクのどの段階に長時間かかっているのかを判断することです。これは、ジョブチェックポイントタスクの Flink WebUI を使用して確認できます。Flink の Web インターフェースには、チェックポイントの問題の調査に役立つデータや情報が表示されます。詳細については、「チェックポイントの監視
まず、Job グラフ内の各オペレータの「エンドツーエンド期間」を確認して、どのオペレータがチェックポイントに時間がかかっているかを判断し、さらに調査する必要があります。Flink のドキュメントによると、所要時間の定義は次のとおりです。
「トリガーのタイムスタンプから最新の確認応答までの期間(確認応答をまだ受け取っていない場合はn/a)。」 チェックポイントが完了するまでのこの終了までの期間は、チェックポイントを確認した最後のサブタスクによって決まります。「通常、この時間は 1 つのサブタスクが実際に状態をチェックポイントするのに必要な時間よりも長くなります。」
チェックポイントの他の時間でも、その時間がどこに費やされているかについて、より詳細な情報が得られます。
「Sync Duration」の値が大きい場合は、スナップショット作成中に何かが発生していることを示しています。この段階でsnapshotState()
は snapshotState 、インターフェイスを実装するクラスに対して が呼び出されます。これはユーザーコードである可能性があるため、スレッドダンプはこれを調査するために役立ちます。
「非同期時間」が長い場合は、Amazon S3 への状態のアップロードに多くの時間が費やされていると考えられます。これは、状態が大きい場合や、アップロードされる状態ファイルが多数ある場合に発生する可能性があります。このような場合は、アプリケーションがどのように状態を使用しているかを調べ、可能な限り Flink のネイティブデータ構造が使用されていることを確認する必要があります (「Using Keyed State
「Start Delay」の値が大きい場合は、チェックポイントの障壁がオペレータに到達するのを待つ時間の大半が費やされていることがわかります。これは、アプリケーションがレコードを処理するのに時間がかかっていることを示しています。つまり、バリアがジョブグラフ内をゆっくりと流れているということです。これは通常、Job にバックプレッシャーがかかっている場合や、オペレーターが常に忙しい場合に発生します。2 番目の KeyedProcess オペレーター JobGraph がビジー状態の の例を次に示します。
Flink フレームグラフまたは TaskManager スレッドダンプを使用して、何に時間がかかるかを調査できます。ボトルネックが特定されたら、フレームグラフまたはスレッドダンプを使用してさらに調査できます。
スレッドダンプ
スレッドダンプは、フレームグラフよりもレベルが少し低いもう 1 つのデバッグツールです。スレッドダンプは、ある時点でのすべてのスレッドの実行状態を出力します。Flink はJVMスレッドダンプを受け取ります。これは、Flink プロセス内のすべてのスレッドの実行状態です。スレッドの状態は、スレッドのスタックトレースといくつかの追加情報によって示されます。フレームグラフは、実際には複数のスタックトレースを短時間で連続して取得して構築されます。グラフはこれらのトレースを視覚化したもので、一般的なコードパスを簡単に識別できます。
"KeyedProcess (1/3)#0" prio=5 Id=1423 RUNNABLE at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:154) at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>>19) at $line33.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14) at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ...
上の図は、Flink UI から取得した単一スレッドのスレッドダンプのスニペットです。1 行目には、このスレッドに関する次のような一般的な情報が含まれています。
スレッド名 KeyedProcess (1/3)#0
スレッドの優先度「prio=5」
一意のスレッド「Id=1423」
スレッド状態 RUNNABLE
通常、スレッドの名前からそのスレッドの一般的な目的に関する情報が得られます。演算子スレッドは、演算子と同じ名前を持つため、名前で識別できます。また、KeyedProcess (1/3)#0 スレッドはKeyedProcess演算子からのもので、1 番目 (3 つのうち) のサブタスクからのものであるなど、どのサブタスクに関連しているかも示します。
スレッドは、次に示す状態のいずれかになります。
NEW – スレッドは作成されていますが、まだ処理されていません
RUNNABLE – スレッドは で実行されています CPU
BLOCKED – スレッドは別のスレッドがロックを解除するのを待っています
WAITING – スレッドは
wait()
、、join()
またはpark()
メソッドを使用して待機していますTIMED_WAITING – スレッドはスリープ、待機、結合、またはパークメソッドを使用して待機していますが、最大待機時間があります。
注記
Flink 1.13 では、スレッドダンプ内の 1 つのスタックトレースの最大深度は 8 に制限されています。
注記
スレッドダンプは読み取りが難しく、複数のサンプルを採取して手動で分析する必要があるため、Flink アプリケーションのパフォーマンス問題をデバッグする最後の手段はスレッドダンプです。できる限り、フレームグラフを使用するのが望ましいです。
Flink のスレッドダンプです。
Flink では、Flink UI の左側のナビゲーションバーで「タスクマネージャ」 オプションを選択し、特定のタスクマネージャーを選択して [スレッドダンプ] タブに移動すると、「スレッドダンプ」を実行できます。スレッドダンプは、ダウンロードしたり、お気に入りのテキストエディター(またはスレッドダンプアナライザー)にコピーしたり、Flink Web UIのテキストビュー内で直接分析したりできます(ただし、この最後のオプションは少し扱いにくい場合があります)。
特定の演算子を選択した場合に、TaskManagersタブのスレッドダンプを取るタスクマネージャーを決定するために使用できます。これは、オペレータがオペレータのさまざまなサブタスクで実行されており、異なるタスクマネージャーでも実行できることを示しています。
ダンプは複数のスタックトレースで構成されます。ただし、ダンプを調べるときには、オペレータに関連するものが最も重要です。オペレータースレッドにはオペレータと同じ名前があり、どのサブタスクに関連しているかがわかるので、これらは簡単に見つかります。例えば、次のスタックトレースは KeyedProcess演算子からのもので、最初のサブタスクです。
"KeyedProcess (1/3)#0" prio=5 Id=595 RUNNABLE at app//scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:155) at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:19) at $line360.$read$$iw$$iw$ExpensiveFunction.processElement(<console>:14) at app//org.apache.flink.streaming.api.operators.KeyedProcessOperator.processElement(KeyedProcessOperator.java:83) at app//org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:205) at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134) at app//org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105) at app//org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:66) ...
同じ名前のオペレータが複数あると混乱するかもしれませんが、オペレータに名前を付けることでこの問題を回避できます。例:
.... .process(new ExpensiveFunction).name("Expensive function")
「フレームグラフ 」
フレームグラフは、ターゲットコードのスタックトレースを視覚化する便利なデバッグツールです。これにより、最も頻繁に使用されるコードパスを特定できます。スタックトレースを何度もサンプリングして作成されます。フレームグラフの X 軸にはさまざまなスタックプロファイルが表示され、Y 軸にはスタックの深さとスタックトレースの呼び出しが表示されます。フレームグラフの 1 つの長方形はスタックフレームを表し、フレームの幅はスタック内での出現頻度を示します。フレームグラフとその使用方法の詳細については、「フレームグラフ
Flink では、オペレータを選択してタブを選択することで、オペレータのフレームグラフに Web UI からアクセスできますFlameGraph。十分な数のサンプルが収集されると、フレームグラフが表示されます。以下は、 FlameGraph チェックポイントに多くの時間がかかっ ProcessFunction ていた の です。
これは非常に単純なフレームグラフであり、 ExpensiveFunction オペレータprocessElement
の 内の各ルックですべてのCPU時間が費やされていることを示しています。また、コード内のどこで実行されているかを判断するのに役立つ行番号も表示されます。