Amazon MSK によるスタジオノートブックの作成 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon MSK によるスタジオノートブックの作成

このチュートリアルでは、Amazon MSK クラスターをソースとして使用する Studio ノートブックを作成する方法について説明します。

セットアップ

このチュートリアルでは、プレーンテキストでアクセスできる Amazon MSK クラスターが必要です。Amazon MSK クラスターをまだセットアップしていない場合は、「Amazon MSK の使用入門」チュートリアルに従って、Amazon VPC、Amazon MSK クラスター、トピック、および Amazon EC2 クライアントインスタンスを作成してください。

チュートリアルを実行するときは、以下の手順を実行します。

VPC に NAT ゲートウェイを追加

Amazon MSK の使用入門」チュートリアルに従って Amazon MSK クラスターを作成した場合、または既存の Amazon VPC にプライベートサブネット用の NAT ゲートウェイがまだない場合は、Amazon VPC に NAT ゲートウェイを追加する必要があります。アーキテクチャを次の図に示します。

Amazon VPC 用の NAT ゲートウェイを作成するには、以下を実行します。

  1. Amazon VPC コンソール (https://console.aws.amazon.com/vpc/) を開きます。

  2. 左のナビゲーションバーから、[NAT ゲートウェイ] を選択します。

  3. NAT ゲートウェイ」ページで「NAT ゲートウェイの作成」を選択します。

  4. [NAT ゲートウェイの作成] ページで、以下の値を入力します。

    名前-オプション ZeppelinGateway
    サブネット AWS KafkaTutorialSubnet1
    エラスティック IP アロケーション ID 使用可能な Elastic IP を選択してください。使用可能なエラスティック IP がない場合は、[エラスティック IP の割り当て] を選択してから、コンソールが作成する Elastic IP を選択します。

    [Create NAT Gateway] (NAT ゲートウェイの作成) を選択します。

  5. 左のナビゲーションバーで、[Route Tables] (ルートテーブル) を選択します。

  6. [ルートテーブルの作成] を選択します。

  7. [ルートテーブルの作成] ページで、以下の情報を指定します。

    • 名前タグ: ZeppelinRouteTable

    • VPC: 自分の VPC (VPC などAWS KafkaTutorial) を選択してください。

    [作成] を選択します。

  8. ルートテーブルのリストで、を選択します。ZeppelinRouteTable[ルート] タブを選択し、[ルート編集] を選択します。

  9. [ルートの編集] ページで、[ルートの追加] を選択します。

  10. [送信先] に「0.0.0.0/0」と入力します。[ターゲット] で [NAT ゲートウェイ] を選択しますZeppelinGateway。[ルートの保存] を選択します。[閉じる] を選びます。

  11. 「ルートテーブル」ページで、「サブネットアソシエーション」ZeppelinRouteTableタブを選択した状態で、「サブネットアソシエーション」タブを選択します。「サブネット関連付けの編集」を選択します。

  12. サブネットアソシエーションの編集」 ページで、「AWS KafkaTutorialSubnet2」と「AWS KafkaTutorialSubnet3」を選択します。[保存] を選択します。

AWS Glue 接続とテーブルを作成します。

Studio ノートブックは、Amazon MSK データソースに関するメタデータ用の「AWS Glue」データベースを使用します。このセクションでは、Amazon MSK AWS Glue クラスターへのアクセス方法を説明する接続と、Studio Notebook AWS Glue などのクライアントにデータソース内のデータを表示する方法を説明するテーブルを作成します。

接続を作成する
  1. AWS Management Console にサインインし、https://console.aws.amazon.com/glue/ AWS Glue のコンソールを開きます。

  2. AWS Glue データベースをまだお持ちでない場合は、左側のナビゲーションバーから [データベース] を選択します。[Add database] (データベースの追加) を選択します。[データベースの追加] ウィンドウで、[データベース名] に default を入力します。[作成] を選択します。

  3. 左のナビゲーションバーから、[接続]を選択します。[Add connection (接続の追加)] を選択します。

  4. 接続を追加」ウィンドウで、次の値を入力します。

    • [Connection Name] (接続名) に、ZeppelinConnection と入力します。

    • [接続タイプ] で、[Kafka] を選択します。

    • Kafka ブートストラップサーバー URL」には、クラスターのブートストラップブローカーの文字列を指定します。ブートストラップブローカーは、MSK コンソールから、または次の CLI コマンドを入力して取得できます。

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • SSL 接続が必要」チェックボックスをオフにします。

    [次へ] をクリックします。

  5. [VPC] ページで、次の値を入力します。

    • VPC の場合は、VPC の名前 (VPC など AWS KafkaTutorial) を選択します。

    • [サブネット] には 2 を選択します。AWS KafkaTutorialSubnet

    • セキュリティグループ」では、使用可能なすべてのグループを選択します。

    [次へ] をクリックします。

  6. 接続プロパティ」/「接続アクセス」ページで 「完了」を選択します。

テーブルを作成する
注記

次の手順で説明するように手動でテーブルを作成することも、Apache Zeppelin 内のノートブックにある Apache Flink 用 Managed Service のテーブル作成コネクタコードを使用して DDL ステートメントでテーブルを作成することもできます。その後、 AWS Glue チェックインしてテーブルが正しく作成されたことを確認できます。

  1. 左のナビゲーションバーで、[テーブル] を選択します。「テーブル」ページで、「テーブルを追加」、「テーブルを手動で追加」を選択します。

  2. テーブルのプロパティの設定」ページで、「テーブル名」に stock を入力します。以前に作成したデータベースを選択していることを確認してください。[次へ] をクリックします。

  3. データストアの追加」ページで「Kafka」を選択します。[トピック名] には、トピック名 (例:AWS KafkaTutorialTopic) を入力します。[接続] には、を選択しますZeppelinConnection

  4. 分類」ページで「JSON」を選択します。[次へ] をクリックします。

  5. スキーマを定義するで、[Add column] を編集して列を追加します。以下のプロパティを持つ列を追加します。

    列名 データ型
    ticker string
    price double

    [次へ] をクリックします。

  6. 次のページで設定を確認し、「終了」を選択します。

  7. テーブルの一覧で、新しく作成したテーブルを選択します。

  8. テーブル編集」を選択し、キー managed-flink.proctime と値 proctime を含むプロパティを追加します。

  9. [適用] を選択します。

Amazon MSK による Studio ノートブックの作成

アプリケーションで使用するリソースを作成したので、次は Studio ノートブックを作成します。

AWS Management Console またはを使用してアプリケーションを作成できます AWS CLI。
注記

Amazon MSK コンソールから既存のクラスターを選択し、「データをリアルタイムで処理」を選択することで Studio ノートブックを作成することもできます。

を使用して Studio ノートブックを作成します。 AWS Management Console

  1. https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard」にある Apache Flink コンソール用 Managed Service を開きます。

  2. Apache Flink アプリケーション用 Managed Service」ページで、「Studio」タブを選択します。「Studio ノートブックの作成」を選択します。

    注記

    Amazon MSK または Kinesis Data Streams コンソールから Studio ノートブックを作成するには、入力の Amazon MSK クラスターまたは Kinesis データストリームを選択し、「データをリアルタイムで処理」を選択します。

  3. [Studio ノートブックの作成] ページで、次の情報を入力します。

    • Studio ノートブック名」に MyNotebook を入力します。

    • AWS Glue データベース」の「デフォルト」を選択します。

    [Studio ノートブックの作成] を選択します。

  4. MyNotebookページで [設定] タブを選択します。「Networking」セクションで、「編集」を選択します。

  5. [ネットワークの編集] MyNotebook ページで、Amazon MSK クラスターに基づく VPC 設定を選択します。「Amazon MSK クラスター」には Amazon MSK クラスターを選択します。[変更を保存] を選択します。

  6. MyNotebookページで [Run] を選択します。ステータス」に「実行中」が表示されるまで待ちます。

を使用して Studio ノートブックを作成します。 AWS CLI

を使用して Studio ノートブックを作成するには AWS CLI、次の操作を行います。

  1. 次の情報があることを確認します。アプリケーションを作成するにはこれらの値が必要です。

    • アカウント ID。

    • Amazon MSK クラスターを含む Amazon VPC 用のサブネット ID やセキュリティグループ ID。

  2. create.json というファイルを次の内容で作成します。プレースホルダー値を、ユーザー自身の情報に置き換えます。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. アプリケーションを作成するには、次のコマンドを実行します。

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. コマンドが完了すると、次のような出力が表示され、新しい Studio ノートブックの詳細が表示されます。

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. アプリケーションを起動するには、次のコマンドを実行します。サンプル値をアカウント ID に置き換えます。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Amazon MSK クラスターにデータを送信します。

このセクションでは、Amazon EC2 クライアントで Python スクリプトを実行して Amazon MSK データソースにデータを送信します。

  1. Amazon EC2 クライアントに接続します。

  2. 以下のコマンドを実行して Python バージョン 3、Pip、および Kafka for Python パッケージをインストールし、アクションを確認します。

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. 次のコマンドを入力して、 AWS CLI クライアントマシン上でを設定します。

    aws configure

    アカウントの認証情報と us-east-1region に入力します。

  4. stock.py というファイルを次の内容で作成します。サンプル値を Amazon MSK クラスターの Bootstrap Brokers 文字列に置き換え、トピックがそうでない場合はトピック名を更新してください。AWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. 次のコマンドを使用してスクリプトを実行します。

    $ python3 stock.py
  6. 以下のセクションを実行している間は、スクリプトを実行したままにしておきます。

Studio ノートブックをテストします。

このセクションでは、Studio ノートブックを使用して Amazon MSK クラスターのデータをクエリします。

  1. https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard」にある Apache Flink 用 Managed Serviceコンソールを開きます。

  2. [Apache Flink アプリケーション用 Managed Service] ページで、[Studio ノートブック] タブを選択します。を選択してくださいMyNotebook

  3. MyNotebookページで、「Apache ツェッペリンで開く」を選択します。

    新しいタブで Apache Zeppelin インターフェイスが開きます。

  4. Zeppelinへようこそ!」でページで「Zeppelinの新ノート」を選択します。

  5. Zeppelin Note」ページで、新しいノートに次のクエリを入力します。

    %flink.ssql(type=update) select * from stock

    実行アイコンを選択します。

    アプリケーションは Amazon MSK クラスターのデータを表示します。

アプリケーションの Apache Flink ダッシュボードを開いて運用状況を表示するには、「FLINK JOB」を選択します。Flink Dashboard の詳細については、「Managed Service for Apache Flink デベロッパーガイド」の「Apache Flink ダッシュボード」を参照してください。

Flink ストリーミング SQL クエリの他の例については、「Apache Flink ドキュメント」の「クエリ」を参照してください。