AWS::MSK::Cluster - AWS CloudFormation

AWS::MSK::Cluster

The AWS::MSK::Cluster resource creates an Amazon MSK cluster. For more information, see What Is Amazon MSK? in the Amazon MSK Developer Guide.

Syntax

To declare this entity in your AWS CloudFormation template, use the following syntax:
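A YAML outline of the declaration, assembled from the properties documented below. Each value is a placeholder naming the property's type rather than a literal value, so treat this as a sketch of the shape and not as a deployable template.

```yaml
Type: AWS::MSK::Cluster
Properties:
  BrokerNodeGroupInfo: BrokerNodeGroupInfo      # required
  ClientAuthentication: ClientAuthentication
  ClusterName: String                           # required
  ConfigurationInfo: ConfigurationInfo
  EncryptionInfo: EncryptionInfo
  EnhancedMonitoring: String
  KafkaVersion: String                          # required
  LoggingInfo: LoggingInfo
  NumberOfBrokerNodes: Integer                  # required
  OpenMonitoring: OpenMonitoring
  Tags: Json
```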

Properties

BrokerNodeGroupInfo

The setup to be used for brokers in the cluster.

Required: Yes

Type: BrokerNodeGroupInfo

Update requires: Replacement

ClientAuthentication

Includes information related to client authentication.

Required: No

Type: ClientAuthentication

Update requires: Replacement

ClusterName

The name of the cluster.

Required: Yes

Type: String

Update requires: Replacement

ConfigurationInfo

The Amazon MSK configuration to use for the cluster.

Required: No

Type: ConfigurationInfo

Update requires: No interruption

EncryptionInfo

Includes all encryption-related information.

Required: No

Type: EncryptionInfo

Update requires: Replacement

EnhancedMonitoring

Specifies the level of monitoring for the MSK cluster. The possible values are DEFAULT, PER_BROKER, and PER_TOPIC_PER_BROKER.

Required: No

Type: String

Update requires: No interruption
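One way to keep the monitoring level within the three documented values is to expose it as a template parameter with AllowedValues. The sketch below assumes a hypothetical parameter named MonitoringLevel and placeholder subnet IDs; neither comes from this page.

```yaml
Parameters:
  MonitoringLevel:                 # hypothetical parameter name
    Type: String
    Default: DEFAULT
    AllowedValues:                 # the three values listed for EnhancedMonitoring
      - DEFAULT
      - PER_BROKER
      - PER_TOPIC_PER_BROKER
Resources:
  TestCluster:
    Type: 'AWS::MSK::Cluster'
    Properties:
      ClusterName: ClusterWithMonitoringParameter   # illustrative name
      KafkaVersion: 2.2.1
      NumberOfBrokerNodes: 3
      EnhancedMonitoring: !Ref MonitoringLevel
      BrokerNodeGroupInfo:
        InstanceType: kafka.m5.large
        ClientSubnets:
          - ReplaceWithSubnetId1
          - ReplaceWithSubnetId2
          - ReplaceWithSubnetId3
```

Because EnhancedMonitoring updates without interruption, changing the parameter value in a stack update should not replace the cluster.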

KafkaVersion

The version of Apache Kafka. You can use Amazon MSK to create clusters that use Apache Kafka versions 1.1.1 and 2.2.1.

Required: Yes

Type: String

Update requires: No interruption

LoggingInfo

You can configure your MSK cluster to send broker logs to different destination types. This is a container for the configuration details related to broker logs.

Required: No

Type: LoggingInfo

Update requires: No interruption
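A sketch of a LoggingInfo block that sends broker logs to CloudWatch Logs and Amazon S3. The sub-property names (BrokerLogs, CloudWatchLogs, Firehose, S3 and their fields) belong to the LoggingInfo property type and are not described on this page, so check them against that type's reference before relying on them. The log group, bucket, and prefix values are placeholders.

```yaml
# Fragment to place under the Properties of an AWS::MSK::Cluster resource.
LoggingInfo:
  BrokerLogs:
    CloudWatchLogs:
      Enabled: true
      LogGroup: ReplaceWithLogGroupName      # placeholder
    Firehose:
      Enabled: false                         # destination left disabled in this sketch
    S3:
      Enabled: true
      Bucket: ReplaceWithBucketName          # placeholder
      Prefix: msk-broker-logs/               # placeholder key prefix
```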

NumberOfBrokerNodes

The number of broker nodes you want in the Amazon MSK cluster. You can submit an update to increase the number of broker nodes in a cluster.

Required: Yes

Type: Integer

Update requires: No interruption

OpenMonitoring

The settings for open monitoring.

Required: No

Type: OpenMonitoring

Update requires: No interruption

Tags

A map of key:value pairs to apply to this resource. Both key and value are of type String.

Required: No

Type: Json

Update requires: Replacement

Return Values

Ref

When you pass the logical ID of this resource to the intrinsic Ref function, Ref returns the Amazon MSK cluster ARN. For example:

!Ref MyTestCluster

For the Amazon MSK cluster MyTestCluster, Ref returns the ARN of the cluster.

For more information about using the Ref function, see Ref.
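As a sketch of how the returned ARN might be used, the following Outputs section publishes it and exports it for other stacks. The output name and export name are hypothetical; MyTestCluster is the logical ID from the example above.

```yaml
Outputs:
  MyTestClusterArn:                          # hypothetical output name
    Description: The ARN of the MyTestCluster MSK cluster
    Value: !Ref MyTestCluster                # Ref returns the cluster ARN
    Export:
      Name: !Sub '${AWS::StackName}-MyTestClusterArn'   # hypothetical export name
```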

Examples

In the following examples you can find the YAML for each template, followed by the equivalent JSON. You can use either language.

Create an MSK Cluster Where You Only Specify Values for the Required Properties

YAML

Description: MSK Cluster with required properties.
Resources:
  TestCluster:
    Type: 'AWS::MSK::Cluster'
    Properties:
      ClusterName: ClusterWithRequiredProperties
      KafkaVersion: 2.2.1
      NumberOfBrokerNodes: 3
      BrokerNodeGroupInfo:
        InstanceType: kafka.m5.large
        ClientSubnets:
          - ReplaceWithSubnetId1
          - ReplaceWithSubnetId2
          - ReplaceWithSubnetId3

{
  "Description": "MSK Cluster with required properties.",
  "Resources": {
    "TestCluster": {
      "Type": "AWS::MSK::Cluster",
      "Properties": {
        "ClusterName": "ClusterWithRequiredProperties",
        "KafkaVersion": "2.2.1",
        "NumberOfBrokerNodes": 3,
        "BrokerNodeGroupInfo": {
          "InstanceType": "kafka.m5.large",
          "ClientSubnets": [
            "ReplaceWithSubnetId1",
            "ReplaceWithSubnetId2",
            "ReplaceWithSubnetId3"
          ]
        }
      }
    }
  }
}

Create an MSK Cluster Where You Explicitly Set All Properties

YAML

Description: MSK Cluster with all properties
Resources:
  TestCluster:
    Type: 'AWS::MSK::Cluster'
    Properties:
      ClusterName: ClusterWithAllProperties
      KafkaVersion: 2.2.1
      NumberOfBrokerNodes: 3
      EnhancedMonitoring: PER_BROKER
      EncryptionInfo:
        EncryptionAtRest:
          DataVolumeKMSKeyId: ReplaceWithKmsKeyArn
        EncryptionInTransit:
          ClientBroker: TLS
          InCluster: true
      OpenMonitoring:
        Prometheus:
          JmxExporter:
            EnabledInBroker: "true"
          NodeExporter:
            EnabledInBroker: "true"
      ConfigurationInfo:
        Arn: ReplaceWithConfigurationArn
        Revision: 1
      ClientAuthentication:
        Tls:
          CertificateAuthorityArnList:
            - ReplaceWithCAArn
      Tags:
        Environment: Test
        Owner: QATeam
      BrokerNodeGroupInfo:
        BrokerAZDistribution: DEFAULT
        InstanceType: kafka.m5.large
        SecurityGroups:
          - ReplaceWithSecurityGroupId
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 100
        ClientSubnets:
          - ReplaceWithSubnetId1
          - ReplaceWithSubnetId2
          - ReplaceWithSubnetId3

{
  "Description": "MSK Cluster with all properties",
  "Resources": {
    "TestCluster": {
      "Type": "AWS::MSK::Cluster",
      "Properties": {
        "ClusterName": "ClusterWithAllProperties",
        "KafkaVersion": "2.2.1",
        "NumberOfBrokerNodes": 3,
        "EnhancedMonitoring": "PER_BROKER",
        "EncryptionInfo": {
          "EncryptionAtRest": {
            "DataVolumeKMSKeyId": "ReplaceWithKmsKeyArn"
          },
          "EncryptionInTransit": {
            "ClientBroker": "TLS",
            "InCluster": true
          }
        },
        "OpenMonitoring": {
          "Prometheus": {
            "JmxExporter": {
              "EnabledInBroker": "true"
            },
            "NodeExporter": {
              "EnabledInBroker": "true"
            }
          }
        },
        "ConfigurationInfo": {
          "Arn": "ReplaceWithConfigurationArn",
          "Revision": 1
        },
        "ClientAuthentication": {
          "Tls": {
            "CertificateAuthorityArnList": [
              "ReplaceWithCAArn"
            ]
          }
        },
        "Tags": {
          "Environment": "Test",
          "Owner": "QATeam"
        },
        "BrokerNodeGroupInfo": {
          "BrokerAZDistribution": "DEFAULT",
          "InstanceType": "kafka.m5.large",
          "SecurityGroups": [
            "ReplaceWithSecurityGroupId"
          ],
          "StorageInfo": {
            "EBSStorageInfo": {
              "VolumeSize": 100
            }
          },
          "ClientSubnets": [
            "ReplaceWithSubnetId1",
            "ReplaceWithSubnetId2",
            "ReplaceWithSubnetId3"
          ]
        }
      }
    }
  }
}

Get Started With Amazon MSK

This example template creates an MSK cluster in a simple architecture to help you get started.

YAML

AWSTemplateFormatVersion: 2010-09-09
Parameters:
  KeyName:
    Description: Name of an existing EC2 KeyPair to enable SSH access to the instance
    Type: 'AWS::EC2::KeyPair::KeyName'
    ConstraintDescription: Can contain only ASCII characters.
  SSHLocation:
    Description: The IP address range that can be used to SSH to the EC2 instances
    Type: String
    MinLength: '9'
    MaxLength: '18'
    Default: 0.0.0.0/0
    AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})'
    ConstraintDescription: Must be a valid IP CIDR range of the form x.x.x.x/x
Mappings:
  SubnetConfig:
    VPC:
      CIDR: 10.0.0.0/16
    PublicOne:
      CIDR: 10.0.0.0/24
    PrivateOne:
      CIDR: 10.0.1.0/24
    PrivateTwo:
      CIDR: 10.0.2.0/24
    PrivateThree:
      CIDR: 10.0.3.0/24
  RegionAMI:
    us-east-1:
      HVM64: ami-0c6b1d09930fac512
    us-west-2:
      HVM64: ami-0cb72367e98845d43
Resources:
  VPC:
    Type: 'AWS::EC2::VPC'
    Properties:
      EnableDnsSupport: true
      EnableDnsHostnames: true
      CidrBlock: !FindInMap
        - SubnetConfig
        - VPC
        - CIDR
      Tags:
        - Key: Name
          Value: MMVPC
  PublicSubnetOne:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PublicOne
        - CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: MMPublicSubnet
  PrivateSubnetOne:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PrivateOne
        - CIDR
      Tags:
        - Key: Name
          Value: MMPrivateSubnetOne
  PrivateSubnetTwo:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 1
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PrivateTwo
        - CIDR
      Tags:
        - Key: Name
          Value: MMPrivateSubnetTwo
  PrivateSubnetThree:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 2
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PrivateThree
        - CIDR
      Tags:
        - Key: Name
          Value: MMPrivateSubnetThree
  InternetGateway:
    Type: 'AWS::EC2::InternetGateway'
  GatewayAttachement:
    Type: 'AWS::EC2::VPCGatewayAttachment'
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway
  PublicRouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC
  PublicRoute:
    Type: 'AWS::EC2::Route'
    DependsOn: GatewayAttachement
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway
  PublicSubnetOneRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PublicSubnetOne
      RouteTableId: !Ref PublicRouteTable
  PrivateRouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC
  PrivateSubnetOneRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetOne
  PrivateSubnetTwoRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetTwo
  PrivateSubnetThreeRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetThree
  KafkaClientInstanceSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Enable SSH access via port 22
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: !Ref SSHLocation
  MSKSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Enable SSH access via port 22
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 2181
          ToPort: 2181
          SourceSecurityGroupId: !GetAtt
            - KafkaClientInstanceSecurityGroup
            - GroupId
        - IpProtocol: tcp
          FromPort: 9094
          ToPort: 9094
          SourceSecurityGroupId: !GetAtt
            - KafkaClientInstanceSecurityGroup
            - GroupId
        - IpProtocol: tcp
          FromPort: 9092
          ToPort: 9092
          SourceSecurityGroupId: !GetAtt
            - KafkaClientInstanceSecurityGroup
            - GroupId
  KafkaClientEC2Instance:
    Type: 'AWS::EC2::Instance'
    Properties:
      InstanceType: m5.large
      KeyName: !Ref KeyName
      IamInstanceProfile: !Ref EC2InstanceProfile
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      SubnetId: !Ref PublicSubnetOne
      SecurityGroupIds:
        - !GetAtt
          - KafkaClientInstanceSecurityGroup
          - GroupId
      ImageId: !FindInMap
        - RegionAMI
        - !Ref 'AWS::Region'
        - HVM64
      Tags:
        - Key: Name
          Value: KafkaClientInstance
      UserData: !Base64 |
        #!/bin/bash
        yum update -y
        yum install python3.7 -y
        yum install java-1.8.0-openjdk-devel -y
        yum erase awscli -y
        cd /home/ec2-user
        echo "export PATH=.local/bin:$PATH" >> .bash_profile
        mkdir kafka
        mkdir mm
        cd kafka
        wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
        tar -xzf kafka_2.12-2.2.1.tgz
        cd /home/ec2-user
        wget https://bootstrap.pypa.io/get-pip.py
        su -c "python3.7 get-pip.py --user" -s /bin/sh ec2-user
        su -c "/home/ec2-user/.local/bin/pip3 install boto3 --user" -s /bin/sh ec2-user
        su -c "/home/ec2-user/.local/bin/pip3 install awscli --user" -s /bin/sh ec2-user
        chown -R ec2-user ./kafka
        chgrp -R ec2-user ./kafka
        chown -R ec2-user ./mm
        chgrp -R ec2-user ./mm
  EC2Role:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonMSKFullAccess'
        - 'arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess'
  EC2InstanceProfile:
    Type: 'AWS::IAM::InstanceProfile'
    Properties:
      InstanceProfileName: EC2MSKCFProfile
      Roles:
        - !Ref EC2Role
  MSKCluster:
    Type: 'AWS::MSK::Cluster'
    Properties:
      BrokerNodeGroupInfo:
        ClientSubnets:
          - !Ref PrivateSubnetOne
          - !Ref PrivateSubnetTwo
          - !Ref PrivateSubnetThree
        InstanceType: kafka.m5.large
        SecurityGroups:
          - !GetAtt
            - MSKSecurityGroup
            - GroupId
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 2000
      ClusterName: MSKCluster
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS
          InCluster: true
      EnhancedMonitoring: PER_TOPIC_PER_BROKER
      KafkaVersion: 2.2.1
      NumberOfBrokerNodes: 3
Outputs:
  VPCId:
    Description: The ID of the VPC created
    Value: !Ref VPC
  PublicSubnetOne:
    Description: The name of the public subnet created
    Value: !Ref PublicSubnetOne
  PrivateSubnetOne:
    Description: The ID of private subnet one created
    Value: !Ref PrivateSubnetOne
  PrivateSubnetTwo:
    Description: The ID of private subnet two created
    Value: !Ref PrivateSubnetTwo
  PrivateSubnetThree:
    Description: The ID of private subnet three created
    Value: !Ref PrivateSubnetThree
  MSKSecurityGroupID:
    Description: The ID of the security group created for the MSK clusters
    Value: !GetAtt
      - MSKSecurityGroup
      - GroupId
  KafkaClientEC2InstancePublicDNS:
    Description: The Public DNS for the MirrorMaker EC2 instance
    Value: !GetAtt
      - KafkaClientEC2Instance
      - PublicDnsName
  MSKClusterArn:
    Description: The Arn for the MSKMMCluster1 MSK cluster
    Value: !Ref MSKCluster

{ "AWSTemplateFormatVersion": "2010-09-09", "Parameters": { "KeyName": { "Description": "Name of an existing EC2 KeyPair to enable SSH access to the instance", "Type": "AWS::EC2::KeyPair::KeyName", "ConstraintDescription": "Can contain only ASCII characters." }, "SSHLocation": { "Description": "The IP address range that can be used to SSH to the EC2 instances", "Type": "String", "MinLength": "9", "MaxLength": "18", "Default": "0.0.0.0/0", "AllowedPattern": "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})/(\\d{1,2})", "ConstraintDescription": "Must be a valid IP CIDR range of the form x.x.x.x/x" } }, "Mappings": { "SubnetConfig": { "VPC": { "CIDR": "10.0.0.0/16" }, "PublicOne": { "CIDR": "10.0.0.0/24" }, "PrivateOne": { "CIDR": "10.0.1.0/24" }, "PrivateTwo": { "CIDR": "10.0.2.0/24" }, "PrivateThree": { "CIDR": "10.0.3.0/24" } }, "RegionAMI": { "us-east-1": { "HVM64": "ami-0c6b1d09930fac512" }, "us-west-2": { "HVM64": "ami-0cb72367e98845d43" } } }, "Resources": { "VPC": { "Type": "AWS::EC2::VPC", "Properties": { "EnableDnsSupport": true, "EnableDnsHostnames": true, "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "VPC", "CIDR" ] }, "Tags": [ { "Key": "Name", "Value": "MMVPC" } ] } }, "PublicSubnetOne": { "Type": "AWS::EC2::Subnet", "Properties": { "AvailabilityZone": { "Fn::Select": [ 0, { "Fn::GetAZs": { "Ref": "AWS::Region" } } ] }, "VpcId": { "Ref": "VPC" }, "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "PublicOne", "CIDR" ] }, "MapPublicIpOnLaunch": true, "Tags": [ { "Key": "Name", "Value": "MMPublicSubnet" } ] } }, "PrivateSubnetOne": { "Type": "AWS::EC2::Subnet", "Properties": { "AvailabilityZone": { "Fn::Select": [ 0, { "Fn::GetAZs": { "Ref": "AWS::Region" } } ] }, "VpcId": { "Ref": "VPC" }, "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "PrivateOne", "CIDR" ] }, "Tags": [ { "Key": "Name", "Value": "MMPrivateSubnetOne" } ] } }, "PrivateSubnetTwo": { "Type": "AWS::EC2::Subnet", "Properties": { "AvailabilityZone": { "Fn::Select": [ 1, { "Fn::GetAZs": { 
"Ref": "AWS::Region" } } ] }, "VpcId": { "Ref": "VPC" }, "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "PrivateTwo", "CIDR" ] }, "Tags": [ { "Key": "Name", "Value": "MMPrivateSubnetTwo" } ] } }, "PrivateSubnetThree": { "Type": "AWS::EC2::Subnet", "Properties": { "AvailabilityZone": { "Fn::Select": [ 2, { "Fn::GetAZs": { "Ref": "AWS::Region" } } ] }, "VpcId": { "Ref": "VPC" }, "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "PrivateThree", "CIDR" ] }, "Tags": [ { "Key": "Name", "Value": "MMPrivateSubnetThree" } ] } }, "InternetGateway": { "Type": "AWS::EC2::InternetGateway" }, "GatewayAttachement": { "Type": "AWS::EC2::VPCGatewayAttachment", "Properties": { "VpcId": { "Ref": "VPC" }, "InternetGatewayId": { "Ref": "InternetGateway" } } }, "PublicRouteTable": { "Type": "AWS::EC2::RouteTable", "Properties": { "VpcId": { "Ref": "VPC" } } }, "PublicRoute": { "Type": "AWS::EC2::Route", "DependsOn": "GatewayAttachement", "Properties": { "RouteTableId": { "Ref": "PublicRouteTable" }, "DestinationCidrBlock": "0.0.0.0/0", "GatewayId": { "Ref": "InternetGateway" } } }, "PublicSubnetOneRouteTableAssociation": { "Type": "AWS::EC2::SubnetRouteTableAssociation", "Properties": { "SubnetId": { "Ref": "PublicSubnetOne" }, "RouteTableId": { "Ref": "PublicRouteTable" } } }, "PrivateRouteTable": { "Type": "AWS::EC2::RouteTable", "Properties": { "VpcId": { "Ref": "VPC" } } }, "PrivateSubnetOneRouteTableAssociation": { "Type": "AWS::EC2::SubnetRouteTableAssociation", "Properties": { "RouteTableId": { "Ref": "PrivateRouteTable" }, "SubnetId": { "Ref": "PrivateSubnetOne" } } }, "PrivateSubnetTwoRouteTableAssociation": { "Type": "AWS::EC2::SubnetRouteTableAssociation", "Properties": { "RouteTableId": { "Ref": "PrivateRouteTable" }, "SubnetId": { "Ref": "PrivateSubnetTwo" } } }, "PrivateSubnetThreeRouteTableAssociation": { "Type": "AWS::EC2::SubnetRouteTableAssociation", "Properties": { "RouteTableId": { "Ref": "PrivateRouteTable" }, "SubnetId": { "Ref": "PrivateSubnetThree" } } }, 
"KafkaClientInstanceSecurityGroup": { "Type": "AWS::EC2::SecurityGroup", "Properties": { "GroupDescription": "Enable SSH access via port 22", "VpcId": { "Ref": "VPC" }, "SecurityGroupIngress": [ { "IpProtocol": "tcp", "FromPort": 22, "ToPort": 22, "CidrIp": { "Ref": "SSHLocation" } } ] } }, "MSKSecurityGroup": { "Type": "AWS::EC2::SecurityGroup", "Properties": { "GroupDescription": "Enable SSH access via port 22", "VpcId": { "Ref": "VPC" }, "SecurityGroupIngress": [ { "IpProtocol": "tcp", "FromPort": 2181, "ToPort": 2181, "SourceSecurityGroupId": { "Fn::GetAtt": [ "KafkaClientInstanceSecurityGroup", "GroupId" ] } }, { "IpProtocol": "tcp", "FromPort": 9094, "ToPort": 9094, "SourceSecurityGroupId": { "Fn::GetAtt": [ "KafkaClientInstanceSecurityGroup", "GroupId" ] } }, { "IpProtocol": "tcp", "FromPort": 9092, "ToPort": 9092, "SourceSecurityGroupId": { "Fn::GetAtt": [ "KafkaClientInstanceSecurityGroup", "GroupId" ] } } ] } }, "KafkaClientEC2Instance": { "Type": "AWS::EC2::Instance", "Properties": { "InstanceType": "m5.large", "KeyName": { "Ref": "KeyName" }, "IamInstanceProfile": { "Ref": "EC2InstanceProfile" }, "AvailabilityZone": { "Fn::Select": [ 0, { "Fn::GetAZs": { "Ref": "AWS::Region" } } ] }, "SubnetId": { "Ref": "PublicSubnetOne" }, "SecurityGroupIds": [ { "Fn::GetAtt": [ "KafkaClientInstanceSecurityGroup", "GroupId" ] } ], "ImageId": { "Fn::FindInMap": [ "RegionAMI", { "Ref": "AWS::Region" }, "HVM64" ] }, "Tags": [ { "Key": "Name", "Value": "KafkaClientInstance" } ], "UserData": { "Fn::Base64": "#!/bin/bash\nyum update -y \nyum install python3.7 -y\nyum install java-1.8.0-openjdk-devel -y\nyum erase awscli -y\ncd /home/ec2-user\necho \"export PATH=.local/bin:$PATH\" >> .bash_profile\nmkdir kafka\nmkdir mm\ncd kafka\nwget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz\ntar -xzf kafka_2.12-2.2.1.tgz\ncd /home/ec2-user\nwget https://bootstrap.pypa.io/get-pip.py\nsu -c \"python3.7 get-pip.py --user\" -s /bin/sh ec2-user\nsu -c 
\"/home/ec2-user/.local/bin/pip3 install boto3 --user\" -s /bin/sh ec2-user\nsu -c \"/home/ec2-user/.local/bin/pip3 install awscli --user\" -s /bin/sh ec2-user\nchown -R ec2-user ./kafka\nchgrp -R ec2-user ./kafka\nchown -R ec2-user ./mm\nchgrp -R ec2-user ./mm\n" } } }, "EC2Role": { "Type": "AWS::IAM::Role", "Properties": { "AssumeRolePolicyDocument": { "Version": "2012-10-17", "Statement": [ { "Sid": "", "Effect": "Allow", "Principal": { "Service": "ec2.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }, "Path": "/", "ManagedPolicyArns": [ "arn:aws:iam::aws:policy/AmazonMSKFullAccess", "arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess" ] } }, "EC2InstanceProfile": { "Type": "AWS::IAM::InstanceProfile", "Properties": { "InstanceProfileName": "EC2MSKCFProfile", "Roles": [ { "Ref": "EC2Role" } ] } }, "MSKCluster": { "Type": "AWS::MSK::Cluster", "Properties": { "BrokerNodeGroupInfo": { "ClientSubnets": [ { "Ref": "PrivateSubnetOne" }, { "Ref": "PrivateSubnetTwo" }, { "Ref": "PrivateSubnetThree" } ], "InstanceType": "kafka.m5.large", "SecurityGroups": [ { "Fn::GetAtt": [ "MSKSecurityGroup", "GroupId" ] } ], "StorageInfo": { "EBSStorageInfo": { "VolumeSize": 2000 } } }, "ClusterName": "MSKCluster", "EncryptionInfo": { "EncryptionInTransit": { "ClientBroker": "TLS", "InCluster": true } }, "EnhancedMonitoring": "PER_TOPIC_PER_BROKER", "KafkaVersion": "2.2.1", "NumberOfBrokerNodes": 3 } } }, "Outputs": { "VPCId": { "Description": "The ID of the VPC created", "Value": { "Ref": "VPC" } }, "PublicSubnetOne": { "Description": "The name of the public subnet created", "Value": { "Ref": "PublicSubnetOne" } }, "PrivateSubnetOne": { "Description": "The ID of private subnet one created", "Value": { "Ref": "PrivateSubnetOne" } }, "PrivateSubnetTwo": { "Description": "The ID of private subnet two created", "Value": { "Ref": "PrivateSubnetTwo" } }, "PrivateSubnetThree": { "Description": "The ID of private subnet three created", "Value": { "Ref": "PrivateSubnetThree" } }, 
"MSKSecurityGroupID": { "Description": "The ID of the security group created for the MSK clusters", "Value": { "Fn::GetAtt": [ "MSKSecurityGroup", "GroupId" ] } }, "KafkaClientEC2InstancePublicDNS": { "Description": "The Public DNS for the MirrorMaker EC2 instance", "Value": { "Fn::GetAtt": [ "KafkaClientEC2Instance", "PublicDnsName" ] } }, "MSKClusterArn": { "Description": "The Arn for the MSKMMCluster1 MSK cluster", "Value": { "Ref": "MSKCluster" } } } }

Create Two MSK Clusters To Use With Apache MirrorMaker

This example shows how to set up two MSK clusters for MirrorMaker. The template also creates the Amazon VPC, subnets, security groups, and IAM roles that the example needs. In addition, it creates an EC2 instance with Apache Kafka, Java, and the AWS CLI installed. You can use this EC2 instance to run Apache Kafka tools, including MirrorMaker. You must create the MirrorMaker configuration files manually.

AWSTemplateFormatVersion: 2010-09-09
Parameters:
  KeyName:
    Description: The name of an existing EC2 KeyPair to enable SSH access to the instance.
    Type: 'AWS::EC2::KeyPair::KeyName'
    ConstraintDescription: Can contain only ASCII characters.
  SSHLocation:
    Description: The IP address range that can be used to SSH to the EC2 instances.
    Type: String
    MinLength: '9'
    MaxLength: '18'
    Default: 0.0.0.0/0
    AllowedPattern: '(\d{1,3})\.(\d{1,3})\.(\d{1,3})\.(\d{1,3})/(\d{1,2})'
    ConstraintDescription: Must be a valid IP CIDR range of the form x.x.x.x/x
Mappings:
  SubnetConfig:
    VPC:
      CIDR: 10.0.0.0/16
    PublicOne:
      CIDR: 10.0.0.0/24
    PrivateOne:
      CIDR: 10.0.1.0/24
    PrivateTwo:
      CIDR: 10.0.2.0/24
    PrivateThree:
      CIDR: 10.0.3.0/24
  RegionAMI:
    us-east-1:
      HVM64: ami-0c6b1d09930fac512
    us-west-2:
      HVM64: ami-0cb72367e98845d43
Resources:
  VPC:
    Type: 'AWS::EC2::VPC'
    Properties:
      EnableDnsSupport: true
      EnableDnsHostnames: true
      CidrBlock: !FindInMap
        - SubnetConfig
        - VPC
        - CIDR
      Tags:
        - Key: Name
          Value: MMVPC
  PublicSubnetOne:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PublicOne
        - CIDR
      MapPublicIpOnLaunch: true
      Tags:
        - Key: Name
          Value: MMPublicSubnet
  PrivateSubnetOne:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PrivateOne
        - CIDR
      Tags:
        - Key: Name
          Value: MMPrivateSubnetOne
  PrivateSubnetTwo:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 1
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PrivateTwo
        - CIDR
      Tags:
        - Key: Name
          Value: MMPrivateSubnetTwo
  PrivateSubnetThree:
    Type: 'AWS::EC2::Subnet'
    Properties:
      AvailabilityZone: !Select
        - 2
        - !GetAZs
          Ref: 'AWS::Region'
      VpcId: !Ref VPC
      CidrBlock: !FindInMap
        - SubnetConfig
        - PrivateThree
        - CIDR
      Tags:
        - Key: Name
          Value: MMPrivateSubnetThree
  InternetGateway:
    Type: 'AWS::EC2::InternetGateway'
  GatewayAttachement:
    Type: 'AWS::EC2::VPCGatewayAttachment'
    Properties:
      VpcId: !Ref VPC
      InternetGatewayId: !Ref InternetGateway
  PublicRouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC
  PublicRoute:
    Type: 'AWS::EC2::Route'
    DependsOn: GatewayAttachement
    Properties:
      RouteTableId: !Ref PublicRouteTable
      DestinationCidrBlock: 0.0.0.0/0
      GatewayId: !Ref InternetGateway
  PublicSubnetOneRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      SubnetId: !Ref PublicSubnetOne
      RouteTableId: !Ref PublicRouteTable
  PrivateRouteTable:
    Type: 'AWS::EC2::RouteTable'
    Properties:
      VpcId: !Ref VPC
  PrivateSubnetOneRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetOne
  PrivateSubnetTwoRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetTwo
  PrivateSubnetThreeRouteTableAssociation:
    Type: 'AWS::EC2::SubnetRouteTableAssociation'
    Properties:
      RouteTableId: !Ref PrivateRouteTable
      SubnetId: !Ref PrivateSubnetThree
  MMInstanceSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Enable SSH access via port 22
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 22
          ToPort: 22
          CidrIp: !Ref SSHLocation
  MSKSecurityGroup:
    Type: 'AWS::EC2::SecurityGroup'
    Properties:
      GroupDescription: Enable SSH access via port 22
      VpcId: !Ref VPC
      SecurityGroupIngress:
        - IpProtocol: tcp
          FromPort: 2181
          ToPort: 2181
          SourceSecurityGroupId: !GetAtt
            - MMInstanceSecurityGroup
            - GroupId
        - IpProtocol: tcp
          FromPort: 9094
          ToPort: 9094
          SourceSecurityGroupId: !GetAtt
            - MMInstanceSecurityGroup
            - GroupId
        - IpProtocol: tcp
          FromPort: 9092
          ToPort: 9092
          SourceSecurityGroupId: !GetAtt
            - MMInstanceSecurityGroup
            - GroupId
  MMEC2Instance:
    Type: 'AWS::EC2::Instance'
    Properties:
      InstanceType: m5.large
      KeyName: !Ref KeyName
      IamInstanceProfile: !Ref EC2InstanceProfile
      AvailabilityZone: !Select
        - 0
        - !GetAZs
          Ref: 'AWS::Region'
      SubnetId: !Ref PublicSubnetOne
      SecurityGroupIds:
        - !GetAtt
          - MMInstanceSecurityGroup
          - GroupId
      ImageId: !FindInMap
        - RegionAMI
        - !Ref 'AWS::Region'
        - HVM64
      Tags:
        - Key: Name
          Value: MMInstance
      UserData: !Base64 |
        #!/bin/bash
        yum update -y
        yum install python3.7 -y
        yum install java-1.8.0-openjdk-devel -y
        yum erase awscli -y
        cd /home/ec2-user
        echo "export PATH=.local/bin:$PATH" >> .bash_profile
        mkdir kafka
        mkdir mm
        cd kafka
        wget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz
        tar -xzf kafka_2.12-2.2.1.tgz
        cd /home/ec2-user
        wget https://bootstrap.pypa.io/get-pip.py
        su -c "python3.7 get-pip.py --user" -s /bin/sh ec2-user
        su -c "/home/ec2-user/.local/bin/pip3 install boto3 --user" -s /bin/sh ec2-user
        su -c "/home/ec2-user/.local/bin/pip3 install awscli --user" -s /bin/sh ec2-user
        chown -R ec2-user ./kafka
        chgrp -R ec2-user ./kafka
        chown -R ec2-user ./mm
        chgrp -R ec2-user ./mm
  EC2Role:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: ec2.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonMSKFullAccess'
        - 'arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess'
  EC2InstanceProfile:
    Type: 'AWS::IAM::InstanceProfile'
    Properties:
      InstanceProfileName: EC2MSKCFProfile
      Roles:
        - !Ref EC2Role
  MSKMMCluster1:
    Type: 'AWS::MSK::Cluster'
    Properties:
      BrokerNodeGroupInfo:
        ClientSubnets:
          - !Ref PrivateSubnetOne
          - !Ref PrivateSubnetTwo
          - !Ref PrivateSubnetThree
        InstanceType: kafka.m5.large
        SecurityGroups:
          - !GetAtt
            - MSKSecurityGroup
            - GroupId
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 2000
      ClusterName: MSKMMCluster1
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS
          InCluster: true
      EnhancedMonitoring: PER_TOPIC_PER_BROKER
      KafkaVersion: 2.2.1
      NumberOfBrokerNodes: 3
  MSKMMCluster2:
    Type: 'AWS::MSK::Cluster'
    Properties:
      BrokerNodeGroupInfo:
        ClientSubnets:
          - !Ref PrivateSubnetOne
          - !Ref PrivateSubnetTwo
          - !Ref PrivateSubnetThree
        InstanceType: kafka.m5.large
        SecurityGroups:
          - !GetAtt
            - MSKSecurityGroup
            - GroupId
        StorageInfo:
          EBSStorageInfo:
            VolumeSize: 2000
      ClusterName: MSKMMCluster2
      EncryptionInfo:
        EncryptionInTransit:
          ClientBroker: TLS
          InCluster: true
      EnhancedMonitoring: PER_TOPIC_PER_BROKER
      KafkaVersion: 2.2.1
      NumberOfBrokerNodes: 3
Outputs:
  VPCId:
    Description: The ID of the VPC created
    Value: !Ref VPC
  PublicSubnetOne:
    Description: The name of the public subnet created
    Value: !Ref PublicSubnetOne
  PrivateSubnetOne:
    Description: The ID of private subnet one created
    Value: !Ref PrivateSubnetOne
  PrivateSubnetTwo:
    Description: The ID of private subnet two created
    Value: !Ref PrivateSubnetTwo
  PrivateSubnetThree:
    Description: The ID of private subnet three created
    Value: !Ref PrivateSubnetThree
  MSKSecurityGroupID:
    Description: The ID of the security group created for the MSK clusters
    Value: !GetAtt
      - MSKSecurityGroup
      - GroupId
  MMEC2InstancePublicDNS:
    Description: The Public DNS for the MirrorMaker EC2 instance
    Value: !GetAtt
      - MMEC2Instance
      - PublicDnsName
  MSKMMCluster1Arn:
    Description: The Arn for the MSKMMCluster1 MSK cluster
    Value: !Ref MSKMMCluster1
  MSKMMCluster2Arn:
    Description: The Arn for the MSKMMCluster2 MSK cluster
    Value: !Ref MSKMMCluster2

{
  "AWSTemplateFormatVersion": "2010-09-09",
  "Parameters": {
    "KeyName": {
      "Description": "The name of an existing EC2 KeyPair to enable SSH access to the instance.",
      "Type": "AWS::EC2::KeyPair::KeyName",
      "ConstraintDescription": "Can contain only ASCII characters."
    },
    "SSHLocation": {
      "Description": "The IP address range that can be used to SSH to the EC2 instances.",
      "Type": "String",
      "MinLength": "9",
      "MaxLength": "18",
      "Default": "0.0.0.0/0",
      "AllowedPattern": "(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})\\.(\\d{1,3})/(\\d{1,2})",
      "ConstraintDescription": "Must be a valid IP CIDR range of the form x.x.x.x/x"
    }
  },
  "Mappings": {
    "SubnetConfig": {
      "VPC": { "CIDR": "10.0.0.0/16" },
      "PublicOne": { "CIDR": "10.0.0.0/24" },
      "PrivateOne": { "CIDR": "10.0.1.0/24" },
      "PrivateTwo": { "CIDR": "10.0.2.0/24" },
      "PrivateThree": { "CIDR": "10.0.3.0/24" }
    },
    "RegionAMI": {
      "us-east-1": { "HVM64": "ami-0c6b1d09930fac512" },
      "us-west-2": { "HVM64": "ami-0cb72367e98845d43" }
    }
  },
  "Resources": {
    "VPC": {
      "Type": "AWS::EC2::VPC",
      "Properties": {
        "EnableDnsSupport": true,
        "EnableDnsHostnames": true,
        "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "VPC", "CIDR" ] },
        "Tags": [ { "Key": "Name", "Value": "MMVPC" } ]
      }
    },
    "PublicSubnetOne": {
      "Type": "AWS::EC2::Subnet",
      "Properties": {
        "AvailabilityZone": { "Fn::Select": [ 0, { "Fn::GetAZs": { "Ref": "AWS::Region" } } ] },
        "VpcId": { "Ref": "VPC" },
        "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "PublicOne", "CIDR" ] },
        "MapPublicIpOnLaunch": true,
        "Tags": [ { "Key": "Name", "Value": "MMPublicSubnet" } ]
      }
    },
    "PrivateSubnetOne": {
      "Type": "AWS::EC2::Subnet",
      "Properties": {
        "AvailabilityZone": { "Fn::Select": [ 0, { "Fn::GetAZs": { "Ref": "AWS::Region" } } ] },
        "VpcId": { "Ref": "VPC" },
        "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "PrivateOne", "CIDR" ] },
        "Tags": [ { "Key": "Name", "Value": "MMPrivateSubnetOne" } ]
      }
    },
    "PrivateSubnetTwo": {
      "Type": "AWS::EC2::Subnet",
      "Properties": {
        "AvailabilityZone": { "Fn::Select": [ 1, { "Fn::GetAZs": { "Ref": "AWS::Region" } } ] },
        "VpcId": { "Ref": "VPC" },
        "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "PrivateTwo", "CIDR" ] },
        "Tags": [ { "Key": "Name", "Value": "MMPrivateSubnetTwo" } ]
      }
    },
    "PrivateSubnetThree": {
      "Type": "AWS::EC2::Subnet",
      "Properties": {
        "AvailabilityZone": { "Fn::Select": [ 2, { "Fn::GetAZs": { "Ref": "AWS::Region" } } ] },
        "VpcId": { "Ref": "VPC" },
        "CidrBlock": { "Fn::FindInMap": [ "SubnetConfig", "PrivateThree", "CIDR" ] },
        "Tags": [ { "Key": "Name", "Value": "MMPrivateSubnetThree" } ]
      }
    },
    "InternetGateway": {
      "Type": "AWS::EC2::InternetGateway"
    },
    "GatewayAttachement": {
      "Type": "AWS::EC2::VPCGatewayAttachment",
      "Properties": {
        "VpcId": { "Ref": "VPC" },
        "InternetGatewayId": { "Ref": "InternetGateway" }
      }
    },
    "PublicRouteTable": {
      "Type": "AWS::EC2::RouteTable",
      "Properties": {
        "VpcId": { "Ref": "VPC" }
      }
    },
    "PublicRoute": {
      "Type": "AWS::EC2::Route",
      "DependsOn": "GatewayAttachement",
      "Properties": {
        "RouteTableId": { "Ref": "PublicRouteTable" },
        "DestinationCidrBlock": "0.0.0.0/0",
        "GatewayId": { "Ref": "InternetGateway" }
      }
    },
    "PublicSubnetOneRouteTableAssociation": {
      "Type": "AWS::EC2::SubnetRouteTableAssociation",
      "Properties": {
        "SubnetId": { "Ref": "PublicSubnetOne" },
        "RouteTableId": { "Ref": "PublicRouteTable" }
      }
    },
    "PrivateRouteTable": {
      "Type": "AWS::EC2::RouteTable",
      "Properties": {
        "VpcId": { "Ref": "VPC" }
      }
    },
    "PrivateSubnetOneRouteTableAssociation": {
      "Type": "AWS::EC2::SubnetRouteTableAssociation",
      "Properties": {
        "RouteTableId": { "Ref": "PrivateRouteTable" },
        "SubnetId": { "Ref": "PrivateSubnetOne" }
      }
    },
    "PrivateSubnetTwoRouteTableAssociation": {
      "Type": "AWS::EC2::SubnetRouteTableAssociation",
      "Properties": {
        "RouteTableId": { "Ref": "PrivateRouteTable" },
        "SubnetId": { "Ref": "PrivateSubnetTwo" }
      }
    },
    "PrivateSubnetThreeRouteTableAssociation": {
      "Type": "AWS::EC2::SubnetRouteTableAssociation",
      "Properties": {
        "RouteTableId": { "Ref": "PrivateRouteTable" },
        "SubnetId": { "Ref": "PrivateSubnetThree" }
      }
    },
    "MMInstanceSecurityGroup": {
      "Type": "AWS::EC2::SecurityGroup",
      "Properties": {
        "GroupDescription": "Enable SSH access via port 22",
        "VpcId": { "Ref": "VPC" },
        "SecurityGroupIngress": [
          {
            "IpProtocol": "tcp",
            "FromPort": 22,
            "ToPort": 22,
            "CidrIp": { "Ref": "SSHLocation" }
          }
        ]
      }
    },
    "MSKSecurityGroup": {
      "Type": "AWS::EC2::SecurityGroup",
      "Properties": {
        "GroupDescription": "Allow ZooKeeper (2181) and Kafka broker (9092, 9094) access from the MirrorMaker instance",
        "VpcId": { "Ref": "VPC" },
        "SecurityGroupIngress": [
          {
            "IpProtocol": "tcp",
            "FromPort": 2181,
            "ToPort": 2181,
            "SourceSecurityGroupId": { "Fn::GetAtt": [ "MMInstanceSecurityGroup", "GroupId" ] }
          },
          {
            "IpProtocol": "tcp",
            "FromPort": 9094,
            "ToPort": 9094,
            "SourceSecurityGroupId": { "Fn::GetAtt": [ "MMInstanceSecurityGroup", "GroupId" ] }
          },
          {
            "IpProtocol": "tcp",
            "FromPort": 9092,
            "ToPort": 9092,
            "SourceSecurityGroupId": { "Fn::GetAtt": [ "MMInstanceSecurityGroup", "GroupId" ] }
          }
        ]
      }
    },
    "MMEC2Instance": {
      "Type": "AWS::EC2::Instance",
      "Properties": {
        "InstanceType": "m5.large",
        "KeyName": { "Ref": "KeyName" },
        "IamInstanceProfile": { "Ref": "EC2InstanceProfile" },
        "AvailabilityZone": { "Fn::Select": [ 0, { "Fn::GetAZs": { "Ref": "AWS::Region" } } ] },
        "SubnetId": { "Ref": "PublicSubnetOne" },
        "SecurityGroupIds": [ { "Fn::GetAtt": [ "MMInstanceSecurityGroup", "GroupId" ] } ],
        "ImageId": { "Fn::FindInMap": [ "RegionAMI", { "Ref": "AWS::Region" }, "HVM64" ] },
        "Tags": [ { "Key": "Name", "Value": "MMInstance" } ],
        "UserData": {
          "Fn::Base64": "#!/bin/bash\nyum update -y \nyum install python3.7 -y\nyum install java-1.8.0-openjdk-devel -y\nyum erase awscli -y\ncd /home/ec2-user\necho \"export PATH=.local/bin:$PATH\" >> .bash_profile\nmkdir kafka\nmkdir mm\ncd kafka\nwget https://archive.apache.org/dist/kafka/2.2.1/kafka_2.12-2.2.1.tgz\ntar -xzf kafka_2.12-2.2.1.tgz\ncd /home/ec2-user\nwget https://bootstrap.pypa.io/get-pip.py\nsu -c \"python3.7 get-pip.py --user\" -s /bin/sh ec2-user\nsu -c \"/home/ec2-user/.local/bin/pip3 install boto3 --user\" -s /bin/sh ec2-user\nsu -c \"/home/ec2-user/.local/bin/pip3 install awscli --user\" -s /bin/sh ec2-user\nchown -R ec2-user ./kafka\nchgrp -R ec2-user ./kafka\nchown -R ec2-user ./mm\nchgrp -R ec2-user ./mm\n"
        }
      }
    },
    "EC2Role": {
      "Type": "AWS::IAM::Role",
      "Properties": {
        "AssumeRolePolicyDocument": {
          "Version": "2012-10-17",
          "Statement": [
            {
              "Sid": "",
              "Effect": "Allow",
              "Principal": { "Service": "ec2.amazonaws.com" },
              "Action": "sts:AssumeRole"
            }
          ]
        },
        "Path": "/",
        "ManagedPolicyArns": [
          "arn:aws:iam::aws:policy/AmazonMSKFullAccess",
          "arn:aws:iam::aws:policy/AWSCloudFormationReadOnlyAccess"
        ]
      }
    },
    "EC2InstanceProfile": {
      "Type": "AWS::IAM::InstanceProfile",
      "Properties": {
        "InstanceProfileName": "EC2MSKCFProfile",
        "Roles": [ { "Ref": "EC2Role" } ]
      }
    },
    "MSKMMCluster1": {
      "Type": "AWS::MSK::Cluster",
      "Properties": {
        "BrokerNodeGroupInfo": {
          "ClientSubnets": [
            { "Ref": "PrivateSubnetOne" },
            { "Ref": "PrivateSubnetTwo" },
            { "Ref": "PrivateSubnetThree" }
          ],
          "InstanceType": "kafka.m5.large",
          "SecurityGroups": [ { "Fn::GetAtt": [ "MSKSecurityGroup", "GroupId" ] } ],
          "StorageInfo": {
            "EBSStorageInfo": { "VolumeSize": 2000 }
          }
        },
        "ClusterName": "MSKMMCluster1",
        "EncryptionInfo": {
          "EncryptionInTransit": {
            "ClientBroker": "TLS",
            "InCluster": true
          }
        },
        "EnhancedMonitoring": "PER_TOPIC_PER_BROKER",
        "KafkaVersion": "2.2.1",
        "NumberOfBrokerNodes": 3
      }
    },
    "MSKMMCluster2": {
      "Type": "AWS::MSK::Cluster",
      "Properties": {
        "BrokerNodeGroupInfo": {
          "ClientSubnets": [
            { "Ref": "PrivateSubnetOne" },
            { "Ref": "PrivateSubnetTwo" },
            { "Ref": "PrivateSubnetThree" }
          ],
          "InstanceType": "kafka.m5.large",
          "SecurityGroups": [ { "Fn::GetAtt": [ "MSKSecurityGroup", "GroupId" ] } ],
          "StorageInfo": {
            "EBSStorageInfo": { "VolumeSize": 2000 }
          }
        },
        "ClusterName": "MSKMMCluster2",
        "EncryptionInfo": {
          "EncryptionInTransit": {
            "ClientBroker": "TLS",
            "InCluster": true
          }
        },
        "EnhancedMonitoring": "PER_TOPIC_PER_BROKER",
        "KafkaVersion": "2.2.1",
        "NumberOfBrokerNodes": 3
      }
    }
  },
  "Outputs": {
    "VPCId": {
      "Description": "The ID of the VPC created",
      "Value": { "Ref": "VPC" }
    },
    "PublicSubnetOne": {
      "Description": "The ID of the public subnet created",
      "Value": { "Ref": "PublicSubnetOne" }
    },
    "PrivateSubnetOne": {
      "Description": "The ID of private subnet one created",
      "Value": { "Ref": "PrivateSubnetOne" }
    },
    "PrivateSubnetTwo": {
      "Description": "The ID of private subnet two created",
      "Value": { "Ref": "PrivateSubnetTwo" }
    },
    "PrivateSubnetThree": {
      "Description": "The ID of private subnet three created",
      "Value": { "Ref": "PrivateSubnetThree" }
    },
    "MSKSecurityGroupID": {
      "Description": "The ID of the security group created for the MSK clusters",
      "Value": { "Fn::GetAtt": [ "MSKSecurityGroup", "GroupId" ] }
    },
    "MMEC2InstancePublicDNS": {
      "Description": "The public DNS name of the MirrorMaker EC2 instance",
      "Value": { "Fn::GetAtt": [ "MMEC2Instance", "PublicDnsName" ] }
    },
    "MSKMMCluster1Arn": {
      "Description": "The ARN of the MSKMMCluster1 MSK cluster",
      "Value": { "Ref": "MSKMMCluster1" }
    },
    "MSKMMCluster2Arn": {
      "Description": "The ARN of the MSKMMCluster2 MSK cluster",
      "Value": { "Ref": "MSKMMCluster2" }
    }
  }
}