翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Kinesis Data Firehose での入力レコード形式の変換
Amazon Kinesis Data Firehose は、データを Amazon S3 に保存する前に、入力データ形式を JSON から Apache Parquet
トピック
レコード形式の変換の要件
Kinesis Data Firehose でレコードデータの形式を変換するためには、次の 3 つの要素が必要です。
-
入力データの JSON を読み取るデシリアライザ— デシリアライザーには、次の 2 種類のデシリアライザーのうち、いずれかを選択できます。Apache Hive JSON SerDe
またはOpenX JSON SerDe 。 注記
複数の JSON ドキュメントを同じレコードに結合する場合は、サポートされている JSON 形式で入力が表示されていることを確認してください。JSON ドキュメントの配列は有効な入力ではありません。
たとえば、これは正しい入力です:
{"a":1}{"a":2}
そして、これは間違った入力です:
[{"a":1}, {"a":2}]
-
データの解釈方法を決定するスキーマ – AWS Glue を使用して AWS Glue Data Catalog にスキーマを作成します。Kinesis Data Firehose はそのスキーマを参照し、使用して入力データを解釈します。同じスキーマを使用して、Kinesis Data Firehose ソフトウェアと分析ソフトウェアの両方を設定できます。詳細については、AWS Glue 開発者ガイドの「AWS Glue データカタログの作成」を参照してください。
-
データをターゲットのカラム型ストレージ形式 (Parquet または ORC) に変換するシリアライザー— シリアライザーには、次の 2 種類のシリアライザーのうち、いずれかを選択できます。ORC SerDe
またはParquet SerDe 。
重要
レコード形式の変換を有効にすると、Kinesis Data Firehose の送信先を Amazon OpenSearch Service、Amazon Redshift、または Splunk に設定することはできません。形式の変換を有効にすると、Amazon S3 が唯一 Kinesis Data Firehose 配信ストリームに使用できる送信先になります。
Kinesis Data Firehose に送信する前にレコードを集約しても、データの形式を変換できます。
JSON デシリアライザーの選択
入力 JSON に次の形式のタイムスタンプが含まれている場合は、OpenX JSON SerDe
-
yyyy-MM-dd'T'HH:mm:ss[.S]'Z'。小数は最大 9 桁まで使用できます – 例:
2017-02-07T15:13:01.39256Z
。 -
yyyy-[M]M-[d]d HH:mm:ss[.S]。小数は最大 9 桁まで使用できます – 例:
2017-02-07 15:13:01.14
。 -
エポック秒 – たとえば、
1518033528
です。 -
エポックミリ秒 – たとえば、
1518033528123
です。 -
浮動小数点エポック秒 – たとえば、
1518033528.123
です。
OpenX JSON SerDe はピリオド (.
) をアンダースコア (_
) に変換できます。デシリアライズする前に、JSON キーを小文字に変換することもできます。Kinesis Data Firehose を介したこのデシリアライザーで利用可能になるオプションの詳細については、「OpenXJsonSerDe」を参照してください。
どのデシリアライザーを選択するかわからない場合は、サポートされていないタイムスタンプがない限り、OpenX JSON SerDe を使用します。
前述の形式以外のタイムスタンプがある場合は、Apache Hive JSON SerDeDateTimeFormat
形式の文字列のパターン構文に従います。詳細については、「Class DateTimeFormat
特殊な値 millis
を使用して、エポックミリ秒でタイムスタンプを解析することもできます。形式を指定していない場合は、Kinesis Data Firehose はデフォルトで java.sql.Timestamp::valueOf
を使用します。
Hive JSON SerDe は以下を許可しません。
-
列名のピリオド (
.
)。 -
タイプが
uniontype
のフィールド。 -
スキーマに数値型を持つフィールドですが、JSON 形式の文字列です。たとえば、スキーマが (int) で JSON が
{"a":"123"}
の場合、Hive SerDe ではエラーが発生します。
Hive SerDe はネストされた JSON を文字列に変換しません。たとえば、{"a":{"inner":1}}
がある場合、{"inner":1}
は文字列として扱われません。
シリアライザーの選択
選択するシリアライザーは、ビジネスニーズに応じて異なります。シリアライザーの 2 つのオプションの詳細については、「ORC SerDe
入力レコードの形式の変換 (コンソール)
Kinesis 配信ストリームを作成または更新するときに、コンソールでデータ形式の変換を有効にできます。データ形式の変換を有効にすると、Amazon S3 が配信ストリームに設定できる唯一の送信先になります。また、形式変換を有効にすると Amazon S3 圧縮が無効化されます。ただし、変換プロセスの一部として Snappy 圧縮が自動的に実行されます。この場合に Kinesis Data Firehose が使用する Snappy のフレーミング形式は Hadoop と互換性があります。つまり、Snappy 圧縮の結果を使用して、Athena でこのデータに対するクエリを実行できます。Hadoop が依存する Snappy のフレーミング形式については、「BlockCompressorStream.java
データ配信ストリームのデータ形式の変換を有効にするには
-
AWS Management Console にサインインし、Kinesis Data Firehose コンソール (https://console.aws.amazon.com/firehose/
) を開きます。 -
更新する Kinesis Data Firehose 配信ストリームを選択するか、「Amazon Kinesis Data Firehose 配信ストリームの作成」の手順に従って新しい配信ストリームを作成します。
-
[Convert record format (レコード形式を変換)] で、[Record format conversion (レコード形式の変換)] を [Enabled (有効)] に設定します。
-
目的の出力形式を選択します。2 つのオプションの詳細については、Apache Parquet
および Apache ORC を参照してください。 -
AWS Glue テーブルを選択してソースレコードのスキーマを指定します。リージョン、データベース、テーブル、テーブルバージョンを設定します。
入力レコードの形式の変換 (API)
Kinesis Data Firehose で入力データの形式を JSON から Parquet または ORC に変換する場合、ExtendedS3DestinationConfiguration または ExtendedS3DestinationUpdate で、オプションの DataFormatConversionConfiguration 要素を指定します。DataFormatConversionConfiguration を指定する場合は、次の制限が適用されます。
-
BufferingHints では、レコード形式の変換を有効にすると、
SizeInMBs
を 64 未満の値に設定できません。また、形式の変換が有効でない場合、デフォルト値は 5 です。有効にすると、この値は 128 になります。 -
ExtendedS3DestinationConfiguration または ExtendedS3DestinationUpdate の
CompressionFormat
をUNCOMPRESSED
に設定する必要があります。CompressionFormat
のデフォルト値はUNCOMPRESSED
です。したがって、ExtendedS3DestinationConfiguration で指定しないままにすることもできます。その場合もデータは、デフォルトで Snappy 圧縮を使用して、シリアル化プロセスの一環として圧縮されます。この場合に Kinesis Data Firehose が使用する Snappy のフレーミング形式は Hadoop と互換性があります。つまり、Snappy 圧縮の結果を使用して、Athena でこのデータに対するクエリを実行できます。Hadoop が依存する Snappy のフレーミング形式については、「BlockCompressorStream.java」を参照してください。シリアライザーを構成する場合は、他のタイプの圧縮を選択できます。
レコード形式変換のエラー処理
Kinesis Data Firehose がレコードを解析またはデシリアライズできない場合 (たとえば、データがスキーマと一致しない場合)、エラープレフィックスを付けて Amazon S3 に書き込みます。この書き込みが失敗した場合、Kinesis Data Firehose はこの書き込みを永久に再試行し、追加で配信されないようにします。失敗したレコードごとに、Kinesis Data Firehose は次のスキーマを持つ JSON ドキュメントを書き込みます。
{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }
レコード形式の変換例
AWS CloudFormation を使用してレコード形式の変換をセットアップする方法の例については、「AWS::KinesisFirehose::DeliveryStream」を参照してください。