Kinesis 클라이언트 라이브러리 사용 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Kinesis 클라이언트 라이브러리 사용

KDS 데이터 스트림에서 데이터를 처리할 수 있는 사용자 지정 소비자 응용 프로그램을 개발하는 방법 중 하나는 Kinesis 클라이언트 라이브러리 (KCL) 를 사용하는 것입니다.

참고

KCL 1.x 및 KCL 2.x의 경우 사용 시나리오에 따라 최신 KCL 1.x 버전 또는 KCL 2.x 버전으로 업그레이드하는 것이 좋습니다. KCL 1.x 및 KCL 2.x는 모두 최신 종속성 및 보안 패치, 버그 수정 및 이전 버전과 호환되는 새로운 기능을 포함하는 최신 릴리스로 정기적으로 업데이트됩니다. 자세한 내용은 단원을 참조하십시오.https://github.com/awslabs/amazon-kinesis-client/릴리스.

Kinesis 클라이언트 라이브러리란 무엇입니까?

KCL은 분산 컴퓨팅과 관련된 많은 복잡한 작업을 처리하여 Kinesis 데이터 스트림에서 데이터를 소비하고 처리할 수 있도록 지원합니다. 여기에는 여러 소비자 애플리케이션 인스턴스의 로드 밸런싱, 소비자 애플리케이션 인스턴스 실패에 대한 응답, 처리된 레코드 검사 및 리샤딩에 대한 반응이 포함됩니다. KCL은 이러한 모든 하위 작업을 처리하므로 사용자 지정 레코드 처리 로직을 작성하는 데 집중할 수 있습니다.

KCL은 에서 사용할 수 있는 Kinesis Data Streams API와 다릅니다.AWSSDK. Kinesis Data Streams API는 스트림 생성, 리샤딩, 레코드 넣기 및 가져오기 등 Kinesis Data Streams 의 여러 가지 측면을 관리하는 데 도움이 됩니다. KCL은 이러한 모든 하위 작업에 대한 추상화 계층을 제공합니다. 특히 소비자 애플리케이션의 사용자 지정 데이터 처리 로직에 집중할 수 있습니다. Kinesis Data Streams API에 대한 자세한 내용은Amazon Kinesis API 참조.

중요

KCL은 자바 라이브러리입니다. Java 이외의 언어에 대한 지원은MultiLang데몬. 이 데몬은 Java 기반이며, Java 이외의 KCL 언어를 사용하는 경우 배경에서 실행됩니다. 예를 들어, Python용 KCL을 설치하고 Python으로만 소비자 애플리케이션을 작성한 경우에도 시스템에 Java를 설치해야 합니다.MultiLang데몬. 추가,MultiLangDaemon에는 사용 사례에 맞게 사용자 지정해야 하는 몇 가지 기본 설정이 있습니다 (예:AWS연결되는 지역입니다. 에 대한 자세한 내용은MultiLang데몬GitHub참조할 항목KCLMultiLang데몬 프로젝트.

KCL은 레코드 처리 로직과 Kinesis Data Streams 간에 중간 역할을 합니다. KCL은 다음 작업을 수행합니다.

  • 데이터 스트림에 연결합니다.

  • 데이터 스트림 내 샤드를 열거합니다.

  • 리스를 사용하여 작업자와의 샤드 연결 조정

  • 관리하는 모든 샤드의 레코드 프로세서를 인스턴스화합니다

  • 데이터 스트림에서 데이터 레코드를 가져옵니다.

  • 해당하는 레코드 프로세서로 레코드를 푸시합니다

  • 처리된 레코드에 대해 체크포인트를 수행합니다

  • 작업자 인스턴스 수가 변경되거나 데이터 스트림이 리샤드될 때 (샤드가 분할 또는 병합) 될 때 샤드-작업자 연결 (리스) 의 균형을 조정합니다.

KCL 사용 가능한 버전

현재 지원되는 KCL의 다음 버전 중 하나를 사용하여 사용자 지정 소비자 애플리케이션을 빌드할 수 있습니다.

KCL 1.x 또는 KCL 2.x를 사용하여 공유 처리량을 사용하는 소비자 애플리케이션을 빌드할 수 있습니다. 자세한 정보는 KCL을 사용하여 공유 처리량으로 사용자 지정 소비자 개발을 참조하십시오.

전용 처리량 (향상된 팬아웃 소비자) 을 사용하는 소비자 애플리케이션을 구축하려면 KCL 2.x만 사용할 수 있습니다. 자세한 정보는 전용 처리량으로 사용자 지정 소비자 개발(향상된 팬아웃)을 참조하십시오.

KCL 1.x와 KCL 2.x의 차이점과 KCL 1.x에서 KCL 2.x로 마이그레이션하는 방법에 대한 자세한 내용은 단원을 참조하십시오.KCL 1.x에서 KCL 2.x로 소비자 마이그레이션.

KCL 개념

  • KCL 소비자 애플리케이션KCL을 사용하여 사용자 지정되어 데이터 스트림의 레코드를 읽고 처리하도록 설계된 애플리케이션입니다.

  • 소비자 애플리케이션 인스턴스- KCL 소비자 애플리케이션은 일반적으로 분산되며, 장애를 조정하고 데이터 레코드 처리를 동적으로 로드 밸런싱하기 위해 하나 이상의 애플리케이션 인스턴스가 동시에 실행됩니다.

  • 작업자— KCL 소비자 애플리케이션 인스턴스가 데이터 처리를 시작하는 데 사용하는 상위 수준 클래스입니다.

    중요

    각 KCL 소비자 애플리케이션 인스턴스에는 하나의 워커가 있습니다.

    작업자는 샤드 및 리스 정보 동기화, 샤드 할당 추적, 샤드의 데이터 처리 등 다양한 작업을 초기화하고 감독합니다. 작업자는 KCL에 소비자 응용 프로그램에 대한 구성 정보를 제공합니다. 예를 들어 이 KCL 소비자 응용 프로그램이 처리할 데이터 레코드의 데이터 스트림의 이름과AWS이 데이터 스트림에 액세스하는 데 필요한 자격 증명입니다. 또한 워커는 특정 KCL 소비자 애플리케이션 인스턴스를 시작하여 데이터 스트림에서 레코드 프로세서로 데이터 레코드를 전달합니다.

    중요

    KCL 1.x에서는이 클래스가 호출됩니다.작업자. 자세한 내용은 (Java KCL 리포지토리) 를 참조하십시오.https://github.com/awslabs/amazon-kinesis-client/BLOB/V1.x/SRC/메인/자바/COM/아마존/서비스/중국어/클라이언트 라이브러리/LIB/노동자/노동자. 자바. KCL 2.x에서는 이 클래스가 호출됩니다.스케줄러. KCL 2.x의 스케줄러의 목적은 KCL 1.x의 작업자 목적과 동일합니다. KCL 2.x의 스케줄러 클래스에 대한 자세한 내용은 단원을 참조하십시오.https://github.com/awslabs/amazon-kinesis-client/blob/마스터/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler.java.

  • 임대— 워커와 샤드 간의 바인딩을 정의하는 데이터입니다. 분산된 KCL 소비자 애플리케이션은 임대를 사용하여 여러 근로자 간에 데이터 레코드 처리를 분할합니다. 언제든지 데이터 레코드의 각 샤드는 다음과 같이 식별된 임대에 의해 특정 작업자에게 바인딩됩니다.leaseKey변수.

    기본적으로 근로자는 하나 이상의 임대를 보유할 수 있습니다 (maxLeasesFor작업자동시에 변수).

    중요

    모든 작업자는 데이터 스트림에서 사용 가능한 모든 샤드에 대해 사용 가능한 모든 임대를 보유하겠다고 주장합니다. 그러나 한 번에 한 명의 근로자만이 각 임대를 성공적으로 보유합니다.

    예를 들어 4개의 샤드가 있는 데이터 스트림을 처리하는 작업자 A가 있는 소비자 애플리케이션 인스턴스 A가 있는 경우 작업자 A는 샤드 1, 2, 3 및 4에 리스를 동시에 보유할 수 있습니다. 하지만 두 개의 소비자 애플리케이션 인스턴스가 있는 경우: A 및 B는 작업자 A와 작업자 B를 사용하는 경우 이러한 인스턴스는 4개의 샤드로 데이터 스트림을 처리하고 있으며 작업자 A와 작업자 B는 동시에 샤드 1에 대한 임대를 보유 할 수 없습니다. 한 작업자는 이 샤드의 데이터 레코드 처리를 중지할 준비가 될 때까지 또는 실패할 때까지 특정 샤드에 대한 임대를 보유합니다. 한 근로자가 임대 보유를 중지하면 다른 근로자가 임대를 차지하고 보유합니다.

    자세한 내용은 (Java KCL 리포지토리) 를 참조하십시오.https://github.com/awslabs/amazon-kinesis-client/BLOB/V1.x/SRC/메인/자바/COM/아마존/서비스/중국어/임대/임대/IMP/임대. 자바KCL 1.x의 경우https://github.com/awslabs/amazon-kinesis-client/blob/마스터/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease.javaKCL 2.x의 경우.

  • 임대 테이블- KCL 소비자 애플리케이션의 워커가 임대 및 처리하는 KDS 데이터 스트림의 샤드를 추적하는 데 사용되는 고유한 Amazon DynamoDB 테이블입니다. 임대 테이블은 KCL 소비자 애플리케이션이 실행되는 동안 데이터 스트림의 최신 샤드 정보와 (작업자 내 및 모든 워커에 걸쳐) 동기화 상태를 유지해야 합니다. 자세한 정보는 임대 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리된 샤드 추적을 참조하십시오.

  • 레코드 프로세서— KCL 소비자 애플리케이션이 데이터 스트림에서 가져온 데이터를 처리하는 방법을 정의하는 로직입니다. 런타임에 KCL 소비자 애플리케이션 인스턴스는 워커를 인스턴스화하고, 이 워커는 임대를 보유하는 모든 샤드에 대해 하나의 레코드 프로세서를 인스턴스화합니다.

임대 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리된 샤드 추적

임대 테이블이란 무엇입니까?

각 Amazon Kinesis Data Streams 애플리케이션에 대해 KCL은 고유한 임대 테이블 (Amazon DynamoDB 테이블에 저장됨) 을 사용하여 KCL 소비자 애플리케이션의 워커가 임대 및 처리하는 KDS 데이터 스트림의 샤드를 추적합니다.

중요

KCL은 소비자 애플리케이션의 이름을 사용하여 이 소비자 애플리케이션이 사용하는 리스 테이블의 이름을 생성하므로 각 소비자 애플리케이션 이름은 고유해야 합니다.

다음을 사용하여 임대 테이블을 볼 수 있습니다.Amazon DynamoDB 콘솔소비자 애플리케이션이 실행되는 동안

애플리케이션이 시작될 때 KCL 소비자 애플리케이션의 리스 테이블이 없으면 작업자 중 하나가 이 애플리케이션에 대한 리스 테이블을 생성합니다.

중요

Kinesis Data Streams 자체와 관련된 비용 외에도 DynamoDB 테이블과 관련된 비용이 계정에 청구됩니다.

lease 테이블의 각 행은 소비자 애플리케이션의 작업자가 처리 중인 샤드를 나타냅니다. KCL 소비자 애플리케이션이 하나의 데이터 스트림만 처리하는 경우leaseKey이 값은 샤드 ID이며, 리스 테이블의 해시 키입니다. 만약Java 소비자 응용 프로그램에 대해 동일한 KCL 2.x를 사용하여 여러 데이터 스트림 처리그러면 LeaseKey의 구조는 다음과 같습니다. account-id:StreamName:streamCreationTimestamp:ShardId. 예,111111111:multiStreamTest-1:12345:shardId-000000000336.

각 행에는 샤드 ID 외에도 다음 데이터가 포함됩니다.

  • 체크포인트: 샤드의 가장 최근 체크포인트 시퀀스 번호입니다. 이 값은 데이터 스트림의 모든 샤드에서 고유합니다.

  • checkpointSubSequence숫자: Kinesis 프로듀서 라이브러리의 집계 기능을 사용할 때 이 기능은 다음과 같은 확장입니다.체크포인트Kinesis 레코드 내의 개별 사용자 레코드를 추적합니다.

  • leaseCounter: 다른 작업자가 리스를 받았다는 것을 작업자가 감지할 수 있도록 리스 버전 관리에 사용됩니다.

  • leaseKey: 임대의 고유 식별자입니다. 각 리스는 데이터 스트림의 샤드에 특정적이며 한 번에 한 작업자가 리스를 보유합니다.

  • leaseOwner: 이 임대를 보유하는 작업자입니다.

  • ownerSwitchesSince체크포인트: 마지막으로 체크포인트가 쓰여진 이후 이 임대가 작업자를 변경한 횟수입니다.

  • parentShardId: 하위 샤드에서 처리를 시작하기 전에 상위 샤드가 완전히 처리되도록 하는 데 사용됩니다. 그러면 스트림에 입력된 순서대로 레코드가 처리됩니다.

  • 해시 범위: 에 의해 사용PeriodicShardSyncManager주기적인 동기화를 실행하여 리스 테이블에서 누락된 샤드를 찾고 필요한 경우 리스를 생성합니다.

    참고

    이 데이터는 KCL 1.14 및 KCL 2.3부터 시작하는 모든 샤드의 임대 테이블에 있습니다. 에 대한 자세한 내용PeriodicShardSyncManager리스와 샤드 간의 정기 동기화는 다음을 참조하십시오.임대 테이블이 KDS 데이터 스트림의 샤드와 동기화되는 방법.

  • 어린이용 샤드: 에 의해 사용LeaseCleanupManager하위 샤드의 처리 상태를 검토하고 임대 테이블에서 상위 샤드를 삭제할 수 있는지 여부를 결정합니다.

    참고

    이 데이터는 KCL 1.14 및 KCL 2.3부터 시작하는 모든 샤드의 임대 테이블에 있습니다.

  • 샤르드: 샤드의 ID입니다.

    참고

    이 데이터는 임대 테이블에만 표시됩니다.Java 소비자 응용 프로그램에 대해 동일한 KCL 2.x를 사용하여 여러 데이터 스트림 처리. 이는 Java용 KCL 2.x에서만 지원됩니다. Java용 KCL 2.3부터 시작하여 Java를 비롯한

  • stream name다음과 같은 형식의 데이터 스트림 식별자입니다.account-id:StreamName:streamCreationTimestamp.

    참고

    이 데이터는 임대 테이블에만 표시됩니다.Java 소비자 응용 프로그램에 대해 동일한 KCL 2.x를 사용하여 여러 데이터 스트림 처리. 이는 Java용 KCL 2.x에서만 지원됩니다. Java용 KCL 2.3부터 시작하여 Java를 비롯한

처리량

Amazon Kinesis Data Streams 애플리케이션에서 프로비저닝된 처리량을 예외를 수신하는 경우 DynamoDB 테이블의 프로비저닝된 처리량을 늘려야 합니다. KCL은 프로비저닝된 처리량이 초당 읽기 10개, 초당 쓰기 10개인 테이블을 생성하지만 애플리케이션에 충분하지 않을 수도 있습니다. 예를 들어, Amazon Kinesis Data Streams 애플리케이션이 체크포인트를 자주 수행하거나 여러 샤드로 구성된 스트림에서 작동하는 경우 처리량이 더 필요할 수 있습니다.

DynamoDB에서 프로비저닝된 처리량에 대한 자세한 내용은 섹션을 참조하십시오읽기/쓰기 용량 모드테이블 및 데이터 작업Amazon DynamoDB Developer Guide.

임대 테이블이 KDS 데이터 스트림의 샤드와 동기화되는 방법

KCL 소비자 애플리케이션의 작업자는 리스를 사용하여 지정된 데이터 스트림의 샤드를 처리합니다. 주어진 시간에 어떤 샤드를 임대하고 있는지에 대한 정보는 임대 테이블에 저장됩니다. 임대 테이블은 KCL 소비자 애플리케이션이 실행되는 동안 데이터 스트림의 최신 샤드 정보와 동기화된 상태를 유지해야 합니다. KCL은 소비자 애플리케이션 부트스트래핑 (소비자 애플리케이션이 초기화되거나 다시 시작될 때) 및 처리 중인 샤드가 종료 (리샤딩) 에 도달할 때마다 Kinesis Data Streams 서비스에서 획득한 샤드 정보와 리스 테이블을 동기화합니다. 즉, 작업자 또는 KCL 소비자 애플리케이션은 초기 소비자 애플리케이션 부트스트랩 동안 그리고 소비자 애플리케이션이 데이터 스트림 reshard 이벤트가 발생할 때마다 처리 중인 데이터 스트림과 동기화됩니다.

KCL 1.0 - 1.13 및 KCL 2.0 - 2.2에서의 동기화

KCL 1.0 - 1.13 및 KCL 2.0 - 2.2에서 소비자 애플리케이션의 부트스트래핑 및 각 데이터 스트림 리샤드 이벤트 중에 KCL은 임대 테이블을 Kinesis Kinesis Data Streams 서비스에서 획득한 샤드 정보와 동기화합니다.ListShards또는DescribeStream검색 API. 위에 나열된 모든 KCL 버전에서 KCL 소비자 애플리케이션의 각 워커는 다음 단계를 완료하여 소비자 애플리케이션의 부트스트래핑과 각 스트림 리샤드 이벤트에서 리스/샤드 동기화 프로세스를 수행합니다.

  • 처리 중인 스트림에서 데이터에 대한 모든 샤드를 가져옵니다.

  • 임대 테이블에서 모든 샤드 리스를 가져옵니다.

  • 임대 테이블에 임대가 없는 열려 있는 각 샤드를 필터링합니다.

  • 발견 된 모든 열린 샤드와 열린 부모가없는 각 열린 샤드에 대해 반복합니다.

    • 상위 경로를 통해 계층 트리를 통과하여 샤드가 하위 항목인지 확인합니다. 조상 샤드가 처리 중이거나 (상위 샤드의 임대 항목이 임대 테이블에 있음) 또는 조상 샤드를 처리해야 하는 경우 (예: 초기 위치가TRIM_HORIZON또는AT_TIMESTAMP)

    • 컨텍스트에서 열린 샤드가 자손인 경우 KCL은 초기 위치를 기반으로 샤드를 체크포인트하고 필요한 경우 상위에 대한 임대를 생성합니다.

KCL 2.3부터 시작하여 KCL 2.x의 동기화

지원되는 최신 버전의 KCL 2.x (KCL 2.3) 부터 라이브러리는 이제 동기화 프로세스에 대한 다음과 같은 변경 사항을 지원합니다. 이러한 리스/샤드 동기화 변경은 KCL 소비자 애플리케이션이 Kinesis Data Streams 서비스로 수행한 API 호출 수를 크게 줄이고 KCL 소비자 애플리케이션에서 임대 관리를 최적화합니다.

  • 응용 프로그램의 부트스트래핑 중에 임대 테이블이 비어 있으면 KCL은ListShardAPI의 필터링 옵션 (ShardFilter선택적 요청 매개 변수) 에서 지정한 시간에 열려 있는 샤드의 스냅샷에 대해서만 리스를 검색하고 생성합니다.ShardFilter파라미터. 이ShardFilter매개 변수를 사용하면 의 응답을 필터링할 수 있습니다.ListShardsAPI. 의 유일한 필수 재산ShardFilterParameterType. KCL은 다음을 사용합니다.Typefilter 속성과 다음과 같은 유효한 값을 사용하여 새 임대가 필요할 수 있는 열린 샤드의 스냅샷을 식별하고 반환합니다.

    • AT_TRIM_HORIZON- 응답에는 열려 있던 모든 샤드가 포함됩니다.TRIM_HORIZON.

    • AT_LATEST- 응답에는 현재 열려 있는 데이터 스트림의 샤드만 포함됩니다.

    • AT_TIMESTAMP- 응답에는 시작 타임스탬프가 지정된 타임스탬프보다 작거나 같고 종료 타임스탬프가 지정된 타임스탬프보다 크거나 같거나 열려 있는 모든 샤드가 포함됩니다.

    ShardFilter는 에 지정된 샤드의 스냅샷에 대한 리스를 초기화하기 위해 빈 임대 테이블에 대한 리스를 생성할 때 사용됩니다.RetrievalConfig#initialPositionInStreamExtended.

    ShardFilter에 대한 자세한 내용은 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html 섹션을 참조하세요.

  • 리스/샤드 동기화를 수행하는 모든 워커가 데이터 스트림의 최신 샤드와 함께 리스 테이블을 최신 상태로 유지하는 대신 선출된 단일 작업자 리더가 리스/샤드 동기화를 수행합니다.

  • KCL 2.3은ChildShards의 반환 매개 변수GetRecordsSubscribeToShard에서 발생하는 리스/샤드 동기화를 수행하는 APISHARD_END닫힌 샤드의 경우, KCL 작업자가 처리를 마친 샤드의 하위 샤드에 대해서만 임대를 생성할 수 있습니다. 소비자 응용 프로그램 전체에서 공유되는 경우 이러한 임대/샤드 동기화를 최적화하면ChildShards의 매개 변수GetRecordsAPI. 전용 처리량 (향상된 팬아웃) 소비자 애플리케이션의 경우 리스/샤드 동기화의 최적화는ChildShards의 매개 변수SubscribeToShardAPI. 자세한 내용은 단원을 참조하십시오.GetRecords,SubscribeTo샤드, 및ChildShard.

  • 위의 변경 사항에 따라 KCL의 동작은 모든 기존 샤드에 대해 학습하는 모든 워커의 모델에서 각 워커가 소유하는 샤드의 하위 샤드에 대해서만 학습하는 작업자 모델로 이동합니다. 따라서 소비자 응용 프로그램 부트스트래핑 및 리샤드 이벤트 중에 발생하는 동기화 외에도 KCL은 임대 테이블의 잠재적 구멍을 식별하기 위해 추가 주기적인 샤드/리스 스캔을 수행하여 (즉, 모든 새로운 샤드에 대해 알아보기 위해)데이터 스트림의 해시 범위가 처리되고 필요한 경우 임대를 생성합니다.PeriodicShardSyncManager는 주기적인 리스/샤드 스캔을 실행하는 역할을 담당하는 구성 요소입니다.

    에 대한 자세한 내용PeriodicShardSyncManagerKCL 2.3에서 다음을 참조하십시오.https://github.com/awslabs/amazon-kinesis-client/blob/마스터/amazon-kinesis-client/src/main/java/소프트웨어/아마존/키네시스/리스/LeaseManagementConfig.java #L201 -L213.

    KCL 2.3에서는 새로운 구성 옵션을 구성할 수 있습니다.PeriodicShardSyncManager에서LeaseManagementConfig:

    이름 기본값 설명
    leasesRecoveryAuditorExecutionFrequency밀리스

    120000 (2분)

    리스 테이블에서 부분 임대를 검색할 감사 작업의 빈도 (밀리초) 입니다. 감사자가 스트림에 대한 임대의 구멍을 감지하면 다음을 기반으로 샤드 동기화가 트리거됩니다.leasesRecoveryAuditorInconsistencyConfidenceThreshold.

    leasesRecoveryAuditorInconsistencyConfidence임계값

    3

    리스 테이블의 데이터 스트림에 대한 임대가 일치하지 않는지 확인하기 위한 주기적인 감사자 작업에 대한 신뢰 임계값입니다. 감사자가 여러 번 데이터 스트림에 대해 동일한 불일치 집합을 연속적으로 발견하면 샤드 동기화가 트리거됩니다.

    NewCloudWatch이제 메트릭이 방출되어 상태를 모니터링할 수 있습니다.PeriodicShardSyncManager. 자세한 정보는 정기 간행차드싱매니저을 참조하십시오.

  • 다음을 위한 최적화 포함HierarchicalShardSyncer하나의 샤드 레이어에 대해서만 임대를 생성할 수 있습니다.

KCL 1.14 및 그 이후부터 시작하여 KCL 1.x의 동기화

지원되는 최신 버전의 KCL 1.x (KCL 1.14) 부터 시작하여 라이브러리는 이제 동기화 프로세스에 대한 다음과 같은 변경 사항을 지원합니다. 이러한 리스/샤드 동기화 변경은 KCL 소비자 애플리케이션이 Kinesis Data Streams 서비스로 수행한 API 호출 수를 크게 줄이고 KCL 소비자 애플리케이션에서 임대 관리를 최적화합니다.

  • 응용 프로그램의 부트스트래핑 중에 임대 테이블이 비어 있으면 KCL은ListShardAPI의 필터링 옵션 (ShardFilter선택적 요청 매개 변수) 에서 지정한 시간에 열려 있는 샤드의 스냅샷에 대해서만 리스를 검색하고 생성합니다.ShardFilter파라미터. 이ShardFilter매개 변수를 사용하면 의 응답을 필터링할 수 있습니다.ListShardsAPI. 의 유일한 필수 재산ShardFilterParameterType. KCL은 다음을 사용합니다.Typefilter 속성과 다음과 같은 유효한 값을 사용하여 새 임대가 필요할 수 있는 열린 샤드의 스냅샷을 식별하고 반환합니다.

    • AT_TRIM_HORIZON- 응답에는 열려 있던 모든 샤드가 포함됩니다.TRIM_HORIZON.

    • AT_LATEST- 응답에는 현재 열려 있는 데이터 스트림의 샤드만 포함됩니다.

    • AT_TIMESTAMP- 응답에는 시작 타임스탬프가 지정된 타임스탬프보다 작거나 같고 종료 타임스탬프가 지정된 타임스탬프보다 크거나 같거나 열려 있는 모든 샤드가 포함됩니다.

    ShardFilter는 에 지정된 샤드의 스냅샷에 대한 리스를 초기화하기 위해 빈 임대 테이블에 대한 리스를 생성할 때 사용됩니다.KinesisClientLibConfiguration#initialPositionInStreamExtended.

    ShardFilter에 대한 자세한 내용은 https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html 섹션을 참조하세요.

  • 리스/샤드 동기화를 수행하는 모든 워커가 데이터 스트림의 최신 샤드와 함께 리스 테이블을 최신 상태로 유지하는 대신 선출된 단일 작업자 리더가 리스/샤드 동기화를 수행합니다.

  • KCL 1.14는 다음을 사용합니다.ChildShards의 반환 매개 변수GetRecordsSubscribeToShard에서 발생하는 리스/샤드 동기화를 수행하는 APISHARD_END닫힌 샤드의 경우, KCL 작업자가 처리를 마친 샤드의 하위 샤드에 대해서만 임대를 생성할 수 있습니다. 자세한 내용은 단원을 참조하십시오.GetRecordsChildShard.

  • 위의 변경 사항에 따라 KCL의 동작은 모든 기존 샤드에 대해 학습하는 모든 워커의 모델에서 각 워커가 소유하는 샤드의 하위 샤드에 대해서만 학습하는 작업자 모델로 이동합니다. 따라서 소비자 응용 프로그램 부트스트래핑 및 리샤드 이벤트 중에 발생하는 동기화 외에도 KCL은 임대 테이블의 잠재적 구멍을 식별하기 위해 추가 주기적인 샤드/리스 스캔을 수행하여 (즉, 모든 새로운 샤드에 대해 알아보기 위해)데이터 스트림의 해시 범위가 처리되고 필요한 경우 임대를 생성합니다.PeriodicShardSyncManager는 주기적인 리스/샤드 스캔을 실행하는 역할을 담당하는 구성 요소입니다.

    일시KinesisClientLibConfiguration#shardSyncStrategyType다음의 경우 이 로 설정됩니다.ShardSyncStrategyType.SHARD_END,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold는 임대 테이블에 구멍이 포함된 연속 스캔 횟수에 대한 임계값을 확인한 후 샤드 동기화를 적용하는 데 사용됩니다. 일시KinesisClientLibConfiguration#shardSyncStrategyType다음의 경우 이 로 설정됩니다.ShardSyncStrategyType.PERIODIC,leasesRecoveryAuditorInconsistencyConfidenceThreshold은 무시됩니다.

    에 대한 자세한 내용PeriodicShardSyncManagerKCL 1.14에서 다음을 참조하십시오.https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/메인/자바/COM/아마존/서비스/중국어/클라이언트 라이브러리/lib/노동자/KinesisClientLibConfiguration.자바 #L987 -L999.

    KCL 1.14에서는 새로운 구성 옵션을 구성할 수 있습니다.PeriodicShardSyncManager에서LeaseManagementConfig:

    이름 기본값 설명
    leasesRecoveryAuditorInconsistencyConfidence임계값

    3

    리스 테이블의 데이터 스트림에 대한 임대가 일치하지 않는지 확인하기 위한 주기적인 감사자 작업에 대한 신뢰 임계값입니다. 감사자가 여러 번 데이터 스트림에 대해 동일한 불일치 집합을 연속적으로 발견하면 샤드 동기화가 트리거됩니다.

    NewCloudWatch이제 메트릭이 방출되어 상태를 모니터링할 수 있습니다.PeriodicShardSyncManager. 자세한 정보는 정기 간행차드싱매니저을 참조하십시오.

  • KCL 1.14는 이제 지연된 리스 정리를 지원합니다. 임대는 다음과 같이 비동기적으로 삭제됩니다.LeaseCleanupManager도달 시SHARD_END데이터 스트림의 보존 기간이 지난 후 샤드가 만료되었거나 리샤딩 작업의 결과로 종료된 경우

    구성에 새 구성 옵션을 사용할 수 있습니다.LeaseCleanupManager.

    이름 기본값 설명
    leaseCleanupInterval밀리스

    1분

    임대 정리 스레드를 실행할 간격입니다.

    completedLeaseCleanupIntervalMillis 5분

    임대가 완료되었는지 여부를 확인할 간격입니다.

    garbageLeaseCleanupIntervalMillis 30 분

    임대가 가비지 (즉, 데이터 스트림의 보존 기간을 지나 트리밍) 인지 여부를 확인하는 간격입니다.

  • 다음을 위한 최적화 포함KinesisShardSyncer하나의 샤드 레이어에 대해서만 임대를 생성할 수 있습니다.

Java 소비자 응용 프로그램에 대해 동일한 KCL 2.x를 사용하여 여러 데이터 스트림 처리

이 섹션에서는 두 개 이상의 데이터 스트림을 동시에 처리할 수 있는 KCL 소비자 애플리케이션을 생성할 수 있는 Java용 KCL 2.x의 다음 변경 사항에 대해 설명합니다.

중요

멀티스트림 처리는 Java용 KCL 2.x에서만 지원됩니다. Java용 KCL 2.3부터 시작하여 Java와 그 이후의

KCL 2.x를 구현할 수 있는 다른 언어에서는 멀티스트림 처리가 지원되지 않습니다.

멀티스트림 처리는 KCL 1.x 버전에서 지원되지 않습니다.

  • MultistreamTracker 인터페이스

    여러 스트림을 동시에 처리할 수 있는 소비자 애플리케이션을 빌드하려면 라는 새 인터페이스를 구현해야 합니다.MultistreamTracker. 이 인터페이스에는streamConfigListKCL 소비자 응용 프로그램에서 처리할 데이터 스트림 및 해당 구성 목록을 반환하는 메서드입니다. 처리 중인 데이터 스트림은 소비자 응용 프로그램 런타임 중에 변경할 수 있습니다.streamConfigList처리할 데이터 스트림의 변경 사항을 알아보기 위해 KCL에 의해 주기적으로 호출됩니다.

    streamConfigList메서드가 채워집니다.StreamConfig목록.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    참고:StreamIdentifierInitialPositionInStreamExtended필수 필드입니다.consumerArn는 선택 사항입니다. 입력해야 합니다.consumerArnKCL 2.x를 사용하여 향상된 팬아웃 소비자 애플리케이션을 구현하는 경우에만 사용할 수 있습니다.

    에 대한 자세한 내용StreamIdentifier참조할 항목https://github.com/awslabs/amazon-kinesis-client/블롭/0C5042다드프794FE988438436252A5A8페70B6b0b/amazon-kinesis-client/src/main/java/소프트웨어/아마존/키네시스/공통/StreamIdentifier.자바 #L29. 에 대한 멀티스트림 인스턴스를 생성할 수 있습니다.StreamIdentifier직렬화된 스트림 식별자에서 직렬화된 스트림 식별자는 다음 형식이어야 합니다.account-id:StreamName:streamCreationTimestamp.

    * @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }

    MultistreamTracker또한 임대 테이블에서 이전 스트림의 임대를 삭제하는 전략도 포함되어 있습니다 (formerStreamsLeasesDeletionStrategy). 소비자 응용 프로그램 런타임 중에는 전략을 변경할 수 없습니다. 자세한 내용은 단원을 참조하십시오.https://github.com/awslabs/amazon-kinesis-client/블롭/0C5042다드프794FE988438436252A5A8페70B6b0b/amazon-kinesis-client/src/main/java/소프트웨어/아마존/키네시스/프로세서/FormerStreamsLeasesDeletionStrategy.java

  • ConfigsBuilder는 KCL 소비자 응용 프로그램을 빌드할 때 사용할 모든 KCL 2.x 구성 설정을 지정하는 데 사용할 수 있는 응용 프로그램 전체 클래스입니다.ConfigsBuilder이제 클래스에 대한 지원이 있습니다.MultistreamTracker인터페이스를 구현해야 합니다. 초기화할 수 있습니다.ConfigsBuilder다음 위치에서 레코드를 사용할 데이터 스트림의 이름을 사용합니다.

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    또는 초기화할 수 있습니다.ConfigsBuilder와MultiStreamTracker여러 스트림을 동시에 처리하는 KCL 소비자 애플리케이션을 구현하려는 경우

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • KCL 소비자 애플리케이션에 대해 구현된 멀티스트림 지원을 통해 애플리케이션의 리스 테이블의 각 행에는 이제 이 애플리케이션이 처리하는 다중 데이터 스트림의 샤드 ID와 스트림 이름이 포함됩니다.

  • KCL 소비자 애플리케이션에 대한 멀티스트림 지원이 구현되면 LeaseKey는 다음과 같은 구조를 사용합니다. account-id:StreamName:streamCreationTimestamp:ShardId. 예,111111111:multiStreamTest-1:12345:shardId-000000000336.

    중요

    기존 KCL 소비자 애플리케이션이 하나의 데이터 스트림만 처리하도록 구성된 경우 LeaseKey (리스 테이블의 해시 키) 는 샤드 ID입니다. 이 기존 KCL 소비자 응용 프로그램을 여러 데이터 스트림을 처리하도록 재구성하면 임대 테이블이 중단됩니다. 멀티스트림 지원의 경우 LeaseKey 구조는 다음과 같아야 하기 때문입니다.account-id:StreamName:StreamCreationTimestamp:ShardId.

Kinesis 클라이언트 라이브러리 사용AWSGlue 스키마 레지스트리

Kinesis 데이터 스트림을AWSGlue 스키마 레지스트리. 이AWSGlue Schema Registry를 사용하면 생성된 데이터가 등록된 스키마에 의해 지속적으로 검증되는 동시에 스키마를 중앙에서 검색, 제어 및 발전시킬 수 있습니다. 스키마는 데이터 레코드의 구조와 포맷을 정의합니다. 스키마는 신뢰할 수 있는 데이터 게시, 소비 또는 저장을 위한 버전 지정 사양입니다. 이AWSGlue 스키마 레지스트리를 통해 개선할 수 있습니다.end-to-end스트리밍 애플리케이션 내의 데이터 품질 및 데이터 거버넌스. 자세한 내용은 단원을 참조하십시오.AWSGlue 스키마 레지스트리. 이 통합을 설정하는 방법 중 하나는 Java의 KCL을 사용하는 것입니다.

중요

현재 Kinesis Data StreamsAWSGlue 스키마 레지스트리 통합은 Java에서 구현된 KCL 2.3 소비자를 사용하는 Kinesis 데이터 스트림에 대해서만 지원됩니다. 다국어 지원은 제공되지 않습니다. KCL 1.0 소비자는 지원되지 않습니다. KCL 2.3 이전의 KCL 2.x 소비자는 지원되지 않습니다.

KCL을 사용하여 Kinesis Kinesis Data Streams 스트림과 스키마 레지스트리 통합을 설정하는 방법에 대한 자세한 내용은 의 “KPL/KCL 라이브러리를 사용하여 데이터와 상호 작용” 섹션을 참조하십시오.사용 사례: Amazon Kinesis Data Streams 통합AWSGlue 스키마 레지스트리.