Amazon Kinesis Data Streams
開発者ガイド

重複レコードの処理

レコードが複数回 Amazon Kinesis Data Streams applicationに配信される理由は、主にプロデューサーの再試行とコンシューマーの再試行の 2 つになります。アプリケーションは、各レコードの複数回処理を予測して適切に処理する必要があります。

プロデューサーの再試行

プロデューサーで PutRecord を呼び出してから Amazon Kinesis Data Streams の受信確認を受け取るまでの間に、ネットワーク関連のタイムアウトを発生する場合があります。この場合、プロデューサーはレコードが Kinesis Data Streams に配信されたかどうかを確認できません。各レコードがアプリケーションにとって重要であれば、同じデータを使用して呼び出しを再試行するようにプロデューサーが定義されているはずです。同じデータを使用した PutRecord の呼び出しが両方とも Kinesis Data Streams に正常にコミットされると、Kinesis Data Streams レコードは 2 つになります。2 つのレコードは、データは同じでも、一意のシーケンス番号が付けられています。厳密な保証を必要とするアプリケーションは、後で処理するときに重複を削除するようにレコード内にプライマリキーを埋め込む必要があります。プロデューサーの再試行に起因する重複の数が、コンシューマーの再試行に起因する重複の数より通常は少ないことに注意してください。

注記

AWS SDK の PutRecord を使用すると、デフォルトの 設定により、失敗した PutRecord の呼び出しが 3 回まで再試行されます。

コンシューマーの再試行

コンシューマー (データ処理アプリケーション) の再試行は、レコードプロセッサが再開するときに発生します。同じシャードのレコードプロセッサは次の場合に再開します。

  1. ワーカーが予期せず終了する

  2. ワーカーのインスタンスが追加または削除される

  3. シャードが結合または分割される

  4. アプリケーションがデプロイされる

これらのすべての場合において、負荷分散処理に対するシャードとワーカーとレコードプロセッサのマッピングは継続的に更新されます。他のインスタンスに移行されたシャードプロセッサは、最後のチェックポイントからレコードの処理を再開します。これにより、次の例に示すように、重複レコード処理が発生します。負荷分散の詳細については、「リシャーディング、拡張、並列処理」を参照してください。

例: コンシューマーの再試行によるレコードの再配信

この例では、ストリームから継続的にレコードを読み取り、ローカルファイルにレコードを集約し、このファイルを Amazon S3 にアップロードするアプリケーションがあるとします。分かりやすいように、1 つのシャードとこのシャードを処理する 1 つのワーカーがあるとします。最後のチェックポイントがレコード番号 10000 であると仮定して、次の例の一連のイベントを考えてみます。

  1. ワーカーで、シャードから次のレコードのバッチを読み込みます (10001 から 20000)。

  2. 次にワーカーで、そのレコードのバッチを関連付けられたレコードプロセッサに渡します。

  3. レコードプロセッサはデータを集約し、Amazon S3 ファイルを作成して、このファイルを Amazon S3 に正常にアップロードします。

  4. 新しいチェックポイントが発生する前にワーカーが予期せず終了します。

  5. アプリケーション、ワーカー、およびレコードプロセッサが再開します。

  6. ワーカーは、正常な最後のチェックポイント (この場合は 10001) から読み込みを開始しました。

したがって、10001 から 20000 のレコードは複数回使用されます。

コンシューマーの再試行に対する弾力性

レコードが複数回処理される可能性がある場合でも、アプリケーションは、レコードが 1 回だけ処理されたかのように副作用を示すことがあります (べき等処理)。この問題に対するソリューションは複雑さと正確性によって異なります。最終データの送信先が重複を適切に処理できる場合は、べき等処理の実行に最終送信先を使用することをお勧めします。たとえば、Elasticsearch で、バージョニングと一意の ID の組み合わせを使用して重複処理を回避できます。

前のセクションのサンプルアプリケーションでは、ストリームから継続的にレコードを読み取り、ローカルファイルにレコードを集約し、このファイルを Amazon S3 にアップロードします。図に示すように、10001 から 20000 のレコードが複数回使用されることにより、複数の Amazon S3 ファイルのデータは同じになります。この例の重複を減らす方法の 1 つは、ステップ 3 で次のスキーマを使用することです。

  1. レコードプロセッサは、各 Amazon S3 ファイルに固定のレコード番号 (5000 など) を使用します。

  2. ファイル名には、このスキーマ (Amazon S3、プレフィックス、シャード ID、および First-Sequence-Num) を使用します。この場合、sample-shard000001-10001 のようになります。

  3. Amazon S3 ファイルをアップロードした後で、Last-Sequence-Num を指定してチェックポイントを作成します。この場合、レコード番号 15000 にチェックポイントが作成されます。

このスキーマを使用すると、レコードが複数回処理されても、Amazon S3 ファイルには同じ名前と同じデータが保持されます。再試行によってのみ、同じファイルに同じデータが複数回書き込まれます。

リシャーディングオペレーションの場合、シャードに残っているレコードの数は必要な一定数より少ないことがあります。この場合、shutdown() メソッドは Amazon S3 にファイルをフラッシュし、最後のシーケンス番号でチェックポイントを作成する必要があります。前述のスキーマも、リシャーディングオペレーションと互換性があります。