本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
使用 CLI 创建复制器 AWS
使用create-replicator命令创建 MSK 复制器。在开始之前,请确保已完成创建 MSK 复制器所需的 IAM 权限并已完成准备源集群和目标集群。
注意
以下示例显示了两个 MSK 集群之间的复制。MSK Replicator 还支持在自我管理的 Apache Kafka 集群和使用 Express 代理的 MSK 预配置集群之间进行复制。有关涵盖该场景的 API 示例,请参阅CreateReplicator 自管理 Kafka 集群的 API 示例。
aws kafka create-replicator \ --replicator-name "<replicator-name>" \ --service-execution-role-arn "arn:aws:iam::<account-id>:role/<role-name>" \ --kafka-clusters '[ { "AmazonMskCluster": {"MskClusterArn": "<source-cluster-arn>"}, "VpcConfig": { "SubnetIds": ["<subnet-1>", "<subnet-2>"], "SecurityGroupIds": ["<security-group-id>"] } }, { "AmazonMskCluster": {"MskClusterArn": "<target-cluster-arn>"}, "VpcConfig": { "SubnetIds": ["<subnet-1>", "<subnet-2>"], "SecurityGroupIds": ["<security-group-id>"] } } ]' \ --replication-info-list '[ { "SourceKafkaClusterArn": "<source-cluster-arn>", "TargetKafkaClusterArn": "<target-cluster-arn>", "TopicReplication": { "TopicsToReplicate": [".*"], "CopyTopicConfigurations": true, "CopyAccessControlListsForTopics": true, "DetectAndCopyNewTopics": true, "StartingPosition": {"Type": "LATEST"}, "TopicNameConfiguration": {"Type": "PREFIXED"} }, "ConsumerGroupReplication": { "ConsumerGroupsToReplicate": [".*"], "ConsumerGroupOffsetSyncMode": "LEGACY" }, "TargetCompressionType": "NONE" } ]'
要在创建复制器时启用日志传输,请添加--log-delivery参数。以下示例允许将日志传输到亚马逊 CloudWatch 日志、亚马逊 S3 和亚马逊 Data Firehose。
aws kafka create-replicator \ --replicator-name "<replicator-name>" \ --service-execution-role-arn "arn:aws:iam::<account-id>:role/<role-name>" \ --kafka-clusters '[ { "AmazonMskCluster": {"MskClusterArn": "<source-cluster-arn>"}, "VpcConfig": { "SubnetIds": ["<subnet-1>", "<subnet-2>"], "SecurityGroupIds": ["<security-group-id>"] } }, { "AmazonMskCluster": {"MskClusterArn": "<target-cluster-arn>"}, "VpcConfig": { "SubnetIds": ["<subnet-1>", "<subnet-2>"], "SecurityGroupIds": ["<security-group-id>"] } } ]' \ --replication-info-list '[ { "SourceKafkaClusterArn": "<source-cluster-arn>", "TargetKafkaClusterArn": "<target-cluster-arn>", "TopicReplication": { "TopicsToReplicate": [".*"], "CopyTopicConfigurations": true, "CopyAccessControlListsForTopics": true, "DetectAndCopyNewTopics": true, "StartingPosition": {"Type": "LATEST"}, "TopicNameConfiguration": {"Type": "PREFIXED"} }, "ConsumerGroupReplication": { "ConsumerGroupsToReplicate": [".*"] }, "TargetCompressionType": "NONE" } ]' \ --log-delivery '{ "ReplicatorLogDelivery": { "CloudWatchLogs": { "Enabled": true, "LogGroup": "/mskr/logs/<log-group-name>" }, "S3": { "Enabled": true, "Bucket": "<s3-bucket-name>", "Prefix": "<optional-prefix>" }, "Firehose": { "Enabled": true, "DeliveryStream": "<delivery-stream-name>" } } }'
您可以启用一个或多个日志传送目标。要仅启用 Amazon CloudWatch 日志,请省略S3和Firehose字段,或将其Enabled值设置为。false有关日志传送的更多信息,请参阅MSK 复制器日志。
注意
启用日志传输后,您的 IAM 角色必须具有写入配置的日志目标所需的额外权限。有关所需权限,请参阅启用来自 AWS 服务的日志记录。
有关完整的 API 参考,请参阅CreateReplicator《亚马逊 MSK API 参考》。
选择性主题复制
在topicsToReplicate和中使用正则表达式模式topicsToExclude来控制复制哪些主题。以下示例复制了以开头的主题,prod-并排除以以下开头test-的主题:
"topicReplication": { "topicsToReplicate": ["prod-.*"], "topicsToExclude": ["test-.*"], "detectAndCopyNewTopics": true }
验证复制器状态
创建复制器后,使用describe-replicator以下命令检查其状态:
aws kafka describe-replicator \ --replicator-arn arn:aws:kafka:us-east-1:123456789012:replicator/my-replicator/xxx
复制器通过 CREATING → RUNNING 状态前进。等待大约 30 分钟,让复制器进入RUNNING状态。如果过渡到FAILED,请参阅对 Amazon MSK 复制器进行故障排除。