アプリケーション入力の設定 - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

アプリケーション入力の設定

Amazon Kinesis Data Analytics アプリケーションでは、1 つのストリーミングソースから入力を受け取ることができます。また、オプションで 1 つのリファレンスデータソースを使用できます。詳細については、「Amazon Kinesis Data Analytics for SQL Applications: 仕組み」を参照してください。このトピックのセクションでは、アプリケーション入力ソースについて説明します。

ストリーミングソースの設定

アプリケーションを作成するときに、ストリーミングソースを指定します。アプリケーションを作成した後に入力を変更することもできます。Amazon Kinesis Data Analytics では、アプリケーションに対して以下のストリーミングソースがサポートされています。

  • Kinesis データストリーム

  • Firehose 配信ストリーム

注記

2023 年 9 月 12 日以降、SQL 用 Kinesis Data Analytics をまだ使用していない場合、Kinesis Data Firehose をソースとして使用して新しいアプリケーションを作成することはできません。KinesisFirehoseInput と一緒に Kinesis Data Analytics for SQL アプリケーションを使用している既存のお客様は、Kinesis Data Analytics を使用して既存のアカウント内で、引き続き KinesisFirehoseInput でアプリケーションを追加できます。既存のお客様で、KinesisFirehoseInput と一緒に Kinesis Data Analytics for SQL アプリケーションを使用して新規アカウントを作成される場合は、サービス制限拡大フォームを通してケースを作成できます。詳細については、AWS Support センターを参照してください。新しいアプリケーションを本番環境に移行する前に、必ずテストすることをお勧めします。

注記

Kinesis データストリームが暗号化されている場合、Kinesis Data Analytics は暗号化されたストリームのデータにシームレスにアクセスし、それ以上の設定は必要ありません。Kinesis Data Analytics では、Kinesis Data Streams から読み取った暗号化されていないデータは保存されません。詳細については、「Kinesis Data Streams 用のサーバー側の暗号化とは」を参照してください。

Kinesis Data Analytics は、ストリーミングソースに新しいデータがあるか継続的にポーリングし、入力設定に応じてアプリケーション内ストリームに取り込みます。

注記

アプリケーションの入力として Kinesis ストリームを追加しても、ストリーム内のデータには影響を与えません。Firehose 配信ストリームなどの別のリソースも同じ Kinesis ストリームにアクセスした場合、Firehose 配信ストリームと Kinesis Data Analytics アプリケーションの両方が同じデータを受け取ります。ただし、スループットとスロットリングは影響を受ける可能性があります。

アプリケーションコードは、アプリケーション内ストリームをクエリできます。入力設定の一部として以下を指定します。

  • ストリーミングソース – ストリームの Amazon リソースネーム (ARN) と、Kinesis Data Analytics がユーザーに代わってストリームにアクセスするために引き受けることができる IAM ロールを指定します。

  • アプリケーション内ストリーム名のプレフィックス – アプリケーションを起動すると、Kinesis Data Analytics によって指定されたアプリケーション内ストリームが作成されます。この名前を使用して、アプリケーションコードでアプリケーション内ストリームにアクセスします。

    オプションで、ストリーミングリソースを複数のアプリケーション内ストリームにマッピングできます。詳細については、「Limits」を参照してください。この場合、Amazon Kinesis Data Analytics は、prefix_001prefix_002prefix_003 などの名前で、アプリケーション内ストリームを指定された数だけ作成します。デフォルトでは、Kinesis Data Analytics はストリーミングソースを prefix_001 という名前の 1 つのアプリケーション内ストリームにマッピングします。

    アプリケーション内ストリームに挿入できる行は、速度の制限があります。そのため、Kinesis Data Analytics では複数のアプリケーション内ストリームをサポートして、より高速にアプリケーションにレコードを届けます。アプリケーションがストリーミングソースのデータをアップし続けることができない場合は、並列処理ユニットを追加してパフォーマンスを向上させることができます。

  • マッピングスキーマ – ストリーミングソースでのレコード形式 (JSON、CSV) を指定します。また、ストリームの各レコードが、作成されるアプリケーション内ストリーム内の列にマッピングされる方法も指定します。ここで、列名とデータ型を指定します。

注記

Kinesis Data Analytics は、入力アプリケーション内ストリームの作成時に、識別子 (ストリーム名および列名) に引用符を追加します。このストリームと列をクエリする場合は、完全一致 (大文字と小文字が正確に一致) を使用して引用符内を指定する必要があります。識別子の詳細については、「Amazon Managed Service for Apache Flink SQL リファレンス」の「Identifiers (識別子)」を参照してください。

Amazon Kinesis Data Analytics コンソールでアプリケーションを作成し、入力を設定できます。その後、コンソールは必要な API コールを行います。アプリケーション入力の設定は、新しいアプリケーション API の作成時、または既存のアプリケーションに入力設定を追加するときに行うことができます。詳細については、CreateApplicationおよびAddApplicationInputを参照してください。以下は Createapplication API リクエストボディの入力設定部分です。

"Inputs": [ { "InputSchema": { "RecordColumns": [ { "Mapping": "string", "Name": "string", "SqlType": "string" } ], "RecordEncoding": "string", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": "string", "RecordRowDelimiter": "string" }, "JSONMappingParameters": { "RecordRowPath": "string" } }, "RecordFormatType": "string" } }, "KinesisFirehoseInput": { "ResourceARN": "string", "RoleARN": "string" }, "KinesisStreamsInput": { "ResourceARN": "string", "RoleARN": "string" }, "Name": "string" } ]

リファレンスソースの設定

オプションで既存のアプリケーションにリファレンスデータソースを追加して、ストリーミングソースから入力されるデータを強化できます。リファレンスデータは Amazon S3 バケット内のオブジェクトとして保存する必要があります。アプリケーションが起動すると、Amazon Kinesis Data Analytics は Amazon S3 オブジェクトを読み取り、アプリケーション内テーブルを作成します。その後、アプリケーションコードでこれをアプリケーション内ストリームと結合できます。

サポートされている形式 (CSV、JSON) で、Amazon S3 オブジェクトにリファレンスデータを保存します。たとえば、アプリケーションで株注文を分析すると仮定します。ストリーミングソースには次に示すレコード形式があるとします。

Ticker, SalePrice, OrderId AMZN $700 1003 XYZ $250 1004 ...

この場合、会社名などの詳細を株価ティッカーに提供するリファレンスデータソースを保持することを検討する場合があります。

Ticker, Company AMZN, Amazon XYZ, SomeCompany ...

アプリケーションのリファレンスデータソースは、API またはコンソールで追加できます。Amazon Kinesis Data Analytics では、以下の API アクションで、リファレンスデータソースを管理します。

コンソールを使用してリファレンスデータを追加する方法の詳細については、「例: Kinesis Data Analytics アプリケーションにリファレンスデータを追加する」を参照してください。

次の点に注意してください。

  • アプリケーションが実行されている場合、Kinesis Data Analytics はアプリケーション内リファレンステーブルを作成し、ただちにリファレンスデータをロードします。

  • アプリケーションが実行されていない (たとえば、準備完了状態など) 場合、Kinesis Data Analytics は更新された入力設定を保存するだけです。アプリケーションの実行が始まると、Kinesis Data Analytics はリファレンスデータをテーブルとしてアプリケーションにロードします。

Kinesis Data Analytics がアプリケーション内リファレンステーブルを作成した後で、データを更新するとします。Amazon S3 オブジェクトを更新したり、別の Amazon S3 オブジェクトを使用したいという場合があるかもしれません。この場合は、UpdateApplication を明示的に呼び出すか、コンソールで [アクション]、[リファレンスデータテーブルを同期] の順に選択します。Kinesis Data Analytics では、アプリケーション内リファレンステーブルは自動的に更新されません。

リファレンスデータソースとして作成できる Amazon S3 オブジェクトには、サイズの制限があります。詳細については、「Limits」を参照してください。オブジェクトのサイズが制限を超えた場合には、Kinesis Data Analytics でデータをロードできません。アプリケーションの状態は実行中と表示されますが、データが読み込まれません。

リファレンスデータソースを追加する場合は、次の情報を指定します。

  • S3 バケットおよびオブジェクトキー名 – バケット名およびオブジェクトキーに加えて、ユーザーの代わりにオブジェクトを読み取るために Kinesis Data Analytics が引き受けることができる IAM ロールも指定します。

  • アプリケーション内リファレンステーブル名 – Kinesis Data Analytics がこのアプリケーション内テーブルを作成し、Amazon S3 オブジェクトを読み取ってそこに入力します。これは、アプリケーションコードで指定するテーブルの名前です。

  • マッピングスキーマ – レコード形式 (JSON, CSV)、Amazon S3 オブジェクトに保存されたデータのエンコードを記述します。また、各データ要素がどのようにアプリケーション内リファレンステーブルにマッピングされるかも記述します。

以下に、AddApplicationReferenceDataSource API リクエストの本文を示します。

{ "applicationName": "string", "CurrentapplicationVersionId": number, "ReferenceDataSource": { "ReferenceSchema": { "RecordColumns": [ { "IsDropped": boolean, "Mapping": "string", "Name": "string", "SqlType": "string" } ], "RecordEncoding": "string", "RecordFormat": { "MappingParameters": { "CSVMappingParameters": { "RecordColumnDelimiter": "string", "RecordRowDelimiter": "string" }, "JSONMappingParameters": { "RecordRowPath": "string" } }, "RecordFormatType": "string" } }, "S3ReferenceDataSource": { "BucketARN": "string", "FileKey": "string", "ReferenceRoleARN": "string" }, "TableName": "string" } }