例: Kinesis Data Analytics アプリケーションにリファレンスデータを追加する - Amazon Kinesis Data Analytics for SQL Applications 開発者ガイド

新規プロジェクトでは、Kinesis Data Analytics for SQL よりも 新しい Managed Service for Apache Flink Studio を使用することをお勧めします。Managed Service for Apache Flink Studio は、使いやすさと高度な分析機能を兼ね備えているため、高度なストリーム処理アプリケーションを数分で構築できます。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

例: Kinesis Data Analytics アプリケーションにリファレンスデータを追加する

この実習では、既存の Kinesis Data Analytics アプリケーションにリファレンスデータを追加します。リファレンスデータについては、次のトピックを参照してください。

この実習では、Kinesis Data Analytics 開始方法の実習で作成したアプリケーションにリファレンスデータを追加します。リファレンスデータは、各ティッカーシンボルの会社名を提供します。次に例を示します。

Ticker, Company AMZN,Amazon ASD, SomeCompanyA MMB, SomeCompanyB WAS, SomeCompanyC

まず、使用開始の演習のステップを完了してスターターアプリケーションを作成します。次に、以下のステップに従ってリファレンスデータを設定し、アプリケーションに追加します。

  1. データを準備する

    • 上記の参照データをオブジェクトとして Amazon Simple Storage Service (Amazon S3) に保存します。

    • ユーザーに代わって Kinesis Data Analytics が Amazon S3 オブジェクトの読み取りを引き受けられるよう IAM ロールを作成します。

  2. アプリケーションにリファレンスデータソースを追加します。

    Kinesis Data Analytics は Amazon S3 オブジェクトを読み取り、アプリケーションコードでクエリできるアプリケーション内リファレンステーブルを作成します。

  3. コードをテストします。

    アプリケーションコード内に、アプリケーション内ストリームをアプリケーション内リファレンステーブルに結合して各ティッカーシンボルの会社名を取得する結合クエリを作成できます。

ステップ 1: 準備

このセクションでは、サンプルのリファレンスデータを Amazon S3 バケットにオブジェクトとして保存します。ユーザーに代わって Kinesis Data Analytics がオブジェクトの読み取りを引き受けられるよう IAM ロールを作成することもできます。

Amazon S3 オブジェクトとしてのリファレンスデータの保存

このステップでは、サンプルのリファレンスデータを Amazon S3 オブジェクトとして保存します。

  1. テキストエディタを開き、以下のデータを追加して、ファイルを TickerReference.csv として保存します。

    Ticker, Company AMZN,Amazon ASD, SomeCompanyA MMB, SomeCompanyB WAS, SomeCompanyC

  2. TickerReference.csv ファイルを S3 バケットにアップロードします。手順については、「Amazon Simple Storage Service ユーザーガイド」の「Amazon S3 へのオブジェクトのアップロード」を参照してください。

IAM ロールを作成します。

次に、Kinesis Data Analytics が Amazon S3 オブジェクトの読み取りを引き受けられるよう IAM ロールを作成します。

  1. AWS Identity and Access Management (IAM) で、KinesisAnalytics-ReadS3Object という名前の IAM ロールを作成します。ロールを作成するには、IAM ユーザーガイドにある「Amazon Service (AWS Management Console) 用のロールを作成する」の手順に従ってください。

    IAM コンソールで、以下を指定します。

    • [ロールタイプの選択] で、[AWS Lambda] を選択します。ロールの作成が完了したら、Kinesis Data Analytics (AWS Lambda ではなく) がロールを引き受けられるよう信頼ポリシーを変更します。

    • [Attach Policy] ページでポリシーをアタッチしないでください。

  2. IAM; ロールポリシーを更新します。

    1. IAM; コンソールで、作成したロールを選択します。

    2. [Trust Relationships (信頼関係)] タブで、信頼ポリシーを更新してロールを引き受ける権限を Kinesis Data Analytics に付与します。以下に信頼ポリシーを示します。

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "kinesisanalytics.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

    3. [Permissions] タブで、[AmazonS3ReadOnlyAccess] という Amazon 管理ポリシーをアタッチします。これにより、Amazon S3 オブジェクトを読み取るアクセス権限をロールに付与します。このポリシーを以下に示します。

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "s3:Get*", "s3:List*" ], "Resource": "*" } ] }

ステップ 2: リファレンスデータソースをアプリケーション設定に追加する

このステップでは、リファレンスデータソースをアプリケーション設定に追加します。最初に、次の情報が必要です。

  • S3 バケット名およびオブジェクトキー名

  • IAM; ロールの Amazon リソースネーム (ARN)

  1. アプリケーションのメインページで [Connect reference data (リファレンスデータの接続)] を選択します。

  2. [リファレンスデータソースの接続] ページで、リファレンスデータオブジェクトを含む Amazon S3 バケットを選択し、オブジェクトのキー名を入力します。

  3. [アプリケーション内リファレンステーブル名] に CompanyName と入力します。

  4. [選択したリソースへのアクセス] セクションで、[Kinesis Analytics が引き受ける IAM ロール] を選択後、前のセクションで作成した IAM ロール [KinesisAnalytics-ReadS3Object] を選択します。

  5. [スキーマの検出] を選択します。コンソールによって、リファレンスデータの 2 つの列が検出されます。

  6. [保存して閉じる] を選択します。

ステップ 3: テスト: アプリケーション内リファレンステーブルをクエリする

アプリケーション内リファレンステーブル CompanyName をクエリできるようになりました。ティッカー価格データをリファレンステーブルに結合することにより、リファレンス情報を使用してアプリケーションを強化できます。結果に会社名が表示されます。

  1. アプリケーションコードを以下に置き換えます。クエリはアプリケーション内入力ストリームをアプリケーション内リファレンステーブルと結合します。アプリケーションコードは、結果を別のアプリケーション内ストリーム、DESTINATION_SQL_STREAM に書き込みます。

    CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (ticker_symbol VARCHAR(4), "Company" varchar(20), sector VARCHAR(12), change DOUBLE, price DOUBLE); CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM" SELECT STREAM ticker_symbol, "c"."Company", sector, change, price FROM "SOURCE_SQL_STREAM_001" LEFT JOIN "CompanyName" as "c" ON "SOURCE_SQL_STREAM_001".ticker_symbol = "c"."Ticker";
  2. アプリケーション出力が [SQLResults] タブに表示されていることを確認します。一部の行に会社名が表示されることを確認します (サンプルのリファレンスデータには、すべての会社名はありません)。