Amazon Kinesis Data Firehose
開発者ガイド

Amazon Kinesis Data Firehose のデータ変換

Kinesis Data Firehose では、Lambda 関数を呼び出して、受信した送信元データを変換してから送信先に配信できます。Kinesis Data Firehose のデータ変換は、配信ストリームの作成時に有効にすることができます。

データ変換フロー

Kinesis Data Firehose データ変換を有効にすると、Kinesis Data Firehose はデフォルトで最大 3 MB まで受信データをバッファします (バッファサイズを調整するには、ProcessingConfiguration API を BufferSizeInMBs と呼ばれる ProcessorParameter と共に使用します)。次に、Kinesis Data Firehose は、AWS Lambda 同期呼び出しモードを使用して、バッファされた各バッチで、指定された Lambda 関数を非同期的に呼び出します。変換されたデータは、Lambda から Kinesis Data Firehose に送信されます。その後、変換されたデータは、指定された送信先のバッファサイズとバッファ間隔のいずれかに到達したときに、Kinesis Data Firehose より送信先に送信されます。到達順序は関係ありません。

重要

Lambda 同期呼び出しモードには、リクエストとレスポンスの両方について、ペイロードサイズに 6 MB の制限があります。関数にリクエストを送信するためのバッファサイズが 6 MB 以下であることを確認してください。また、関数より返るレスポンスが 6 MB を超えないことを確認します。

データ変換とステータスモデル

Lambda からのすべての変換されたレコードには、以下のパラメータが含まれる必要があります。含まれない場合、Kinesis Data Firehose はそれらのレコードを拒否し、データ変換の失敗として処理します。

recordId

レコード ID は、呼び出し時に Kinesis Data Firehose から Lambda に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。

result

レコードのデータ変換のステータス。指定できる値は次のとおりです: Ok (レコードが正常に変換された)、Dropped (レコードが処理ロジックによって意図的に削除された)、ProcessingFailed (レコードを変換できなかった)。レコードのステータスが Ok または Dropped の場合、Kinesis Data Firehose はレコードが正常に処理されたとみなします。それ以外の場合、Kinesis Data Firehose はレコードが処理に失敗したとみなします。

データ

base64 エンコード後の変換されたデータペイロード。

Lambda 設計図

Kinesis Data Firehose には、データ変換用の Lambda 関数を作成するために使用できる以下の Lambda 設計図が用意されています。

  • General Firehose Processing (全般的な Firehose 処理) — 前のセクションで説明したデータ変換とステータスモデルが含まれます。この設計図はカスタム変換ロジックに使用します。

  • Syslog to JSON (Syslog から JSON) — 定義済み JSON フィールド名を使用して、Syslog 行を解析して JSON オブジェクトに変換します。

  • Syslog to CSV (Syslog から CSV) — Syslog 行を解析して CSV 形式に変換します。

  • Kinesis Data Firehose Process Record Streams as source (Kinesis Data Firehose でレコードストリームをソースとして処理) — 入力内の Kinesis Data Streams レコードにアクセスして、処理ステータスとともにそれらを返します。

  • Kinesis Data Firehose CloudWatch Logs Processor (Kinesis Data Firehose CloudWatch ログプロセッサ) — CloudWatch Logsサブスクリプションフィルタによって送信されたレコードから、個々のログイベントを解析して抽出します。

Lambda 設計図は Node.js および Python 言語でのみ利用できます。その他サポートされている言語で独自の関数を実装できます。AWS Lambda でサポートされる言語の詳細については、「概要: Lambda 関数のビルド」を参照してください。

Kinesis Data Firehose の Lambda 設計図をすべて確認するには (Python と Node.js の両方の例を含む)

  1. AWS マネジメントコンソール にサインインし、https://console.aws.amazon.com/lambda/ にある AWS Lambda コンソールを開きます。

  2. [関数の作成]、[設計図] の順に選択します。

  3. Kinesis Data Firehose の Lambda 設計図を特定するには、「firehose」というキーワードを検索します。

データ変換失敗の処理

ネットワークタイムアウトのために、または Lambda 呼び出しの制限に達したために、Lambda 関数呼び出しが失敗した場合、Kinesis Data Firehose は呼び出しをデフォルトで 3 回再試行します。呼び出しが成功しなければ、Kinesis Data Firehose はそのレコードのバッチをスキップします。スキップされたレコードは処理失敗として扱われます。CreateDeliveryStream または UpdateDestination API を使用して、再試行オプションを指定または上書きできます。このタイプの失敗の場合、呼び出しエラーログを Amazon CloudWatch Logs に出力できます。詳細については、「CloudWatch Logs を使用した Kinesis Data Firehose のモニタリング」を参照してください。

レコードのデータ変換のステータスが ProcessingFailed の場合、Kinesis Data Firehose はそのレコードを処理失敗として扱います。このタイプの失敗の場合、エラーログを Lambda 関数から Amazon CloudWatch Logs に出力できます。詳細については、AWS Lambda Developer Guideの「AWS Lambda の Amazon CloudWatch Logs へのアクセス」を参照してください。

データ変換が失敗した場合、処理に失敗したレコードは S3 バケットの processing-failed フォルダに配信されます。レコードの形式は以下のとおりです。

{ "attemptsMade": "count", "arrivalTimestamp": "timestamp", "errorCode": "code", "errorMessage": "message", "attemptEndingTimestamp": "timestamp", "rawData": "data", "lambdaArn": "arn" }
attemptsMade

呼び出しリクエストの試行回数。

arrivalTimestamp

Kinesis Data Firehose がレコードを受信した時間。

errorCode

Lambda から返された HTTP エラーコード。

errorMessage

Lambda から返されたエラーメッセージ。

attemptEndingTimestamp

Kinesis Data Firehose が Lambda 呼び出しの試行を停止した時間。

rawData

base64 エンコード後のレコードデータ。

lambdaArn

Lambda 関数の Amazon リソースネーム (ARN)。

Lambda の呼び出し時間

Kinesis Data Firehose では、最大 5 分の Lambda 呼び出し時間がサポートされます。Lambda 関数の完了に 5 分を超える時間がかかる場合は、次のエラーが表示されます: Firehose encountered timeout errors when calling AWS Lambda (Firehose で、AWS Lambda を呼び出すときにタイムアウトエラーが発生しました)。サポートされている最大の関数タイムアウトは 5 分です。

このようなエラーが発生した場合の Kinesis Data Firehose による処理の詳細については、「データ変換失敗の処理」を参照してください。

ソースレコードのバックアップ

Kinesis Data Firehose は、変換されたレコードを送信先に配信すると同時に、変換されなかったすべてのレコードを S3 バケットにバックアップできます。ソースレコードのバックアップは、配信ストリームの作成または更新時に有効にすることができます。ソースレコードのバックアップは、有効にした後で無効にすることはできません。