重複レコードの処理 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

重複レコードの処理

レコードが複数回に配信される理由は、主にプロデューサーの再試行とコンシューマーの再試行の 2 つになります。アプリケーションは、各レコードの複数回処理を予測して適切に処理する必要があります。

プロデューサーの再試行

プロデューサーでを呼び出してネットワーク関連のタイムアウトを発生する場合があります。PutRecordただし、Amazon Kinesis Data Streams から承認を受け取るまでの間です。プロデューサーは、レコードが Kinesis Data Streams に配信されたかどうかを確認できません。各レコードがアプリケーションにとって重要であれば、同じデータを使用して呼び出しを再試行するようにプロデューサーが定義されているはずです。(両方に対して)PutRecord同じデータに対する呼び出しが Kinesis Data Streams に正常にコミットされると、2 つの Kinesis Data Streams レコードが作成されます。2 つのレコードは、データは同じでも、一意のシーケンス番号が付けられています。厳密な保証を必要とするアプリケーションは、後で処理するときに重複を削除するようにレコード内にプライマリキーを埋め込む必要があります。プロデューサーの再試行に起因する重複の数が、コンシューマーの再試行に起因する重複の数より通常は少ないことに注意してください。

コンシューマーの再試行

コンシューマー (データ処理アプリケーション) の再試行は、レコードプロセッサが再開するときに発生します。同じシャードのレコードプロセッサは次の場合に再開します。

  1. ワーカーが予期せず終了する

  2. ワーカーのインスタンスが追加または削除される

  3. シャードが結合または分割される

  4. アプリケーションがデプロイされる

これらすべての場合において、shards-to-worker-to-record-processorマッピングは継続的に更新され、ロードバランシング処理が行われます。他のインスタンスに移行されたシャードプロセッサは、最後のチェックポイントからレコードの処理を再開します。これにより、次の例に示すように、重複レコード処理が発生します。負荷分散の詳細については、「リシャーディング、拡張、並列処理」を参照してください。

例: コンシューマーの再試行によるレコードの再配信

この例では、ストリームから継続的にレコードを読み取り、ローカルファイルにレコードを集約し、このファイルを Amazon S3 にアップロードするアプリケーションがあるとします。分かりやすいように、1 つのシャードとこのシャードを処理する 1 つのワーカーがあるとします。最後のチェックポイントがレコード番号 10000 であると仮定して、次の例の一連のイベントを考えてみます。

  1. ワーカーで、シャードから次のレコードのバッチを読み込みます (10001 から 20000)。

  2. 次にワーカーで、そのレコードのバッチを関連付けられたレコードプロセッサに渡します。

  3. レコードプロセッサはデータを集約し、Amazon S3 ファイルを作成して、ファイルを Amazon S3 に正常にアップロードします。

  4. 新しいチェックポイントが発生する前にワーカーが予期せず終了します。

  5. アプリケーション、ワーカー、およびレコードプロセッサが再開します。

  6. ワーカーは、正常な最後のチェックポイント (この場合は 10001) から読み込みを開始しました。

したがって、10001 から 20000 のレコードは複数回使用されます。

コンシューマーの再試行に対する弾力性

レコードが複数回処理される可能性がある場合でも、アプリケーションは、レコードが 1 回だけ処理されたかのように副作用を示すことがあります (べき等処理)。この問題に対するソリューションは複雑さと正確性によって異なります。最終データの送信先が重複を適切に処理できる場合は、べき等処理の実行に最終送信先を使用することをお勧めします。たとえば、とOpenSearchバージョニングと一意の ID の組み合わせを使用して、重複処理を回避できます。

前のセクションのサンプルアプリケーションでは、ストリームから継続的にレコードを読み取り、ローカルファイルにレコードを集約し、このファイルを Amazon S3 にアップロードします。図に示すように、10001 から 20000 のレコードが複数回使用されることにより、複数の Amazon S3 ファイルには同じデータが含まれています。この例の重複を減らす方法の 1 つは、ステップ 3 で次のスキーマを使用することです。

  1. レコードプロセッサは、5000 など、Amazon S3 ファイルごとに固定数のレコードを使用します。

  2. ファイル名には、次のスキーマを使用します。Amazon S3 プレフィックス、シャード ID、およびFirst-Sequence-Num。この場合、sample-shard000001-10001 のようになります。

  3. Amazon S3 ファイルをアップロードした後で、を指定してチェックポイントを作成します。Last-Sequence-Num。この場合、レコード番号 15000 にチェックポイントが作成されます。

このスキームでは、レコードが複数回処理されても、生成される Amazon S3 ファイルの名前は同じで、データは同じになります。再試行によってのみ、同じファイルに同じデータが複数回書き込まれます。

リシャーディングオペレーションの場合、シャードに残っているレコードの数は必要な一定数より少ないことがあります。この場合、shutdown()メソッドは、ファイルを Amazon S3 にフラッシュし、最後のシーケンス番号でチェックポイントを作成する必要があります。前述のスキーマも、リシャーディングオペレーションと互換性があります。