翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon Kinesis Data Firehose のデータ変換
Kinesis Data Firehoseでは、Lambda 関数を呼び出して、受信した送信元データを変換してから送信先に配信できます。Kinesis Data Firehose のデータ変換は、配信ストリームの作成時に有効にすることができます。
データ変換フロー
Kinesis Data Firehose データ変換を有効にすると、Kinesis Data Firehose は受信データをバッファします。バッファリングヒントは 0.2 MB から最大 3 MB までの範囲です。デフォルトのバッファリングヒントは、Splunk を除くすべての宛先で 1 MB です。Splunk の場合、デフォルトのバッファリングヒントは 256 KB です。(バッファサイズを調整するには、BufferSizeInMBs
と呼ばれる ProcessorParameter
で ProcessingConfiguration
API を使用します。) 次に、Kinesis Data Firehose は、AWS Lambda 同期呼び出しモードを使用して、バッファされた各バッチで、指定された Lambda 関数を非同期的に呼び出します。変換されたデータは、Lambda から Kinesis Data Firehose に送信されます。その後、変換されたデータは、指定された送信先のバッファサイズとバッファ間隔のいずれかに到達したときに、Kinesis Data Firehose より送信先に配信されます。到達順序は関係ありません。
重要
Lambda 同期呼び出しモードには、リクエストとレスポンスの両方について、ペイロードサイズに 6 MB の制限があります。関数にリクエストを送信するためのバッファサイズが 6 MB 以下であることを確認してください。また、関数より返るレスポンスが 6 MB を超えないことを確認します。
データ変換とステータスモデル
Lambda からのすべての変換されたレコードには、以下のパラメータが含まれる必要があります。含まれない場合、Kinesis Data Firehose はそれらのレコードを拒否し、データ変換の失敗として処理します。
- recordId
-
レコード ID は呼び出し時に Kinesis Data Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。
- result
-
レコードのデータ変換のステータス。指定できる値は次のとおりです:
Ok
(レコードが正常に変換された)、Dropped
(レコードが処理ロジックによって意図的に削除された)、ProcessingFailed
(レコードを変換できなかった)。レコードのステータスがOk
またはDropped
の場合、Kinesis Data Firehose はレコードが正常に処理されたとみなします。それ以外の場合、Kinesis Data Firehose はそれが正常に処理できなかったと見なします。 - データ
-
base64 エンコード後の変換されたデータペイロード。
Lambda の設計図
これらの設計図は、AWS Lambda 関数を作成して使用して Kinesis Firehose データストリームのデータを変換する方法を示しています。
AWS Lambda コンソールで使用可能な設計図を表示するには
AWS Management Console にサインインして AWS Lambda コンソール (https://console.aws.amazon.com/lambda/
) を開きます。 -
[関数の作成]、[Use a blueprint (設計図の使用)] の順に選択します。
-
[設計図] フィールドで、キーワード
firehose
で検索して Kinesis Data Firehose Lambda 設計図を見つけます。
設計図のリスト:
-
Kinesis Firehose ストリームに送信されたプロセスレコード (Node.js、Python)
この設計図は、AWS Lambda を使用して Kinesis Data Firehose データストリーム内のデータを処理する方法の基本的な例を示しています。
最新のリリース日:2016 年 11 月
リリースノート:なし。
-
Kinesis Firehose CloudWatch に送信されたプロセスログ (Node.js、Python)
この設計図は、AWS Lambda を使用して Kinesis Data Firehose に送信された Amazon CloudWatch ログを解凍する方法を示しています。
最新のリリース日:2022年9月15日。
リリースノート:このブループリントの最新リリースバージョンでは、エラー処理を改善して Kinesis Data Firehose と Lambda の使用料を削減する方法を示しています。このブループリントを使用するときは、Kinesis Firehose Lambda のバッファサイズ設定を 256 KB に変更してください。
-
Kinesis Firehose ストリームレコードを syslog 形式で JSON (Node.js) に変換します
この設計図は、RFC3164 Syslog 形式の入力レコードを JSON に変換する方法を示しています。
最新のリリース日:2016 年 11 月
リリースノート:なし。
AWS Serverless Application Repository で使用可能な設計図を表示するには
-
[すべてのアプリケーションを参照] を選択します。
-
[アプリケーション] フィールドで、キーワード
firehose
を検索します。
設計図を使用せずに Lambda 関数を作成することもできます。「AWS Lambda の開始方法」を参照してください。
データ変換失敗の処理
ネットワークタイムアウトのために、または Lambda 呼び出しの制限に達したために、Lambda 関数呼び出しが失敗した場合、Kinesis Data Firehose は呼び出しをデフォルトで 3 回再試行します。呼び出しが成功しなければ、Kinesis Data Firehose はそのレコードのバッチをスキップします。スキップされたレコードは処理失敗として扱われます。またはUpdateDestination
API CreateDeliveryStreamを使用して再試行オプションを指定またはオーバーライドできます。このタイプの失敗の場合、呼び出しエラーログを Amazon CloudWatch Logs に出力できます。詳細については、「Kinesis データファイアホースによるモニタリングCloudWatchログ」を参照してください。
レコードのデータ変換のステータスが ProcessingFailed
の場合、Kinesis Data Firehose はそのレコードを処理失敗として扱います。このタイプの失敗の場合、エラーログを Lambda 関数から Amazon CloudWatch Logs に出力できます。詳細については、AWS LambdaAWS Lambda開発者ガイドの「Amazon CloudWatch Logs へのアクセス」を参照してください。
データ変換が失敗した場合、処理に失敗したレコードは S3 バケットの processing-failed
フォルダに配信されます。レコードの形式は以下のとおりです。
{ "attemptsMade": "
count
", "arrivalTimestamp": "timestamp
", "errorCode": "code
", "errorMessage": "message
", "attemptEndingTimestamp": "timestamp
", "rawData": "data
", "lambdaArn": "arn
" }
attemptsMade
-
呼び出しリクエストの試行回数。
arrivalTimestamp
-
Kinesis Data Firehose がレコードを受信した時間。
errorCode
-
Lambda から返された HTTP エラーコード。
errorMessage
-
Lambda から返されたエラーメッセージ。
attemptEndingTimestamp
-
Kinesis Data Firehose が Lambda 呼び出しの試行を停止した時間。
rawData
-
base64 エンコード後のレコードデータ。
lambdaArn
-
Lambda 関数の Amazon リソースネーム (ARN)。
Lambda の呼び出し時間
Kinesis Data Firehose では、最大 5 分の Lambda の呼び出し時間がサポートされます。Lambda 関数の完了に 5 分を超える時間がかかる場合は、次のエラーが表示されます: Firehose で、AWS Lambda を呼び出すときにタイムアウトエラーが発生しました。サポートされる関数の最大タイムアウトは 5 分です。
このようなエラーが発生した場合の Kinesis Data Firehose による処理の詳細については、「データ変換失敗の処理」を参照してください。
ソースレコードのバックアップ
Kinesis Data Firehose は、変換されたレコードを送信先に配信すると同時に、変換されなかったすべてのレコードを S3 バケットにバックアップできます。ソースレコードのバックアップは、配信ストリームの作成または更新時に有効にすることができます。ソースレコードのバックアップは、有効にした後で無効にすることはできません。