Apache Kafka를 AWS Database Migration Service 대상으로 사용 - AWSDatabase Migration Service

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Kafka를 AWS Database Migration Service 대상으로 사용

AWS DMS를 사용하여 데이터를 Apache Kafka 클러스터로 마이그레이션할 수 있습니다. Apache Kafka는 분산 스트리밍 플랫폼입니다. 실시간으로 스트리밍 데이터를 수집하고 처리하기 위해 Apache Kafka를 사용할 수 있습니다.

AWS또한 Amazon Managed Streaming for Apache Kafka (Amazon MSK) 에서AWS DMS대상을 참조하십시오. Amazon MSK는 Apache Kafka 인스턴스의 구현 및 관리를 단순화하는 완전 관리형 Apache Kafka 스트리밍 서비스입니다. 이 서비스는 오픈 소스 Apache Kafka 버전에서 작동하며, 사용자는 Amazon MSK 인스턴스에 액세스할 수 있습니다.AWS DMS는 아파치 카프카 인스턴스와 정확히 같습니다. 자세한 내용은 단원을 참조하십시오.Amazon MSK란 무엇입니까?Amazon Managed Streaming for Apache Kafka 개발자 가이드.

파티션으로 분할되는 주제라는 범주에 레코드의 카프카 클러스터 저장 스트림. 파티션는 주제에서 고유하게 식별되는 데이터 레코드 (메시지) 시퀀스입니다. 파티션은 주제 레코드를 병렬 처리할 수 있도록 한 클러스터의 여러 브로커로 분산될 수 있습니다. Apache Kafka의 주제 및 파티션과 분산에 대한 자세한 내용은 주제 및 로그분산을 참조하십시오.

AWS Database Migration Service는 JSON을 사용하여 Kafka 주제에 레코드를 게시합니다. 변환 중 AWS DMS는 각 레코드를 원본 데이터베이스로부터 JSON 형식의 속성-값 페어로 직렬화합니다.

지원되는 데이터 원본에서 대상 Kafka 클러스터로 데이터를 마이그레이션하려면 객체 매핑을 사용합니다. 객체 매핑을 사용하여 대상 항목에서 데이터 레코드를 구조화하는 방법을 결정합니다. 또한 각 테이블에 대한 파티션 키를 정의합니다. Apache Kafka가 이러한 테이블을 사용해 데이터를 파티션으로 그룹화합니다.

AWS DMS는 Apache Kafka 대상 엔드포인트에서 테이블을 생성하는 경우 원본 데이터베이스 엔드포인트에 있는 수만큼 테이블을 생성합니다. 또한 AWS DMS는 여러 Apache Kafka 파라미터 값을 설정합니다. 테이블 생성 비용은 마이그레이션할 테이블 수와 데이터 양에 따라 달라집니다.

Apache Kafka 엔드포인트 설정

엔드포인트 설정을 통해 연결 세부 정보를 지정할 수 있습니다.AWS DMS콘솔 또는--kafka-settings옵션을 사용할 수 있습니다. 각 설정의 요구 사항은 다음과 같습니다.

  • Broker— Kafka 클러스터에 있는 하나 이상의 브로커의 위치를 쉼표로 구분된 목록 형식으로 지정합니다.broker-hostname:port. 예를 들면, "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"입니다. 이 설정은 클러스터의 모든 브로커에 대한 위치를 지정할 수 있습니다. 모든 클러스터 브로커는 주제로 마이그레이션된 데이터 레코드의 분할을 처리하기 위해 통신합니다.

  • Topic— (선택 사항) 최대 길이 255개 문자 및 기호로 주제 이름을 지정합니다. 마침표(.), 밑줄(_) 및 빼기(-)를 사용할 수 있습니다. 마침표(.) 또는 밑줄(_)이 있는 주제 이름이 내부 데이터 구조에서 충돌할 수 있습니다. 주제 이름에 이러한 기호 중 하나만 사용하고 둘 다 사용하지는 마십시오. 주제 이름을 지정하지 않으면AWS DMS은 다음을 사용합니다."kafka-default-topic"를 마이그레이션 항목으로 입력합니다.

    참고

    AWS DMS가 지정된 마이그레이션 주제 또는 기본 주제를 생성하도록 하려면 Kafka 클러스터 구성 중에 auto.create.topics.enable = true를 설정합니다. 자세한 내용은 단원을 참조하십시오.Apache Kafka를 AWS Database Migration Service용 대상으로 사용할 때 적용되는 제한 사항

  • MessageFormat— 엔드포인트에 생성된 레코드의 출력 형식입니다. 메시지 형식은 JSON(기본값) 또는 JSON_UNFORMATTED(탭이 없는 한 줄)입니다.

  • MessageMaxBytes— 엔드포인트에서 생성된 레코드의 최대 크기 (바이트) 입니다. 기본값은 1,000,000입니다.

    참고

    만 사용할 수 있는AWS변경할 CLI/SDKMessageMaxBytes를 기본값이 아닌 값으로 설정합니다. 예를 들어 기존 카프카 엔드포인트를 수정하고MessageMaxBytes에서 다음 명령을 사용합니다.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails— 소스 데이터베이스의 자세한 트랜잭션 정보를 제공합니다. 이 정보에는 커밋 타임스탬프, 로그 위치 및 transaction_id, previous_transaction_id, transaction_record_id(트랜잭션 내의 레코드 오프셋)에 대한 값이 포함됩니다. 기본값은 false입니다.

  • IncludePartitionValue— 파티션 유형이 이 아닌 경우 Kafka 메시지 출력에 파티션 값을 표시합니다.schema-table-type. 기본값은 false입니다.

  • PartitionIncludeSchemaTable— 파티션 유형이 인 경우 스키마 및 테이블 이름을 파티션 값에 접두사로 지정합니다.primary-key-type. 이렇게 하면 Kafka 파티션 간의 데이터 분산이 증가합니다. 예를 들어 SysBench 스키마에 수천 개의 테이블이 있고 각 테이블에 기본 키의 범위가 제한되어 있다고 가정하겠습니다. 이 경우 동일한 기본 키가 수천 개의 테이블에서 동일한 파티션으로 전송되어 제한이 발생합니다. 기본값은 false입니다.

  • IncludeTableAlterOperations— 제어 데이터의 테이블을 변경하는 데이터 정의 언어 (DDL) 작업 (예:rename-table,drop-table,add-column,drop-column, 및rename-column. 기본값은 false입니다.

  • IncludeControlDetails— Kafka 메시지 출력에서 테이블 정의, 열 정의, 테이블 및 열 변경 사항에 대한 자세한 제어 정보를 표시합니다. 기본값은 false입니다.

  • IncludeNullAndEmpty— 대상에 NULL 및 빈 열을 포함합니다. 기본값은 false입니다.

  • SecurityProtocol— TLS (전송 계층 보안) 를 사용하여 Kafka 대상 엔드포인트에 대한 보안 연결을 설정합니다. 옵션에는 ssl-authentication, ssl-encryptionsasl-ssl이 포함됩니다. 사용sasl-ssl에는 가 필요합니다.SaslUsernameSaslPassword.

설정을 사용하여 전송 속도를 높일 수 있습니다. 그렇게 하려면,AWS DMS는 Apache Kafka 대상 클러스터에 대한 멀티스레드 전체 로드를 지원합니다.AWS DMS는 다음과 같은 작업 설정을 통해 이 멀티태스킹을 지원합니다.

  • MaxFullLoadSubTasks병렬로 로드할 최대 소스 테이블 수를 표시하려면 이 옵션을 사용합니다.AWS DMS는 전용 하위 작업을 사용하여 각 테이블을 해당 Kafka 대상 테이블에 로드합니다. 기본값은 8이며, 최대값은 49입니다.

  • ParallelLoadThreads— 이 옵션을 사용하여AWS DMS에서 각 테이블을 Kafka 대상 테이블에 로드하는 데 사용합니다. Apache Kafka 대상의 최대 값은 32입니다. 이 최대 한도를 증가시키도록 요청할 수 있습니다.

  • ParallelLoadBufferSize— 병렬 로드 스레드에서 데이터를 Kafka 대상에 로드하기 위해 사용하는 버퍼에 저장할 최대 레코드 수를 지정합니다. 기본값은 50입니다. 최대값은 1,000입니다. 이 설정은 ParallelLoadThreads와 함께 사용하십시오. ParallelLoadBufferSize는 둘 이상의 스레드가 있는 경우에만 유효합니다.

  • ParallelLoadQueuesPerThread— 각 동시 스레드가 액세스하는 대기열 수를 지정하여 대기열에서 데이터 레코드를 가져오고 대상에 대한 배치 로드를 생성하려면 이 옵션을 사용합니다. 기본값은 1입니다. 그러나 다양한 페이로드 크기의 Kafka 대상에 대해 유효한 범위는 스레드당 512개 대기열입니다.

Kafka와 같은 실시간 데이터 스트리밍 대상 엔드포인트에 대한 변경 데이터 캡처 (CDC) 의 성능을 향상시킬 수 있습니다.PutRecordsAPI 호출을 사용합니다. 이렇게 하려면 ParallelApply* 작업 설정을 사용하여 동시 스레드 수, 스레드당 대기열 및 버퍼에 저장할 레코드 수를 지정해야 합니다. 예를 들어 CDC 로드를 수행하고 128개의 스레드를 병렬로 적용한다고 가정하겠습니다. 또한 버퍼당 50개 레코드가 저장된 스레드당 64개 대기열에 액세스하려고 합니다.

CDC 성능을 승격하기 위해 AWS DMS에서는 다음 작업 설정을 지원합니다.

  • ParallelApplyThreads— 에서 동시 스레드의 수를 지정합니다.AWS DMS가 데이터 레코드를 Kafka 대상 엔드포인트로 푸시하기 위해 CDC 로드 중에 가 사용합니다. 기본값은 0이고 최대값은 32입니다.

  • ParallelApplyBufferSize— CDC 로드 중에 Kafka 대상 엔드포인트로 푸시할 동시 스레드에 대한 각 버퍼 대기열에 저장할 최대 레코드 수를 지정합니다. 기본값은 100이고 최대값은 1,000입니다. ParallelApplyThreads가 둘 이상의 스레드를 지정할 때 이 옵션을 사용합니다.

  • ParallelApplyQueuesPerThread— 각 스레드가 대기열에서 데이터 레코드를 가져오고 CDC 중에 Kafka 엔드포인트에 대한 배치 로드를 생성하기 위한 대기열 수를 지정합니다.

ParallelApply* 작업 설정을 사용할 때 partition-key-type 기본값은 테이블의 schema-name.table-name가 아니라 primary-key입니다.

TLS (전송 계층 보안)

는 Kafka 클러스터는 TLS (전송 계층 보안) 를 사용한 보안 연결을 허용합니다. DMS를 사용하면 다음 세 가지 보안 프로토콜 옵션 중 하나를 사용하여 Kafka 엔드포인트 연결을 보호할 수 있습니다.

SSL 암호화 (server-encryption)

클라이언트는 서버의 인증서를 통해 서버 ID의 유효성을 검사합니다. 그런 다음 서버와 클라이언트 간에 암호화 된 연결이 이루어집니다.

SSL 인증 (mutual-authentication)

서버와 클라이언트는 자체 인증서를 통해 서로 ID를 확인합니다. 그런 다음 서버와 클라이언트 간에 암호화 된 연결이 이루어집니다.

사슬-SSL (mutual-authentication)

SASL (단순 인증 및 보안 계층) 메서드는 클라이언트 인증서를 사용자 이름 및 암호로 대체하여 클라이언트 ID의 유효성을 검사합니다. 특히 서버가 클라이언트의 ID를 확인할 수 있도록 서버가 등록한 사용자 이름과 암호를 제공합니다. 그런 다음 서버와 클라이언트 간에 암호화 된 연결이 이루어집니다.

중요

아파치 카프카와 아마존 MSK는 확인된 인증서를 수락합니다. 이 해결 될 카프카 및 아마존 MSK의 알려진 제한 사항입니다. 자세한 내용은 단원을 참조하십시오.아파치 카프카 문제, KAFKA-3700.

Amazon MSK를 사용하는 경우 이 알려진 제한 사항의 해결 방법으로 ACL (액세스 제어 목록) 을 사용하는 것이 좋습니다. ACL 사용에 대한 자세한 내용은 단원을 참조하십시오.Apache Kafka ACL섹션 (Amazon Managed Streaming for Apache Kafka 개발자 가이드.

자체 관리형 Kafka 클러스터를 사용 중인 경우댓글 일자 21/10 월/18클러스터 구성에 대한 자세한 내용은 단원을 참조하십시오.

Amazon MSK 또는 자체 관리형 Kafka 클러스터에서 SSL 암호화 사용

SSL 암호화를 사용하여 Amazon MSK 또는 자체 관리형 Kafka 클러스터에 대한 엔드포인트 연결을 보호할 수 있습니다. SSL 암호화 인증 방법을 사용하는 경우 클라이언트는 서버의 인증서를 통해 서버의 ID를 확인합니다. 그런 다음 서버와 클라이언트 간에 암호화 된 연결이 이루어집니다.

SSL 암호화를 사용하여 Amazon MSK에 연결하려면

  • 보안 프로토콜 끝점 설정 (SecurityProtocol) 를 사용하여ssl-encryption옵션을 사용하여 대상 카프카 끝점을 만들 수 있습니다.

    다음 JSON 예제에서는 보안 프로토콜을 SSL 암호화로 설정합니다.

"KafkaSettings": { "SecurityProtocol": "ssl-encryption", }

자체 관리형 Kafka 클러스터에 SSL 암호화를 사용하려면

  1. 온프레미스 카프카 클러스터에서 프라이빗 CA (인증 기관) 를 사용하는 경우 프라이빗 CA 인증서를 업로드하고 Amazon 리소스 이름 (ARN) 을 가져옵니다.

  2. 보안 프로토콜 끝점 설정 (SecurityProtocol) 를 사용하여ssl-encryption옵션을 사용하여 대상 카프카 끝점을 만들 수 있습니다. 다음 JSON 예제에서는 보안 프로토콜을ssl-encryption.

    "KafkaSettings": { "SecurityProtocol": "ssl-encryption", }
  3. 개인 CA를 사용하는 경우SslCaCertificateArnARN 에서 위의 첫 번째 단계에서 얻은 것입니다.

SSL 인증 사용

SSL 인증을 사용하여 Amazon MSK 또는 자체 관리형 Kafka 클러스터에 대한 엔드포인트 연결을 보호할 수 있습니다.

SSL 인증을 사용하여 클라이언트 인증 및 암호화를 활성화하여 Amazon MSK에 연결하려면 다음을 수행합니다.

  • Kafka에 대한 개인 키와 공개 인증서를 준비합니다.

  • 인증서를 DMS 인증서 관리자에 업로드합니다.

  • 카프카 엔드 포인트 설정에 지정된 해당 인증서 ARN으로 카프카 대상 엔드 포인트를 만듭니다.

Amazon MSK용 프라이빗 키와 퍼블릭 인증서를 준비하려면

  1. EC2 인스턴스를 만들고 1단계부터 9단계까지 설명한 대로 인증을 사용하도록 클라이언트를 설정합니다.클라이언트 인증섹션 (Amazon Managed Streaming for Apache Kafka 개발자 가이드.

    이러한 단계를 완료하면 인증서-ARN (ACM에 저장된 공개 인증서 ARN) 과kafka.client.keystore.jks파일을 엽니다.

  2. 공개 인증서를 가져오고 인증서를signed-certificate-from-acm.pem파일에서 다음 명령을 사용합니다.

    aws acm-pca get-certificate --certificate-authority-arn Private_CA_ARN --certificate-arn Certificate_ARN

    이 명령은 다음 예제와 유사한 정보를 반환합니다.

    {"Certificate": "123", "CertificateChain": "456"}

    그런 다음 다음과 같은 내용을 복사합니다."123"signed-certificate-from-acm.pem파일을 엽니다.

  3. 가져 와서 개인 키를 가져 오기msk-rsa에서 키kafka.client.keystore.jks to keystore.p12에서 다음 예제에 표시된 대로 을 추가합니다.

    keytool -importkeystore \ -srckeystore kafka.client.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias msk-rsa-client \ -deststorepass test1234 \ -destkeypass test1234
  4. 다음 명령을 사용하여keystore.p12.pem형식

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-client-key.pem –nocerts

    PEM 암호 구문 입력메시지가 나타나고 인증서를 암호화하는 데 적용되는 키를 식별합니다.

  5. 에서 가방 속성과 주요 속성을 제거.pem파일을 사용하여 첫 번째 줄이 다음 문자열로 시작되는지 확인하십시오.

    ---BEGIN ENCRYPTED PRIVATE KEY---

DMS 인증서 관리자에게 공개 인증서 및 개인 키를 업로드하고 Amazon MSK에 대한 연결을 테스트하려면

  1. 다음 명령을 사용하여 DMS 인증서 관리자에 업로드합니다.

    aws dms import-certificate --certificate-identifier signed-cert --certificate-pem file://path to signed cert aws dms import-certificate --certificate-identifier private-key —certificate-pem file://path to private key
  2. Amazon MSK 대상 엔드포인트를 생성하고 연결을 테스트하여 TLS 인증이 작동하는지 확인합니다.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:0000", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "arn:aws:dms:us-east-1:012346789012:cert:", "SslClientKeyArn": "arn:aws:dms:us-east-1:0123456789012:cert:","SslClientKeyPassword":"test1234"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk
중요

SSL 인증을 사용하여 자체 관리형 Kafka 클러스터에 대한 연결을 보호할 수 있습니다. 경우에 따라 온-프레미스 Kafka 클러스터에서 개인 CA (인증 기관) 를 사용할 수 있습니다. 이 경우 CA 체인, 공개 인증서 및 개인 키를 DMS 인증서 관리자에게 업로드합니다. 그런 다음 온프레미스 카프카 대상 엔드포인트를 생성할 때 엔드포인트 설정에서 해당 Amazon 리소스 이름 (ARN) 을 사용합니다.

자체 관리형 Kafka 클러스터에 대한 개인 키와 서명된 인증서를 준비하려면

  1. 다음 예제와 같이 key pair 생성합니다.

    keytool -genkey -keystore kafka.server.keystore.jks -validity 300 -storepass your-keystore-password -keypass your-key-passphrase -dname "CN=your-cn-name" -alias alias-of-key-pair -storetype pkcs12 -keyalg RSA
  2. 인증서 서명 요청 (CSR) 을 생성합니다.

    keytool -keystore kafka.server.keystore.jks -certreq -file server-cert-sign-request-rsa -alias on-premise-rsa -storepass your-key-store-password -keypass your-key-password
  3. 클러스터 트러스트 스토어의 CA를 사용하여 CSR에 서명합니다. CA가 없는 경우 개인 CA를 만들 수 있습니다.

    openssl req -new -x509 -keyout ca-key -out ca-cert -days validate-days
  4. 가져오기ca-cert를 서버 트러스트 스토어 및 키 저장소에 저장합니다. 신뢰 저장소가 없는 경우에는 다음 명령을 사용하여 신뢰 저장소를 만들고ca-cert 를 그것으로 가져옵니다.

    keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
  5. 인증서에 서명합니다.

    openssl x509 -req -CA ca-cert -CAkey ca-key -in server-cert-sign-request-rsa -out signed-server-certificate.pem -days validate-days -CAcreateserial -passin pass:ca-password
  6. 서명된 인증서를 키 저장소로 가져옵니다.

    keytool -keystore kafka.server.keystore.jks -import -file signed-certificate.pem -alias on-premise-rsa -storepass your-keystore-password -keypass your-key-password
  7. 다음 명령을 사용하여on-premise-rsa에서 키kafka.server.keystore.jkstokeystore.p12.

    keytool -importkeystore \ -srckeystore kafka.server.keystore.jks \ -destkeystore keystore.p12 \ -deststoretype PKCS12 \ -srcalias on-premise-rsa \ -deststorepass your-truststore-password \ -destkeypass your-key-password
  8. 다음 명령을 사용하여keystore.p12.pem형식

    Openssl pkcs12 -in keystore.p12 -out encrypted-private-server-key.pem –nocerts
  9. 업로드encrypted-private-server-key.pem,signed-certificate.pem, 및ca-cert를 DMS 인증서 관리자에 추가합니다.

  10. 반환된 ARN을 사용하여 엔드포인트를 만듭니다.

    aws dms create-endpoint --endpoint-identifier $endpoint-identifier --engine-name kafka --endpoint-type target --kafka-settings '{"Broker": "b-0.kafka260.aaaaa1.a99.kafka.us-east-1.amazonaws.com:9092", "SecurityProtocol":"ssl-authentication", "SslClientCertificateArn": "your-client-cert-arn","SslClientKeyArn": "your-client-key-arn","SslClientKeyPassword":"your-client-key-password", "SslCaCertificateArn": "your-ca-certificate-arn"}' aws dms test-connection -replication-instance-arn=$rep_inst_arn —endpoint-arn=$kafka_tar_arn_msk

SASL-SSL 인증을 사용하여 아마존 MSK에 연결

SASL (단순 인증 및 보안 계층) 메서드는 사용자 이름과 암호를 사용하여 클라이언트 ID의 유효성을 검사하고 서버와 클라이언트 간에 암호화된 연결을 만듭니다.

SASL을 사용하려면 Amazon MSK 클러스터를 설정할 때 먼저 보안 사용자 이름과 암호를 생성해야 합니다. Amazon MSK 클러스터에 대한 보안 사용자 이름 및 암호를 설정하는 방법에 대한 설명은아마존 MSK 클러스터에 대한 SASL/SCRAM 인증 설정Amazon Managed Streaming for Apache Kafka 개발자 가이드.

그런 다음 Kafka 대상 끝점을 만들 때 보안 프로토콜 끝점 설정 (SecurityProtocol) 를 사용하여sasl-ssl옵션을 선택합니다. 또한SaslUsernameSaslPassword옵션을 선택합니다. 다음 JSON 예제와 같이 Amazon MSK 클러스터를 처음 설정할 때 생성한 보안 사용자 이름 및 암호와 일치해야 합니다.

"KafkaSettings": { "SecurityProtocol": "sasl-ssl", "SaslUsername":"Amazon MSK cluster secure user name", "SaslPassword":"Amazon MSK cluster secure password" }
참고

현재,AWS DMS는 공용 CA 지원 SASL/SSL만 지원합니다. DMS는 개인 CA에서 지원하는 자체 관리 카프카와 함께 사용하기 위해 SASL/SSL을 지원하지 않습니다.

Apache Kafka 대상의 경우 이전 이미지를 사용하여 CDC 행의 원래 값 보기

Kafka 같은 데이터 스트리밍 대상에 CDC 업데이트를 작성하는 경우 업데이트를 통한 변경 전에 소스 데이터베이스 행의 원래 값을 볼 수 있습니다. 이를 위해 AWS DMS는 소스 데이터베이스 엔진에서 제공되는 데이터를 기반으로 업데이트 이벤트의 이전 이미지를 채웁니다.

소스 데이터베이스 엔진에 따라 이전 이미지에 대해 서로 다른 양의 정보를 제공합니다.

  • Oracle은 열이 변경되는 경우에만 열에 대한 업데이트를 제공합니다.

  • PostgreSQL은 기본 키의 일부인 열에 대한 데이터(변경 여부에 관계 없음)만 제공합니다.

  • MySQL은 일반적으로 모든 열에 대한 데이터(변경 여부에 관계 없음)를 제공합니다.

이전 이미징을 활성화하여 소스 데이터베이스의 원래 값을 AWS DMS 출력에 추가하려면 BeforeImageSettings 작업 설정 또는 add-before-image-columns 파라미터를 사용합니다. 이 파라미터는 열 변환 규칙을 적용합니다.

BeforeImageSettings는 다음과 같이 소스 데이터베이스 시스템에서 수집된 값을 사용하여 모든 업데이트 작업에 새 JSON 속성을 추가합니다.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
참고

전체 로드와 CDC 작업(기존 데이터 마이그레이션 및 지속적인 변경 사항 복제) 또는 CDC 전용 작업(데이터 변경 사항만 복제)에 BeforeImageSettings를 적용합니다. 전체 로드 전용 작업에는 BeforeImageSettings를 적용하지 마십시오.

BeforeImageSettings 옵션의 경우 다음이 적용됩니다.

  • 이전 이미징을 활성화하려면 EnableBeforeImage 옵션을 true로 설정합니다. 기본값은 false입니다.

  • FieldName 옵션을 사용하여 새 JSON 속성에 이름을 지정합니다. EnableBeforeImagetrue인 경우 FieldName이 필수이며 비워 둘 수 없습니다.

  • ColumnFilter 옵션은 이전 이미징을 사용하여 추가할 열을 지정합니다. 테이블 기본 키의 일부인 열만 추가하려면 기본값 pk-only를 사용하고, LOB 유형이 아닌 열만 추가하려면 non-lob를 사용하고, 이전 이미지 값이 있는 모든 열을 추가하려면 all을 사용합니다.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

이전 이미지 변환 규칙 사용

작업 설정 대신 열 변환 규칙을 적용하는 add-before-image-columns 파라미터를 사용할 수 있습니다. 이 파라미터를 사용하면 Kafka 같은 데이터 스트리밍 대상에서 CDC 중에 이전 이미징을 활성화할 수 있습니다.

변환 규칙에 add-before-image-columns를 사용하면 이전 이미지 결과를 보다 세밀하게 제어할 수 있습니다. 변환 규칙을 통해 객체 로케이터를 사용하여 규칙에 대해 선택한 테이블을 제어할 수 있습니다. 또한 변환 규칙을 함께 연결하여 테이블마다 서로 다른 규칙을 적용할 수 있습니다. 그런 다음 다른 규칙을 사용하여 생성된 열을 조작할 수 있습니다.

참고

동일한 작업 내에서 BeforeImageSettings 작업 설정과 함께 add-before-image-columns 파라미터를 사용해서는 안 됩니다. 단일 작업에는 이 파라미터 또는 설정 중 하나만 사용하고 둘 다 사용하지 마십시오.

해당 열에 대해 add-before-image-columns 파라미터가 있는 transformation 규칙 유형이 before-image-def 섹션을 제공해야 합니다. 다음은 그 한 예입니다.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix의 값은 열 이름 앞에 추가되며, column-prefix의 기본값은 BI_입니다. column-suffix의 값은 열 이름 뒤에 추가되며, 기본값은 비어 있습니다. column-prefixcolumn-suffix를 모두 빈 문자열로 설정하지 마십시오.

column-filter에 대해 하나의 값을 선택합니다. 테이블 기본 키의 일부인 열만 추가하려면 pk-only를 선택하고, LOB 유형이 아닌 열만 추가하려면 non-lob를 선택하고, 이전 이미지 값이 있는 모든 열을 추가하려면 all을 선택합니다.

이전 이미지 변환 규칙의 예

다음 예의 변환 규칙은 대상에서 BI_emp_no라는 새 열을 추가합니다. UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 같은 문은 BI_emp_no 필드를 1로 채웁니다. Amazon S3 대상에 CDC 업데이트를 작성하면BI_emp_no열을 사용하면 업데이트된 원래 행을 알 수 있습니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

add-before-image-columns 규칙 작업 사용에 대한 자세한 내용은 변환 규칙 및 작업 단원을 참조하십시오.

Apache Kafka를 AWS Database Migration Service용 대상으로 사용할 때 적용되는 제한 사항

Apache Kafka를 대상으로 사용할 때 다음 제한 사항이 적용됩니다.

  • AWS DMS는 Kafka 대상에 대해 1MiB의 최대 메시지 크기를 지원합니다.

  • 두 가지를 모두 구성해야 합니다.AWS DMS복제 인스턴스와 Kafka 클러스터를 Amazon VPC와 동일한 보안 그룹에 있는 동일한 가상 프라이빗 클라우드 (VPC) 에 배치합니다. Kafka 클러스터는 Amazon MSK 인스턴스 또는 Amazon EC2 실행되는 자체 Kafka 인스턴스가 될 수 있습니다. 자세한 정보는 복제 인스턴스용으로 네트워크 설정을 참조하십시오.

    참고

    Amazon MSK에 대한 보안 그룹을 지정하려면클러스터 생성페이지에서고급 설정에서설정 사용자 지정를 선택한 다음 보안 그룹을 선택하거나 복제 인스턴스와 동일한 경우 기본값을 그대로 사용합니다.

  • 전체 LOB 모드는 지원되지 않습니다.

  • AWS DMS가 자동으로 새 주제를 만들 수 있는 속성을 사용하여 클러스터에 대한 Kafka 구성 파일을 지정합니다. auto.create.topics.enable = true 설정을 포함합니다. Amazon MSK를 사용하는 경우 Kafka 클러스터를 생성할 때 기본 구성을 지정한 다음auto.create.topics.enable설정을 다음으로 설정합니다.true. 기본 구성 설정에 대한 자세한 내용은 단원을 참조하십시오.Amazon MSK 기본 구성Amazon Managed Streaming for Apache Kafka 개발자 가이드. Amazon MSK를 사용하여 만든 기존 Kafka 클러스터를 수정해야 하는 경우AWS CLI명령aws kafka create-configuration다음 예와 같이 Kafka 구성을 업데이트합니다.

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    여기서, //~/kafka_configuration는 필요한 속성 설정으로 만든 구성 파일입니다.

    Amazon EC2 설치된 자체 Kafka 인스턴스를 사용하는 경우, 다음을 비롯한 유사한 속성 설정을 사용하여 Kafka 클러스터 구성을 수정합니다.auto.create.topics.enable = true, 인스턴스와 함께 제공된 옵션을 사용하여.

  • AWS DMS는 트랜잭션에 관계없이 각 업데이트를 특정 Kafka 주제의 1개 데이터 레코드 (메시지) 로 게시합니다.

  • AWS DMS는 다음과 같이 두 가지 형식의 파티션 키를 지원합니다.

    • SchemaName.TableName: 스키마와 테이블 이름의 조합입니다.

    • ${AttributeName}: JSON 내 한 필드의 값 또는 원본 데이터베이스 내 테이블의 기본 키입니다.

  • BatchApply는 Kafka 엔드포인트에 대해 지원되지 않습니다. Batch 적용 사용 (예:BatchApplyEnabled대상 메타 데이터 작업 설정) Kafka 대상에 대한 데이터 손실이 발생할 수 있습니다.

객체 매핑을 사용하여 데이터를 Kafka 주제로 마이그레이션

AWS DMS는 테이블 매핑 규칙을 사용하여 데이터를 원본에서 대상 Kafka 주제에 매핑합니다. 데이터를 대상 주제에 매핑하려면 객체 매핑이라는 테이블 매핑 규칙 유형을 사용합니다. 객체 매핑을 사용하여 원본의 데이터 레코드를 Kafka 주제에 게시된 데이터 레코드로 매핑하는 방법을 지정합니다.

Kafka 주제에는 파티션 키 외에 별다른 사전 설정 구조가 없습니다.

객체 매핑 규칙을 만들려면 rule-typeobject-mapping으로 지정합니다. 이 규칙은 사용할 객체 매핑 유형을 지정합니다.

이 규칙의 구조는 다음과 같습니다.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS는 현재 map-record-to-recordmap-record-to-document만을 rule-action 파라미터의 유효한 값으로 지원합니다. map-record-to-recordmap-record-to-document 값은 exclude-columns 속성 목록의 일부로 제외되지 않은 레코드에 대해 AWS DMS가 기본적으로 수행하는 작업을 지정합니다. 이러한 값은 어떤 식으로든 속성 매핑에 영향을 미치지 않습니다.

관계형 데이터베이스에서 Kafka 주제로 마이그레이션할 때 map-record-to-record를 사용합니다. 이러한 규칙 유형은 Kafka 주제의 파티션 키로 관계형 데이터베이스의 taskResourceId.schemaName.tableName 값을 사용하며 소스 데이터베이스의 각 열마다 속성을 생성합니다. 사용해야 하는 경우map-record-to-record에서 나열되지 않은 원본 테이블의 열이 나열되지 않은 경우exclude-columns속성 목록,AWS DMS는 대상 주제에 해당 속성을 생성합니다. 이러한 해당 속성은 원본 열이 속성 매핑에 사용되는지 여부와 상관없이 생성됩니다.

map-record-to-record를 이해하는 한 가지 방법은 작업 중일 때 관찰하는 것입니다. 이 예에서는 다음 구조와 데이터를 사용하여 관계형 데이터베이스 테이블 행에서 시작한다고 가정합니다.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

1988/02/29

이 정보를 Test라는 스키마에서 Kafka 주제로 마이그레이션하려면 데이터를 대상 주제로 매핑하는 규칙을 생성합니다. 다음 규칙은 그 매핑 과정을 보여 줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

카프카 주제와 파티션 키가 주어지면 (이 경우taskResourceId.schemaName.tableName) 다음은 Kafka 대상 주제의 샘플 데이터를 사용하여 결과 레코드 형식을 보여 줍니다.

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

속성 매핑으로 날짜 재구성

속성 맵을 사용하여 데이터를 Kafka 주제로 마이그레이션하는 동안 데이터를 재구성할 수 있습니다. 예를 들어 원본의 필드 몇 개를 대상의 단일 필드로 묶어야 하는 경우도 있을 것입니다. 다음의 속성 맵은 날짜를 재구성하는 방법을 보여줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

partition-key에 상수 값을 설정하려면 partition-key 값을 지정합니다. 예를 들어 이 방법을 통해 모든 데이터를 단일 파티션에 저장할 수 있습니다. 다음 매핑은 이러한 접근 방식을 보여줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
참고

특정 테이블을 위한 제어 레코드에 대한 partition-key 값은 TaskId.SchemaName.TableName입니다. 특정 작업을 위한 제어 레코드에 대한 partition-key 값은 해당 레코드의 TaskId입니다. partition-key 값을 객체 매핑에 지정해도 제어 레코드에 대한 partition-key에는 영향이 없습니다.

Apache Kafka 메시지 형식

JSON 출력은 단지 키-값 페어의 목록입니다.

RecordType

레코드 유형은 데이터나 제어일 수 있습니다. 데이터 레코드는 원본 내의 실제 행을 나타냅니다. 제어 레코드는 스트림 내의 중요 이벤트를 나타냅니다(예: 작업의 재시작).

작업

데이터 레코드의 경우, 작업은 create, read, update 또는 delete일 수 있습니다.

데이터 레코드의 경우, 작업은 TruncateTable 또는 DropTable일 수 있습니다.

SchemaName

레코드에 대한 원본 스키마입니다. 제어 레코드의 경우, 이 필드는 비워둘 수 있습니다.

TableName

레코드에 대한 원본 테이블입니다. 제어 레코드의 경우, 이 필드는 비워둘 수 있습니다.

타임스탬프

JSON 메시지가 구성된 경우의 타임스탬프입니다. 이 필드는 ISO 8601 형식으로 구성되었습니다.