教學課程:使用 Amazon MSK 事件來源映射來叫用 Lambda 函數 - AWS Lambda

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

教學課程:使用 Amazon MSK 事件來源映射來叫用 Lambda 函數

在本教學課程中,您將執行下列操作:

  • 在與現有 Amazon MSK 叢集相同的 AWS 帳戶中建立 Lambda 函數。

  • 設定 Lambda 與 Amazon 通訊的聯網和身份驗證MSK。

  • 設定 Lambda Amazon MSK 事件來源映射,以便在主題中顯示事件時執行 Lambda 函數。

完成這些步驟後,當事件傳送至 Amazon 時MSK,您將能夠設定 Lambda 函數,使用您自己的自訂 Lambda 程式碼自動處理這些事件。

您可以使用此功能做什麼?

範例解決方案:使用MSK事件來源對應來傳遞即時分數給您的客戶。

請考慮下列案例:您的公司主控一個 Web 應用程式,您的客戶可以在其中檢視即時賽事的相關資訊,例如體育比賽。遊戲中的資訊更新會透過 Amazon 上的 Kafka 主題提供給您的團隊。MSK您想要設計一個解決方案,該解決方案會使用MSK主題的更新,以便為您開發的應用程式內的客戶提供即時事件的更新檢視。您已決定採用下列設計方法:您的用戶端應用程式將與託管於 AWS. 用戶端將使用 Amazon API 閘道 WebSocket API透過 Websocket 工作階段進行連線。

在此解決方案中,您需要一個可讀取MSK事件、執行某些自訂邏輯以準備應用程式層的事件的元件,然後將該資訊轉送至API閘道API。您可以透過在 Lambda 函數中提供自訂邏輯,然後使用 AWS Lambda Amazon MSK 事件來源對應呼叫該元件來實作此元件。 AWS Lambda

如需使用 Amazon API 閘道實作解決方案的詳細資訊 WebSocket API,請參閱閘API道文件中的WebSocket API教學課程

必要條件

具有下列預先設定資源的 AWS 帳號:

了滿足這些先決條件,我們建議您遵循 Amazon MSK 文件MSK中的「開始使用 Amazon」。

  • Amazon MSK 集群。請參閱開始使用 Amazon 網站中的建立 Amazon MSK 叢集MSK。

  • 以下配置:

    • 確定叢集安全性設定中已啟用IAM角色為基礎的驗證。這樣可以將 Lambda 函數限制為僅存取所需的 Amazon MSK 資源,藉此改善您的安全性。預設情況下,新的 Amazon MSK 叢集會啟用此功能。

    • 確定叢集網路設定中的公用存取已關閉。限制 Amazon MSK 叢集對網際網路的存取權限,可限制處理資料的中介機構數量,從而提高安全性。預設情況下,新的 Amazon MSK 叢集會啟用此功能。

  • Amazon MSK 群集中用於此解決方案的卡夫卡主題。請參閱開始使用 Amazon 中的建立主題MSK。

  • Kafka 管理主機設定用於從您的 Kafka 叢集擷取資訊,並將卡夫卡事件傳送到您的主題進行測試,例如安裝了 Kafka 管理員和 Amazon 程式庫的 Amazon EC2 執行個體。CLI MSK IAM請參閱開始使用 Amazon 中的建立用戶端電腦MSK。

設定完這些資源後,請從您的 AWS 帳戶收集下列資訊,以確認您已準備好繼續。

  • 您的 Amazon MSK 群集的名稱。您可以在 Amazon MSK 控制台中找到此信息。

  • 集群UUID,您可以在 ARN Amazon MSK 控制台中找到該MSK集群的一部分。請遵循 Amazon MSK 文件中列出叢集中的程序以尋找此資訊。

  • 與您的 Amazon MSK 叢集相關聯的安全群組。您可以在 Amazon MSK 控制台中找到此信息。在以下步驟中,請將這些步驟稱為您的 clusterSecurityGroups.

  • VPC包含您的 Amazon MSK 群集的 Amazon 的 ID。您可以在 Amazon 主控台中識別與 Amazon MSK 叢集關聯的子網路,然後在 Amazon MSK 主控台中識別與子網路VPC相關聯的 Amazon,以找到此資訊。VPC

  • 解決方案中使用的 Kafka 主題名稱。您可以topicsCLI從 Kafka 管理主機使用 Kafka 呼叫您的 Amazon MSK 叢集,以找到此資訊。如需有關主題的詳細資訊CLI,請參閱 Kafka 文件中的新增和移除主題

  • 您卡夫卡主題的用戶群組名稱,適合由 Lambda 函數使用。此群組可由 Lambda 自動建立,因此您不需要使用卡夫卡CLI建立群組。如果您確實需要管理用戶群組,若要深入了解消費者群組CLI,請參閱 Kafka 文件中的管理用戶群組

您 AWS 帳戶中的下列權限:

  • 建立和管理 Lambda 函數的權限。

  • 建立IAM政策並將其與 Lambda 函數產生關聯的權限。

  • 允許在VPC託管您的 Amazon MSK 叢集的 Amazon 中建立 Amazon VPC 端點和變更聯網組態。

設定 Lambda 與 Amazon 通訊的網路連線 MSK

用 AWS PrivateLink 於連接 Lambda 和 Amazon MSK。您可以通過在 Amazon VPC 控制台中創建界面 Amazon VPC 端點來做到這一點。如需網路組態的詳細資訊,請參閱網路組態

當 Amazon MSK 事件來源對應代表 Lambda 函數執行時,它會擔任 Lambda 函數的執行角色。此IAM角色授權對應存取受保護的資源IAM,例如 Amazon MSK 叢集。雖然這些元件共用執行角色,但 Amazon MSK 對應和 Lambda 函數對其各自的任務有不同的連線需求,如下圖所示。

Lambda 函數會輪詢叢集,並使用. AWS STS

您的事件來源對應屬於您的 Amazon MSK 叢集安全群組。在此聯網步驟中,從 Amazon MSK 叢集建立 Amazon VPC 端點,VPC以將事件來源對應連接至 Lambda 和STS服務。保護這些端點以接受來自 Amazon MSK 叢集安全群組的流量。然後,調整 Amazon MSK 叢集安全群組,以允許事件來源映射與 Amazon MSK 叢集通訊。

您可以使用配置下列步驟 AWS Management Console。

設定介面 Amazon VPC 端點以連接 Lambda 和 Amazon MSK
  1. 為您的界面 Amazon VPC 端點創建一個安全組,endpointSecurityGroup,允許來自 443 的入站TCP流量 clusterSecurityGroups。 按照 Amazon EC2 文件中建立安全群組中的程序來建立安全群組。然後,按照 Amazon EC2 文件中將規則新增至安全群組中的程序來新增適當的規則。

    使用下列資訊建立安全性群組:

    新增輸入規則時,請為中的每個安全群組建立規則 clusterSecurityGroups。 對於每個規則:

    • 選取做為「類型HTTPS

    • 對於來源,選取其中一個 clusterSecurityGroups.

  2. 建立一個端點,將 Lambda 服務連接到VPC包含您的 Amazon MSK 叢集的 Amazon。依照建立介面端點中的程序執行。

    使用下列資訊建立介面端點:

    • 對於服務名稱,選取com.amazonaws.regionName.lambda,其中 regionName 託管您的 Lambda 函數。

    • 對於 VPC,選取VPC包含您的 Amazon MSK 叢集的 Amazon。

    • 對於安全性群組,選取 endpointSecurityGroup,您之前創建的。

    • 對於子網路,請選取託管 Amazon MSK 叢集的子網路。

    • 針對原則,請提供下列原則文件,以保護端點,以供 Lambda 服務主體使用lambda:InvokeFunction動作。

      { "Statement": [ { "Action": "lambda:InvokeFunction", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 確保啟用DNS名稱保持設定狀態。

  3. 建立一個端點,將 AWS STS 服務連接到VPC包含您的 Amazon MSK 叢集的 Amazon。依照建立介面端點中的程序執行。

    使用下列資訊建立介面端點:

    • 對於服務名稱,選取 AWS STS。

    • 對於 VPC,選取VPC包含您的 Amazon MSK 叢集的 Amazon。

    • 對於安全性群組,選取 endpointSecurityGroup.

    • 對於子網路,請選取託管 Amazon MSK 叢集的子網路。

    • 針對原則,請提供下列原則文件,以保護端點,以供 Lambda 服務主體使用sts:AssumeRole動作。

      { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": [ "lambda.amazonaws.com" ] }, "Resource": "*" } ] }
    • 確保啟用DNS名稱保持設定狀態。

  4. 針對與 Amazon MSK 叢集相關聯的每個安全群組,也就是 clusterSecurityGroups,允許下列項目:

    • 允許 9098 上的所有入站和出站TCP流量傳輸至所有流量 clusterSecurityGroups, 包括在本身.

    • 允許 443 上的所有輸出TCP流量。

    預設安全性群組規則允許其中一些流量,因此,如果您的叢集附加至單一安全性群組,而該群組具有預設規則,則不需要額外的規則。若要調整安全群組規則,請遵循 Amazon EC2 文件中將規則新增至安全群組中的程序。

    使用下列資訊將規則新增至您的安全群組:

    • 針對連接埠 9098 的每個輸入規則或輸出規則,請提供

      • 選取「自訂」做為「類型」TCP。

      • 對於連接埠範圍,請提供 9098。

      • 針對來源,提供下列其中一項 clusterSecurityGroups.

    • 對於連接埠 443 的每個輸入規則,對於類型,選取HTTPS

建立 Lambda 從您的 Amazon MSK 主題中讀取的IAM角色

確定 Lambda 要從您的 Amazon MSK 主題讀取的身份驗證要求,然後在政策中定義它們。創建一個角色,lambdaAuthRole,即授權 Lambda 使用這些權限。使用動作授權 Amazon MSK 叢集上的kafka-clusterIAM動作。然後,授權 Lambda 執行探索MSKkafka並連接到 Amazon MSK 叢集所需的 Amazon 和 Amazon EC2 動作,以及執行 CloudWatch 動作,以便 Lambda 可以記錄它所做的事情。

描述 Lambda 從 Amazon 讀取的身份驗證要求 MSK
  1. 撰寫IAM政策文件(一JSON份文件),clusterAuthPolicy,這允許 Lambda 使用您的卡夫卡消費者組從 Amazon MSK 集群中的卡夫卡主題中讀取。Lambda 需要在讀取時設定卡夫卡消費群組。

    變更下列範本以符合您的先決條件:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:DescribeGroup", "kafka-cluster:AlterGroup", "kafka-cluster:DescribeTopic", "kafka-cluster:ReadData", "kafka-cluster:DescribeClusterDynamicConfiguration" ], "Resource": [ "arn:aws:kafka:region:account-id:cluster/mskClusterName/cluster-uuid", "arn:aws:kafka:region:account-id:topic/mskClusterName/cluster-uuid/mskTopicName", "arn:aws:kafka:region:account-id:group/mskClusterName/cluster-uuid/mskGroupName" ] } ] }

    欲了解更多信息,請諮詢IAM 角色型身分驗證。撰寫您的政策時:

    • 用於 region 以及 account-id,提供那些託管您的 Amazon MSK 集群。

    • 用於 mskClusterName,請提供您的 Amazon MSK 群集的名稱。

    • 用於 cluster-uuid,ARN請為您的 Amazon MSK 叢集提供UUID中的。

    • 用於 mskTopicName,提供您的卡夫卡主題的名稱。

    • 用於 mskGroupName,提供您的卡夫卡消費群組的名稱。

  2. 識別 Lambda 探索EC2和連接 Amazon MSK 叢集所需的 Amazon、Amazon 和 CloudWatch 許可,並記錄這些事件。MSK

    AWSLambdaMSKExecutionRole管理的原則會寬鬆定義必要的權限。請按照以下步驟使用它。

    在生產環境中,請根據最低權限原則評估AWSLambdaMSKExecutionRole以限制執行角色原則,然後為您的角色撰寫取代此受管理原則的原則。

如需有關IAM原則語言的詳細資訊,請參閱IAM文件

現在您已撰寫政策文件,請建立IAM原則,以便將其附加至您的角色。您可以按照以下步驟使用控制台執行此操作。

從您的IAM保單文件建立保單
  1. 登入 AWS Management Console 並開啟IAM主控台,位於https://console.aws.amazon.com/iam/

  2. 在左側的導覽窗格中,選擇 Policies (政策)

  3. 選擇 Create policy (建立政策)。

  4. 在 [原則編輯器] 區段中,選擇JSON選項。

  5. Paste (貼上) clusterAuthPolicy.

  6. 將許可新增至政策後,請選擇下一步

  7. 檢視與建立頁面上,為您正在建立的政策輸入政策名稱描述 (選用)。檢視此政策中定義的許可,來查看您的政策所授予的許可。

  8. 選擇 Create policy (建立政策) 儲存您的新政策。

如需詳細資訊,請參閱IAM說明文件中的建立IAM原則

現在您有適當的IAM策略,請創建一個角色並將其附加到它。您可以按照以下步驟使用控制台執行此操作。

在IAM主控台中建立執行角色
  1. 在主控台中開啟 [角色] 頁IAM面

  2. 選擇建立角色

  3. 受信任的實體類型下,選擇 AWS  服務

  4. 使用案例 下,選擇 Lambda

  5. 選擇 Next (下一步)

  6. 選取以下政策:

    • clusterAuthPolicy

    • AWSLambdaMSKExecutionRole

  7. 選擇 Next (下一步)

  8. 對於角色名稱,輸入 lambdaAuthRole ,然後選擇 [建立角色]。

如需詳細資訊,請參閱使用執行角色定義 Lambda 函數許可

創建一個 Lambda 函數以從您的 Amazon MSK 主題中讀取

建立設定為使用您的IAM角色的 Lambda 函數。您可以使用主控台建立 Lambda 函數。

若要使用驗證組態建立 Lambda 函數
  1. 開啟 Lambda 主控台,然後從標頭中選取建立函數

  2. 選取從頭開始撰寫

  3. 對於函數名稱,請提供您選擇的適當名稱。

  4. 對於 Runtime,請選擇的最新支援版本,Node.js以使用本教學課程中提供的程式碼。

  5. 選擇 [變更預設執行角色]。

  6. 選取 [使用現有角色]。

  7. 對於現有角色,選取 lambdaAuthRole.

在生產環境中,您通常需要為 Lambda 函數的執行角色新增其他政策,以便有意義地處理 Amazon MSK 事件。如需有關將原則新增至角色的詳細資訊,請參閱IAM文件中的新增或移除身分識別權限

建立對應至 Lambda 函數的事件來源

您的 Amazon MSK 事件來源對應提供 Lambda 服務所需的資訊,以便在發生適當的 Amazon MSK 事件時叫用 Lambda。您可以使用控制台創建一個 Amazon MSK 映射。建立 Lambda 觸發器,然後自動設定事件來源對應。

若要建立 Lambda 觸發器 (以及事件來源對應)
  1. 瀏覽至 Lambda 函數的概觀頁面。

  2. 在「功能概覽」區段中,選擇左下角的「新增觸發器」。

  3. 選擇一個源下拉列表中,選擇 Amazon MSK

  4. 不要設置身份驗證

  5. 對於MSK叢集,請選取叢集的名稱。

  6. 對於「Batch 大小」,輸入 1。此步驟使得此功能更容易測試,並且在生產中不是理想的價值。

  7. 對於「主題名稱」,請提供您的 Kafka 主題名稱。

  8. 對於消費者群組 ID,請提供您的卡夫卡用戶群組的 ID。

更新您的 Lambda 函數以讀取串流資料

Lambda 通過事件方法參數提供有關卡夫卡事件的信息。如需 Amazon MSK 事件的範例結構,請參閱 範例事件。瞭解如何解譯 Lambda 轉寄的 Amazon MSK 事件之後,您可以變更 Lambda 函數程式碼以使用其提供的資訊。

將下列程式碼提供給您的 Lambda 函數,以記錄 Lambda Amazon MSK 事件的內容,以供測試之用:

Java
SDK對於爪哇 2.x
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

使用 Java 使用 Lambda 消費 Amazon MSK 事件。

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestHandler; import com.amazonaws.services.lambda.runtime.events.KafkaEvent; import com.amazonaws.services.lambda.runtime.events.KafkaEvent.KafkaEventRecord; import java.util.Base64; import java.util.Map; public class Example implements RequestHandler<KafkaEvent, Void> { @Override public Void handleRequest(KafkaEvent event, Context context) { for (Map.Entry<String, java.util.List<KafkaEventRecord>> entry : event.getRecords().entrySet()) { String key = entry.getKey(); System.out.println("Key: " + key); for (KafkaEventRecord record : entry.getValue()) { System.out.println("Record: " + record); byte[] value = Base64.getDecoder().decode(record.getValue()); String message = new String(value); System.out.println("Message: " + message); } } return null; } }
JavaScript
SDK對於 JavaScript (3)
注意

還有更多關於 GitHub。尋找完整範例,並了解如何在無伺服器範例儲存庫中設定和執行。

消費 Amazon MSK 事件 Lambda 使用 JavaScript.

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. // SPDX-License-Identifier: Apache-2.0 exports.handler = async (event) => { // Iterate through keys for (let key in event.records) { console.log('Key: ', key) // Iterate through records event.records[key].map((record) => { console.log('Record: ', record) // Decode base64 const msg = Buffer.from(record.value, 'base64').toString() console.log('Message:', msg) }) } }

您可以使用主控台將函數程式碼提供給 Lambda。

若要更新 Lambda 函數程式碼
  1. 瀏覽至 Lambda 函數的概觀頁面。

  2. 選擇 程式碼 標籤。

  3. 在代碼源中輸入提供的代碼IDE。

  4. 在 [程式碼來源] 導覽列中,選擇 [部署]。

測試您的 Lambda 函數以確認它已連接到您的 Amazon MSK 主題

您現在可以透過檢查 CloudWatch 事件記錄來確認事件來源是否叫用 Lambda。

驗證是否正在叫用 Lambda 函數
  1. 使用您的 Kafka 管理主機來產生使用的卡夫卡事件。kafka-console-producer CLI如需詳細資訊,請參閱 Kafka 文件中的主題撰寫一些事件。針對先前步驟中定義的事件來源對應,傳送足夠的事件以填滿批次大小定義的批次,否則 Lambda 將等待更多資訊呼叫。

  2. 如果您的函數執行,Lambda 會寫入發生的事情 CloudWatch。在主控台中,導覽至 Lambda 函數的詳細資料頁面。

  3. 選取 Configuration (組態) 索引標籤。

  4. 在側邊欄中,選取監控和作業工具

  5. 識別記CloudWatch 錄組態下的記錄群組。記錄群組的開頭應為/aws/lambda。選擇日誌群組的連結。

  6. 在 CloudWatch 主控台中,檢查 Lambda 傳送至記錄串流的記錄事件記錄事件。識別是否存在包含來自 Kafka 事件之訊息的記錄事件,如下圖所示。如果有的話,您已經MSK使用 Lambda 事件來源對應成功將 Lambda 函數連接到 Amazon。

    CloudWatch 顯示由提供的代碼提取的事件信息的日誌事件。