Amazon Data Firehose データ変換 - Amazon Data Firehose

Amazon Data Firehose は、以前は Amazon Kinesis Data Firehose として知られていました

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Data Firehose データ変換

Amazon Data Firehose は Lambda 関数を呼び出して、受信ソースデータを変換し、変換されたデータを宛先に配信できます。Firehose ストリームを作成するときに、Amazon Data Firehose データ変換を有効にできます。

データ変換フロー

Firehose データ変換を有効にすると、Firehose は受信データをバッファします。バッファリングサイズのヒントの範囲は 0.2 MB~3MB。デフォルトの Lambda バッファリングサイズのヒントは、Splunk と Snowflake を除くすべての送信先で 1 MB です。Splunk と Snowflake の場合、デフォルトのバッファリングヒントは 256 KB です。Lambda バッファリング間隔のヒントの範囲は 0~900 秒です。デフォルトの Lambda バッファリング間隔ヒントは、Snowflake を除くすべての送信先で 60 秒です。Snowflake の場合、デフォルトのバッファリングヒント間隔は 30 秒です。バッファリングサイズを調整するには、 CreateDeliveryStream または UpdateDestination API の ProcessingConfigurationパラメータを、ProcessorParameter呼び出された BufferSizeInMBsと で設定しますIntervalInSeconds。Firehose は、同期呼び出しモードを使用して、バッファされた各バッチで指定された Lambda 関数を AWS Lambda 非同期的に呼び出します。変換されたデータは Lambda から Firehose に送信されます。Firehose は、指定された送信先バッファリングサイズまたはバッファリング間隔のいずれかに達したときに、それを送信先に送信します。

重要

Lambda 同期呼び出しモードには、リクエストとレスポンスの両方について、ペイロードサイズに 6 MB の制限があります。関数にリクエストを送信するためのバッファサイズが 6 MB 以下であることを確認してください。また、関数より返るレスポンスが 6 MB を超えないことを確認します。

データ変換とステータスモデル

Lambda から変換されたすべてのレコードには、次のパラメータが含まれている必要があります。含まれていない場合、Amazon Data Firehose はそれらを拒否し、データ変換の失敗として扱います。

Kinesis Data Streams と Direct PUT の場合:

recordId

レコード ID は、呼び出し中に Amazon Data Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。

result

レコードのデータ変換のステータス。指定できる値は次のとおりです: Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)。レコードのステータスが Okまたは の場合Dropped、Amazon Data Firehose はレコードが正常に処理されたと見なします。それ以外の場合、Amazon Data Firehose は処理に失敗したと見なします。

データ

base64 エンコード後の変換されたデータペイロード。

以下は、Lambda の結果の出力例です。

{ "recordId": "<recordId from the Lambda input>", "result": "Ok", "data": "<Base64 encoded Transformed data>" }

Amazon MSK の場合

recordId

レコード ID は、呼び出し中に Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。

result

レコードのデータ変換のステータス。指定できる値は次のとおりです: Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)。レコードのステータスが Okまたは の場合Dropped、Firehose はレコードが正常に処理されたと見なします。それ以外の場合、Firehose は処理に失敗したと見なします。

KafkaRecordValue

base64 エンコード後の変換されたデータペイロード。

以下は、Lambda の結果の出力例です。

{ "recordId": "<recordId from the Lambda input>", "result": "Ok", "kafkaRecordValue": "<Base64 encoded Transformed data>" }

Lambda の設計図

これらのブループリントは、 AWS Lambda 関数を作成して使用し、Amazon Data Firehose データストリーム内のデータを変換する方法を示しています。

AWS Lambda コンソールで使用可能なブループリントを表示するには
  1. にサインイン AWS Management Console し、https://console.aws.amazon.com/lambda/ で AWS Lambda コンソールを開きます。

  2. [関数の作成]、[Use a blueprint (設計図の使用)] の順に選択します。

  3. ブループリント フィールドで、 キーワードを検索firehoseして Amazon Data Firehose Lambda ブループリントを検索します。

ブループリントのリスト:

  • Amazon Data Firehose ストリームに送信されたレコードを処理する (Node.js、Python)

    このブループリントは、 AWS Lambda を使用して Firehose データストリーム内のデータを処理する方法の基本的な例を示しています。

    最終リリース日: 2016 年 11 月

    リリースノート: なし

  • Firehose に送信されるプロセス CloudWatch ログ

    このブループリントは廃止されました。Firehose に送信された CloudWatch ログの処理については、「ログを使用した Firehose への書き込み CloudWatch 」を参照してください。

  • syslog 形式の Amazon Data Firehose ストリームレコードを JSON (Node.js) に変換する

    このブループリントは、RFC3164 Syslog 形式の入力レコードを JSON に変換する方法を示しています。

    最終リリース日: 2016 年 11 月

    リリースノート: なし

で使用可能なブループリントを表示するには AWS Serverless Application Repository
  1. AWS Serverless Application Repository に移動します。

  2. [すべてのアプリケーションを参照] を選択します。

  3. [アプリケーション] フィールドで、キーワード firehose を検索します。

設計図を使用せずに Lambda 関数を作成することもできます。AWS 「Lambda の開始方法」を参照してください。

データ変換失敗の処理

ネットワークタイムアウトまたは Lambda 呼び出し制限に達したために Lambda 関数の呼び出しが失敗した場合、Amazon Data Firehose はデフォルトで呼び出しを 3 回再試行します。呼び出しが成功しない場合、Amazon Data Firehose はそのレコードのバッチをスキップします。スキップされたレコードは処理失敗として扱われます。または UpdateDestination API を使用して、再試行オプションを指定CreateDeliveryStreamまたは上書きできます。このタイプの失敗では、呼び出しエラーを Amazon CloudWatch Logs にログ記録できます。詳細については、「 CloudWatch ログを使用した Amazon データFirehose モニタリング」を参照してください。

レコードのデータ変換のステータスが の場合ProcessingFailed、Amazon Data Firehose はレコードを正常に処理されなかったものとして扱います。このタイプの障害については、Lambda 関数から Amazon CloudWatch Logs にエラーログを送信できます。詳細については、「 AWS Lambda デベロッパーガイド」の「 の Amazon CloudWatch Logs へのアクセス AWS Lambda」を参照してください。

データ変換が失敗した場合、処理に失敗したレコードは S3 バケットの processing-failed フォルダに配信されます。レコードの形式は以下のとおりです。

{ "attemptsMade": "count", "arrivalTimestamp": "timestamp", "errorCode": "code", "errorMessage": "message", "attemptEndingTimestamp": "timestamp", "rawData": "data", "lambdaArn": "arn" }
attemptsMade

呼び出しリクエストの試行回数。

arrivalTimestamp

Amazon Data Firehose がレコードを受信した時刻。

errorCode

Lambda から返された HTTP エラーコード。

errorMessage

Lambda から返されたエラーメッセージ。

attemptEndingTimestamp

Amazon Data Firehose が Lambda 呼び出しの試行を停止した時刻。

rawData

base64 エンコード後のレコードデータ。

lambdaArn

Lambda 関数の Amazon リソースネーム (ARN)。

Lambda の呼び出し時間

Amazon Data Firehose は、最大 5 分の Lambda 呼び出し時間をサポートします。Lambda 関数が完了するまでに 5 分以上かかる場合、次のエラーが表示されます。Firehose で AWS Lambda を呼び出すときにタイムアウトエラーが発生しました。サポートされている関数のタイムアウトは最大 5 分です。

このようなエラーが発生した場合の Amazon Data Firehose の動作については、「」を参照してくださいデータ変換失敗の処理

ソースレコードのバックアップ

Amazon Data Firehose は、変換されたレコードを送信先に配信しながら、変換されていないすべてのレコードを S3 バケットに同時にバックアップできます。Firehose ストリームを作成または更新するときに、ソースレコードのバックアップを有効にできます。ソースレコードのバックアップは、有効にした後で無効にすることはできません。