쿠키 기본 설정 선택

당사는 사이트와 서비스를 제공하는 데 필요한 필수 쿠키 및 유사한 도구를 사용합니다. 고객이 사이트를 어떻게 사용하는지 파악하고 개선할 수 있도록 성능 쿠키를 사용해 익명의 통계를 수집합니다. 필수 쿠키는 비활성화할 수 없지만 '사용자 지정' 또는 ‘거부’를 클릭하여 성능 쿠키를 거부할 수 있습니다.

사용자가 동의하는 경우 AWS와 승인된 제3자도 쿠키를 사용하여 유용한 사이트 기능을 제공하고, 사용자의 기본 설정을 기억하고, 관련 광고를 비롯한 관련 콘텐츠를 표시합니다. 필수가 아닌 모든 쿠키를 수락하거나 거부하려면 ‘수락’ 또는 ‘거부’를 클릭하세요. 더 자세한 내용을 선택하려면 ‘사용자 정의’를 클릭하세요.

Debezium 소스 커넥터 생성

포커스 모드
Debezium 소스 커넥터 생성 - Amazon Managed Streaming for Apache Kafka

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

이 절차에서는 Debezium 소스 커넥터를 생성하는 방법을 설명합니다.

  1. 사용자 지정 플러그인 생성
    1. Debezium 사이트에서 안정적인 최신 릴리스를 위한 MySQL 커넥터 플러그인을 다운로드합니다. 다운로드한 Debezium 릴리스 버전(버전 2.x 또는 이전 시리즈 1.x)을 기록해 둡니다. 이 절차의 뒷부분에서 사용 중인 Debezium 버전에 따라 커넥터를 생성합니다.

    2. AWS Secrets Manager 구성 공급자를 다운로드하여 압축을 풉니다.

    3. 다음 아카이브를 같은 디렉터리에 배치합니다.

      • debezium-connector-mysql 폴더

      • jcusten-border-kafka-config-provider-aws-0.1.1 폴더

    4. 이전 단계에서 생성한 디렉터리를 ZIP 파일로 압축한 다음 해당 ZIP 파일을 S3 버킷에 업로드합니다. 지침은 Amazon S3 사용 설명서에서 객체 업로드를 참조하세요.

    5. 다음 JSON을 복사하여 파일에 붙여넣습니다. 예: debezium-source-custom-plugin.json. <example-custom-plugin-name>을 플러그인에 사용할 이름으로 바꾸고, <amzn-s3-demo-bucket-arn>을 ZIP 파일을 업로드한 Amazon S3 버킷의 ARN으로 바꾸고,를 S3에 업로드한 ZIP 객체의 파일 키<file-key-of-ZIP-object>로 바꿉니다.

      { "name": "<example-custom-plugin-name>", "contentType": "ZIP", "location": { "s3Location": { "bucketArn": "<amzn-s3-demo-bucket-arn>", "fileKey": "<file-key-of-ZIP-object>" } } }
    6. JSON 파일을 저장한 폴더에서 다음 AWS CLI 명령을 실행하여 플러그인을 생성합니다.

      aws kafkaconnect create-custom-plugin --cli-input-json file://<debezium-source-custom-plugin.json>

      다음 예제와 유사한 출력이 표시됩니다.

      { "CustomPluginArn": "arn:aws:kafkaconnect:us-east-1:012345678901:custom-plugin/example-custom-plugin-name/abcd1234-a0b0-1234-c1-12345678abcd-1", "CustomPluginState": "CREATING", "Name": "example-custom-plugin-name", "Revision": 1 }
    7. 다음 명령을 실행하여 플러그인 상태를 확인합니다. 상태가 CREATING에서 ACTIVE로 변경됩니다. ARN 자리 표시자를 이전 명령의 출력에서 가져온 ARN으로 변경합니다.

      aws kafkaconnect describe-custom-plugin --custom-plugin-arn "<arn-of-your-custom-plugin>"
  2. 데이터베이스 자격 증명의 보안 암호 구성 AWS Secrets Manager 및 생성
    1. https://console.aws.amazon.com/secretsmanager/에서 Secrets Manager 콘솔을 엽니다.

    2. 데이터베이스 로그인 보안 인증 정보를 저장할 새로운 비밀번호를 생성합니다. 지침은 AWS Secrets Manager사용 설명서에서 보안 암호 생성을 참조하세요.

    3. 보안 암호의 ARN을 복사합니다.

    4. 다음 예제 정책의 Secrets Manager 권한을 서비스 실행 역할 이해에 추가합니다. <arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>를 보안 암호의 ARN으로 변경합니다.

      { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetResourcePolicy", "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret", "secretsmanager:ListSecretVersionIds" ], "Resource": [ "<arn:aws:secretsmanager:us-east-1:123456789000:secret:MySecret-1234>" ] } ] }

      IAM 권한을 추가하는 방법에 대한 지침은 IAM 사용 설명서에서 IAM ID 권한 추가 및 제거를 참조하세요.

  3. 구성 제공자에 대한 정보를 사용하여 사용자 지정 작업자 구성 생성
    1. 다음 작업자 구성 속성을 파일에 복사하여 자리 표시자 문자열을 시나리오에 해당하는 값으로 변경합니다. AWS Secrets Manager 구성 공급자의 구성 속성에 대한 자세한 내용은 플러그인 설명서에서 SecretsManagerConfigProvider를 참조하세요.

      key.converter=<org.apache.kafka.connect.storage.StringConverter> value.converter=<org.apache.kafka.connect.storage.StringConverter> config.providers.secretManager.class=com.github.jcustenborder.kafka.config.aws.SecretsManagerConfigProvider config.providers=secretManager config.providers.secretManager.param.aws.region=<us-east-1>
    2. 다음 AWS CLI 명령을 실행하여 사용자 지정 작업자 구성을 생성합니다.

      다음 값을 교체합니다.

      • <my-worker-config-name> - 사용자 지정 작업자 구성을 설명하는 이름

      • <encoded-properties-file-content-string> - 이전 단계에서 복사한 일반 텍스트 속성의 base64 인코딩 버전

      aws kafkaconnect create-worker-configuration --name <my-worker-config-name> --properties-file-content <encoded-properties-file-content-string>
  4. 커넥터 생성
    1. 사용 중인 Debezium 버전(2.x 또는 1.x)에 해당하는 다음 JSON을 복사하여 새 파일에 붙여넣습니다. <placeholder> 문자열을 시나리오에 해당하는 값으로 변경합니다. 서비스 실행 역할을 설정하는 방법에 대한 자세한 내용은 MSK Connect의 IAM 역할 및 정책 섹션을 참조하세요.

      이 구성에서는 데이터베이스 보안 인증 정보를 지정할 때 일반 텍스트 대신 ${secretManager:MySecret-1234:dbusername}과 같은 변수를 사용한다는 점에 유의하세요. MySecret-1234를 보안 암호의 이름으로 변경하고 검색하려는 키의 이름을 입력합니다. 또한 <arn-of-config-provider-worker-configuration>을 사용자 지정 작업자 구성의 ARN으로 변경해야 합니다.

      Debezium 2.x

      Debezium 2.x 버전의 경우 다음 JSON을 복사하여 새로운 파일에 붙여넣습니다. <placeholder> 문자열을 시나리오에 해당하는 값으로 변경합니다.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
      Debezium 1.x

      Debezium 1.x 버전의 경우 다음 JSON을 복사하여 새로운 파일에 붙여넣습니다. <placeholder> 문자열을 시나리오에 해당하는 값으로 변경합니다.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.server.name": "<logical-name-of-database-server>", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "database.history.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "database.history.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "database.history.consumer.security.protocol": "SASL_SSL", "database.history.consumer.sasl.mechanism": "AWS_MSK_IAM", "database.history.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "database.history.producer.security.protocol": "SASL_SSL", "database.history.producer.sasl.mechanism": "AWS_MSK_IAM", "database.history.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "database.history.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }

      Debezium 2.x 버전의 경우 다음 JSON을 복사하여 새로운 파일에 붙여넣습니다. <placeholder> 문자열을 시나리오에 해당하는 값으로 변경합니다.

      { "connectorConfiguration": { "connector.class": "io.debezium.connector.mysql.MySqlConnector", "tasks.max": "1", "database.hostname": "<aurora-database-writer-instance-endpoint>", "database.port": "3306", "database.user": "<${secretManager:MySecret-1234:dbusername}>", "database.password": "<${secretManager:MySecret-1234:dbpassword}>", "database.server.id": "123456", "database.include.list": "<list-of-databases-hosted-by-specified-server>", "topic.prefix": "<logical-name-of-database-server>", "schema.history.internal.kafka.topic": "<kafka-topic-used-by-debezium-to-track-schema-changes>", "schema.history.internal.kafka.bootstrap.servers": "<cluster-bootstrap-servers-string>", "schema.history.internal.consumer.security.protocol": "SASL_SSL", "schema.history.internal.consumer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.consumer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.consumer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "schema.history.internal.producer.security.protocol": "SASL_SSL", "schema.history.internal.producer.sasl.mechanism": "AWS_MSK_IAM", "schema.history.internal.producer.sasl.jaas.config": "software.amazon.msk.auth.iam.IAMLoginModule required;", "schema.history.internal.producer.sasl.client.callback.handler.class": "software.amazon.msk.auth.iam.IAMClientCallbackHandler", "include.schema.changes": "true" }, "connectorName": "example-Debezium-source-connector", "kafkaCluster": { "apacheKafkaCluster": { "bootstrapServers": "<cluster-bootstrap-servers-string>", "vpc": { "subnets": [ "<cluster-subnet-1>", "<cluster-subnet-2>", "<cluster-subnet-3>" ], "securityGroups": ["<id-of-cluster-security-group>"] } } }, "capacity": { "provisionedCapacity": { "mcuCount": 2, "workerCount": 1 } }, "kafkaConnectVersion": "2.7.1", "serviceExecutionRoleArn": "<arn-of-service-execution-role-that-msk-connect-can-assume>", "plugins": [{ "customPlugin": { "customPluginArn": "<arn-of-msk-connect-plugin-that-contains-connector-code>", "revision": 1 } }], "kafkaClusterEncryptionInTransit": { "encryptionType": "TLS" }, "kafkaClusterClientAuthentication": { "authenticationType": "IAM" }, "workerConfiguration": { "workerConfigurationArn": "<arn-of-config-provider-worker-configuration>", "revision": 1 } }
    2. 이전 단계에서 JSON 파일을 저장한 폴더에서 다음 AWS CLI 명령을 실행합니다.

      aws kafkaconnect create-connector --cli-input-json file://connector-info.json

      다음은 명령을 성공적으로 실행했을 때 표시되는 출력의 예제입니다.

      { "ConnectorArn": "arn:aws:kafkaconnect:us-east-1:123450006789:connector/example-Debezium-source-connector/abc12345-abcd-4444-a8b9-123456f513ed-2", "ConnectorState": "CREATING", "ConnectorName": "example-Debezium-source-connector" }
프라이버시사이트 이용 약관쿠키 기본 설정
© 2025, Amazon Web Services, Inc. 또는 계열사. All rights reserved.