Amazon Data Firehose でデータを変換する - Amazon Data Firehose

Amazon S3 の Apache Iceberg テーブルへの Amazon Data Firehose ストリームの配信はプレビュー中であり、変更される可能性があります。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Data Firehose でデータを変換する

Amazon Data Firehose は Lambda 関数を呼び出して、受信ソースデータを変換し、変換されたデータを宛先に配信できます。Firehose ストリームを作成するときに、Amazon Data Firehose データ変換を有効にできます。

データ変換フローを理解する

Firehose データ変換を有効にすると、Firehose は受信データをバッファします。バッファリングサイズのヒントの範囲は 0.2 MB から 3MBです。デフォルトの Lambda バッファリングサイズヒントは、Splunk と Snowflake を除くすべての送信先で 1 MB です。Splunk および Snowflake の場合、デフォルトのバッファリングヒントは 256 KB です。Lambda バッファリング間隔のヒントの範囲は 0~900 秒です。デフォルトの Lambda バッファリング間隔ヒントは、Snowflake を除くすべての送信先で 60 秒です。Snowflake の場合、デフォルトのバッファリングヒント間隔は 30 秒です。バッファリングサイズを調整するには、 CreateDeliveryStreamまたは の ProcessingConfigurationパラメータを、 BufferSizeInMBsおよび というProcessorParameter名前の UpdateDestinationAPIで設定しますIntervalInSeconds。Firehose は、同期呼び出しモードを使用して、バッファされた各バッチで指定された Lambda 関数を AWS Lambda 非同期的に呼び出します。変換されたデータは Lambda から Firehose に送信されます。Firehose は、指定された送信先バッファリングサイズまたはバッファリング間隔のいずれか早い方に達したときに、それを送信先に送信します。

重要

Lambda 同期呼び出しモードには、リクエストとレスポンスの両方について、ペイロードサイズに 6 MB の制限があります。関数にリクエストを送信するためのバッファサイズが 6 MB 以下であることを確認してください。また、関数より返るレスポンスが 6 MB を超えないことを確認します。

データ変換とステータスモデル

Lambda から変換されたすべてのレコードには、次のパラメータが含まれている必要があります。含まれていない場合、Amazon Data Firehose はそれらを拒否し、それをデータ変換の失敗として扱います。

Kinesis Data Streams および Direct の場合PUT:

recordId

レコード ID は、呼び出し中に Amazon Data Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。

result

レコードのデータ変換のステータス。指定できる値は次のとおりです: Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)。レコードのステータスが Okまたは の場合Dropped、Amazon Data Firehose はレコードが正常に処理されたと見なします。それ以外の場合、Amazon Data Firehose は処理に失敗したと見なします。

データ

base64 エンコード後の変換されたデータペイロード。

以下は、Lambda の結果の出力例です。

{ "recordId": "<recordId from the Lambda input>", "result": "Ok", "data": "<Base64 encoded Transformed data>" }

Amazon の場合 MSK

recordId

レコード ID は、呼び出し中に Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。

result

レコードのデータ変換のステータス。指定できる値は次のとおりです: Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)。レコードのステータスが Okまたは の場合Dropped、Firehose はレコードが正常に処理されたと見なします。それ以外の場合、Firehose は処理に失敗したと見なします。

KafkaRecordValue

base64 エンコード後の変換されたデータペイロード。

以下は、Lambda の結果の出力例です。

{ "recordId": "<recordId from the Lambda input>", "result": "Ok", "kafkaRecordValue": "<Base64 encoded Transformed data>" }

Lambda ブループリントの使用

これらのブループリントは、Lambda AWS 関数を作成して使用して Amazon Data Firehose データストリーム内のデータを変換する方法を示しています。

AWS Lambda コンソールで使用可能なブループリントを表示するには
  1. にサインイン AWS Management Console し、 で AWS Lambda コンソールを開きますhttps://console.aws.amazon.com/lambda/

  2. [関数の作成]、[Use a blueprint (設計図の使用)] の順に選択します。

  3. ブループリント フィールドで、 キーワードを検索firehoseして Amazon Data Firehose Lambda ブループリントを検索します。

ブループリントのリスト:

  • Amazon Data Firehose ストリームに送信されたレコードを処理する (Node.js、Python)

    この設計図は、 AWS Lambda を使用して Firehose データストリーム内のデータを処理する方法の基本的な例を示しています。

    最終リリース日: 2016 年 11 月

    リリースノート: なし

  • Firehose に送信されるプロセス CloudWatch ログ

    このブループリントは廃止されました。Firehose に送信された CloudWatch ログの処理については、「ログを使用した Firehose への書き込み CloudWatch 」を参照してください。

  • Amazon Data Firehose ストリームレコードを syslog 形式で JSON (Node.js) に変換する

    この設計図は、Syslog RFC3164 形式の入力レコードを に変換する方法を示していますJSON。

    最終リリース日: 2016 年 11 月

    リリースノート: なし

で使用できるブループリントを表示するには AWS Serverless Application Repository
  1. AWS Serverless Application Repository に移動します。

  2. [すべてのアプリケーションを参照] を選択します。

  3. [アプリケーション] フィールドで、キーワード firehose を検索します。

設計図を使用せずに Lambda 関数を作成することもできます。AWS 「Lambda の開始方法」を参照してください

データ変換の失敗を処理する

ネットワークタイムアウトまたは Lambda 呼び出し制限に達したために Lambda 関数の呼び出しが失敗した場合、Amazon Data Firehose はデフォルトで呼び出しを 3 回再試行します。呼び出しが成功しない場合、Amazon Data Firehose はそのレコードのバッチをスキップします。スキップされたレコードは処理失敗として扱われます。CreateDeliveryStream または を使用して、再試行オプションを指定または上書きできますUpdateDestinationAPI。このタイプの障害では、呼び出しエラーを Amazon CloudWatch Logs にログ記録できます。詳細については、「 CloudWatch ログを使用した Amazon Data Firehose のモニタリング」を参照してください。

レコードのデータ変換のステータスが の場合ProcessingFailed、Amazon Data Firehose はレコードを処理に失敗したものとして扱います。このタイプの障害については、Lambda 関数から Amazon CloudWatch Logs にエラーログを発行できます。詳細については、「 デベロッパーガイド」の「 の Amazon CloudWatch Logs へのアクセス AWS Lambda」を参照してください。 AWS Lambda

データ変換が失敗した場合、処理に失敗したレコードは S3 バケットの processing-failed フォルダに配信されます。レコードの形式は以下のとおりです。

{ "attemptsMade": "count", "arrivalTimestamp": "timestamp", "errorCode": "code", "errorMessage": "message", "attemptEndingTimestamp": "timestamp", "rawData": "data", "lambdaArn": "arn" }
attemptsMade

呼び出しリクエストの試行回数。

arrivalTimestamp

Amazon Data Firehose がレコードを受信した時刻。

errorCode

Lambda によって返されるHTTPエラーコード。

errorMessage

Lambda から返されたエラーメッセージ。

attemptEndingTimestamp

Amazon Data Firehose が Lambda 呼び出しの試行を停止した時刻。

rawData

base64 エンコード後のレコードデータ。

lambdaArn

Lambda 関数の Amazon リソースネーム (ARN)。

Lambda 呼び出し期間を理解する

Amazon Data Firehose は、最大 5 分の Lambda 呼び出し時間をサポートします。Lambda 関数の完了に 5 分以上かかる場合、次のエラーが表示されます。Firehose は AWS Lambda を呼び出すときにタイムアウトエラーを検出しました。サポートされている関数のタイムアウトは最大 5 分です。

このようなエラーが発生した場合の Amazon Data Firehose の動作については、「」を参照してくださいデータ変換の失敗を処理する

ソースレコードのバックアップ

Amazon Data Firehose は、変換されたレコードを送信先に配信しながら、変換されていないすべてのレコードを S3 バケットに同時にバックアップできます。Firehose ストリームを作成または更新するときに、ソースレコードのバックアップを有効にできます。ソースレコードのバックアップは、有効にした後で無効にすることはできません。