本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
切換串流工作階段類型
使用 AWS Glue 互動式工作階段組態魔術命令 %streaming
,定義您正在執行的任務並初始化串流互動式工作階段。
為互動式開發抽樣輸入串流
為幫助增強 AWS Glue 互動式工作階段中的互動式體驗,我們開發出的一個工具是在 GlueContext
下新增了一種新方法,該方法用於獲取靜態 DynamicFrame 中的串流快照。GlueContext
讓您能夠檢查、互動並實作您的工作流程。
使用 GlueContext
類執行個體,您將能夠找到方法 getSampleStreamingDynamicFrame
。此方法所需的引數為:
-
dataFrame
:Spark Streaming DataFrame -
options
:請參閱下列可用的選項
可用選項包括:
-
windowSize:這也稱為微量批持續時間。此參數將判定觸發前一個批次後串流查詢將等待的時長。此參數值必須小於
pollingTimeInMs
。 -
pollingTimeInMs:方法將執行的總時長。它將至少觸發一個微批次,以從輸入串流中獲取樣本記錄。
-
recordPollingLimit:此參數可幫助您限制將從串流輪詢的記錄總數。
-
(選用) 您也可以使用
writeStreamFunction
將此自訂函數應用至每個記錄抽樣函數。請參閱下列 Scala 和 Python 中的範例。
-
val sampleBatchFunction = (batchDF: DataFrame, batchId: Long) => {//Optional but you can replace your own forEachBatch function here} val jsonString: String = s"""{"pollingTimeInMs": "10000", "windowSize": "5 seconds"}""" val dynFrame = glueContext.getSampleStreamingDynamicFrame(YOUR_STREAMING_DF, JsonOptions(jsonString), sampleBatchFunction) dynFrame.show()
注意
當抽樣 DynFrame
為空白時,它可能是由以下幾個原因造成的:
-
串流來源設定為「最新」,而且在抽樣週期期間沒有擷取任何新資料。
-
輪詢時間不足,無法處理它擷取的記錄。除非整個批次處理完畢,否則資料不會顯示。
在互動式工作階段中執行串流應用程式
在 AWS Glue 互動式工作階段中,您可以執行 AWS Glue 串流應用程式,就像您在 AWS Glue 主控台中建立串流應用程式一樣。由於互動式工作階段是以工作階段為基礎,因此在執行時間遇到異常情形不會導致工作階段停止。反覆開發批次函數現在帶來了額外好處。例如:
def batch_function(data_frame, batch_id):
log.info(data_frame.count())
invalid_method_call()
glueContext.forEachBatch(frame=streaming_df, batch_function = batch_function, options = {**})
上面的範例中包含了對方法的無效使用,與將退出整個應用程式的常規 AWS Glue 任務不同的是,使用者的編碼內容和定義得到完全保留,且工作階段仍然可運作。無需引導新叢集並重新執行之前的所有轉換。這樣,您就可以專注於快速逐一查看批次函數實作,以獲得理想的結果。
需要注意的是,互動式工作階段會以封鎖的方式評估每個陳述句,以便讓工作階段一次只執行一個陳述句。由於串流查詢是連續的並且永不結束,因此具有作用中串流查詢的工作階段將無法處理任何後續陳述句,除非它們被中斷。您可以直接從 Jupyter 筆記本發出中斷命令,我們的核心會為您處理取消操作。
以下列正在等待執行的陳述句序列為範例:
Statement 1:
val number = df.count()
#Spark Action with deterministic result
Result: 5
Statement 2:
streamingQuery.start().awaitTermination()
#Spark Streaming Query that will be executing continously
Result: Constantly updated with each microbatch
Statement 3:
val number2 = df.count()
#This will not be executed as previous statement will be running indefinitely