使用 Lambda 函數作為輸出 - 適用於 SQL 應用程式的 Amazon Kinesis Data Analytics 開發人員指南

針對新專案,我們建議您優先選擇新的 Managed Service for Apache Flink Studio,而非 Kinesis Data Analytics for SQL 應用程式。Managed Service for Apache Flink Studio 易於使用且具備進階分析功能,可讓您在幾分鐘內建置複雜的串流處理應用程式。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Lambda 函數作為輸出

AWS Lambda 作為目的地使用可讓您在將 SQL 結果傳送至最終目的地之前,更輕鬆地執行後續處理。常見後續處理任務包括下列:

  • 將多個列彙總到單個記錄中

  • 結合目前結果與過去的結果,以處理延遲到達的資料

  • 根據資訊類型交付到不同目標

  • 記錄格式轉換 (如翻譯成 Protobuf)

  • 字串操作或轉換

  • 分析處理後的資料擴充

  • 地理空間使用案例的自訂處理

  • 資料加密

Lambda 函數可以將分析資訊傳遞至各種 AWS 服務和其他目的地,包括:

如需有關建立 Lambda 應用程式的詳細資訊,請參閱入門 AWS Lambda

Lambda 作為輸出許可

若要使用 Lambda 做為輸出,應用程式的 Lambda 輸出 IAM 角色需要下列許可政策:

{ "Sid": "UseLambdaFunction", "Effect": "Allow", "Action": [ "lambda:InvokeFunction", "lambda:GetFunctionConfiguration" ], "Resource": "FunctionARN" }

Lambda 作為輸出指標

您可以使 CloudWatch 用 Amazon 監視傳送的位元組數、成功與失敗等。如需 Kinesis 資料分析使用 Lambda 作為輸出所發出的 CloudWatch 指標的相關資訊,請參閱 Amazon Kinesis Analytics 指標。

Lambda 作為輸出事件輸入資料模型和記錄回應模型

若要傳送 Kinesis Data Analytics 輸出記錄,您的 Lambda 函數必須符合所需的事件輸入資料和記錄回應模型。

事件輸入資料模型

Kinesis Data Analytics 會持續將輸出記錄從應用程式傳送至 Lambda,這些輸出紀錄用作輸出函數,並具有下列要求模型。在函數中,迭代列表並應用業務邏輯來完成輸出需求(例如,在發送到最終目的地之前的資料轉換)。

欄位 描述
invocationId Lambda 調用 ID (隨機 GUID)。
applicationArn Kinesis Data Analytics 應用程式的 Amazon Resource Name (ARN)。
紀錄
欄位 描述
recordId 記錄 ID (隨機 GUID)
lambdaDeliveryRecordMetadata
欄位 描述
retryHint 交付重試次數
資料 Base64 編碼輸出紀錄承載
注意

retryHint 值在每次交付失敗都會增加。此值不會長期存在,如果應用程式中斷,則會重設。

紀錄回應模型

傳送至 Lambda 做為輸出函數 (含記錄 ID) 的每筆記錄,都必須使用 OkDeliveryFailed 來確認,且必須包含下列參數。否則,Kinesis Data Analytics 會將它們視為交付失敗。

紀錄
欄位 描述
recordId 在調用期間,記錄 ID 會從 Kinesis Data Analytics 傳遞至 Lambda。原始記錄與經確認記錄的 ID 若有任何不符,就會視為幾交付失敗。
result 記錄交付的狀態。以下是可能的值:
  • Ok:記錄已成功轉換並傳送至最終目的地。Kinesis Data Analytics 會擷取記錄讓 SQL 處理。

  • DeliveryFailed:Lambda 作為輸出函數,未成功將記錄交付至最終目的地。Kinesis Data Analytics 會持續重試,將交付失敗記錄傳送至 Lambda 作為輸出函數。

Lambda 輸出調用頻率

Kinesis Data Analytics 應用程式會緩衝輸出記錄,並經常調用 AWS Lambda 目標函數。

  • 如果將記錄發送到資料分析應用程式中的目標應用程式內串流作為暫停視窗,則會在每個暫停視窗觸發程序叫用 AWS Lambda 目標函數。例如,如果使用 60 秒的輪轉窗口將記錄發送到目的地應用程式內串流,則每 60 秒會調用 Lambda 函數一次。

  • 如果記錄以連續查詢或滑動窗口的形式,發送到應用程式的目的地應用程式內串流,則每秒大約會調用一次 Lambda 目的地函數。

注意

適用每個 Lambda 函數調用要求承載大小限制。超過這些限制會導致輸出記錄分割,並跨越多個 Lambda 函數呼叫傳送。

新增用作輸出的 Lambda 函數

以下程序說明如何將 Lambda 函數新增為 Kinesis Data Analytics 應用程式的輸出。

  1. 登入 AWS Management Console 並開啟適用於 Apache Flink 的受管理服務主控台,網址為 https://console.aws.amazon.com/kinesisanalytics

  2. 選擇清單中的應用程式,然後選擇應用程式詳細資訊

  3. 目的地區段中,選擇連接新目的地

  4. 針對目的地項目,選擇 AWS Lambda 函數

  5. 交付記錄至 AWS Lambda 區段中,選擇現有的 Lambda 函數和版本,或選擇建立新的

  6. 如果您要建立新 Lambda 函數,請執行下列動作:

    1. 選擇其中一個範本。如需詳細資訊,為應用程式目的地建立 Lambda 函數

    2. 建立函數 頁面在 Web 瀏覽器的新瀏覽器標籤中開啟。在名稱方塊中,為函數指定一個有意義的名稱 (例如 myLambdaFunction)。

    3. 用後續處理功能為應用程式更新範本。如需建立 Lambda 函數的詳細資訊,請參閱《AWS Lambda 開發人員指南》中的 入門

    4. 在 Kinesis Data Analytics 主控台的 Lambda 函數清單中,選擇您剛建立的 Lambda 函數。為 Lambda 函數版本選擇 $LATEST

  7. 應用程式內串流區段,選擇選擇現有的應用程式內串流。針對應用程式內串流名稱,選擇應用程式的輸出串流。所選輸出串流的結果會傳送至 Lambda 輸出函數。

  8. 將表格的其餘部分保留為預設值,然後選擇儲存並繼續

您的應用程式現在會將記錄從應用程式內串流傳送到 Lambda 函數。您可以在 Amazon CloudWatch 主控台中查看預設範本的結果。監控 AWS/KinesisAnalytics/LambdaDelivery.OkRecords 指標,以查看交付至 Lambda 函數的記錄數。

Lambda 作為輸出之常見故障

以下是交付至 Lambda 函數可能會失敗的常見原因。

  • 並非所有傳送至 Lambda 函數的批次記錄 (具有記錄 ID) 都會傳回 Kinesis Data Analytics 服務。

  • 回應遺失記錄 ID 或狀態欄位。

  • Lambda 函數逾時不足以完成 Lambda 函數中的業務邏輯。

  • Lambda 函數中的業務邏輯不會擷取所有錯誤,導致未處理的例外狀況造成逾時和背壓。這些通常被稱為「毒丸」訊息。

對於資料傳遞失敗,Kinesis Data Analytics 會繼續在同一組記錄上重試 Lambda 調用,直到成功為止。若要深入瞭解失敗,您可以監視下列 CloudWatch 指標:

  • Kinesis Data Analytics 應用程式 Lambda 作為輸出指 CloudWatch 標:指出成功和失敗的次數以及其他統計資料。如需詳細資訊,請參閱Amazon Kinesis Analytics 指標

  • AWS Lambda 功能 CloudWatch 指標和日誌。