Apache Kafka를 AWS Database Migration Service 대상으로 사용 - AWS Database Migration Service

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Kafka를 AWS Database Migration Service 대상으로 사용

AWS DMS를 사용하여 데이터를 Apache Kafka 클러스터로 마이그레이션할 수 있습니다. Apache Kafka는 는 분산 스트리밍 플랫폼입니다. Apache Kafka를 사용하여 스트리밍 데이터를 실시간으로 수집 및 처리할 수 있습니다.

AWSAmazon Managed Streaming for Apache Kafka (Amazon MSK) 도 제공합니다.AWS DMS대상. Amazon MSK는 Apache Kafka 인스턴스의 구현 및 관리를 간소화하는 완전 관리형 Apache Kafka 스트리밍 서비스입니다. 오픈 소스 Apache Kafka 버전에서 작동하며 다음과 같이 Amazon MSK 인스턴스에 액세스할 수 있습니다.AWS DMS타겟은 다른 아파치 카프카 인스턴스와 똑같습니다. 자세한 내용을 알아보려면 다음 섹션을 참조하세요.Amazon MSK는 무엇입니까?에서Amazon Managed Streaming for Apache Kafka 개발자 가이드.

Kafka 클러스터는 파티션으로 구분된 주제라는 범주에 레코드 스트림을 저장합니다. 파티션주제에서 고유하게 식별되는 데이터 레코드 (메시지) 시퀀스입니다. 파티션은 클러스터의 여러 브로커에 분산되어 주제 레코드를 parallel 처리할 수 있습니다. Apache Kafka의 주제 및 파티션과 분산에 대한 자세한 내용은 주제 및 로그분산을 참조하십시오.

AWS Database Migration ServiceJSON을 사용하여 Kafka 주제에 레코드를 게시합니다. 변환 중 AWS DMS는 각 레코드를 원본 데이터베이스로부터 JSON 형식의 속성-값 페어로 직렬화합니다.

지원되는 모든 데이터 소스에서 대상 Kafka 클러스터로 데이터를 마이그레이션하려면 객체 매핑을 사용합니다. 객체 매핑을 사용하여 대상 주제의 데이터 레코드를 구조화하는 방법을 결정합니다. 또한 각 테이블에 대한 파티션 키를 정의합니다. Apache Kafka가 이러한 테이블을 사용해 데이터를 파티션으로 그룹화합니다.

현재는AWS DMS작업당 하나의 주제만 지원합니다. 여러 테이블이 있는 단일 작업의 경우 모든 메시지가 단일 주제로 이동합니다. 각 메시지에는 대상 스키마와 테이블을 식별하는 메타데이터 섹션이 포함되어 있습니다.

Apache Kafka 엔드포인트 설정

의 엔드포인트 설정을 통해 연결 세부 정보를 지정할 수 있습니다.AWS DMS콘솔 또는--kafka-settingsCLI의 옵션입니다. 각 설정의 요구 사항은 다음과 같습니다.

  • Broker— Kafka 클러스터에 하나 이상의 브로커 위치를 쉼표로 구분된 각 브로커 목록 형식으로 지정합니다.broker-hostname:port. 예를 들면, "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"입니다. 이 설정은 클러스터의 일부 또는 모든 브로커의 위치를 지정할 수 있습니다. 모든 클러스터 브로커는 주제로 마이그레이션된 데이터 레코드의 분할을 처리하기 위해 통신합니다.

  • Topic— (선택 사항) 주제 이름을 최대 길이 255자 및 기호로 지정합니다. 마침표(.), 밑줄(_) 및 빼기(-)를 사용할 수 있습니다. 마침표(.) 또는 밑줄(_)이 있는 주제 이름이 내부 데이터 구조에서 충돌할 수 있습니다. 주제 이름에 이러한 기호 중 하나만 사용하고 둘 다 사용하지는 마십시오. 주제 이름을 지정하지 않으면AWS DMS사용"kafka-default-topic"마이그레이션 주제로.

    참고

    AWS DMS가 지정된 마이그레이션 주제 또는 기본 주제를 생성하도록 하려면 Kafka 클러스터 구성 중에 auto.create.topics.enable = true를 설정합니다. 자세한 내용을 알아보려면 다음 섹션을 참조하세요.Apache Kafka를 AWS Database Migration Service용 대상으로 사용할 때 적용되는 제한 사항

  • MessageFormat— 엔드포인트에 생성된 레코드의 출력 형식입니다. 메시지 형식은 JSON(기본값) 또는 JSON_UNFORMATTED(탭이 없는 한 줄)입니다.

  • MessageMaxBytes— 엔드포인트에 생성된 레코드의 최대 크기 (바이트) 입니다. 기본값은 1,000,000입니다.

    참고

    만 사용할 수 있습니다.AWS변경할 CLI/SDKMessageMaxBytes기본이 아닌 값으로 설정합니다. 예를 들어, 기존 Kafka 엔드포인트를 수정하고 변경하려면MessageMaxBytes다음 명령을 사용합니다.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails— 소스 데이터베이스의 자세한 트랜잭션 정보를 제공합니다. 이 정보에는 커밋 타임스탬프, 로그 위치 및 transaction_id, previous_transaction_id, transaction_record_id(트랜잭션 내의 레코드 오프셋)에 대한 값이 포함됩니다. 기본값은 false입니다.

  • IncludePartitionValue— 파티션 유형이 이 아닌 경우 Kafka 메시지 출력에 파티션 값을 표시합니다.schema-table-type. 기본값은 false입니다.

  • PartitionIncludeSchemaTable— 파티션 유형이 인 경우 스키마 및 테이블 이름을 파티션 값에 접두사로 지정합니다.primary-key-type. 이렇게 하면 Kafka 파티션 간의 데이터 분산이 증가합니다. 예를 들어 SysBench 스키마에 수천 개의 테이블이 있고 각 테이블에 기본 키의 범위가 제한되어 있다고 가정하겠습니다. 이 경우 동일한 기본 키가 수천 개의 테이블에서 동일한 파티션으로 전송되어 제한이 발생합니다. 기본값은 false입니다.

  • IncludeTableAlterOperations— 다음과 같이 제어 데이터의 테이블을 변경하는 모든 데이터 정의 언어 (DDL) 작업을 포함합니다.rename-table,drop-table,add-column,drop-column, 및rename-column. 기본값은 false입니다.

  • IncludeControlDetails— Kafka 메시지 출력에서 테이블 정의, 열 정의, 테이블 및 열 변경 사항에 대한 자세한 제어 정보를 표시합니다. 기본값은 false입니다.

  • IncludeNullAndEmpty— 대상에 NULL 및 빈 열을 포함합니다. 기본값은 false입니다.

  • SecurityProtocol— 전송 계층 보안 (TLS) 을 사용하여 Kafka 대상 엔드포인트에 대한 보안 연결을 설정합니다. 옵션에는 ssl-authentication, ssl-encryptionsasl-ssl이 포함됩니다. 사용sasl-ssl가 필요합니다SaslUsernameSaslPassword.

설정을 사용하여 전송 속도를 높일 수 있습니다. 그렇게 하려면 다음을 수행하십시오.AWS DMSApache Kafka 타겟 클러스터에 대한 멀티스레드 전체 로드를 지원합니다.AWS DMS는 다음을 포함하는 작업 설정과 함께 이 멀티스레딩을 지원합니다.

  • MaxFullLoadSubTasks— 이 옵션을 사용하면 병렬로 로드할 최대 소스 테이블 수를 지정할 수 있습니다.AWS DMS전용 하위 작업을 사용하여 각 테이블을 해당 Kafka 대상 테이블에 로드합니다. 기본값은 8이며, 최대값은 49입니다.

  • ParallelLoadThreads— 이 옵션을 사용하여 다음과 같은 스레드 수를 지정할 수 있습니다.AWS DMS각 테이블을 Kafka 대상 테이블에 로드하는 데 사용합니다. Apache Kafka 대상의 최대 값은 32입니다. 이 최대 한도를 증가시키도록 요청할 수 있습니다.

  • ParallelLoadBufferSize— 이 옵션을 사용하여 parallel 로드 스레드가 Kafka 대상에 데이터를 로드하는 데 사용하는 버퍼에 저장할 최대 레코드 수를 지정합니다. 기본값은 50입니다. 최대값은 1,000입니다. 이 설정은 ParallelLoadThreads와 함께 사용하십시오. ParallelLoadBufferSize는 둘 이상의 스레드가 있는 경우에만 유효합니다.

  • ParallelLoadQueuesPerThread— 이 옵션을 사용하여 각 동시 스레드가 대기열에서 데이터 레코드를 제거하고 대상에 대한 배치 로드를 생성하기 위해 액세스하는 대기열 수를 지정합니다. 기본값은 1입니다. 최대값은 512입니다.

parallel 스레드 및 대량 작업에 대한 작업 설정을 조정하여 Kafka 엔드포인트에 대한 변경 데이터 캡처 (CDC) 성능을 개선할 수 있습니다. 이렇게 하려면 ParallelApply* 작업 설정을 사용하여 동시 스레드 수, 스레드당 대기열 및 버퍼에 저장할 레코드 수를 지정해야 합니다. 예를 들어 CDC 로드를 수행하고 128개의 스레드를 병렬로 적용한다고 가정하겠습니다. 또한 버퍼당 50개 레코드가 저장된 스레드당 64개 대기열에 액세스하려고 합니다.

CDC 성능을 승격하기 위해 AWS DMS에서는 다음 작업 설정을 지원합니다.

  • ParallelApplyThreads— 다음 동시 스레드의 수를 지정합니다.AWS DMSCDC 로드 중에 데이터 레코드를 Kafka 대상 엔드포인트로 푸시하는 데 사용합니다. 기본값은 0이고 최대값은 32입니다.

  • ParallelApplyBufferSize— CDC 로드 중에 동시 스레드가 Kafka 대상 엔드포인트로 푸시할 수 있도록 각 버퍼 대기열에 저장할 최대 레코드 수를 지정합니다. 기본값은 100이고 최댓값은 1,000입니다. ParallelApplyThreads가 둘 이상의 스레드를 지정할 때 이 옵션을 사용합니다.

  • ParallelApplyQueuesPerThread— CDC 중에 각 스레드가 대기열에서 데이터 기록을 제거하고 Kafka 엔드포인트에 대한 배치 로드를 생성하기 위해 액세스하는 대기열 수를 지정합니다. 기본값은 1입니다. 최대값은 512입니다.

ParallelApply* 작업 설정을 사용할 때 partition-key-type 기본값은 테이블의 schema-name.table-name가 아니라 primary-key입니다.

전송 계층 보안 (TLS) 을 사용하여 Kafka에 연결

Kafka 클러스터는 전송 계층 보안 (TLS) 을 사용하여 보안 연결을 허용합니다. DMS를 사용하면 다음 세 가지 보안 프로토콜 옵션 중 하나를 사용하여 Kafka 엔드포인트 연결을 보호할 수 있습니다.

SSL 암호화 (SSL)server-encryption)

클라이언트는 서버의 인증서를 통해 서버 ID를 확인합니다. 그런 다음 서버와 클라이언트 간에 암호화된 연결이 이루어집니다.

SSL 인증 (SSL 인증mutual-authentication)

서버와 클라이언트는 자체 인증서를 통해 서로 ID를 확인합니다. 그런 다음 서버와 클라이언트 간에 암호화된 연결이 이루어집니다.

사슬-SSL ()mutual-authentication)

단순 인증 및 보안 계층 (SASL) 메서드는 클라이언트의 인증서를 사용자 이름 및 암호로 대체하여 클라이언트 ID를 확인합니다. 특히 서버가 클라이언트의 ID를 확인할 수 있도록 서버에서 등록한 사용자 이름과 암호를 입력합니다. 그런 다음 서버와 클라이언트 간에 암호화된 연결이 이루어집니다.

중요

아파치 카프카와 아마존 MSK는 확인된 인증서를 수락합니다. 이는 해결해야 할 카프카와 아마존 MSK의 알려진 한계입니다. 자세한 내용을 알아보려면 다음 섹션을 참조하세요.아파치 카프카 이슈, KAFKA-3700.

Amazon MSK를 사용하는 경우 이러한 알려진 제한에 대한 해결책으로 액세스 제어 목록 (ACL) 을 사용하는 것을 고려해 보십시오. ACL에 대한 자세한 내용은 단원을 참조하십시오.Apache Kafka섹션Amazon Managed Streaming for Apache Kafka 개발자 가이드.

자체 관리형 Kafka 클러스터를 사용하는 경우 단원을 참조하십시오.댓글 날짜: 10월 21일/18일클러스터 구성에 대한 자세한 내용을 참조하십시오.

Amazon MSK 또는 자체 관리형 Kafka 클러스터에서 SSL 암호화 사용

SSL 암호화를 사용하여 Amazon MSK 또는 자체 관리형 Kafka 클러스터에 대한 엔드포인트 연결을 보호할 수 있습니다. SSL 암호화 인증 방법을 사용하는 경우 클라이언트는 서버의 인증서를 통해 서버의 ID를 확인합니다. 그런 다음 서버와 클라이언트 간에 암호화된 연결이 이루어집니다.

SSL 암호화를 사용하여 Amazon MSK에 연결하려면

  • 보안 프로토콜 엔드포인트 설정 설정 (SecurityProtocol) 사용ssl-encryption대상 Kafka 엔드포인트를 생성할 때 사용할 수 있는 옵션입니다.

    다음은 보안 프로토콜을 SSL 암호화로 설정하는 JSON 예제 코드입니다.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }

자체 관리형 Kafka 클러스터에 SSL 암호화 사용

  1. 온프레미스 Kafka 클러스터에서 사설 인증 기관 (CA) 을 사용하는 경우 사설 CA 인증서를 업로드하고 Amazon 리소스 이름 (ARN) 을 받으십시오.

  2. 보안 프로토콜 엔드포인트 설정 설정 (SecurityProtocol) 사용ssl-encryption대상 Kafka 엔드포인트를 생성할 때 사용할 수 있는 옵션입니다. 다음 JSON 예제는 보안 프로토콜을 다음과 같이 설정합니다.ssl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. 사설 CA를 사용하는 경우SslCaCertificateArnARN에서 위의 첫 번째 단계를 완료했습니다.

SSL 인증 사용

SSL 인증을 사용하여 Amazon MSK 또는 자체 관리형 Kafka 클러스터에 대한 엔드포인트 연결을 보호할 수 있습니다.

Amazon MSK에 연결하기 위해 SSL 인증을 사용한 클라이언트 인증 및 암호화를 활성화하려면 다음을 수행하십시오.

  • Kafka의 프라이빗 키와 퍼블릭 인증서를 준비하세요.

  • 인증서를 DMS 인증서 관리자에 업로드합니다.

  • Kafka 엔드포인트 설정에 지정된 해당 인증서 ARN을 사용하여 Kafka 대상 엔드포인트를 생성합니다.

Amazon MSK의 프라이빗 키와 퍼블릭 인증서를 준비하려면 다음을 수행하십시오.

  1. EC2 인스턴스를 만들고 의 1~9단계에 설명된 대로 인증을 사용하도록 클라이언트를 설정합니다.클라이언트 인증섹션Amazon Managed Streaming for Apache Kafka 개발자 가이드.

    이러한 단계를 완료하면 인증서-ARN (ACM에 저장된 공개 인증서 ARN) 과 여기에 포함된 개인 키가 생성됩니다.kafka.client.keystore.jksfile.

  2. 공개 인증서를 가져오고 인증서를signed-certificate-from-acm.pem파일, 다음 명령을 사용합니다.

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    이 명령은 다음 예제와 유사한 정보를 반환합니다.

    {"Certificate": "123", "CertificateChain": "456"}

    그런 다음 이에 해당하는 내용을 복사합니다."123"signed-certificate-from-acm.pemfile.

  3. 을 (를) 가져와서 개인 키를 가져옵니다.msk-rsa키 보낸 사람kafka.client.keystore.jks to keystore.p12다음 예에서와 같습니다.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. 다음 명령을 사용하여 내보냅니다.keystore.p12.pem형식

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    PEM 암호문 입력메시지가 나타나고 인증서를 암호화하는 데 적용되는 키를 식별합니다.

  5. 수하물 특성 및 주요 특성을 에서 제거하십시오..pem파일을 사용하여 첫 번째 줄이 다음 문자열로 시작하는지 확인하십시오.

    ---BEGIN ENCRYPTED PRIVATE KEY---

공개 인증서 및 개인 키를 DMS 인증서 관리자에 업로드하고 Amazon MSK에 대한 연결을 테스트하려면

  1. 다음 명령을 사용하여 DMS 인증서 관리자에 업로드합니다.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Amazon MSK 대상 엔드포인트를 만들고 연결을 테스트하여 TLS 인증이 작동하는지 확인합니다.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
중요

SSL 인증을 사용하여 자체 관리형 Kafka 클러스터에 대한 연결을 보호할 수 있습니다. 경우에 따라 온프레미스 Kafka 클러스터에서 사설 CA (인증 기관) 를 사용할 수 있습니다. 그렇다면 CA 체인, 공개 인증서 및 개인 키를 DMS 인증서 관리자에게 업로드하세요. 그런 다음 온프레미스 Kafka 대상 엔드포인트를 생성할 때 엔드포인트 설정에서 해당 Amazon 리소스 이름 (ARN) 을 사용하십시오.

자체 관리형 Kafka 클러스터에 프라이빗 키 및 서명된 인증서를 준비하려면

  1. 다음 예제와 같이 key pair 생성합니다.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. 인증서 서명 요청 (CSR) 을 생성합니다.

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. 클러스터 신뢰 저장소의 CA를 사용하여 CSR에 서명합니다. CA가 없는 경우 자체 사설 CA를 만들 수 있습니다.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. 임포트ca-cert서버 신뢰 저장소 및 키 저장소에 신뢰 저장소가 없는 경우에는 다음 명령을 사용하여 신뢰 저장소를 생성하고 가져오세요.ca-cert 그것 속으로.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. 인증서에 서명합니다.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. 서명된 인증서를 키 저장소로 가져옵니다.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. 다음 명령을 사용하여 다음을 가져옵니다.on-premise-rsa키 보낸 사람kafka.server.keystore.jkskeystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. 다음 명령을 사용하여 내보냅니다.keystore.p12.pem형식

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. 업로드encrypted-private-server-key.pem,signed-certificate.pem, 및ca-certDMS 인증서 관리자에게

  10. 반환된 ARN을 사용하여 엔드포인트를 생성합니다.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

SASL-SSL 인증을 사용하여 아마존 MSK에 연결

단순 인증 및 보안 계층 (SASL) 방법은 사용자 이름과 암호를 사용하여 클라이언트 ID를 확인하고 서버와 클라이언트 간에 암호화된 연결을 만듭니다.

SASL을 사용하려면 먼저 Amazon MSK 클러스터를 설정할 때 안전한 사용자 이름과 암호를 생성해야 합니다. Amazon MSK 클러스터의 보안 사용자 이름과 암호를 설정하는 방법에 대한 설명은 을 참조하십시오.아마존 MSK 클러스터에 대한 SASL/SCRAM 인증 설정에서Amazon Managed Streaming for Apache Kafka 개발자 가이드.

그런 다음 Kafka 대상 엔드포인트를 생성할 때 보안 프로토콜 엔드포인트 설정을 설정합니다 (SecurityProtocol) 사용sasl-ssl옵션을 생성합니다. 당신도 설정했습니다SaslUsernameSaslPassword옵션을 생성합니다. 다음 JSON 예제에 표시된 것처럼 Amazon MSK 클러스터를 처음 설정할 때 생성한 보안 사용자 이름 및 암호와 일치하는지 확인하십시오.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
참고

현재는AWS DMS공용 CA 지원 SASL/SSL만 지원합니다. DMS는 사설 CA에서 지원하는 자체 관리형 카프카와 함께 사용하기 위한 SASL/SSL을 지원하지 않습니다.

SASL/SSL 인증의 경우AWS DMSSCRAM-SHA-512 메커니즘만 지원합니다.

Apache Kafka 대상의 경우 이전 이미지를 사용하여 CDC 행의 원래 값 보기

Kafka 같은 데이터 스트리밍 대상에 CDC 업데이트를 작성하는 경우 업데이트를 통한 변경 전에 소스 데이터베이스 행의 원래 값을 볼 수 있습니다. 이를 위해 AWS DMS는 소스 데이터베이스 엔진에서 제공되는 데이터를 기반으로 업데이트 이벤트의 이전 이미지를 채웁니다.

소스 데이터베이스 엔진에 따라 이전 이미지에 대해 서로 다른 양의 정보를 제공합니다.

  • Oracle은 열이 변경되는 경우에만 열에 대한 업데이트를 제공합니다.

  • PostgreSQL은 기본 키의 일부인 열에 대한 데이터(변경 여부에 관계 없음)만 제공합니다. 논리적 복제가 사용 중이고 원본 테이블에 REPLICA IDENTITY FULL이 설정되어 있는 경우, WAL에 기록되고 여기에서 제공되는 행에 대한 전체 전후 정보를 얻을 수 있습니다.

  • MySQL은 일반적으로 모든 열에 대한 데이터(변경 여부에 관계 없음)를 제공합니다.

이전 이미징을 활성화하여 소스 데이터베이스의 원래 값을 AWS DMS 출력에 추가하려면 BeforeImageSettings 작업 설정 또는 add-before-image-columns 파라미터를 사용합니다. 이 파라미터는 열 변환 규칙을 적용합니다.

BeforeImageSettings는 다음과 같이 소스 데이터베이스 시스템에서 수집된 값을 사용하여 모든 업데이트 작업에 새 JSON 속성을 추가합니다.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
참고

전체 로드와 CDC 작업(기존 데이터 마이그레이션 및 지속적인 변경 사항 복제) 또는 CDC 전용 작업(데이터 변경 사항만 복제)에 BeforeImageSettings를 적용합니다. 전체 로드 전용 작업에는 BeforeImageSettings를 적용하지 마십시오.

BeforeImageSettings 옵션의 경우 다음이 적용됩니다.

  • 이전 이미징을 활성화하려면 EnableBeforeImage 옵션을 true로 설정합니다. 기본값은 false입니다.

  • FieldName 옵션을 사용하여 새 JSON 속성에 이름을 지정합니다. EnableBeforeImagetrue인 경우 FieldName이 필수이며 비워 둘 수 없습니다.

  • ColumnFilter 옵션은 이전 이미징을 사용하여 추가할 열을 지정합니다. 테이블 기본 키의 일부인 열만 추가하려면 기본값 pk-only를 사용하고, LOB 유형이 아닌 열만 추가하려면 non-lob를 사용하고, 이전 이미지 값이 있는 모든 열을 추가하려면 all을 사용합니다.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

이전 이미지 변환 규칙 사용

작업 설정 대신 열 변환 규칙을 적용하는 add-before-image-columns 파라미터를 사용할 수 있습니다. 이 파라미터를 사용하면 Kafka 같은 데이터 스트리밍 대상에서 CDC 중에 이전 이미징을 활성화할 수 있습니다.

변환 규칙에 add-before-image-columns를 사용하면 이전 이미지 결과를 보다 세밀하게 제어할 수 있습니다. 변환 규칙을 통해 객체 로케이터를 사용하여 규칙에 대해 선택한 테이블을 제어할 수 있습니다. 또한 변환 규칙을 함께 연결하여 테이블마다 서로 다른 규칙을 적용할 수 있습니다. 그런 다음 다른 규칙을 사용하여 생성된 열을 조작할 수 있습니다.

참고

동일한 작업 내에서 BeforeImageSettings 작업 설정과 함께 add-before-image-columns 파라미터를 사용해서는 안 됩니다. 단일 작업에는 이 파라미터 또는 설정 중 하나만 사용하고 둘 다 사용하지 마십시오.

해당 열에 대해 add-before-image-columns 파라미터가 있는 transformation 규칙 유형이 before-image-def 섹션을 제공해야 합니다. 다음은 그 한 예입니다.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix의 값은 열 이름 앞에 추가되며, column-prefix의 기본값은 BI_입니다. column-suffix의 값은 열 이름 뒤에 추가되며, 기본값은 비어 있습니다. column-prefixcolumn-suffix를 모두 빈 문자열로 설정하지 마십시오.

column-filter에 대해 하나의 값을 선택합니다. 테이블 기본 키의 일부인 열만 추가하려면 pk-only를 선택하고, LOB 유형이 아닌 열만 추가하려면 non-lob를 선택하고, 이전 이미지 값이 있는 모든 열을 추가하려면 all을 선택합니다.

이전 이미지 변환 규칙의 예

다음 예의 변환 규칙은 대상에서 BI_emp_no라는 새 열을 추가합니다. UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 같은 문은 BI_emp_no 필드를 1로 채웁니다. Amazon S3 타겟에 CDC 업데이트를 작성하면BI_emp_no열을 사용하면 어떤 원래 행이 업데이트되었는지 알 수 있습니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

add-before-image-columns 규칙 작업 사용에 대한 자세한 내용은 변환 규칙 및 작업 단원을 참조하십시오.

Apache Kafka를 AWS Database Migration Service용 대상으로 사용할 때 적용되는 제한 사항

Apache Kafka를 대상으로 사용할 때 다음 제한 사항이 적용됩니다.

  • AWS DMSKafka 대상 엔드포인트는 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 에 대한 IAM 액세스 제어를 지원하지 않습니다.

  • 두 가지를 모두 구성하십시오.AWS DMSAmazon VPC 기반 VPC 및 동일한 보안 그룹에 있는 복제 인스턴스와 Kafka 클러스터 Kafka 클러스터는 Amazon MSK 인스턴스이거나 Amazon EC2에서 실행되는 자체 Kafka 인스턴스일 수 있습니다. 자세한 내용은 복제 인스턴스용으로 네트워크 설정을 참조하세요.

    참고

    Amazon MSK의 보안 그룹을 지정하려면클러스터 생성페이지, 선택고급 설정를 선택합니다.사용자 지정 설정를 누르고 보안 그룹을 선택하거나 복제 인스턴스와 동일한 경우 기본값을 그대로 사용하십시오.

  • 전체 LOB 모드는 지원되지 않습니다.

  • AWS DMS가 자동으로 새 주제를 만들 수 있는 속성을 사용하여 클러스터에 대한 Kafka 구성 파일을 지정합니다. auto.create.topics.enable = true 설정을 포함합니다. Amazon MSK를 사용하는 경우 Kafka 클러스터를 생성할 때 기본 구성을 지정한 다음auto.create.topics.enable로 설정true. 기본 구성 설정에 대한 자세한 내용은 단원을 참조하십시오.Amazon MSK의 기본 구성에서Amazon Managed Streaming for Apache Kafka 개발자 가이드. Amazon MSK를 사용하여 생성한 기존 Kafka 클러스터를 수정해야 하는 경우AWS CLI명령aws kafka create-configuration다음 예와 같이 Kafka 구성을 업데이트하십시오.

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    여기서, //~/kafka_configuration는 필요한 속성 설정으로 만든 구성 파일입니다.

    Amazon EC2 설치된 자체 Kafka 인스턴스를 사용하는 경우 다음과 같은 유사한 속성 설정으로 Kafka 클러스터 구성을 수정하십시오.auto.create.topics.enable = true, 인스턴스와 함께 제공된 옵션 사용

  • AWS DMS트랜잭션과 관계없이 각 업데이트를 소스 데이터베이스의 단일 레코드에 지정된 Kafka 주제의 하나의 데이터 레코드 (메시지) 로 게시합니다.

  • AWS DMS는 다음과 같이 두 가지 형식의 파티션 키를 지원합니다.

    • SchemaName.TableName: 스키마와 테이블 이름의 조합입니다.

    • ${AttributeName}: JSON 내 한 필드의 값 또는 원본 데이터베이스 내 테이블의 기본 키입니다.

  • BatchApplyKafka 엔드포인트에는 지원되지 않습니다. Batch 적용 사용 (예:BatchApplyEnabledKafka 대상에 대한 대상 메타데이터 (작업 설정) 로 인해 데이터가 손실될 수 있습니다.

  • AWS DMS값을 마이그레이션합니다.BigInt과학 표기법 형식이 16자리 이상인 데이터 유형입니다.

객체 매핑을 사용하여 Kafka 주제로 데이터 이전

AWS DMS테이블 매핑 규칙을 사용하여 원본의 데이터를 대상 Kafka 주제에 매핑합니다. 대상 주제에 데이터를 매핑하려면 객체 매핑이라는 테이블 매핑 규칙 유형을 사용합니다. 객체 매핑을 사용하여 원본의 데이터 레코드를 Kafka 주제에 게시된 데이터 레코드로 매핑하는 방법을 지정합니다.

Kafka 주제에는 파티션 키 외에 별다른 사전 설정 구조가 없습니다.

참고

개체 매핑을 사용할 필요는 없습니다. 다양한 변환에 일반 테이블 매핑을 사용할 수 있습니다. 하지만 파티션 키 유형은 다음과 같은 기본 동작을 따릅니다.

  • 기본 키는 Full Load의 파티션 키로 사용됩니다.

  • 병렬 적용 작업 설정을 사용하지 않는 경우schema.tableCDC의 파티션 키로 사용됩니다.

  • 병행 적용 작업 설정을 사용하는 경우 CDC의 파티션 키로 기본 키가 사용됩니다.

객체 매핑 규칙을 만들려면 rule-typeobject-mapping으로 지정합니다. 이 규칙은 사용할 객체 매핑 유형을 지정합니다.

이 규칙의 구조는 다음과 같습니다.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS는 현재 map-record-to-recordmap-record-to-document만을 rule-action 파라미터의 유효한 값으로 지원합니다. map-record-to-recordmap-record-to-document 값은 exclude-columns 속성 목록의 일부로 제외되지 않은 레코드에 대해 AWS DMS가 기본적으로 수행하는 작업을 지정합니다. 이러한 값은 어떤 식으로든 속성 매핑에 영향을 미치지 않습니다.

관계형 데이터베이스에서 Kafka 주제로 마이그레이션할 때 map-record-to-record를 사용합니다. 이러한 규칙 유형은 Kafka 주제의 파티션 키로 관계형 데이터베이스의 taskResourceId.schemaName.tableName 값을 사용하며 소스 데이터베이스의 각 열마다 속성을 생성합니다. 사용 시map-record-to-record, 에 나열되지 않은 소스 테이블의 모든 열에 대해exclude-columns속성 목록,AWS DMS대상 주제에 해당 속성을 생성합니다. 이러한 해당 속성은 원본 열이 속성 매핑에 사용되는지 여부와 상관없이 생성됩니다.

map-record-to-record를 이해하는 한 가지 방법은 작업 중일 때 관찰하는 것입니다. 이 예에서는 다음 구조와 데이터를 사용하여 관계형 데이터베이스 테이블 행에서 시작한다고 가정합니다.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

1988/02/29

이 정보를 Test라는 스키마에서 Kafka 주제로 마이그레이션하려면 데이터를 대상 주제로 매핑하는 규칙을 생성합니다. 다음 규칙은 그 매핑 과정을 보여 줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Kafka 주제와 파티션 키가 주어지면 (이 경우taskResourceId.schemaName.tableName) 에서, 다음은 Kafka 대상 주제의 샘플 데이터를 사용하여 결과 레코드 형식을 보여줍니다.

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

속성 매핑으로 날짜 재구성

속성 맵을 사용하여 데이터를 Kafka 주제로 마이그레이션하는 동안 데이터를 재구성할 수 있습니다. 예를 들어 원본의 필드 몇 개를 대상의 단일 필드로 묶어야 하는 경우도 있을 것입니다. 다음의 속성 맵은 날짜를 재구성하는 방법을 보여줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

partition-key에 상수 값을 설정하려면 partition-key 값을 지정합니다. 예를 들어 이 방법을 통해 모든 데이터를 단일 파티션에 저장할 수 있습니다. 다음 매핑은 이러한 접근 방식을 보여줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
참고

특정 테이블을 위한 제어 레코드에 대한 partition-key 값은 TaskId.SchemaName.TableName입니다. 특정 작업을 위한 제어 레코드에 대한 partition-key 값은 해당 레코드의 TaskId입니다. partition-key 값을 객체 매핑에 지정해도 제어 레코드에 대한 partition-key에는 영향이 없습니다.

객체 매핑을 사용한 다중 주제 복제

기본적으로AWS DMS태스크는 모든 소스 데이터를 다음 Kafka 주제 중 하나로 마이그레이션합니다.

  • 에 지정된 대로주제필드AWS DMS대상 엔드포인트.

  • 에서 지정한 대로kafka-default-topic만약주제대상 엔드포인트의 필드가 채워지지 않고 Kafka가 입력되었습니다.auto.create.topics.enable설정이 로 설정됩니다.true.

다음으로 바꿉니다.AWS DMS엔진 버전 3.4.6 이상에서는 다음을 사용할 수 있습니다.kafka-target-topic마이그레이션된 각 원본 테이블을 별도의 주제에 매핑하기 위한 속성입니다. 예를 들어, 다음의 객체 매핑 규칙은 원본 테이블을 마이그레이션합니다.CustomerAddress카프카 주제로customer_topicaddress_topic, 각각. 이와 동시에AWS DMS다음을 포함한 다른 모든 소스 테이블을 마이그레이션합니다.Bills에 있는 테이블Test대상 엔드포인트에 지정된 주제에 대한 스키마

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "MapToKafka1", "rule-action": "map-record-to-record", "kafka-target-topic": "customer_topic", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "partition-key": {"value": "ConstantPartitionKey" } }, { "rule-type": "object-mapping", "rule-id": "3", "rule-name": "MapToKafka2", "rule-action": "map-record-to-record", "kafka-target-topic": "address_topic", "object-locator": { "schema-name": "Test", "table-name": "Address" }, "partition-key": {"value": "HomeAddress" } }, { "rule-type": "object-mapping", "rule-id": "4", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Bills" } } ] }

Kafka 다중 주제 복제를 사용하면 단일 복제 작업을 사용하여 소스 테이블을 그룹화하고 별도의 Kafka 주제로 마이그레이션할 수 있습니다.

Apache Kafka 메시지 형식

JSON 출력은 단지 키-값 페어의 목록입니다.

RecordType

레코드 유형은 데이터나 제어일 수 있습니다. 데이터 레코드는 원본 내의 실제 행을 나타냅니다. 제어 레코드는 스트림 내의 중요 이벤트를 나타냅니다(예: 작업의 재시작).

작업

데이터 레코드의 경우, 작업은 load, insert, update 또는 delete일 수 있습니다.

제어 레코드의 경우 작업이 다음과 같을 수 있습니다.create-table,rename-table,drop-table,change-columns,add-column,drop-column,rename-column, 또는column-type-change.

SchemaName

레코드에 대한 원본 스키마입니다. 제어 레코드의 경우, 이 필드는 비워둘 수 있습니다.

TableName

레코드에 대한 원본 테이블입니다. 제어 레코드의 경우, 이 필드는 비워둘 수 있습니다.

타임스탬프

JSON 메시지가 구성된 경우의 타임스탬프입니다. 이 필드는 ISO 8601 형식으로 구성되었습니다.

다음 JSON 메시지 예제는 모든 추가 메타데이터가 포함된 데이터 유형 메시지를 보여줍니다.

{ "data":{ "id":100000161, "fname":"val61s", "lname":"val61s", "REGION":"val61s" }, "metadata":{ "timestamp":"2019-10-31T22:53:59.721201Z", "record-type":"data", "operation":"insert", "partition-key-type":"primary-key", "partition-key-value":"sbtest.sbtest_x.100000161", "schema-name":"sbtest", "table-name":"sbtest_x", "transaction-id":9324410911751, "transaction-record-id":1, "prev-transaction-id":9324410910341, "prev-transaction-record-id":10, "commit-timestamp":"2019-10-31T22:53:55.000000Z", "stream-position":"mysql-bin-changelog.002171:36912271:0:36912333:9324410911751:mysql-bin-changelog.002171:36912209" } }

다음 JSON 메시지 예제는 제어 유형 메시지를 보여줍니다.

{ "control":{ "table-def":{ "columns":{ "id":{ "type":"WSTRING", "length":512, "nullable":false }, "fname":{ "type":"WSTRING", "length":255, "nullable":true }, "lname":{ "type":"WSTRING", "length":255, "nullable":true }, "REGION":{ "type":"WSTRING", "length":1000, "nullable":true } }, "primary-key":[ "id" ], "collation-name":"latin1_swedish_ci" } }, "metadata":{ "timestamp":"2019-11-21T19:14:22.223792Z", "record-type":"control", "operation":"create-table", "partition-key-type":"task-id", "schema-name":"sbtest", "table-name":"sbtest_t1" } }