Amazon Managed Service for Apache Flink は、以前は Amazon Kinesis Data Analytics for Apache Flink と呼ばれていました。
翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon MSK によるスタジオノートブックの作成
このチュートリアルでは、Amazon MSK クラスターをソースとして使用する Studio ノートブックを作成する方法について説明します。
このチュートリアルには、次のセクションが含まれています。
セットアップ
このチュートリアルでは、プレーンテキストでアクセスできる Amazon MSK クラスターが必要です。Amazon MSK クラスターをまだセットアップしていない場合は、「Amazon MSK の使用入門」チュートリアルに従って、Amazon VPC、Amazon MSK クラスター、トピック、および Amazon EC2 クライアントインスタンスを作成してください。
チュートリアルを実行するときは、以下の手順を実行します。
「ステップ 3: Amazon MSK クラスターを作成する」のステップ 4 で、
ClientBroker
値をTLS
からPLAINTEXT
に変更します。
VPC に NAT ゲートウェイを追加
「Amazon MSK の使用入門」チュートリアルに従って Amazon MSK クラスターを作成した場合、または既存の Amazon VPC にプライベートサブネット用の NAT ゲートウェイがまだない場合は、Amazon VPC に NAT ゲートウェイを追加する必要があります。アーキテクチャを次の図に示します。
![](images/vpc_05.png)
Amazon VPC 用の NAT ゲートウェイを作成するには、以下を実行します。
Amazon VPC コンソール (https://console.aws.amazon.com/vpc/
) を開きます。 左のナビゲーションバーから、[NAT ゲートウェイ] を選択します。
「NAT ゲートウェイ」ページで「NAT ゲートウェイの作成」を選択します。
[NAT ゲートウェイの作成] ページで、以下の値を入力します。
名前-オプション ZeppelinGateway
サブネット AWS KafkaTutorialSubnet1 エラスティック IP アロケーション ID 使用可能な Elastic IP を選択してください。使用可能なエラスティック IP がない場合は、[エラスティック IP の割り当て] を選択してから、コンソールが作成する Elastic IP を選択します。 [Create NAT Gateway] (NAT ゲートウェイの作成) を選択します。
左のナビゲーションバーで、[Route Tables] (ルートテーブル) を選択します。
[ルートテーブルの作成] を選択します。
[ルートテーブルの作成] ページで、以下の情報を指定します。
名前タグ:
ZeppelinRouteTable
VPC: 自分の VPC (VPC などAWS KafkaTutorial) を選択してください。
[作成] を選択します。
ルートテーブルのリストで、を選択します。ZeppelinRouteTable[ルート] タブを選択し、[ルート編集] を選択します。
[ルートの編集] ページで、[ルートの追加] を選択します。
[送信先] に「
0.0.0.0/0
」と入力します。[ターゲット] で [NAT ゲートウェイ] を選択しますZeppelinGateway。[ルートの保存] を選択します。[閉じる] を選びます。「ルートテーブル」ページで、「サブネットアソシエーション」ZeppelinRouteTableタブを選択した状態で、「サブネットアソシエーション」タブを選択します。「サブネット関連付けの編集」を選択します。
「サブネットアソシエーションの編集」 ページで、「AWS KafkaTutorialSubnet2」と「AWS KafkaTutorialSubnet3」を選択します。[保存] を選択します。
AWS Glue 接続とテーブルを作成します。
Studio ノートブックは、Amazon MSK データソースに関するメタデータ用の「AWS Glue」データベースを使用します。このセクションでは、Amazon MSK AWS Glue クラスターへのアクセス方法を説明する接続と、Studio Notebook AWS Glue などのクライアントにデータソース内のデータを表示する方法を説明するテーブルを作成します。
接続を作成する
AWS Management Console にサインインし、https://console.aws.amazon.com/glue/ AWS Glue
のコンソールを開きます。 AWS Glue データベースをまだお持ちでない場合は、左側のナビゲーションバーから [データベース] を選択します。[Add database] (データベースの追加) を選択します。[データベースの追加] ウィンドウで、[データベース名] に
default
を入力します。[作成] を選択します。左のナビゲーションバーから、[接続]を選択します。[Add connection (接続の追加)] を選択します。
「接続を追加」ウィンドウで、次の値を入力します。
[Connection Name] (接続名) に、
ZeppelinConnection
と入力します。[接続タイプ] で、[Kafka] を選択します。
「Kafka ブートストラップサーバー URL」には、クラスターのブートストラップブローカーの文字列を指定します。ブートストラップブローカーは、MSK コンソールから、または次の CLI コマンドを入力して取得できます。
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn
ClusterArn
「SSL 接続が必要」チェックボックスをオフにします。
[次へ] をクリックします。
[VPC] ページで、次の値を入力します。
VPC の場合は、VPC の名前 (VPC など AWS KafkaTutorial) を選択します。
[サブネット] には 2 を選択します。AWS KafkaTutorialSubnet
「セキュリティグループ」では、使用可能なすべてのグループを選択します。
[次へ] をクリックします。
「接続プロパティ」/「接続アクセス」ページで 「完了」を選択します。
テーブルを作成する
注記
次の手順で説明するように手動でテーブルを作成することも、Apache Zeppelin 内のノートブックにある Apache Flink 用 Managed Service のテーブル作成コネクタコードを使用して DDL ステートメントでテーブルを作成することもできます。その後、 AWS Glue チェックインしてテーブルが正しく作成されたことを確認できます。
左のナビゲーションバーで、[テーブル] を選択します。「テーブル」ページで、「テーブルを追加」、「テーブルを手動で追加」を選択します。
「テーブルのプロパティの設定」ページで、「テーブル名」に
stock
を入力します。以前に作成したデータベースを選択していることを確認してください。[次へ] をクリックします。「データストアの追加」ページで「Kafka」を選択します。[トピック名] には、トピック名 (例:AWS KafkaTutorialTopic) を入力します。[接続] には、を選択しますZeppelinConnection。
「分類」ページで「JSON」を選択します。[次へ] をクリックします。
スキーマを定義するで、[Add column] を編集して列を追加します。以下のプロパティを持つ列を追加します。
列名 データ型 ticker
string
price
double
[次へ] をクリックします。
次のページで設定を確認し、「終了」を選択します。
-
テーブルの一覧で、新しく作成したテーブルを選択します。
-
「テーブル編集」を選択し、キー
managed-flink.proctime
と値proctime
を含むプロパティを追加します。 -
[適用] を選択します。
Amazon MSK による Studio ノートブックの作成
アプリケーションで使用するリソースを作成したので、次は Studio ノートブックを作成します。
AWS Management Console またはを使用してアプリケーションを作成できます AWS CLI。
注記
Amazon MSK コンソールから既存のクラスターを選択し、「データをリアルタイムで処理」を選択することで Studio ノートブックを作成することもできます。
を使用して Studio ノートブックを作成します。 AWS Management Console
「https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard
」にある Apache Flink コンソール用 Managed Service を開きます。 「Apache Flink アプリケーション用 Managed Service」ページで、「Studio」タブを選択します。「Studio ノートブックの作成」を選択します。
注記
Amazon MSK または Kinesis Data Streams コンソールから Studio ノートブックを作成するには、入力の Amazon MSK クラスターまたは Kinesis データストリームを選択し、「データをリアルタイムで処理」を選択します。
[Studio ノートブックの作成] ページで、次の情報を入力します。
-
「Studio ノートブック名」に
MyNotebook
を入力します。 「AWS Glue データベース」の「デフォルト」を選択します。
[Studio ノートブックの作成] を選択します。
-
MyNotebookページで [設定] タブを選択します。「Networking」セクションで、「編集」を選択します。
[ネットワークの編集] MyNotebook ページで、Amazon MSK クラスターに基づく VPC 設定を選択します。「Amazon MSK クラスター」には Amazon MSK クラスターを選択します。[変更を保存] を選択します。
MyNotebookページで [Run] を選択します。「ステータス」に「実行中」が表示されるまで待ちます。
を使用して Studio ノートブックを作成します。 AWS CLI
を使用して Studio ノートブックを作成するには AWS CLI、次の操作を行います。
次の情報があることを確認します。アプリケーションを作成するにはこれらの値が必要です。
アカウント ID。
Amazon MSK クラスターを含む Amazon VPC 用のサブネット ID やセキュリティグループ ID。
create.json
というファイルを次の内容で作成します。プレースホルダー値を、ユーザー自身の情報に置き換えます。{ "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::
AccountID
:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1
", "SubnetID 2
", "SubnetID 3
" ], "SecurityGroupIds": [ "VPC Security Group ID
" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID
:database/default" } } } } }アプリケーションを作成するには、次のコマンドを実行します。
aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
コマンドが完了すると、次のような出力が表示され、新しい Studio ノートブックの詳細が表示されます。
{ "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
アプリケーションを起動するには、次のコマンドを実行します。サンプル値をアカウント ID に置き換えます。
aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:
012345678901
:application/MyNotebook\
Amazon MSK クラスターにデータを送信します。
このセクションでは、Amazon EC2 クライアントで Python スクリプトを実行して Amazon MSK データソースにデータを送信します。
Amazon EC2 クライアントに接続します。
以下のコマンドを実行して Python バージョン 3、Pip、および Kafka for Python パッケージをインストールし、アクションを確認します。
sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
次のコマンドを入力して、 AWS CLI クライアントマシン上でを設定します。
aws configure
アカウントの認証情報と
us-east-1
をregion
に入力します。stock.py
というファイルを次の内容で作成します。サンプル値を Amazon MSK クラスターの Bootstrap Brokers 文字列に置き換え、トピックがそうでない場合はトピック名を更新してください。AWS KafkaTutorialTopicfrom kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "
<<Bootstrap Broker List>>
" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())次のコマンドを使用してスクリプトを実行します。
$ python3 stock.py
以下のセクションを実行している間は、スクリプトを実行したままにしておきます。
Studio ノートブックをテストします。
このセクションでは、Studio ノートブックを使用して Amazon MSK クラスターのデータをクエリします。
「https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard
」にある Apache Flink 用 Managed Serviceコンソールを開きます。 [Apache Flink アプリケーション用 Managed Service] ページで、[Studio ノートブック] タブを選択します。を選択してくださいMyNotebook。
MyNotebookページで、「Apache ツェッペリンで開く」を選択します。
新しいタブで Apache Zeppelin インターフェイスが開きます。
「Zeppelinへようこそ!」でページで「Zeppelinの新ノート」を選択します。
「Zeppelin Note」ページで、新しいノートに次のクエリを入力します。
%flink.ssql(type=update) select * from stock
実行アイコンを選択します。
アプリケーションは Amazon MSK クラスターのデータを表示します。
アプリケーションの Apache Flink ダッシュボードを開いて運用状況を表示するには、「FLINK JOB」を選択します。Flink Dashboard の詳細については、「Managed Service for Apache Flink デベロッパーガイド」の「Apache Flink ダッシュボード」を参照してください。
Flink ストリーミング SQL クエリの他の例については、「Apache Flink ドキュメント