調用 AWS Lambda 函數 - Amazon Simple Storage Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

調用 AWS Lambda 函數

In voke AWS Lambda 函數會啟動 AWS Lambda 函數,以對資訊清單中列出的物件執行自訂動作。本節說明如何建立 Lambda 函數以搭配 S3 批次作業使用,以及如何建立任務來叫用函數。S3 批次操作任務會使用 LambdaInvoke 操作,對資訊清單中列出的每個物件執行 Lambda 函數。

您可以使用 AWS Management Console、 AWS Command Line Interface (AWS CLI)、 AWS 開發套件或 REST API,針對 Lambda 使用 S3 Batch 操作。如需有關使用 Lambda 的詳細資訊,請參閱《AWS Lambda 開發人員指南》中的 AWS Lambda入門

下列各節說明如何開始搭配 Lambda 使用 S3 批次作業。

搭配 Amazon S3 批次作業使用 Lambda

搭配使用 S3 Batch 操作時 AWS Lambda,您必須建立專門用於 S3 Batch 操作的新 Lambda 函數。您無法搭配 S3 批次作業重複使用現有基於 Amazon S3 事件的函數。事件函數只能接收訊息;不能傳回訊息。搭配 S3 批次作業使用的 Lambda 函數必須接受並傳回訊息。如需將 Lambda 與 Amazon S3 事件搭配使用的詳細資訊,請參閱AWS Lambda 開發人員指南中的AWS Lambda 與 Amazon S3 搭配使用。

您建立 S3 批次作業任務來叫用 Lambda 函數。此任務對資訊清單中列出的所有物件執行相同的 Lambda 函數。您可以控制在處理清單中的物件時,使用 Lambda 函數的哪些版本。S3 批次作業支援不合格的 Amazon Resource Name (ARN)、別名和特定版本。如需詳細資訊,請參閱《AWS Lambda 開發人員指南》中的介紹 AWS Lambda 版本控制

如果您為 S3 批次作業任務提供的函數 ARN 使用別名或 $LATEST 限定詞,而且您更新其中任一項所指向的版本,則 S3 批次作業會開始呼叫 Lambda 函數的新版本。當您想要隨著大型工作更新部分功能,此功能會很有用。如果您不希望 S3 批次作業變更所使用的版本,請在建立任務時在 FunctionARN 參數中提供特定版本。

使用 Lambda 和 Amazon S3 Batch Operations 搭配目錄儲存貯體

目錄儲存貯體是一種 Amazon S3 儲存貯體,這是專為需要一致的個位數毫秒延遲的工作負載或效能關鍵應用程式所設計的類型。如需詳細資訊,請參閱目錄儲存貯體

使用 Amazon S3 Batch Operations 調用對目錄儲存貯體執行動作的 Lambda 函數時,須遵循特殊需求。例如,您必須使用更新的 JSON 結構描述來建構 Lambda 請求,並在建立任務時指定 InvocationSchemaVersion 2.0。此更新的結構描述可讓您為 UserArguments 指定選用的索引鍵值配對,您可以用它來修改現有 Lambda 函數的特定參數。如需詳細資訊,請參閱使用 S3 Batch 操作和AWS 儲存部落格中的自動化 Amazon S3 目錄儲存貯體 AWS Lambda中的物件處理。

回應代碼和結果代碼

S3 Batch 操作會使用一或多個索引鍵叫用 Lambda 函數,每個索引鍵都有TaskID關聯。S3 Batch 操作需要 Lambda 函數提供每個索引鍵的結果程式碼。在要求中傳送的任何工作 ID,但未隨每個索引鍵結果碼傳回,都會從treatMissingKeysAs欄位中提供結果代碼。 treatMissingKeysAs是選擇性的要求欄位,且預設為TemporaryFailure。下表包含欄位的其他可能結果代碼和treatMissingKeysAs值。

回應代碼 描述
Succeeded 任務正常完成。如果您請求工作完成報告,即會在報告中包含任務的結果字串。
TemporaryFailure 任務遇到暫時性的失敗,並且將在工作完成之前重新推動。會忽略結果字串。如果這是最後一次的重新推動,即會在最終報告中包含錯誤訊息。
PermanentFailure 任務遇到永久的失敗。如果您請求工作完成報告,即會將任務標示為 Failed,並包含錯誤訊息字串。會忽略來自失敗任務的結果字串。

建立 Lambda 函數以搭配 S3 批次作業使用

本節提供您必須搭配 Lambda 函數使用的範例 AWS Identity and Access Management (IAM) 許可。還包含一個搭配 S3 批次作業使用的 Lambda 函數範例。如果您之前從未建立過 Lambda 函數,請參閱AWS Lambda 開發人員指南中的教學課程: AWS Lambda 搭配 Amazon S3 使用

您必須建立專門搭配 S3 批次作業使用的 Lambda 函數。您無法重複使用現有基於 Amazon S3 事件的 Lambda 函數。這是因為用於 S3 批次作業的 Lambda 函數必須接受並傳回特殊的資料欄位。

重要

AWS Lambda 用 Java 編寫的函數接受RequestHandlerRequestStreamHandler處理程序接口。但是,若要支援 S3 Batch 操作請求和回應格式, AWS Lambda 需要用於自訂序列化和請求和回應還原序列化的RequestStreamHandler介面。該接口允許 Lambda 將 InputStream 和 OutputStream 傳遞給 Java handleRequest 方法。

搭配 S3 批次作業使用 Lambda 函數時,請務必使用 RequestStreamHandler 界面。如果您使用 RequestHandler 界面,批次任務會失敗,完成報告中出現「Lambda 承載中傳回無效的 JSON」。

如需詳細資訊,請參閱《AWS Lambda 使用者指南》中的處理常式界面

IAM 權限範例

以下是搭配 S3 批次作業使用 Lambda 函數所需的 IAM 權限範例。

範例 — S3 批次作業信任政策

以下是可用於批次作業 IAM 角色的信任政策範例。您在建立任務時指定此 IAM 角色,以准許批次作業擔任 IAM 角色。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
範例 — Lambda IAM 政策

以下 IAM 政策範例准許 S3 批次作業叫用 Lambda 函數和讀取輸入資訊清單。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }

範例請求和回應

本節提供 Lambda 函數的請求和回應範例。

範例 請求

以下是 Lambda 函數請求的 JSON 範例。

{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket1" } ] }
範例 回應

以下是 Lambda 函數回應的 JSON 範例。

{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }

S3 批次作業的 Lambda 函數範例

以下範例 Python Lambda 從版本控制的物件中移除了刪除標記。

如範例所示,來自 S3 批次作業的金鑰以 URL 編碼。若要將 Amazon S3 與其他 AWS 服務搭配使用,請務必將從 S3 Batch 操作傳遞的金鑰進行 URL 解碼。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.client("s3") def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] try: obj_key = parse.unquote(task["s3Key"], encoding="utf-8") obj_version_id = task["s3VersionId"] bucket_name = task["s3BucketArn"].split(":")[-1] logger.info( "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key ) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "PermanentFailure" result_string = ( f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker." ) logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( "x-amz-delete-marker", "false" ) if delete_marker == "true": logger.info( "Object %s, version %s is a delete marker.", obj_key, obj_version_id ) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "Succeeded" result_string = ( f"Successfully removed delete marker " f"{obj_version_id} from object {obj_key}." ) logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response["Error"]["Code"] == "RequestTimeout": result_code = "TemporaryFailure" result_string = ( f"Attempt to remove delete marker from " f"object {obj_key} timed out." ) logger.info(result_string) else: raise else: raise ValueError( f"The x-amz-delete-marker header is either not " f"present or is not 'true'." ) except Exception as error: # Mark all other exceptions as permanent failures. result_code = "PermanentFailure" result_string = str(error) logger.exception(error) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }

建立 S3 批次作業任務以叫用 Lambda 函數

建立 S3 批次作業任務來叫用 Lambda 函數時,您必須提供下列項目:

  • Lambda 函數的 ARN (可能包含函數別名或特定版本號碼)

  • 獲准叫用函數的 IAM 角色

  • 動作參數 LambdaInvokeFunction

如需有關建立 S3 批次作業任務的詳細資訊,請參閱建立 S3 批次操作任務S3 批次操作支援的操作

下列範例建立 S3 批次操作任務來使用 AWS CLI叫用 Lambda 函數。

aws s3control create-job --account-id <AccountID> --operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:Region:AccountID:function:LambdaFunctionName" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::ManifestLocation","ETag":"ManifestETag"}}' --report '{"Bucket":"arn:aws:s3:::awsexamplebucket1","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix","ReportScope":"AllTasks"}' --priority 2 --role-arn arn:aws:iam::AccountID:role/BatchOperationsRole --region Region --description “Lambda Function"

在 Lambda 資訊清單中提供工作層級的資訊

當您將 AWS Lambda 函數與 S3 Batch 操作搭配使用時,您可能需要額外的資料隨附於所操作的每個工作/金鑰。例如,您可能希望同時提供來源物件金鑰與全新的物件金鑰。這樣 Lambda 函數就能以新名稱,將來源金鑰複製到新的 S3 儲存貯體。依預設,在任務的輸入資訊清單中,Amazon S3 批次作業只允許您指定目的地儲存貯體和來源金鑰清單。下列說明如何在資訊清單中包含其他資料,以便執行更複雜的 Lambda 函數。

若要在 S3 批次作業資訊清單中指定每個金鑰的參數,以用於 Lambda 函數程式碼中,請使用以下 URL 編碼的 JSON 格式。key 欄位視同 Amazon S3 物件金鑰傳遞給 Lambda 函數。但 Lambda 函數可以轉譯此欄位,以包含其他值或多個金鑰,如下所示。

注意

資訊清單中 key 欄位的字元數上限是 1,024。

範例 — 以 JSON 字串取代「Amazon S3 金鑰」的資訊清單

必須提供以 URL 編碼的版本給 S3 批次作業。

my-bucket,{"origKey": "object1key", "newKey": "newObject1Key"} my-bucket,{"origKey": "object2key", "newKey": "newObject2Key"} my-bucket,{"origKey": "object3key", "newKey": "newObject3Key"}
範例 — 以 URL 編碼的資訊清單

必須提供這個以 URL 編碼的版本給 S3 批次作業。非 URL 編碼的版本不會運作。

my-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D
範例 — 以資訊清單格式將結果寫入任務報告的 Lambda 函數

這個 URL 編碼的資訊清單範例包含以管道分隔的物件索引鍵,供下列 Lambda 函數進行剖析。

my-bucket,object1key%7Clower my-bucket,object2key%7Cupper my-bucket,object3key%7Creverse my-bucket,object4key%7Cdelete

此 Lambda 函數示範如何剖析已編碼成 S3 批次作業資訊清單的管道分隔任務。任務會指出要套用至指定物件的修訂版作業。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.resource("s3") def lambda_handler(event, context): """ Applies the specified revision to the specified object. :param event: The Amazon S3 batch event that contains the ID of the object to revise and the revision type to apply. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] # The revision type is packed with the object key as a pipe-delimited string. obj_key, revision = parse.unquote(task["s3Key"], encoding="utf-8").split("|") bucket_name = task["s3BucketArn"].split(":")[-1] logger.info("Got task: apply revision %s to %s.", revision, obj_key) try: stanza_obj = s3.Bucket(bucket_name).Object(obj_key) stanza = stanza_obj.get()["Body"].read().decode("utf-8") if revision == "lower": stanza = stanza.lower() elif revision == "upper": stanza = stanza.upper() elif revision == "reverse": stanza = stanza[::-1] elif revision == "delete": pass else: raise TypeError(f"Can't handle revision type '{revision}'.") if revision == "delete": stanza_obj.delete() result_string = f"Deleted stanza {stanza_obj.key}." else: stanza_obj.put(Body=bytes(stanza, "utf-8")) result_string = ( f"Applied revision type '{revision}' to " f"stanza {stanza_obj.key}." ) logger.info(result_string) result_code = "Succeeded" except ClientError as error: if error.response["Error"]["Code"] == "NoSuchKey": result_code = "Succeeded" result_string = ( f"Stanza {obj_key} not found, assuming it was deleted " f"in an earlier revision." ) logger.info(result_string) else: result_code = "PermanentFailure" result_string = ( f"Got exception when applying revision type '{revision}' " f"to {obj_key}: {error}." ) logger.exception(result_string) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }

從 S3 批次操作教學課程學習

下列教學課 end-to-end 程介紹使用 Lambda 的某些 Batch 作業工作的完整程序。