翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
以下のトピックでは、Amazon Kinesis Data Streams コンシューマーの一般的な問題に対するソリューションを提供します。
LeaseManagementConfig コンストラクタのコンパイルエラー
Kinesis Client Library (KCL) バージョン 3.x 以降にアップグレードすると、LeaseManagementConfig
コンストラクタに関連するコンパイルエラーが発生する可能性があります。KCL バージョン 3.x 以降ConfigsBuilder
で を使用する代わりに、設定を設定するLeaseManagementConfig
オブジェクトを直接作成している場合は、KCL アプリケーションコードをコンパイルするときに次のエラーメッセージが表示されることがあります。
Cannot resolve constructor 'LeaseManagementConfig(String, DynamoDbAsyncClient, KinesisAsyncClient, String)'
バージョン 3.x 以降の KCL では、applicationName パラメータの後に applicationName (タイプ: 文字列) パラメータを 1 つ追加する必要があります。 tableName
-
前: leaseManagementConfig = new LeaseManagementConfig(tableName, dynamoDBClient, kinesisClient, streamName, workerIdentifier)
-
後: leaseManagementConfig = new LeaseManagementConfig(tableName, applicationName, dynamoDBClient, kinesisClient, streamName, workerIdentifier)
LeaseManagementConfig オブジェクトを直接作成する代わりに、 を使用して KCL 3.x 以降のバージョンでConfigsBuilder
設定を行うことをお勧めします。 ConfigsBuilder
は、KCL アプリケーションを設定するより柔軟で保守可能な方法を提供します。
以下は、 を使用して KCL 設定ConfigsBuilder
を設定する例です。
ConfigsBuilder configsBuilder = new ConfigsBuilder(
streamName,
applicationName,
kinesisClient,
dynamoClient,
cloudWatchClient,
UUID.randomUUID().toString(),
new SampleRecordProcessorFactory()
);
Scheduler scheduler = new Scheduler(
configsBuilder.checkpointConfig(),
configsBuilder.coordinatorConfig(),
configsBuilder.leaseManagementConfig()
.failoverTimeMillis(60000), // this is an example
configsBuilder.lifecycleConfig(),
configsBuilder.metricsConfig(),
configsBuilder.processorConfig(),
configsBuilder.retrievalConfig()
);
Kinesis クライアントライブラリの使用時に一部の Kinesis Data Streams レコードがスキップされる
レコードがスキップされる最も一般的な原因は、processRecords
からスローされる処理されない例外です。Kinesis Client Library (KCL) は、processRecords
コードを使用して、データレコードの処理で発生するすべての例外を処理します。processRecords
からスローされるすべての例外は、 KCLによって吸収されます。反復的なエラーに対する無限再試行を回避するために、KCL では例外の発生時に処理中であったレコードのバッチを再送信しません。KCL は、レコードプロセッサを再起動することなく、データレコードの次のバッチで processRecords
を呼び出します。これにより、事実上、コンシューマーアプリケーションではレコードがスキップされたことになります。レコードのスキップを防止するには、processRecords
内ですべての例外を適切に処理します。
同じシャードに属するレコードが、異なるレコードプロセッサによって同時に処理される
実行されている Kinesis Client Library (KCL) アプリケーションでは、シャードの所有者はひとりだけです。ただし、複数のレコードプロセッサが一時的に同じシャードを処理する場合があります。ワーカーインスタンスがネットワーク接続を失った場合、KCL は、フェイルオーバー時間が経過した後、到達できないワーカーがレコードを処理していないと見なし、他のワーカーインスタンスに引き継ぐように指示します。このとき短時間ですが、新しいレコードプロセッサと到達不可能なワーカーのレコードプロセッサが同じシャードのデータを処理する場合があります。
アプリケーションに適したフェイルオーバー時間を設定します。低レイテンシーアプリケーションの場合、10秒のデフォルトは、待機する最大時間を表している場合があります。ただし、より頻繁に接続が失われる地域で通話を行うなどの接続問題が予想される場合、この数値は低すぎる場合があります。
ネットワーク接続は通常、以前の到達不可能なワーカーに復元されるため、アプリケーションではこのシナリオを予期して処理する必要があります。レコードプロセッサのシャードが別のレコードプロセッサに引き継がれた場合、レコードプロセッサは正常なシャットダウンを実行するために次の 2 つのケースを処理する必要があります。
-
への現在の呼び出しが完了すると、KCL
processRecords
はシャットダウン理由「ZOMBIE」を使用してレコードプロセッサでシャットダウンメソッドを呼び出します。レコードプロセッサは、すべてのリソースを必要に応じて適切にクリーンアップした後、終了する必要があります。 -
zombieワーカーからチェックポイントを作成しようとすると、KCL は
ShutdownException
をスローします。この例外を受け取った後、コードは現在のメソッドを正常に終了する必要があります。
詳細については、「重複レコードを処理する」を参照してください。
コンシューマーアプリケーションの読み取りの速度が予想よりも遅い
読み取りのスループットが予想よりも遅くなる最も一般的な理由は次のとおりです。
-
複数のコンシューマーアプリケーションの読み取りの合計が、シャードごとの制限を超えています。詳細については、クォータと制限を参照してください。この場合、Kinesis データストリームのシャードの数が増えます。
-
呼び出しごとの GetRecords の最大数を指定する制限が、低い値で設定されている可能性があります。KCL を使用している場合は、ワーカーに設定した
maxRecords
プロパティの値が低い可能性があります。一般的に、このプロパティにはシステムのデフォルトを使用することをお勧めします。 -
processRecords
呼び出し内のロジックに予想よりも時間がかかる場合があります。これには、ロジックが CPU を大量に消費する、I/O をブロックする、同期のボトルネックになっているなど、多くの理由が考えられます。これに該当するかどうかをテストするには、空のレコードプロセッサをテスト実行し、読み取りスループットを比較します。受信データに遅れずに対応する方法については、シャードの数を変更するには、再シャーディング、スケーリング、並列処理を使用します。を参照してください。
コンシューマーアプリケーションが 1 つのみである場合、通常、PUT レートの少なくとも 2 倍高速に読み取りを実行できます。これは、書き込みに対して 1 秒あたり最大 1,000 レコードを書き込むことができ、最大合計データ書き込み速度が 1 秒あたり 1 MB (パーティションキーを含む) になるためです。オープンな各シャードは、読み取りに対して 1 秒あたり最大 5 トランザクションをサポートでき、最大合計データ読み取り速度は 1 秒あたり 2MB です。各読み取り (GetRecords) は、レコードのバッチを取得します。GetRecords によって返されるデータのサイズは、シャードの使用状況によって異なります。GetRecords が返すことができるデータの最大サイズは、10 MB です。呼び出しがその制限を返す場合、次の 5 秒以内に行われた後続の呼び出しは をスローしますProvisionedThroughputExceededException
。
ストリームにデータがある場合でも、GetRecords が空のレコード配列を返す
レコードの消費、つまり取得は、プルモデルです。開発者は、バックオフがない連続ループで GetRecords を呼び出す必要があります。GetRecords のすべての呼び出しは、ShardIterator
値も返します。この値は、ループの次のイテレーションで使用する必要があります。
GetRecords オペレーションはブロックしません。その代わりに、関連データレコードまたは空の Records
要素とともに、直ちに制御を戻します。空の Records
要素は、2 つの条件の下で返されます。
-
現在シャードにはそれ以上のデータがない。
-
シャードの
ShardIterator
で指定されたパートの近くにデータがない。
後者の条件は微妙ですが、レコードを取得するときに無限のシーク時間 (レイテンシー) を回避するために必要な設計上のトレードオフです。そのため、ストリームを使用するアプリケーションはループし、GetRecords を呼び出して、当然のこととして空のレコードを処理します。
本稼働シナリオで、連続ループが終了するのは、NextShardIterator
の値が NULL
である場合のみにする必要があります。NextShardIterator
が NULL
である場合、現在のシャードが閉じられ、ShardIterator
値は最後のレコードを過ぎたことを示します。コンシューマーアプリケーションが SplitShard または MergeShards を呼び出さない場合、シャードは開いたままになり、GetRecords の呼び出しは NextShardIterator
である NULL
値を返しません。
Kinesis Client Library (KCL) を使用する場合、前述の消費パターンが抽象化されます。これには、動的に変更する一連のシャードの自動処理が含まれます。KCL により、デベロッパーは入力レコードを処理するロジックのみを提供します。ライブラリが自動的に GetRecords の継続的な呼び出しを行うため、これが可能になります。
シャードイテレーターが予期せず期限切れになる
新しいシャードのイテレータは、GetRecords リクエスト (NextShardIterator
として) 返されます。これは次の GetRecordsリクエスト (ShardIterator
として) 使用します。通常の場合、このシャードイテレーターは使用する前に有効期限が切れることはありません。ただし、5 分以上 GetRecords を呼び出さなかったため、またはコンシューマーアプリケーションの再起動を実行したため、シャードイテレータの有効期限が切れる場合があります。
シャードイテレーターを使用できる直前に期限切れになった場合は、Kinesis が使用する DynamoDB テーブルにリースデータを保存するのに十分な容量がないことを示している可能性があります。この状況は、多数のシャードがある場合により発生する可能性が高くなります。この問題を解決するには、シャードテーブルに割り当てられた書き込み容量を増やします。詳細については、「リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡する」を参照してください。
コンシューマーレコードの処理が遅れる
ほとんどのユースケースで、コンシューマーアプリケーションはストリームから最新のデータを読み取ります。特定の状況下では、コンシューマーの読み取りが遅れるという好ましくない事態が発生します。コンシューマーの読み取りの遅れ具合を確認したら、遅れの最も一般的な理由を参照してください。
GetRecords.IteratorAgeMilliseconds
メトリクスを起動して、ストリーム内のすべてのシャードとコンシューマーの読み取り位置を追跡します。イテレータの経過日数が保持期間 (デフォルトで 24 時間、最大で 365 日まで設定可能) の 50% を経過すると失効する場合、レコードの有効期限切れによるデータ損失のリスクがあります。とりあえずの解決策は、保持期間を長くすることです。これにより、問題のトラブルシューティングを行う間に重要なデータが失われるのを防ぎます。詳細については、Amazon で Amazon Kinesis Data Streams サービスをモニタリングする CloudWatchを参照してください。次に、Kinesis Client Library (KCL)、MillisBehindLatest
が出力するカスタム CloudWatch メトリクスを使用して、コンシューマーアプリケーションの読み取りが各シャードからどのくらい遅れているかを確認します。詳細については、Amazon で Kinesis クライアントライブラリをモニタリングする CloudWatchを参照してください。
コンシューマーが遅れる最も一般的な理由:
-
GetRecords.IteratorAgeMilliseconds
の突然の上昇またはMillisBehindLatest
は、通常ダウンストリームアプリケーションに対する API オペレーションの障害などの一時的な問題を示します。いずれかのメトリクスがこの動作を一貫して表示している場合は、これらの急激な増加を調べてください。 -
これらのメトリクスが徐々に上昇する場合は、レコードの処理速度が不十分なためストリームにコンシューマーが追いついていないことを示します。この状況に共通の原因は、物理リソースの不足またはストリームスループットの上昇にレコード処理ロジックが追随できないことです。
processTask
オペレーション (RecordProcessor.processRecords.Time
、Success
、RecordsProcessed
など) に関連して KCL が出力する他のカスタム CloudWatch メトリクスを確認することで、この状況を確認できます。-
スループットの増加に伴う
processRecords.Time
メトリクスの上昇が確認された場合、レコード処理ロジックを分析して、スループットの増加に対応したスケーリングができない理由を調べる必要があります。 -
スループットの上昇とは関連性がない
processRecords.Time
値の上昇が認められた場合は、重要なパスでブロック呼び出しを行っていないか確認します。これは、レコード処理の低下を招きます。代替策として、シャードの数を増やして並列処理を増やす方法があります。最後に、ピーク需要時に基盤となる処理ノードに十分な量の物理リソース (メモリ、CPU 使用率など) があることを確認します。
-
不正な KMS キーのアクセス許可エラー
このエラーは、コンシューマーアプリケーションが AWS KMS キーに対するアクセス許可なしで暗号化されたストリームから読み取る場合に発生します。KMS キーにアクセスする許可をアプリケーションに割り当てるには、KMS でのキーポリシーの使用およびAWS KMS での IAM ポリシーの使用を参照してください。
DynamoDbException: 更新式で指定されたドキュメントパスが更新に無効です
AWS SDK for Java バージョン 2.27.19 から 2.27.23 で KCL 3.x を使用する場合、次の DynamoDB 例外が発生することがあります。
「software.amazon.awssdk.services.dynamodb.model.DynamoDbException: 更新式で指定されたドキュメントパスが更新に無効です (サービス: DynamoDb、ステータスコード: 400、リクエスト ID: xxx)」
このエラーは、KCL 3.x によって管理 AWS SDK for Java される DynamoDB メタデータテーブルに影響する の既知の問題が原因で発生します。この問題はバージョン 2.27.19 で発生し、2.27.23 までのすべてのバージョンに影響します。この問題はバージョン AWS SDK for Java 2.27.24 で解決されました。最適なパフォーマンスと安定性を得るには、バージョン 2.28.0 以降にアップグレードすることをお勧めします。