Apache Beam アプリケーションのチェックポイント障害 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Beam アプリケーションのチェックポイント障害

Beam アプリケーションが 0ms shutdownSourcesAfterIdleMsに設定されていると、タスクがFINISHED「」状態であるため、チェックポイントのトリガーに失敗することがあります。このセクションでは、この状態の症状と解決策について説明します。

症状

Managed Service for Apache Flink アプリケーション CloudWatch ログに移動し、次のログメッセージがログに記録されているかどうかを確認します。次のログメッセージは、一部のタスクが完了したためにチェックポイントがトリガーされなかったことを示しています。

{ "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)", "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator", "message": "Failed to trigger checkpoint for job your job ID since some tasks of job your job ID has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.", "threadName": "Checkpoint Timer", "applicationARN": your application ARN, "applicationVersionId": "5", "messageSchemaVersion": "1", "messageType": "INFO" }

これは、一部のタスクがFINISHED「」状態になり、チェックポイントができなくなった Flink ダッシュボードでも確認できます。

FINISHED「」状態のタスク

原因

shutdownSourcesAfterIdleMs は、設定されたミリ秒の時間アイドル状態であったソースをシャットダウンする Beam 設定変数です。ソースがシャットダウンされると、チェックポイント設定はできなくなります。これにより、「チェックポイント障害」が発生する可能性があります。

タスクがFINISHED「」状態になる原因の 1 つは、 shutdownSourcesAfterIdleMs が 0ms に設定されている場合です。つまり、アイドル状態のタスクはすぐにシャットダウンされます。

ソリューション

タスクがすぐにFINISHED「」状態にならないようにするには、 shutdownSourcesAfterIdleMs を Long.MAX_ に設定しますVALUE。これには 2 つの方法で実行できます。

  • オプション 1: Managed Service for Apache Flink アプリケーション設定ページでビーム設定が設定されている場合は、次のように新しいキーと値のペアを追加して shutdpwnSourcesAfteridleMs を設定できます。

    shutdownSourcesAfterIdleMs を Long.MAX_ に設定します。VALUE
  • オプション 2: JARファイルにビーム設定が設定されている場合は、 shutdownSourcesAfterIdleMs 次のように を設定できます。

    FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam Options object options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); // attach specified options to Beam pipeline