Kinesis データストリームコンシューマーのトラブルシューティング - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis データストリームコンシューマーのトラブルシューティング

Kinesis クライアントライブラリの使用時に一部の Kinesis Data Streams レコードがスキップされる

レコードがスキップされる最も一般的な原因は、processRecords からスローされる処理されない例外です。Kinesis Client Library (KCL) は、processRecords コードを使用して、データレコードの処理で発生するすべての例外を処理します。processRecords からスローされるすべての例外は、 KCLによって吸収されます。反復的なエラーに対する無限再試行を回避するために、KCL では例外の発生時に処理中であったレコードのバッチを再送信しません。KCL は、レコードプロセッサを再起動することなく、データレコードの次のバッチで processRecords を呼び出します。これにより、事実上、コンシューマーアプリケーションではレコードがスキップされたことになります。レコードのスキップを防止するには、processRecords 内ですべての例外を適切に処理します。

同じシャードに属するレコードが、異なるレコードプロセッサによって同時に処理される

実行されている Kinesis Client Library (KCL) アプリケーションでは、シャードの所有者はひとりだけです。ただし、複数のレコードプロセッサが一時的に同じシャードを処理する場合があります。ネットワーク接続を紛失したワーカーインスタンスの場合、CL はフェイルオーバー時間の期限が切れた後に、到達できないワーカーはレコードを処理していないと仮定し、他のワーカーインスタンスが引き継ぐように指示します。このとき短時間ですが、新しいレコードプロセッサと到達不可能なワーカーのレコードプロセッサが同じシャードのデータを処理する場合があります。

アプリケーションに適したフェイルオーバー時間を設定する必要があります。低レイテンシーアプリケーションの場合、10秒のデフォルトは、待機する最大時間を表している場合があります。ただし、より頻繁に接続が失われる地域で通話を行うなどの接続問題が予想される場合、この数値は低すぎる場合があります。

ネットワーク接続は通常、以前の到達不可能なワーカーに復元されるため、アプリケーションではこのシナリオを予期して処理する必要があります。レコードプロセッサのシャードが別のレコードプロセッサに引き継がれた場合、レコードプロセッサは正常なシャットダウンを実行するために次の 2 つのケースを処理する必要があります。

  1. processRecords への現在の呼び出しが完了した後で、KCL はシャットダウンの理由ZOMBIEを使用してレコードプロセッサでシャットダウンメソッドを呼び出します。レコードプロセッサは、すべてのリソースを必要に応じて適切にクリーンアップした後、終了する必要があります。

  2. zombieワーカーからチェックポイントを作成しようとすると、KCL は ShutdownException をスローします。この例外を受け取った後、コードは現在のメソッドを正常に終了する必要があります。

詳細については、重複レコードの処理を参照してください。

コンシューマーアプリケーションの読み取りの速度が予想よりも遅い

読み取りのスループットが予想よりも遅くなる最も一般的な理由は次のとおりです。

  1. 複数のコンシューマーアプリケーションの読み取りの合計が、シャードごとの制限を超えています。詳細については、クォータと制限を参照してください。この場合、Kinesis データストリームのシャードの数が増えます。

  2. 呼び出しごとの GetRecords の最大数を指定する制限が、低い値で設定されている可能性があります。KCL を使用している場合は、ワーカーに設定した maxRecords プロパティの値が低い可能性があります。一般的に、このプロパティにはシステムのデフォルトを使用することをお勧めします。

  3. processRecords 呼び出し内のロジックに予想よりも時間がかかる場合があります。これには、ロジックが CPU を大量に消費する、I/O をブロックする、同期のボトルネックになっているなど、多くの理由が考えられます。これに該当するかどうかをテストするには、空のレコードプロセッサをテスト実行し、読み取りスループットを比較します。受信データに遅れずに対応する方法については、リシャーディング、スケーリング、並列処理を参照してください。

コンシューマーアプリケーションが 1 つのみである場合、通常、PUT レートの少なくとも 2 倍高速に読み取りを実行できます。これは、書き込みに対して 1 秒あたり最大 1,000 レコードを書き込むことができ、最大合計データ書き込み速度が 1 秒あたり 1 MB (パーティションキーを含む) になるためです。オープンな各シャードは、読み取りに対して 1 秒あたり最大 5 トランザクションをサポートでき、最大合計データ読み取り速度は 1 秒あたり 2MB です。各読み取り (GetRecords) は、レコードのバッチを取得します。GetRecords によって返されるデータのサイズは、シャードの使用状況によって異なります。GetRecords が返すことができるデータの最大サイズは、10 MB です。呼び出しがその制限を返す場合、次の 5 秒以内に行われるそれ以降の呼び出しは ProvisionedThroughputExceededException をスローします。

GetRecords ストリームにデータがある場合でも、空のレコード配列を返します。

レコードの消費、つまり取得は、プルモデルです。デベロッパーは、バックオフなしで連続ループGetRecordsで を呼び出すことが期待されます。GetRecords のすべての呼び出しは、ShardIterator 値も返します。この値は、ループの次のイテレーションで使用する必要があります。

GetRecords オペレーションはブロックしません。その代わりに、関連データレコードまたは空の Records 要素とともに、直ちに制御を戻します。空の Records 要素は、2 つの条件の下で返されます。

  1. 現在シャードにはそれ以上のデータがない。

  2. シャードの ShardIterator で指定されたパートの近くにデータがない。

後者の条件は微妙ですが、レコードを取得するときに無限のシーク時間 (レイテンシー) を回避するために必要な設計上のトレードオフです。そのため、ストリームを使用するアプリケーションはループし、GetRecords を呼び出して、当然のこととして空のレコードを処理します。

本稼働シナリオで、連続ループが終了するのは、NextShardIterator の値が NULL である場合のみにする必要があります。NextShardIteratorNULL である場合、現在のシャードが閉じられ、ShardIterator 値は最後のレコードを過ぎたことを示します。コンシューマーアプリケーションが SplitShard または MergeShards を呼び出さない場合、シャードは開いたままになり、GetRecords の呼び出しは NextShardIterator である NULL 値を返しません。

Kinesis Client Library (KCL) を使用する場合、お客様に対しては前述の消費パターンは抽象化されます。これには、動的に変更する一連のシャードの自動処理が含まれます。KCL により、デベロッパーは入力レコードを処理するロジックのみを提供します。ライブラリが自動的に GetRecords の継続的な呼び出しを行うため、これが可能になります。

シャードイテレータが予期せずに終了する

新しいシャードのイテレータは、GetRecords リクエスト (NextShardIterator として) 返されます。これは次の GetRecordsリクエスト (ShardIterator として) 使用します。通常の場合、このシャードイテレーターは使用する前に有効期限が切れることはありません。ただし、5 分以上 GetRecords を呼び出さなかったため、またはコンシューマーアプリケーションの再起動を実行したため、シャードイテレータの有効期限が切れる場合があります。

シャードイテレーターの有効期限がすぐに切れて使用できない場合、これは Kinesis で使用している DynamoDB テーブルの容量不足でリースデータを保存できないことを示している可能性があります。この状況は、多数のシャードがある場合により発生する可能性が高くなります。この問題を解決するには、シャードテーブルに割り当てられた書き込み容量を増やします。詳細については、リーステーブルを使用して KCL コンシューマーアプリケーションによって処理されたシャードを追跡するを参照してください。

コンシューマーレコードの処理が遅れる

ほとんどのユースケースで、コンシューマーアプリケーションはストリームから最新のデータを読み取ります。特定の状況下では、コンシューマーの読み取りが遅れるという好ましくない事態が発生します。コンシューマーの読み取りの遅れ具合を確認したら、遅れの最も一般的な理由を参照してください。

GetRecords.IteratorAgeMilliseconds メトリクスを起動して、ストリーム内のすべてのシャードとコンシューマーの読み取り位置を追跡します。イテレータの経過日数が保持期間 (デフォルトで 24 時間、最大で 365 日まで設定可能) の 50% を経過すると失効する場合、レコードの有効期限切れによるデータ損失のリスクがあります。とりあえずの解決策は、保持期間を長くすることです。これにより、問題のトラブルシューティングを行う間に重要なデータが失われるのを防ぎます。詳細については、「Amazon CloudWatch による Amazon Kinesis Data Streams サービスのモニタリング」を参照してください。次に、Kinesis Client Library (KCL)、 によって発行されたカスタム CloudWatch メトリクスを使用して、コンシューマーアプリケーションが各シャードからどのくらい遅れているかを特定しますMillisBehindLatest。詳細については、「Amazon CloudWatch を使用した Kinesis Client Library のモニタリング」を参照してください。

コンシューマーが遅れる最も一般的な理由:

  • GetRecords.IteratorAgeMilliseconds の突然の上昇または MillisBehindLatest は、通常ダウンストリームアプリケーションに対する API オペレーションの障害などの一時的な問題を示します。どちらかのメトリクスが恒常的にこのような動きを示す場合、この急激な上昇を調査する必要があります。

  • これらのメトリクスが徐々に上昇する場合は、レコードの処理速度が不十分なためストリームにコンシューマーが追いついていないことを示します。この状況に共通の原因は、物理リソースの不足またはストリームスループットの上昇にレコード処理ロジックが追随できないことです。この動作を確認するには、、RecordProcessor.processRecords.TimeSuccessなど、KCL が processTaskオペレーションに関連付けて出力する他のカスタム CloudWatchメトリクスを確認しますRecordsProcessed

    • スループットの増加に伴う processRecords.Time メトリクスの上昇が確認された場合、レコード処理ロジックを分析して、スループットの増加に対応したスケーリングができない理由を調べる必要があります。

    • スループットの上昇とは関連性がない processRecords.Time 値の上昇が認められた場合は、重要なパスでブロック呼び出しを行っていないか確認します。これは、レコード処理の低下を招きます。代替策として、シャードの数を増やして並列処理を増やす方法があります。最後に、ピーク需要時に適切な容量の物理リソース (メモリ、CPU 使用率など) が基盤の処理ノードに存在することを確認します。

承認されていない KMS マスターキーの権限エラー

このエラーは、KMS マスターキーのアクセス許可なしで、コンシューマーアプリケーションが暗号化されたストリームから読み取りを行ったときに発生します。KMS キーにアクセスする許可をアプリケーションに割り当てるには、AWS KMS でのキーポリシーの使用およびAWS KMS での IAM ポリシーの使用を参照してください。

コンシューマーにとって一般的な問題、質問、トラブルシューティングのアイデア