使用 Lambda 處理 Amazon MSK 訊 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Lambda 處理 Amazon MSK 訊

注意

如果您想要將資料傳送到 Lambda 函數以外的目標,或在傳送資料之前豐富資料,請參閱 Amazon EventBridge 管道

將 Amazon 添加MSK為事件源

若要建立事件來源對應,請使用 Lambda 主控台、或 AWS Command Line Interface (AWS CLI) 將 Amazon 新增MSK為 Lambda 函數觸發器AWS SDK請注意,當您將 Amazon 添加MSK為觸發器時,Lambda 會假VPC設 Amazon MSK 叢集的設置,而不是 Lambda 函數的VPC設置。

本節說明如何使用 Lambda 主控台和 AWS CLI建立事件來源映射。

必要條件

可自訂的取用者群組 ID

將 Kafka 設為事件來源時,您可以指定取用者群組 ID。此取用者群組 ID 是您希望 Lambda 函數加入之 Kafka 取用者群組的現有識別符。您可以使用此功能將任何進行中的 Kafka 記錄處理設定從其他取用者無縫遷移至 Lambda。

如果您指定取用者群組 ID,且該取用者群組內還有其他作用中的輪詢者,則 Kafka 會將訊息分配給所有取用者。換句話說,Lambda 不會收到有關 Kafka 主題的所有訊息。如果您希望 Lambda 處理主題中的所有訊息,請關閉該取用者群組中的任何其他輪詢者。

此外,如果您指定取用者群組 ID,且 Kafka 找到具有相同 ID 的有效現有取用者群組,則 Lambda 會忽略用於事件來源映射的 StartingPosition 參數。相反的,Lambda 會根據取用者群組的承諾偏移量開始處理記錄。如果您指定取用者群組 ID,但 Kafka 找不到現有的取用者群組,則 Lambda 會使用指定的 StartingPosition 來設定事件來源。

您指定的取用者群組 ID 在所有 Kafka 事件來源中必須是唯一的。使用指定的取用者群組 ID 建立 Kafka 事件來源映射之後,您就無法更新此值。

添加 Amazon MSK 觸發器(控制台)

請按照以下步驟將您的 Amazon MSK 叢集和 Kafka 主題新增為 Lambda 函數的觸發器。

將 Amazon MSK 觸發器新增至您的 Lambda 函數 (主控台)
  1. 開啟 Lambda 主控台中的 函數頁面

  2. 選擇 Lambda 函數的名稱。

  3. 函式概觀 下,選擇 新增觸發條件

  4. Trigger configuration (觸發條件) 下,執行下列動作:

    1. 選擇 MSK 觸發程式類型。

    2. 對於MSK叢集,請選取您的叢集。

    3. 對於 批次大小,輸入單一批次中要擷取的訊息數量上限。

    4. 對於 Batch window (批次時段),輸入 Lambda 調用函數之前收集記錄所花費的最長秒數。

    5. 對於 Topic name (主題名稱),輸入 Kafka 主題名稱。

    6. (選用) 對於取用者群組 ID,輸入要加入的 Kafka 取用者群組 ID。

    7. (選用) 對於開始位置,請選擇最新以從最新記錄開始讀取串流、選擇水平修剪以從最早的可用記錄開始,或選擇在時間戳記為以指定開始讀取的時間戳記。

    8. (選擇性) 對於驗,請選擇密碼金鑰以透過MSK叢集中的代理程式進行驗證。

    9. 若要建立處於停用狀態的觸發條件以進行測試 (建議做法),請取消勾選 啟用觸發條件 。或者,若要立即啟用觸發條件,請選取 啟用觸發條件

  5. 若要建立觸發條件,請選擇 新增

添加一個 Amazon MSK 觸發器(AWS CLI)

使用下列範例 AWS CLI 命令建立和檢視 Lambda 函數的 Amazon MSK 觸發器。

使用建立觸發器 AWS CLI

範例 — 為使用IAM驗證的叢集建立事件來源對應

下列範例會使用命create-event-source-mapping AWS CLI 令,將名為的 Lambda 函數對應my-kafka-function至名為的卡夫卡主題。AWSKafkaTopic主題的開始位置設定為 LATEST。當叢集使用以IAM角色為基礎的驗證時,您不需要SourceAccessConfiguration物件。範例:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
範例 — 為使用SASL/SCRAM身份驗證的群集創建事件源映射

如果叢集使用 SASL/SCRAM驗證,您必須包含指定的SourceAccessConfiguration物件SASL_SCRAM_512_AUTH和秘 Secrets Manager 碼ARN。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'
範例 — 為使用 m TLS 驗證的叢集建立事件來源對應

如果叢集使用 m TLS 驗證,您必須包含指定的SourceAccessConfiguration物件CLIENT_CERTIFICATE_TLS_AUTH和秘 Secrets Manager 碼ARN。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:cluster/my-cluster/fc2f5bdf-fd1b-45ad-85dd-15b4a5a6247e-2 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:111122223333:secret:my-secret"}]'

如需詳細資訊,請CreateEventSourceMappingAPI參閱參考文件。

使用檢視狀態 AWS CLI

下列範例使用命get-event-source-mapping AWS CLI 令來描述您所建立之事件來源對應的狀態。

aws lambda get-event-source-mapping \ --uuid 6d9bce8e-836b-442c-8070-74e77903c815

Amazon MSK 配置參數

所有 Lambda 事件來源類型都共用相同CreateEventSourceMappingUpdateEventSourceMappingAPI作業。但是,只有一些參數適用於 Amazon MSK。

參數 必要 預設 備註

AmazonManagedKafkaEventSourceConfig

N

包含預設為唯一值的 ConsumerGroupId 欄位。

只能在建立時進行設定

BatchSize

100

上限:10,000

已啟用

N

已啟用

EventSourceArn

Y

N/A

只能在建立時進行設定

FunctionName

Y

N/A

FilterCriteria

N

N/A

控制 Lambda 傳送給函數的事件

MaximumBatchingWindowInSeconds

N

500 毫秒

批次處理行為

SourceAccessConfigurations

N

沒有憑證

SASL您的事件來源的/SCRAM或 CLIENT CERTIFICATE TLS _ _ AUTH (相互TLS) 驗證憑證

StartingPosition

Y

N/A

在 _ TIMESTAMP、TRIM _ HORIZON 或 LATEST

只能在建立時進行設定

StartingPositionTimestamp

N

N/A

如果設定 StartingPosition 為 AT_,則需要 TIMESTAMP

主題

Y

N/A

Kafka 主題名稱

只能在建立時進行設定

建立跨帳戶事件來源映射

您可以使用多VPC私有連線,將 Lambda 函數連接到不同的佈建MSK叢集 AWS 帳戶。多VPC連接用途 AWS PrivateLink,保持 AWS 網絡內的所有流量。

注意

您無法為無伺服器MSK叢集建立跨帳戶事件來源對應。

若要建立跨帳戶事件來源對應,您必須先為MSK叢集設定多重VPC連線。建立事件來源對應時,請使用受管理的VPC連線ARN而非叢集ARN,如下列範例所示。CreateEventSourceMapping作業也會根據MSK叢集使用的驗證類型而有所不同。

範例 — 為使IAM用驗證的叢集建立跨帳戶事件來源對應

當叢集使用以IAM角色為基礎的驗證時,您不需要SourceAccessConfiguration物件。範例:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function
範例 — 為使用SASL/SCRAM身份驗證的群集創建跨帳戶事件源映射

如果叢集使用 SASL/SCRAM驗證,您必須包含指定的SourceAccessConfiguration物件SASL_SCRAM_512_AUTH和秘 Secrets Manager 碼ARN。

透過SASL/SCRAM身份驗證,有兩種方法可以使用跨帳戶 Amazon MSK 事件來源映射:

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "SASL_SCRAM_512_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'
範例 — 為使用 m TLS 驗證的叢集建立跨帳戶事件來源對應

如果叢集使用 m TLS 驗證,您必須包含指定的SourceAccessConfiguration物件CLIENT_CERTIFICATE_TLS_AUTH和秘 Secrets Manager 碼ARN。機密可以儲存在叢集帳戶或 Lambda 函數帳戶中。

aws lambda create-event-source-mapping \ --event-source-arn arn:aws:kafka:us-east-1:111122223333:vpc-connection/444455556666/my-cluster-name/51jn98b4-0a61-46cc-b0a6-61g9a3d797d5-7 \ --topics AWSKafkaTopic \ --starting-position LATEST \ --function-name my-kafka-function \ --source-access-configurations '[{"Type": "CLIENT_CERTIFICATE_TLS_AUTH","URI": "arn:aws:secretsmanager:us-east-1:444455556666:secret:my-secret"}]'

使用 Amazon MSK 叢集做為事件來源

當您將 Apache Kafka 或 Amazon MSK 叢集新增為 Lambda 函數的觸發器時,叢集會用作件來源。

Lambda 會根據您指定的,讀取您在CreateEventSourceMapping請求Topics中指定的 Kafka 主題中的事件StartingPosition資料。處理成功後,您的 Kafka 主題將遞交給 Kafka 叢集。

如果您指定 StartingPosition 作為 LATEST,Lambda 會開始讀取屬於該主題的每個分割區中的最新訊息。由於 Lambda 開始讀取訊息之前,觸發條件組態後可能存在一些延遲,所以 Lambda 不會讀取此時段產生的任何訊息。

Lambda 會依序讀取每個卡夫卡主題分割區的訊息。單一 Lambda 承載可以包含來自多個分割區的訊息。當有更多記錄可用時,Lambda 會繼續根據您在CreateEventSourceMapping請求中指定的BatchSize值分批處理記錄,直到函數趕上主題為止。

Lambda 處理每個批次後,會遞交該批次中訊息的偏移量。如果函數針對批次中的任何訊息傳回錯誤,Lambda 會重試整個批次的訊息,直至處理成功或訊息過期。您可以將所有重試嘗試失敗的記錄傳送至失敗時的目的地,以便稍後處理。

注意

雖然 Lambda 函數的逾時限制通常為 15 分鐘,但適用於 Amazon 的事件來源對應MSK、自我管理的 Apache 卡夫卡、Amazon DocumentDB 以及 ActiveMQ 和 RabbitMQ 的 Amazon MQ 只支援 14 分鐘逾時限制上限的函數。此限制條件可確保事件來源映射能夠正確處理函數錯誤和重試。

輪詢和串流開始位置

請注意,建立和更新事件來源映射期間的串流輪詢最終會一致。

  • 在建立事件來源映射期間,從串流開始輪詢事件可能需要幾分鐘時間。

  • 在更新事件來源映射期間,從串流停止並重新開始輪詢事件可能需要幾分鐘時間。

這種行為表示如果您指定 LATEST 當作串流的開始位置,事件來源映射可能會在建立或更新期間遺漏事件。若要確保沒有遺漏任何事件,請將串流開始位置指定為 TRIM_HORIZONAT_TIMESTAMP

Amazon CloudWatch 指標

當您的函數處理記錄時,Lambda 會發出 OffsetLag 指標。此指標的值是寫入 Kafka 事件來源主題的最後一筆記錄與函數取用者群組處理的最後一筆記錄之間的偏移量的差值。您可以使用 OffsetLag 來預估新增記錄時與取用者群組處理記錄時之間的延遲。

OffsetLag 的增加趨勢可能表示函數取用者群組中的輪詢者問題。如需詳細資訊,請參閱檢視 Lambda 函數的指標

Amazon MSK 事件來源的自動擴展

當您最初建立 Amazon MSK 事件來源時,Lambda 會分配一個取用者來處理 Kafka 主題中的所有分區。每個取用者都有多個並行運行的處理器以處理增加的工作負載。此外,Lambda 會根據工作負載自動增加或減少取用者數量。為了保留每個分割區中的訊息順序,主題中每個分割區的取用者數上限是一個取用者。

每 1 分鐘,Lambda 會評估主題中所有分割區的取用者偏移延遲。如果延遲太高,則表示分割區接收訊息的速度比 Lambda 處理訊息的速度更快。如有必要,Lambda 會新增或移除主題取用者。新增或刪除取用者的擴展過程,將在三分鐘的評估期間內完成。

如果您的目標 Lambda 函數遭限流,Lambda 會減少取用者的數量。此動作可透過減少取用者可擷取和傳送至函數的訊息數量,減少函數的工作負載。

要監控 Kafka 主題的輸送量,當您的函數處理記錄時,檢視 Lambda 發出的偏移延遲指標

若要檢查並行發生的函數調用次數,您也可以監控函數的並行指標