View a markdown version of this page

自我管理 Kafka 叢集的 CreateReplicator API 範例 - Amazon Managed Streaming for Apache Kafka

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

自我管理 Kafka 叢集的 CreateReplicator API 範例

轉送複寫 (自我管理 Kafka 到 MSK Express)

使用下列 AWS CLI 命令來建立複寫器,將資料從自我管理的 Kafka 叢集複寫到 Amazon MSK Express 叢集。

aws kafka create-replicator \ --replicator-name my-selfmanaged-to-msk-replicator \ --description "Replicating from self-managed Kafka to MSK Express" \ --service-execution-role-arn arn:aws:iam::123456789012:role/MSKReplicatorRole \ --kafka-clusters '[ { "apacheKafkaCluster": { "bootstrapBrokerString": "broker1:9094,broker2:9094,broker3:9094", "apacheKafkaClusterId": "<self-managed-cluster-id>" }, "clientAuthentication": { "saslScram": { "mechanism": "SHA256", "secretArn": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds" } }, "encryptionInTransit": { "encryptionType": "TLS", "rootCaCertificate": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds" }, "vpcConfig": { "subnetIds": ["subnet-aaa","subnet-bbb","subnet-ccc"], "securityGroupIds": ["sg-xxxxxxxxxxxxxxxxx"] } }, { "amazonMskCluster": { "mskClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx" }, "vpcConfig": { "subnetIds": ["subnet-ddd","subnet-eee","subnet-fff"], "securityGroupIds": ["sg-yyyyyyyyy"] } }]' \ --replication-info-list '[{ "sourceKafkaClusterId": "<self-managed-cluster-id>", "targetKafkaClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx", "targetCompressionType": "NONE", "topicReplication": { "topicsToReplicate": [".*"], "topicNameConfiguration": {"type": "IDENTICAL"}, "startingPosition": {"type": "EARLIEST"}, "detectAndCopyNewTopics": true, "copyTopicConfigurations": true, "copyAccessControlListsForTopics": true }, "consumerGroupReplication": { "consumerGroupsToReplicate": [".*"], "detectAndCopyNewConsumerGroups": true, "synchroniseConsumerGroupOffsets": true }}]'

雙向複寫範例

若要為轉返功能設定雙向複寫,必須先建立正向和反向複寫器,並將 consumerGroupOffsetSyncMode設定為 ENHANCED。這可確保消費者群組位移的同步方式可支援任一方向的無縫切換。

使用ENHANCED偏移同步模式建立轉送複寫器 (自我管理 Kafka 到 MSK Express):

aws kafka create-replicator \ --replicator-name my-selfmanaged-to-msk-replicator \ --description "Replicating from self-managed Kafka to MSK Express" \ --service-execution-role-arn arn:aws:iam::123456789012:role/MSKReplicatorRole \ --kafka-clusters '[ { "apacheKafkaCluster": { "bootstrapBrokerString": "broker1:9094,broker2:9094,broker3:9094", "apacheKafkaClusterId": "<self-managed-cluster-id>" }, "clientAuthentication": { "saslScram": { "mechanism": "SHA256", "secretArn": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds" } }, "encryptionInTransit": { "encryptionType": "TLS", "rootCaCertificate": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-ca-cert" }, "vpcConfig": { "subnetIds": ["subnet-aaa","subnet-bbb","subnet-ccc"], "securityGroupIds": ["sg-xxxxxxxxxxxxxxxxx"] } }, { "amazonMskCluster": { "mskClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx" }, "vpcConfig": { "subnetIds": ["subnet-ddd","subnet-eee","subnet-fff"], "securityGroupIds": ["sg-yyyyyyyyy"] } }]' \ --replication-info-list '[{ "sourceKafkaClusterId": "<self-managed-cluster-id>", "targetKafkaClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx", "targetCompressionType": "NONE", "topicReplication": { "topicsToReplicate": [".*"], "topicNameConfiguration": {"type": "IDENTICAL"}, "startingPosition": {"type": "EARLIEST"}, "detectAndCopyNewTopics": true, "copyTopicConfigurations": true, "copyAccessControlListsForTopics": true }, "consumerGroupReplication": { "consumerGroupsToReplicate": [".*"], "detectAndCopyNewConsumerGroups": true, "synchroniseConsumerGroupOffsets": true, "consumerGroupOffsetSyncMode": "ENHANCED" }}]'

然後使用ENHANCED偏移同步模式建立反向複寫器 (MSK Express 到自我管理的 Kafka):

aws kafka create-replicator \ --replicator-name my-msk-to-selfmanaged-replicator \ --description "Reverse replication for rollback" \ --service-execution-role-arn arn:aws:iam::123456789012:role/MSKReplicatorRole \ --kafka-clusters '[ { "amazonMskCluster": { "mskClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx" }, "vpcConfig": { "subnetIds": ["subnet-ddd","subnet-eee","subnet-fff"], "securityGroupIds": ["sg-yyyyyyyyy"] } }, { "apacheKafkaCluster": { "bootstrapBrokerString": "broker1:9094,broker2:9094,broker3:9094", "apacheKafkaClusterId": "<self-managed-cluster-id>" }, "clientAuthentication": { "saslScram": { "mechanism": "SHA256", "secretArn": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-scram-creds" } }, "encryptionInTransit": { "encryptionType": "TLS", "rootCaCertificate": "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka-ca-cert" }, "vpcConfig": { "subnetIds": ["subnet-aaa","subnet-bbb","subnet-ccc"], "securityGroupIds": ["sg-xxxxxxxxxxxxxxxxx"] } }]' \ --replication-info-list '[{ "sourceKafkaClusterArn": "arn:aws:kafka:us-east-1:123456789012:cluster/msk-express/xxx", "targetKafkaClusterId": "<self-managed-cluster-id>", "targetCompressionType": "NONE", "topicReplication": { "topicsToReplicate": [".*"], "topicNameConfiguration": {"type": "IDENTICAL"}, "startingPosition": {"type": "LATEST"}, "detectAndCopyNewTopics": true, "copyTopicConfigurations": true, "copyAccessControlListsForTopics": true }, "consumerGroupReplication": { "consumerGroupsToReplicate": [".*"], "detectAndCopyNewConsumerGroups": true, "synchroniseConsumerGroupOffsets": true, "consumerGroupOffsetSyncMode": "ENHANCED" }}]'

驗證複寫器狀態

使用 CLI describe-replicator 命令檢查複寫器的狀態:

aws kafka describe-replicator \ --replicator-arn arn:aws:kafka:us-east-1:123456789012:replicator/my-replicator/xxx

複寫器將繼續進行 CREATINGRUNNING 狀態。等待約 30 分鐘讓複寫器達到RUNNING狀態。