Kinesis データストリームコンシューマーのトラブルシューティング - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis データストリームコンシューマーのトラブルシューティング

Kinesis クライアントライブラリの使用時に一部の Kinesis Data Streams レコードがスキップされる

レコードがスキップされる最も一般的な原因は、processRecords からスローされる処理されない例外です。Kinesis クライアントライブラリ (KCL) は、processRecordsコードを使用して、データレコードの処理から発生するすべての例外を処理します。からスローされた例外processRecordsKCLに吸収される。繰り返し発生するエラーに対する無限再試行を回避するために、KCL は例外の発生時に処理中であったレコードのバッチを再送信しません。その後、KCL はを呼び出します。processRecordsレコードプロセッサを再起動しないで、データレコードの次のバッチを示します。これにより、事実上、コンシューマーアプリケーションではレコードがスキップされたことになります。レコードのスキップを防止するには、processRecords 内ですべての例外を適切に処理します。

同じシャードに属するレコードが、異なるレコードプロセッサによって同時に処理される

実行されている Kinesis Client Library (KCL) アプリケーションでは、シャードの所有者はひとりだけです。ただし、複数のレコードプロセッサが一時的に同じシャードを処理する場合があります。ネットワーク接続を紛失したワーカーインスタンスの場合、KCL はフェイルオーバー時間の期限が切れた後に、到達できないワーカーはレコードを処理していないと仮定し、他のワーカーインスタンスが引き継ぐように指示します。このとき短時間ですが、新しいレコードプロセッサと到達不可能なワーカーのレコードプロセッサが同じシャードのデータを処理する場合があります。

アプリケーションに適したフェイルオーバー時間を設定する必要があります。低レイテンシーアプリケーションの場合、10秒のデフォルトは、待機する最大時間を表している場合があります。ただし、より頻繁に接続が失われる地域で通話を行うなどの接続問題が予想される場合、この数値は低すぎる場合があります。

ネットワーク接続は通常、以前の到達不可能なワーカーに復元されるため、アプリケーションではこのシナリオを予期して処理する必要があります。レコードプロセッサのシャードが別のレコードプロセッサに引き継がれた場合、レコードプロセッサは正常なシャットダウンを実行するために次の 2 つのケースを処理する必要があります。

  1. 現在の呼び出しの後processRecordsが完了した場合、KCL はシャットダウンの理由「ZOMBIE」を使用してレコードプロセッサでシャットダウンメソッドを呼び出します。レコードプロセッサは、すべてのリソースを必要に応じて適切にクリーンアップした後、終了する必要があります。

  2. 「ゾンビ」ワーカーからチェックポイントを作成しようとすると、KCL はをスローしますShutdownException。この例外を受け取った後、コードは現在のメソッドを正常に終了する必要があります。

詳細については、「重複レコードの処理」を参照してください。

コンシューマーアプリケーションの読み取りの速度が予想よりも遅い

読み取りのスループットが予想よりも遅くなる最も一般的な理由は次のとおりです。

  1. 複数のコンシューマーアプリケーションの読み取りの合計が、シャードごとの制限を超えています。詳細については、「クォータと制限」を参照してください。この場合、Kinesis データストリームのシャードの数を増やします。

  2. -限定の最大数を指定しますGetRecordsコールごとに低い値が設定されている可能性があります。KCL を使用している場合は、ワーカーに設定した maxRecords プロパティの値が低い可能性があります。一般的に、このプロパティにはシステムのデフォルトを使用することをお勧めします。

  3. processRecords 呼び出し内のロジックに予想よりも時間がかかる場合があります。これには、ロジックが CPU を大量に消費する、I/O をブロックする、同期のボトルネックになっているなど、多くの理由が考えられます。これに該当するかどうかをテストするには、空のレコードプロセッサをテスト実行し、読み取りスループットを比較します。受信データに遅れずに対応する方法については、「リシャーディング、拡張、並列処理」を参照してください。

コンシューマーアプリケーションが 1 つのみである場合、通常、PUT レートの少なくとも 2 倍高速に読み取りを実行できます。これは、書き込みに対して最大 1,000 レコードを書き込むことができ、1 秒あたり 1 MB の最大データ書き込みレート (パーティションキーを含む) まで書き込むことができます。各オープンシャードは、1 秒あたり最大 5 トランザクションをサポートし、1 秒あたり 2 MB の最大データ読み取りレートをサポートします。各読み取り (GetRecords) は、レコードのバッチを取得します。GetRecords によって返されるデータのサイズは、シャードの使用状況によって異なります。次のデータの最大サイズGetRecordsが返すことができるのは10 MBです。呼び出しがその制限を返す場合、次の 5 秒以内に行われるそれ以降の呼び出しは ProvisionedThroughputExceededException をスローします。

GetRecordsストリームにデータがある場合でも、空の Records 配列を返す

レコードの消費、つまり取得は、プルモデルです。開発者は電話する予定ですGetRecordsバックオフのない連続ループで。GetRecords のすべての呼び出しは、ShardIterator 値も返します。この値は、ループの次のイテレーションで使用する必要があります。

GetRecords オペレーションはブロックしません。その代わりに、関連データレコードまたは空の Records 要素とともに、直ちに制御を戻します。空の Records 要素は、2 つの条件の下で返されます。

  1. 現在シャードにはそれ以上のデータがない.

  2. シャードの ShardIterator で指定されたパートの近くにデータがない。

後者の条件は微妙ですが、レコードを取得するときに無限のシーク時間 (レイテンシー) を回避するために必要な設計上のトレードオフです。そのため、ストリームを使用するアプリケーションはループし、GetRecords を呼び出して、当然のこととして空のレコードを処理します。

本稼働シナリオで、連続ループが終了するのは、NextShardIterator の値が NULL である場合のみにする必要があります。NextShardIteratorNULL である場合、現在のシャードが閉じられ、ShardIterator 値は最後のレコードを過ぎたことを示します。コンシューマーアプリケーションが SplitShard または MergeShards を呼び出さない場合、シャードは開いたままになり、GetRecords の呼び出しは NextShardIterator である NULL 値を返しません。

Kinesis Client Library (KCL) を使用する場合、上記の消費パターンは抽象化されます。これには、動的に変更する一連のシャードの自動処理が含まれます。KCL を使用すると、開発者は入力レコードを処理するロジックのみを提供します。ライブラリが自動的に GetRecords の継続的な呼び出しを行うため、これが可能になります。

シャードイテレータが予期せずに終了する

新しいシャードのイテレータは、GetRecords リクエスト (NextShardIterator として) 返されます。これは次の GetRecordsリクエスト (ShardIterator として) 使用します。通常の場合、このシャードイテレーターは使用する前に有効期限が切れることはありません。ただし、5 分以上 GetRecords を呼び出さなかったため、またはコンシューマーアプリケーションの再起動を実行したため、シャードイテレータの有効期限が切れる場合があります。

シャードイテレーターの有効期限がすぐに切れて使用できない場合、Kinesis で使用されている DynamoDB テーブルの容量不足でリースデータを保存できないことを示している可能性があります。この状況は、多数のシャードがある場合により発生する可能性が高くなります。この問題を解決するには、シャードテーブルに割り当てられた書き込み容量を増やします。詳細については、「リーステーブルを使用して KCL コンシューマアプリケーションによって処理されたシャードを追跡する」を参照してください。

コンシューマーレコードの処理が遅れる

ほとんどのユースケースで、コンシューマーアプリケーションはストリームから最新のデータを読み取ります。特定の状況下では、コンシューマーの読み取りが遅れるという好ましくない事態が発生します。コンシューマーの読み取りの遅れ具合を確認したら、遅れの最も一般的な理由を参照してください。

GetRecords.IteratorAgeMilliseconds メトリクスを起動して、ストリーム内のすべてのシャードとコンシューマーの読み取り位置を追跡します。イテレーターの経過日数が保持期間 (デフォルト) の 50% を経過すると注意してください。,24時間、最大構成可能365days)の場合、記録の有効期限によるデータ損失のリスクがあります。とりあえずの解決策は、保持期間を長くすることです。これにより、問題のトラブルシューティングを行う間に重要なデータが失われるのを防ぎます。詳細については、「Amazon CloudWatch による Amazon Kinesis Data Streams サービスのモニタリング」を参照してください。次に、カスタムを使用して、コンシューマーアプリケーションの読み取りが各シャードからどのくらい遅れているかを確認しますCloudWatchKinesis Client Library (KCL) から出力されるメトリクスMillisBehindLatest。詳細については、「Amazon CloudWatch を使用した Kinesis Client Library のモニタリング」を参照してください。

コンシューマーが遅れる最も一般的な理由:

  • GetRecords.IteratorAgeMilliseconds の突然の上昇または MillisBehindLatest は、通常ダウンストリームアプリケーションに対する API オペレーションの障害などの一時的な問題を示します。どちらかのメトリクスが恒常的にこのような動きを示す場合、この急激な上昇を調査する必要があります。

  • これらのメトリクスが徐々に上昇する場合は、レコードの処理速度が不十分なためストリームにコンシューマーが追いついていないことを示します。この状況に共通の原因は、物理リソースの不足またはストリームスループットの上昇にレコード処理ロジックが追随できないことです。他のカスタムを参照して、この動作を確認できます。CloudWatchKCL が生成するメトリックprocessTask操作、を含むRecordProcessor.processRecords.Time,Success, およびRecordsProcessed

    • スループットの増加に伴う processRecords.Time メトリクスの上昇が確認された場合、レコード処理ロジックを分析して、スループットの増加に対応したスケーリングができない理由を調べる必要があります。

    • スループットの上昇とは関連性がない processRecords.Time 値の上昇が認められた場合は、重要なパスでブロック呼び出しを行っていないか確認します。これは、レコード処理の低下を招きます。代替策として、シャードの数を増やして並列処理を増やす方法があります。最後に、ピーク需要時に適切な容量の物理リソース (メモリ、CPU 使用率など) が基盤の処理ノードに存在することを確認します。

承認されていない KMS マスターキーの権限エラー

このエラーは、KMS マスターキーのアクセス許可なしで、コンシューマーアプリケーションが暗号化されたストリームから読み取りを行ったときに発生します。KMS キーにアクセスするためのアクセス許可をアプリケーションに割り当てる方法については、「」を参照してください。でのキーポリシーの使用AWSKMSそしてでの IAM ポリシーの使用AWSKMS

消費者にとって一般的な問題、質問、トラブルシューティングのアイデア