Kinesis Data Firehose での入力レコード形式の変換 - Amazon Kinesis Data Firehose

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis Data Firehose での入力レコード形式の変換

Amazon Kinesis Data Firehose は、入力データの形式を JSON からApache ParquetまたはApache ORCデータを Amazon S3 に保存する前に、「」を参照してください。Parquet と ORC は、容量を節約し、JSON のような行指向の形式に比べ、より高速なクエリを可能にするカラム型のデータ形式です。カンマ区切り値 (CSV) や構造化テキストなど、JSON 以外の入力形式を変換する場合は、まず AWS Lambda を使用して JSON に変換できます。詳細については、「Amazon Kinesis Data Firehose データ変換の失敗」を参照してください。

レコード形式の変換の要件

Kinesis Data Firehose では、レコードデータの形式を変換するために、次の 3 つの要素が必要です。

  • 入力データのJSONを読み取るデシリアライザ— デシリアライザーの 2 種類のデシリアライザーのうち、いずれかを選択できます。Apache Hive JSON SerDeまたはOpenX JSON SerDe

    注記

    複数の JSON ドキュメントを同じレコードに結合する場合は、サポートされている JSON 形式で入力が引き続き表示されることを確認してください。JSON ドキュメントの配列は有効な入力ではありません。

    たとえば、これは正しい入力です。{"a":1}{"a":2}

    これは誤った入力です:[{"a":1}, {"a":2}]

  • データの解釈方法を決定するスキーマです— を使用するAWS接着語にスキーマを作成するAWS Glue Data Catalog。Kinesis Data Firehose はそのスキーマを参照し、使用して入力データを解釈します。同じスキーマを使用して、Kinesis Data Firehose ソフトウェアと分析ソフトウェアの両方を構成できます。詳細については、「」を参照してください。の入力AWSGlue データカタログAWS Glueデベロッパーガイド

  • データをターゲットのカラム型ストレージ形式 (Parquet または ORC) に変換するシリアライザー— シリアライザーの 2 種類のシリアライザーのうち、いずれかを選択できます。ORC SerDeまたはParquet SerDe

重要

レコード形式の変換を有効にすると、Kinesis Data Firehose の送信先を Amazon OpenSearch サービス (OpenSearch サービス)、Amazon Redshift、Splunk に設定することはできません。形式変換を有効にすると、Amazon S3 が唯一 Kinesis Data Firehose 配信ストリームに使用できる送信先になります。

Kinesis Data Firehose に送信する前にレコードを集約しても、データの形式を変換できます。

JSON デシリアライザーの選択

入力 JSON に次の形式のタイムスタンプが含まれている場合は、OpenX JSON SerDe を選択します。

  • yyy-MM-dd'T'HH: mm: ss [.S] 'Z'。小数は最大 9 桁まで使用できます。例:。2017-02-07T15:13:01.39256Z

  • yyyyy-[M] M-[d] d HH: mm: ss [.S]。小数は最大 9 桁まで使用できます。例:。2017-02-07 15:13:01.14

  • エポック秒 — たとえば、です。1518033528

  • エポックミリ秒 — たとえば、です。1518033528123

  • 浮動小数点エポック秒 — たとえば、です。1518033528.123

OpenX JSON SerDe はピリオド (.) をアンダースコア (_) に変換できます。デシリアライズする前に、JSON キーを小文字に変換することもできます。Kinesis Data Firehose を介したこのデシリアライザーで利用可能になるオプションの詳細については、OpenXJsonSerDe

どのデシリアライザーを選択するかわからない場合は、サポートされていないタイムスタンプがない限り、OpenX JSON SerDe を使用します。

前述の形式以外のタイムスタンプがある場合は、Apache Hive JSON SerDe を使用します。このデシリアライザーを選択すると、使用するタイムスタンプ形式を指定できます。指定するには、Joda-Time DateTimeFormat 形式の文字列のパターン構文に従います。詳細については、「Class DateTimeFormat」を参照してください。

特殊な値 millis を使用して、エポックミリ秒でタイムスタンプを解析することもできます。形式を指定していない場合は、Kinesis Data Firehose はデフォルトで java.sql.Timestamp::valueOf を使用します。

Hive JSON SerDe は以下を許可しません。

  • 列名のピリオド (.)。

  • タイプが uniontype のフィールド。

  • スキーマに数値型を持つフィールドですが、JSON 形式の文字列です。たとえば、スキーマが (int) で JSON が {"a":"123"} の場合、Hive SerDe ではエラーが発生します。

Hive SerDe はネストされた JSON を文字列に変換しません。たとえば、{"a":{"inner":1}} がある場合、{"inner":1} は文字列として扱われません。

シリアライザーの選択

選択するシリアライザーは、ビジネスニーズに応じて異なります。シリアライザーの 2 つのオプションの詳細については、「ORC SerDe」および「Parquet SerDe」を参照してください。

入力レコードの形式の変換 (コンソール)

Kinesis 配信ストリームを作成または更新するときに、コンソールでデータ形式の変換を有効にできます。データ形式の変換を有効にすると、Amazon S3 が配信ストリームに設定できる唯一の送信先になります。また、形式変換を有効にすると Amazon S3 圧縮が無効化されます。ただし、変換プロセスの一部として Snappy 圧縮が自動的に実行されます。この場合に Kinesis Data Firehose が使用する Snappy のフレーミング形式は Hadoop と互換性があります。つまり、Snappy 圧縮の結果を使用して、Athena でこのデータに対するクエリを実行できます。Hadoop が依存する Snappy のフレーミング形式については、「BlockCompressorStream.java」を参照してください。

データ配信ストリームのデータ形式の変換を有効にするには

  1. にサインインしますAWS Management Consoleにサインインし、Kinesis Data Firehose コンソール (https://console.aws.amazon.com/firehose/

  2. 更新する Kinesis Data Firehose 配信ストリームを選択するか、Amazon Kinesis Data Firehose 配信ストリームの作成

  3. [Convert record format (レコード形式を変換)] で、[Record format conversion (レコード形式の変換)] を [Enabled (有効)] に設定します。

  4. 目的の出力形式を選択します。2 つのオプションの詳細については、Apache Parquet および Apache ORC を参照してください。

  5. AWS Glue テーブルを選択してソースレコードのスキーマを指定します。リージョン、データベース、テーブル、テーブルバージョンを設定します。

入力レコードの形式の変換 (API)

Kinesis Data Firehose で入力データの形式を JSON から Parquet または ORC に変換する場合、オプションのDataFormatConversionConfigurationの要素ExtendedS3DestinationConfigurationまたはExtends3 デスティネーション更新DataFormatConversionConfiguration を指定する場合は、次の制限が適用されます。

  • BufferingHints では、レコード形式の変換を有効にすると、SizeInMBs を 64 未満の値に設定できません。また、形式の変換が有効でない場合、デフォルト値は 5 です。有効にすると、この値は 128 になります。

  • 設定する必要がありますCompressionFormatExtendedS3DestinationConfigurationまたはExtends3 デスティネーション更新UNCOMPRESSEDCompressionFormat のデフォルト値は UNCOMPRESSED です。したがって、ExtendedS3DestinationConfiguration で指定しないままにすることもできます。その場合もデータは、デフォルトで Snappy 圧縮を使用して、シリアル化プロセスの一環として圧縮されます。この場合に Kinesis Data Firehose が使用する Snappy のフレーミング形式は Hadoop と互換性があります。つまり、Snappy 圧縮の結果を使用して、Athena でこのデータに対するクエリを実行できます。Hadoop が依存する Snappy のフレーミング形式については、「BlockCompressorStream.java」を参照してください。シリアライザーを構成する場合は、他のタイプの圧縮を選択できます。

レコード形式変換のエラー処理

Kinesis Data Firehose がレコードを解析またはデシリアライズできない場合 (たとえば、データがスキーマと一致しない場合)、エラー接頭辞を付けて Amazon S3 に書き込みます。この書き込みが失敗した場合、Kinesis Data Firehose はこの書き込みを永久に再試行し、追加で配信されないようにします。失敗したレコードごとに、Kinesis Data Firehose は次のスキーマを持つ JSON ドキュメントを書き込みます。

{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }

レコード形式の変換例

でレコード形式の変換を設定する方法の例については、AWS CloudFormation「」を参照してください。AWS። KinesisFirehose። DeliveryStream