Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Managed Service for Apache Flink アプリケーションのベストプラクティスを維持する
このセクションでは、安定したパフォーマンスの Managed Service for Apache Flink アプリケーションを開発するための情報と推奨事項について説明します。
トピック
uber JAR のサイズを最小限に抑える
Java/Scala アプリケーションは uber (super/fat) JAR にパッケージ化し、ランタイムによってまだ提供されていない追加の必要な依存関係をすべて含める必要があります。ただし、uber JAR のサイズはアプリケーションの起動時間と再起動時間に影響し、JAR が 512 MB の制限を超える可能性があります。
デプロイ時間を最適化するには、uber JAR に以下を含めないでください。
-
次の例に示すように、ランタイムによって提供される依存関係。POM ファイルまたは
compileOnly
Gradle 設定にprovided
スコープが必要です。 -
JUnit や Mockito など、テストにのみ使用される依存関係。POM ファイルまたは
testImplementation
Gradle 設定にtest
スコープが必要です。 -
アプリケーションで実際に使用されていない依存関係。
-
アプリケーションに必要な静的データまたはメタデータ。静的データは、データストアや Amazon S3 など、実行時にアプリケーションによってロードされる必要があります。
-
上記の設定の詳細については、この POM サンプルファイル
を参照してください。
提供された依存関係
Managed Service for Apache Flink ランタイムは、多くの依存関係を提供します。これらの依存関係は fat JAR に含めず、POM ファイルprovided
の範囲を持つか、maven-shade-plugin
設定で明示的に除外する必要があります。fat JAR に含まれるこれらの依存関係はいずれも実行時に無視されますが、JAR のサイズが大きくなり、デプロイ中にオーバーヘッドが追加されます。
ランタイムバージョン 1.18、1.19、および 1.20 でランタイムによって提供される依存関係:
-
org.apache.flink:flink-core
-
org.apache.flink:flink-java
-
org.apache.flink:flink-streaming-java
-
org.apache.flink:flink-scala_2.12
-
org.apache.flink:flink-table-runtime
-
org.apache.flink:flink-table-planner-loader
-
org.apache.flink:flink-json
-
org.apache.flink:flink-connector-base
-
org.apache.flink:flink-connector-files
-
org.apache.flink:flink-clients
-
org.apache.flink:flink-runtime-web
-
org.apache.flink:flink-metrics-code
-
org.apache.flink:flink-table-api-java
-
org.apache.flink:flink-table-api-bridge-base
-
org.apache.flink:flink-table-api-java-bridge
-
org.apache.logging.log4j:log4j-slf4j-impl
-
org.apache.logging.log4j:log4j-api
-
org.apache.logging.log4j:log4j-core
-
org.apache.logging.log4j:log4j-1.2-api
さらに、ランタイムは Managed Service for Apache Flink、 でアプリケーションランタイムプロパティを取得するために使用される ライブラリを提供しますcom.amazonaws:aws-kinesisanalytics-runtime:1.2.0
。
ランタイムによって提供されるすべての依存関係は、uber JAR に含めないように、次の推奨事項を使用する必要があります。
-
Maven (
pom.xml
) と SBT (build.sbt
) では、provided
スコープを使用します。 -
Gradle (
build.gradle
) で、compileOnly
設定を使用します。
Apache Flink の親ファーストクラスのロードにより、uber JAR に誤って含まれていた依存関係は実行時に無視されます。詳細については、Apache Flink ドキュメントの parent-first-patterns
Connector
ランタイムに含まれていない FileSystem コネクタを除くほとんどのコネクタは、デフォルトのスコープ () を持つ POM ファイルに含める必要がありますcompile
。
その他の推奨事項
原則として、Managed Service for Apache Flink に提供される Apache Flink uber JAR には、アプリケーションの実行に必要な最小限のコードが含まれている必要があります。ソースクラス、テストデータセット、ブートストラップ状態を含む依存関係を含めることは、この jar に含めないでください。実行時に静的リソースを取り込む必要がある場合は、この懸念を Amazon S3 などのリソースに分離します。これには、状態ブートストラップや推論モデルなどがあります。
ディープ依存関係ツリーを検討し、ランタイム以外の依存関係を削除してください。
Managed Service for Apache Flink は 512MB の jar サイズをサポートしていますが、これはルールの例外と見なされます。Apache Flink は現在、デフォルト設定で最大 104 MB の jar サイズをサポートしており、必要な jar の最大ターゲットサイズである必要があります。
フォールトトレランス:チェックポイントとセーブポイント
Managed Service for Apache Flink アプリケーションに耐障害性を実装するには、チェックポイントとセーブポイントを使用します。アプリケーションの開発およびメンテナンスを行うときは、以下のことを考える必要があります。
アプリケーションでチェックポイントを有効にしておくことをお勧めします。チェックポイントは、スケジュールされたメンテナンス中にアプリケーションに耐障害性を提供します。また、サービスの問題、アプリケーションの依存関係の障害、その他の問題による予期しない障害にも耐障害性を提供します。メンテナンスの詳細については、「Managed Service for Apache Flink のメンテナンスタスクを管理する」を参照してください。
アプリケーションの開発時またはトラブルシューティング時には、ApplicationSnapshotConfiguration:: SnapshotsEnabledを
false
に設定します。アプリケーションが停止するたびにスナップショットが作成されるため、アプリケーションが異常な状態であったり、パフォーマンスが低下したりすると問題が発生する可能性があります。アプリケーションが実稼働環境で安定した状態にはいった後SnapshotsEnabled
をtrue
に設定します。注記
正しい状態データで適切に再起動するように、1 日に数回スナップショットを作成するようにアプリケーションを設定することをお勧めします。スナップショットの正しい頻度は、アプリケーションのビジネスロジックによって異なります。スナップショットを頻繁に作成することで、より最近のデータを復元できますが、コストが増加し、より多くのシステムリソースが必要になります。
障害耐性の詳細については、「耐障害性を実装する」を参照してください。
サポートされていないコネクタのバージョン。
Apache Flink バージョン 1.15 以降では、Managed Service for Apache Flink は、アプリケーション JARs。Managed Service for Apache Flink バージョン 1.15 以降にアップグレードする場合は、最新の Kinesis コネクタを使用していることを確認してください。これはバージョン 1.15.2 と同じかそれより新しいバージョンです。他のすべてのバージョンは Managed Service for Apache Flink ではサポートされません。これは、Stop with Savepoint 機能で整合性の問題や障害が発生し、クリーン停止/更新オペレーションが妨げられる可能性があるためです。Amazon Managed Service for Apache Flink バージョンのコネクタ互換性の詳細については、「Apache Flink コネクタ」を参照してください。
パフォーマンスと並列処理
アプリケーションの並列処理を調整し、パフォーマンスの落とし穴を避けることで、アプリケーションをあらゆるスループットレベルに合わせて拡張できます。アプリケーションの開発およびメンテナンスを行うときは、以下のことを考える必要があります。
すべてのアプリケーションのソースとシンクが十分にプロビジョニングされており、スロットルされていないことを確認します。ソースとシンクが他の AWS サービスである場合は、CloudWatch を使用してそれらのサービスをモニタリングします。
並列処理が非常に高いアプリケーションの場合は、アプリケーション内のすべての演算子に高レベルの並列処理が適用されているかどうかを確認してください。デフォルトでは、Apache Flink はアプリケーショングラフ内のすべてのオペレータに同じアプリケーション並列を適用します。これにより、ソースまたはシンクにおけるプロビジョニングの問題、またはオペレーターのデータ処理のボトルネックが発生する可能性があります。SetParallelism
を使用すると、コードの各オペレータの並列処理を変更できます。 アプリケーションのオペレータの並列処理設定の意味を理解してください。オペレーターの並列処理を変更すると、オペレーターの並列処理が現在の設定と互換性がないときに作成されたスナップショットからアプリケーションを復元できない場合があります。オペレータの並列処理の設定の詳細について、オペレータの最大並列処理を明示的に設定する
を参照してください。
簡易スケーリングについての詳細は、「アプリケーションのスケーリングを実装する」を参照してください。
オペレータごとの並列処理の設定
デフォルトでは、すべてのオペレータにアプリケーションレベルで並列処理が設定されます。DataStream APIと.setParallelism(x)
を使用すると、1つのオペレータの並列処理をオーバーライドできます。オペレータの並列処理は、アプリケーションの並列処理と同じかそれ以下の任意の並列処理に設定できます。
可能であれば、オペレータの並列処理をアプリケーション並列処理の関数として定義してください。このようにすると、演算子の並列処理はアプリケーションの並列処理によって変化します。たとえば、オートスケーリングを使用している場合は、すべてのオペレータの並列処理が同じ比率で変化します。
int appParallelism = env.getParallelism(); ... ...ops.setParalleism(appParallelism/2);
場合によっては、オペレータの並列処理を定数に設定することをお勧めします。たとえば、Kinesis Stream ソースの並列処理をシャードの数に設定します。このような場合は、ソースストリームを再シャードするなど、コードを変更せずにオペレータ並列処理をアプリケーション設定パラメータとして渡すことを検討してください。
ログ記録
CloudWatch Logs を使用して、アプリケーションのパフォーマンスとエラー状態をモニタリングできます。アプリケーションのロギングを設定するときは、以下のことを考える必要があります。
SCIM を実装する方法の詳細については、「」を参照してください。
コーディング
推薦プログラミング手法で、アプリケーションのパフォーマンスと安定性を高めることができます。アプリケーションコードを作成する際は、以下の事を考える必要があります。
アプリケーションコードの
system.exit()
、アプリケーションのmain
メソッド、またはユーザー定義関数では使用しないでください。コードからアプリケーションをシャットダウンする場合は、アプリケーションで問題が発生したことに関するメッセージを含むException
またはRuntimeException
から派生した例外をスローします。サービスがこの例外を処理する方法については、以下の点に注意してください。
例外がアプリケーションの
main
メソッドからスローされた場合、アプリケーションがRUNNING
ステータスに移行したときにサービスがProgramInvocationException
でラップし、ジョブマネージャーはジョブの送信に失敗します。例外がユーザー定義関数からスローされた場合、ジョブ・マネージャーはそのジョブを失敗させて再起動し、例外の詳細が例外ログに書き込まれます。
アプリケーション JAR ファイルとそれに含まれる依存関係をシェーディングすることを検討してください。アプリケーションと Apache Flink ランタイムの間でパッケージ名が競合する可能性がある場合は、シェーディングをお勧めします。競合が発生すると、アプリケーションログにタイプ
java.util.concurrent.ExecutionException
の例外が含まれる可能性があります。アプリケーション JAR ファイルのシェーディングの詳細について、Apache Maven Shade プラグインを参照してください。
ルート認証情報の管理。
長期認証情報を実稼働環境 (またはその他の)アプリケーションに組み込むべきではありません。長期認証情報はバージョン管理システムにチェックインされる可能性が高くて、簡単に紛失する可能性があります。代わりに、ロールを Managed Service for Apache Flink アプリケーションに関連付け、そのロールにアクセス許可を付与できます。実行中の Flink アプリケーションは、環境からそれぞれのアクセス許可を持つ一時的な認証情報を選択できます。認証にユーザー名とパスワードを必要とするデータベースなど、IAM とネイティブに統合されていないサービスに認証が必要な場合は、AWS Secrets Manager
多くの AWS ネイティブサービスは認証をサポートしています。
Kinesis Data Streams— ProcessTaxiStream.java
Amazon MSK — https://github.com/aws/aws-msk-iam-auth/#using-the-amazon-msk-library-for-iam-authentication
Amazon Elasticsearch Service — AmazonElasticsearchSink.java
Amazon S3 – works out of the box on Managed Service for Apache Flink
シャード/パーティションが少ないソースからの読み取り
Apache Kafka または Kinesis Data Stream から読み取る場合、ストリームの並列処理 (Kafka のパーティション数と Kinesis のシャード数) とアプリケーションの並列処理の間に不一致がある可能性があります。単純な設計では、アプリケーションの並列処理はストリームの並列処理を超えることはできません。ソースオペレータの各サブタスクは、1 つ以上のシャード/パーティションからしか読み取ることができません。つまり、シャードが 2つのストリームであり、並列処理が 8 のアプリケーションである場合、ストリームから実際に消費しているのは 2 つのサブタスクだけで、6 つのサブタスクはアイドル状態のままです。これにより、アプリケーションのスループットが大幅に制限される可能性があります。特に、逆シリアル化にコストがかかり、ソース側で実行される場合 (デフォルト)はなおさらです。
この影響を軽減するには、ストリームをスケーリングする方法があります。しかし、それが常に望ましいとは限らないし、可能とも限らない。あるいは、ソースを再構築して、シリアライズを一切行わずに渡すようにすることもできます。あるいは、シリアル化を行わずにbyte[]
を渡すようにソースを再構築することもできます。その後、データを再調整
Studio ノートブックの更新間隔
段落結果の更新間隔を変更する場合は、1000 ミリ秒以上の値に設定してください。
Studio ノートブックの最適なパフォーマンス
次のステートメントでテストし、 events-per-second
を乗算した値が 25,000,000 未満の場合、最適なパフォーマンスを得number-of-keys
ました。events-per-second
は150,000 未満でした。
SELECT key, sum(value) FROM key-values GROUP BY key
ウォーターマーク戦略とアイドルシャードがタイムウィンドウに与える影響
Apache Kafka と Kinesis Data Streamsからイベントを読み取るとき、ソースはストリームの属性に基づいてイベント時間を設定できます。Kinesis の場合、イベント時間はイベントのおおよその到着時間と等しくなります。ただし、Flink アプリケーションがイベント時間を使用するには、イベントのソースでイベント時間を設定するだけでは十分ではありません。ソースは、イベント時間に関する情報をソースから他のすべてのオペレーターに伝達するウォーターマークを生成する必要があります。Flink のドキュメント
デフォルトでは、Kinesis から読み取られたイベントのタイムスタンプは、Kinesis によって決定されておおよその到着時刻に設定されます。アプリケーションでイベント時間が機能するための追加の前提条件は、ウォーターマーク戦略です。
WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(...));
次に、ウォーターマーク戦略を assignTimestampsAndWatermarks
メソッドで DataStream
に適用します。便利な組み込み戦略がいくつかあります。
-
forMonotonousTimestamps()
はイベント時間 (おおよその到着時間) だけを使用して、(特定のサブタスクごとに) 定期的に最大値をウォーターマークとして出力します。 -
forBoundedOutOfOrderness(Duration.ofSeconds(...))
は前のストラテジーと似ていますが、ウォーターマークの生成にはイベント時間、つまり継続時間を使用します。
ソース関数の各並列サブタスクは通常、ウォーターマークを個別に生成します。これらのウォーターマークは、その特定の並列ソースでのイベント時間を定義します。
ウォーターマークがストリーミングプログラムを通過するにつれて、ウォーターマークが到着したオペレーターのイベント時間を進めます。 オペレータがイベント時間を進めるたびに、後続のオペレーターのために下流に新しいウォーターマークを生成します。
一部のオペレーターは複数の入力ストリームを消費します。 たとえば、ユニオン、または keyBy(…) 関数や Partition(…) 関数に続くオペレータなどです。このようなオペレータの現在のイベント時間は、入力ストリームのイベント時間の最小値です。入力ストリームがイベント時間を更新すると、オペレータもイベント時間を更新します。
つまり、ソースサブタスクがアイドルシャードから消費している場合、ダウンストリームオペレータはそのサブタスクから新しいウォーターマークを受け取らないため、タイムウィンドウを使用するすべてのダウンストリームオペレータの処理が停止します。これを避けるために、顧客はウォーターマークストラテジーに withIdleness
オプションを追加することができます。このオプションでは、オペレータは、オペレータのイベント時間を計算するときにアイドル状態のアップストリームサブタスクからウォーターマークを除外します。したがって、アイドル状態のサブタスクは、ダウンストリーム演算子のイベント時間の進行をブロックしなくなりました。
ただし、サブタスクがイベントを読み取っていない場合、つまりストリームにイベントがない場合、組み込みウォーターマーク戦略のアイドル状態オプションはイベント時間を早めません。有限セットのイベントがストリームから読み取られるテスト ケースは特に顕著になります。最後のイベントが読み取られた後、イベント時間は進行しないため、最後のウィンドウ (最後のイベントを含む) は閉じません。
概要
シャードがアイドル状態の場合、
withIdleness
この設定では新しいウォーターマークは生成されません。アイドル状態のサブタスクによって送信された最後のウォーターマークは、ダウンストリーム演算子の最小ウォーターマーク計算から除外されます。組み込みウォーターマーク戦略では、最後のオープンウィンドウは閉じられません (ウォーターマークを進める新しいイベントが送信されますが、新しいウィンドウが作成されて開いたままになる場合を除きます)。
時刻が Kinesis ストリームによって設定されている場合でも、あるシャードが他のシャードよりも速く消費された場合 (アプリケーションの初期化中や、親子関係を無視してすべての既存のシャード
TRIM_HORIZON
が並行して消費される場合など)、遅延到着イベントが発生する可能性があります。ウォーターマーク戦略
withIdleness
の設定は、アイドルシャード の Kinesis ソース固有の設定を中断しているように見えます(ConsumerConfigConstants.SHARD_IDLE_INTERVAL_MILLIS
。
例
次のアプリケーションはストリームから読み取って、イベント時間に基づいてセッションウィンドウを作成しています。
Properties consumerConfig = new Properties(); consumerConfig.put(AWSConfigConstants.AWS_REGION, "eu-west-1"); consumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "TRIM_HORIZON"); FlinkKinesisConsumer<String> consumer = new FlinkKinesisConsumer<>("...", new SimpleStringSchema(), consumerConfig); WatermarkStrategy<String> s = WatermarkStrategy .<String>forMonotonousTimestamps() .withIdleness(Duration.ofSeconds(15)); env.addSource(consumer) .assignTimestampsAndWatermarks(s) .map(new MapFunction<String, Long>() { @Override public Long map(String s) throws Exception { return Long.parseLong(s); } }) .keyBy(l -> 0l) .window(EventTimeSessionWindows.withGap(Time.seconds(10))) .process(new ProcessWindowFunction<Long, Object, Long, TimeWindow>() { @Override public void process(Long aLong, ProcessWindowFunction<Long, Object, Long, TimeWindow>.Context context, Iterable<Long>iterable, Collector<Object> collector) throws Exception { long count = StreamSupport.stream(iterable.spliterator(), false).count(); long timestamp = context.currentWatermark(); System.out.print("XXXXXXXXXXXXXX Window with " + count + " events"); System.out.println("; Watermark: " + timestamp + ", " + Instant.ofEpochMilli(timestamp)); for (Long l : iterable) { System.out.println(l); } } });
次の例では、8つのイベントが16シャード ストリームに書き込まれます(最初の2つと最後のイベントは偶然に同じシャードに配置されます)。
$ aws kinesis put-record --stream-name hp-16 --partition-key 1 --data MQ== $ aws kinesis put-record --stream-name hp-16 --partition-key 2 --data Mg== $ aws kinesis put-record --stream-name hp-16 --partition-key 3 --data Mw== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028721934184977530127978070210" } { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811028795678659974022576354623682" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275134360684221592378842022114" } Wed Mar 23 11:19:57 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 4 --data NA== $ aws kinesis put-record --stream-name hp-16 --partition-key 5 --data NQ== $ date { "ShardId": "shardId-000000000010", "SequenceNumber": "49627894338570054070103749783042116732419934393936642210" } { "ShardId": "shardId-000000000014", "SequenceNumber": "49627894338659257050897872275659034489934342334017700066" } Wed Mar 23 11:20:10 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 6 --data Ng== $ date { "ShardId": "shardId-000000000001", "SequenceNumber": "49627894338369347363316974173886988345467035365375213586" } Wed Mar 23 11:20:22 CET 2022 $ sleep 10 $ aws kinesis put-record --stream-name hp-16 --partition-key 7 --data Nw== $ date { "ShardId": "shardId-000000000008", "SequenceNumber": "49627894338525452579706688535878947299195189349725503618" } Wed Mar 23 11:20:34 CET 2022 $ sleep 60 $ aws kinesis put-record --stream-name hp-16 --partition-key 8 --data OA== $ date { "ShardId": "shardId-000000000012", "SequenceNumber": "49627894338614655560500811029600823255837371928900796610" } Wed Mar 23 11:21:27 CET 2022
この入力により、イベント1、2、3、イベント4、5、イベント6、イベント7、イベント8の5つのセッションウィンドウが生成されるはずです。ただし、このプログラムでは最初の4つのウィンドウしか生成されません。
11:59:21,529 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 5 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 5 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000006,HashKeyRange: {StartingHashKey: 127605887595351923798765477786913079296,EndingHashKey: 148873535527910577765226390751398592511},SequenceNumberRange: {StartingSequenceNumber: 49627894338480851089309627289524549239292625588395704418,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 6 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,530 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000007,HashKeyRange: {StartingHashKey: 148873535527910577765226390751398592512,EndingHashKey: 170141183460469231731687303715884105727},SequenceNumberRange: {StartingSequenceNumber: 49627894338503151834508157912666084957565273949901684850,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,531 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 4 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 4 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000005,HashKeyRange: {StartingHashKey: 106338239662793269832304564822427566080,EndingHashKey: 127605887595351923798765477786913079295},SequenceNumberRange: {StartingSequenceNumber: 49627894338458550344111096666383013521019977226889723986,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 3 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 2 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000004,HashKeyRange: {StartingHashKey: 85070591730234615865843651857942052864,EndingHashKey: 106338239662793269832304564822427566079},SequenceNumberRange: {StartingSequenceNumber: 49627894338436249598912566043241477802747328865383743554,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000003,HashKeyRange: {StartingHashKey: 63802943797675961899382738893456539648,EndingHashKey: 85070591730234615865843651857942052863},SequenceNumberRange: {StartingSequenceNumber: 49627894338413948853714035420099942084474680503877763122,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,532 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 0 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000001,HashKeyRange: {StartingHashKey: 21267647932558653966460912964485513216,EndingHashKey: 42535295865117307932921825928971026431},SequenceNumberRange: {StartingSequenceNumber: 49627894338369347363316974173816870647929383780865802258,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 7 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,533 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000000,HashKeyRange: {StartingHashKey: 0,EndingHashKey: 21267647932558653966460912964485513215},SequenceNumberRange: {StartingSequenceNumber: 49627894338347046618118443550675334929656735419359821826,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Subtask 1 will be seeded with initial shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'}, starting state set as sequence number EARLIEST_SEQUENCE_NUM 11:59:21,568 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000002,HashKeyRange: {StartingHashKey: 42535295865117307932921825928971026432,EndingHashKey: 63802943797675961899382738893456539647},SequenceNumberRange: {StartingSequenceNumber: 49627894338391648108515504796958406366202032142371782690,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 0 11:59:23,209 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000009,HashKeyRange: {StartingHashKey: 191408831393027885698148216680369618944,EndingHashKey: 212676479325586539664609129644855132159},SequenceNumberRange: {StartingSequenceNumber: 49627894338547753324905219158949156394110570672913645714,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,244 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 6 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000010,HashKeyRange: {StartingHashKey: 212676479325586539664609129644855132160,EndingHashKey: 233944127258145193631070042609340645375},SequenceNumberRange: {StartingSequenceNumber: 49627894338570054070103749782090692112383219034419626146,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 event: 6; timestamp: 1648030822428, 2022-03-23T10:20:22.428Z 11:59:23,377 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 3 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000015,HashKeyRange: {StartingHashKey: 319014718988379809496913694467282698240,EndingHashKey: 340282366920938463463374607431768211455},SequenceNumberRange: {StartingSequenceNumber: 49627894338681557796096402897798370703746460841949528306,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,405 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 2 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000014,HashKeyRange: {StartingHashKey: 297747071055821155530452781502797185024,EndingHashKey: 319014718988379809496913694467282698239},SequenceNumberRange: {StartingSequenceNumber: 49627894338659257050897872274656834985473812480443547874,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,581 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000008,HashKeyRange: {StartingHashKey: 170141183460469231731687303715884105728,EndingHashKey: 191408831393027885698148216680369618943},SequenceNumberRange: {StartingSequenceNumber: 49627894338525452579706688535807620675837922311407665282,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:23,586 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 1 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000013,HashKeyRange: {StartingHashKey: 276479423123262501563991868538311671808,EndingHashKey: 297747071055821155530452781502797185023},SequenceNumberRange: {StartingSequenceNumber: 49627894338636956305699341651515299267201164118937567442,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 1 11:59:24,790 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 0 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000012,HashKeyRange: {StartingHashKey: 255211775190703847597530955573826158592,EndingHashKey: 276479423123262501563991868538311671807},SequenceNumberRange: {StartingSequenceNumber: 49627894338614655560500811028373763548928515757431587010,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 4; timestamp: 1648030809282, 2022-03-23T10:20:09.282Z event: 3; timestamp: 1648030797697, 2022-03-23T10:19:57.697Z event: 5; timestamp: 1648030810871, 2022-03-23T10:20:10.871Z 11:59:24,907 INFO org.apache.flink.streaming.connectors.kinesis.internals.KinesisDataFetcher [] - Subtask 7 will start consuming seeded shard StreamShardHandle{streamName='hp-16', shard='{ShardId: shardId-000000000011,HashKeyRange: {StartingHashKey: 233944127258145193631070042609340645376,EndingHashKey: 255211775190703847597530955573826158591},SequenceNumberRange: {StartingSequenceNumber: 49627894338592354815302280405232227830655867395925606578,}}'} from sequence number EARLIEST_SEQUENCE_NUM with ShardConsumer 2 event: 7; timestamp: 1648030834105, 2022-03-23T10:20:34.105Z event: 1; timestamp: 1648030794441, 2022-03-23T10:19:54.441Z event: 2; timestamp: 1648030796122, 2022-03-23T10:19:56.122Z event: 8; timestamp: 1648030887171, 2022-03-23T10:21:27.171Z XXXXXXXXXXXXXX Window with 3 events; Watermark: 1648030809281, 2022-03-23T10:20:09.281Z 3 1 2 XXXXXXXXXXXXXX Window with 2 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 4 5 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030834104, 2022-03-23T10:20:34.104Z 6 XXXXXXXXXXXXXX Window with 1 events; Watermark: 1648030887170, 2022-03-23T10:21:27.170Z 7
出力には4つのウィンドウしか表示されていません (イベント8を含む最後のウィンドウはありません)。これはイベント時間とウォーターマーク戦略によるものです。構築済みのウォーターマーク戦略では、ストリームから読み取られた最後のイベントの時間を超えて時間が進むことはないため、最後のウィンドウを閉じることはできません。ただし、ウィンドウを閉じるには、最後のイベントから10秒以上経過する必要があります。この場合、最後のウォーターマークは 2022-03-23T10:21:27.170Z ですが、セッションウィンドウを閉じるには、ウォーターマークが 10 秒と 1 ミリ秒後に必要です。
withIdleness
オプションをウォーターマーク戦略から削除すると、ウィンドウ演算子の「グローバルウォーターマーク」が進むことができないため、セッションウィンドウは閉じられません。
Flink アプリケーションの起動時 (またはデータスキューがある場合)、一部のシャードは他のシャードよりも速く消費される可能性があります。これにより、サブタスクから一部のウォーターマークが早すぎる可能性があります (サブタスクは、サブスクライブしている他のシャードから消費することなく、1 つのシャードのコンテンツに基づいてウォーターマークを出力する場合があります)。緩和方法は、安全バッファを追加する(forBoundedOutOfOrderness(Duration.ofSeconds(30))
か、遅延到着イベントを明示的に許可するさまざまなウォーターマーク戦略です(allowedLateness(Time.minutes(5))
。
すべてのオペレータに UUID を設定
Apache Flink 用 Managed Service がスナップショットを持つアプリケーションの Flink ジョブを開始するとき、何らかの問題で Flink ジョブが起動できないことがあります。その1つはオペレータ ID の不一致です。Flink では、Flink のジョブグラフオペレータには明示的で一貫性のあるオペレータ ID が必要です。明示的に設定しない場合、Flink は演算子の ID を生成します。これは、Flink がこれらのオペレータ ID を使用してジョブグラフ内のオペレータを一意に識別し、それを使用して各オペレータの状態をセーブポイントに保存するためです。
「オペレータ ID の不一致」の問題は、Flink がジョブグラフのオペレータ ID と、セーブポイントで定義されたオペレータ ID との間で 1:1 のマッピングを見つけられない場合に発生します。これは、明示的な整合性のあるオペレータ IDs が設定されておらず、Flink がすべてのジョブグラフの作成と整合性のないオペレータ IDs を生成する場合に発生します。メンテナンスの実行中にアプリケーションがこの問題に遭遇する可能性が高くなります。これを回避するには、Flink コード内のすべての演算子に UUID を設定することをお勧めします。詳細については、「プロダクションレディネス」段階にある「すべてのオペレータに UUID を設定する」トピックを参照してください。
Maven シェードプラグインに ServiceResourceTransformer を追加する
FlinkはJavaのサービスプロバイダーインターフェース (SPI)
<build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> <executions> <execution> <id>shade</id> <phase>package</phase> <goals> <goal>shade</goal> </goals> <configuration> <transformers combine.children="append"> <!-- The service transformer is needed to merge META-INF/services files --> <transformer implementation="org.apache.maven.plugins.shade.resource.ServicesResourceTransformer"/> <!-- ... --> </transformers> </configuration> </execution> </executions> </plugin>