為 Apache Flink 應用程式建立並執行受管理的服務 - Amazon Kinesis Data Streams

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

為 Apache Flink 應用程式建立並執行受管理的服務

在本練習中,您會建立 Managed Service for Apache Flink 應用程式,並將資料串流作為來源和目的地。

建立兩個 Amazon Kinesis 資料串流

在為本練習建立適用於 Apache Flink 的 Amazon 受管服務之前,請先建立兩個 Kinesis 資料串流 (ExampleInputStreamExampleOutputStream)。您的應用程式會將這些串流用於應用程式來源和目的地串流。

您可以使用 Amazon Kinesis 主控台或下列方法建立這些串流 AWS CLI 指令。如需主控台說明,請參閱建立及更新資料串流

若要建立資料串流 (AWS CLI)
  1. 若要建立第一個串流 (ExampleInputStream),請使用下列 Amazon Kinesis create-stream AWS CLI 指令。

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. 若要建立應用程式用來寫入輸出的第二個串流,請執行相同的命令,將串流名稱變更為 ExampleOutputStream

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

寫入範例記錄至輸入串流

在本節,您會使用 Python 指令碼將範例記錄寫入供應用程式處理的串流。

注意
  1. 使用下列內容建立名為 stock.py 的檔案:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
  2. 在教學課程後半段,您會執行 stock.py 指令碼來傳送資料至應用程式。

    $ python stock.py

下載並檢查阿帕奇 Flink 流 Java 代碼

此範例的 Java 應用程式程式碼可從中取得 GitHub。若要下載應用程式的程式碼,請執行下列動作:

  1. 使用以下指令複製遠端儲存庫:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. 導覽至 GettingStarted 目錄。

應用程式碼位於 CustomSinkStreamingJob.javaCloudWatchLogSink.java 檔案。請留意下列與應用程式的程式碼相關的資訊:

  • 應用程式使用 Kinesis 來源從來源串流讀取。以下程式碼片段會建立 Kinesis 目的地:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

編譯應用程式程式碼

在本節中,您會使用 Apache Maven 編譯器來建立應用程式的 Java 程式碼。如需有關安裝 Apache Maven 和 Java 開發套件 (JDK) 的詳細資訊,請參閱完成練習的先決條件

Java 應用程式需要下列元件:

  • 專案物件模型 (pom.xml) 檔案。此檔案包含有關應用程式組態和相依性的資訊,包括 Apache Flink 程式庫的 Amazon 受管服務。

  • 包含應用程式邏輯的 main 方法。

注意

若要針對下列應用程式使用 Kinesis 連接器,您必須下載連接器的原始程式碼,並依照 Apache Flink 說明文件中的說明進行建置。

建立和編譯應用程式碼
  1. 在您的開發環境中建立 Java/Maven 應用程式。如需建立應用程式的詳細資訊,請參閱您開發環境的文件:

  2. 將以下程式碼用於名為 StreamingJob.java 的檔案。

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    請注意下列關於上述程式碼範例的事項:

    • 此檔案包含定義應用程式功能的 main 方法。

    • 您的應用程式會建立來源與目的地連接器,以使用 StreamExecutionEnvironment 物件來存取外部資源。

    • 應用程式會使用靜態屬性來建立來源與目的地連接器。若要使用動態應用程式屬性,請使用 createSourceFromApplicationPropertiescreateSinkFromApplicationProperties 方法來建立連接器。這些方法會讀取應用程式的屬性,來設定連接器。

  3. 若要使用應用程式程式碼,請將其編譯並封裝到JAR檔案中。您可以使用下列兩種方式的其中之一,編譯和封裝您的程式碼:

    • 使用命令列 Maven 工具。在包含JAR檔案的目錄中執行下列命令,以建立pom.xml檔案:

      mvn package
    • 設定開發環境。如需詳細資訊,請參閱您的開發環境文件。

    您可以將套件上傳為JAR檔案,也可以壓縮套件並將其上傳為ZIP檔案。如果您使用 AWS CLI,您可以指定您的程式碼內容類型 (JAR或ZIP)。

  4. 如果編譯時發生錯誤,請確認您的 JAVA_HOME 環境變數是否正確設定。

如果應用程式成功編譯,則會建立下列檔案:

target/java-getting-started-1.0.jar

上傳阿帕奇 Flink 流 Java 代碼

在本節中,您會建立 Amazon Simple Storage Service (Amazon S3) 儲存貯體並上傳您的應用程式的程式碼。

上傳應用程式的程式碼
  1. 在開啟 Amazon S3 主控台https://console.aws.amazon.com/s3/

  2. 選擇建立儲存貯體

  3. 儲存貯體名稱欄位中,輸入 ka-app-code-<username>。新增尾碼至儲存貯體名稱,例如您的使用者名稱,使其成為全域唯一的。選擇 Next (下一步)

  4. 設定選項步驟中,保留原有設定並選擇 Next (下一步)

  5. 設定許可步驟中,保留原有設定並選擇 Next (下一步)

  6. 選擇建立儲存貯體

  7. 在 Amazon S3 控制台中,選擇 ka-app-code-<username>桶,然後選擇上傳

  8. 選取檔案步驟中,選擇 新增檔案。導覽至您在上一步驟中建立的 java-getting-started-1.0.jar 檔案。選擇 Next (下一步)

  9. 設定許可步驟中,保留原有設定。選擇 Next (下一步)

  10. 設定屬性步驟中,保留原有設定。選擇上傳

您的應用程式的程式碼現在儲存在您的應用程式可以存取的 Amazon S3 儲存貯體中。

建立並執行 Apache Flink 應用程式的受管理服務

您可以使用主控台或 AWS CLI.

注意

當您使用主控台建立應用程式時, AWS Identity and Access Management (IAM)和 Amazon CloudWatch 日誌資源是為您創建的。當您使用 AWS CLI,您可以個別建立這些資源。

創建並運行應用程序(控制台)

依照以下步驟來使用主控台建立、設定、更新及執行應用程式。

建立應用程式

  1. https://console.aws.amazon.com/Kinesis 處開啟室壁運動主控台。

  2. 在 Amazon Kinesis 儀表板上,選擇建立分析應用程式

  3. Kinesis Analytics - Create application (Kinesis 分析 - 建立應用程式) 頁面,請如下所述提供應用程式詳細資訊:

    • 應用程式名稱中,輸入 MyApplication

    • 對於 Description (說明),輸入 My java test app

    • 針對 ​Runtime (執行時間),選擇 ​Apache Flink 1.6

  4. 對於 [存取權限],選擇 [建立/更新IAM角色] kinesis-analytics-MyApplication-us-west-2

  5. 選擇建立應用程式

注意

使用主控台為 Apache Flink 應用程式建立 Amazon 受管服務時,您可以選擇為應用程式建立IAM角色和政策。應用程式使用此角色和政策來存取其相依資源。這些IAM資源會使用您的應用程式名稱和區域命名,如下所示:

  • 政策:kinesis-analytics-service-MyApplication-us-west-2

  • 角色:kinesis-analytics-MyApplication-us-west-2

編輯IAM策略

編輯原IAM則以新增存取 Kinesis 資料串流的權限。

  1. 在開啟IAM主控台https://console.aws.amazon.com/iam/

  2. 選擇政策。選擇主控台為您在上一節所建立的 kinesis-analytics-service-MyApplication-us-west-2 政策。

  3. 摘要頁面,選擇編輯政策。選擇標JSON籤。

  4. 將下列政策範例的反白部分新增至政策。取代範例帳戶 IDs (012345678901) 使用您的帳戶 ID。

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

設定應用程式

  1. MyApplication頁面上,選擇設定

  2. 設定應用程式頁面,提供程式碼位置

    • 對於 Amazon S3 儲存貯體,請輸入 ka-app-code-<username>

    • 對於 Amazon S3 物件的路徑,請輸入 java-getting-started-1.0.jar

  3. 在 [存取應用程式資源] 下,針對 [存取權限] 選擇 [建立/更新IAM角色] kinesis-analytics-MyApplication-us-west-2

  4. 屬性下,為群組 ID輸入 ProducerConfigProperties

  5. 輸入以下應用程式屬性和數值:

    金鑰
    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. 監控下,確保監控指標層級設為應用程式

  7. 若要CloudWatch 記錄,請選取 [啟用] 核取方塊。

  8. 選擇更新

注意

當您選擇啟用 CloudWatch 記錄時,Apache Flink 的受管理服務會為您建立記錄群組和記錄資料流。這些資源的名稱如下所示:

  • 日誌群組:/aws/kinesis-analytics/MyApplication

  • 日誌串流:kinesis-analytics-log-stream

執行應用程式

  1. MyApplication頁面上,選擇 [執行]。確認動作。

  2. 應用程式執行時,重新整理頁面。主控台會顯示 Application graph (應用程式圖形)

停止應用程式

MyApplication頁面上,選擇 [停止]。確認動作。

更新應用程式

您可以使用主控台更新應用程式設定,例如應用程式屬性、監視設定,以及應用程式的位置或檔案名稱JAR。如果您需要更新應用程式程式碼,也可以JAR從 Amazon S3 儲存貯體重新載入應用程式。

MyApplication頁面上,選擇設定。更新應用程式設定,然後選擇更新

建立並執行應用程式 (AWS CLI)

在本節中,您可以使用 AWS CLI 以建立並執行 Apache Flink 應用程式的受管理服務。阿帕奇 Flink 的託管服務使用 kinesisanalyticsv2 AWS CLI 用來建立 Apache Flink 應用程式的受管理服務並與之互動的指令。

建立許可政策

您會先建立具有兩條陳述式的許可政策:一條陳述式授與來源串流上 read 動作的許可,而另一條則是授與目的地串流上 write 動作的許可。接著,您可以將原則附加至IAM角色 (您在下一節中建立)。因此,當 Managed Service for Apache Flink 擔任角色時,服務便具有從來源串流讀取並寫入目的地串流的所需許可。

使用以下程式碼來建立 KAReadSourceStreamWriteSinkStream 許可政策。以您用於建立 Amazon S3 儲存貯體 (以儲存應用程式的程式碼) 的使用者名稱來取代 username。將 Amazon 資源名稱 (ARNs) (012345678901) 中的帳戶 ID 取代為您的帳戶 ID。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

如需建立權限原則的指 step-by-step 示,請參閱IAM使用指南中的教學課程:建立並附加您的第一個客戶管理原則

注意

若要存取其他 AWS 服務,您可以使用 AWS SDK for Java。 Apache Flink 的受管理服務會自動SDK將所需的認證設定為與應用程式相關聯之服務執行IAM角色的認證。無須採取額外的步驟。

建立 IAM 角色

在本節中,您將建立 Apache Flink 的受管理服務可假定讀取來源串流並寫入接收串流的IAM角色。

Managed Service for Apache Flink 沒有許可,無法存取串流。您可以透過IAM角色授與這些權限。每個IAM角色都附加了兩個策略。信任政策會授與擔任角色的 Managed Service for Apache Flink 許可,而許可政策決定了 Managed Service for Apache Flink 在擔任角色後可以執行的作業。

您會將在上一節中建立的許可政策連接至此角色。

建立 IAM 角色
  1. 在開啟IAM主控台https://console.aws.amazon.com/iam/

  2. 在導覽窗格中,選擇角色建立角色

  3. [選取信任的身分類型] 下,選擇 AWS 服務。在選擇將使用此角色的服務下,選擇 Kinesis。在 Select your use case (選取您的使用案例) 下,選擇 Kinesis Analytics (Kinesis 分析)

    選擇下一步:許可

  4. 連接許可政策頁面,選擇下一步:檢閱。您會在建立角色後連接許可政策。

  5. 建立角色頁面,輸入 KA-stream-rw-role 作為角色名稱。選擇建立角色

    現在,您已經創建了一個名為的新IAM角色KA-stream-rw-role。您接著會更新角色的信任和許可政策。

  6. 將 許可政策連接到角色。

    注意

    在此練習中,Managed Service for Apache Flink 擔任從 Kinesis 資料串流 (來源) 讀取資料並將輸出寫入另一個 Kinesis 資料串流的角色。因此您會連接在上一個步驟中建立的政策,建立許可政策

    1. 摘要頁面,選擇許可標籤。

    2. 選擇連接政策

    3. 在搜尋方塊中,輸入 KAReadSourceStreamWriteSinkStream (您在上一節中建立的政策)。

    4. 選擇KAReadInputStreamWriteOutputStream策略,然後選擇「附加策略」。

您現在已建立應用程式用於存取資源的服務執行角色。記下新角色ARN的。

如需建立角色的指 step-by-step 示,請參閱IAM使用指南中的建立IAM角色 (主控台)

建立 Managed Service for Apache Flink 應用程式

  1. 將以下JSON代碼保存到名為的文件中create_request.json。將範例角ARN色取代ARN為您先前建立的角色。將值區ARN尾碼 (username) 取代為您在上一節中選擇的尾碼。使用您的帳戶 ID 取代服務執行角色中的範例帳戶 ID (012345678901)。

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. 使用前述請求執行 CreateApplication 動作以建立應用程式:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

應用程式現在已建立。您會在下一個步驟中啟動應用程式。

啟動應用程式

在本節中,您會透過 StartApplication 動作來啟動應用程式。

啟動應用程式
  1. 將以下JSON代碼保存到名為的文件中start_request.json

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. 以啟動應用程式的上述請求,執行 StartApplication 動作:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

應用程式現在正在執行。您可以在 Amazon CloudWatch 主控台上查看 Apache Flink 的受管服務指標,以確認應用程式是否正常運作。

停止應用程式

在本節,您會使用該 StopApplication 動作來停止應用程式。

停止應用程式
  1. 將以下JSON代碼保存到名為的文件中stop_request.json

    {"ApplicationName": "test" }
  2. 以停止應用程式的上述請求,執行 StopApplication 動作:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

現在已停止應用程式。