Amazon Kinesis ストリームをソースとする場合 - Amazon EventBridge

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Kinesis ストリームをソースとする場合

EventBridge Pipes を使用して Kinesis データストリームでレコードを受信できます。その後、オプションでこれらのレコードをフィルタリングまたは拡張してから、処理可能な送信先のいずれかに送信できます。パイプをセットアップするときに選択できる Kinesis 固有の設定があります。EventBridge Pipes は、データを送信先に送信するときに、データストリームのレコードの順序を維持します。

Kinesis データストリームは、シャードのセットです。各シャードには、一連のデータレコードが含まれます。コンシューマーは、Kinesis データストリームからのデータを処理するアプリケーションです。EventBridge Pipe を共有スループットコンシューマー (標準イテレーター) にマップすることも、拡張ファンアウトを使用する専用スループットコンシューマーにマップすることもできます。

標準イテレーターの場合、EventBridge は HTTP プロトコルを使用して、Kinesis ストリームの各シャードにレコードがあるかどうかをポーリングします。このパイプでは、シャードの他のコンシューマーと読み取りスループットを共有します。

レイテンシーを最小限に抑え、読み取りスループットを最大化するために、拡張ファンアウトを使用するデータストリームコンシューマーを作成できます。ストリームコンシューマーは、ストリームから読み取る他のアプリケーションに影響を及ぼさないように、専用の接続を各シャードに割り当てます。専用のスループットは、多数のアプリケーションで同じデータを読み取っている場合や、大きなレコードでストリームを再処理する場合に役立ちます。Kinesis は、HTTP/2 経由で EventBridge にレコードをプッシュします。Kinesis Data Streams の詳細については、「Amazon Kinesis Data Streams からのデータの読み取り」を参照してください。

イベントの例

次のサンプルイベントは、パイプが受信した情報を示しています。このイベントを使用して、イベントパターンを作成およびフィルタリングしたり、入力変換を定義したりできます。すべてのフィールドをフィルタリングできるわけではありません。フィルターできるフィールドの詳細については、「Amazon EventBridge Pipes フィルタリング」を参照してください。

[ { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692538361571095921575989136588898", "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0Lg==", "approximateArrivalTimestamp": 1545084650.987 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692538361571095921575989136588898", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" }, { "kinesisSchemaVersion": "1.0", "partitionKey": "1", "sequenceNumber": "49590338271490256608559692540925702759324208523137515618", "data": "VGhpcyBpcyBvbmx5IGEgdGVzdC4=", "approximateArrivalTimestamp": 1545084711.166 "eventSource": "aws:kinesis", "eventVersion": "1.0", "eventID": "shardId-000000000006:49590338271490256608559692540925702759324208523137515618", "eventName": "aws:kinesis:record", "invokeIdentityArn": "arn:aws:iam::123456789012:role/lambda-role", "awsRegion": "us-east-2", "eventSourceARN": "arn:aws:kinesis:us-east-2:123456789012:stream/lambda-stream" } ]

ポーリングストリームとバッチストリーム

EventBridge は、レコードの Kinesis Streams にあるシャードを 1 秒あたり 4 回の基本レートでポーリングします。レコードが利用可能になると、EventBridge はイベントを処理して結果を待機します。処理が成功すると、EventBridge は、さらに多くのレコードを受け取るまでポーリングを再開します。

デフォルトで、EventBridge はレコードが使用可能になると同時にパイプを呼び出します。EventBridge がソースから読み取るバッチにレコードが 1 つしかない場合、1 つのイベントだけを処理します。少数のレコードを処理しないようにするには、バッチ処理ウィンドウを設定して、最大 5 分間レコードをバッファリングするようにパイプに指示できます。イベントを処理する前に、EventBridge は、完全なバッチを収集する、バッチ処理ウィンドウの期限が切れる、またはバッチが 6 MB のペイロード制限に到達するまでソースからのレコードの読み取りを継続します。

また、各シャードから複数のバッチを並行して処理することで、並行性を高めることもできます。EventBridge は、各シャードで最大 10 個のバッチを同時に処理できます。シャードあたりの同時バッチの数を増やしても、EventBridge はパーティションキーレベルでの順序立った処理を確実に行います。

ParallelizationFactor 設定を使用することで、複数のパイプの同時実行により、Kinesis または DynamoDB データストリームの 1 つのシャードを処理します。EventBridge がシャードからポーリングする同時バッチの数は、1 (デフォルト)~10 の並列化係数で指定できます。例えば、ParallelizationFactor を 2 に設定すると、最大 200 個の EventBridge Pipe の同時実行により、100 個の Kinesis データシャードを処理できます。これにより、データボリュームが揮発性で IteratorAge が高いときに処理のスループットをスケールアップすることができます。Kinesis 集約を使用している場合、並列化係数は機能しません。

ポーリングとストリームの開始位置

パイプの作成時と更新時のストリームソースポーリングは、最終的に一貫性があることに注意してください。

  • パイプ作成中、ストリームからのイベントのポーリングが開始されるまでに数分かかること場合があります。

  • ソースのポーリング構成をパイプで更新している間、ストリームのポーリングイベントを停止して再開するまでに数分かかることがあります。

つまり、LATEST をストリームの開始位置として指定すると、パイプ作成または更新中に送信されるイベントをパイプが見逃す可能性があります。イベントを見逃さないようにするには、ストリームの開始位置を TRIM_HORIZON または AT_TIMESTAMP として指定します。

バッチ項目の失敗の報告

EventBridge がソースからストリーミングデータを使用および処理する場合、デフォルトでは、バッチが完全に成功した場合にのみ、バッチの最大シーケンス番号に チェックポイントが設定されます。正常に処理されたメッセージが失敗したバッチで再処理されないようにするには、成功したメッセージと失敗したメッセージを示すオブジェクトを返すようにエンリッチメントまたはターゲットを設定できます。これを部分的なバッチレスポンスと呼びます。

詳細については、「部分的なバッチ処理失敗」を参照してください。

成功条件と失敗の条件

次のいずれかを返すと、EventBridge は完全な成功として処理します:

  • 空の batchItemFailure リスト

  • null の batchItemFailure リスト

  • 空の EventResponse

  • null の EventResponse

次のいずれかを返すと、EventBridge は完全な失敗として処理します:

  • 空の文字列 itemIdentifier

  • ヌル itemIdentifier

  • 不正なキー名を持つ itemIdentifier

EventBridge は、再試行戦略に基づいて失敗を再試行します。