本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
建立並執行 Managed Service for Apache Flink for Python 應用程式
在本節中,您會建立適用於 Python 應用程式的 Managed Service for Apache Flink 應用程式,並以 Kinesis 串流做為來源和接收器。
本節包含下列步驟:
建立相依資源
在為本練習建立 Managed Service for Apache Flink 之前,先建立下列相依資源:
-
兩個 Kinesis 串流,用於輸入和輸出。
-
存放應用程式程式碼的 Amazon S3 儲存貯體。
注意
本教學假設您正在 us-east-1 區域中部署應用程式。如果您使用另一個區域,則必須相應地調整所有步驟。
建立兩個 Kinesis 串流
為此練習建立 Managed Service for Apache Flink 應用程式之前,請在將用於部署應用程式的相同區域中建立兩個 Kinesis 資料串流 (ExampleInputStream 和 ExampleOutputStream) (在此範例中為 us-east-1)。您的應用程式會將這些串流用於應用程式來源和目的地串流。
您可以使用 Amazon Kinesis 主控台或以下 AWS CLI 命令來建立這些串流。如需主控台指示,請參閱《Amazon Kinesis Data Streams 開發人員指南》中的建立和更新資料串流。
建立資料串流 (AWS CLI)
-
若要建立第一個串流 (
ExampleInputStream),請使用下列 Amazon Kinesiscreate-streamAWS CLI 命令。$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 -
若要建立應用程式用來寫入輸出的第二個串流,請執行相同的命令,將串流名稱變更為
ExampleOutputStream。$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1
建立 Amazon S3 儲存貯體
您可以使用主控台建立 Amazon S3 儲存貯體。如需建立這些資源的相關指示,請參閱以下主題:
-
《Amazon Simple Storage Service 使用者指南》中的如何建立 S3 儲存貯體。為 Amazon S3 儲存貯體提供全域唯一名稱,例如透過附加您的登入名稱。
注意
請確定您在用於本教學課程的區域中建立 S3 儲存貯體 (us-east-1)。
其他資源
建立應用程式時,Managed Service for Apache Flink 會建立下列 Amazon CloudWatch 資源 (如果尚不存在該資源):
-
名為
/AWS/KinesisAnalytics-java/<my-application>的日誌群組。 -
名為
kinesis-analytics-log-stream的日誌串流。
設定您的本機開發環境
對於開發和偵錯,您可以在機器上執行 Python Flink 應用程式。您可以在您選擇的 Python IDE 中使用 python main.py或 從命令列啟動應用程式。
注意
在您的開發機器上,您必須安裝 Python 3.10 或 3.11、Java 11、Apache Maven 和 Git。我們建議您使用 IDE,例如 PyCharm
安裝 PyFlink 程式庫
若要開發應用程式並在本機執行,您必須安裝 Flink Python 程式庫。
-
使用 VirtualEnv、Conda 或任何類似的 Python 工具建立獨立的 Python 環境。
-
在該環境中安裝 PyFlink 程式庫。使用您在 Amazon Managed Service for Apache Flink 中使用的相同 Apache Flink 執行時間版本。目前,建議的執行時間為 1.19.1。
$ pip install apache-flink==1.19.1 -
執行應用程式時,請確定環境處於作用中狀態。如果您在 IDE 中執行應用程式,請確定 IDE 使用環境做為執行時間。程序取決於您使用的 IDE。
注意
您只需要安裝 PyFlink 程式庫。您不需要在機器上安裝 Apache Flink 叢集。
驗證您的 AWS 工作階段
應用程式使用 Kinesis 資料串流來發佈資料。在本機執行時,您必須擁有有效的已 AWS 驗證工作階段,具有寫入 Kinesis 資料串流的許可。使用下列步驟來驗證您的工作階段:
-
如果您沒有設定 AWS CLI 和具有有效登入資料的具名設定檔,請參閱 設定 AWS Command Line Interface (AWS CLI)。
-
透過發佈下列測試記錄,確認您的 AWS CLI 已正確設定,且您的使用者具有寫入 Kinesis 資料串流的許可:
$ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST -
如果您的 IDE 有要整合的外掛程式 AWS,您可以使用它將登入資料傳遞至在 IDE 中執行的應用程式。如需詳細資訊,請參閱 AWS Toolkit for PyCharm
、 AWS Toolkit for Visual Studio Code 和 AWS Toolkit for IntelliJ IDEA 。
下載並檢查 Apache Flink 串流 Python 程式碼
此範例的 Python 應用程式的程式碼可從 GitHub 下載。若要下載應用程式的程式碼,請執行下列動作:
-
使用以下指令複製遠端儲存庫:
git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git -
導覽至
./python/GettingStarted目錄。
檢閱應用程式元件
應用程式碼位於 。 main.py我們使用內嵌在 Python 中的 SQL 來定義應用程式的流程。
注意
為了獲得最佳化的開發人員體驗,應用程式的設計可在 Amazon Managed Service for Apache Flink 和本機上執行,無需變更任何程式碼,即可在機器上進行開發。應用程式使用 環境變數IS_LOCAL = true來偵測它何時在本機執行。您必須在 shell IS_LOCAL = true或 IDE 的執行組態中設定環境變數。
-
應用程式會設定執行環境並讀取執行時間組態。若要同時在 Amazon Managed Service for Apache Flink 和本機上運作,應用程式會檢查
IS_LOCAL變數。-
以下是應用程式在 Amazon Managed Service for Apache Flink 中執行時的預設行為:
-
載入與應用程式一起封裝的相依性。如需詳細資訊,請參閱 (連結)
-
從您在 Amazon Managed Service for Apache Flink 應用程式中定義的執行期屬性載入組態。如需詳細資訊,請參閱 (連結)
-
-
當應用程式偵測到您在本機執行應用程式
IS_LOCAL = true時:-
從專案載入外部相依性。
-
從專案中包含
application_properties.json的檔案載入組態。... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
-
-
-
應用程式使用 Kinesis Connector
定義具有 CREATE TABLE陳述式的來源資料表。此資料表會從輸入 Kinesis 串流讀取資料。應用程式會從執行時間組態中取得串流的名稱、區域和初始位置。table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """) -
在此範例中,應用程式也會使用 Kinesis Connector
定義接收器資料表。此故事會將資料傳送至輸出 Kinesis 串流。 table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""") -
最後,應用程式會從來源資料表執行
INSERT INTO...接收資料表的 SQL。在更複雜的應用程式中,您可能會有額外的步驟在寫入目的地之前轉換資料。table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""") -
您必須在
main()函數結尾新增另一個步驟,才能在本機執行應用程式:if is_local: table_result.wait()如果沒有此陳述式,應用程式會在您於本機執行時立即終止。當您在 Amazon Managed Service for Apache Flink 中執行應用程式時,不得執行此陳述式。
管理 JAR 相依性
PyFlink 應用程式通常需要一或多個連接器。本教學課程中的應用程式使用 Kinesis Connector
在此範例中,我們會示範如何使用 Apache Maven 來擷取相依性,並封裝應用程式以在 Managed Service for Apache Flink 上執行。
注意
有擷取和封裝相依性的其他方法。此範例示範可正確搭配一或多個連接器使用的方法。它還可讓您在本機、用於開發以及在 Managed Service for Apache Flink 上執行應用程式,而無需變更程式碼。
使用 pom.xml 檔案
Apache Maven 使用 pom.xml 檔案來控制相依性和應用程式封裝。
任何 JAR 相依性都會在 <dependencies>...</dependencies>區塊的 pom.xml 檔案中指定。
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...
若要尋找要使用的正確連接器成品和版本,請參閱 將 Apache Flink 連接器與 Managed Service for Apache Flink 搭配使用。請務必參考您正在使用的 Apache Flink 版本。在此範例中,我們使用 Kinesis 連接器。對於 Apache Flink 1.19,連接器版本為 4.3.0-1.19。
注意
如果您使用的是 Apache Flink 1.19,則沒有為此版本特別發行的連接器版本。使用針對 1.18 發行的連接器。
下載和封裝相依性
使用 Maven 下載pom.xml檔案中定義的相依性,並為 Python Flink 應用程式封裝相依性。
-
導覽至包含 Python 入門專案的目錄,稱為
python/GettingStarted。 -
執行以下命令:
$ mvn package
Maven 會建立新的檔案,名為 ./target/pyflink-dependencies.jar。當您在機器上進行本機開發時,Python 應用程式會尋找此檔案。
注意
如果您忘記執行此命令,當您嘗試執行應用程式時,它會失敗並顯示錯誤: 找不到任何識別符為 "kinesis" 的工廠。
將範例記錄寫入輸入串流
在本節中,您將傳送範例記錄到串流,供應用程式處理。您可以使用 Python 指令碼或 Kinesis Data Generator
使用 Python 指令碼產生範例資料
您可以使用 Python 指令碼將範例記錄傳送至串流。
注意
若要執行此 Python 指令碼,您必須使用 Python 3.x 並安裝AWS 適用於 Python (Boto) 的 SDK
若要開始將測試資料傳送至 Kinesis 輸入串流:
-
從資料產生器 GitHub 儲存庫下載資料產生器
stock.pyPython 指令碼。 -
執行
stock.py指令碼:$ python stock.py
在您完成教學課程的其餘部分時,請讓指令碼保持執行。您現在可以執行 Apache Flink 應用程式。
使用 Kinesis Data Generator 產生範例資料
或者,若要使用 Python 指令碼,您可以使用託管版本
若要設定和執行 Kinesis Data Generator:
-
遵循 Kinesis Data Generator 文件
中的指示來設定對工具的存取。您將執行設定使用者和密碼的 CloudFormation 範本。 -
透過 CloudFormation 範本產生的 URL 存取 Kinesis Data Generator。CloudFormation 範本完成後,您可以在輸出索引標籤中找到 URL。
-
設定資料產生器:
-
區域:選取您用於本教學課程的區域:us-east-1
-
串流/交付串流:選取應用程式將使用的輸入串流:
ExampleInputStream -
每秒記錄數:100
-
記錄範本:複製並貼上下列範本:
{ "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
-
-
測試範本:選擇測試範本,並確認產生的記錄與以下內容類似:
{ "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 } -
啟動資料產生器:選擇選取傳送資料。
Kinesis Data Generator 現在正在將資料傳送至 ExampleInputStream。
在本機執行您的應用程式
您可以在本機測試應用程式,使用 python main.pyIDE 從命令列執行或從 IDE 執行。
若要在本機執行應用程式,您必須安裝正確的 PyFlink 程式庫版本,如上一節所述。如需詳細資訊,請參閱 (連結)
注意
繼續之前,請確認輸入和輸出串流是否可用。請參閱 建立兩個 Amazon Kinesis 資料串流。此外,請確認您具有從兩個串流讀取和寫入的許可。請參閱 驗證您的 AWS 工作階段。
將 Python 專案匯入 IDE
若要開始在 IDE 中使用應用程式,您必須將其匯入為 Python 專案。
您複製的儲存庫包含多個範例。每個範例都是個別的專案。在本教學課程中,將./python/GettingStarted子目錄中的內容匯入 IDE。
將程式碼匯入為現有的 Python 專案。
注意
匯入新 Python 專案的確切程序取決於您使用的 IDE。
檢查本機應用程式組態
在本機執行時,應用程式會使用 下專案資源資料夾中 application_properties.json 檔案中的組態./src/main/resources。您可以編輯此檔案以使用不同的 Kinesis 串流名稱或區域。
[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]
在本機執行您的 Python 應用程式
您可以從命令列做為一般 Python 指令碼,或從 IDE 在本機執行應用程式。
從命令列執行您的應用程式
-
請確定安裝 Python Flink 程式庫的獨立 Python 環境目前處於作用中狀態,例如 Conda 或 VirtualEnv。
-
請確定您
mvn package至少執行一次。 -
設定
IS_LOCAL = true環境變數:$ export IS_LOCAL=true -
以一般 Python 指令碼執行應用程式。
$python main.py
從 IDE 中執行應用程式
-
將 IDE 設定為使用以下組態執行
main.py指令碼:-
使用安裝 PyFlink 程式庫的獨立 Python 環境,例如 Conda 或 VirtualEnv。
-
使用 AWS 登入資料來存取輸入和輸出 Kinesis 資料串流。
-
設定
IS_LOCAL = true。
-
-
設定執行組態的確切程序取決於您的 IDE 和 。
-
當您設定 IDE 後,請執行 Python 指令碼,並在應用程式執行時使用 IDE 提供的工具。
在本機檢查應用程式日誌
在本機執行時,除了應用程式啟動時列印和顯示的幾行以外,應用程式不會在主控台中顯示任何日誌。PyFlink 會將日誌寫入安裝 Python Flink 程式庫的目錄中的檔案。應用程式會在日誌啟動時列印日誌的位置。您也可以執行下列命令來尋找日誌:
$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
-
列出記錄目錄中的檔案。您通常會找到單一
.log檔案。 -
在應用程式執行時自訂檔案:
tail -f <log-path>/<log-file>.log。
觀察 Kinesis 串流中的輸入和輸出資料
您可以使用 Amazon Kinesis 主控台中的資料檢視器,觀察 (產生範例 Python) 或 Kinesis Data Generator (連結) 傳送至輸入串流的記錄。 Amazon Kinesis
若要觀察記錄:
停止應用程式在本機執行
停止在 IDE 中執行的應用程式。IDE 通常提供「停止」選項。確切的位置和方法取決於 IDE。
封裝您的應用程式程式碼
在本節中,您會使用 Apache Maven 將應用程式程式碼和所有必要的相依性封裝在 .zip 檔案中。
再次執行 Maven 套件命令:
$ mvn package
此命令會產生 檔案 target/managed-flink-pyflink-getting-started-1.0.0.zip。
將應用程式套件上傳至 Amazon S3 儲存貯體
在本節中,您將上一節中建立的 .zip 檔案上傳至在本教學課程開始時建立的 Amazon Simple Storage Service (Amazon S3) 儲存貯體。如果您尚未完成此步驟,請參閱 (連結)。
上傳應用程式碼 JAR 檔案
開啟位於 https://console.aws.amazon.com/s3/
的 Amazon S3 主控台。 -
選擇您先前為應用程式碼建立的儲存貯體。
-
選擇上傳。
-
選擇 Add files (新增檔案)。
-
導覽至上一個步驟中產生的 .zip 檔案:
target/managed-flink-pyflink-getting-started-1.0.0.zip。 -
選擇上傳,而不變更任何其他設定。
建立和設定 Managed Service for Apache Flink 應用程式
您可以使用 主控台或 建立和設定 Managed Service for Apache Flink 應用程式 AWS CLI。在本教學課程中,我們將使用 主控台。
建立應用程式
登入 AWS 管理主控台,並在 https://https://console.aws.amazon.com/flink 開啟 Amazon MSF 主控台。
-
確認已選取正確的區域:美國東部 (維吉尼亞北部)us-east-1。
-
開啟右側選單,然後選擇 Apache Flink 應用程式,然後選擇建立串流應用程式。或者,從初始頁面的開始使用區段中選擇建立串流應用程式。
-
在建立串流應用程式頁面上:
-
對於選擇設定串流處理應用程式的方法,請選擇從頭開始建立。
-
針對 Apache Flink 組態、Application Flink 版本,選擇 Apache Flink 1.19。
-
對於應用程式組態:
-
在應用程式名稱中,輸入
MyApplication。 -
對於 Description (說明),輸入
My Python test app。 -
在存取應用程式資源中,選擇使用必要政策建立/更新 IAM 角色 kinesis-analytics-MyApplication-us-east-1。
-
-
對於應用程式設定的範本:
-
針對範本,選擇開發。
-
-
選擇建立串流應用程式。
-
注意
使用主控台建立 Managed Service for Apache Flink 應用程式時,可以選擇是否為應用程式建立 IAM 角色和政策。應用程式使用此角色和政策來存取其相依資源。這些 IAM 資源會如下所述使用您的應用程式名稱和區域命名:
-
政策:
kinesis-analytics-service-MyApplication-us-west-2 -
角色:
kinesisanalytics-MyApplication-us-west-2
Amazon Managed Service for Apache Flink 先前稱為 Kinesis Data Analytics。自動產生的資源名稱會加上 字首,kinesis-analytics以確保回溯相容性。
編輯 IAM 政策
編輯 IAM 政策以新增 Amazon S3 儲存貯體存取許可。
編輯 IAM 政策以新增 S3 儲存貯體許可
前往 https://console.aws.amazon.com/iam/
開啟 IAM 主控台。 -
選擇政策。選擇主控台為您在上一節所建立的
kinesis-analytics-service-MyApplication-us-east-1政策。 -
選擇編輯,然後選擇 JSON 索引標籤。
-
將下列政策範例的反白部分新增至政策。使用您的帳戶 ID 取代範例帳戶 ID (
012345678901)。 -
選擇下一步,然後選擇儲存變更。
設定應用程式
編輯應用程式組態以設定應用程式程式碼成品。
設定應用程式
-
在 MyApplication 頁面,選擇設定。
-
在應用程式碼位置區段中:
-
針對 Amazon S3 儲存貯體,選取您先前為應用程式碼建立的儲存貯體。選擇瀏覽並選擇正確的儲存貯體,然後選擇選擇。請勿選取儲存貯體名稱。
-
對於 Amazon S3 物件的路徑,請輸入
managed-flink-pyflink-getting-started-1.0.0.zip。
-
-
針對存取許可,選擇
kinesis-analytics-MyApplication-us-east-1使用必要政策建立/更新 IAM 角色。 -
移至執行期屬性,並保留所有其他設定的預設值。
-
選擇新增項目並新增下列每個參數:
群組 ID 金鑰 值 InputStream0stream.nameExampleInputStreamInputStream0flink.stream.initposLATESTInputStream0aws.regionus-east-1OutputStream0stream.nameExampleOutputStreamOutputStream0aws.regionus-east-1kinesis.analytics.flink.run.optionspythonmain.pykinesis.analytics.flink.run.optionsjarfilelib/pyflink-dependencies.jar -
請勿修改任何其他區段,然後選擇儲存變更。
注意
當您選擇啟用 Amazon CloudWatch 日誌時,Managed Service for Apache Flink 便會為您建立日誌群組和日誌串流。這些資源的名稱如下所示:
-
日誌群組:
/aws/kinesis-analytics/MyApplication -
日誌串流:
kinesis-analytics-log-stream
執行應用程式
應用程式現在已設定並準備好執行。
執行應用程式
-
在 Amazon Managed Service for Apache Flink 的 主控台上,選擇我的應用程式,然後選擇執行。
-
在下一頁的應用程式還原組態頁面上,選擇使用最新的快照執行,然後選擇執行。
應用程式詳細資訊中的狀態會從
Ready轉換為Starting,然後在應用程式啟動Running時轉換為 。
當應用程式處於 Running 狀態時,您現在可以開啟 Flink 儀表板。
開啟 儀表板
-
選擇開啟 Apache Flink 儀表板。儀表板會在新頁面上開啟。
-
在執行中任務清單中,選擇您可以看到的單一任務。
注意
如果您未正確設定執行期屬性或編輯 IAM 政策,應用程式狀態可能會變成
Running,但 Flink 儀表板會顯示任務正在持續重新啟動。如果應用程式設定錯誤或缺少存取外部資源的許可,這是常見的失敗案例。發生這種情況時,請檢查 Flink 儀表板中的例外狀況索引標籤,以查看問題的原因。
觀察執行中應用程式的指標
在 MyApplication 頁面的 Amazon CloudWatch 指標區段中,您可以查看執行中應用程式的一些基本指標。
檢視指標
-
在重新整理按鈕旁,從下拉式清單中選取 10 秒。
-
當應用程式執行正常時,您可以看到運作時間指標持續增加。
-
Fullrestarts 指標應為零。如果增加,組態可能會發生問題。若要調查問題,請檢閱 Flink 儀表板上的例外狀況索引標籤。
-
在運作狀態良好的應用程式中,失敗檢查點指標的數量應為零。
注意
此儀表板會顯示一組固定的指標,精細程度為 5 分鐘。您可以使用 CloudWatch 儀表板中的任何指標來建立自訂應用程式儀表板。
觀察 Kinesis 串流中的輸出資料
請確定您仍在使用 Python 指令碼或 Kinesis Data Generator 將資料發佈至輸入。
您現在可以使用 https://console.aws.amazon.com/kinesis/
檢視輸出
在以下網址開啟 Kinesis 主控台:https://console.aws.amazon.com/kinesis
。 -
確認區域與您用來執行本教學課程的區域相同。根據預設,它是 us-east-1US East (N. Virginia)。視需要變更 區域。
-
選擇資料串流。
-
選取您要觀察的串流。在本教學課程中,使用
ExampleOutputStream。 -
選擇資料檢視器標籤。
-
選取任何碎片,保持最新為開始位置,然後選擇取得記錄。您可能會看到「找不到此請求的記錄」錯誤。若是如此,請選擇重試取得記錄。發佈至串流顯示的最新記錄。
-
選取資料欄中的值,以 JSON 格式檢查記錄的內容。
停止應用程式
若要停止應用程式,請前往名為 的 Managed Service for Apache Flink 應用程式的主控台頁面MyApplication。
停止應用程式
-
從動作下拉式清單中,選擇停止。
-
應用程式詳細資訊中的狀態會從 轉換為
RunningStopping,然後在應用程式完全停止Ready時轉換為 。注意
別忘了也要停止從 Python 指令碼或 Kinesis Data Generator 將資料傳送至輸入串流。