Apache Kafka를 AWS Database Migration Service 대상으로 사용 - AWS Database Migration Service

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Apache Kafka를 AWS Database Migration Service 대상으로 사용

AWS DMS를 사용하여 데이터를 Apache Kafka 클러스터로 마이그레이션할 수 있습니다. Apache Kafka는 분산 스트리밍 플랫폼입니다. Apache Kafka를 사용하여 스트리밍 데이터를 실시간으로 수집하고 처리할 수 있습니다.

또한 AWS는 Amazon Managed Streaming for Apache Kafka 대상으로 사용하도록 Amazon MSK(AWS DMS)도 제공합니다. Amazon MSK는 Apache Kafka 인스턴스의 구현 및 관리를 단순화하는 완전 관리형 Apache Kafka 스트리밍 서비스입니다. 이 서비스는 오픈 소스 Apache Kafka 버전에서 작동하며, 사용자는 Apache Kafka 인스턴스와 마찬가지로 Amazon MSK 대상으로 AWS DMS 인스턴스에 액세스합니다. 자세한 내용은 의 Amazon MSK란 무엇입니까?를 참조하십시오Amazon Managed Streaming for Apache Kafka 개발자 안내서.

Kafka 클러스터는 파티션으로 분할된 주제라는 카테고리에 레코드 스트림을 저장합니다. 파티션은 주제에서 고유하게 식별되는 데이터 레코드(메시지) 시퀀스입니다. 주제의 레코드를 병렬 처리할 수 있도록 클러스터의 여러 브로커에 파티션을 분산할 수 있습니다. Apache Kafka의 주제 및 파티션과 분산에 대한 자세한 내용은 주제 및 로그분산을 참조하십시오.

AWS Database Migration Service 는 JSON을 사용하여 Kafka 주제에 레코드를 게시합니다. 변환 중 AWS DMS는 각 레코드를 원본 데이터베이스로부터 JSON 형식의 속성-값 페어로 직렬화합니다.

지원되는 데이터 원본에서 대상 Kafka 클러스터로 데이터를 마이그레이션하려면 객체 매핑을 사용합니다. 객체 매핑을 사용하여 대상 주제에서 데이터 레코드를 구성하는 방법을 결정합니다. 또한 각 테이블에 대한 파티션 키를 정의합니다. Apache Kafka가 이러한 테이블을 사용해 데이터를 파티션으로 그룹화합니다.

AWS DMS는 Apache Kafka 대상 엔드포인트에서 테이블을 생성하는 경우 원본 데이터베이스 엔드포인트에 있는 수만큼 테이블을 생성합니다. 또한 AWS DMS는 여러 Apache Kafka 파라미터 값을 설정합니다. 테이블 생성 비용은 마이그레이션할 테이블 수와 데이터 양에 따라 달라집니다.

Apache Kafka 엔드포인트 설정

AWS DMS 콘솔의 엔드포인트 설정 또는 CLI의 --kafka-settings 옵션을 통해 연결 세부 정보를 지정할 수 있습니다. 각 설정의 요구 사항은 다음과 같습니다.

  • Broker – Kafka 클러스터에 있는 하나 이상의 브로커 위치를 각 의 쉼표로 구분된 목록 형식으로 지정합니다broker-hostname:port. 예를 들면, "ec2-12-345-678-901.compute-1.amazonaws.com:2345,ec2-10-987-654-321.compute-1.amazonaws.com:9876"입니다. 이 설정은 클러스터에서 임의의 또는 모든 브로커의 위치를 지정할 수 있습니다. 모든 클러스터 브로커는 주제로 마이그레이션된 데이터 레코드의 분할을 처리하기 위해 통신합니다.

  • Topic – (선택 사항) 최대 길이 255개 문자 및 기호로 주제 이름을 지정합니다. 마침표(.), 밑줄(_) 및 빼기(-)를 사용할 수 있습니다. 마침표(.) 또는 밑줄(_)이 있는 주제 이름이 내부 데이터 구조에서 충돌할 수 있습니다. 주제 이름에 이러한 기호 중 하나만 사용하고 둘 다 사용하지는 마십시오. 주제 이름을 지정하지 않으면 는 를 마이그레이션 주제AWS DMS로 "kafka-default-topic" 사용합니다.

    참고

    AWS DMS가 지정된 마이그레이션 주제 또는 기본 주제를 생성하도록 하려면 Kafka 클러스터 구성 중에 auto.create.topics.enable = true를 설정합니다. 자세한 내용은 단원을 참조하십시오. Apache Kafka를 AWS Database Migration Service용 대상으로 사용할 때 적용되는 제한 사항

  • MessageFormat – 엔드포인트에 생성된 레코드의 출력 형식입니다. 메시지 형식은 JSON(기본값) 또는 JSON_UNFORMATTED(탭이 없는 한 줄)입니다.

  • MessageMaxBytes – 엔드포인트에서 생성된 레코드의 최대 크기(바이트)입니다. 기본값은 1,000,000입니다.

    참고

    기본값이 MessageMaxBytes 아닌 값으로 변경하려면 AWS CLI/SDK만 사용할 수 있습니다. 예를 들어 기존 Kafka 엔드포인트를 수정하고 를 변경하려면 다음 명령을 MessageMaxBytes사용합니다.

    aws dms modify-endpoint --endpoint-arn your-endpoint --kafka-settings Broker="broker1-server:broker1-port,broker2-server:broker2-port,...", Topic=topic-name,MessageMaxBytes=integer-of-max-message-size-in-bytes
  • IncludeTransactionDetails – 원본 데이터베이스의 자세한 트랜잭션 정보를 제공합니다. 이 정보에는 커밋 타임스탬프, 로그 위치 및 transaction_id, previous_transaction_id, transaction_record_id(트랜잭션 내의 레코드 오프셋)에 대한 값이 포함됩니다. 기본값은 입니다false.

  • IncludePartitionValue – 파티션 유형이 schema-table-type이 아닌 경우 Kafka 메시지 출력에 파티션 값을 표시합니다. 기본값은 입니다false.

  • PartitionIncludeSchemaTable – 파티션 유형이 primary-key-type인 경우 스키마 및 테이블 이름을 파티션 값에 접두사로 지정합니다. 이렇게 하면 Kafka 파티션 간의 데이터 분산이 증가합니다. 예를 들어 SysBench 스키마에 수천 개의 테이블이 있고 각 테이블에 기본 키의 범위가 제한되어 있다고 가정하겠습니다. 이 경우 동일한 기본 키가 수천 개의 테이블에서 동일한 파티션으로 전송되어 제한이 발생합니다. 기본값은 입니다false.

  • IncludeTableAlterOperations – 제어 데이터의 테이블을 변경하는 데이터 정의 언어(DDL) 작업(예: rename-table, drop-table, add-column, drop-columnrename-column)을 포함합니다. 기본값은 입니다false.

  • IncludeControlDetails – Kafka 메시지 출력에서 테이블 정의, 열 정의, 테이블 및 열 변경 사항에 대한 자세한 제어 정보를 표시합니다. 기본값은 입니다false.

  • IncludeNullAndEmpty – 대상에 NULL 및 빈 열을 포함합니다. 기본값은 입니다false.

멀티스레드 전체 로드 작업 설정

설정을 사용하여 전송 속도를 높일 수 있습니다. 이를 위해 는 Apache Kafka 대상 클러스터에 대한 멀티스레드 전체 로드를 AWS DMS 지원합니다. 는 다음을 포함하는 작업 설정으로 이 멀티스레딩을 AWS DMS 지원합니다.

  • MaxFullLoadSubTasks – 병렬로 로드할 최대 소스 테이블 수를 표시하려면 이 옵션을 사용합니다. AWS DMS는 전용 하위 작업을 사용하여 해당 Kafka 대상 테이블에 각 테이블을 로드합니다. 기본값은 8이며, 최대값은 49입니다.

  • ParallelLoadThreads – AWS DMS가 각 테이블을 Kafka 대상 테이블에 로드하는 데 사용하는 스레드 수를 지정하려면 이 옵션을 사용합니다. Apache Kafka 대상의 최대 값은 32입니다. 이 최대 한도를 증가시키도록 요청할 수 있습니다.

  • ParallelLoadBufferSize – 병렬 로드 스레드에서 데이터를 Kafka 대상에 로드하기 위해 사용하는 버퍼에 저장할 최대 레코드 수를 지정합니다. 기본값은 50입니다. 최대값은 1,000입니다. 이 설정은 와 함께 사용합니다ParallelLoadThreads. ParallelLoadBufferSize 는 둘 이상의 스레드가 있는 경우에만 유효합니다.

  • ParallelLoadQueuesPerThread – 각 동시 스레드가 액세스하는 대기열 수를 지정하여 대기열에서 데이터 레코드를 가져오고 대상에 대한 배치 로드를 생성하려면 이 옵션을 사용합니다. 기본값은 1입니다. 그러나 다양한 페이로드 크기의 Kafka 대상에 대해 유효한 범위는 스레드당 5–512개 대기열입니다.

멀티스레드 CDC 로드 작업 설정

PutRecords API 호출의 동작을 수정하는 작업 설정을 사용하여 Kafka와 같은 실시간 데이터 스트리밍 대상 엔드포인트에 대한 변경 데이터 캡처(CDC)의 성능을 향상시킬 수 있습니다. 이렇게 하려면 ParallelApply* 작업 설정을 사용하여 동시 스레드 수, 스레드당 대기열 및 버퍼에 저장할 레코드 수를 지정해야 합니다. 예를 들어 CDC 로드를 수행하고 128개의 스레드를 병렬로 적용한다고 가정하겠습니다. 또한 버퍼당 50개 레코드가 저장된 스레드당 64개 대기열에 액세스하려고 합니다.

CDC 성능을 승격하기 위해 AWS DMS에서는 다음 작업 설정을 지원합니다.

  • ParallelApplyThreads – 데이터 레코드를 Kafka 대상 엔드포인트로 푸시하기 위해 CDC 로드 중에 AWS DMS가 사용하는 동시 스레드 수를 지정합니다. 기본값은 0이고 최대값은 32입니다.

  • ParallelApplyBufferSize – CDC 로드 중에 Kafka 대상 엔드포인트로 푸시할 동시 스레드에 대한 각 버퍼 대기열에 저장할 최대 레코드 수를 지정합니다. 기본값은 50이고 최대값은 1,000입니다. ParallelApplyThreads가 둘 이상의 스레드를 지정할 때 이 옵션을 사용합니다.

  • ParallelApplyQueuesPerThread – 각 스레드가 대기열에서 데이터 레코드를 가져오고 CDC 중에 Kafka 엔드포인트에 대한 배치 로드를 생성하기 위한 대기열 수를 지정합니다.

ParallelApply* 작업 설정을 사용할 때 partition-key-type 기본값은 가 아닌 테이블primary-key의 입니다schema-name.table-name.

Apache Kafka 대상의 경우 이전 이미지를 사용하여 CDC 행의 원래 값 보기

Kafka 같은 데이터 스트리밍 대상에 CDC 업데이트를 작성하는 경우 업데이트를 통한 변경 전에 소스 데이터베이스 행의 원래 값을 볼 수 있습니다. 이를 위해 AWS DMS는 소스 데이터베이스 엔진에서 제공되는 데이터를 기반으로 업데이트 이벤트의 이전 이미지를 채웁니다.

소스 데이터베이스 엔진에 따라 이전 이미지에 대해 서로 다른 양의 정보를 제공합니다.

  • Oracle은 열이 변경되는 경우에만 열에 대한 업데이트를 제공합니다.

  • PostgreSQL은 기본 키의 일부인 열에 대한 데이터(변경 여부에 관계 없음)만 제공합니다.

  • MySQL은 일반적으로 모든 열에 대한 데이터(변경 여부에 관계 없음)를 제공합니다.

이전 이미징을 활성화하여 소스 데이터베이스의 원래 값을 AWS DMS 출력에 추가하려면 BeforeImageSettings 작업 설정 또는 add-before-image-columns 파라미터를 사용합니다. 이 파라미터는 열 변환 규칙을 적용합니다.

BeforeImageSettings는 다음과 같이 소스 데이터베이스 시스템에서 수집된 값을 사용하여 모든 업데이트 작업에 새 JSON 속성을 추가합니다.

"BeforeImageSettings": { "EnableBeforeImage": boolean, "FieldName": string, "ColumnFilter": pk-only (default) / non-lob / all (but only one) }
참고

전체 로드와 CDC 작업(기존 데이터 마이그레이션 및 지속적인 변경 사항 복제) 또는 CDC 전용 작업(데이터 변경 사항만 복제)에 BeforeImageSettings를 적용합니다. 전체 로드 전용 작업에는 BeforeImageSettings를 적용하지 마십시오.

BeforeImageSettings 옵션의 경우 다음이 적용됩니다.

  • 이전 이미징을 활성화하려면 EnableBeforeImage 옵션을 true로 설정합니다. 기본값은 입니다false.

  • FieldName 옵션을 사용하여 새 JSON 속성에 이름을 지정합니다. EnableBeforeImagetrue인 경우 FieldName이 필수이며 비워 둘 수 없습니다.

  • ColumnFilter 옵션은 이전 이미징을 사용하여 추가할 열을 지정합니다. 테이블 기본 키의 일부인 열만 추가하려면 기본값 pk-only를 사용하고, LOB 유형이 아닌 열만 추가하려면 non-lob를 사용하고, 이전 이미지 값이 있는 모든 열을 추가하려면 all을 사용합니다.

    "BeforeImageSettings": { "EnableBeforeImage": true, "FieldName": "before-image", "ColumnFilter": "pk-only" }

이전 이미지 변환 규칙 사용

작업 설정 대신 열 변환 규칙을 적용하는 add-before-image-columns 파라미터를 사용할 수 있습니다. 이 파라미터를 사용하면 Kafka 같은 데이터 스트리밍 대상에서 CDC 중에 이전 이미징을 활성화할 수 있습니다.

변환 규칙에 add-before-image-columns를 사용하면 이전 이미지 결과를 보다 세밀하게 제어할 수 있습니다. 변환 규칙을 통해 객체 로케이터를 사용하여 규칙에 대해 선택한 테이블을 제어할 수 있습니다. 또한 변환 규칙을 함께 연결하여 테이블마다 서로 다른 규칙을 적용할 수 있습니다. 그런 다음 다른 규칙을 사용하여 생성된 열을 조작할 수 있습니다.

참고

동일한 작업 내에서 add-before-image-columns 작업 설정과 함께 BeforeImageSettings 파라미터를 사용해서는 안 됩니다. 단일 작업에는 이 파라미터 또는 설정 중 하나만 사용하고 둘 다 사용하지 마십시오.

해당 열에 대해 transformation 파라미터가 있는 add-before-image-columns 규칙 유형이 before-image-def 섹션을 제공해야 합니다. 다음은 그 한 예입니다.

{ "rule-type": "transformation", … "rule-target": "column", "rule-action": "add-before-image-columns", "before-image-def":{ "column-filter": one-of (pk-only / non-lob / all), "column-prefix": string, "column-suffix": string, } }

column-prefix의 값은 열 이름 앞에 추가되며, column-prefix의 기본값은 BI_입니다. column-suffix의 값은 열 이름 뒤에 추가되며, 기본값은 비어 있습니다. column-prefixcolumn-suffix를 모두 빈 문자열로 설정하지 마십시오.

column-filter에 대해 하나의 값을 선택합니다. 테이블 기본 키의 일부인 열만 추가하려면 pk-only를 선택하고, LOB 유형이 아닌 열만 추가하려면 non-lob를 선택하고, 이전 이미지 값이 있는 모든 열을 추가하려면 all을 선택합니다.

이전 이미지 변환 규칙의 예

다음 예의 변환 규칙은 대상에서 BI_emp_no라는 새 열을 추가합니다. UPDATE employees SET emp_no = 3 WHERE emp_no = 1; 같은 문은 BI_emp_no 필드를 1로 채웁니다. Amazon S3 대상에 CDC 업데이트를 작성할 때 BI_emp_no 열을 통해 업데이트된 원래 행을 알 수 있습니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "%", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "transformation", "rule-id": "2", "rule-name": "2", "rule-target": "column", "object-locator": { "schema-name": "%", "table-name": "employees" }, "rule-action": "add-before-image-columns", "before-image-def": { "column-prefix": "BI_", "column-suffix": "", "column-filter": "pk-only" } } ] }

add-before-image-columns 규칙 작업 사용에 대한 자세한 내용은 변환 규칙 및 작업 단원을 참조하십시오.

Apache Kafka를 AWS Database Migration Service용 대상으로 사용할 때 적용되는 제한 사항

Apache Kafka를 대상으로 사용할 때 다음 제한 사항이 적용됩니다.

  • AWS DMS는 Kafka 대상에 대해 1MiB의 최대 메시지 크기를 지원합니다.

  • 동일한 보안 그룹의 및 를 기반으로 동일한 Virtual Private Cloud(VPC)에서 AWS DMS 복제 인스턴스Amazon VPC와 Kafka 클러스터를 모두 구성합니다. Kafka 클러스터는 Amazon MSK 인스턴스 또는 Amazon EC2에서 실행되는 자체 Kafka 인스턴스가 될 수 있습니다. 자세한 내용은 단원을 참조하세요복제 인스턴스용으로 네트워크 설정

    참고

    에 대한 보안 그룹을 지정하려면 Create cluster(클러스터 Amazon MSK 생성) 페이지에서 Advanced settings(고급 설정)를 선택하고 Customize settings(사용자 지정 설정)를 선택한 다음 보안 그룹을 선택하거나 복제 인스턴스와 동일한 경우 기본값을 그대로 사용합니다.

  • AWS DMS가 자동으로 새 주제를 만들 수 있는 속성을 사용하여 클러스터에 대한 Kafka 구성 파일을 지정합니다. auto.create.topics.enable = true 설정을 포함합니다. Amazon MSK를 사용하는 경우 Kafka 클러스터를 만들 때 기본 구성을 지정한 다음 auto.create.topics.enable 설정을 true로 변경할 수 있습니다. 기본 구성 설정에 대한 자세한 내용은 https://docs.aws.amazon.com/msk/latest/developerguide/msk-default-configuration.html 의 Amazon Managed Streaming for Apache Kafka 개발자 안내서기본 Amazon MSK 구성을 참조하십시오. Amazon MSK를 사용하여 만든 기존 Kafka 클러스터를 수정해야 하는 경우 다음 예제와 같이 AWS CLI 명령 aws kafka create-configuration을 실행하여 Kafka 구성을 업데이트합니다.

    14:38:41 $ aws kafka create-configuration --name "kafka-configuration" --kafka-versions "2.2.1" --server-properties file://~/kafka_configuration { "LatestRevision": { "Revision": 1, "CreationTime": "2019-09-06T14:39:37.708Z" }, "CreationTime": "2019-09-06T14:39:37.708Z", "Name": "kafka-configuration", "Arn": "arn:aws:kafka:us-east-1:111122223333:configuration/kafka-configuration/7e008070-6a08-445f-9fe5-36ccf630ecfd-3" }

    여기서, //~/kafka_configuration는 필요한 속성 설정으로 만든 구성 파일입니다.

    Amazon EC2에 설치된 자체 Kafka 인스턴스를 사용하는 경우, 인스턴스에 제공된 옵션을 사용하여 auto.create.topics.enable = true 등 유사한 속성 설정으로 Kafka 클러스터 구성을 수정합니다.

  • AWS DMS 는 트랜잭션에 관계없이 각 업데이트를 특정 Kafka 주제의 단일 데이터 레코드(메시지)로서 원본 데이터베이스의 단일 레코드에 게시합니다.

  • AWS DMS는 다음과 같이 두 가지 형식의 파티션 키를 지원합니다.

    • SchemaName.TableName: 스키마와 테이블 이름의 조합입니다.

    • ${AttributeName}: JSON 내 한 필드의 값 또는 원본 데이터베이스 내 테이블의 기본 키입니다.

객체 매핑을 사용하여 데이터를 Kafka 주제로 마이그레이션

AWS DMS 는 테이블 매핑 규칙을 사용하여 데이터를 원본에서 대상 Kafka 주제로 매핑합니다. 데이터를 대상 주제에 매핑하려면 객체 매핑이라는 테이블 매핑 규칙 유형을 사용합니다. 객체 매핑을 사용하여 원본의 데이터 레코드를 Kafka 주제에 게시된 데이터 레코드로 매핑하는 방법을 지정합니다.

Kafka 주제에는 파티션 키 외에 별다른 사전 설정 구조가 없습니다.

객체 매핑 규칙을 만들려면 rule-typeobject-mapping으로 지정합니다. 이 규칙은 사용할 객체 매핑 유형을 지정합니다.

이 규칙의 구조는 다음과 같습니다.

{ "rules": [ { "rule-type": "object-mapping", "rule-id": "id", "rule-name": "name", "rule-action": "valid object-mapping rule action", "object-locator": { "schema-name": "case-sensitive schema name", "table-name": "" } } ] }

AWS DMS는 현재 map-record-to-recordmap-record-to-document만을 rule-action 파라미터의 유효한 값으로 지원합니다. map-record-to-recordmap-record-to-document 값은 AWS DMS 속성 목록의 일부로 제외되지 않은 레코드에 대해 exclude-columns가 기본적으로 수행하는 작업을 지정합니다. 이러한 값은 어떤 식으로든 속성 매핑에 영향을 미치지 않습니다.

관계형 데이터베이스에서 Kafka 주제로 마이그레이션할 때 map-record-to-record를 사용합니다. 이러한 규칙 유형은 Kafka 주제의 파티션 키로 관계형 데이터베이스의 taskResourceId.schemaName.tableName 값을 사용하며 소스 데이터베이스의 각 열마다 속성을 생성합니다. 를 사용하는 경우 map-record-to-record 속성 목록에 나열되지 않은 원본 테이블의 열에 exclude-columns대해 는 대상 주제에 해당 속성을 AWS DMS 생성합니다. 이러한 해당 속성은 원본 열이 속성 매핑에 사용되는지 여부와 상관없이 생성됩니다.

map-record-to-record를 이해하는 한 가지 방법은 작업 중일 때 관찰하는 것입니다. 이 예에서는 다음 구조와 데이터를 사용하여 관계형 데이터베이스 테이블 행에서 시작한다고 가정합니다.

FirstName LastName StoreId HomeAddress HomePhone WorkAddress WorkPhone DateofBirth

Randy

Marsh

5

221B Baker Street

1234567890

31 Spooner Street, Quahog

9876543210

1988/02/29

이 정보를 Test라는 스키마에서 Kafka 주제로 마이그레이션하려면 데이터를 대상 주제로 매핑하는 규칙을 생성합니다. 다음 규칙은 그 매핑 과정을 보여 줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "DefaultMapToKafka", "rule-action": "map-record-to-record", "object-locator": { "schema-name": "Test", "table-name": "Customers" } } ] }

Kafka 주제와 파티션 키(이 경우 taskResourceId.schemaName.tableName)가 주어진 경우, 다음은 Kafka 대상 주제의 샘플 데이터를 사용하여 결과 레코드 형식을 보여줍니다.

{ "FirstName": "Randy", "LastName": "Marsh", "StoreId": "5", "HomeAddress": "221B Baker Street", "HomePhone": "1234567890", "WorkAddress": "31 Spooner Street, Quahog", "WorkPhone": "9876543210", "DateOfBirth": "02/29/1988" }

속성 매핑으로 날짜 재구성

속성 맵을 사용하여 데이터를 Kafka 주제로 마이그레이션하는 동안 데이터를 재구성할 수 있습니다. 예를 들어 원본의 필드 몇 개를 대상의 단일 필드로 묶어야 하는 경우도 있을 것입니다. 다음의 속성 맵은 날짜를 재구성하는 방법을 보여줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "rule-action": "include", "object-locator": { "schema-name": "Test", "table-name": "%" } }, { "rule-type": "object-mapping", "rule-id": "2", "rule-name": "TransformToKafka", "rule-action": "map-record-to-record", "target-table-name": "CustomerData", "object-locator": { "schema-name": "Test", "table-name": "Customers" }, "mapping-parameters": { "partition-key-type": "attribute-name", "partition-key-name": "CustomerName", "exclude-columns": [ "firstname", "lastname", "homeaddress", "homephone", "workaddress", "workphone" ], "attribute-mappings": [ { "target-attribute-name": "CustomerName", "attribute-type": "scalar", "attribute-sub-type": "string", "value": "${lastname}, ${firstname}" }, { "target-attribute-name": "ContactDetails", "attribute-type": "document", "attribute-sub-type": "json", "value": { "Home": { "Address": "${homeaddress}", "Phone": "${homephone}" }, "Work": { "Address": "${workaddress}", "Phone": "${workphone}" } } } ] } } ] }

partition-key에 상수 값을 설정하려면 partition-key 값을 지정합니다. 예를 들어 이 방법을 통해 모든 데이터를 단일 파티션에 저장할 수 있습니다. 다음 매핑은 이러한 접근 방식을 보여줍니다.

{ "rules": [ { "rule-type": "selection", "rule-id": "1", "rule-name": "1", "object-locator": { "schema-name": "Test", "table-name": "%" }, "rule-action": "include" }, { "rule-type": "object-mapping", "rule-id": "1", "rule-name": "TransformToKafka", "rule-action": "map-record-to-document", "object-locator": { "schema-name": "Test", "table-name": "Customer" }, "mapping-parameters": { "partition-key": { "value": "ConstantPartitionKey" }, "exclude-columns": [ "FirstName", "LastName", "HomeAddress", "HomePhone", "WorkAddress", "WorkPhone" ], "attribute-mappings": [ { "attribute-name": "CustomerName", "value": "${FirstName},${LastName}" }, { "attribute-name": "ContactDetails", "value": { "Home": { "Address": "${HomeAddress}", "Phone": "${HomePhone}" }, "Work": { "Address": "${WorkAddress}", "Phone": "${WorkPhone}" } } }, { "attribute-name": "DateOfBirth", "value": "${DateOfBirth}" } ] } } ] }
참고

특정 테이블을 위한 제어 레코드에 대한 partition-key 값은 TaskId.SchemaName.TableName입니다. 특정 작업을 위한 제어 레코드에 대한 partition-key 값은 해당 레코드의 TaskId입니다. partition-key 값을 객체 매핑에 지정해도 제어 레코드에 대한 partition-key에는 영향이 없습니다.

Apache Kafka 메시지 형식

JSON 출력은 단지 키-값 페어의 목록입니다.

RecordType

레코드 유형은 데이터나 제어일 수 있습니다. 데이터 레코드는 원본 내의 실제 행을 나타냅니다. 제어 레코드는 스트림 내의 중요 이벤트를 나타냅니다(예: 작업의 재시작).

작업

데이터 레코드의 경우 작업은 , create read, update또는 일 수 delete있습니다.

데이터 레코드의 경우, 작업은 TruncateTable 또는 DropTable일 수 있습니다.

SchemaName

레코드에 대한 원본 스키마입니다. 제어 레코드의 경우, 이 필드는 비워둘 수 있습니다.

TableName

레코드에 대한 원본 테이블입니다. 제어 레코드의 경우, 이 필드는 비워둘 수 있습니다.

타임스탬프

JSON 메시지가 구성된 경우의 타임스탬프입니다. 이 필드는 ISO 8601 형식으로 구성되었습니다.