AWS SDK for Java를 사용하여 공유 처리량으로 사용자 지정 소비자 개발 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS SDK for Java를 사용하여 공유 처리량으로 사용자 지정 소비자 개발

공유된 사용자 지정 Kinesis Data Streams 소비자를 개발하는 방법 중 하나는 Amazon Kinesis Data Streams API를 사용하는 것입니다. 이 섹션에서는 Kinesis Data Streams API를 사용하는 방법을 설명합니다.AWSSDK for Java. 이 섹션의 Java 샘플 코드는 기본 KDS API 작업을 수행하는 방법을 설명하며, 작업 유형에 따라 논리적으로 나뉘어집니다.

이러한 예제는 프로덕션에 사용할 수 있는 코드를 제공하지 않습니다. 이 예제는 가능한 모든 예외를 확인하지 않거나 가능한 모든 보안 및 성능 고려 사항을 감안하지 않습니다.

다른 프로그래밍 언어를 사용하여 Kinesis Data Streams API를 호출할 수 있습니다. 사용 가능한 모든 항목에 대한 자세한 내용AWSSDK, 참조Amazon Web Services Services로 개발 시작.

중요

Kinesis Client Library (KCL) 를 사용하여 사용자 지정 Kinesis Data Streams 소비자를 공유하는 방법을 권장합니다. KCL은 분산 컴퓨팅과 관련된 많은 복잡한 작업을 처리하여 Kinesis 데이터 스트림에서 데이터를 소비하고 처리할 수 있도록 지원합니다. 자세한 내용은 단원을 참조하십시오.KCL을 사용하여 공유 처리량으로 사용자 지정 소비자 개발.

스트림에서 데이터 가져오기

Kinesis Data Streams API에는getShardIteratorgetRecords데이터 스트림에서 레코드를 검색하기 위해 호출할 수 있는 메서드입니다. 이 풀 모델에서는 코드가 데이터 스트림의 샤드에서 직접 데이터 레코드를 가져옵니다.

중요

KCL에서 제공하는 레코드 프로세서 지원을 사용하여 데이터 스트림에서 레코드를 검색하는 것이 좋습니다. 이 푸시 모델에서는 데이터를 처리하는 코드를 구현합니다. KCL은 데이터 스트림에서 데이터 레코드를 검색하여 애플리케이션 코드로 전달합니다. 또한 KCL은 장애 조치, 복구 및 로드 밸런싱 기능도 제공합니다. 자세한 내용은 단원을 참조하십시오.KCL을 사용하여 공유 처리량으로 사용자 지정 소비자 개발.

그렇지만 경우에 따라 Kinesis Data Streams API를 사용할 수 있습니다. 예를 들어, 데이터 스트림을 모니터링하거나 디버깅하기 위한 사용자 지정 도구를 구현할 수 있습니다.

중요

Kinesis Data Streams는 데이터 스트림의 데이터 레코드 보존 기간에 대한 변경을 지원합니다. 자세한 정보는 데이터 보존 기간 변경을 참조하십시오.

샤드 반복자 사용

샤드 수를 기준으로 스트림에서 레코드를 검색합니다. 각 샤드와 샤드에서 검색한 레코드의 각 배치에 대한 샤드 반복자를 가져와야 합니다. 레코드를 검색할 샤드를 지정하기 위해 getRecordsRequest 객체에 샤드 반복자가 사용됩니다. 샤드 반복자와 연결된 유형은 샤드에서 레코드를 검색할 지점을 결정합니다(자세한 내용은 이 단원의 뒷부분 참조). 샤드 반복자로 작업하기 전에 DescribeStreamAPI - 사용 중단됨에서 설명하는 대로 샤드를 검색해야 합니다.

getShardIterator 메서드를 사용하여 초기 샤드 반복자를 가져옵니다. getNextShardIterator 메서드에서 반환된 getRecordsResult 객체의 getRecords 메서드를 사용하여 추가 레코드 배치의 샤드 반복자를 가져옵니다. 샤드 반복자는 5분간 유효합니다. 유효한 시간 동안 샤드 반복자를 사용하면 새로운 샤드 반복자를 얻을 수 있습니다. 각 샤드 반복자는 사용 후에도 5분간 유효합니다.

초기 샤드 반복자를 가져오려면 GetShardIteratorRequest를 인스턴스화하고 getShardIterator 메서드에 전달합니다. 요청을 구성하려면 스트림과 샤드 ID를 지정하십시오. 에서 스트림을 가져오는 방법에 대한 자세한 내용AWS계정, 참조스트림 나열. 스트림에서 샤드를 가져오는 방법에 대한 자세한 내용은 DescribeStreamAPI - 사용 중단됨을 참조하십시오.

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

이 샘플 코드는 초기 샤드 반복자를 가져올 때 반복자 유형으로 TRIM_HORIZON을 지정합니다. 이 반복자 유형은 가장 최근에 추가한 레코드 (라고도 함) 가 아니라 샤드에 추가된 첫 번째 레코드부터 반환해야 함을 의미합니다.. 다음은 가능한 반복자 유형입니다.

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

자세한 내용은 단원을 참조하십시오.ShardIterator유형.

일부 반복자 유형에는 유형 외에 시퀀스 번호도 지정해야 합니다. 예를 들면 다음과 같습니다.

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

getRecords를 사용하여 레코드를 가져온 후에 레코드의 getSequenceNumber 메서드를 호출하여 레코드의 시퀀스 번호를 가져올 수 있습니다.

record.getSequenceNumber()

또한 데이터 스트림에 레코드를 추가하는 코드는 getSequenceNumber 결과에 putRecord를 호출하여 추가된 레코드의 시퀀스 번호를 가져올 수 있습니다.

lastSequenceNumber = putRecordResult.getSequenceNumber();

시퀀스 번호를 사용하여 레코드 순서가 확실하게 증가하도록 보장할 수 있습니다. 자세한 내용은 PutRecord예의 코드 예제를 참조하십시오.

사용GetRecords

샤드 반복자를 가져온 후 GetRecordsRequest 객체를 인스턴스화하십시오. setShardIterator 메서드를 사용하여 요청에 반복자를 지정하십시오.

선택적으로 setLimit 메서드를 사용하여 검색할 레코드 수를 설정할 수 있습니다. getRecords에 의해 반환된 레코드 수는 항상 이 제한보다 적거나 같습니다. 이 제한을 지정하지 않으면getRecords검색된 레코드 10MB를 반환합니다. 아래 샘플 코드에서는 제한을 레코드 25개로 설정합니다.

반환된 레코드가 없으면 샤드 반복자가 참조하는 시퀀스 번호로 이 샤드에서 현재 사용할 수 있는 데이터 레코드가 없음을 의미합니다. 이 상황에서, 애플리케이션은 스트림의 데이터 원본에 적절한 시간 동안(적어도 1초) 대기해야 합니다. 그런 다음 이전의 getRecords 호출에서 반환된 샤드 반복자를 사용하여 샤드에서 데이터를 다시 가져오십시오. 레코드가 스트림에 추가되는 시간부터 getRecords에서 사용할 수 있는 시간까지 약 3초의 지연 시간이 발생합니다.

getRecordsRequestgetRecords 메서드에 전달하고 반환된 값을 getRecordsResult 객체로 캡처하십시오. 데이터 레코드를 가져오려면 getRecords 객체에서 getRecordsResult 메서드를 호출하십시오.

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

getRecords에 대한 또 다른 호출을 준비하려면 getRecordsResult에서 다음 샤드 반복자를 가져오십시오.

shardIterator = getRecordsResult.getNextShardIterator();

최상의 결과를 얻으려면 getRecords 빈도 제한을 초과하지 않도록 getRecords 호출 사이에 적어도 1초(1,000밀리초)의 대기 시간을 유지하십시오.

try { Thread.sleep(1000); } catch (InterruptedException e) {}

일반적으로 테스트 시나리오에서 단일 레코드를 검색하는 경우에도 루프에서 getRecords를 호출해야 합니다. getRecords를 한 번 호출하면 샤드가 이후의 시퀀스 번호에 더 많은 레코드를 포함하더라도 빈 레코드 목록이 반환될 수 있습니다. 이 경우 빈 레코드 목록과 함께 반환된 NextShardIterator가 샤드의 이후 시퀀스 번호를 참조하며 연속적으로 getRecords 호출하면 최종적으로 레코드가 반환됩니다. 다음 샘플은 루프의 사용을 보여줍니다.

예제: getRecords

다음 코드 예제는 루프에서 이루어지는 호출을 포함하여 이 단원에 나온 getRecords 팁을 반영합니다.

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Kinesis Client Library 를 사용하는 경우 데이터를 반환하기 전에 여러 번 호출할 수 있습니다. 이 동작은 설계에 따른 것이며 KCL 또는 데이터에 문제가 있는 것은 아닙니다.

리샤딩에 적용

다음의 경우,getRecordsResult.getNextShardIterator보고null, 이 샤드와 관련된 샤드 분할 또는 병합이 발생했음을 나타냅니다. 이 샤드는 이제CLOSEDstate이 샤드에서 사용 가능한 모든 데이터 레코드를 읽었습니다.

이 시나리오에서는 다음을 사용할 수 있습니다.getRecordsResult.childShards를 사용하여 분할이나 병합으로 생성된 샤드의 새 하위 샤드에 대해 알아봅니다. 자세한 내용은 단원을 참조하십시오.ChildShard.

이 분할의 경우 새로운 샤드 2개 모두 이전에 처리한 샤드의 샤드 ID와 동일한 parentShardId가 있습니다. 이 샤드 2개의 adjacentParentShardId 값은 null입니다.

병합으로 생성된 새 단일 샤드에는 상위 샤드 중 하나의 샤드 ID 하나와 동일한 parentShardId 및 다른 상위 샤드의 샤드 ID와 동일한 adjacentParentShardId가 있습니다. 애플리케이션은 이러한 샤드 중 하나에서 모든 데이터를 이미 읽었습니다. 이것은 getRecordsResult.getNextShardIteratornull을 반환한 샤드입니다. 애플리케이션에서 데이터의 순서가 중요한 경우 병합으로 생성된 하위 샤드에서 새로운 데이터를 읽기 전에 다른 상위 샤드의 모든 데이터를 읽으십시오.

여러 프로세서를 사용하여 스트림에서 데이터를 검색하는 경우 만약 샤드당 프로세서가 1개이고 샤드 분할 또는 병합이 발생하면 샤드 수 변경에 따라 프로세서 수를 늘리거나 줄여 조정하십시오.

샤드 상태에 대한 설명을 포함하여 리샤딩에 대한 자세한 내용 (예:CLOSED—참조스트림 리샤딩.

를 사용하여 데이터와 상호 작용AWSGlue 스키마 레지스트리

Kinesis 데이터 스트림을AWSGlue 스키마 레지스트리. 이AWSSchema Registry는 스키마를 중앙에서 검색, 제어 및 발전시킬 수 있는 동시에 생성된 데이터가 등록된 스키마에 의해 지속적으로 검증되도록 합니다. 스키마는 데이터 레코드의 구조와 포맷을 정의합니다. 스키마는 신뢰할 수 있는 데이터 게시, 소비 또는 저장을 위한 버전 지정 사양입니다. 이AWSGlue 스키마 레지스트리를 통해 개선할 수 있습니다.end-to-end스트리밍 애플리케이션 내의 데이터 품질 및 데이터 거버넌스 자세한 내용은 단원을 참조하십시오.AWSGlue 스키마 레지스트리. 이 통합을 설정하는 방법 중 하나는GetRecords에서 제공되는 Kinesis Data Streams APIAWSJava SDK.

를 사용하여 Kinesis Data Streams Streams의 Schema Registry와 통합을 설정하는 방법에 대한 자세한 내용은GetRecordsKinesis Data Streams API, 의 “Kinesis Data Streams API를 사용하여 데이터와 상호 작용” 섹션을 참조하십시오.사용 사례: Amazon Kinesis Data StreamsAWSGlue 스키마 레지스트리.