Apache Beam 應用程式檢查點失敗 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Apache Beam 應用程式檢查點失敗

如果您的 Beam 應用程式shutdownSourcesAfterIdleMs設定為 0ms,則檢查點可能無法觸發,因為工作處於「已完成」狀態。本節說明此狀況的徵狀和解決方案。

徵狀

移至 Apache Flink 應用程式 CloudWatch 記錄檔的受管理服務,並檢查是否已記錄下列記錄訊息。下列日誌訊息指出檢查點無法觸發,因為某些任務已完成。

{ "locationInformation": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator.onTriggerFailure(CheckpointCoordinator.java:888)", "logger": "org.apache.flink.runtime.checkpoint.CheckpointCoordinator", "message": "Failed to trigger checkpoint for job your job ID since some tasks of job your job ID has been finished, abort the checkpoint Failure reason: Not all required tasks are currently running.", "threadName": "Checkpoint Timer", "applicationARN": your application ARN, "applicationVersionId": "5", "messageSchemaVersion": "1", "messageType": "INFO" }

這也可以在 Flink 儀表板上找到,其中一些任務已進入「FINISHED」狀態,並且無法再執行檢查點。

任務處於「FINISHED」狀態

原因

shutdownSourcesAfterIdleMs 是 Beam 組態變數,可關閉閒置時間 (以毫秒為單位) 的來源。一旦來源關閉,無法再執行檢查點。這可能導致檢查點失敗

任務進入「已完成」狀態的原因之一 shutdownSourcesAfterIdleMs 是設置為 0ms 時,這意味著閒置的任務將立即關閉。

解決方案

若要防止工作立即進入「已完成」狀態, shutdownSourcesAfterIdleMs 請設定為長 .max_Value。這可以透過兩種方式進行:

  • 選項 1:如果您的樑組態是在 Apache Flink 應用程式的受管理服務組態頁面中設定,則您可以新增新的金鑰值對來設定 shutdpwnSourcesAfteridle Ms,如下所示:

    設定 shutdownSourcesAfterIdleMs 為最長. 最大值
  • 選項 2:如果在 JAR 文件中設置了梁配置,則可以按 shutdownSourcesAfterIdleMs 如下方式進行設置:

    FlinkPipelineOptions options = PipelineOptionsFactory.create().as(FlinkPipelineOptions.class); // Initialize Beam Options object options.setShutdownSourcesAfterIdleMs(Long.MAX_VALUE); // set shutdownSourcesAfterIdleMs to Long.MAX_VALUE options.setRunner(FlinkRunner.class); Pipeline p = Pipeline.create(options); // attach specified options to Beam pipeline