Kinesis Data Firehose での入力レコード形式の変換 - Amazon Kinesis Data Firehose

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis Data Firehose での入力レコード形式の変換

Amazon Kinesis Data Firehose は、データを Amazon S3 に保存する前に、入力データ形式を JSON から Apache Parquet または Apache ORC に変換できます。Parquet と ORC は、容量を節約し、JSON のような行指向の形式に比べ、より高速なクエリを可能にするカラム型のデータ形式です。カンマ区切り値 (CSV) や構造化テキストなど、JSON 以外の入力形式を変換する場合は、まず AWS Lambda を使用して JSON に変換できます。詳細については、「Amazon Kinesis Data Firehose のデータ変換」を参照してください。

レコード形式の変換の要件

Kinesis Data Firehose でレコードデータの形式を変換するためには、次の 3 つの要素が必要です。

  • 入力データの JSON を読み取るデシリアライザ— デシリアライザーには、次の 2 種類のデシリアライザーのうち、いずれかを選択できます。Apache Hive JSON SerDeまたはOpenX JSON SerDe

    注記

    複数の JSON ドキュメントを同じレコードに結合する場合は、サポートされている JSON 形式で入力が表示されていることを確認してください。JSON ドキュメントの配列は有効な入力ではありません。

    たとえば、これは正しい入力です: {"a":1}{"a":2}

    そして、これは間違った入力です: [{"a":1}, {"a":2}]

  • データの解釈方法を決定するスキーマAWS Glue を使用して AWS Glue Data Catalog にスキーマを作成します。Kinesis Data Firehose はそのスキーマを参照し、使用して入力データを解釈します。同じスキーマを使用して、Kinesis Data Firehose ソフトウェアと分析ソフトウェアの両方を設定できます。詳細については、AWS Glue 開発者ガイドの「AWS Glue データカタログの作成」を参照してください。

  • データをターゲットのカラム型ストレージ形式 (Parquet または ORC) に変換するシリアライザー— シリアライザーには、次の 2 種類のシリアライザーのうち、いずれかを選択できます。ORC SerDeまたはParquet SerDe

重要

レコード形式の変換を有効にすると、Kinesis Data Firehose の送信先を Amazon OpenSearch Service、Amazon Redshift、または Splunk に設定することはできません。形式の変換を有効にすると、Amazon S3 が唯一 Kinesis Data Firehose 配信ストリームに使用できる送信先になります。

Kinesis Data Firehose に送信する前にレコードを集約しても、データの形式を変換できます。

JSON デシリアライザーの選択

入力 JSON に次の形式のタイムスタンプが含まれている場合は、OpenX JSON SerDe を選択します。

  • yyyy-MM-dd'T'HH:mm:ss[.S]'Z'。小数は最大 9 桁まで使用できます – 例: 2017-02-07T15:13:01.39256Z

  • yyyy-[M]M-[d]d HH:mm:ss[.S]。小数は最大 9 桁まで使用できます – 例: 2017-02-07 15:13:01.14

  • エポック秒 – たとえば、1518033528 です。

  • エポックミリ秒 – たとえば、1518033528123 です。

  • 浮動小数点エポック秒 – たとえば、1518033528.123 です。

OpenX JSON SerDe はピリオド (.) をアンダースコア (_) に変換できます。デシリアライズする前に、JSON キーを小文字に変換することもできます。Kinesis Data Firehose を介したこのデシリアライザーで利用可能になるオプションの詳細については、「OpenXJsonSerDe」を参照してください。

どのデシリアライザーを選択するかわからない場合は、サポートされていないタイムスタンプがない限り、OpenX JSON SerDe を使用します。

前述の形式以外のタイムスタンプがある場合は、Apache Hive JSON SerDe を使用します。このデシリアライザーを選択すると、使用するタイムスタンプ形式を指定できます。指定するには、Joda-Time DateTimeFormat 形式の文字列のパターン構文に従います。詳細については、「Class DateTimeFormat」を参照してください。

特殊な値 millis を使用して、エポックミリ秒でタイムスタンプを解析することもできます。形式を指定していない場合は、Kinesis Data Firehose はデフォルトで java.sql.Timestamp::valueOf を使用します。

Hive JSON SerDe は以下を許可しません。

  • 列名のピリオド (.)。

  • タイプが uniontype のフィールド。

  • スキーマに数値型を持つフィールドですが、JSON 形式の文字列です。たとえば、スキーマが (int) で JSON が {"a":"123"} の場合、Hive SerDe ではエラーが発生します。

Hive SerDe はネストされた JSON を文字列に変換しません。たとえば、{"a":{"inner":1}} がある場合、{"inner":1} は文字列として扱われません。

シリアライザーの選択

選択するシリアライザーは、ビジネスニーズに応じて異なります。シリアライザーの 2 つのオプションの詳細については、「ORC SerDe」および「Parquet SerDe」を参照してください。

入力レコードの形式の変換 (コンソール)

Kinesis 配信ストリームを作成または更新するときに、コンソールでデータ形式の変換を有効にできます。データ形式の変換を有効にすると、Amazon S3 が配信ストリームに設定できる唯一の送信先になります。また、形式変換を有効にすると Amazon S3 圧縮が無効化されます。ただし、変換プロセスの一部として Snappy 圧縮が自動的に実行されます。この場合に Kinesis Data Firehose が使用する Snappy のフレーミング形式は Hadoop と互換性があります。つまり、Snappy 圧縮の結果を使用して、Athena でこのデータに対するクエリを実行できます。Hadoop が依存する Snappy のフレーミング形式については、「BlockCompressorStream.java」を参照してください。

データ配信ストリームのデータ形式の変換を有効にするには
  1. AWS Management Console にサインインし、Kinesis Data Firehose コンソール (https://console.aws.amazon.com/firehose/) を開きます。

  2. 更新する Kinesis Data Firehose 配信ストリームを選択するか、「Amazon Kinesis Data Firehose 配信ストリームの作成」の手順に従って新しい配信ストリームを作成します。

  3. [Convert record format (レコード形式を変換)] で、[Record format conversion (レコード形式の変換)] を [Enabled (有効)] に設定します。

  4. 目的の出力形式を選択します。2 つのオプションの詳細については、Apache Parquet および Apache ORC を参照してください。

  5. AWS Glue テーブルを選択してソースレコードのスキーマを指定します。リージョン、データベース、テーブル、テーブルバージョンを設定します。

入力レコードの形式の変換 (API)

Kinesis Data Firehose で入力データの形式を JSON から Parquet または ORC に変換する場合、ExtendedS3DestinationConfiguration または ExtendedS3DestinationUpdate で、オプションの DataFormatConversionConfiguration 要素を指定します。DataFormatConversionConfiguration を指定する場合は、次の制限が適用されます。

  • BufferingHints では、レコード形式の変換を有効にすると、SizeInMBs を 64 未満の値に設定できません。また、形式の変換が有効でない場合、デフォルト値は 5 です。有効にすると、この値は 128 になります。

  • ExtendedS3DestinationConfiguration または ExtendedS3DestinationUpdateCompressionFormatUNCOMPRESSED に設定する必要があります。CompressionFormat のデフォルト値は UNCOMPRESSED です。したがって、ExtendedS3DestinationConfiguration で指定しないままにすることもできます。その場合もデータは、デフォルトで Snappy 圧縮を使用して、シリアル化プロセスの一環として圧縮されます。この場合に Kinesis Data Firehose が使用する Snappy のフレーミング形式は Hadoop と互換性があります。つまり、Snappy 圧縮の結果を使用して、Athena でこのデータに対するクエリを実行できます。Hadoop が依存する Snappy のフレーミング形式については、「BlockCompressorStream.java」を参照してください。シリアライザーを構成する場合は、他のタイプの圧縮を選択できます。

レコード形式変換のエラー処理

Kinesis Data Firehose がレコードを解析またはデシリアライズできない場合 (たとえば、データがスキーマと一致しない場合)、エラープレフィックスを付けて Amazon S3 に書き込みます。この書き込みが失敗した場合、Kinesis Data Firehose はこの書き込みを永久に再試行し、追加で配信されないようにします。失敗したレコードごとに、Kinesis Data Firehose は次のスキーマを持つ JSON ドキュメントを書き込みます。

{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }

レコード形式の変換例

AWS CloudFormation を使用してレコード形式の変換をセットアップする方法の例については、「AWS::KinesisFirehose::DeliveryStream」を参照してください。