メニュー
Amazon Kinesis Firehose
Kinesis Firehose

Amazon Kinesis Firehose のデータ変換

Kinesis Firehose では、Lambda 関数を呼び出して、受信した送信元データを変換してから送信先に配信できます。Kinesis Firehose のデータ変換は、配信ストリームの作成時に有効にすることができます。

データ変換フロー

Kinesis Firehose のデータ変換を有効にすると、Kinesis Firehose は受信ストリームを、3 MB か、配信ストリームで指定されたバッファサイズの、いずれか小さい方のサイズまで、バッファします。その後、Kinesis Firehose は、指定された Lambda 関数を非同期に呼び出して、バッファした各バッチを渡します。変換されたデータは、Lambda から Kinesis Firehose に送信されてバッファされます。変換されたデータは、指定されたバッファサイズか、バッファ間隔の、いずれかに先に達したときに、送信先に配信されます。

データ変換とステータスモデル

Lambda からのすべての変換されたレコードには、以下のパラメーターが含まれる必要があります。含まれない場合、Kinesis Firehose はそれらのレコードを拒否し、データ変換失敗として扱います。

recordId

レコード ID は Kinesis Firehose から Lambda に呼び出し時に渡されます。変換されたレコードには、同じレコード ID が含まれる必要があります。元のレコードの ID と変換されたレコードの ID との不一致は、データ変換失敗として扱われます。

result

レコードのデータ変換のステータス。可能な値は、"Ok" (レコードが正常に変換された)、"Dropped" (レコードが処理ロジックによって意図的に削除された)、"ProcessingFailed" (レコードを変換できなかった) です。レコードのステータスが "Ok" または "Dropped" の場合、Kinesis Firehose はレコードが正常に処理されたとみなします。それ以外の場合、Kinesis Firehose はレコードが処理に失敗したとみなします。

データ

base64 エンコード後の変換されたデータペイロード。

Lambda 設計図

Kinesis Firehose には、データ変換用の Lambda 関数を作成するために使用できる以下の Lambda 設計図が用意されています。

  • General Kinesis Firehose Processing - 前のセクションで説明したデータ変換とステータスモデルが含まれます。この設計図はカスタム変換ロジックに使用します。

  • Apache Log to JSON - 定義済み JSON フィールド名を使用して、Apache ログラインを解析して JSON オブジェクトに変換します。

  • Apache Log to CSV - Apache のログ行を解析して CSV 形式に変換します。

  • Syslog to JSON - 定義済み JSON フィールド名を使用して、Syslog 行を解析して JSON オブジェクトに変換します。

  • Syslog to CSV - Syslog 行を解析して CSV 形式に変換します。

データ変換失敗の処理

ネットワークタイムアウトのために、または Lambda 呼び出しの制限に達したために、Lambda 関数呼び出しが失敗した場合、Kinesis Firehose は呼び出しをデフォルトで 3 回再試行し、それでも成功しなければ、そのレコードのバッチをスキップします。スキップされたレコードは処理失敗として扱われます。CreateDeliveryStream または UpdateDestination API を使用して、再試行オプションを指定または上書きできます。このタイプの失敗の場合、呼び出しエラーログを Amazon CloudWatch Logs に出力できます。詳細については、「Amazon CloudWatch Logs でのモニタリング」を参照してください。

レコードのデータ変換のステータスが ProcessingFailed の場合、Kinesis Firehose はそのレコードを処理失敗として扱います。このタイプの失敗の場合、エラーログを Lambda 関数から Amazon CloudWatch Logs に出力できます。詳細については、『AWS Lambda Developer Guide』の「AWS Lambda の Amazon CloudWatch Logs へのアクセス」を参照してください。

データ変換が失敗した場合、処理に失敗したレコードは S3 バケットの processing_failed フォルダに配信されます。レコードの形式は以下のとおりです。

Copy
{ "attemptsMade": "count", "arrivalTimestamp": "timestamp", "errorCode": "code", "errorMessage": "message", "attemptEndingTimestamp": "timestamp", "rawData": "data", "lambdaArn": "arn" }
attemptsMade

呼び出しリクエストの試行回数。

arrivalTimestamp

Kinesis Firehose がレコードを受信した時間。

errorCode

Lambda から返された HTTP エラーコード。

errorMessage

Lambda から返されたエラーメッセージ。

attemptEndingTimestamp

Kinesis Firehose が Lambda 呼び出しの試行を停止した時間。

rawData

base64 エンコード後のレコードデータ。

lambdaArn

Lambda 関数の Amazon リソースネーム (ARN)。

ソースレコードのバックアップ

Kinesis Firehose は、変換されたレコードを送信先に配信すると同時に、変換されなかったすべてのレコードを S3 バケットにバックアップできます。ソースレコードのバックアップは、配信ストリームの作成または更新時に有効にすることができます。ソースレコードのバックアップは、有効にした後で無効にすることはできません。