Firehose での入力レコード形式の変換 - Amazon Data Firehose

Amazon Data Firehose は、以前は Amazon Kinesis Data Firehose と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Firehose での入力レコード形式の変換

Amazon Data Firehose は、データを Amazon S3 に保存する前に、入力データの形式を JSON から Apache Parquet または Apache ORC に変換できます。Parquet と ORC は、容量を節約し、JSON のような行指向の形式に比べ、より高速なクエリを可能にするカラム型のデータ形式です。カンマ区切り値 (CSV) や構造化テキストなど、JSON 以外の入力形式を変換する場合は、まず AWS Lambda を使用して JSON に変換できます。詳細については、「Amazon Data Firehose データ変換」を参照してください。

レコード形式の変換の要件

Amazon Data Firehose では、レコードデータの形式を変換するために次の 3 つの要素が必要です。

  • 入力データの JSON を読み取るデシリアライザーApache Hive JSON SerDeまたは OpenX SerDeJSON の 2 種類のデシリアライザーのいずれかを選択できます。

    注記

    複数の JSON ドキュメントを同じレコードに結合する場合は、サポートされている JSON 形式で入力が表示されていることを確認してください。JSON ドキュメントの配列は有効な入力ではありません。

    たとえば、これは正しい入力です: {"a":1}{"a":2}

    そして、これは間違った入力です: [{"a":1}, {"a":2}]

  • データの解釈方法を決定するスキーマAWS Glue を使用して AWS Glue Data Catalog にスキーマを作成します。次に、Amazon Data Firehose はそのスキーマを参照し、それを使用して入力データを解釈します。同じスキーマを使用して、Amazon Data Firehose と分析ソフトウェアの両方を設定できます。詳細については、AWS Glue 開発者ガイドの「AWS Glue データカタログの作成」を参照してください。

    注記

    AWS Glue Data Catalog で作成されたスキーマは、入力データ構造と一致している必要があります。一致していないと、変換されたデータに、スキーマで指定されていない属性が含まれなくなります。ネストされた JSON を使用する場合は、STRUCT タイプを JSON データの構造を反映したスキーマで使用します。STRUCT タイプを使ってネストされた JSON を処理する方法については、こちらの例を参照してください。

  • データをターゲット列指向ストレージ形式 (Parquet または ORC) に変換するシリアライザーORC SerDeまたは Parquet SerDeの 2 種類のシリアライザーから 1 つを選択できます。

重要

レコード形式の変換を有効にすると、Amazon Data Firehose の送信先を Amazon OpenSearch Service、Amazon Redshift、Splunk に設定することはできません。形式変換を有効にすると、Firehose ストリームに使用できる送信先は Amazon S3 だけです。

レコードを Amazon Data Firehose に送信する前にレコードを集計した場合でも、データの形式を変換できます。

JSON デシリアライザーの選択

入力 JSON に次の形式のタイムスタンプが含まれている場合は、OpenX SerDe JSON を選択します。

  • yyyy-MM-dd'T'HH:mm:ss[.S]'Z'。小数は最大 9 桁まで使用できます – 例: 2017-02-07T15:13:01.39256Z

  • yyyy-[M]M-[d]d HH:mm:ss[.S]。小数は最大 9 桁まで使用できます – 例: 2017-02-07 15:13:01.14

  • エポック秒 – たとえば、1518033528 です。

  • エポックミリ秒 – たとえば、1518033528123 です。

  • 浮動小数点エポック秒 – たとえば、1518033528.123 です。

OpenX JSON SerDe はピリオド (.) をアンダースコア () に変換できます_。デシリアライズする前に、JSON キーを小文字に変換することもできます。Amazon Data Firehose を介したこのデシリアライザーで使用できるオプションの詳細については、「OpenXJsonSerDe」を参照してください。

どのデシリアライザーを選択するべきかわからない場合は、サポートされていないタイムスタンプがない限り SerDe、OpenX JSON を使用してください。

前述の形式以外の形式のタイムスタンプがある場合は、Apache Hive JSON SerDeを使用します。このデシリアライザーを選択すると、使用するタイムスタンプ形式を指定できます。指定するには、Joda-Time DateTimeFormat 形式の文字列のパターン構文に従います。詳細については、「クラス DateTimeFormat」を参照してください。

特殊な値 millis を使用して、エポックミリ秒でタイムスタンプを解析することもできます。形式を指定しない場合、Amazon Data Firehose はjava.sql.Timestamp::valueOfデフォルトで を使用します。

Hive JSON SerDe では、次のことはできません。

  • 列名のピリオド (.)。

  • タイプが uniontype のフィールド。

  • スキーマに数値型を持つフィールドですが、JSON 形式の文字列です。例えば、スキーマが (int) で、JSON が の場合{"a":"123"}、Hive はエラー SerDe を表示します。

Hive SerDe は、ネストされた JSON を文字列に変換しません。たとえば、{"a":{"inner":1}} がある場合、{"inner":1} は文字列として扱われません。

シリアライザーの選択

選択するシリアライザーは、ビジネスニーズに応じて異なります。2 つのシリアライザオプションの詳細については、「ORC SerDe」および「Parquet SerDe」を参照してください。

入力レコードの形式の変換 (コンソール)

Firehose ストリームを作成または更新するときに、コンソールでデータ形式変換を有効にできます。データ形式変換を有効にすると、Firehose ストリームに設定できる唯一の送信先は Amazon S3 です。また、形式変換を有効にすると Amazon S3 圧縮が無効化されます。ただし、変換プロセスの一部として Snappy 圧縮が自動的に実行されます。この場合、Amazon Data Firehose が使用する Snappy のフレーミング形式は Hadoop と互換性があります。つまり、Snappy 圧縮の結果を使用して、Athena でこのデータに対するクエリを実行できます。Hadoop が依存する Snappy フレーミング形式については、「.BlockCompressorStreamjava」を参照してください。

データ Firehose ストリームのデータ形式変換を有効にするには
  1. にサインインしAWS Management Console、https://console.aws.amazon.com/firehose/ で Amazon Data Firehose コンソールを開きます。

  2. 更新する Firehose ストリームを選択するか、「」の手順に従って新しい Firehose ストリームを作成しますFirehose ストリームの作成

  3. [Convert record format (レコード形式を変換)] で、[Record format conversion (レコード形式の変換)] を [Enabled (有効)] に設定します。

  4. 目的の出力形式を選択します。2 つのオプションの詳細については、Apache Parquet および Apache ORC を参照してください。

  5. AWS Glue テーブルを選択してソースレコードのスキーマを指定します。リージョン、データベース、テーブル、テーブルバージョンを設定します。

入力レコードの形式の変換 (API)

Amazon Data Firehose で入力データの形式を JSON から Parquet または ORC に変換する場合は、ExtendedS3DestinationConfiguration または ExtendedS3 でオプションの ExtendedS3DestinationUpdateDataFormatConversionConfiguration要素を指定します。を指定するとDataFormatConversionConfiguration、次の制限が適用されます。

  • ではBufferingHints、レコード形式の変換を有効にすると、 SizeInMBsを 64 未満の値に設定することはできません。また、形式の変換が有効でない場合、デフォルト値は 5 です。有効にすると、この値は 128 になります。

  • CompressionFormat ExtendedS3DestinationConfiguration または ExtendedS3DestinationUpdate の を に設定する必要がありますUNCOMPRESSEDCompressionFormat のデフォルト値は UNCOMPRESSED です。したがって、ExtendedS3DestinationConfiguration では指定しないままにすることもできます。その場合もデータは、デフォルトで Snappy 圧縮を使用して、シリアル化プロセスの一環として圧縮されます。この場合、Amazon Data Firehose が使用する Snappy のフレーミング形式は Hadoop と互換性があります。つまり、Snappy 圧縮の結果を使用して、Athena でこのデータに対するクエリを実行できます。Hadoop が依存する Snappy フレーミング形式については、「BlockCompressorStream.java」を参照してください。シリアライザーを構成する場合は、他のタイプの圧縮を選択できます。

レコード形式変換のエラー処理

Amazon Data Firehose がレコードを解析または逆シリアル化できない場合 (例えば、データがスキーマと一致しない場合)、エラープレフィックスを使用してレコードを Amazon S3 に書き込みます。この書き込みが失敗した場合、Amazon Data Firehose は永続的に再試行し、それ以上の配信をブロックします。失敗したレコードごとに、Amazon Data Firehose は次のスキーマで JSON ドキュメントを書き込みます。

{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }

レコード形式の変換例

を使用してレコード形式の変換を設定する方法の例についてはAWS CloudFormation、AWS「:::DataFirehose::DeliveryStream」を参照してください。