Kafka 事件源映射日志记录 - AWS Lambda

Kafka 事件源映射日志记录

您可以为 Kafka 事件源映射配置系统级日志记录,以启用和筛选 Lambda 事件轮询器发送至 CloudWatch 的系统日志。

此功能仅适用于使用预置模式的 Kafka 事件源映射。

对于带有日志记录配置的事件源映射,您现在还可以从控制台 Lambda > 其他资源 > 事件源映射页面的监视器选项卡中通过预构建的日志查询查看系统日志。

日志记录的工作原理

当您在事件源映射中使用日志级别设置日志记录配置时,Lambda 事件轮询器会发送相应的日志(事件源映射系统日志)。

事件源映射会对 Lambda 函数重复使用相同的日志目标。请确保 Lambda 函数的执行角色拥有必要的日志记录权限。

事件源映射将有自己的日志流,日志流名称为日期和事件源映射 UUID,例如 2020/01/01/12345678-1234-1234-1234-12345678901

对于事件源映射系统日志,您可以在以下日志级别之间进行选择。

日志级别 用法
DEBUG(最详细) 事件源处理进度详细信息
INFO 关于事件源映射正常运行情况的消息
WARN(最简略) 关于可能会导致意外行为的潜在警告和错误的消息

当您选择日志级别后,Lambda 事件轮询器会发送该级别及更低级别的日志。例如,如果将事件源映射系统日志级别设置为 INFO,则事件轮询器不会发送 DEBUG 级别的日志输出。

配置日志记录

您可以在创建或更新 Kafka 事件源映射时设置日志记录配置。

配置日志记录(控制台)

配置日志记录(控制台)
  1. 打开 Lamba 控制台的函数页面。

  2. 选择您的函数名称。

  3. 请执行以下操作之一:

    • 要添加新的 Kafka 触发器,请在函数概述下选择添加触发器

    • 要修改现有的 Kafka 触发器,请选择该触发器,然后选择编辑

  4. 事件轮询器配置下,对于预置模式启用配置复选框。此时将显示日志级别设置。

  5. 单击日志级别下拉列表,然后选择事件源映射的级别。

  6. 选择底部的添加保存,以创建或更新事件源映射。

配置日志记录(AWS CLI)

创建带有日志记录的事件源映射

下面的示例会创建带有日志记录配置的 Amazon MSK 事件源映射:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --event-source-arn arn:aws:kafka:us-east-1:123456789012:cluster/my-cluster/abc123 \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --logging-config '{"SystemLogLevel":"DEBUG"}'

对于自行管理的 Kafka,请使用相同的语法:

aws lambda create-event-source-mapping \ --function-name my-kafka-function \ --topics AWSKafkaTopic \ --self-managed-event-source '{"Endpoints":{"KAFKA_BOOTSTRAP_SERVERS":["abc.xyz.com:9092"]}}' \ --starting-position LATEST \ --provisioned-poller-config MinimumPollers=1,MaximumPollers=3 \ --logging-config '{"SystemLogLevel":"DEBUG"}'

更新日志记录配置

使用 update-event-source-mapping 命令添加或修改日志记录配置:

aws lambda update-event-source-mapping \ --uuid 12345678-1234-1234-1234-123456789012 \ --logging-config '{"SystemLogLevel":"WARN"}'

Kafka 事件源映射系统日志的记录格式

当 Lambda 事件轮询器发送日志时,每个日志条目都包含常规事件源映射元数据以及事件特定内容。

WARN 日志记录

WARN 记录包含来自事件轮询器的错误或警告,会在事件发生时发出。例如:

{ "eventType": "ESM_PROCESSING_EVENT", "timestamp": 1546347650000, "resourceArn": "arn:aws:lambda:us-east-1:123456789012:event-source-mapping:12345678-1234-1234-1234-123456789012", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/tests-cluster/87654321-4321-4321-4321-876543221-s1", "eventProcessorId": "12345678-1234-1234-1234-123456789012/0", "logLevel": "WARN", "error": { "errorMessage": "Timeout expired while fetching topic metadata", "errorCode": "org.apache.kafka.common.errors.TimeoutException" } }

INFO 日志记录

INFO 记录包含每个事件轮询器中的 Kafka 使用者客户端配置,会在生成或更改使用者时发出。例如:

{ "eventType": "POLLER_STATUS_EVENT", "timestamp": 1546347660000, "resourceArn": "arn:aws:lambda:us-east-1:123456789012:event-source-mapping:12345678-1234-1234-1234-123456789012", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/tests-cluster/87654321-4321-4321-4321-876543221-s1", "eventProcessorId": "12345678-1234-1234-1234-123456789012/0", "logLevel": "INFO", "kafkaEventSourceConnection": { "brokerEndpoints": "boot-abcd1234.c2.kafka-serverless.us-east-1.amazonaws.com:9098", "consumerId": "12345678-1234-1234-1234-123456789012-0", "topics": [ "test" ], "consumerGroupId": "12345678-1234-1234-1234-123456789012", "securityProtocol": "SASL_SSL", "saslMechanism": "AWS_MSK_IAM", "totalPartitionCount": 2, "assignedPartitionCount": 2, "partitionsAssignmentGeneration": 5, "assignedPartitions": [ "test-0", "test-1" ], "networkConfig": { "ipAddresses": [ "10.100.141.1" ], "subnetCidrBlock": "10.100.128.0/20", "securityGroups": [ "sg-abcdefabcdefabcdef" ] } } }

DEBUG 日志记录

DEBUG 日志包含事件源映射处理中与 Kafka 偏移量相关的信息,偏移量信息每分钟发出一次。例如:

{ "eventType": "KAFKA_STATUS_EVENT", "timestamp": 1546347670000, "resourceArn": "arn:aws:lambda:us-east-1:123456789012:event-source-mapping:12345678-1234-1234-1234-123456789012", "eventSourceArn": "arn:aws:kafka:us-east-1:123456789012:cluster/tests-cluster/87654321-4321-4321-4321-876543221-s1", "eventProcessorId": "12345678-1234-1234-1234-123456789012/0", "logLevel": "DEBUG", "kafkaPartitionOffsets": { "partition": "test-1", "endOffset": 5004, "consumedOffset": 5003, "processedOffset": 5003, "committedOffset": 5004 } }