MSK
The object describing an MSK event source type. For more information, see
Using AWS Lambda with Amazon MSK in the AWS Lambda Developer Guide.
AWS Serverless Application Model (AWS SAM) generates an AWS::Lambda::EventSourceMapping resource when this event type is set.
To use Schema Registry, you need to define specific IAM role permissions for your function. See Complete setup with IAM roles for an example of the required configuration.
Syntax
To declare this entity in your AWS SAM template, use the following syntax.
YAML
BatchSize: Integer
ConsumerGroupId: String
DestinationConfig: DestinationConfig
Enabled: Boolean
FilterCriteria: FilterCriteria
KmsKeyArn: String
MaximumBatchingWindowInSeconds: Integer
ProvisionedPollerConfig: ProvisionedPollerConfig
SchemaRegistryConfig: SchemaRegistryConfig
SourceAccessConfigurations: SourceAccessConfigurations
StartingPosition: String
StartingPositionTimestamp: Double
Stream: String
Topics: List
Properties
BatchSize
The maximum number of records in each batch that Lambda pulls from your stream or queue and sends to your function. Lambda passes all of the records in the batch to the function in a single call, up to the payload limit for synchronous invocation (6 MB).
Default: 100
Valid Range: Minimum value of 1. Maximum value of 10,000.
Type: Integer
Required: No
AWS CloudFormation compatibility: This property is passed directly to the BatchSize property of an AWS::Lambda::EventSourceMapping resource.
ConsumerGroupId
A string that identifies the Kafka consumer group that Lambda joins, which determines how events are read from Kafka topics.
Type: String
Required: No
AWS CloudFormation compatibility: This property is passed directly to the AmazonManagedKafkaEventSourceConfig property of an AWS::Lambda::EventSourceMapping resource.
DestinationConfig
A configuration object that specifies the destination of an event after Lambda processes it.
Use this property to specify the destination of failed invocations from the Amazon MSK event source.
Type: DestinationConfig
Required: No
AWS CloudFormation compatibility: This property is passed directly to the DestinationConfig property of an AWS::Lambda::EventSourceMapping resource.
Enabled
When set to false, disables the event source mapping to pause polling and invocation.
Type: Boolean
Required: No
AWS CloudFormation compatibility: This property is passed directly to the Enabled property of an AWS::Lambda::EventSourceMapping resource.
FilterCriteria
An object that defines the criteria that determine whether Lambda should process an event. For more information, see AWS Lambda event filtering in the AWS Lambda Developer Guide.
Type: FilterCriteria
Required: No
AWS CloudFormation compatibility: This property is passed directly to the FilterCriteria property of an AWS::Lambda::EventSourceMapping resource.
KmsKeyArn
The Amazon Resource Name (ARN) of the key to encrypt information related to this event.
Type: String
Required: No
AWS CloudFormation compatibility: This property is passed directly to the KmsKeyArn property of an AWS::Lambda::EventSourceMapping resource.
MaximumBatchingWindowInSeconds
The maximum amount of time to gather records before invoking the function, in seconds.
Type: Integer
Required: No
AWS CloudFormation compatibility: This property is passed directly to the MaximumBatchingWindowInSeconds property of an AWS::Lambda::EventSourceMapping resource.
ProvisionedPollerConfig
Configuration to increase the number of pollers used to compute event source mappings. This configuration allows for a minimum of 1 poller and a maximum of 20 pollers. For an example, refer to ProvisionedPollerConfig example.
Type: ProvisionedPollerConfig
Required: No
AWS CloudFormation compatibility: This property is passed directly to the ProvisionedPollerConfig property of an AWS::Lambda::EventSourceMapping resource.
SchemaRegistryConfig
Configuration for using a schema registry with the Kafka event source.
Note
This feature requires ProvisionedPollerConfig to be configured.
Type: SchemaRegistryConfig
Required: No
AWS CloudFormation compatibility: This property is passed directly to the AmazonManagedKafkaEventSourceConfig property of an AWS::Lambda::EventSourceMapping resource.
SourceAccessConfigurations
An array of the authentication protocol, VPC components, or virtual host to secure and define your event source.
Valid values: CLIENT_CERTIFICATE_TLS_AUTH
Type: List of SourceAccessConfiguration
Required: No
AWS CloudFormation compatibility: This property is part of the AmazonManagedKafkaEventSourceConfig property of an AWS::Lambda::EventSourceMapping resource.
StartingPosition
The position in a stream from which to start reading.
- AT_TIMESTAMP – Specify a time from which to start reading records.
- LATEST – Read only new records.
- TRIM_HORIZON – Process all available records.
Valid values: AT_TIMESTAMP | LATEST | TRIM_HORIZON
Type: String
Required: No
AWS CloudFormation compatibility: This property is passed directly to the StartingPosition property of an AWS::Lambda::EventSourceMapping resource.
StartingPositionTimestamp
The time from which to start reading, in Unix time seconds. Define StartingPositionTimestamp when StartingPosition is specified as AT_TIMESTAMP.
Type: Double
Required: No
AWS CloudFormation compatibility: This property is passed directly to the StartingPositionTimestamp property of an AWS::Lambda::EventSourceMapping resource.
Stream
The Amazon Resource Name (ARN) of the data stream or a stream consumer.
Type: String
Required: Yes
AWS CloudFormation compatibility: This property is passed directly to the EventSourceArn property of an AWS::Lambda::EventSourceMapping resource.
Topics
The name of the Kafka topic.
Type: List
Required: Yes
AWS CloudFormation compatibility: This property is passed directly to the Topics property of an AWS::Lambda::EventSourceMapping resource.
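As an illustrative sketch of how several of the properties above combine, the following fragment starts reading at a fixed point in time and tunes batching. The cluster ARN is reused from the examples on this page; the consumer group name, timestamp, and batch values are hypothetical placeholders, not recommendations.

```yaml
Events:
  MSKEvent:
    Type: MSK
    Properties:
      Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2
      Topics:
        - MyTopic
      # Define StartingPositionTimestamp (Unix time, in seconds) when using AT_TIMESTAMP
      StartingPosition: AT_TIMESTAMP
      StartingPositionTimestamp: 1700000000   # hypothetical value
      BatchSize: 500                          # hypothetical; valid range is 1 to 10,000
      MaximumBatchingWindowInSeconds: 5       # hypothetical value
      ConsumerGroupId: my-consumer-group      # hypothetical name
      Enabled: true
```

With settings like these, Lambda would invoke the function once a batch reaches the configured size or the batching window elapses, subject to the 6 MB payload limit noted under BatchSize.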
Examples
Complete setup with IAM roles
The following example shows a complete setup including the required IAM role configuration for using Schema Registry:
Parameters:
  PreCreatedSubnetOne:
    Type: String
  PreCreatedSubnetTwo:
    Type: String
  MskClusterName4:
    Type: String
Resources:
  MyLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Action: [sts:AssumeRole]
            Effect: Allow
            Principal:
              Service: [lambda.amazonaws.com]
      Policies:
        - PolicyName: KafkaClusterPermissions
          PolicyDocument:
            Statement:
              - Action: [kafka:DescribeClusterV2, kafka:GetBootstrapBrokers]
                Effect: Allow
                Resource: 'arn:aws:kafka:us-east-1:123456789012:cluster/*'
        - PolicyName: KafkaAuthPolicy
          PolicyDocument:
            Statement:
              - Action: [secretsmanager:GetSecretValue, kms:Decrypt]
                Effect: "Allow"
                Resource: ['arn:aws:secretsmanager:us-west-2:123456789012:secret:kafkaSecret-******', 'arn:aws:kms:us-west-2:123456789012:key/keyId']
        - PolicyName: ENIPolicy
          PolicyDocument:
            Statement:
              - Action: [ec2:CreateNetworkInterface, ec2:DescribeNetworkInterfaces, ec2:DescribeVpcs, ec2:DeleteNetworkInterface, ec2:DescribeSubnets, ec2:DescribeSecurityGroups]
                Effect: Allow
                Resource: '*'
        - PolicyName: SchemaRegistryPolicy
          PolicyDocument:
            Statement:
              - Action: [glue:GetRegistry]
                Effect: Allow
                Resource: 'arn:aws:glue:{region}:{account-id}:registry/{registry-name}'
        - PolicyName: SchemaVersionsPolicy
          PolicyDocument:
            Statement:
              - Action: [glue:GetSchemaVersions]
                Effect: Allow
                Resource: '*'
      ManagedPolicyArns:
        - !Sub arn:${AWS::Partition}:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Tags:
        - {Value: SAM, Key: lambda:createdBy}
  MyMskCluster:
    Type: AWS::MSK::Cluster
    Properties:
      BrokerNodeGroupInfo:
        ClientSubnets:
          - Ref: PreCreatedSubnetOne
          - Ref: PreCreatedSubnetTwo
        InstanceType: kafka.t3.small
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 1
      ClusterName:
        Ref: MskClusterName4
      KafkaVersion: 3.8.x
      NumberOfBrokerNodes: 2
  MyMskStreamProcessor:
    Type: AWS::Serverless::Function
    Properties:
      Runtime: nodejs18.x
      Handler: index.handler
      CodeUri: ${codeuri}
      Role:
        Fn::GetAtt: [MyLambdaExecutionRole, Arn]
      Events:
        MyMskEvent:
          Type: MSK
          Properties:
            StartingPosition: LATEST
            Stream:
              Ref: MyMskCluster
            SourceAccessConfigurations:
              - Type: SASL_SCRAM_512_AUTH
                URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
            Topics:
              - SchemaRegistryTestTopic
            ProvisionedPollerConfig:
              MinimumPollers: 1
            SchemaRegistryConfig:
              AccessConfigs:
                - Type: BASIC_AUTH
                  URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-path/my-secret-name-1a2b3c
              SchemaValidationConfigs:
                - Attribute: KEY
              EventRecordFormat: JSON
              SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
ProvisionedPollerConfig example
ProvisionedPollerConfig:
  MinimumPollers: 1
  MaximumPollers: 20
Amazon MSK Example for Existing Cluster
The following is an example of an MSK event source type for an Amazon MSK
cluster that already exists in an AWS account.
YAML
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream: arn:aws:kafka:us-east-1:012345678012:cluster/exampleClusterName/abcdefab-1234-abcd-5678-cdef0123ab01-2
      Topics:
        - MyTopic
Amazon MSK Example for Cluster Declared in Same Template
The following is an example of an MSK event source type for an Amazon MSK
cluster that is declared in the same template file.
YAML
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream:
        Ref: MyMskCluster # This must be the name of an MSK cluster declared in the same template file
      Topics:
        - MyTopic
MSK Event Source with Schema Registry
The following is an example of an MSK event source type configured with a schema registry.
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream:
        Ref: MyMskCluster
      Topics:
        - SchemaRegistryTestTopic
      ProvisionedPollerConfig:
        MinimumPollers: 1
      SchemaRegistryConfig:
        SchemaRegistryURI: !Sub arn:${AWS::Partition}:glue:us-west-2:123456789012:registry/myregistry
        EventRecordFormat: JSON
        SchemaValidationConfigs:
          - Attribute: KEY
          - Attribute: VALUE
MSK Event Source with Confluent Schema Registry
The following is an example of an MSK event source type configured with a Confluent Schema Registry.
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream:
        Ref: MyMskCluster
      Topics:
        - SchemaRegistryTestTopic
      ProvisionedPollerConfig:
        MinimumPollers: 1
      SchemaRegistryConfig:
        SchemaRegistryURI: https://my-schema-registry.confluent.cloud
        AccessConfigs:
          - Type: BASIC_AUTH
            URI: !Sub arn:${AWS::Partition}:secretsmanager:us-west-2:123456789012:secret:my-secret
        EventRecordFormat: JSON
        SchemaValidationConfigs:
          - Attribute: KEY
          - Attribute: VALUE
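MSK Event Source with Filtering and an On-Failure Destination (illustrative sketch)
The FilterCriteria and DestinationConfig properties have no example elsewhere on this page, so the following is a minimal sketch rather than a verified configuration. It assumes that record values are JSON containing a hypothetical status field and that an Amazon SQS queue already exists; the queue ARN is a placeholder. The filter pattern follows the general Lambda event filtering syntax, so check it against the AWS Lambda Developer Guide before relying on it.

```yaml
Events:
  MSKEvent:
    Type: MSK
    Properties:
      StartingPosition: LATEST
      Stream:
        Ref: MyMskCluster
      Topics:
        - MyTopic
      FilterCriteria:
        Filters:
          # Assumption: each record value is JSON with a "status" field
          - Pattern: '{"value": {"status": ["active"]}}'
      DestinationConfig:
        OnFailure:
          # Hypothetical queue ARN; replace with a destination in your account
          Destination: arn:aws:sqs:us-east-1:012345678012:exampleFailureQueue
```

Under these assumptions, only records whose value matches the pattern would reach the function, and details of failed invocations would be sent to the queue. The function's execution role would likely also need permission to send messages to that destination.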