Managed Service for Apache Flink アプリケーションのベストプラクティスを維持する - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Managed Service for Apache Flink アプリケーションのベストプラクティスを維持する

このセクションでは、安定したパフォーマンスの Managed Service for Apache Flink アプリケーションを開発するための情報と推奨事項について説明します。

uber JAR のサイズを最小限に抑える

Java/Scala アプリケーションは uber (super/fat) JAR にパッケージ化し、ランタイムによってまだ提供されていない追加の必要な依存関係をすべて含める必要があります。ただし、uber JAR のサイズはアプリケーションの起動時間と再起動時間に影響し、JAR が 512 MB の制限を超える可能性があります。

デプロイ時間を最適化するには、uber JAR に以下を含めないでください

  • 次の例に示すように、ランタイムによって提供される依存関係。POM ファイルまたは compileOnly Gradle 設定にprovidedスコープが必要です。

  • JUnit や Mockito など、テストにのみ使用される依存関係。POM ファイルまたは testImplementation Gradle 設定にtestスコープが必要です。

  • アプリケーションで実際に使用されていない依存関係

  • アプリケーションに必要な静的データまたはメタデータ。静的データは、データストアや Amazon S3 など、実行時にアプリケーションによってロードされる必要があります。

  • 上記の設定の詳細については、この POM サンプルファイルを参照してください。

提供された依存関係

Managed Service for Apache Flink ランタイムは、多くの依存関係を提供します。これらの依存関係は fat JAR に含めず、POM ファイルprovidedの範囲を持つか、maven-shade-plugin設定で明示的に除外する必要があります。fat JAR に含まれるこれらの依存関係はいずれも実行時に無視されますが、JAR のサイズが大きくなり、デプロイ中にオーバーヘッドが追加されます。

ランタイムバージョン 1.18、1.19、および 1.20 でランタイムによって提供される依存関係:

  • org.apache.flink:flink-core

  • org.apache.flink:flink-java

  • org.apache.flink:flink-streaming-java

  • org.apache.flink:flink-scala_2.12

  • org.apache.flink:flink-table-runtime

  • org.apache.flink:flink-table-planner-loader

  • org.apache.flink:flink-json

  • org.apache.flink:flink-connector-base

  • org.apache.flink:flink-connector-files

  • org.apache.flink:flink-clients

  • org.apache.flink:flink-runtime-web

  • org.apache.flink:flink-metrics-code

  • org.apache.flink:flink-table-api-java

  • org.apache.flink:flink-table-api-bridge-base

  • org.apache.flink:flink-table-api-java-bridge

  • org.apache.logging.log4j:log4j-slf4j-impl

  • org.apache.logging.log4j:log4j-api

  • org.apache.logging.log4j:log4j-core

  • org.apache.logging.log4j:log4j-1.2-api

さらに、ランタイムは Managed Service for Apache Flink、 でアプリケーションランタイムプロパティを取得するために使用される ライブラリを提供しますcom.amazonaws:aws-kinesisanalytics-runtime:1.2.0

ランタイムによって提供されるすべての依存関係は、uber JAR に含めないように、次の推奨事項を使用する必要があります。

  • Maven (pom.xml) と SBT (build.sbt) では、providedスコープを使用します。

  • Gradle (build.gradle) で、 compileOnly 設定を使用します。

Apache Flink の親ファーストクラスのロードにより、uber JAR に誤って含まれていた依存関係は実行時に無視されます。詳細については、Apache Flink ドキュメントの parent-first-patterns を参照してください。

Connector

ランタイムに含まれていない FileSystem コネクタを除くほとんどのコネクタは、デフォルトのスコープ () を持つ POM ファイルに含める必要がありますcompile

その他の推奨事項

原則として、Managed Service for Apache Flink に提供される Apache Flink uber JAR には、アプリケーションの実行に必要な最小限のコードが含まれている必要があります。ソースクラス、テストデータセット、ブートストラップ状態を含む依存関係を含めることは、この jar に含めないでください。実行時に静的リソースを取り込む必要がある場合は、この懸念を Amazon S3 などのリソースに分離します。これには、状態ブートストラップや推論モデルなどがあります。

ディープ依存関係ツリーを検討し、ランタイム以外の依存関係を削除してください。

Managed Service for Apache Flink は 512MB の jar サイズをサポートしていますが、これはルールの例外と見なされます。Apache Flink は現在、デフォルト設定で最大 104 MB の jar サイズをサポートしており、必要な jar の最大ターゲットサイズである必要があります。

フォールトトレランス:チェックポイントとセーブポイント

Managed Service for Apache Flink アプリケーションに耐障害性を実装するには、チェックポイントとセーブポイントを使用します。アプリケーションの開発およびメンテナンスを行うときは、以下のことを考える必要があります。

  • アプリケーションでチェックポイントを有効にしておくことをお勧めします。チェックポイントは、スケジュールされたメンテナンス中にアプリケーションに耐障害性を提供します。また、サービスの問題、アプリケーションの依存関係の障害、その他の問題による予期しない障害にも耐障害性を提供します。メンテナンスの詳細については、「Managed Service for Apache Flink のメンテナンスタスクを管理する」を参照してください。

  • アプリケーションの開発時またはトラブルシューティング時には、ApplicationSnapshotConfiguration:: SnapshotsEnabledを false に設定します。アプリケーションが停止するたびにスナップショットが作成されるため、アプリケーションが異常な状態であったり、パフォーマンスが低下したりすると問題が発生する可能性があります。アプリケーションが実稼働環境で安定した状態にはいった後  SnapshotsEnabled を true に設定します。

    注記

    正しい状態データで適切に再起動するように、1 日に数回スナップショットを作成するようにアプリケーションを設定することをお勧めします。スナップショットの正しい頻度は、アプリケーションのビジネスロジックによって異なります。スナップショットを頻繁に作成することで、より最近のデータを復元できますが、コストが増加し、より多くのシステムリソースが必要になります。

    アプリケーションのダウンタイムのモニタリングについては、 を参照してください。

障害耐性の詳細については、「耐障害性を実装する」を参照してください。

サポートされていないコネクタのバージョン。

Apache Flink バージョン 1.15 以降では、Managed Service for Apache Flink は、アプリケーション JARs。Managed Service for Apache Flink バージョン 1.15 以降にアップグレードする場合は、最新の Kinesis コネクタを使用していることを確認してください。これはバージョン 1.15.2 と同じかそれより新しいバージョンです。他のすべてのバージョンは Managed Service for Apache Flink ではサポートされません。これは、Stop with Savepoint 機能で整合性の問題や障害が発生し、クリーン停止/更新オペレーションが妨げられる可能性があるためです。Amazon Managed Service for Apache Flink バージョンのコネクタ互換性の詳細については、「Apache Flink コネクタ」を参照してください。

パフォーマンスと並列処理

アプリケーションの並列処理を調整し、パフォーマンスの落とし穴を避けることで、アプリケーションをあらゆるスループットレベルに合わせて拡張できます。アプリケーションの開発およびメンテナンスを行うときは、以下のことを考える必要があります。

  • すべてのアプリケーションのソースとシンクが十分にプロビジョニングされており、スロットルされていないことを確認します。ソースとシンクが他の AWS サービスである場合は、CloudWatch を使用してそれらのサービスをモニタリングします。

  • 並列処理が非常に高いアプリケーションの場合は、アプリケーション内のすべての演算子に高レベルの並列処理が適用されているかどうかを確認してください。デフォルトでは、Apache Flink はアプリケーショングラフ内のすべてのオペレータに同じアプリケーション並列を適用します。これにより、ソースまたはシンクにおけるプロビジョニングの問題、またはオペレーターのデータ処理のボトルネックが発生する可能性があります。SetParallelismを使用すると、コードの各オペレータの並列処理を変更できます。

  • アプリケーションのオペレータの並列処理設定の意味を理解してください。オペレーターの並列処理を変更すると、オペレーターの並列処理が現在の設定と互換性がないときに作成されたスナップショットからアプリケーションを復元できない場合があります。オペレータの並列処理の設定の詳細について、オペレータの最大並列処理を明示的に設定するを参照してください。

簡易スケーリングについての詳細は、「アプリケーションのスケーリングを実装する」を参照してください。

オペレータごとの並列処理の設定

デフォルトでは、すべてのオペレータにアプリケーションレベルで並列処理が設定されます。DataStream APIと.setParallelism(x)を使用すると、1つのオペレータの並列処理をオーバーライドできます。オペレータの並列処理は、アプリケーションの並列処理と同じかそれ以下の任意の並列処理に設定できます。

可能であれば、オペレータの並列処理をアプリケーション並列処理の関数として定義してください。このようにすると、演算子の並列処理はアプリケーションの並列処理によって変化します。たとえば、オートスケーリングを使用している場合は、すべてのオペレータの並列処理が同じ比率で変化します。

int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);

場合によっては、オペレータの並列処理を定数に設定することをお勧めします。たとえば、Kinesis Stream ソースの並列処理をシャードの数に設定します。このような場合は、ソースストリームを再シャードするなど、コードを変更せずにオペレータ並列処理をアプリケーション設定パラメータとして渡すことを検討してください。

ログ記録

CloudWatch Logs を使用して、アプリケーションのパフォーマンスとエラー状態をモニタリングできます。アプリケーションのロギングを設定するときは、以下のことを考える必要があります。

  • 実行時の問題をデバッグできるように、アプリケーションの CloudWatch ロギングを有効にします。

  • アプリケーションで処理されているすべてのレコードについてログエントリを作成しないでください。これにより、処理中に重大なボトルネックが発生し、データ処理にバックプレッシャーが発生する可能性があります。

  • アプリケーションが正常に動作していない場合に、通知のCloudWatchアラームを作成します。詳細については、を参照してください。

SCIM を実装する方法の詳細については、「」を参照してください。

コーディング

推薦プログラミング手法で、アプリケーションのパフォーマンスと安定性を高めることができます。アプリケーションコードを作成する際は、以下の事を考える必要があります。

  • アプリケーションコードのsystem.exit()、アプリケーションのmainメソッド、またはユーザー定義関数では使用しないでください。コードからアプリケーションをシャットダウンする場合は、アプリケーションで問題が発生したことに関するメッセージを含むExceptionまたはRuntimeExceptionから派生した例外をスローします。

    サービスがこの例外を処理する方法については、以下の点に注意してください。

    • 例外がアプリケーションの main メソッドからスローされた場合、アプリケーションが RUNNING ステータスに移行したときにサービスが ProgramInvocationException でラップし、ジョブマネージャーはジョブの送信に失敗します。

    • 例外がユーザー定義関数からスローされた場合、ジョブ・マネージャーはそのジョブを失敗させて再起動し、例外の詳細が例外ログに書き込まれます。

  • アプリケーション JAR ファイルとそれに含まれる依存関係をシェーディングすることを検討してください。アプリケーションと Apache Flink ランタイムの間でパッケージ名が競合する可能性がある場合は、シェーディングをお勧めします。競合が発生すると、アプリケーションログにタイプ java.util.concurrent.ExecutionException の例外が含まれる可能性があります。アプリケーション JAR ファイルのシェーディングの詳細について、Apache Maven Shade プラグインを参照してください。

ルート認証情報の管理。

長期認証情報を実稼働環境 (またはその他の)アプリケーションに組み込むべきではありません。長期認証情報はバージョン管理システムにチェックインされる可能性が高くて、簡単に紛失する可能性があります。代わりに、ロールを Managed Service for Apache Flink アプリケーションに関連付け、そのロールにアクセス許可を付与できます。実行中の Flink アプリケーションは、環境からそれぞれのアクセス許可を持つ一時的な認証情報を選択できます。認証にユーザー名とパスワードを必要とするデータベースなど、IAM とネイティブに統合されていないサービスに認証が必要な場合は、AWS Secrets Manager にシークレットを保存することを検討する必要があります。

多くの AWS ネイティブサービスは認証をサポートしています。

シャード/パーティションが少ないソースからの読み取り

Apache Kafka または Kinesis Data Stream から読み取る場合、ストリームの並列処理 (Kafka のパーティション数と Kinesis のシャード数) とアプリケーションの並列処理の間に不一致がある可能性があります。単純な設計では、アプリケーションの並列処理はストリームの並列処理を超えることはできません。ソースオペレータの各サブタスクは、1 つ以上のシャード/パーティションからしか読み取ることができません。つまり、シャードが 2つのストリームであり、並列処理が 8 のアプリケーションである場合、ストリームから実際に消費しているのは 2 つのサブタスクだけで、6 つのサブタスクはアイドル状態のままです。これにより、アプリケーションのスループットが大幅に制限される可能性があります。特に、逆シリアル化にコストがかかり、ソース側で実行される場合 (デフォルト)はなおさらです。

この影響を軽減するには、ストリームをスケーリングする方法があります。しかし、それが常に望ましいとは限らないし、可能とも限らない。あるいは、ソースを再構築して、シリアライズを一切行わずに渡すようにすることもできます。あるいは、シリアル化を行わずにbyte[]を渡すようにソースを再構築することもできます。その後、データを再調整してすべてのタスクに均等に分散して、そこでデータを逆シリアル化できます。この方法では、すべてのサブタスクを逆シリアル化に利用できるようになり、この高価になる可能性のある操作がストリームのシャード/パーティションの数に制限されなくなります。

Studio ノートブックの更新間隔

段落結果の更新間隔を変更する場合は、1000 ミリ秒以上の値に設定してください。

Studio ノートブックの最適なパフォーマンス

次のステートメントでテストし、 events-per-second を乗算した値が 25,000,000 未満の場合、最適なパフォーマンスを得number-of-keysました。events-per-second は150,000 未満でした。

SELECT key, sum(value) FROM key-values GROUP BY key

ウォーターマーク戦略とアイドルシャードがタイムウィンドウに与える影響

Apache Kafka と Kinesis Data Streamsからイベントを読み取るとき、ソースはストリームの属性に基づいてイベント時間を設定できます。Kinesis の場合、イベント時間はイベントのおおよその到着時間と等しくなります。ただし、Flink アプリケーションがイベント時間を使用するには、イベントのソースでイベント時間を設定するだけでは十分ではありません。ソースは、イベント時間に関する情報をソースから他のすべてのオペレーターに伝達するウォーターマークを生成する必要があります。Flink のドキュメントには、そのプロセスがどのように実行するかについての概要が書かれています。

デフォルトでは、Kinesis から読み取られたイベントのタイムスタンプは、Kinesis によって決定されておおよその到着時刻に設定されます。アプリケーションでイベント時間が機能するための追加の前提条件は、ウォーターマーク戦略です。

WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));

次に、ウォーターマーク戦略を assignTimestampsAndWatermarks メソッドで DataStream に適用します。便利な組み込み戦略がいくつかあります。

  • forMonotonousTimestamps() はイベント時間 (おおよその到着時間) だけを使用して、(特定のサブタスクごとに) 定期的に最大値をウォーターマークとして出力します。

  • forBoundedOutOfOrderness(Duration.ofSeconds(...)) は前のストラテジーと似ていますが、ウォーターマークの生成にはイベント時間、つまり継続時間を使用します。

Flink ドキュメンテーションから:

ソース関数の各並列サブタスクは通常、ウォーターマークを個別に生成します。これらのウォーターマークは、その特定の並列ソースでのイベント時間を定義します。

ウォーターマークがストリーミングプログラムを通過するにつれて、ウォーターマークが到着したオペレーターのイベント時間を進めます。  オペレータがイベント時間を進めるたびに、後続のオペレーターのために下流に新しいウォーターマークを生成します。

一部のオペレーターは複数の入力ストリームを消費します。 たとえば、ユニオン、または keyBy(…) 関数や Partition(…) 関数に続くオペレータなどです。このようなオペレータの現在のイベント時間は、入力ストリームのイベント時間の最小値です。入力ストリームがイベント時間を更新すると、オペレータもイベント時間を更新します。

つまり、ソースサブタスクがアイドルシャードから消費している場合、ダウンストリームオペレータはそのサブタスクから新しいウォーターマークを受け取らないため、タイムウィンドウを使用するすべてのダウンストリームオペレータの処理が停止します。これを避けるために、顧客はウォーターマークストラテジーに withIdleness オプションを追加することができます。このオプションでは、オペレータは、オペレータのイベント時間を計算するときにアイドル状態のアップストリームサブタスクからウォーターマークを除外します。したがって、アイドル状態のサブタスクは、ダウンストリーム演算子のイベント時間の進行をブロックしなくなりました。

ただし、サブタスクがイベントを読み取っていない場合、つまりストリームにイベントがない場合、組み込みウォーターマーク戦略のアイドル状態オプションはイベント時間を早めません。有限セットのイベントがストリームから読み取られるテスト ケースは特に顕著になります。最後のイベントが読み取られた後、イベント時間は進行しないため、最後のウィンドウ (最後のイベントを含む) は閉じません。

概要

  • シャードがアイドル状態の場合、withIdlenessこの設定では新しいウォーターマークは生成されません。アイドル状態のサブタスクによって送信された最後のウォーターマークは、ダウンストリーム演算子の最小ウォーターマーク計算から除外されます。

  • 組み込みウォーターマーク戦略では、最後のオープンウィンドウは閉じられません (ウォーターマークを進める新しいイベントが送信されますが、新しいウィンドウが作成されて開いたままになる場合を除きます)。

  • 時刻が Kinesis ストリームによって設定されている場合でも、あるシャードが他のシャードよりも速く消費された場合 (アプリケーションの初期化中や、親子関係を無視してすべての既存のシャードTRIM_HORIZONが並行して消費される場合など)、遅延到着イベントが発生する可能性があります。

  • ウォーターマーク戦略withIdlenessの設定は、アイドルシャード の Kinesis ソース固有の設定を中断しているように見えます(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS

次のアプリケーションはストリームから読み取って、イベント時間に基づいてセッションウィンドウを作成しています。

Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });

次の例では、8つのイベントが16シャード ストリームに書き込まれます(最初の2つと最後のイベントは偶然に同じシャードに配置されます)。

$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022

この入力により、イベント1、2、3、イベント4、5、イベント6、イベント7、イベント8の5つのセッションウィンドウが生成されるはずです。ただし、このプログラムでは最初の4つのウィンドウしか生成されません。

11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7

出力には4つのウィンドウしか表示されていません (イベント8を含む最後のウィンドウはありません)。これはイベント時間とウォーターマーク戦略によるものです。構築済みのウォーターマーク戦略では、ストリームから読み取られた最後のイベントの時間を超えて時間が進むことはないため、最後のウィンドウを閉じることはできません。ただし、ウィンドウを閉じるには、最後のイベントから10秒以上経過する必要があります。この場合、最後のウォーターマークは 2022-03-23T10:21:27.170Z ですが、セッションウィンドウを閉じるには、ウォーターマークが 10 秒と 1 ミリ秒後に必要です。

withIdleness オプションをウォーターマーク戦略から削除すると、ウィンドウ演算子の「グローバルウォーターマーク」が進むことができないため、セッションウィンドウは閉じられません。

Flink アプリケーションの起動時 (またはデータスキューがある場合)、一部のシャードは他のシャードよりも速く消費される可能性があります。これにより、サブタスクから一部のウォーターマークが早すぎる可能性があります (サブタスクは、サブスクライブしている他のシャードから消費することなく、1 つのシャードのコンテンツに基づいてウォーターマークを出力する場合があります)。緩和方法は、安全バッファを追加する(forBoundedOutOfOrderness(Duration.ofSeconds(30))か、遅延到着イベントを明示的に許可するさまざまなウォーターマーク戦略です(allowedLateness(Time.minutes(5))

すべてのオペレータに UUID を設定

Apache Flink 用 Managed Service がスナップショットを持つアプリケーションの Flink ジョブを開始するとき、何らかの問題で Flink ジョブが起動できないことがあります。その1つはオペレータ ID の不一致です。Flink では、Flink のジョブグラフオペレータには明示的で一貫性のあるオペレータ ID が必要です。明示的に設定しない場合、Flink は演算子の ID を生成します。これは、Flink がこれらのオペレータ ID を使用してジョブグラフ内のオペレータを一意に識別し、それを使用して各オペレータの状態をセーブポイントに保存するためです。

「オペレータ ID の不一致」の問題は、Flink がジョブグラフのオペレータ ID と、セーブポイントで定義されたオペレータ ID との間で 1:1 のマッピングを見つけられない場合に発生します。これは、明示的な整合性のあるオペレータ IDs が設定されておらず、Flink がすべてのジョブグラフの作成と整合性のないオペレータ IDs を生成する場合に発生します。メンテナンスの実行中にアプリケーションがこの問題に遭遇する可能性が高くなります。これを回避するには、Flink コード内のすべての演算子に UUID を設定することをお勧めします。詳細については、「プロダクションレディネス」段階にある「すべてのオペレータに UUID を設定する」トピックを参照してください。

Maven シェードプラグインに ServiceResourceTransformer を追加する

FlinkはJavaのサービスプロバイダーインターフェース (SPI)を使用して、コネクターやフォーマットなどのコンポーネントをロードします。SPI を使用する複数の Flink 依存関係により、uber-jar の競合や予期しないアプリケーション動作が発生する可能性があります。pom.xml で定義されている Maven シェードプラグインの ServiceResourceTransformer を追加することをお勧めします。

<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>