SQL 入力列へのストリーミングソース要素のマッピング - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

SQL 入力列へのストリーミングソース要素のマッピング

注記

2023 年 9 月 12 日以降、SQL 用 Kinesis Data Analytics をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。詳細については、「制限」を参照してください。

Amazon Kinesis Data Analytics で、標準 SQL を使用して JSON 形式または CSV 形式でストリーミングデータを処理および分析します。

  • ストリーミングする CSV データを処理して分析するには、入力ストリームの列に列名とデータ型を割り当てます。入力ストリームから列定義ごとに 1 つの列が順番にアプリケーションでインポートされます。

    アプリケーションの入力ストリームにすべての列を含める必要はありませんが、ソースストリームから列をスキップすることはできません。たとえば、5 つの要素を含む入力ストリームから最初の 3 つの列をインポートすることはできますが、列 1、2、4 のみインポートすることはできません。

  • ストリーミング JSON データを処理して分析するには、JSONPath 式を使用して、ストリーミングソースの JSON 要素を入力ストリームの SQL 列にマッピングします。Amazon Kinesis Data Analytics で JSONPath を使用する方法の詳細については、JSONPath の操作 を参照してください。SQL テーブルの列は、JSON 型からマッピングされたデータ型です。サポートされているデータ型のリストについては、「データ型」を参照してください。JSON データを SQL データに変換する方法については、「JSON データ型から SQL データ型へのマッピング」を参照してください。

入力ストリームの設定方法の詳細については、「アプリケーション入力の設定」を参照してください。

SQL 列への JSON データのマッピング

AWS Management Console または Kinesis Data Analytics API を使用して、JSON 要素を入力列にマッピングできます。

  • コンソールを使用して要素を列にマッピングするには、「スキーマエディタの使用」を参照してください。

  • Kinesis Data Analytics API を使用して要素を列にマッピングするには、次のセクションを参照してください。

JSON 要素をアプリケーション内の入力ストリームの列にマッピングするには、スキーマで、以下の情報が各列に必要です。

  • ソース式: 列のデータの場所を識別する JSONPath 式。

  • 列名: SQL クエリでデータの参照に使用する名前。

  • データ型: 列の SQL データ型。

API を使用する場合

ストリーミングソースから入力列に要素をマッピングするには、Kinesis Data Analytics API (CreateApplication) のアクションを使用できます。アプリケーション内ストリームを作成するには、SQL で使用するスキーマ化されたバージョンにデータを変換するスキーマを指定します。この CreateApplication アクションでは、単一のストリーミングソースから入力を受信するようにアプリケーションを設定します。JSON 要素または CSV 列を SQL 列にマッピングするには、「RecordColumnオブジェクトSourceSchema」を RecordColumns 配列内に作成します。RecordColumn オブジェクトには以下のスキーマがあります。

{ "Mapping": "String", "Name": "String", "SqlType": "String" }

RecordColumn オブジェクトのフィールドには、次の値があります。

  • Mapping: JSONPath 式は、入力ストリームレコードのデータの場所を識別します。ソースストリームの入力スキーマで、この値は CSV 形式で表示されません。

  • Name: アプリケーション内 SQL データストリームの列名。

  • SqlType: アプリケーション内 SQL データストリームのデータのデータ型。

JSON 入力スキーマの例

次の例では、JSON スキーマの InputSchema の値の形式について説明します。

"InputSchema": { "RecordColumns": [ { "SqlType": "VARCHAR(4)", "Name": "TICKER_SYMBOL", "Mapping": "$.TICKER_SYMBOL" }, { "SqlType": "VARCHAR(16)", "Name": "SECTOR", "Mapping": "$.SECTOR" }, { "SqlType": "TINYINT", "Name": "CHANGE", "Mapping": "$.CHANGE" }, { "SqlType": "DECIMAL(5,2)", "Name": "PRICE", "Mapping": "$.PRICE" } ], "RecordFormat": { "MappingParameters": { "JSONMappingParameters": { "RecordRowPath": "$" } }, "RecordFormatType": "JSON" }, "RecordEncoding": "UTF-8" }

CSV 入力スキーマの例

次の例では、カンマ区切り (CSV) 形式の InputSchema の値の形式について説明します。

"InputSchema": { "RecordColumns": [ { "SqlType": "VARCHAR(16)", "Name": "LastName" }, { "SqlType": "VARCHAR(16)", "Name": "FirstName" }, { "SqlType": "INTEGER", "Name": "CustomerId" } ], "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": ",", "RecordRowDelimiter": "\n" } }, "RecordFormatType": "CSV" }, "RecordEncoding": "UTF-8" }

JSON データ型から SQL データ型へのマッピング

JSON データ型は、アプリケーションの入力スキーマに基づき、対応する SQL データ型に変換されます。サポートされている SQL データ型の詳細については、「データ型」を参照してください。Amazon Kinesis Data Analytics では、次のルールに基づき、JSON データ型から SQL データ型に変換されています。

Null リテラル

JSON 入力ストリームの null リテラル (例: "City":null) は、送信先のデータ型に関係なく、SQL null に変換されます。

ブールリテラル

JSON 入力ストリームのブールリテラル (例: "Contacted":true) は、次のように SQL データに変換されます。

  • 数値型 (DECIMAL、INT など): true は 1、false は 0 に変換されます。

  • バイナリ (BINARY または VARBINARY):

    • true: 結果には、設定されている最下位ビットとクリアされた残りのビットが表示されます。

    • false: 結果にはクリアされたビットがすべて表示されます。

    VARBINARY へ変換すると、1 バイトのデータ値になります。

  • BOOLEAN: 対応する SQL BOOLEAN 値に変換されます。

  • 文字型 (CHAR または VARCHAR): 対応する文字列値 (true または false) に変換されます。値は、フィールドの長さに合わせて切り捨てられます。

  • 日時型 (DATE、TIME、TIMESTAMP): 変換は失敗し、強制エラーがエラーストリームに書き込まれます。

JSON 入力ストリームの数値リテラル (例: "CustomerId":67321) は、次のように SQL データに変換されます。

  • 数値 (DECIMAL、INT など): 直接変換されます。変換後の値が、対象のデータ型 (つまり、123.4 を INT に変換) のサイズまたは精度を超過した場合、変換は失敗し、強制エラーがエラーストリームに書き込まれます。

  • バイナリ (BINARY または VARBINARY): 変換は失敗し、強制エラーがエラーストリームに書き込まれます。

  • BOOLEAN:

    • 0: false に変換します。

    • 他のすべての数値: true に変換します。

  • 文字 (CHAR または VARCHAR): 数値の文字列表現に変換します。

  • 日時型 (DATE、TIME、TIMESTAMP): 変換は失敗し、強制エラーがエラーストリームに書き込まれます。

文字列

JSON 入力ストリームの文字列値 (例: "CustomerName":"John Doe") は、SQL データを次のように変換します。

  • 数値 (DECIMAL、INT など): Amazon Kinesis Data Analytics は値をターゲットデータ型に変換しようとします。値を変換できない場合、変換が失敗し、強制エラーがエラーストリームに書き込まれます。

  • バイナリ (BINARY または VARBINARY): ソース文字列が有効なバイナリリテラル (つまり、X'3F67A23A'、偶数 f) の場合、値は対象のデータ型に変換されます。それ以外の場合、変換は失敗し、強制エラーがエラーストリームに書き込まれます。

  • BOOLEAN: ソース文字列が "true" の場合、true に変換されます。この比較では、大文字小文字を区別しません。それ以外の場合は、false に変換されます。

  • 文字型 (CHAR または VARCHAR): 入力の文字列値に変換します。対象のデータ型よりも値が長い場合は切り捨てられるため、エラーストリームに書き込まれるエラーはありません。

  • 日時型 (DATE、TIME または TIMESTAMP): ソース文字列がターゲット値に変換できる形式の場合、値は変換されます。それ以外の場合、変換は失敗し、強制エラーがエラーストリームに書き込まれます。

    有効な日時型形式は以下のとおりです。

    • "1992-02-14"

    • "1992-02-14 18:35:44.0"

配列またはオブジェクト

JSON 入力ストリームの配列またはオブジェクトは、次のように SQL データに変換されます。

  • 文字型 (CHAR または VARCHAR): 配列またはオブジェクトのソーステキストに変換します。配列へのアクセス を参照してください。

  • それ以外のデータ型: 変換は失敗し、強制エラーがエラーストリームに書き込まれます。

JSON 配列の例については、「JSONPath の操作」を参照してください。

関連トピック