Amazon Managed Service for Apache Flink (Amazon MSF) は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
Managed Service for Apache Flink アプリケーションのベストプラクティスを維持する
このセクションでは、安定した高性能の Managed Service for Apache Flink アプリケーションを開発するための情報および推薦事項が記載されています。
トピック
uber JAR のサイズを最小限にする
Java/Scala アプリケーションは uber (super/fat) JAR でパッケージ化され、ランタイムによってまだ提供されていない追加の必要な依存関係がすべて含まれている必要があります。ただし、uber JAR のサイズはアプリケーションの起動時間および再起動時間に影響し、JAR が 512 MB の上限を超える可能性があります。
デプロイ時間を最適化するには、uber JAR に次の内容を含めないでください。
-
次の例で示されるように、ランタイムによって提供される依存関係。Gradle 設定の POM ファイルまたは
compileOnlyにprovidedスコープが必要です。 -
例えば JUnit や Mockito など、テストにのみ使用される依存関係。Gradle 設定の POM ファイルまたは
testImplementationにtestスコープが必要です。 -
アプリケーションで実際に使用されていない依存関係。
-
アプリケーションに必要な静的データまたはメタデータ。静的データは、ランタイム時にアプリケーションによって、例えばデータストアや Amazon S3 などからロードされる必要があります。
-
上記の設定の詳細については、この「POM example file
」を参照してください。
提供された依存関係
Managed Service for Apache Flink ランタイムは、数多くの依存関係を提供します。これらの依存関係は fat JAR に含めないで POM ファイルに provided スコープを持つか、maven-shade-plugin 設定で明示的に除外する必要があります。fat JAR に含まれるこれらの依存関係はランタイム時に無視されますが、JAR のサイズが大きくなり、デプロイ中にオーバーヘッドが追加されます。
ランタイムバージョン 1.18、1.19、1.20 でランタイムによって提供される依存関係
-
org.apache.flink:flink-core -
org.apache.flink:flink-java -
org.apache.flink:flink-streaming-java -
org.apache.flink:flink-scala_2.12 -
org.apache.flink:flink-table-runtime -
org.apache.flink:flink-table-planner-loader -
org.apache.flink:flink-json -
org.apache.flink:flink-connector-base -
org.apache.flink:flink-connector-files -
org.apache.flink:flink-clients -
org.apache.flink:flink-runtime-web -
org.apache.flink:flink-metrics-code -
org.apache.flink:flink-table-api-java -
org.apache.flink:flink-table-api-bridge-base -
org.apache.flink:flink-table-api-java-bridge -
org.apache.logging.log4j:log4j-slf4j-impl -
org.apache.logging.log4j:log4j-api -
org.apache.logging.log4j:log4j-core -
org.apache.logging.log4j:log4j-1.2-api
さらに、ランタイムは Managed Service for Apache Flink でアプリケーションランタイムプロパティを取得するために使用されるライブラリを提供します (com.amazonaws:aws-kinesisanalytics-runtime:1.2.0)。
ランタイムによって提供されるすべての依存関係は uber JAR に含めないように、次の推奨事項を使用する必要があります。
-
Maven (
pom.xml) と SBT (build.sbt) では、providedスコープを使用します。 -
Gradle (
build.gradle) では、compileOnly設定を使用します。
Apache Flink の親優先のクラスロードにより、uber JAR に誤って含まれてしまう依存関係はランタイム時に無視されます。詳細については、「Apache Beam ドキュメント」の「parent-first-patterns
Connector
ランタイムに含まれていないほとんどのコネクタ (FileSystem コネクタを除く) は、デフォルトのスコープ (compile) で POM ファイルに含める必要があります。
その他の推奨事項:
ルールとして、Managed Service for Apache Flink に提供される Apache Flink uber JAR には、アプリケーションの実行に必要な最小限のコードが含まれている必要があります。ソースクラス、テストデータセット、ブートストラップ状態を含む依存関係は、この jar には含めないでください。ランタイム時に静的リソースをプルする必要がある場合、この懸念を Amazon S3 などのリソースに分離します。このような例には、状態ブートストラップや推論モデルなどがあります。
時間をかけて詳細な依存関係ツリーを検討し、ランタイム以外の依存関係を削除してください。
Managed Service for Apache Flink は 512MB の jar サイズをサポートしていますが、これはルールの例外と見なされます。現在、Apache Flink はデフォルト設定で最大 104 MB の jar サイズをサポートしており、これが必要な jar の最大ターゲットサイズです。
フォールトトレランス:チェックポイントとセーブポイント
チェックポイントおよびセーブポイントを使用して、Managed Service for Apache Flink アプリケーションに耐障害性を実装します。アプリケーションの開発およびメンテナンスを行うときは、以下のことを考える必要があります。
アプリケーションにチェックポイントを有効にしておくことをお勧めします。チェックポイントは定期メンテナンスだけでなく、サービスの問題、アプリケーションの依存関係による障害、その他の問題による予期しない障害時にも、アプリケーションの耐障害性を提供します。メンテナンスの詳細については、「Managed Service for Apache Flink のメンテナンスタスクを管理する」を参照してください。
アプリケーションの開発時またはトラブルシューティング時には、ApplicationSnapshotConfiguration:: SnapshotsEnabledを
falseに設定します。アプリケーションが停止するたびにスナップショットが作成されるため、アプリケーションが異常な状態であったり、パフォーマンスが低下したりすると問題が発生する可能性があります。アプリケーションが実稼働環境で安定した状態にはいった後SnapshotsEnabledをtrueに設定します。注記
アプリケーションが 1 日に数回スナップショットを作成し、正しい状態データで適切に再起動されるように設定することをお勧めします。スナップショットの正しい頻度は、アプリケーションのビジネスロジックによって異なります。頻繁にスナップショットを作成すると、より新しいデータを復元できますが、コストが増加して必要なシステムリソースが増えます。
アプリケーションのダウンタイムのモニタリングについては、Managed Service for Apache Flink でのメトリクスおよびディメンション を参照してください。
障害耐性の詳細については、「耐障害性を実装する」を参照してください。
サポートされていないコネクタのバージョン。
Apache Flink バージョン 1.15 以降では、アプリケーション JAR にバンドルされたサポート対象外の Kinesis コネクタを使用している場合、Managed Service for Apache Flink はアプリケーションの起動や更新を自動的に防止します。Managed Service for Apache Flink バージョンを 1.15 以降にアップグレードするときは、最新の Kinesis コネクタを使用していることを確認してください。これはバージョン 1.15.2 と同じかそれより新しいバージョンです。Stop with Savepoint 機能で整合性問題や障害を引き起こして、クリーン停止/更新の操作を妨げる可能性があるため、その他のバージョンはすべて Managed Service for Apache Flink でサポートされません。Amazon Managed Service for Apache Flink バージョンのコネクタ互換性の詳細については、「Apache Flink connectors」を参照してください。
パフォーマンスと並列処理
アプリケーションの並列処理を調整し、パフォーマンスの落とし穴を避けることで、アプリケーションをあらゆるスループットレベルに合わせて拡張できます。アプリケーションの開発およびメンテナンスを行うときは、以下のことを考える必要があります。
すべてのアプリケーションのソースとシンクが十分にプロビジョニングされており、スロットルされていないことを確認します。ソースとシンクが他の AWS サービスである場合は、CloudWatchを使用してそれらのサービスを監視します。
並列処理が非常に高いアプリケーションの場合は、アプリケーション内のすべての演算子に高レベルの並列処理が適用されているかどうかを確認してください。デフォルトでは、Apache Flink はアプリケーショングラフ内のすべてのオペレータに同じアプリケーション並列を適用します。これにより、ソースまたはシンクにおけるプロビジョニングの問題、またはオペレーターのデータ処理のボトルネックが発生する可能性があります。SetParallelism
を使用すると、コードの各オペレータの並列処理を変更できます。 アプリケーションのオペレータの並列処理設定の意味を理解してください。オペレーターの並列処理を変更すると、オペレーターの並列処理が現在の設定と互換性がないときに作成されたスナップショットからアプリケーションを復元できない場合があります。オペレータの並列処理の設定の詳細について、オペレータの最大並列処理を明示的に設定する
を参照してください。
簡易スケーリングについての詳細は、「アプリケーションスケーリングを実装する」を参照してください。
オペレータごとの並列処理の設定
デフォルトでは、すべてのオペレータにアプリケーションレベルで並列処理が設定されます。DataStream APIと.setParallelism(x)を使用すると、1つのオペレータの並列処理をオーバーライドできます。オペレータの並列処理は、アプリケーションの並列処理と同じかそれ以下の任意の並列処理に設定できます。
可能であれば、オペレータの並列処理をアプリケーション並列処理の関数として定義してください。このようにすると、演算子の並列処理はアプリケーションの並列処理によって変化します。たとえば、オートスケーリングを使用している場合は、すべてのオペレータの並列処理が同じ比率で変化します。
int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);
場合によっては、オペレータの並列処理を定数に設定することをお勧めします。たとえば、Kinesis Stream ソースの並列処理をシャードの数に設定します。このような場合、コードを変更せずにオペレータの並列処理をアプリケーション設定パラメーターとして渡して変更することを検討してください (例えば、ソースストリームの再シャードなど)。
ログ記録
CloudWatch Logs を使用して、アプリケーションのパフォーマンスとエラー状態をモニタリングできます。アプリケーションのロギングを設定するときは、以下のことを考える必要があります。
実行時の問題をデバッグできるように、アプリケーションの CloudWatch ロギングを有効にします。
アプリケーションで処理されているすべてのレコードについてログエントリを作成しないでください。処理中に深刻なボトルネックが発生し、データ処理でバックプレッシャーにつながる可能性があります。
アプリケーションが正常に動作していない場合に、通知のCloudWatchアラームを作成します。詳細については、Amazon Managed Service for Apache Flink で CloudWatch アラームを使用するを参照してください。
SCIM を実装する方法の詳細については、「」を参照してください。
コーディング
推薦プログラミング手法で、アプリケーションのパフォーマンスと安定性を高めることができます。アプリケーションコードを作成する際は、以下の事を考える必要があります。
アプリケーションコードの
system.exit()、アプリケーションのmainメソッド、またはユーザー定義関数では使用しないでください。コードからアプリケーションをシャットダウンする場合は、アプリケーションで問題が発生したことに関するメッセージを含むExceptionまたはRuntimeExceptionから派生した例外をスローします。サービスがこの例外を処理する方法については、以下の点に注意してください。
例外がアプリケーションの
mainメソッドからスローされた場合、アプリケーションがRUNNINGステータスに移行したときにサービスがProgramInvocationExceptionでラップし、ジョブマネージャーはジョブの送信に失敗します。例外がユーザー定義関数からスローされた場合、ジョブ・マネージャーはそのジョブを失敗させて再起動し、例外の詳細が例外ログに書き込まれます。
アプリケーション JAR ファイルとそれに含まれる依存関係をシェーディングすることを検討してください。アプリケーションと Apache Flink ランタイムの間でパッケージ名が競合する可能性がある場合は、シェーディングをお勧めします。競合が発生すると、アプリケーションログにタイプ
java.util.concurrent.ExecutionExceptionの例外が含まれる可能性があります。アプリケーション JAR ファイルのシェーディングの詳細について、Apache Maven Shade プラグインを参照してください。
ルート認証情報の管理。
長期認証情報を実稼働環境 (またはその他の)アプリケーションに組み込むべきではありません。長期認証情報はバージョン管理システムにチェックインされる可能性が高くて、簡単に紛失する可能性があります。代わりに、Managed Service for Apache Flink アプリケーションにロールを関連付けて、そのロールにアクセス許可を付与することができます。実行中の Flink アプリケーションは、それぞれのアクセス許可を持つ一時的な認証情報を環境から選択できます。IAM とネイティブに統合されていないサービスに認証が必要な場合 (例えば、認証にユーザー名およびパスワードが必要なデータベース)、AWS Secrets Manager
多くの AWS ネイティブサービスが認証をサポートしています。
Kinesis Data Streams— ProcessTaxiStream.java
Amazon MSK — https://github.com/aws/aws-msk-iam-auth/#using-the-amazon-msk-library-for-iam-authentication
Amazon Elasticsearch Service — AmazonElasticsearchSink.java
Amazon S3 – works out of the box on Managed Service for Apache Flink
シャード/パーティションが少ないソースからの読み取り
Apache Kafka または Kinesis Data Stream から読み取るとき、ストリームの並列処理 (Kafka のパーティション数および Kinesis のシャード数) およびアプリケーションの並列処理が一致しない場合があります。単純な設計では、アプリケーションの並列処理はストリームの並列処理を超えることはできません。ソースオペレータの各サブタスクは、1 つ以上のシャード/パーティションからしか読み取ることができません。つまり、シャードが 2つのストリームであり、並列処理が 8 のアプリケーションである場合、ストリームから実際に消費しているのは 2 つのサブタスクだけで、6 つのサブタスクはアイドル状態のままです。これにより、アプリケーションのスループットが大幅に制限される可能性があります。特に、逆シリアル化にコストがかかり、ソース側で実行される場合 (デフォルト)はなおさらです。
この影響を軽減するには、ストリームをスケーリングする方法があります。しかし、それが常に望ましいとは限らないし、可能とも限らない。あるいは、ソースを再構築して、シリアライズを一切行わずに渡すようにすることもできます。あるいは、シリアル化を行わずにbyte[]を渡すようにソースを再構築することもできます。その後、データを再調整
Studio ノートブックの更新間隔
段落結果の更新間隔を変更する場合は、1000 ミリ秒以上の値に設定してください。
Studio ノートブックの最適なパフォーマンス
次のステートメントでテストしたところ、events-per-second と number-of-keys の積が 25,000,000 未満のときに最適なパフォーマンスが得られました。events-per-second は150,000 未満でした。
SELECT key, sum(value) FROM key-values GROUP BY key
ウォーターマーク戦略とアイドルシャードがタイムウィンドウに与える影響
Apache Kafka と Kinesis Data Streamsからイベントを読み取るとき、ソースはストリームの属性に基づいてイベント時間を設定できます。Kinesis の場合、イベント時間はイベントのおおよその到着時間と等しくなります。ただし、Flink アプリケーションがイベント時間を使用するには、イベントのソースでイベント時間を設定するだけでは十分ではありません。ソースは、イベント時間に関する情報をソースから他のすべてのオペレーターに伝達するウォーターマークを生成する必要があります。Flink のドキュメント
デフォルトでは、Kinesis から読み取られたイベントのタイムスタンプは、Kinesis によって決定されておおよその到着時刻に設定されます。アプリケーションでイベント時間が機能するための追加の前提条件は、ウォーターマーク戦略です。
WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));
次に、ウォーターマーク戦略を assignTimestampsAndWatermarks メソッドで DataStream に適用します。便利なビルトイン戦略がいくつかあります。
-
forMonotonousTimestamps()はイベント時間 (おおよその到着時間) だけを使用して、(特定のサブタスクごとに) 定期的に最大値をウォーターマークとして出力します。 -
forBoundedOutOfOrderness(Duration.ofSeconds(...))は前のストラテジーと似ていますが、ウォーターマークの生成にはイベント時間、つまり継続時間を使用します。
ソース関数の各並列サブタスクは通常、ウォーターマークを個別に生成します。これらのウォーターマークは、その特定の並列ソースでのイベント時間を定義します。
ウォーターマークがストリーミングプログラムを通過するにつれて、ウォーターマークが到着したオペレーターのイベント時間を進めます。 オペレータがイベント時間を進めるたびに、後続のオペレーターのために下流に新しいウォーターマークを生成します。
一部のオペレーターは複数の入力ストリームを消費します。 たとえば、ユニオン、または keyBy(…) 関数や Partition(…) 関数に続くオペレータなどです。このようなオペレータの現在のイベント時間は、入力ストリームのイベント時間の最小値です。入力ストリームがイベント時間を更新すると、オペレータもイベント時間を更新します。
つまり、ソースサブタスクがアイドルシャードから消費している場合、ダウンストリームオペレータはそのサブタスクから新しいウォーターマークを受け取らないため、タイムウィンドウを使用するすべてのダウンストリームオペレータの処理が停止します。これを避けるために、顧客はウォーターマークストラテジーに withIdleness オプションを追加することができます。このオプションを使用すると、オペレータのイベント時間を計算するときに、オペレーターはアイドル状態のアップストリームサブタスクからウォーターマークを除外します。したがって、アイドル状態のサブタスクはダウンストリームオペレータのイベント時間の進行をブロックしなくなります。
使用するシャード割り当て機能により、一部のワーカーに Kinesis シャードが割り当てられない場合があります。この場合、すべての Kinesis シャードが継続的にイベントデータを配信しても、これらのワーカーはアイドル状態のソース動作をマニフェストします。ソースオペレータで uniformShardAssigner を使用することで、このリスクを軽減できます。ワーカー数がアクティブなシャード数以下である限り、すべてのソースサブタスクに処理するシャードが確保されます。
ただし、イベントを読み取っているサブタスクがない場合 (つまり、ストリームにイベントが存在しない)、組み込みのウォーターマーク戦略を使用したアイドル状態オプションはイベント時間を進めません。有限セットのイベントがストリームから読み取られるテスト ケースは特に顕著になります。最後のイベントが読み込まれた後にイベント時間が進まないため、最後のウィンドウ (最後のイベントを含む) は閉じません。
概要
シャードがアイドル状態の場合、
withIdleness設定では新しいウォーターマークは生成されません。アイドル状態のサブタスクによって送信された最後のウォーターマークは、ダウンストリームオペレータの最小ウォーターマーク計算から除外されます。組み込みのウォーターマーク戦略を使用すると、最後に開いたウィンドウは閉じません (ウォーターマークを進める新しいイベントが送信される場合を除くが、これによって新しいウィンドウが作成されて開いたままになる)。
Kinesis ストリームによって時間が設定されても、1 つのシャードが他のシャードよりも早く消費される場合、遅延到着イベントが引き続き発生する可能性があります (例えばアプリの初期化中、あるいは既存のすべてのシャードが親子関係を無視して並行で消費される
TRIM_HORIZONを使用している場合)。ウォーターマーク戦略の
withIdleness設定は、アイドル状態のシャード(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLISの Kinesis ソース固有設定を中断するようです。
例
次のアプリケーションはストリームから読み取って、イベント時間に基づいてセッションウィンドウを作成しています。
Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });
次の例では、8つのイベントが16シャード ストリームに書き込まれます(最初の2つと最後のイベントは偶然に同じシャードに配置されます)。
$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022
この入力により、イベント1、2、3、イベント4、5、イベント6、イベント7、イベント8の5つのセッションウィンドウが生成されるはずです。ただし、このプログラムでは最初の4つのウィンドウしか生成されません。
11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7
出力には4つのウィンドウしか表示されていません (イベント8を含む最後のウィンドウはありません)。これはイベント時間とウォーターマーク戦略によるものです。プレビルドのウォーターマーク戦略では、ストリームから読み込まれた最後のイベントの時間を過ぎて時間が進むことはないため、最後のウィンドウを閉じることができません。ただし、ウィンドウを閉じるには、最後のイベントから10秒以上経過する必要があります。この場合、最後のウォーターマークは「2022-03-23T10:21:27.170Z」ですが、セッションウィンドウを閉じるには、10 秒と 1 ミリ秒後のウォーターマークが必要です。
withIdleness オプションがウォーターマーク戦略から削除される場合、ウィンドウオペレータの「グローバルウォーターマーク」が進めないため、セッションウィンドウは閉じられなくなります。
Flink アプリケーションが起動されると (またはデータスキューがある場合)、一部のシャードは他のシャードよりも早く消費される可能性があります。サブタスクによる一部のウォーターマークの出力が早すぎる場合があります (サブスクライブしている他のシャードから消費せず、サブタスクは 1 つのシャードの内容に基づいてウォーターマークを出力する場合がある)。軽減する方法としては、安全バッファー (forBoundedOutOfOrderness(Duration.ofSeconds(30)) を追加したり、到着が遅れるイベント (allowedLateness(Time.minutes(5)) を明示的に許可したりするさまざまなウォーターマーク戦略があります。
すべてのオペレータに UUID を設定
Apache Flink 用 Managed Service がスナップショットを持つアプリケーションの Flink ジョブを開始するとき、何らかの問題で Flink ジョブが起動できないことがあります。その 1 つは オペレータ ID の不一致 です。Flink では、Flink のジョブグラフオペレータには明示的で一貫性のあるオペレータ ID が必要です。明示的に設定されていない場合、Flink はオペレータに ID を生成します。これは、Flink がこれらのオペレータ ID を使用してジョブグラフ内のオペレータを一意に識別し、それを使用して各オペレータの状態をセーブポイントに保存するためです。
オペレータ ID の不一致問題は、Flink がジョブグラフのオペレータ ID とセーブポイントで定義されたオペレータ ID の間に 1:1 のマッピングが見つからない場合に発生します。明示的な一貫性のあるオペレーター ID が設定されておらず、Flink がすべてのジョブグラフ作成と一致しないオペレーター ID を生成したときに発生します。メンテナンスの実行中にアプリケーションがこの問題に遭遇する可能性が高くなります。これを回避するには、Flink コードではすべてのオペレータに UUID を設定することをお勧めします。詳細については、「Production readiness」の「Set a UUID for all operators」トピックを参照してください。
ServiceResourceTransformer を Maven Shade プラグインに追加する
FlinkはJavaのサービスプロバイダーインターフェース (SPI)
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>