本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
若要保留失敗的事件來源映射調用記錄,請將目標地新增到函數的事件來源映射中。傳送至該目的地的每筆記錄都是包含失敗調用之中繼資料的 JSON 文件。對於 Amazon S3 目的地,Lambda 還會將整個調用記錄與中繼資料一起傳送。您可以將任何 Amazon SNS 主題、Amazon SQS 佇列或 S3 儲存貯體設定為目的地。
透過 Amazon S3 目的地,您可以使用 Amazon S3 事件通知功能,在物件上傳至您的目的地 S3 儲存貯體時接收通知。您也可以設定 S3 事件通知來調用另一個 Lambda 函數,以對失敗的批次執行自動處理。
執行角色必須具有目的地的許可:
-
對於 SQS 目的地:sqs:SendMessage
-
對於 SNS 目的地:sns:Publish
-
對於 S3 儲存貯體目的地:s3:PutObject 和 s3:ListBucket
您必須在 Apache Kafka 叢集 VPC 內部署失敗時的目的地服務的 VPC 端點。
此外,如果您在目的地上設定 KMS 金鑰,則視目的地類型而定,Lambda 需要下列許可:
-
如果您已針對 S3 目的地使用自己的 KMS 金鑰啟用加密,則需要 kms:GenerateDataKey。如果 KMS 金鑰和 S3 儲存貯體目的地與 Lambda 函數和執行角色位於不同的帳戶中,請將 KMS 金鑰設定為信任執行角色以允許 kms:GenerateDataKey。
-
如果您已針對 SQS 目的地使用自己的 KMS 金鑰啟用加密,則需要 kms:Decrypt 和 kms:GenerateDataKey。如果 KMS 金鑰和 SQS 佇列目的地與 Lambda 函數和執行角色位於不同的帳戶中,請將 KMS 金鑰設定為信任執行角色,以允許 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKey 以及 kms:ReEncrypt。
-
如果您已針對 SNS 目的地使用自己的 KMS 金鑰啟用加密,則需要 kms:Decrypt 和 kms:GenerateDataKey。如果 KMS 金鑰和 SNS 主題目的地與 Lambda 函數和執行角色位於不同的帳戶中,請將 KMS 金鑰設定為信任執行角色,以允許 kms:Decrypt、kms:GenerateDataKey、kms:DescribeKey 以及 kms:ReEncrypt。
為自我管理的 Apache Kafka 事件來源映射設定失敗時的目的地
若要使用主控台設定失敗時的目的地,請依照下列步驟執行:
開啟 Lambda 主控台中的 函數頁面
。 -
選擇一個函數。
-
在函數概觀下,選擇新增目的地。
-
針對來源,請選擇事件來源映射調用。
-
對於事件來源映射,請選擇針對此函數設定的事件來源。
-
對於條件,選取失敗時。對於事件來源映射調用,這是唯一可接受的條件。
-
對於目標類型,請選擇 Lambda 將調用記錄傳送至的目標類型。
-
對於目的地,請選擇一個資源。
-
選擇 Save (儲存)。
您也可以使用 AWS CLI設定失敗時的目的地。例如,下列 create-event-source-mappingMyFunction
:
aws lambda create-event-source-mapping \ --function-name "MyFunction" \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2 \ --destination-config '{"OnFailure": {"Destination": "arn:aws:sqs:us-east-1:123456789012:dest-queue"}}'
下列 update-event-source-mappinguuid
相關聯的事件來源:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": "arn:aws:s3:::dest-bucket"}}'
若要移除目的地,請提供空白字串作為 destination-config
參數的引數:
aws lambda update-event-source-mapping \ --uuid f89f8514-cdd9-4602-9e1f-01a5b77d449b \ --destination-config '{"OnFailure": {"Destination": ""}}'
Amazon S3 目的地的安全最佳實務
刪除已設定為目的地的 S3 儲存貯體而不從函數的組態中移除目的地,可能會產生安全風險。如果其他使用者知道目的地儲存貯體的名稱,他們可以在其 AWS 帳戶中重新建立儲存貯體。失敗調用的記錄會被傳送到其儲存貯體,可能公開來自您函數的資料。
警告
為了確保無法將來自函數的調用記錄傳送到另一個 儲存貯體中的 S3 儲存貯體 AWS 帳戶,請將條件新增至函數的執行角色,以限制您帳戶中儲存貯體的s3:PutObject
許可。
下列範例顯示的 IAM 政策,將函數的 s3:PutObject
許可限制為帳戶中的儲存貯體。此政策也為 Lambda 提供了使用 S3 儲存貯體做為目的地所需的 s3:ListBucket
許可。
{
"Version": "2012-10-17",
"Statement": [
{
"Sid": "S3BucketResourceAccountWrite",
"Effect": "Allow",
"Action": [
"s3:PutObject",
"s3:ListBucket"
],
"Resource": "arn:aws:s3:::*/*",
"Condition": {
"StringEquals": {
"s3:ResourceAccount": "111122223333"
}
}
}
]
}
若要使用 AWS Management Console 或 將許可政策新增至您 funcion 的執行角色 AWS CLI,請參閱下列程序的說明:
若要將許可政策新增至函數的執行角色 (主控台)
開啟 Lambda 主控台中的函數頁面
。 -
選取您要修改其執行角色的 Lambda 函數。
-
在組態索引標籤中,選擇許可。
-
在執行角色索引標籤中,選取函數的角色名稱,以開啟角色的 IAM 主控台頁面。
-
透過下列步驟將許可政策新增至角色:
-
在許可政策窗格中,選擇新增許可 ,然後選取建立內嵌政策。
-
在政策編輯器中,選取 JSON。
-
將您要新增的政策貼入編輯器 (取代現有的 JSON),然後選擇下一步。
-
在政策詳細資訊下,輸入政策名稱。
-
選擇 建立政策。
-
SNS 和 SQS 調用記錄範例
下列範例顯示 Lambda 針對失敗的 Kafka 事件來源調用而傳送至 SNS 主題或 SQS 佇列目的地。recordsInfo
之下的每個索引鍵都包含 Kafka 主題和分割區,以連字號分隔。例如,對於金鑰 "Topic-0"
,Topic
是 Kafka 主題,並且 0
是分區。對於每個主題和分區,您可以使用偏移和時間戳記資料來查找原始調用記錄。
{
"requestContext": {
"requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
"functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
"condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
"approximateInvokeCount": 1
},
"responseContext": { // null if record is MaximumPayloadSizeExceeded
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2019-11-14T00:38:06.021Z",
"KafkaBatchInfo": {
"batchSize": 500,
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
"bootstrapServers": "...",
"payloadSize": 2039086, // In bytes
"recordsInfo": {
"Topic-0": {
"firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
"lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
"firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
"lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
},
"Topic-1": {
"firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
"lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
"firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
"lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
}
}
}
}
S3 目的地調用記錄範例
對於 S3 的目的地,Lambda 會將整個調用記錄與中繼資料一起傳送至目的地。下列範例顯示 Lambda 針對失敗的 Kafka 事件來源調用而傳送至 S3 儲存貯體目的地。除了上一個 SQS 和 SNS 目的地範例中的所有欄位之外,此 payload
欄位還包含原始調用記錄做為逸出 JSON 字串。
{
"requestContext": {
"requestId": "316aa6d0-8154-xmpl-9af7-85d5f4a6bc81",
"functionArn": "arn:aws:lambda:us-east-1:123456789012:function:myfunction",
"condition": "RetryAttemptsExhausted" | "MaximumPayloadSizeExceeded",
"approximateInvokeCount": 1
},
"responseContext": { // null if record is MaximumPayloadSizeExceeded
"statusCode": 200,
"executedVersion": "$LATEST",
"functionError": "Unhandled"
},
"version": "1.0",
"timestamp": "2019-11-14T00:38:06.021Z",
"KafkaBatchInfo": {
"batchSize": 500,
"eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/vpc-2priv-2pub/751d2973-a626-431c-9d4e-d7975eb44dd7-2",
"bootstrapServers": "...",
"payloadSize": 2039086, // In bytes
"recordsInfo": {
"Topic-0": {
"firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
"lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
"firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
"lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
},
"Topic-1": {
"firstRecordOffset": "49601189658422359378836298521827638475320189012309704722",
"lastRecordOffset": "49601189658422359378836298522902373528957594348623495186",
"firstRecordTimestamp": "2019-11-14T00:38:04.835Z",
"lastRecordTimestamp": "2019-11-14T00:38:05.580Z",
}
}
},
"payload": "<Whole Event>" // Only available in S3
}
提示
建議您在目的地儲存貯體上啟用 S3 版本控制。