기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
이 절차에서는 Debezium 소스 커넥터를 생성하는 방법을 설명합니다.
사용자 지정 플러그인 생성
Debezium
사이트에서 안정적인 최신 릴리스를 위한 MySQL 커넥터 플러그인을 다운로드합니다. 다운로드한 Debezium 릴리스 버전(버전 2.x 또는 이전 시리즈 1.x)을 기록해 둡니다. 이 절차의 뒷부분에서 사용 중인 Debezium 버전에 따라 커넥터를 생성합니다. -
AWS Secrets Manager 구성 공급자
를 다운로드하여 압축을 풉니다. -
다음 아카이브를 같은 디렉터리에 배치합니다.
-
debezium-connector-mysql
폴더 -
jcusten-border-kafka-config-provider-aws-0.1.1
폴더
-
-
이전 단계에서 생성한 디렉터리를 ZIP 파일로 압축한 다음 해당 ZIP 파일을 S3 버킷에 업로드합니다. 지침은 Amazon S3 사용 설명서에서 객체 업로드를 참조하세요.
-
다음 JSON을 복사하여 파일에 붙여넣습니다. 예:
debezium-source-custom-plugin.json
.<example-custom-plugin-name>
을 플러그인에 사용할 이름으로 바꾸고,<amzn-s3-demo-bucket-arn>
을 ZIP 파일을 업로드한 Amazon S3 버킷의 ARN으로 바꾸고,를 S3에 업로드한 ZIP 객체의 파일 키
로 바꿉니다.<file-key-of-ZIP-object>
{ "name": "
<example-custom-plugin-name>
", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<amzn-s3-demo-bucket-arn>
", "fileKey": "<file-key-of-ZIP-object>
" } } } -
JSON 파일을 저장한 폴더에서 다음 AWS CLI 명령을 실행하여 플러그인을 생성합니다.
aws kafkaconnect create-custom-plugin --cli-input-json file://
<debezium-source-custom-plugin.json>
다음 예제와 유사한 출력이 표시됩니다.
{ "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
-
다음 명령을 실행하여 플러그인 상태를 확인합니다. 상태가
CREATING
에서ACTIVE
로 변경됩니다. ARN 자리 표시자를 이전 명령의 출력에서 가져온 ARN으로 변경합니다.aws kafkaconnect describe-custom-plugin --custom-plugin-arn "
<arn-of-your-custom-plugin>
"
데이터베이스 자격 증명의 보안 암호 구성 AWS Secrets Manager 및 생성
-
https://console.aws.amazon.com/secretsmanager/
에서 Secrets Manager 콘솔을 엽니다. -
데이터베이스 로그인 보안 인증 정보를 저장할 새로운 비밀번호를 생성합니다. 지침은 AWS Secrets Manager사용 설명서에서 보안 암호 생성을 참조하세요.
-
보안 암호의 ARN을 복사합니다.
-
다음 예제 정책의 Secrets Manager 권한을 서비스 실행 역할 이해에 추가합니다.
<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>
를 보안 암호의 ARN으로 변경합니다.{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "
<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>
" ] } ] }IAM 권한을 추가하는 방법에 대한 지침은 IAM 사용 설명서에서 IAM ID 권한 추가 및 제거를 참조하세요.
-
구성 제공자에 대한 정보를 사용하여 사용자 지정 작업자 구성 생성
-
다음 작업자 구성 속성을 파일에 복사하여 자리 표시자 문자열을 시나리오에 해당하는 값으로 변경합니다. AWS Secrets Manager 구성 공급자의 구성 속성에 대한 자세한 내용은 플러그인 설명서에서 SecretsManagerConfigProvider
를 참조하세요. key.converter=
<org.apache.kafka.connect.storage.StringConverter>
value.converter=<org.apache.kafka.connect.storage.StringConverter>
config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
-
다음 AWS CLI 명령을 실행하여 사용자 지정 작업자 구성을 생성합니다.
다음 값을 교체합니다.
-
<my-worker-config-name>
- 사용자 지정 작업자 구성을 설명하는 이름 -
<encoded-properties-file-content-string>
- 이전 단계에서 복사한 일반 텍스트 속성의 base64 인코딩 버전
aws kafkaconnect create-worker-configuration --name
<my-worker-config-name>
--properties-file-content<encoded-properties-file-content-string>
-
-
커넥터 생성
-
사용 중인 Debezium 버전(2.x 또는 1.x)에 해당하는 다음 JSON을 복사하여 새 파일에 붙여넣습니다.
문자열을 시나리오에 해당하는 값으로 변경합니다. 서비스 실행 역할을 설정하는 방법에 대한 자세한 내용은 MSK Connect의 IAM 역할 및 정책 섹션을 참조하세요.<placeholder>
이 구성에서는 데이터베이스 보안 인증 정보를 지정할 때 일반 텍스트 대신
${secretManager:MySecret-1234:dbusername}
과 같은 변수를 사용한다는 점에 유의하세요.
를 보안 암호의 이름으로 변경하고 검색하려는 키의 이름을 입력합니다. 또한MySecret-1234
을 사용자 지정 작업자 구성의 ARN으로 변경해야 합니다.<arn-of-config-provider-worker-configuration>
Debezium 2.x 버전의 경우 다음 JSON을 복사하여 새로운 파일에 붙여넣습니다.
<placeholder>
문자열을 시나리오에 해당하는 값으로 변경합니다.{ "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "
<aurora-database-writer-instance-endpoint>
", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>
", "database.password": "<${secretManager:MySecret-1234:dbpassword}>
", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>
", "topic.prefix": "<logical-name-of-database-server>
", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>
", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>
", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>
", "vpc": { "subnets": [ "<cluster-subnet-1>
", "<cluster-subnet-2>
", "<cluster-subnet-3>
" ], "securityGroups": ["<id-of-cluster-security-group>
"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>
", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>
", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>
", "revision": 1 } } -
이전 단계에서 JSON 파일을 저장한 폴더에서 다음 AWS CLI 명령을 실행합니다.
aws kafkaconnect create-connector --cli-input-json file://connector-info.json
다음은 명령을 성공적으로 실행했을 때 표시되는 출력의 예제입니다.
{ "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }
-