Amazon Kinesis Data Streams
開発者ガイド

Amazon Kinesis Data Streams コンシューマーのトラブルシューティング

Kinesis クライアントライブラリの使用時に一部の Kinesis Data Streams レコードがスキップされる

レコードがスキップされる最も一般的な原因は、processRecords からスローされる処理されない例外です。Kinesis Client Library (KCL) は、processRecords コードを使用して、データレコードの処理で発生するすべての例外を処理します。processRecords からスローされるすべての例外は、KCL によって吸収されます。反復的なエラーに対する無限再試行を回避するために、KCL では例外の発生時に処理中であったレコードのバッチを再送信しません。KCL は、レコードプロセッサを再起動することなく、データレコードの次のバッチで processRecords を呼び出します。これにより、事実上、コンシューマーアプリケーションではレコードがスキップされたことになります。レコードのスキップを防止するには、processRecords 内ですべての例外を適切に処理します。

同じシャードに属するレコードが、異なるレコードプロセッサによって同時に処理される

実行されている Kinesis Client Library (KCL) アプリケーションでは、シャードの所有者はひとりだけです。ただし、複数のレコードプロセッサが一時的に同じシャードを処理する場合があります。ネットワーク接続を紛失したワーカーインスタンスの場合、KCL はフェイルオーバー時間の期限が切れた後に、到達できないワーカーはレコードを処理していないと仮定し、他のワーカーインスタンスが引き継ぐように指示します。このとき短時間ですが、新しいレコードプロセッサと到達不可能なワーカーのレコードプロセッサが同じシャードのデータを処理する場合があります。

アプリケーションに適したフェイルオーバー時間を設定する必要があります。低レイテンシーアプリケーションの場合、10秒のデフォルトは、待機する最大時間を表している場合があります。ただし、より頻繁に接続が失われる地域で通話を行うなどの接続問題が予想される場合、この数値は低すぎる場合があります。

ネットワーク接続は通常、以前の到達不可能なワーカーに復元されるため、アプリケーションではこのシナリオを予期して処理する必要があります。レコードプロセッサのシャードが別のレコードプロセッサに引き継がれた場合、レコードプロセッサは正常なシャットダウンを実行するために次の 2 つのケースを処理する必要があります。

  1. processRecords への現在の呼び出しが完了した後で、KCL はシャットダウンの理由「ZOMBIE」を使用してレコードプロセッサでシャットダウンメソッドを呼び出します。レコードプロセッサは、すべてのリソースを必要に応じて適切にクリーンアップした後、終了する必要があります。

  2. 「ゾンビ」ワーカーからチェックポイントを作成しようとすると、KCL は ShutdownException をスローします。この例外を受け取った後、コードは現在のメソッドを正常に終了する必要があります。

詳細については、「重複レコードの処理」を参照してください。

コンシューマーアプリケーションの読み取りの速度が予想よりも遅い

読み取りのスループットが予想よりも遅くなる最も一般的な理由は次のとおりです。

  1. 複数おコンシューマーアプリケーションの読み取りの合計が、シャードごとの制限を超えています。詳細については、「Kinesis Data Streams の制限」を参照してください。この場合、Kinesis data stream のシャードの数を増やします。

  2. 呼び出しあたりの GetRecords の最大数を指定する制限に、低い値が設定されています。KCL を使用している場合は、ワーカーに設定した maxRecords プロパティの値が低い可能性があります。一般的に、このプロパティにはシステムのデフォルトを使用することをお勧めします。

  3. processRecords 呼び出し内のロジックに予想よりも時間がかかる場合があります。これには、ロジックが CPU を大量に消費する、I/O をブロックする、同期のボトルネックになっているなど、多くの理由が考えられます。これに該当するかどうかをテストするには、空のレコードプロセッサをテスト実行し、読み取りスループットを比較します。受信データに遅れずに対応する方法については、「リシャーディング、拡張、並列処理」を参照してください。

コンシューマーアプリケーションが 1 つのみである場合、通常、PUT レートの少なくとも 2 倍高速に読み取りを実行できます。これは、書き込みについては最大 1 秒あたり 1,000 レコード、データの最大書き込み合計レートは 1 秒あたり 1 MB (パーティションキーを含む) まで書き込むことができるためです。開いている各シャードは 読み取りは最大 1 秒あたり 5 件のトランザクション、データ読み取りの最大合計レートは 1 秒あたり 2 MB をサポートできます。各読み取り (GetRecords) は、レコードのバッチを取得します。GetRecords によって返されるデータのサイズは、シャードの使用状況によって異なります。GetRecords が返すことができるデータの最大サイズは、10 MB です。呼び出しがその制限を返す場合、次の 5 秒以内に行われるそれ以降の呼び出しは ProvisionedThroughputExceededException をスローします。

ストリームにデータがある場合でも、GetRecords が空の Records 配列を返す

レコードの消費、つまり取得は、プルモデルです。開発者は、バックオフがない連続ループで GetRecords を呼び出す必要があります。GetRecords のすべての呼び出しは、ShardIterator 値も返します。この値は、ループの次のイテレーションで使用する必要があります。

GetRecords オペレーションはブロックしません。その代わりに、関連データレコードまたは空の Records 要素とともに、直ちに制御を戻します。空の Records 要素は、2 つの条件の下で返されます。

  1. 現在シャードにはそれ以上のデータがない.

  2. シャードの ShardIterator で指定されたパートの近くにデータがない。

後者の条件は微妙ですが、レコードを取得するときに無限のシーク時間 (レイテンシー) を回避するために必要な設計上のトレードオフです。そのため、ストリームを使用するアプリケーションはループし、GetRecords を呼び出して、当然のこととして空のレコードを処理します。

本稼働シナリオで、連続ループが終了するのは、NextShardIterator の値が NULL である場合のみにする必要があります。NextShardIteratorNULL である場合、現在のシャードが閉じられ、ShardIterator 値は最後のレコードを過ぎたことを示します。コンシューマーアプリケーションが SplitShard または MergeShards を呼び出さない場合、シャードは開いたままになり、GetRecords の呼び出しは NextShardIterator である NULL 値を返しません。

Kinesis Client Library (KCL) を使用する場合、お客様に対しては前述の消費パターンは抽象化されます。これには、動的に変更する一連のシャードの自動処理が含まれます。KCL により、開発者は入力レコードを処理するロジックのみを提供します。ライブラリが自動的に GetRecords の継続的な呼び出しを行うため、これが可能になります。

シャードイテレータが予期せずに終了する

新しいシャードのイテレータは、GetRecords リクエスト (NextShardIterator として) 返されます。これは次の GetRecordsリクエスト (ShardIterator として) 使用します。通常の場合、このシャードイテレーターは使用する前に有効期限が切れることはありません。ただし、5 分以上 GetRecords を呼び出さなかったため、またはコンシューマーアプリケーションの再起動を実行したため、シャードイテレータの有効期限が切れる場合があります。

シャードイテレーターの有効期限がすぐに切れて使用できない場合、これは Kinesis で使用している DynamoDB テーブルの容量不足でリースデータを保存できないことを示している可能性があります。この状況は、多数のシャードがある場合により発生する可能性が高くなります。この問題を解決するには、シャードテーブルに割り当てられた書き込み容量を増やします。詳細については、「Amazon Kinesis Data Streams Application の状態の追跡」を参照してください。

コンシューマーレコードの処理が遅れる

ほとんどのユースケースで、コンシューマーアプリケーションはストリームから最新のデータを読み取ります。特定の状況下では、コンシューマーの読み取りが遅れるという好ましくない事態が発生します。コンシューマーの読み取りの遅れ具合を確認したら、遅れの最も一般的な理由を参照してください。

GetRecords.IteratorAgeMilliseconds メトリクスを起動して、ストリーム内のすべてのシャードとコンシューマーの読み取り位置を追跡します。イテレータの経過日数が保持期間 (デフォルトで 24 時間、最大で 7 日まで設定可能) の 50% を経過すると失効する場合、レコードの有効期限切れによるデータ損失のリスクがあります。とりあえずの解決策は、保持期間を長くすることです。これにより、問題のトラブルシューティングを行う間に重要なデータが失われるのを防ぎます。詳細については、「Amazon CloudWatch による Amazon Kinesis Data Streams サービスのモニタリング」を参照してください。次に、Kinesis Client Library (KCL)、MillisBehindLatest が出力するカスタム CloudWatch メトリクスを使用して、コンシューマーアプリケーションの読み取りが各シャードからどのくらい遅れているかを確認します。詳細については、「Amazon CloudWatch による Kinesis クライアントライブラリのモニタリング」を参照してください。

コンシューマーが遅れる最も一般的な理由:

  • GetRecords.IteratorAgeMilliseconds の突然の上昇または MillisBehindLatest は、通常ダウンストリームアプリケーションに対する API オペレーションの障害などの一時的な問題を示します。どちらかのメトリクスが恒常的にこのような動きを示す場合、この急激な上昇を調査する必要があります。

  • これらのメトリクスが徐々に上昇する場合は、レコードの処理速度が不十分なためストリームにコンシューマーが追いついていないことを示します。この状況に共通の原因は、物理リソースの不足またはストリームスループットの上昇にレコード処理ロジックが追随できないことです。processTask オペレーション (RecordProcessor.processRecords.TimeSuccessRecordsProcessed など) に関連して KCL が出力する他のカスタム CloudWatch メトリクスを確認することで、この状況を調査できます。

    • スループットの増加に伴う processRecords.Time メトリクスの上昇が確認された場合、レコード処理ロジックを分析して、スループットの増加に対応したスケーリングができない理由を調べる必要があります。

    • スループットの上昇とは関連性がない processRecords.Time 値の上昇が認められた場合は、重要なパスでブロック呼び出しを行っていないか確認します。これは、レコード処理の低下を招きます。代替策として、シャードの数を増やして並列処理を増やす方法があります。最後に、ピーク需要時に適切な容量の物理リソース (メモリ、CPU 使用率など) が基盤の処理ノードに存在することを確認します。

承認されていない KMS マスターキーの権限エラー

このエラーは、KMS マスターキーのアクセス許可なしで、コンシューマーアプリケーションが暗号化されたストリームから読み取りを行ったときに発生します。KMS キーにアクセスする権限をアプリケーションに割り当てる方法については、「AWS KMS でのキーポリシーの使用」および「AWS KMS での IAM ポリシーの使用」を参照してください。