Amazon Kinesis Data Firehose のデータ変換 - Amazon Kinesis Data Firehose

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Kinesis Data Firehose のデータ変換

Kinesis Data Firehoseでは、Lambda 関数を呼び出して、受信した送信元データを変換してから送信先に配信できます。Kinesis Data Firehose のデータ変換は、配信ストリームの作成時に有効にすることができます。

データ変換フロー

Kinesis Data Firehose データ変換を有効にすると、Kinesis Data Firehose は受信データをバッファします バッファリングヒントの範囲は 0.2 MB から最大 3 MB です。デフォルトのバッファリングヒントは、Splunk を除くすべての宛先で 1 MB です。Splunk の場合、デフォルトのバッファリングヒントは 256 KB です。(バッファサイズを調整するには、BufferSizeInMBs と呼ばれる ProcessorParameterProcessingConfiguration API を使用します。) 次に、Kinesis Data Firehose は、AWS Lambda 同期呼び出しモードを使用して、バッファされた各バッチで、指定された Lambda 関数を非同期的に呼び出します。変換されたデータは、Lambda から Kinesis Data Firehose に送信されます。その後、変換されたデータは、指定された送信先のバッファサイズとバッファ間隔のいずれかに到達したときに、Kinesis Data Firehose より送信先に配信されます。到達順序は関係ありません。

重要

Lambda 同期呼び出しモードには、リクエストとレスポンスの両方について、ペイロードサイズに 6 MB の制限があります。関数にリクエストを送信するためのバッファサイズが 6 MB 以下であることを確認してください。また、関数より返るレスポンスが 6 MB を超えないことを確認します。

データ変換とステータスモデル

Lambda からのすべての変換されたレコードには、以下のパラメータが含まれる必要があります。含まれない場合、Kinesis Data Firehose はそれらのレコードを拒否し、データ変換の失敗として処理します。

recordId

レコード ID は呼び出し時に Kinesis Data Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。

result

レコードのデータ変換のステータス。指定できる値は次のとおりです: Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)。レコードのステータスが Ok または Dropped の場合、Kinesis Data Firehose はレコードが正常に処理されたとみなします。それ以外の場合、Kinesis Data Firehose はそれが正常に処理できなかったと見なします。

データ

base64 エンコード後の変換されたデータペイロード。

Lambda の設計図

データ変換用の Lambda 関数を作成するために使用できる設計図があります。これらの設計図の一部は AWS Lambda コンソールにあり、一部は AWS Serverless Application Repository にあります。

AWS Lambda コンソールで使用可能な設計図を表示するには
  1. AWS Management Console にサインインして AWS Lambda コンソール (https://console.aws.amazon.com/lambda/) を開きます。

  2. [関数の作成]、[Use a blueprint (設計図の使用)] の順に選択します。

  3. [設計図] フィールドで、キーワード firehose で検索して Kinesis Data Firehose Lambda 設計図を見つけます。

AWS Serverless Application Repository で使用可能な設計図を表示するには
  1. AWS Serverless Application Repository に移動します。

  2. [すべてのアプリケーションを参照] を選択します。

  3. [アプリケーション] フィールドで、キーワード firehose を検索します。

設計図を使用せずに Lambda 関数を作成することもできます。「AWS Lambda の開始方法」を参照してください。

データ変換失敗の処理

ネットワークタイムアウトのために、または Lambda 呼び出しの制限に達したために、Lambda 関数呼び出しが失敗した場合、Kinesis Data Firehose は呼び出しをデフォルトで 3 回再試行します。呼び出しが成功しなければ、Kinesis Data Firehose はそのレコードのバッチをスキップします。スキップされたレコードは処理失敗として扱われます。再試行オプションを指定またはオーバーライドするには、CreateDeliveryStreamまたはUpdateDestinationAPI。このタイプの失敗の場合、呼び出しエラーログを Amazon に出力できます CloudWatch ログ。詳細については、「 CloudWatch ログを使用した Kinesis Data Firehose のモニタリング」を参照してください。

レコードのデータ変換のステータスが ProcessingFailed の場合、Kinesis Data Firehose はそのレコードを処理失敗として扱います。このタイプの失敗の場合、エラーログを Amazon に出力できます CloudWatch Lambda 関数からのログ 詳細については、次を参照してください。Amazon へのアクセス CloudWatch のログAWS Lambda()AWS Lambdaデベロッパーガイド

データ変換が失敗した場合、処理に失敗したレコードは S3 バケットの processing-failed フォルダに配信されます。レコードの形式は以下のとおりです。

{ "attemptsMade": "count", "arrivalTimestamp": "timestamp", "errorCode": "code", "errorMessage": "message", "attemptEndingTimestamp": "timestamp", "rawData": "data", "lambdaArn": "arn" }
attemptsMade

呼び出しリクエストの試行回数。

arrivalTimestamp

Kinesis Data Firehose がレコードを受信した時間。

errorCode

Lambda から返された HTTP エラーコード。

errorMessage

Lambda から返されたエラーメッセージ。

attemptEndingTimestamp

Kinesis Data Firehose が Lambda 呼び出しの試行を停止した時間。

rawData

base64 エンコード後のレコードデータ。

lambdaArn

Lambda 関数の Amazon リソースネーム (ARN)。

Lambda の呼び出し時間

Kinesis Data Firehose では、最大 5 分の Lambda の呼び出し時間がサポートされます。Lambda 関数の完了に 5 分を超える時間がかかる場合は、次のエラーが表示されます。Firehose を呼び出し中にタイムアウトエラーが発生しましたAWS[Lambda] 。サポートされている関数の最大タイムアウト時間は 5 分です。

このようなエラーが発生した場合の Kinesis Data Firehose による処理の詳細については、「データ変換失敗の処理」を参照してください。

ソースレコードのバックアップ

Kinesis Data Firehose は、変換されたレコードを送信先に配信すると同時に、変換されなかったすべてのレコードを S3 バケットにバックアップできます。ソースレコードのバックアップは、配信ストリームの作成または更新時に有効にすることができます。ソースレコードのバックアップは、有効にした後で無効にすることはできません。