開始使用 (斯卡拉) - Managed Service for Apache Flink

Amazon Managed Service for Apache Flink 之前稱為 Amazon Kinesis Data Analytics for Apache Flink。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

開始使用 (斯卡拉)

注意

從版本 1.15 開始,Flink 是免費的斯卡拉。應用程式現在可以使用任何 Scala 版本的 Java API。Flink 仍然在內部的幾個關鍵組件中使用 Scala,但不會將 Scala 暴露給用戶代碼類加載器。因此,您必須將 Scala 依賴關係添加到您的 JAR 檔案中。

如需 Flink 1.15 中的 Scala 變更之詳細資訊,請參閱在 1.15 版中移除了 Scala 相依性

在本練習中,您會為 Scala 建立 Managed Service for Apache Flink 應用程式,並將 Kinesis 資料串流作為來源和目的地。

建立相依資源

在為本練習建立 Managed Service for Apache Flink 應用程式之前,先建立下列相依資源:

  • 兩個 Kinesis 串流,用於輸入和輸出。

  • Amazon S3 儲存貯體,用來儲存應用程式的程式碼 (ka-app-code-<username>)

您可以在主控台中建立 Kinesis 串流和 Amazon S3 儲存貯體。如需建立這些資源的相關指示,請參閱以下主題:

  • 《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流。為資料串流 ExampleInputStreamExampleOutputStream 命名。

    建立資料串流 (AWS CLI)

    • 若要建立第一個串流 (ExampleInputStream),請使用下列 Amazon Kinesis 建立 AWS CLI 串流命令。

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • 若要建立應用程式用來寫入輸出的第二個串流,請執行相同的命令,將串流名稱變更為 ExampleOutputStream

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • 《Amazon Simple Storage Service 使用者指南》中的如何建立 S3 儲存貯體透過附加登入名稱 (例如 ka-app-code-<username>),為 Amazon S3 儲存貯體提供全域唯一的名稱。

其他資源

當您建立應用程式時,Apache Flink 的受管服務會建立下列 Amazon CloudWatch 資源 (如果這些資源尚未存在):

  • 名為 /AWS/KinesisAnalytics-java/MyApplication 的日誌群組。

  • 名為 kinesis-analytics-log-stream 的日誌串流。

將樣本記錄寫入輸入流

在本節,您會使用 Python 指令碼將範例記錄寫入供應用程式處理的串流。

注意
注意

本節中的 Python 指令碼會使用 AWS CLI。您必須 AWS CLI 將您的配置為使用您的帳戶憑據和默認區域。若要設定您的 AWS CLI,請輸入下列內容:

aws configure
  1. 使用下列內容建立名為 stock.py 的檔案:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. 執行 stock.py 指令碼:

    $ python stock.py

    在完成教學課程的其餘部分時,讓指令碼保持執行狀態。

下載並檢查應用程式程式碼

此範例的 Python 應用程式程式碼可從中取得 GitHub。若要下載應用程式的程式碼,請執行下列動作:

  1. 如果您尚未安裝 Git 用戶端,請先安裝。如需詳細資訊,請參閱安裝 Git

  2. 使用以下指令複製遠端儲存庫:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. 導覽至 amazon-kinesis-data-analytics-java-examples/scala/GettingStarted 目錄。

請留意下列與應用程式的程式碼相關的資訊:

  • build.sbt 檔案包含應用程式的組態和相依性資訊,包括 Managed Service for Apache Flink 程式庫。

  • BasicStreamingJob.scala 檔案包含定義應用程式功能的主要方法。

  • 應用程式使用 Kinesis 來源從來源串流讀取。以下程式碼片段會建立 Kinesis 來源:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    應用程式也會使用 Kinesis 接收器寫入結果串流。以下程式碼片段會建立 Kinesis 目的地:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • 應用程式會建立來源和接收器連接器,以使用 StreamExecutionEnvironment 物件存取外部資源。

  • 應用程式會使用動態應用程式屬性來建立來源與目的地連接器。會讀取執行期應用程式的屬性,來設定連接器。如需執行期屬性的詳細資訊,請參閱執行期屬性

編譯和上傳應用程式的程式碼

在本節中,您會編譯應用程式的程式碼,並將其上傳至在建立相依資源一節建立的 Amazon S3 儲存貯體。

編譯應用程式的程式碼

在本節中,您將使用 SBT 建置工具來建置應用程式的 Scala 程式碼。若要安裝 SBT,請參閱使用 cs 安裝程式安裝 sbt。您還需要安裝 Java 開發套件 (JDK)。請參閱完成練習的先決條件

  1. 請將應用程式的程式碼編譯並封裝成 JAR 檔案,以使用應用程式的程式碼。您可以使用 SBT 編譯和封裝程式碼:

    sbt assembly
  2. 如果應用程式成功編譯,則會建立下列檔案:

    target/scala-3.2.0/getting-started-scala-1.0.jar
上傳 Apache Flink 串流 Scala 程式碼

在本節中,您會建立 Amazon S3 儲存貯體並上傳您應用程式的程式碼。

  1. 前往 https://console.aws.amazon.com/s3/ 開啟的 Amazon Simple Storage Service (Amazon S3) 主控台。

  2. 選擇建立儲存貯體

  3. 儲存貯體名稱欄位中,輸入 ka-app-code-<username>。新增尾碼至儲存貯體名稱,例如您的使用者名稱,使其成為全域唯一的。選擇下一步

  4. 設定選項中,保留原有設定並選擇下一步

  5. 設定許可步驟中,保留原有設定並選擇下一步

  6. 選擇建立儲存貯體

  7. 選擇 ka-app-code-<username> 儲存貯體,然後選擇上傳

  8. 選取檔案步驟中,選擇 新增檔案。導覽至您在上一步驟中建立的 getting-started-scala-1.0.jar 檔案。

  9. 您不需要變更物件的任何設定,因此請選擇上傳

您的應用程式的程式碼現在儲存在您的應用程式可以存取的 Amazon S3 儲存貯體中。