Kinesis Data Streams を使用した Studio ノートブックの作成 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kinesis Data Streams を使用した Studio ノートブックの作成

このチュートリアルでは、Kinesis Data Stream をソースとして使用する Studio ノートブックを作成する方法について説明します。

セットアップ

Studio ノートブックを作成する前に、Kinesis データストリーム (ExampleInputStream) を作成します。アプリケーションはこのストリームをアプリケーションソースとして使用します。

このストリームは Amazon Kinesis コンソールまたは次の AWS CLI コマンドを使用して作成できます。コンソールでの操作方法については、「Amazon Kinesis Data Streams デベロッパーガイド」 の 「データストリームの作成および更新」 を参照してください。ストリーム ExampleInputStream に名前を付け、[オープンシャード数] を 1 に設定します。

を使用してストリーム (ExampleInputStream) を作成するには AWS CLI、次の Amazon Kinesis create-stream AWS CLI コマンドを使用します。

$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \ --profile adminuser

テーブルを作成します。 AWS Glue

Studio ノートブックは、Kinesis Data Streams データソースに関するメタデータに「AWS Glue」データベースを使用します。

注記

データベースは最初に手動で作成することも、ノートブックの作成時に Apache Flink 用 Managed Service に自動的に作成させることもできます。同様に、このセクションで説明されているように手動でテーブルを作成することも、Apache Zeppelin 内のノートブックで Apache Flink 用 Managed Service のテーブル作成コネクタコードで DDL ステートメントを使用してテーブルを作成することもできます。その後、 AWS Glue チェックインしてテーブルが正しく作成されたことを確認できます。

テーブルを作成する
  1. AWS Management Console にサインインし、https://console.aws.amazon.com/glue/ AWS Glue のコンソールを開きます。

  2. AWS Glue データベースをまだお持ちでない場合は、左側のナビゲーションバーから [データベース] を選択します。[Add database] (データベースの追加) を選択します。[データベースの追加] ウィンドウで、[データベース名] に default を入力します。[作成] を選択します。

  3. 左のナビゲーションバーで、[テーブル] を選択します。「テーブル」ページで、「テーブルを追加」、「テーブルを手動で追加」を選択します。

  4. テーブルのプロパティの設定」ページで、「テーブル名」に stock を入力します。以前に作成したデータベースを選択していることを確認してください。[次へ] をクリックします。

  5. [データストアの追加] ページで、[Kinesis] を選択します。[Stream name](ストリーム名)に ExampleInputStream を入力します。[Kinesis ソース URL] には、 https://kinesis.us-east-1.amazonaws.com の入力を選択します。[Kinesis ソース URL] をコピーして貼り付ける場合は、先頭または末尾のスペースを必ず削除してください。[次へ] をクリックします。

  6. 分類」ページで「JSON」を選択します。[次へ] をクリックします。

  7. スキーマを定義するで、[Add column] を編集して列を追加します。以下のプロパティを持つ列を追加します。

    列名 データ型
    ticker string
    price double

    [次へ] をクリックします。

  8. 次のページで設定を確認し、「終了」を選択します。

  9. テーブルの一覧で、新しく作成したテーブルを選択します。

  10. テーブル編集」を選択し、キー managed-flink.proctime と値 proctime を含むプロパティを追加します。

  11. [適用] を選択します。

Kinesis Data Streams を使用した Studio ノートブックの作成

アプリケーションで使用するリソースを作成したので、次は Studio ノートブックを作成します。

アプリケーションを作成するには、 AWS Management Console またはを使用できます AWS CLI。

を使用して Studio ノートブックを作成します。 AWS Management Console

  1. https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard」にある Apache Flink コンソール用 Managed Service を開きます。

  2. Apache Flink アプリケーション用 Managed Service」ページで、「Studio」タブを選択します。[Studio ノートブックの作成] を選択します。

    注記

    入力する Amazon MSK クラスターまたは Kinesis Data Streams を選択して [リアルタイムでデータを処理] を選択することで、Amazon MSK または Kinesis Data Streams コンソールから Studio ノートブックを作成することもできます。

  3. [Studio ノートブックの作成] ページで、次の情報を入力します。

    • ノートブックの名前に「MyNotebook」を入力します。

    • AWS Glue データベース」の「デフォルト」を選択します。

    [Studio ノートブックの作成] を選択します。

  4. MyNotebookページで [実行] を選択します。「ステータス」に「実行中」が表示されるまで待ちます。ノートブックの実行中は料金が発生します。

を使用して Studio ノートブックを作成します。 AWS CLI

を使用して Studio ノートブックを作成するには AWS CLI、次の操作を行います。

  1. アカウント ID を確認します。アプリケーションを作成する際にこの値が必要になります。

  2. ロール arn:aws:iam::AccountID:role/ZeppelinRole を作成し、コンソールで自動作成されたロールに以下の権限を追加します。

    "kinesis:GetShardIterator",

    "kinesis:GetRecords",

    "kinesis:ListShards"

  3. create.json というファイルを次の内容で作成します。プレースホルダー値を、ユーザー自身の情報に置き換えます。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  4. アプリケーションを作成するには、次のコマンドを実行します。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  5. コマンドが完了すると、新しい Studio ノートブックの詳細を示す出力結果が表示されます。次は出力の例です。

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  6. アプリケーションを起動するには、次のコマンドを実行します。サンプル値をアカウント ID に置き換えます。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Kinesis データストリームへのデータ送信

Kinesis データストリームにテストデータを送信するには、次の手順に従います。

  1. Kinesis Data Generator を開きます。

  2. でCognitoユーザーを作成」を選択します。 CloudFormation

  3. AWS CloudFormation コンソールが開き、Kinesis データジェネレーターテンプレートが表示されます。[次へ] をクリックします。

  4. [Specify component details] (コンポーネントの詳細の指定) ページで、Cognito ユーザーのユーザー名とパスワードを入力します。[次へ] をクリックします。

  5. [スタックオプションの設定] ページで、[次へ] を選択します。

  6. Kinesis-データジェネレーター-Cognit-Userのレビューページで、IAMリソースを作成する可能性のある [同意します] を選択します。 AWS CloudFormation チェックボックス。[Create Stack] (スタックの作成) を選択します。

  7. AWS CloudFormation スタックの作成が完了するまでお待ちください。スタックが完了したら、コンソールで Kinesis-データジェネレーター-Cognito-Userスタックを開き、[出力] タブを選択します。 AWS CloudFormation 出力値としてリストされている URL KinesisDataGeneratorUrlを開きます。

  8. [Amazon Kinesis Data Generator] ページで、ステップ 4 で作成した認証情報を使用してログインします。

  9. 次のページで、次の値を入力します。

    リージョン us-east-1
    ストリーム/Firehose ストリーム ExampleInputStream
    1 秒あたりのレコード数 1

    [Record Template] (記録テンプレート) に、次の内容を貼り付けます。

    { "ticker": "{{random.arrayElement( ["AMZN","MSFT","GOOG"] )}}", "price": {{random.number( { "min":10, "max":150 } )}} }
  10. [データ送信] を選択します。

  11. ジェネレータは、Kinesis データストリームにデータを送信します。

    次のセクションを完了する間、ジェネレータを作動させたままにしておきます。

Studio ノートブックをテストします。

このセクションでは、Studio ノートブックを使用して Kinesis データストリームのデータをクエリします。

  1. https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard」にある Apache Flink コンソール用 Managed Service を開きます。

  2. [Apache Flink アプリケーション用 Managed Service] ページで、[Studio ノートブック] タブを選択します。を選択しますMyNotebook

  3. MyNotebookページで「Apache ツェッペリンで開く」を選択します。

    新しいタブで Apache Zeppelin インターフェイスが開きます。

  4. [Zeppelinへようこそ!]ページで [Zeppelin Note] を選択します。

  5. Zeppelin Note」ページで、新しいノートに次のクエリを入力します。

    %flink.ssql(type=update) select * from stock

    実行アイコンを選択します。

    しばらくすると、ノートには Kinesis データストリームのデータが表示されます。

アプリケーションの Apache Flink ダッシュボードを開いて運用状況を表示するには、「FLINK JOB」を選択します。Flink Dashboard の詳細については、「Managed Service for Apache Flink デベロッパーガイド」の「Apache Flink ダッシュボード」を参照してください。

Flink ストリーミング SQL クエリの他の例については、「Apache Flink ドキュメント」の「クエリ」を参照してください。