AWS Lambda 関数の呼び出し - Amazon Simple Storage Service

AWS Lambda 関数の呼び出し

AWS Lambda の呼び出し関数は、AWS Lambda 関数を開始し、マニフェストにリストされているオブジェクトに対してカスタムアクションを実行します。このセクションでは、S3 バッチ操作で使用する Lambda 関数とその関数を呼び出すジョブを作成する方法について説明します。S3 バッチ操作ジョブでは、LambdaInvoke オペレーションを使用して、マニフェストにリストされているすべてのオブジェクトに対して Lambda 関数を実行します。

Lambda 用の S3 バッチ操作は、AWS Management Console、AWS Command Line Interface (AWS CLI)、AWS SDK、または REST API で操作できます。Lambda の使用について詳しくは、AWS Lambda 開発者ガイドAWS Lambda の使用を開始するを参照してください。

以下のセクションでは、S3 バッチ操作で Lambda を使用する方法について説明します。

Amazon S3 バッチ操作での Lambda の使用

S3 バッチ操作で AWS Lambda を使用するときは、S3 バッチ操作で使用するための Lambda 関数を新しく作成する必要があります。既存の Amazon S3 のイベントベースの関数を S3 バッチ操作で再利用することはできません。イベント関数はメッセージの受信のみ可能です。メッセージを返すことはできません。S3 バッチ操作で使用する Lambda 関数では、メッセージを受け取って返す必要があります。Amazon S3 のイベントで Lambda を使用する方法について詳しくは、AWS Lambda 開発者ガイドAWS Lambda を Amazon S3 に使用するを参照してください。

Lambda 関数を呼び出す S3 バッチ操作のジョブを作成します。このジョブは、マニフェストにリストされているすべてのオブジェクトに対して同じ Lambda 関数を実行します。マニフェストのオブジェクトの処理時に使用する Lambda 関数のバージョンは指定できます。S3 バッチ操作では、非修飾 Amazon リソースネーム (ARN)、エイリアス、特定のバージョンがサポートされています。詳しくは、AWS Lambda 開発者ガイドAWS Lambda 関数のバージョンを参照してください。

S3 バッチ操作のジョブにエイリアスまたは $LATEST 修飾子を使用する関数の ARN を指定し、それらのいずれかが指すバージョンを更新すると、S3 バッチ操作は新しいバージョンの Lambda 関数を呼び出します。これは、大規模なジョブで機能の一部を更新する場合に役立ちます。S3 バッチ操作で使用するバージョンを変更したくない場合は、ジョブの作成時に FunctionARN パラメータで特定のバージョンを指定します。

レスポンスと結果のコード

S3 バッチ操作は、Lambda 関数から 2 つのレベルのコードを受け取ります。1 つ目はリクエスト全体のレスポンスコード、2 つ目はタスクごとの結果コードです。以下のテーブルには、レスポンスコードを含みます。

Response Code (レスポンスコード) 説明
Succeeded タスクは正常に完了しました。ジョブ完了レポートをリクエストした場合は、タスクの結果の文字列がレポートに含まれます。
TemporaryFailure タスクは一時的に失敗し、ジョブが完了する前に再始動されます。結果の文字列は無視されます。最終的な再処理である場合は、エラーメッセージが最終レポートに含まれます。
PermanentFailure タスクに固定障害が発生しました。ジョブ完了レポートをリクエストした場合、タスクは Failed としてマークされ、エラーメッセージの文字列が含まれます。失敗したタスクの結果の文字列は無視されます。

S3 バッチ操作で使用する Lambda 関数の作成

このセクションでは、Lambda 関数で使用する必要のある AWS Identity and Access Management (IAM) アクセス許可の例について説明します。また、S3 バッチ操作で使用する Lambda 関数の例も示します。Lambda 関数を作成したことがない場合は、AWS Lambda 開発者ガイドチュートリアル: Amazon S3 トリガーを使用して AWS Lambda 関数を呼び出すを参照してください。

S3 バッチ操作で使用するための Lambda 関数を作成する必要があります。既存の Amazon S3 のイベントベースの関数を再利用することはできません。これは、S3 バッチ操作で使用する Lambda 関数では、特別なデータフィールドを受け取って返す必要があるためです。

重要

Java で作成した AWS Lambda 関数は、ハンドラーインターフェイスとして RequestHandler または RequestStreamHandler のいずれかを受け付けます。ただし、S3 バッチ操作のリクエストとレスポンスの形式をサポートするため、リクエストとレスポンスのカスタムのシリアル化と逆シリアル化のために AWS Lambda は RequestStreamHandler インターフェイスを必要とします。このインターフェイスにより、Lambda は Java の handleRequest メソッドに InputStream と OutputStream ストリームを渡すことができます。

S3 バッチ操作で Lambda 関数を使用する場合は、必ず RequestStreamHandler インターフェイスを使用してください。RequestHandler インターフェイスを使用すると、バッチジョブが失敗し、完了レポートに「Invalid JSON returned in Lambda payload」と表示されます。

詳しくは、AWS Lambda ユーザーガイドハンドラーのインターフェイスを参照してください。

IAM アクセス許可の例

S3 バッチ操作で Lambda 関数を使用するために必要な IAM アクセス許可の例を以下に示します。

例 - S3 バッチ操作の信頼ポリシー

以下は、バッチ操作の IAM ロールに使用できる信頼ポリシーの例です。この IAM ロールは、ジョブを作成して、バッチ操作に IAM ロールを引き受けるアクセス許可を付与する場合に指定します。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

例 - Lambda の IAM ポリシー

以下は、S3 バッチ操作に Lambda 関数を呼び出して入力のマニフェストを読み取るアクセス許可を与える IAM ポリシーの例です。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }

リクエストとレスポンスの例

このセクションでは、Lambda 関数のリクエストとレスポンスの例を示します。

例 Request

以下は、Lambda 関数のリクエストの JSON の例です。

{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:awsexamplebucket1" } ] }

例 Response

以下は、Lambda 関数のレスポンスの JSON の例です。

{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }

S3 バッチ操作の Lambda 関数の例

次の Python Lambda の例では、バージョン管理されたオブジェクトから削除マーカーを削除します。

例に示すように、S3 バッチ操作のキーは URL エンコードされます。Amazon S3 を他の AWS のサービスとともに使用するには、S3 バッチ操作から渡されたキーを URL デコードすることが重要です。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel('INFO') s3 = boto3.client('s3') def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event['invocationId'] invocation_schema_version = event['invocationSchemaVersion'] results = [] result_code = None result_string = None task = event['tasks'][0] task_id = task['taskId'] try: obj_key = parse.unquote(task['s3Key'], encoding='utf-8') obj_version_id = task['s3VersionId'] bucket_name = task['s3BucketArn'].split(':')[-1] logger.info("Got task: remove delete marker %s from object %s.", obj_version_id, obj_key) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id) result_code = 'PermanentFailure' result_string = f"Object {obj_key}, ID {obj_version_id} is not " \ f"a delete marker." logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response['ResponseMetadata']['HTTPHeaders'] \ .get('x-amz-delete-marker', 'false') if delete_marker == 'true': logger.info("Object %s, version %s is a delete marker.", obj_key, obj_version_id) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id) result_code = 'Succeeded' result_string = f"Successfully removed delete marker " \ f"{obj_version_id} from object {obj_key}." logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response['Error']['Code'] == 'RequestTimeout': result_code = 'TemporaryFailure' result_string = f"Attempt to remove delete marker from " \ f"object {obj_key} timed out." logger.info(result_string) else: raise else: raise ValueError(f"The x-amz-delete-marker header is either not " f"present or is not 'true'.") except Exception as error: # Mark all other exceptions as permanent failures. result_code = 'PermanentFailure' result_string = str(error) logger.exception(error) finally: results.append({ 'taskId': task_id, 'resultCode': result_code, 'resultString': result_string }) return { 'invocationSchemaVersion': invocation_schema_version, 'treatMissingKeysAs': 'PermanentFailure', 'invocationId': invocation_id, 'results': results }

Lambda 関数を呼び出す S3 バッチ操作のジョブの作成

Lambda 関数を呼び出す S3 バッチ操作のジョブを作成する場合は、以下を指定する必要があります。

  • Lambda 関数の ARN (関数のエイリアスまたは特定のバージョン番号を含む)

  • 関数を呼び出すアクセス許可を持つ IAM ロール

  • アクションパラメータ LambdaInvokeFunction

S3 バッチ操作のジョブの作成の詳細については、「S3 バッチ操作ジョブの作成」および「S3 バッチ操作でサポートされるオペレーション」を参照してください。

次の例では、AWS CLI を使用して Lambda 関数を呼び出す S3 バッチ操作のジョブを作成します。

aws s3control create-job --account-id <AccountID> --operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:Region:AccountID:function:LambdaFunctionName" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::ManifestLocation","ETag":"ManifestETag"}}' --report '{"Bucket":"arn:aws:s3:::awsexamplebucket1","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix","ReportScope":"AllTasks"}' --priority 2 --role-arn arn:aws:iam::AccountID:role/BatchOperationsRole --region Region --description “Lambda Function"

Lambda のマニフェストでのタスクレベルの情報の指定

S3 バッチ操作で AWS Lambda 関数を使用するときに、操作対象の各タスク/キーに付随する追加データが必要になる場合があります。たとえば、ソースオブジェクトキーと新しいオブジェクトキーの両方を提供したい場合があります。Lambda 関数は、元のキーを新しい名前で新しい S3 バケットにコピーできます。デフォルトでは、Amazon S3 バッチ操作では、ジョブの入力のマニフェストでコピー先のバケットと元のキーのリストのみを指定できます。以下では、マニフェストにデータを追加して、より複雑な Lambda 関数を実行する方法について説明します。

S3 バッチ操作のマニフェストでキーごとのパラメータを指定して Lambda 関数のコードで使用するには、以下に示すように URL エンコードされた形式の JSON を使用します。key フィールドは、Amazon S3 のオブジェクトのキーのように Lambda 関数に渡されます。ただし、以下に示すように、Lambda 関数で他の値や複数のキーを含めるように解釈することができます。

注記

マニフェストの key フィールドの最大文字数は 1,024 文字です。

例 - 「Amazon S3 のキー」を JSON の文字列に置き換える前のマニフェスト

URL エンコードされたバージョンを、S3 バッチ操作に渡す必要があります。

my-bucket,{"origKey": "object1key", "newKey": "newObject1Key"} my-bucket,{"origKey": "object2key", "newKey": "newObject2Key"} my-bucket,{"origKey": "object3key", "newKey": "newObject3Key"}

例 - URL エンコードされたマニフェスト

この URL エンコードされたバージョンを、S3 バッチ操作に渡す必要があります。URL エンコードされていないバージョンは機能しません。

my-bucket,%7B%22origKey%22%3A%20%22object1key%22%2C%20%22newKey%22%3A%20%22newObject1Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object2key%22%2C%20%22newKey%22%3A%20%22newObject2Key%22%7D my-bucket,%7B%22origKey%22%3A%20%22object3key%22%2C%20%22newKey%22%3A%20%22newObject3Key%22%7D

例 - Lambda 関数とジョブのレポートに結果が書き込まれるマニフェストの形式

この Lambda 関数は、パイプ区切りのタスクをエンコードされた Amazon S3 バッチ操作のマニフェストに解析する方法を示しています。タスクは、指定されたオブジェクトに適用されているリビジョンオペレーションを示します。

import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel('INFO') s3 = boto3.resource('s3') def lambda_handler(event, context): """ Applies the specified revision to the specified object. :param event: The Amazon S3 batch event that contains the ID of the object to revise and the revision type to apply. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event['invocationId'] invocation_schema_version = event['invocationSchemaVersion'] results = [] result_code = None result_string = None task = event['tasks'][0] task_id = task['taskId'] # The revision type is packed with the object key as a pipe-delimited string. obj_key, revision = \ parse.unquote(task['s3Key'], encoding='utf-8').split('|') bucket_name = task['s3BucketArn'].split(':')[-1] logger.info("Got task: apply revision %s to %s.", revision, obj_key) try: stanza_obj = s3.Bucket(bucket_name).Object(obj_key) stanza = stanza_obj.get()['Body'].read().decode('utf-8') if revision == 'lower': stanza = stanza.lower() elif revision == 'upper': stanza = stanza.upper() elif revision == 'reverse': stanza = stanza[::-1] elif revision == 'delete': pass else: raise TypeError(f"Can't handle revision type '{revision}'.") if revision == 'delete': stanza_obj.delete() result_string = f"Deleted stanza {stanza_obj.key}." else: stanza_obj.put(Body=bytes(stanza, 'utf-8')) result_string = f"Applied revision type '{revision}' to " \ f"stanza {stanza_obj.key}." logger.info(result_string) result_code = 'Succeeded' except ClientError as error: if error.response['Error']['Code'] == 'NoSuchKey': result_code = 'Succeeded' result_string = f"Stanza {obj_key} not found, assuming it was deleted " \ f"in an earlier revision." logger.info(result_string) else: result_code = 'PermanentFailure' result_string = f"Got exception when applying revision type '{revision}' " \ f"to {obj_key}: {error}." logger.exception(result_string) finally: results.append({ 'taskId': task_id, 'resultCode': result_code, 'resultString': result_string }) return { 'invocationSchemaVersion': invocation_schema_version, 'treatMissingKeysAs': 'PermanentFailure', 'invocationId': invocation_id, 'results': results }