使用 Amazon MSK 建立 Studio 筆記本 - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Amazon MSK 建立 Studio 筆記本

本教學課程說明如何建立使用 Amazon MSK 叢集作為來源的 Studio 筆記本。

設定

在本教學課程中,您需要一個允許純文字存取的 Amazon MSK 叢集。如果尚未設定 Amazon MSK 叢集,請依照 Amazon MSK 使用入門教學課程來建立 Amazon VPC、Amazon MSK 叢集、主題和 Amazon EC2 用戶端執行個體。

跟隨教學課程學習時,請執行下列動作:

將 NAT 閘道新增至您的 VPC

如果依照 Amazon MSK 使用入門教學課程建立 Amazon MSK 叢集,或者您現有的 Amazon VPC 還沒有適用於其私有子網路的 NAT 閘道,則必須將 NAT 閘道新增到 Amazon VPC。下圖顯示一般架構。

若要為您的 Amazon VPC 建立 NAT 閘道,請執行下列動作:

  1. 前往 https://console.aws.amazon.com/vpc/ 開啟 Amazon VPC 主控台。

  2. 從左側導覽列選擇 NAT 閘道

  3. NAT 閘道頁面,選擇建立 NAT 閘道

  4. 建立 NAT 閘道頁面,提供下列值:

    名稱-可選 ZeppelinGateway
    子網 AWS KafkaTutorialSubnet1
    彈性 IP 配置識別碼 選擇可用的彈性 IP。如果沒有可用的彈性 IP,請選擇「配置彈性 IP」,然後選擇主控台建立的 Elasic IP。

    選擇建立 NAT 閘道

  5. 在導覽列中,選擇路由表

  6. 選擇建立路由表

  7. 建立路由表頁面,提供以下資訊:

    • 名稱標籤ZeppelinRouteTable

    • VPC:選擇您的虛擬私人雲端 (例如 AWS KafkaTutorialV PC)。

    選擇建立

  8. 在路由表清單中,選擇ZeppelinRouteTable。選擇路由標籤,然後選擇編輯路由

  9. 編輯路由標籤中,選擇新增路由

  10. 中,為目標輸入 0.0.0.0/0。針對「目標」,選擇「NAT 閘道ZeppelinGateway。選擇儲存路由。選擇關閉

  11. 在「路由表」頁面上,選取後,ZeppelinRouteTable選擇「子網路關聯」標籤。選擇編輯子網路關聯

  12. 在 [編輯子網路關聯] 頁面中,選擇 [AWS KafkaTutorialSubnet2] 和 [AWS KafkaTutorialSubnet3]。選擇儲存

創建一個 AWS Glue 連接和表

您的 Studio 筆記本使用 AWS Glue 資料庫取得有關 Amazon MSK 資料來源的中繼資料。在本節中,您會建立說明如何存取 Amazon MSK 叢集的 AWS Glue 連線,以及一個說明如何將資料來源中的資料呈現給用戶端 (例如 Studio 筆記本) 的 AWS Glue 表格。

建立連線
  1. 請登入 AWS Management Console 並開啟 AWS Glue 主控台,網址為 https://console.aws.amazon.com/glue/

  2. 如果您還沒有資料 AWS Glue 庫,請從左側導覽列選擇 [資料庫]。選擇新增資料庫。在新增資料庫視窗中,為資料庫名稱輸入 default。選擇建立

  3. 從左側導覽列選擇連線。選擇新增連線

  4. 新增連線視窗中,提供下列值:

    • 對於連線名稱,請輸入 ZeppelinConnection

    • 對於連線類型,請選擇 Kafka

    • 對於 Kafka 啟動伺服器 URL,請為叢集提供啟動代理程式字串。您可以從 MSK 主控台或輸入下列 CLI 命令來取得啟動代理程式:

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • 取消核取需要 SSL 連線核取方塊。

    選擇下一步

  5. VPC 頁面,提供下列值:

    • 對於 VPC,請選擇 VPC 的名稱(例如 AWS KafkaTutorialV PC)。

    • 對於子網路,選擇 AWS KafkaTutorialSubnet2

    • 對於安全群組,請選擇所有可用的群組。

    選擇下一步

  6. 連線屬性 / 連線存取權頁面,選擇完成

建立資料表
注意

您可以依照下列步驟所述手動建立資料表,也可以在 Apache Zeppelin 的筆記本中,使用針對 Managed Service for Apache Flink 的建立資料表連接器程式碼,透過 DDL 陳述式建立資料表。然後,您可以入庫 AWS Glue 以確保表格已正確建立。

  1. 在左側導覽列中,選擇資料表。在資料表頁面,選擇新增資料表 > 手動新增資料表

  2. 設定資料表頁面,為資料表名稱輸入 stock。請務必選取先前建立的資料庫。選擇下一步

  3. 新增資料存放區頁面,選擇 Kafka。對於主題名稱,輸入您的主題名稱(例如 AWS KafkaTutorialTopic)。對於「連線」,請選擇ZeppelinConnection

  4. 分類頁面,選擇 JSON。選擇下一步

  5. 定義結構描述頁面,選擇「新增資料欄」以新增資料欄。新增具有下列屬性的欄:

    欄名稱 資料類型
    ticker string
    price double

    選擇下一步

  6. 在下一頁上,確認您的設定,然後選擇完成

  7. 從資料表清單中選取您新建立的資料表。

  8. 選擇編輯資料表,然後新增索引鍵為 managed-flink.proctime 值為 proctime 的屬性。

  9. 選擇套用

使用 Amazon MSK 建立 Studio 筆記本

現在,您已建立應用程式使用的資源,接下來可以建立您的 Studio 筆記本。

您可以使用 AWS Management Console 或建立應用程式 AWS CLI。
注意

您也可以選擇現有叢集,然後選擇即時處理資料,從 Amazon MSK 主控台建立 Studio 筆記本。

建立工作室筆記本 AWS Management Console

  1. 前往 https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard 開啟 Managed Service in the Apache Flink 主控台。

  2. Managed Service for Apache Flink 應用程式頁面,選擇 Studio 標籤。選擇建立 Studio 筆記本

    注意

    若要從 Amazon MSK 或 Kinesis Data Streams 主控台建立 Studio 筆記本,請選取您的輸入 Amazon MSK 叢集或 Kinesis 資料串流,然後選擇即時處理資料

  3. 建立 Studio 筆記本頁面,提供下列資訊:

    • Studio 筆記本名稱輸入 MyNotebook

    • AWS Glue 資料庫選擇預設值

    選擇建立 Studio 筆記本

  4. MyNotebook頁面中,選擇「組態」頁籤。在網路模式區段中,選擇編輯

  5. 在 [編輯以下項目的聯網 MyNotebook] 頁面中,選擇以 Amazon MSK 叢集為基礎的 VPC 組態。為 Amazon MSK 叢集選擇 Amazon MSK 叢集。選擇儲存變更

  6. MyNotebook頁面中,選擇 [執行]。等待狀態顯示為執行中

建立工作室筆記本 AWS CLI

若要使用建立您的 Studio 筆記本 AWS CLI,請執行下列動作:

  1. 請務必備妥下列資訊:您需要這些值來建立應用程式。

    • 帳戶 ID。

    • 子網路 ID 以及包含 Amazon MSK 叢集的 Amazon VPC 的安全群組 ID。

  2. 建立稱為 create.json 的檔案,其中具有以下內容。使用您的資訊取代預留位置的值。

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. 若要建立應用程式,請執行下列命令:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. 命令完成後,您應該會看到類似如下的輸出,其中顯示新 Studio 筆記本的詳細資料:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. 若要執行應用程式,請執行下列命令:使用您的帳戶 ID 取代範例值。

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

將資料傳送至 Amazon MSK 叢集

在本節中,您會在 Amazon EC2 用戶端中執行 Python 指令碼,以將資料傳送到您的 Amazon MSK 資料來源。

  1. 連線到 Amazon EC2 用戶端。

  2. 執行以下命令來安裝 Python 版本 3、Pip 和 Kafka for Python 套件,並確認操作:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. 輸入下列命令, AWS CLI 在用戶端電腦上設定:

    aws configure

    提供帳戶憑證,並為 region 提供 us-east-1

  4. 建立稱為 stock.py 的檔案,其中具有以下內容。將範例值取代為 Amazon MSK 叢集的引導代理程式字串,如果您的主題不AWS KafkaTutorialTopic是,請更新主題名稱:

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. 使用下列命令執行指令碼:

    $ python3 stock.py
  6. 完成下一節時,讓指令碼保持執行狀態。

測試 Studio 筆記本

在本節中,您可以使用 Studio 筆記本查詢 Amazon MSK 叢集中的資料。

  1. 前往 https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard 開啟 Managed Service in the Apache Flink 主控台。

  2. Managed Service for Apache Flink 應用程式頁面,選擇 Studio 筆記本標籤。選擇MyNotebook

  3. MyNotebook頁面中,選擇在 Apache 齊柏林飛艇中打開

    Apache Zeppelin 介面會在新標籤中開啟。

  4. 歡迎來到 Zeppelin! 頁面,選擇 Zeppelin 新筆記

  5. Zeppelin 筆記頁面,在新筆記中輸入以下查詢:

    %flink.ssql(type=update) select * from stock

    選擇執行圖示。

    應用程式會顯示 Amazon MSK 叢集中的資料。

若要為應用程式開啟 Apache Flink 儀表板以檢視操作層面,請選擇 FLINK 作業。如需 Flink 儀表板的詳細資訊,請參閱 Managed Service for Apache Flink 開發人員指南中的 Apache Flink 儀表板

如需 Flink 串流 SQL 查詢的更多範例,請參閱 Apache Flink 文件中的查詢