Kinesis Client Library 사용 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Kinesis Client Library 사용

KDS 데이터 스트림의 데이터를 처리할 수 있는 사용자 지정 소비자 애플리케이션을 개발하는 방법 중 하나는 Kinesis Client Library(KCL)를 사용하는 것입니다.

참고

KCL 1.x와 KCL 2.x 모두 사용 시나리오에 따라 최신 KCL 1.x 버전 또는 KCL 2.x 버전으로 업그레이드하는 것이 좋습니다. KCL 1.x와 KCL 2.x 모두 최신 종속성 및 보안 패치, 버그 수정, 이전 버전과 호환되는 새 기능이 포함된 최신 릴리스로 정기적으로 업데이트됩니다. 자세한 내용은 https://github.com/awslabs/ amazon-kinesis-client /releases를 참조하십시오.

Kinesis Client Library란?

KCL을 사용하면 분산 컴퓨팅과 관련된 많은 복잡한 작업을 처리하여 Kinesis 데이터 스트림의 데이터를 사용하고 처리할 수 있습니다. 여기에는 여러 소비자 애플리케이션 인스턴스 간의 로드 밸런싱, 소비자 애플리케이션 인스턴스 장애 대응, 처리된 레코드 체크포인트 및 리샤딩 대응이 포함됩니다. 사용자가 사용자 지정 레코드 처리 로직을 작성하는 데 집중할 수 있도록 KCL이 이러한 하위 작업을 모두 처리합니다.

KCL은 AWS SDK에서 사용할 수 있는 Kinesis Data Streams API와 다릅니다. Kinesis Data Streams API는 스트림 생성, 리샤딩, 레코드 넣기 및 가져오기를 비롯한 Kinesis Data Streams의 여러 측면을 관리하는 데 도움이 됩니다. 특히 사용자가 소비자 애플리케이션의 사용자 지정 데이터 처리 로직에 집중할 수 있도록 KCL은 이러한 모든 하위 작업에 대한 추상화 계층을 제공합니다. Kinesis Data Streams API에 대한 자세한 내용은 Amazon Kinesis API 참조를 확인하세요.

중요

KCL은 Java 라이브러리입니다. Java 이외의 언어에 대한 지원은 라는 다국어 인터페이스를 사용하여 제공됩니다. MultiLangDaemon 이 데몬은 Java 기반이며, Java 이외의 KCL 언어를 사용하는 경우 배경에서 실행됩니다. 예를 들어, KCL for Python을 설치하고 소비자 응용 프로그램 전체를 Python으로 작성하는 경우 다음과 같은 이유로 시스템에 Java를 설치해야 합니다. MultiLangDaemon 또한 MultiLangDaemon 에는 사용 사례에 맞게 사용자 지정해야 할 몇 가지 기본 설정 (예: 연결 AWS 지역) 이 있습니다. MultiLangDaemon GitHubon에 대한 자세한 내용은 KCL MultiLangDaemon 프로젝트를 참조하십시오.

KCL은 레코드 처리 로직과 Kinesis Data Streams 간에 중간 역할을 합니다. KCL은 다음과 같은 작업을 수행합니다.

  • 데이터 스트림에 연결합니다

  • 데이터 스트림 내 샤드를 열거합니다.

  • 리스를 사용하여 워커와의 샤드 연결을 조정합니다.

  • 관리하는 모든 샤드의 레코드 프로세서를 인스턴스화합니다

  • 데이터 스트림에서 데이터 레코드를 가져옵니다.

  • 해당하는 레코드 프로세서로 레코드를 푸시합니다

  • 처리된 레코드에 대해 체크포인트를 수행합니다

  • 워커 인스턴스 수가 변경되거나 데이터 스트림이 리샤딩(샤드 분할 또는 병합)될 때 샤드-워커 연결(리스)의 균형을 유지합니다.

KCL 사용 가능 버전

현재 지원되는 다음 KCL 버전 중 하나를 사용하여 사용자 지정 소비자 애플리케이션을 구축할 수 있습니다.

KCL 1.x 또는 KCL 2.x를 사용하여 공유 처리량을 사용하는 소비자 애플리케이션을 구축할 수 있습니다. 자세한 설명은 KCL을 사용하여 공유 처리량으로 사용자 지정 소비자 개발 섹션을 참조하세요.

전용 처리량을 사용하는 소비자 애플리케이션(향상된 팬아웃 소비자)을 구축하려면 KCL 2.x만 사용할 수 있습니다. 자세한 설명은 전용 처리량으로 사용자 지정 소비자 개발(향상된 팬아웃) 섹션을 참조하세요.

KCL 1.x와 KCL 2.x의 차이점에 대한 자세한 내용과 KCL 1.x에서 KCL 2.x로 마이그레이션하는 방법에 대한 지침은 KCL 1.x에서 KCL 2.x로 소비자 마이그레이션 섹션을 참조하세요.

KCL 개념

  • KCL 소비자 애플리케이션 - KCL을 사용하여 맞춤 구축되고 데이터 스트림에서 레코드를 읽고 처리하도록 설계된 애플리케이션입니다.

  • 소비자 애플리케이션 인스턴스 - KCL 소비자 애플리케이션은 일반적으로 장애를 조정하고 데이터 레코드 처리를 동적으로 로드 밸런싱하기 위해 하나 이상의 애플리케이션 인스턴스가 동시에 실행되는 분산형입니다.

  • 워커 - KCL 소비자 애플리케이션 인스턴스가 데이터 처리를 시작하는 데 사용하는 상위 수준 클래스입니다.

    중요

    KCL 소비자 애플리케이션 인스턴스마다 워커가 하나씩 있습니다.

    워커는 샤드 및 리스 정보 동기화, 샤드 할당 추적, 샤드의 데이터 처리 등 다양한 작업을 초기화하고 감독합니다. 워커는 이 KCL 소비자 애플리케이션이 처리할 데이터 레코드가 있는 데이터 스트림의 이름, 이 데이터 스트림에 액세스하는 데 필요한 AWS 보안 인증 등 소비자 애플리케이션에 대한 구성 정보를 KCL에 제공합니다. 또한 워커는 데이터 스트림에서 레코드 프로세서로 데이터 레코드를 전송하기 위해 특정 KCL 소비자 애플리케이션 인스턴스를 시작합니다.

    중요

    KCL 1.x에서는 이 클래스를 워커라고 합니다. 자세한 내용 (Java KCL 리포지토리) 은 https://github.com/awslabs/ /blob/v1.x/src/main/java/com/AmazonAWS/Services/Kinesis/ClientLibrary/lib/Worker.java를 참조하십시오. amazon-kinesis-client KCL 2.x에서는 이 클래스를 스케줄러라고 합니다. KCL 2.x에서 스케줄러의 용도는 KCL 1.x에서 워커의 용도와 동일합니다. KCL 2.x의 스케줄러 클래스에 대한 자세한 내용은 amazon-kinesis-clienthttps://github.com/awslabs/ /blob/master/ /src/main/java/software/amazon/kinesis/coordinator/Scheduler.java 를 amazon-kinesis-client 참조하십시오.

  • 리스 - 워커와 샤드 간의 바인딩을 정의하는 데이터입니다. 분산된 KCL 소비자 애플리케이션은 리스를 사용하여 워커 플릿의 데이터 레코드 처리를 분할합니다. 언제든지 각 데이터 레코드 샤드는 leaseKey 변수로 식별되는 리스를 통해 특정 워커에게 바인딩됩니다.

    기본적으로 작업자는 하나 이상의 임대 계약 (작업자 변수 값에 따름) 을 동시에 보유할 수 있습니다. maxLeasesFor

    중요

    모든 워커는 데이터 스트림에서 사용 가능한 모든 샤드에 대해 사용 가능한 모든 리스를 보유하고자 경쟁합니다. 하지만 한 번에 하나의 워커만 각 리스를 성공적으로 보유할 수 있습니다.

    예를 들어, 소비자 애플리케이션 인스턴스 A에 워커 A가 있고 이 인스턴스가 4개의 샤드로 구성된 데이터 스트림을 처리하는 경우 워커 A는 샤드 1, 2, 3, 4에 대한 리스를 동시에 보유할 수 있습니다. 그러나 A와 B라는 2개의 소비자 애플리케이션 인스턴스에 각각 워커 A와 워커 B가 있고 이들 인스턴스가 4개의 샤드로 구성된 데이터 스트림을 처리하는 경우 워커 A와 워커 B는 샤드 1에 대한 리스를 동시에 보유할 수 없습니다. 한 워커가 특정 샤드의 데이터 레코드 처리를 중지할 준비가 되거나 실패할 때까지 이 샤드에 대한 리스를 보유합니다. 한 워커가 리스 보유를 중지하면 다른 워커가 리스를 인수하여 보유합니다.

    자세한 내용 (Java KCL 리포지토리) 은 https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/SRC/main/java/com/Amazonaws/Services/Kinesis/Leases/impl/Lease.KCL 1.x의 경우 https://github.com/awslabs/ /blob/master/ /src/main/java/software/amazon/kinesis/leases/Lease.java 를 참조하십시오. amazon-kinesis-client amazon-kinesis-client

  • 리스 테이블 - KCL 소비자 애플리케이션의 워커가 리스하고 처리하고 있는 KDS 데이터 스트림의 샤드를 추적하는 데 사용되는 고유한 Amazon DynamoDB 테이블입니다. KCL 소비자 애플리케이션이 실행되는 동안 리스 테이블은 데이터 스트림의 최신 샤드 정보와 동기화된 상태를 유지해야 합니다(워커 내 및 모든 워커 간). 자세한 설명은 리스 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리한 샤드 추적 섹션을 참조하세요.

  • 레코드 프로세서 - KCL 소비자 애플리케이션이 데이터 스트림에서 가져온 데이터를 처리하는 방법을 정의하는 로직입니다. 런타임 시 KCL 소비자 애플리케이션 인스턴스는 워커를 인스턴스화하고 이 워커는 리스를 보유한 모든 샤드에 대해 하나의 레코드 프로세서를 인스턴스화합니다.

리스 테이블을 사용하여 KCL 소비자 애플리케이션에서 처리한 샤드 추적

리스 테이블이란?

각 Amazon Kinesis Data Streams 애플리케이션에 대해 KCL은 Amazon DynamoDB 테이블에 저장되는 고유한 리스 테이블을 사용하여 KCL 소비자 애플리케이션의 워커가 리스하고 처리하고 있는 KDS 데이터 스트림의 샤드를 추적합니다.

중요

KCL은 소비자 애플리케이션의 이름을 사용하여 이 소비자 애플리케이션이 사용하는 리스 테이블의 이름을 생성하므로 각 소비자 애플리케이션 이름은 고유해야 합니다.

소비자 애플리케이션이 실행되는 동안 Amazon DynamoDB 콘솔을 사용하여 리스 테이블을 볼 수 있습니다.

애플리케이션이 시작될 때 KCL 소비자 애플리케이션에 대한 리스 테이블이 없는 경우 워커 중 하나가 이 애플리케이션에 대한 리스 테이블을 생성합니다.

중요

Kinesis Data Streams 자체와 관련된 비용 외에도 DynamoDB 테이블 관련 비용이 계정에 청구됩니다.

리스 테이블의 각 행은 소비자 애플리케이션의 워커가 처리 중인 샤드를 나타냅니다. KCL 소비자 애플리케이션이 하나의 데이터 스트림만 처리하는 경우 리스 테이블의 해시 키인 leaseKey가 샤드 ID입니다. 동일한 'KCL 2.x for Java' 소비자 애플리케이션으로 여러 데이터 스트림 처리를 수행하는 경우 leaseKey의 구조는 account-id:StreamName:streamCreationTimestamp:ShardId와 같습니다. 예를 들어 111111111:multiStreamTest-1:12345:shardId-000000000336입니다.

각 행에는 샤드 ID 외에도 다음 데이터가 포함됩니다.

  • checkpoint: 샤드의 가장 최근 체크포인트 시퀀스 번호입니다. 이 값은 데이터 스트림의 모든 샤드에서 고유합니다.

  • checkpointSubSequence번호: Kinesis 프로듀서 라이브러리의 집계 기능을 사용할 때 이는 Kinesis 레코드 내의 개별 사용자 레코드를 추적하는 체크포인트의 확장입니다.

  • leaseCounter: 다른 작업자가 리스를 받았다는 것을 작업자가 감지할 수 있도록 리스 버전 관리에 사용됩니다.

  • leaseKey: 임대의 고유 식별자입니다. 각 리스는 데이터 스트림의 샤드에 특정적이며 한 번에 한 워커가 리스를 보유합니다.

  • leaseOwner: 이 임대를 보유하는 작업자입니다.

  • ownerSwitchesSince체크포인트: 마지막으로 체크포인트를 작성한 이후 이번 임대로 인해 근로자가 몇 번이나 변경되었습니까?

  • parentShardId: 하위 샤드에서 처리가 시작되기 전에 상위 샤드가 완전히 처리되었는지 확인하는 데 사용됩니다. 그러면 스트림에 입력된 순서대로 레코드가 처리됩니다.

  • hashrange: PeriodicShardSyncManager에서 주기적 동기화를 실행하여 리스 테이블에서 누락된 샤드를 찾고 필요한 경우 리스를 생성하는 데 사용됩니다.

    참고

    이 데이터는 KCL 1.14 및 KCL 2.3부터 시작하는 모든 샤드의 리스 테이블에 있습니다. PeriodicShardSyncManager 및 리스와 샤드 간의 주기적 동기화에 대한 자세한 내용은 리스 테이블이 KDS 데이터 스트림의 샤드와 동기화되는 방법 섹션을 참조하세요.

  • childshards: LeaseCleanupManager에서 하위 샤드의 처리 상태를 검토하고 리스 테이블에서 상위 샤드를 삭제할 수 있는지 여부를 결정하는 데 사용됩니다.

    참고

    이 데이터는 KCL 1.14 및 KCL 2.3부터 시작하는 모든 샤드의 리스 테이블에 있습니다.

  • shardID: 샤드의 ID입니다.

    참고

    이 데이터는 사용자가 동일한 'KCL 2.x for Java' 소비자 애플리케이션으로 여러 데이터 스트림 처리를 수행하는 경우에만 리스 테이블에 있습니다. 이는 KCL 2.3 for Java 이상부터 시작하여 KCL 2.x for Java에서만 지원됩니다.

  • stream name account-id:StreamName:streamCreationTimestamp 형식의 데이터 스트림 식별자입니다.

    참고

    이 데이터는 사용자가 동일한 'KCL 2.x for Java' 소비자 애플리케이션으로 여러 데이터 스트림 처리를 수행하는 경우에만 리스 테이블에 있습니다. 이는 KCL 2.3 for Java 이상부터 시작하여 KCL 2.x for Java에서만 지원됩니다.

처리량

Amazon Kinesis Data Streams 애플리케이션이 프로비저닝된 처리량을 예외를 수신하는 경우 DynamoDB 테이블의 프로비저닝된 처리량을 늘려야 합니다. KCL은 프로비저닝된 처리량이 초당 읽기 10개, 초당 쓰기 10개인 테이블을 생성하지만 애플리케이션에 충분하지 않을 수도 있습니다. 예를 들어, Amazon Kinesis Data Streams 애플리케이션이 체크포인트를 자주 수행하거나 여러 샤드로 구성된 스트림에서 작동하는 경우 처리량이 더 필요할 수 있습니다.

DynamoDB의 프로비저닝된 처리량에 대한 자세한 내용은 Amazon DynamoDB 개발자 안내서의 읽기/쓰기 용량 모드DynamoDB의 테이블 및 데이터 작업을 참조하세요.

리스 테이블이 KDS 데이터 스트림의 샤드와 동기화되는 방법

KCL 소비자 애플리케이션의 워커는 리스를 사용하여 지정된 데이터 스트림의 샤드를 처리합니다. 특정 시간에 어떤 워커가 어떤 샤드를 리스하는지에 대한 정보는 리스 테이블에 저장됩니다. KCL 소비자 애플리케이션이 실행되는 동안 리스 테이블은 데이터 스트림의 최신 샤드 정보와 동기화된 상태를 유지해야 합니다. KCL은 소비자 애플리케이션 부트스트래핑(소비자 애플리케이션 초기화 또는 재시작 시) 동안 그리고 처리 중인 샤드가 종료(리샤딩)에 도달할 때마다 Kinesis Data Streams 서비스에서 획득한 샤드 정보와 리스 테이블을 동기화합니다. 즉, 워커 또는 KCL 소비자 애플리케이션은 초기 소비자 애플리케이션 부트스트랩 동안 그리고 소비자 애플리케이션에서 데이터 스트림 리샤딩 이벤트가 발생할 때마다 처리 중인 데이터 스트림과 동기화됩니다.

KCL 1.0~1.13 및 KCL 2.0~2.2에서의 동기화

KCL 1.0~1.13 및 KCL 2.0~2.2에서 KCL은 소비자 애플리케이션의 부트스트래핑 및 각 데이터 스트림 리샤딩 이벤트 중에 ListShards 또는 DescribeStream 검색 API를 간접적으로 호출하여 Kinesis Data Streams 서비스에서 획득한 샤드 정보와 리스 테이블을 동기화합니다. 위에 나열된 모든 KCL 버전에서 KCL 소비자 애플리케이션의 각 워커는 소비자 애플리케이션의 부트스트래핑 중 및 각 스트림 리샤드 이벤트에서 다음 단계를 완료하여 리스/샤드 동기화 프로세스를 수행합니다.

  • 처리 중인 데이터 스트림에 대한 모든 샤드를 가져옵니다.

  • 리스 테이블에서 모든 샤드 리스를 가져옵니다.

  • 리스 테이블에 리스가 없는 각 열린 샤드를 필터링합니다.

  • 발견된 모든 열린 샤드와 열린 상위 항목이 없는 각 열린 샤드를 반복합니다.

    • 상위 항목 경로를 통해 계층 트리를 탐색하여 샤드가 하위 항목인지 확인합니다. 상위 샤드가 처리 중이거나(상위 샤드에 대한 리스 항목이 리스 테이블에 있음) 상위 샤드를 처리해야 하는 경우(예: 초기 위치가 TRIM_HORIZON 또는 AT_TIMESTAMP인 경우) 샤드는 하위 항목으로 간주됩니다.

    • 컨텍스트의 열린 샤드가 하위 항목인 경우 KCL은 초기 위치를 기준으로 샤드 체크포인트를 수행하고 필요한 경우 상위 항목에 대한 리스를 생성합니다.

KCL 2.x에서의 동기화(KCL 2.3 이상부터)

지원되는 최신 버전의 KCL 2.x(KCL 2.3) 이상부터 라이브러리는 이제 다음과 같은 동기화 프로세스 변경 사항을 지원합니다. 이러한 리스/샤드 동기화 변경은 KCL 소비자 애플리케이션에서 Kinesis Data Streams 서비스에 대한 API 호출 수를 크게 줄이고 KCL 소비자 애플리케이션의 리스 관리를 최적화합니다.

  • 애플리케이션 부트스트래핑 중 리스 테이블이 비어 있으면 KCL은 ListShard API의 필터링 옵션(ShardFilter 선택적 요청 파라미터)을 활용하여 ShardFilter 파라미터로 지정된 시간에 열려 있는 샤드의 스냅샷에 대해서만 리스를 검색하고 생성합니다. ShardFilter 파라미터를 사용하면 ListShards API의 응답을 필터링할 수 있습니다. ShardFilter 파라미터의 유일한 필수 속성은 Type입니다. KCL은 Type 필터 속성과 다음과 같은 유효한 값을 사용하여 새 리스가 필요할 수 있는 열린 샤드의 스냅샷을 식별하고 반환합니다.

    • AT_TRIM_HORIZON - TRIM_HORIZON에서 열려 있던 모든 샤드가 응답에 포함됩니다.

    • AT_LATEST - 데이터 스트림의 현재 열려 있는 샤드만 응답에 포함됩니다.

    • AT_TIMESTAMP - 시작 타임스탬프가 지정된 타임스탬프보다 작거나 같고 종료 타임스탬프가 지정된 타임스탬프보다 크거나 같거나 여전히 열려 있는 모든 샤드가 응답에 포함됩니다.

    ShardFilterRetrievalConfig#initialPositionInStreamExtended에 지정된 샤드의 스냅샷에 대한 리스를 초기화하기 위해 빈 리스 테이블에 대한 리스를 생성할 때 사용됩니다.

    ShardFilter에 대한 자세한 정보는을 잠조하세요https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • 모든 워커가 리스/하드 동기화를 수행하여 데이터 스트림의 최신 샤드로 리스 테이블을 최신 상태로 유지하는 대신 선출된 단일 워커 리더가 리스/샤드 동기화를 수행합니다.

  • KCL 2.3은 GetRecordsSubscribeToShard API의 ChildShards 반환 파라미터를 사용하여 닫힌 샤드에 대해 SHARD_END에서 발생하는 리스/샤드 동기화를 수행하므로 KCL 워커는 처리가 완료된 샤드의 하위 샤드에 대해서만 리스를 생성할 수 있습니다. 공유 처리량 소비자 애플리케이션의 경우 리스/하드 동기화의 이 최적화는 GetRecords API의 ChildShards 파라미터를 사용합니다. 전용 처리량(향상된 팬아웃) 소비자 애플리케이션의 경우 리스/하드 동기화의 이 최적화는 SubscribeToShard API의 ChildShards 파라미터를 사용합니다. 자세한 내용은, 및 을 참조하십시오 GetRecords. SubscribeToShardsChildShard

  • 위의 변경으로 인해 KCL의 동작은 모든 기존 샤드에 대해 학습하는 모든 워커 모델에서 각 워커가 소유한 샤드의 하위 샤드에 대해서만 학습하는 워커 모델로 바뀌고 있습니다. 따라서 소비자 애플리케이션 부트스트래핑 및 리샤드 이벤트 중 발생하는 동기화 외에도 KCL은 이제 리스 테이블의 잠재적 구멍을 식별하기 위해 즉, 모든 새로운 샤드에 대해 알아보기 위해 추가로 주기적 샤드/리스 스캔도 수행하여 데이터 스트림의 전체 해시 범위가 처리되고 있는지 확인하고 필요한 경우 리스를 생성합니다. PeriodicShardSyncManager는 주기적 리스/샤드 스캔 실행을 담당하는 구성 요소입니다.

    KCL 2.3에 대한 PeriodicShardSyncManager 자세한 내용은 https://github.com/awslabs/ amazon-kinesis-client /blob/master/ /src/main/java/software/amazon/kinesis/leases/.java #L201 amazon-kinesis-client -L213을 참조하십시오. LeaseManagementConfig

    KCL 2.3에서는 새로운 구성 옵션을 사용하여 LeaseManagementConfig에서 PeriodicShardSyncManager를 구성할 수 있습니다.

    명칭 기본값 설명
    leasesRecoveryAuditorExecutionFrequencyMillis

    12만(2분)

    리스 테이블에서 부분 리스를 검색하는 감사자 작업의 빈도(밀리초)입니다. 감사자가 스트림의 리스에서 구멍을 발견하면 leasesRecoveryAuditorInconsistencyConfidenceThreshold를 기반으로 샤드 동기화를 트리거합니다.

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    리스 테이블의 데이터 스트림에 대한 리스가 불일치하는지 확인하기 위한 정기 감사 작업의 신뢰도 임곗값입니다. 감사자가 데이터 스트림에 대해 동일한 불일치 세트를 여러 번 연속해서 발견하면 샤드 동기화가 트리거됩니다.

    또한 이제 의 상태를 모니터링하기 위한 CloudWatch 새 메트릭이 생성되었습니다. PeriodicShardSyncManager 자세한 설명은 PeriodicShardSyncManager 섹션을 참조하세요.

  • 하나의 샤드 계층에 대해서만 리스를 생성하도록 HierarchicalShardSyncer에 대한 최적화를 포함합니다.

KCL 1.x에서의 동기화(KCL 1.14 이상부터)

지원되는 최신 버전의 KCL 1.x(KCL 1.14) 이상부터 라이브러리는 이제 다음과 같은 동기화 프로세스 변경 사항을 지원합니다. 이러한 리스/샤드 동기화 변경은 KCL 소비자 애플리케이션에서 Kinesis Data Streams 서비스에 대한 API 호출 수를 크게 줄이고 KCL 소비자 애플리케이션의 리스 관리를 최적화합니다.

  • 애플리케이션 부트스트래핑 중 리스 테이블이 비어 있으면 KCL은 ListShard API의 필터링 옵션(ShardFilter 선택적 요청 파라미터)을 활용하여 ShardFilter 파라미터로 지정된 시간에 열려 있는 샤드의 스냅샷에 대해서만 리스를 검색하고 생성합니다. ShardFilter 파라미터를 사용하면 ListShards API의 응답을 필터링할 수 있습니다. ShardFilter 파라미터의 유일한 필수 속성은 Type입니다. KCL은 Type 필터 속성과 다음과 같은 유효한 값을 사용하여 새 리스가 필요할 수 있는 열린 샤드의 스냅샷을 식별하고 반환합니다.

    • AT_TRIM_HORIZON - TRIM_HORIZON에서 열려 있던 모든 샤드가 응답에 포함됩니다.

    • AT_LATEST - 데이터 스트림의 현재 열려 있는 샤드만 응답에 포함됩니다.

    • AT_TIMESTAMP - 시작 타임스탬프가 지정된 타임스탬프보다 작거나 같고 종료 타임스탬프가 지정된 타임스탬프보다 크거나 같거나 여전히 열려 있는 모든 샤드가 응답에 포함됩니다.

    ShardFilterKinesisClientLibConfiguration#initialPositionInStreamExtended에 지정된 샤드의 스냅샷에 대한 리스를 초기화하기 위해 빈 리스 테이블에 대한 리스를 생성할 때 사용됩니다.

    ShardFilter에 대한 자세한 정보는을 잠조하세요https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • 모든 워커가 리스/하드 동기화를 수행하여 데이터 스트림의 최신 샤드로 리스 테이블을 최신 상태로 유지하는 대신 선출된 단일 워커 리더가 리스/샤드 동기화를 수행합니다.

  • KCL 1.14는 GetRecordsSubscribeToShard API의 ChildShards 반환 파라미터를 사용하여 닫힌 샤드에 대해 SHARD_END에서 발생하는 리스/샤드 동기화를 수행하므로 KCL 워커는 처리가 완료된 샤드의 하위 샤드에 대해서만 리스를 생성할 수 있습니다. 자세한 내용은 GetRecordsChildShard 섹션을 참조하세요.

  • 위의 변경으로 인해 KCL의 동작은 모든 기존 샤드에 대해 학습하는 모든 워커 모델에서 각 워커가 소유한 샤드의 하위 샤드에 대해서만 학습하는 워커 모델로 바뀌고 있습니다. 따라서 소비자 애플리케이션 부트스트래핑 및 리샤드 이벤트 중 발생하는 동기화 외에도 KCL은 이제 리스 테이블의 잠재적 구멍을 식별하기 위해 즉, 모든 새로운 샤드에 대해 알아보기 위해 추가로 주기적 샤드/리스 스캔도 수행하여 데이터 스트림의 전체 해시 범위가 처리되고 있는지 확인하고 필요한 경우 리스를 생성합니다. PeriodicShardSyncManager는 주기적 리스/샤드 스캔 실행을 담당하는 구성 요소입니다.

    KinesisClientLibConfiguration#shardSyncStrategyTypeShardSyncStrategyType.SHARD_END로 설정된 경우 PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold는 샤드 동기화 시행을 위한 리스 테이블에 구멍이 포함된 연속 스캔 수에 대한 임곗값을 결정하는 데 사용됩니다. KinesisClientLibConfiguration#shardSyncStrategyTypeShardSyncStrategyType.PERIODIC으로 설정된 경우 leasesRecoveryAuditorInconsistencyConfidenceThreshold은 무시됩니다.

    KCL 1.14에 대한 자세한 내용은 https://github.com/awslabs/ PeriodicShardSyncManager amazon-kinesis-client KinesisClientLibConfiguration /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ .java #L987 -L999를 참조하십시오.

    KCL 1.14에서는 새로운 구성 옵션을 사용하여 LeaseManagementConfig에서 PeriodicShardSyncManager를 구성할 수 있습니다.

    명칭 기본값 설명
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    리스 테이블의 데이터 스트림에 대한 리스가 불일치하는지 확인하기 위한 정기 감사 작업의 신뢰도 임곗값입니다. 감사자가 데이터 스트림에 대해 동일한 불일치 세트를 여러 번 연속해서 발견하면 샤드 동기화가 트리거됩니다.

    또한 이제 의 상태를 모니터링하기 위한 CloudWatch 새로운 지표가 생성되었습니다. PeriodicShardSyncManager 자세한 설명은 PeriodicShardSyncManager 섹션을 참조하세요.

  • KCL 1.14는 이제 지연된 리스 정리도 지원합니다. 샤드가 데이터 스트림의 보존 기간을 지나 만료되었거나 리샤딩 작업의 결과로 닫힌 경우 SHARD_END에 도달하면 LeaseCleanupManager가 리스를 비동기식으로 삭제합니다.

    새로운 구성 옵션을 사용하여 LeaseCleanupManager를 구성할 수 있습니다.

    명칭 기본값 설명
    leaseCleanupInterval밀리스

    1분

    리스 정리 스레드를 실행하는 간격입니다.

    completedLeaseCleanupIntervalMillis 5분

    리스가 완료되었는지 여부를 확인하는 간격입니다.

    garbageLeaseCleanupIntervalMillis 30 분

    리스가 가비지(즉, 데이터 스트림의 보존 기간을 지나 트리밍됨)인지 여부를 확인하는 간격입니다.

  • 하나의 샤드 계층에 대해서만 리스를 생성하도록 KinesisShardSyncer에 대한 최적화를 포함합니다.

동일한 'KCL 2.x for Java' 소비자 애플리케이션으로 여러 데이터 스트림 처리

이 섹션에서는 두 개 이상의 데이터 스트림을 동시에 처리할 수 있는 KCL 소비자 응용 프로그램을 만들 수 있도록 하는 Java용 KCL 2.x의 다음 변경 사항에 대해 설명합니다.

중요

멀티스트림 처리는 KCL 2.3 for Java 이상부터 시작하여 KCL 2.x for Java에서만 지원됩니다.

KCL 2.x를 구현할 수 있는 다른 언어에 대해 멀티스트림 처리가 지원되지 않습니다.

KCL 1.x의 모든 버전에서 멀티스트림 처리가 지원되지 않습니다.

  • MultistreamTracker 인터페이스

    여러 스트림을 동시에 처리할 수 있는 소비자 애플리케이션을 구축하려면 라는 새 인터페이스를 구현해야 합니다 MultistreamTracker. 이 인터페이스에는 KCL 소비자 애플리케이션에서 처리할 데이터 스트림 및 해당 구성 목록을 반환하는 streamConfigList 메서드가 포함되어 있습니다. 처리 중인 데이터 스트림은 소비자 애플리케이션 런타임 중에 변경될 수 있습니다. streamConfigList는 처리할 데이터 스트림의 변경 사항을 알아보기 위해 KCL에서 주기적으로 직접적으로 호출됩니다.

    streamConfigList메서드가 StreamConfig목록을 채웁니다.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    StreamIdentifierInitialPositionInStreamExtended는 필수 필드이고 consumerArn은 선택 사항입니다. KCL 2.x를 사용하여 향상된 팬아웃 소비자 애플리케이션을 구현하는 경우에만 consumerArn을 제공해야 합니다.

    에 대한 자세한 내용은 https://github.com/awslabs/ StreamIdentifier amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ /src/main/java/software/amazon/kinesis/common/ .java #L29 를 amazon-kinesis-client 참조하십시오. StreamIdentifier 직렬화된 스트림 식별자에서 StreamIdentifier에 대한 멀티스트림 인스턴스를 생성할 수 있습니다. 직렬화된 스트림 식별자는 account-id:StreamName:streamCreationTimestamp 형식이어야 합니다.

    * @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }

    MultistreamTracker에는 리스 테이블(formerStreamsLeasesDeletionStrategy)에서 오래된 스트림의 리스를 삭제하기 위한 전략도 포함되어 있습니다. 소비자 애플리케이션 런타임 중에는 전략을 변경할 수 없다는 점에 유의하세요. 자세한 내용은 https://github.com/awslabs/ amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ amazon-kinesis-client /src/main/java/software/amazon/kinesis/processor/.java를 참조하십시오. FormerStreamsLeasesDeletionStrategy

  • ConfigsBuilderKCL 소비자 애플리케이션을 구축할 때 사용할 모든 KCL 2.x 구성 설정을 지정하는 데 사용할 수 있는 애플리케이션 전반의 클래스입니다. ConfigsBuilder클래스는 이제 인터페이스를 지원합니다. MultistreamTracker ConfigsBuilder둘 중 하나를 데이터 스트림의 이름으로 초기화하여 다음 데이터 스트림의 레코드를 소비할 수 있습니다.

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    또는 여러 스트림을 동시에 처리하는 KCL 소비자 애플리케이션을 구현하려는 MultiStreamTracker 경우 를 사용하여 초기화할 ConfigsBuilder 수 있습니다.

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • KCL 소비자 애플리케이션에 대해 멀티스트림 지원이 구현됨에 따라 이제 애플리케이션 리스 테이블의 각 행에는 이 애플리케이션이 처리하는 여러 데이터 스트림의 샤드 ID와 스트림 이름이 포함됩니다.

  • KCL 소비자 애플리케이션에 대한 멀티스트림 지원이 구현되면 leaseKey는 account-id:StreamName:streamCreationTimestamp:ShardId 구조를 취합니다. 예를 들어 111111111:multiStreamTest-1:12345:shardId-000000000336입니다.

    중요

    기존 KCL 소비자 애플리케이션이 하나의 데이터 스트림만 처리하도록 구성된 경우 leaseKey(리스 테이블의 해시 키)는 샤드 ID입니다. 여러 데이터 스트림을 처리하도록 이 기존 KCL 소비자 애플리케이션을 재구성하면 리스 테이블이 손상됩니다. 멀티스트림 지원을 사용할 경우 leaseKey 구조는 account-id:StreamName:StreamCreationTimestamp:ShardId와 같아야 하기 때문입니다.

AWS Glue 스키마 레지스트리와 함께 Kinesis 클라이언트 라이브러리 사용

Kinesis 데이터 스트림을 AWS Glue 스키마 레지스트리와 통합할 수 있습니다. AWS Glue 스키마 레지스트리를 사용하면 스키마를 중앙에서 검색, 제어 및 발전시키는 동시에 생성된 데이터가 등록된 스키마에 의해 지속적으로 검증되도록 할 수 있습니다. 스키마는 데이터 레코드의 구조와 포맷을 정의합니다. 스키마는 신뢰할 수 있는 데이터 게시, 소비 또는 저장을 위한 버전 지정 사양입니다. AWSGlue Schema Registry를 사용하면 스트리밍 애플리케이션 내에서 end-to-end 데이터 품질 및 데이터 거버넌스를 개선할 수 있습니다. 자세한 내용은 AWS Glue Schema Registry를 참조하세요. 이 통합을 설정하는 방법 중 하나는 Java에서 KCL을 사용하는 것입니다.

중요

현재 Kinesis Data Streams와 AWS Glue 스키마 레지스트리 통합은 Java로 구현된 KCL 2.3 소비자를 사용하는 Kinesis 데이터 스트림에 대해서만 지원됩니다. 다국어 지원은 제공되지 않습니다. KCL 1.0 소비자는 지원되지 않습니다. KCL 2.3 이전의 KCL 2.x 소비자는 지원되지 않습니다.

KCL을 사용하여 Kinesis Data Streams와 스키마 레지스트리의 통합을 설정하는 방법에 대한 자세한 지침은 사용 사례: AWS Glue Schema Registry와 Amazon Kinesis Data Streams 통합의 'KPL/KCL 라이브러리를 사용하여 데이터와 상호 작용'을 참조하세요.