AWS Lambda 関数の呼び出し
Amazon S3 バッチオペレーションを使用すると、Amazon S3 のオブジェクトに対して大規模なバッチオペレーションを実行できます。AWS Lambda の呼び出し関数バッチオペレーションは、AWS Lambda 関数を開始し、マニフェストにリストされているオブジェクトに対してカスタムアクションを実行します。このセクションでは、S3 バッチ操作で使用する Lambda 関数とその関数を呼び出すジョブを作成する方法について説明します。S3 バッチ操作ジョブでは、LambdaInvoke
オペレーションを使用して、マニフェストにリストされているすべてのオブジェクトに対して Lambda 関数を実行します。
Amazon S3 コンソール、AWS Command Line Interface (AWS CLI)、AWS SDK、または Amazon S3 REST API を使用して S3 バッチオペレーションを扱うことができます。Lambda の使用について詳しくは、AWS Lambda 開発者ガイドのAWS Lambda の使用を開始するを参照してください。
以下のセクションでは、S3 バッチ操作で Lambda を使用する方法について説明します。
トピック
バッチオペレーションで Lambda を使用する
S3 バッチ操作で AWS Lambda を使用するときは、S3 バッチ操作で使用するための Lambda 関数を新しく作成する必要があります。既存の Amazon S3 のイベントベースの関数を S3 バッチ操作で再利用することはできません。イベント関数はメッセージの受信のみ可能です。メッセージを返すことはできません。S3 バッチ操作で使用する Lambda 関数では、メッセージを受け取って返す必要があります。Amazon S3 のイベントで Lambda を使用する方法について詳しくは、AWS Lambda 開発者ガイドのAWS Lambda を Amazon S3 に使用するを参照してください。
Lambda 関数を呼び出す S3 バッチ操作のジョブを作成します。このジョブは、マニフェストにリストされているすべてのオブジェクトに対して同じ Lambda 関数を実行します。マニフェストのオブジェクトの処理時に使用する Lambda 関数のバージョンは指定できます。S3 バッチ操作では、非修飾 Amazon リソースネーム (ARN)、エイリアス、特定のバージョンがサポートされています。詳しくは、AWS Lambda 開発者ガイドのAWS Lambda 関数のバージョンを参照してください。
S3 バッチ操作のジョブにエイリアスまたは $LATEST
修飾子を使用する関数の ARN を指定し、それらのいずれかが指すバージョンを更新すると、S3 バッチ操作は新しいバージョンの Lambda 関数を呼び出します。これは、大規模なジョブで機能の一部を更新する場合に役立ちます。S3 バッチオペレーションで使用するバージョンを変更したくない場合は、ジョブの作成時に FunctionARN
パラメータで特定のバージョンを指定します。
ディレクトリバケットで Lambda とバッチオペレーションを使用する
ディレクトリバケットは Amazon S3 バケットタイプの1 つであり、一貫して 1 桁ミリ秒のレイテンシーに維持する必要があるワークロードまたはパフォーマンス重視のアプリケーション向けに設計されています。詳細については、「ディレクトリバケット」を参照してください。
バッチオペレーションを使用してディレクトリバケットで動作する Lambda 関数を呼び出すには、独自の要件があります。例えば、更新された JSON スキーマを使用して Lambda リクエストを構築し、ジョブを作成する際に InvocationSchemaVersion 2.0 (1.0 ではない) を指定する必要があります。この更新されたスキーマでは、UserArguments にオプションのキーバリューペアを指定できます。これを使用して、既存の Lambda 関数の特定のパラメータを変更できます。詳細については、「AWS ストレージブログ」の「Automate object processing in Amazon S3 directory buckets with S3 Batch Operations and AWS Lambda
レスポンスと結果のコード
S3 バッチオペレーションは、それぞれに TaskID
が関連付けられている 1 つ以上のキーを使用して Lambda 関数を呼び出します。S3 バッチオペレーションには、Lambda 関数からのキーごとの結果コードが必要です。キーごとの結果コードで返されないタスク ID は、リクエストコードで返され、treatMissingKeysAs
フィールドの結果コードが付与されます。treatMissingKeysAs
はオプションのリクエストフィールドで、デフォルトは TemporaryFailure
です。次の表に、 treatMissingKeysAs
フィールドの他の有効な結果コードと値を示します。
Response Code (レスポンスコード) | 説明 |
---|---|
Succeeded |
タスクは正常に完了しました。ジョブ完了レポートをリクエストした場合は、タスクの結果の文字列がレポートに含まれます。 |
TemporaryFailure |
タスクは一時的に失敗し、ジョブが完了する前に再始動されます。結果の文字列は無視されます。最終的な再処理である場合は、エラーメッセージが最終レポートに含まれます。 |
PermanentFailure |
タスクに固定障害が発生しました。ジョブ完了レポートをリクエストした場合、タスクは Failed としてマークされ、エラーメッセージの文字列が含まれます。失敗したタスクの結果の文字列は無視されます。 |
S3 バッチ操作で使用する Lambda 関数の作成
このセクションでは、Lambda 関数で使用する必要のある AWS Identity and Access Management (IAM) アクセス許可の例について説明します。また、S3 バッチ操作で使用する Lambda 関数の例も示します。Lambda 関数を作成したことがない場合は、AWS Lambda 開発者ガイドのチュートリアル: Amazon S3 トリガーを使用して AWS Lambda 関数を呼び出すを参照してください。
S3 バッチ操作で使用するための Lambda 関数を作成する必要があります。既存の Amazon S3 イベントベースの Lambda 関数を再利用することはできません。これは、S3 バッチオペレーションで使用する Lambda 関数では、特別なデータフィールドを受け取って返す必要があるためです。
重要
Java で記述された AWS Lambda 関数は、RequestHandlerRequestStreamHandler
インターフェイスを必要とします。このインターフェイスにより、Lambda は Java の handleRequest
メソッドに InputStream と OutputStream ストリームを渡すことができます。
S3 バッチ操作で Lambda 関数を使用する場合は、必ず RequestStreamHandler
インターフェイスを使用してください。RequestHandler
インターフェイスを使用すると、バッチジョブが失敗し、完了レポートに「Invalid JSON returned in Lambda payload」と表示されます。
詳しくは、AWS Lambda ユーザーガイドのハンドラーのインターフェイスを参照してください。
IAM アクセス許可の例
S3 バッチ操作で Lambda 関数を使用するために必要な IAM アクセス許可の例を以下に示します。
例 - S3 バッチ操作の信頼ポリシー
以下は、バッチ操作の IAM ロールに使用できる信頼ポリシーの例です。この IAM ロールは、ジョブを作成して、バッチ操作に IAM ロールを引き受けるアクセス許可を付与する場合に指定します。
{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "batchoperations.s3.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }
例 - Lambda の IAM ポリシー
以下は、S3 バッチ操作に Lambda 関数を呼び出して入力のマニフェストを読み取るアクセス許可を与える IAM ポリシーの例です。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "BatchOperationsLambdaPolicy", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion", "s3:PutObject", "lambda:InvokeFunction" ], "Resource": "*" } ] }
リクエストとレスポンスの例
このセクションでは、Lambda 関数のリクエストとレスポンスの例を示します。
例 リクエスト
以下は、Lambda 関数のリクエストの JSON の例です。
{ "invocationSchemaVersion": "1.0", "invocationId": "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "job": { "id": "f3cc4f60-61f6-4a2b-8a21-d07600c373ce" }, "tasks": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "s3Key": "customerImage1.jpg", "s3VersionId": "1", "s3BucketArn": "arn:aws:s3:us-east-1:0123456788:amzn-s3-demo-bucket1" } ] }
例 レスポンス
以下は、Lambda 関数のレスポンスの JSON の例です。
{ "invocationSchemaVersion": "1.0", "treatMissingKeysAs" : "PermanentFailure", "invocationId" : "YXNkbGZqYWRmaiBhc2RmdW9hZHNmZGpmaGFzbGtkaGZza2RmaAo", "results": [ { "taskId": "dGFza2lkZ29lc2hlcmUK", "resultCode": "Succeeded", "resultString": "[\"Mary Major", \"John Stiles\"]" } ] }
S3 バッチ操作の Lambda 関数の例
次の Python Lambda の例では、バージョン管理されたオブジェクトから削除マーカーを削除します。
例に示すように、S3 バッチ操作のキーは URL エンコードされます。Amazon S3 を他の AWS のサービスとともに使用するには、S3 バッチ操作から渡されたキーを URL デコードすることが重要です。
import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.client("s3") def lambda_handler(event, context): """ Removes a delete marker from the specified versioned object. :param event: The S3 batch event that contains the ID of the delete marker to remove. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. When the result code is TemporaryFailure, S3 retries the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] try: obj_key = parse.unquote(task["s3Key"], encoding="utf-8") obj_version_id = task["s3VersionId"] bucket_name = task["s3BucketArn"].split(":")[-1] logger.info( "Got task: remove delete marker %s from object %s.", obj_version_id, obj_key ) try: # If this call does not raise an error, the object version is not a delete # marker and should not be deleted. response = s3.head_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "PermanentFailure" result_string = ( f"Object {obj_key}, ID {obj_version_id} is not " f"a delete marker." ) logger.debug(response) logger.warning(result_string) except ClientError as error: delete_marker = error.response["ResponseMetadata"]["HTTPHeaders"].get( "x-amz-delete-marker", "false" ) if delete_marker == "true": logger.info( "Object %s, version %s is a delete marker.", obj_key, obj_version_id ) try: s3.delete_object( Bucket=bucket_name, Key=obj_key, VersionId=obj_version_id ) result_code = "Succeeded" result_string = ( f"Successfully removed delete marker " f"{obj_version_id} from object {obj_key}." ) logger.info(result_string) except ClientError as error: # Mark request timeout as a temporary failure so it will be retried. if error.response["Error"]["Code"] == "RequestTimeout": result_code = "TemporaryFailure" result_string = ( f"Attempt to remove delete marker from " f"object {obj_key} timed out." ) logger.info(result_string) else: raise else: raise ValueError( f"The x-amz-delete-marker header is either not " f"present or is not 'true'." ) except Exception as error: # Mark all other exceptions as permanent failures. result_code = "PermanentFailure" result_string = str(error) logger.exception(error) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
Lambda 関数を呼び出す S3 バッチ操作のジョブの作成
Lambda 関数を呼び出す S3 バッチ操作のジョブを作成する場合は、以下を指定する必要があります。
-
Lambda 関数の ARN (関数のエイリアスまたは特定のバージョン番号を含む)
-
関数を呼び出すアクセス許可を持つ IAM ロール
-
アクションパラメータ
LambdaInvokeFunction
S3 バッチ操作のジョブの作成の詳細については、「S3 バッチオペレーションジョブの作成」および「S3 バッチ操作でサポートされるオペレーション」を参照してください。
次の例では、AWS CLI を使用して Lambda 関数を呼び出す S3 バッチオペレーションのジョブを作成します。この例を実行するには、
をユーザー自身の情報に置き換えます。user input
placeholders
aws s3control create-job --account-id
account-id
--operation '{"LambdaInvoke": { "FunctionArn": "arn:aws:lambda:region
:account-id
:function:LambdaFunctionName
" } }' --manifest '{"Spec":{"Format":"S3BatchOperations_CSV_20180820","Fields":["Bucket","Key"]},"Location":{"ObjectArn":"arn:aws:s3:::amzn-s3-demo-manifest-bucket
","ETag":"ManifestETag
"}}' --report '{"Bucket":"arn:aws:s3:::amzn-s3-demo-bucket
","Format":"Report_CSV_20180820","Enabled":true,"Prefix":"ReportPrefix
","ReportScope":"AllTasks"}' --priority2
--role-arn arn:aws:iam::account-id
:role/BatchOperationsRole
--regionregion
--description "Lambda Function
"
Lambda のマニフェストでのタスクレベルの情報の指定
S3 バッチオペレーションで AWS Lambda 関数を使用する際、操作対象の各タスク/キーに付随する追加データが必要になる場合があります。例えば、ソースオブジェクトキーと新しいオブジェクトキーの両方を提供したい場合があります。Lambda 関数は、元のキーを新しい名前で新しい S3 バケットにコピーできます。デフォルトでは、バッチオペレーションでは、ジョブへの入力マニフェストで移行先バケットとソースキーのリストのみを指定できます。以下の例では、マニフェストにデータを追加して、より複雑な Lambda 関数を実行する方法について説明します。
S3 バッチ操作のマニフェストでキーごとのパラメータを指定して Lambda 関数のコードで使用するには、以下に示すように URL エンコードされた形式の JSON を使用します。key
フィールドは、Amazon S3 のオブジェクトのキーのように Lambda 関数に渡されます。ただし、以下の例に示すように、Lambda 関数で他の値や複数のキーを含めるように解釈することができます。
注記
マニフェストの key
フィールドの最大文字数は 1,024 文字です。
例 - 「Amazon S3 のキー」を JSON の文字列に置き換える前のマニフェスト
URL エンコードされたバージョンを、S3 バッチ操作に渡す必要があります。
amzn-s3-demo-bucket
,{"origKey": "object1key
", "newKey": "newObject1Key
"}amzn-s3-demo-bucket
,{"origKey": "object2key
", "newKey": "newObject2Key
"}amzn-s3-demo-bucket
,{"origKey": "object3key
", "newKey": "newObject3Key
"}
例 - URL エンコードされたマニフェスト
この URL エンコードされたバージョンを、S3 バッチ操作に渡す必要があります。URL エンコードされていないバージョンは機能しません。
amzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object1key
%22%2C%20%22newKey%22%3A%20%22newObject1Key
%22%7Damzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object2key
%22%2C%20%22newKey%22%3A%20%22newObject2Key
%22%7Damzn-s3-demo-bucket
,%7B%22origKey%22%3A%20%22object3key
%22%2C%20%22newKey%22%3A%20%22newObject3Key
%22%7D
例 - Lambda 関数とジョブのレポートに結果が書き込まれるマニフェストの形式
この URL エンコードされたマニフェストの例には、解析する次の Lambda 関数のパイプ区切りオブジェクトキーが含まれています。
amzn-s3-demo-bucket
,object1key
%7Cloweramzn-s3-demo-bucket
,object2key
%7Cupperamzn-s3-demo-bucket
,object3key
%7Creverseamzn-s3-demo-bucket
,object4key
%7Cdelete
この Lambda 関数は、パイプ区切りのタスクをエンコードされた S3 バッチオペレーションのマニフェストに解析する方法を示しています。タスクは、指定されたオブジェクトに適用されているリビジョンオペレーションを示します。
import logging from urllib import parse import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) logger.setLevel("INFO") s3 = boto3.resource("s3") def lambda_handler(event, context): """ Applies the specified revision to the specified object. :param event: The Amazon S3 batch event that contains the ID of the object to revise and the revision type to apply. :param context: Context about the event. :return: A result structure that Amazon S3 uses to interpret the result of the operation. """ # Parse job parameters from Amazon S3 batch operations invocation_id = event["invocationId"] invocation_schema_version = event["invocationSchemaVersion"] results = [] result_code = None result_string = None task = event["tasks"][0] task_id = task["taskId"] # The revision type is packed with the object key as a pipe-delimited string. obj_key, revision = parse.unquote(task["s3Key"], encoding="utf-8").split("|") bucket_name = task["s3BucketArn"].split(":")[-1] logger.info("Got task: apply revision %s to %s.", revision, obj_key) try: stanza_obj = s3.Bucket(bucket_name).Object(obj_key) stanza = stanza_obj.get()["Body"].read().decode("utf-8") if revision == "lower": stanza = stanza.lower() elif revision == "upper": stanza = stanza.upper() elif revision == "reverse": stanza = stanza[::-1] elif revision == "delete": pass else: raise TypeError(f"Can't handle revision type '{revision}'.") if revision == "delete": stanza_obj.delete() result_string = f"Deleted stanza {stanza_obj.key}." else: stanza_obj.put(Body=bytes(stanza, "utf-8")) result_string = ( f"Applied revision type '{revision}' to " f"stanza {stanza_obj.key}." ) logger.info(result_string) result_code = "Succeeded" except ClientError as error: if error.response["Error"]["Code"] == "NoSuchKey": result_code = "Succeeded" result_string = ( f"Stanza {obj_key} not found, assuming it was deleted " f"in an earlier revision." ) logger.info(result_string) else: result_code = "PermanentFailure" result_string = ( f"Got exception when applying revision type '{revision}' " f"to {obj_key}: {error}." ) logger.exception(result_string) finally: results.append( { "taskId": task_id, "resultCode": result_code, "resultString": result_string, } ) return { "invocationSchemaVersion": invocation_schema_version, "treatMissingKeysAs": "PermanentFailure", "invocationId": invocation_id, "results": results, }
S3 バッチ操作のチュートリアル
次のチュートリアルでは、Lambda を使用したいくつかのバッチ操作タスクにおけるエンドツーエンドの一連の手順について説明します。このチュートリアルでは、S3 ソースバケットに保存された動画のバッチトランスコーディング用に Lambda 関数を呼び出すようにバッチオペレーションを設定する方法を学習します。Lambda 関数は AWS Elemental MediaConvert を呼び出して、動画をトランスコードします。