用于 SDK Python 的 Lambda 示例 (Boto3)

以下代码示例向您展示了如何使用 with Lambda 来执行操作和实现常见场景。 AWS SDK for Python (Boto3)


场景是向您展示如何通过在一个服务中调用多个函数或与其他 AWS 服务结合来完成特定任务的代码示例。



以下代码示例展示了如何开始使用 Lambda。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

import boto3 def main(): """ List the Lambda functions in your AWS account. """ # Create the Lambda client lambda_client = boto3.client("lambda") # Use the paginator to list the functions paginator = lambda_client.get_paginator("list_functions") response_iterator = paginator.paginate() print("Here are the Lambda functions in your account:") for page in response_iterator: for function in page["Functions"]: print(f" {function['FunctionName']}") if __name__ == "__main__": main()
  • 有关API详细信息,请参阅ListFunctions中的 AWS SDKPython (Boto3) API 参考。



SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def create_function( self, function_name, handler_name, iam_role, deployment_package ): """ Deploys a Lambda function. :param function_name: The name of the Lambda function. :param handler_name: The fully qualified name of the handler function. This must include the file name and the function name. :param iam_role: The IAM role to use for the function. :param deployment_package: The deployment package that contains the function code in .zip format. :return: The Amazon Resource Name (ARN) of the newly created function. """ try: response = self.lambda_client.create_function( FunctionName=function_name, Description="AWS Lambda doc example", Runtime="python3.8", Role=iam_role.arn, Handler=handler_name, Code={"ZipFile": deployment_package}, Publish=True, ) function_arn = response["FunctionArn"] waiter = self.lambda_client.get_waiter("function_active_v2") waiter.wait(FunctionName=function_name) logger.info( "Created function '%s' with ARN: '%s'.", function_name, response["FunctionArn"], ) except ClientError: logger.error("Couldn't create function %s.", function_name) raise else: return function_arn
  • 有关API详细信息,请参阅CreateFunction中的 AWS SDKPython (Boto3) API 参考。


SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def delete_function(self, function_name): """ Deletes a Lambda function. :param function_name: The name of the function to delete. """ try: self.lambda_client.delete_function(FunctionName=function_name) except ClientError: logger.exception("Couldn't delete function %s.", function_name) raise
  • 有关API详细信息,请参阅DeleteFunction中的 AWS SDKPython (Boto3) API 参考。


SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def get_function(self, function_name): """ Gets data about a Lambda function. :param function_name: The name of the function. :return: The function data. """ response = None try: response = self.lambda_client.get_function(FunctionName=function_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.info("Function %s does not exist.", function_name) else: logger.error( "Couldn't get function %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return response
  • 有关API详细信息,请参阅GetFunction中的 AWS SDKPython (Boto3) API 参考。


SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def invoke_function(self, function_name, function_params, get_log=False): """ Invokes a Lambda function. :param function_name: The name of the function to invoke. :param function_params: The parameters of the function as a dict. This dict is serialized to JSON before it is sent to Lambda. :param get_log: When true, the last 4 KB of the execution log are included in the response. :return: The response from the function invocation. """ try: response = self.lambda_client.invoke( FunctionName=function_name, Payload=json.dumps(function_params), LogType="Tail" if get_log else "None", ) logger.info("Invoked function %s.", function_name) except ClientError: logger.exception("Couldn't invoke function %s.", function_name) raise return response
  • 有关API详细信息,请参阅调用以获取 Python (Boto3) API 参考。AWS SDK


SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def list_functions(self): """ Lists the Lambda functions for the current account. """ try: func_paginator = self.lambda_client.get_paginator("list_functions") for func_page in func_paginator.paginate(): for func in func_page["Functions"]: print(func["FunctionName"]) desc = func.get("Description") if desc: print(f"\t{desc}") print(f"\t{func['Runtime']}: {func['Handler']}") except ClientError as err: logger.error( "Couldn't list functions. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise
  • 有关API详细信息,请参阅ListFunctions中的 AWS SDKPython (Boto3) API 参考。


SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def update_function_code(self, function_name, deployment_package): """ Updates the code for a Lambda function by submitting a .zip archive that contains the code for the function. :param function_name: The name of the function to update. :param deployment_package: The function code to update, packaged as bytes in .zip format. :return: Data about the update, including the status. """ try: response = self.lambda_client.update_function_code( FunctionName=function_name, ZipFile=deployment_package ) except ClientError as err: logger.error( "Couldn't update function %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response
  • 有关API详细信息,请参阅UpdateFunctionCode中的 AWS SDKPython (Boto3) API 参考。


SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource def update_function_configuration(self, function_name, env_vars): """ Updates the environment variables for a Lambda function. :param function_name: The name of the function to update. :param env_vars: A dict of environment variables to update. :return: Data about the update, including the status. """ try: response = self.lambda_client.update_function_configuration( FunctionName=function_name, Environment={"Variables": env_vars} ) except ClientError as err: logger.error( "Couldn't update function configuration %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response


以下代码示例演示如何使用虚构数据创建一个模拟系统 RESTAPI,以跟踪美国每天有 COVID 19 例的案例。

SDK适用于 Python (Boto3)

演示如何使用 AWS Chalice 和创建RESTAPI使用 Amazon Gate API way 和 Amazon DynamoDB AWS Lambda的无服务器。 AWS SDK for Python (Boto3) RESTAPI它模拟了一个使用虚构数据追踪美国每天 COVID 19例病例的系统。了解如何:

  • 使用 AWS Chalice 在 Lambda 函数中定义路由,这些函数用于处理通过网关发REST出的请求。API

  • 使用 Lambda 函数在 DynamoDB 表中检索和存储数据以处理请求。REST

  • 在 AWS CloudFormation 模板中定义表结构和安全角色资源。

  • 使用 AWS Chalice and CloudFormation 来打包和部署所有必要的资源。

  • CloudFormation 用于清理所有已创建的资源。


  • API网关

  • AWS CloudFormation

  • DynamoDB

  • Lambda

以下代码示例演示如何创建借阅图书馆,顾客可以使用由 Amazon Aurora REST API 支持的数据库借阅和还书。

SDK适用于 Python (Boto3)

演示如何将 AWS SDK for Python (Boto3) 与亚马逊关系数据库服务(亚马逊RDS)API和 AWS Chalice 一起使用来创建由亚马逊 Aurora REST API 支持的数据库。此 Web 服务是完全无服务器的,代表简单的借阅图书馆,其中顾客可以借阅和归还图书。了解如何:

  • 创建和管理无服务器 Aurora 数据库集群。

  • AWS Secrets Manager 用于管理数据库凭证。

  • 实施一个数据存储层,使用 Amazon RDS 将数据移入和移出数据库。

  • 使用 AWS Chalice 将无服务器部署到RESTAPI亚马逊API网关和. AWS Lambda

  • 使用请求软件包向 Web 服务发送请求。


  • API网关

  • Aurora

  • Lambda

  • Secrets Manager

以下代码示例说明如何创建用于从数据库表中检索消息记录的 AWS Step Functions Messenger 应用程序。

SDK适用于 Python (Boto3)

演示如何使用 with 创建信使应用程序,该 AWS SDK for Python (Boto3) 应用程序从 Amazon DynamoDB 表中检索消息记录并通过亚马逊简单队列服务 (Amazon) 将其发送。 AWS Step Functions SQS状态机集成了扫描数据库中是否有未发送消息的 AWS Lambda 功能。

  • 创建检索并更新 Amazon DynamoDB 表中的消息记录的状态机。

  • 更新状态机定义以同时向亚马逊简单队列服务 (AmazonSQS) 发送消息。

  • 启动和停止状态机运行。

  • 使用服务集成,从状态机连接到 Lambda、DynamoDB 和SQS亚马逊。


  • DynamoDB

  • Lambda

  • Amazon SQS

  • Step Functions

以下代码示例演示如何创建由API基于 Amazon API Gateway 的 websocket 提供的聊天应用程序。

SDK适用于 Python (Boto3)

演示如何使用 with Amazon API Gateway V2 来创建 AWS SDK for Python (Boto3) 与亚马逊 DynamoDB 集成API的 websocket。 AWS Lambda

  • 创建一个由 API Gateway API 提供服务的网络套接字。

  • 定义在 DynamoDB 中存储连接并向其他聊天参与者发布消息的 Lambda 处理程序。

  • 连接到 Websocket 聊天应用程序并使用 WebSocket 软件包发送消息。


  • API网关

  • DynamoDB

  • Lambda


  • 创建IAM角色和 Lambda 函数,然后上传处理程序代码。

  • 使用单个参数来调用函数并获取结果。

  • 更新函数代码并使用环境变量进行配置。

  • 使用新参数来调用函数并获取结果。显示返回的执行日志。

  • 列出账户函数,然后清除函数。

有关更多信息,请参阅使用控制台创建 Lambda 函数

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。查找完整示例,学习如何在 AWS 代码示例存储库中进行设置和运行。

定义一个递增数字的 Lambda 处理程序。

import logging logger = logging.getLogger() logger.setLevel(logging.INFO) def lambda_handler(event, context): """ Accepts an action and a single number, performs the specified action on the number, and returns the result. The only allowable action is 'increment'. :param event: The event dict that contains the parameters sent when the function is invoked. :param context: The context in which the function is called. :return: The result of the action. """ result = None action = event.get("action") if action == "increment": result = event.get("number", 0) + 1 logger.info("Calculated result of %s", result) else: logger.error("%s is not a valid action.", action) response = {"result": result} return response

定义执行算术运算的第二个 Lambda 处理程序。

import logging import os logger = logging.getLogger() # Define a list of Python lambda functions that are called by this AWS Lambda function. ACTIONS = { "plus": lambda x, y: x + y, "minus": lambda x, y: x - y, "times": lambda x, y: x * y, "divided-by": lambda x, y: x / y, } def lambda_handler(event, context): """ Accepts an action and two numbers, performs the specified action on the numbers, and returns the result. :param event: The event dict that contains the parameters sent when the function is invoked. :param context: The context in which the function is called. :return: The result of the specified action. """ # Set the log level based on a variable configured in the Lambda environment. logger.setLevel(os.environ.get("LOG_LEVEL", logging.INFO)) logger.debug("Event: %s", event) action = event.get("action") func = ACTIONS.get(action) x = event.get("x") y = event.get("y") result = None try: if func is not None and x is not None and y is not None: result = func(x, y) logger.info("%s %s %s is %s", x, action, y, result) else: logger.error("I can't calculate %s %s %s.", x, action, y) except ZeroDivisionError: logger.warning("I can't divide %s by 0!", x) response = {"result": result} return response

创建包装 Lambda 操作的函数。

class LambdaWrapper: def __init__(self, lambda_client, iam_resource): self.lambda_client = lambda_client self.iam_resource = iam_resource @staticmethod def create_deployment_package(source_file, destination_file): """ Creates a Lambda deployment package in .zip format in an in-memory buffer. This buffer can be passed directly to Lambda when creating the function. :param source_file: The name of the file that contains the Lambda handler function. :param destination_file: The name to give the file when it's deployed to Lambda. :return: The deployment package. """ buffer = io.BytesIO() with zipfile.ZipFile(buffer, "w") as zipped: zipped.write(source_file, destination_file) buffer.seek(0) return buffer.read() def get_iam_role(self, iam_role_name): """ Get an AWS Identity and Access Management (IAM) role. :param iam_role_name: The name of the role to retrieve. :return: The IAM role. """ role = None try: temp_role = self.iam_resource.Role(iam_role_name) temp_role.load() role = temp_role logger.info("Got IAM role %s", role.name) except ClientError as err: if err.response["Error"]["Code"] == "NoSuchEntity": logger.info("IAM role %s does not exist.", iam_role_name) else: logger.error( "Couldn't get IAM role %s. Here's why: %s: %s", iam_role_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return role def create_iam_role_for_lambda(self, iam_role_name): """ Creates an IAM role that grants the Lambda function basic permissions. If a role with the specified name already exists, it is used for the demo. :param iam_role_name: The name of the role to create. :return: The role and a value that indicates whether the role is newly created. """ role = self.get_iam_role(iam_role_name) if role is not None: return role, False lambda_assume_role_policy = { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": {"Service": "lambda.amazonaws.com"}, "Action": "sts:AssumeRole", } ], } policy_arn = "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" try: role = self.iam_resource.create_role( RoleName=iam_role_name, AssumeRolePolicyDocument=json.dumps(lambda_assume_role_policy), ) logger.info("Created role %s.", role.name) role.attach_policy(PolicyArn=policy_arn) logger.info("Attached basic execution policy to role %s.", role.name) except ClientError as error: if error.response["Error"]["Code"] == "EntityAlreadyExists": role = self.iam_resource.Role(iam_role_name) logger.warning("The role %s already exists. Using it.", iam_role_name) else: logger.exception( "Couldn't create role %s or attach policy %s.", iam_role_name, policy_arn, ) raise return role, True def get_function(self, function_name): """ Gets data about a Lambda function. :param function_name: The name of the function. :return: The function data. """ response = None try: response = self.lambda_client.get_function(FunctionName=function_name) except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": logger.info("Function %s does not exist.", function_name) else: logger.error( "Couldn't get function %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise return response def create_function( self, function_name, handler_name, iam_role, deployment_package ): """ Deploys a Lambda function. :param function_name: The name of the Lambda function. :param handler_name: The fully qualified name of the handler function. This must include the file name and the function name. :param iam_role: The IAM role to use for the function. :param deployment_package: The deployment package that contains the function code in .zip format. :return: The Amazon Resource Name (ARN) of the newly created function. """ try: response = self.lambda_client.create_function( FunctionName=function_name, Description="AWS Lambda doc example", Runtime="python3.8", Role=iam_role.arn, Handler=handler_name, Code={"ZipFile": deployment_package}, Publish=True, ) function_arn = response["FunctionArn"] waiter = self.lambda_client.get_waiter("function_active_v2") waiter.wait(FunctionName=function_name) logger.info( "Created function '%s' with ARN: '%s'.", function_name, response["FunctionArn"], ) except ClientError: logger.error("Couldn't create function %s.", function_name) raise else: return function_arn def delete_function(self, function_name): """ Deletes a Lambda function. :param function_name: The name of the function to delete. """ try: self.lambda_client.delete_function(FunctionName=function_name) except ClientError: logger.exception("Couldn't delete function %s.", function_name) raise def invoke_function(self, function_name, function_params, get_log=False): """ Invokes a Lambda function. :param function_name: The name of the function to invoke. :param function_params: The parameters of the function as a dict. This dict is serialized to JSON before it is sent to Lambda. :param get_log: When true, the last 4 KB of the execution log are included in the response. :return: The response from the function invocation. """ try: response = self.lambda_client.invoke( FunctionName=function_name, Payload=json.dumps(function_params), LogType="Tail" if get_log else "None", ) logger.info("Invoked function %s.", function_name) except ClientError: logger.exception("Couldn't invoke function %s.", function_name) raise return response def update_function_code(self, function_name, deployment_package): """ Updates the code for a Lambda function by submitting a .zip archive that contains the code for the function. :param function_name: The name of the function to update. :param deployment_package: The function code to update, packaged as bytes in .zip format. :return: Data about the update, including the status. """ try: response = self.lambda_client.update_function_code( FunctionName=function_name, ZipFile=deployment_package ) except ClientError as err: logger.error( "Couldn't update function %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response def update_function_configuration(self, function_name, env_vars): """ Updates the environment variables for a Lambda function. :param function_name: The name of the function to update. :param env_vars: A dict of environment variables to update. :return: Data about the update, including the status. """ try: response = self.lambda_client.update_function_configuration( FunctionName=function_name, Environment={"Variables": env_vars} ) except ClientError as err: logger.error( "Couldn't update function configuration %s. Here's why: %s: %s", function_name, err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise else: return response def list_functions(self): """ Lists the Lambda functions for the current account. """ try: func_paginator = self.lambda_client.get_paginator("list_functions") for func_page in func_paginator.paginate(): for func in func_page["Functions"]: print(func["FunctionName"]) desc = func.get("Description") if desc: print(f"\t{desc}") print(f"\t{func['Runtime']}: {func['Handler']}") except ClientError as err: logger.error( "Couldn't list functions. Here's why: %s: %s", err.response["Error"]["Code"], err.response["Error"]["Message"], ) raise


class UpdateFunctionWaiter(CustomWaiter): """A custom waiter that waits until a function is successfully updated.""" def __init__(self, client): super().__init__( "UpdateSuccess", "GetFunction", "Configuration.LastUpdateStatus", {"Successful": WaitState.SUCCESS, "Failed": WaitState.FAILURE}, client, ) def wait(self, function_name): self._wait(FunctionName=function_name) def run_scenario(lambda_client, iam_resource, basic_file, calculator_file, lambda_name): """ Runs the scenario. :param lambda_client: A Boto3 Lambda client. :param iam_resource: A Boto3 IAM resource. :param basic_file: The name of the file that contains the basic Lambda handler. :param calculator_file: The name of the file that contains the calculator Lambda handler. :param lambda_name: The name to give resources created for the scenario, such as the IAM role and the Lambda function. """ logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s") print("-" * 88) print("Welcome to the AWS Lambda getting started with functions demo.") print("-" * 88) wrapper = LambdaWrapper(lambda_client, iam_resource) print("Checking for IAM role for Lambda...") iam_role, should_wait = wrapper.create_iam_role_for_lambda(lambda_name) if should_wait: logger.info("Giving AWS time to create resources...") wait(10) print(f"Looking for function {lambda_name}...") function = wrapper.get_function(lambda_name) if function is None: print("Zipping the Python script into a deployment package...") deployment_package = wrapper.create_deployment_package( basic_file, f"{lambda_name}.py" ) print(f"...and creating the {lambda_name} Lambda function.") wrapper.create_function( lambda_name, f"{lambda_name}.lambda_handler", iam_role, deployment_package ) else: print(f"Function {lambda_name} already exists.") print("-" * 88) print(f"Let's invoke {lambda_name}. This function increments a number.") action_params = { "action": "increment", "number": q.ask("Give me a number to increment: ", q.is_int), } print(f"Invoking {lambda_name}...") response = wrapper.invoke_function(lambda_name, action_params) print( f"Incrementing {action_params['number']} resulted in " f"{json.load(response['Payload'])}" ) print("-" * 88) print(f"Let's update the function to an arithmetic calculator.") q.ask("Press Enter when you're ready.") print("Creating a new deployment package...") deployment_package = wrapper.create_deployment_package( calculator_file, f"{lambda_name}.py" ) print(f"...and updating the {lambda_name} Lambda function.") update_waiter = UpdateFunctionWaiter(lambda_client) wrapper.update_function_code(lambda_name, deployment_package) update_waiter.wait(lambda_name) print(f"This function uses an environment variable to control logging level.") print(f"Let's set it to DEBUG to get the most logging.") wrapper.update_function_configuration( lambda_name, {"LOG_LEVEL": logging.getLevelName(logging.DEBUG)} ) actions = ["plus", "minus", "times", "divided-by"] want_invoke = True while want_invoke: print(f"Let's invoke {lambda_name}. You can invoke these actions:") for index, action in enumerate(actions): print(f"{index + 1}: {action}") action_params = {} action_index = q.ask( "Enter the number of the action you want to take: ", q.is_int, q.in_range(1, len(actions)), ) action_params["action"] = actions[action_index - 1] print(f"You've chosen to invoke 'x {action_params['action']} y'.") action_params["x"] = q.ask("Enter a value for x: ", q.is_int) action_params["y"] = q.ask("Enter a value for y: ", q.is_int) print(f"Invoking {lambda_name}...") response = wrapper.invoke_function(lambda_name, action_params, True) print( f"Calculating {action_params['x']} {action_params['action']} {action_params['y']} " f"resulted in {json.load(response['Payload'])}" ) q.ask("Press Enter to see the logs from the call.") print(base64.b64decode(response["LogResult"]).decode()) want_invoke = q.ask("That was fun. Shall we do it again? (y/n) ", q.is_yesno) print("-" * 88) if q.ask( "Do you want to list all of the functions in your account? (y/n) ", q.is_yesno ): wrapper.list_functions() print("-" * 88) if q.ask("Ready to delete the function and role? (y/n) ", q.is_yesno): for policy in iam_role.attached_policies.all(): policy.detach_role(RoleName=iam_role.name) iam_role.delete() print(f"Deleted role {lambda_name}.") wrapper.delete_function(lambda_name) print(f"Deleted function {lambda_name}.") print("\nThanks for watching!") print("-" * 88) if __name__ == "__main__": try: run_scenario( boto3.client("lambda"), boto3.resource("iam"), "lambda_handler_basic.py", "lambda_handler_calculator.py", "doc_example_lambda_calculator", ) except Exception: logging.exception("Something went wrong with the demo!")

以下代码示例展示了如何创建由 Amazon API Gateway 调用的 AWS Lambda 函数。

SDK适用于 Python (Boto3)

此示例说明如何创建和使用以 AWS Lambda 函数为目标的 Amazon API Gatew REST API ay。Lambda 处理程序演示如何基于HTTP方法进行路由;如何从查询字符串、标题和正文中获取数据;以及如何返回响应。JSON

  • 部署 Lambda 函数。


  • 创建以 Lambda 函数为目标的REST资源。

  • 授予允许 API Gateway 调用 Lambda 函数的权限。

  • 使用请求包将请求发送到RESTAPI。

  • 清理演示期间创建的所有资源。

最好在上查看此示例 GitHub。有关如何设置和运行的完整源代码和说明,请参阅上的完整示例GitHub

  • API网关

  • Lambda

以下代码示例显示如何创建由 Amazon EventBridge 计划事件调用的 AWS Lambda 函数。

SDK适用于 Python (Boto3)

此示例说明如何将 AWS Lambda 函数注册为计划的 Amazon EventBridge 事件的目标。Lambda 处理程序将友好的消息和完整的事件数据写入 Amazon CloudWatch 日志,以供日后检索。

  • 部署 Lambda 函数。

  • 创建 EventBridge 计划事件并将 Lambda 函数设为目标。

  • 授予允许 EventBridge 调用 Lambda 函数的权限。

  • 打印来自 CloudWatch Logs 的最新数据以显示计划调用的结果。

  • 清理演示期间创建的所有资源。

最好在上查看此示例 GitHub。有关如何设置和运行的完整源代码和说明,请参阅上的完整示例GitHub

  • CloudWatch 日志

  • EventBridge

  • Lambda


以下代码示例说明如何实现连接到数据库的 Lambda 函数。RDS该函数发出一个简单的数据库请求并返回结果。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 在 Lambda 函数中连接到亚马逊RDS数据库。

import json import os import boto3 import pymysql # RDS settings proxy_host_name = os.environ['PROXY_HOST_NAME'] port = int(os.environ['PORT']) db_name = os.environ['DB_NAME'] db_user_name = os.environ['DB_USER_NAME'] aws_region = os.environ['AWS_REGION'] # Fetch RDS Auth Token def get_auth_token(): client = boto3.client('rds') token = client.generate_db_auth_token( DBHostname=proxy_host_name, Port=port DBUsername=db_user_name Region=aws_region ) return token def lambda_handler(event, context): token = get_auth_token() try: connection = pymysql.connect( host=proxy_host_name, user=db_user_name, password=token, db=db_name, port=port, ssl={'ca': 'Amazon RDS'} # Ensure you have the CA bundle for SSL connection ) with connection.cursor() as cursor: cursor.execute('SELECT %s + %s AS sum', (3, 2)) result = cursor.fetchone() return result except Exception as e: return (f"Error: {str(e)}") # Return an error message if an exception occurs

以下代码示例展示了如何实现一个 Lambda 函数,该函数接收因接收来自 Kinesis 流的记录而触发的事件。该函数检索 Kinesis 有效负载,将 Base64 解码,并记录下记录内容。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 将 Kinesis 事件与 Lambda 结合使用。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import base64 def lambda_handler(event, context): for record in event['Records']: try: print(f"Processed Kinesis Event - EventID: {record['eventID']}") record_data = base64.b64decode(record['kinesis']['data']).decode('utf-8') print(f"Record Data: {record_data}") # TODO: Do interesting work based on the new data except Exception as e: print(f"An error occurred {e}") raise e print(f"Successfully processed {len(event['Records'])} records.")

以下代码示例演示如何实现 Lambda 函数,该函数接收通过从 DynamoDB 流接收记录而触发的事件。该函数检索 DynamoDB 有效负载,并记录下记录内容。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

通过 Python 将 DynamoDB 事件与 Lambda 结合使用。

import json def lambda_handler(event, context): print(json.dumps(event, indent=2)) for record in event['Records']: log_dynamodb_record(record) def log_dynamodb_record(record): print(record['eventID']) print(record['eventName']) print(f"DynamoDB Record: {json.dumps(record['dynamodb'])}")

以下代码示例说明如何实现一个 Lambda 函数,该函数接收通过从 DocumentDB 更改流接收记录而触发的事件。该函数检索 DocumentDB 有效负载,并记录下记录内容。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 将 Amazon DocumentDB 事件与 Lambda 结合使用。

import json def lambda_handler(event, context): for record in event.get('events', []): log_document_db_event(record) return 'OK' def log_document_db_event(record): event_data = record.get('event', {}) operation_type = event_data.get('operationType', 'Unknown') db = event_data.get('ns', {}).get('db', 'Unknown') collection = event_data.get('ns', {}).get('coll', 'Unknown') full_document = event_data.get('fullDocument', {}) print(f"Operation type: {operation_type}") print(f"db: {db}") print(f"collection: {collection}") print("Full document:", json.dumps(full_document, indent=2))

以下代码示例说明如何实现 Lambda 函数,该函数接收通过从 Ama MSK zon 集群接收记录而触发的事件。该函数检索MSK有效载荷并记录记录内容。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 在 Lambda 上使用亚马逊MSK事件。

import base64 def lambda_handler(event, context): # Iterate through keys for key in event['records']: print('Key:', key) # Iterate through records for record in event['records'][key]: print('Record:', record) # Decode base64 msg = base64.b64decode(record['value']).decode('utf-8') print('Message:', msg)

以下代码示例展示了如何实现一个 Lambda 函数,该函数接收通过将对象上传到 S3 桶而触发的事件。该函数从事件参数中检索 S3 存储桶名称和对象密钥,并调用 Amazon S3 API 来检索和记录对象的内容类型。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 将 S3 事件与 Lambda 结合使用。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import json import urllib.parse import boto3 print('Loading function') s3 = boto3.client('s3') def lambda_handler(event, context): #print("Received event: " + json.dumps(event, indent=2)) # Get the object from the event and show its content type bucket = event['Records'][0]['s3']['bucket']['name'] key = urllib.parse.unquote_plus(event['Records'][0]['s3']['object']['key'], encoding='utf-8') try: response = s3.get_object(Bucket=bucket, Key=key) print("CONTENT TYPE: " + response['ContentType']) return response['ContentType'] except Exception as e: print(e) print('Error getting object {} from bucket {}. Make sure they exist and your bucket is in the same region as this function.'.format(key, bucket)) raise e

以下代码示例说明如何实现一个 Lambda 函数,该函数接收通过接收来自主题的消息而触发的事件。SNS该函数从事件参数检索消息并记录每条消息的内容。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 在 Lambda 中使用SNS事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): for record in event['Records']: process_message(record) print("done") def process_message(record): try: message = record['Sns']['Message'] print(f"Processed message {message}") # TODO; Process your record here except Exception as e: print("An error occurred") raise e

以下代码示例说明如何实现一个 Lambda 函数,该函数接收通过从队列接收消息而触发的事件。SQS该函数从事件参数检索消息并记录每条消息的内容。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 在 Lambda 中使用SQS事件。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): for message in event['Records']: process_message(message) print("done") def process_message(message): try: print(f"Processed message {message['body']}") # TODO: Do interesting work based on the new message except Exception as err: print("An error occurred") raise err

以下代码示例展示了如何为接收来自 Kinesis 流的事件的 Lambda 函数实现部分批处理响应。该函数在响应中报告批处理项目失败,并指示 Lambda 稍后重试这些消息。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告使用 Python 进行 Lambda Kinesis 批处理项目失败。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["kinesis"]["sequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}

以下代码示例演示如何为接收来自 DynamoDB 流的事件的 Lambda 函数实现部分批量响应。该函数在响应中报告批处理项目失败,并指示 Lambda 稍后重试这些消息。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

报告使用 Python 通过 Lambda 进行 DynamoDB 批处理项目失败。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def handler(event, context): records = event.get("Records") curRecordSequenceNumber = "" for record in records: try: # Process your record curRecordSequenceNumber = record["dynamodb"]["SequenceNumber"] except Exception as e: # Return failed record's sequence number return {"batchItemFailures":[{"itemIdentifier": curRecordSequenceNumber}]} return {"batchItemFailures":[]}

以下代码示例说明如何为从队列接收事件的 Lambda 函数实现部分批量响应。SQS该函数在响应中报告批处理项目失败,并指示 Lambda 稍后重试这些消息。

SDK适用于 Python (Boto3)

还有更多相关信息 GitHub。在无服务器示例存储库中查找完整示例,并了解如何进行设置和运行。

使用 Python 使用 Lambda 报告SQS批处理项目失败。

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 def lambda_handler(event, context): if event: batch_item_failures = [] sqs_batch_response = {} for record in event["Records"]: try: # process message except Exception as e: batch_item_failures.append({"itemIdentifier": record['messageId']}) sqs_batch_response["batchItemFailures"] = batch_item_failures return sqs_batch_response