AWS SDK for Java를 사용하여 공유 처리량으로 사용자 지정 소비자 개발 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS SDK for Java를 사용하여 공유 처리량으로 사용자 지정 소비자 개발

공유 처리량으로 사용자 지정 Kinesis Data Streams 소비자를 개발하는 방법 중 하나는 Amazon Kinesis Data Streams API를 사용하는 것입니다. 이 섹션에서는 AWS SDK for Java와 함께 Kinesis Data Streams API를 사용하는 방법을 설명합니다. 이 섹션의 Java 샘플 코드는 기본 KDS API 작업을 수행하는 방법을 설명하며, 작업 유형에 따라 논리적으로 나뉘어집니다.

이러한 예제는 프로덕션에 사용할 수 있는 코드를 제공하지 않습니다. 이 예제는 가능한 모든 예외를 확인하지 않거나 가능한 모든 보안 및 성능 고려 사항을 감안하지 않습니다.

다른 프로그래밍 언어를 사용하여 Kinesis Data Streams API를 직접적으로 호출할 수 있습니다. 사용 가능한 모든 AWS SDK에 대한 자세한 내용은 Amazon Web Services로 개발 시작을 참조하세요.

중요

공유 처리량으로 사용자 지정 Kinesis Data Streams 소비자를 개발하는 데 권장되는 방법은 Kinesis Client Library(KCL)를 사용하는 것입니다. KCL을 사용하면 분산 컴퓨팅과 관련된 많은 복잡한 작업을 처리하여 Kinesis 데이터 스트림의 데이터를 사용하고 처리할 수 있습니다. 자세한 내용은 KCL을 사용하여 공유 처리량으로 사용자 지정 소비자 개발을 참조하세요.

스트림에서 데이터 가져오기

Kinesis Data Streams API에는 데이터 스트림에서 레코드를 검색하기 위해 간접적으로 호출할 수 있는 getShardIteratorgetRecords 메서드가 포함되어 있습니다. 이 풀 모델에서는 코드가 데이터 스트림의 샤드에서 직접 데이터 레코드를 가져옵니다.

중요

KCL에서 제공하는 레코드 프로세서 지원을 사용하여 데이터 스트림에서 레코드를 검색하는 것이 좋습니다. 이 푸시 모델에서는 데이터를 처리하는 코드를 구현합니다. KCL은 데이터 스트림에서 데이터 레코드를 검색하고 애플리케이션 코드로 전송합니다. KCL에서는 장애 조치, 복구 및 로드 밸런싱 기능도 제공합니다. 자세한 내용은 KCL을 사용하여 공유 처리량으로 사용자 지정 소비자 개발을 참조하세요.

그러나 Kinesis Data Streams API 사용을 선호하는 경우도 있을 수 있습니다. 예를 들어, 데이터 스트림을 모니터링하거나 디버깅하기 위해 사용자 지정 도구를 구현할 수 있습니다.

중요

Kinesis Data Streams는 데이터 스트림의 데이터 레코드 보존 기간에 대한 변경을 지원합니다. 자세한 내용은 데이터 보존 기간 변경 섹션을 참조하세요.

샤드 반복자 사용

샤드 수를 기준으로 스트림에서 레코드를 검색합니다. 각 샤드와 샤드에서 검색한 레코드의 각 배치에 대한 샤드 반복자를 가져와야 합니다. 레코드를 검색할 샤드를 지정하기 위해 getRecordsRequest 객체에 샤드 반복자가 사용됩니다. 샤드 반복자와 연결된 유형은 샤드에서 레코드를 검색할 지점을 결정합니다(자세한 내용은 이 단원의 뒷부분 참조). 샤드 반복자로 작업하기 전에 DescribeStream API - 더 이상 사용되지 않음에서 설명하는 대로 샤드를 검색해야 합니다.

getShardIterator 메서드를 사용하여 초기 샤드 반복자를 가져옵니다. getNextShardIterator 메서드에서 반환된 getRecordsResult 객체의 getRecords 메서드를 사용하여 추가 레코드 배치의 샤드 반복자를 가져옵니다. 샤드 반복자는 5분간 유효합니다. 유효한 시간 동안 샤드 반복자를 사용하면 새로운 샤드 반복자를 얻을 수 있습니다. 각 샤드 반복자는 사용 후에도 5분간 유효합니다.

초기 샤드 반복자를 가져오려면 GetShardIteratorRequest를 인스턴스화하고 getShardIterator 메서드에 전달합니다. 요청을 구성하려면 스트림과 샤드 ID를 지정하십시오. AWS 계정에서 스트림을 가져오는 방법에 대한 자세한 내용은 스트림 나열 섹션을 참조하세요. 스트림에서 샤드를 가져오는 방법에 대한 자세한 내용은 DescribeStream API - 더 이상 사용되지 않음을 참조하십시오.

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

이 샘플 코드는 초기 샤드 반복자를 가져올 때 반복자 유형으로 TRIM_HORIZON을 지정합니다. 이 반복자 유형은 가장 최근에 추가한 레코드(팁이라고도 함)가 아니라 샤드에 추가된 첫 번째 레코드부터 반환해야 함을 의미합니다. 다음은 가능한 반복자 유형입니다.

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

자세한 내용은 ShardIteratorType을 참조하십시오.

일부 반복자 유형에는 유형 외에 시퀀스 번호도 지정해야 합니다. 예를 들면 다음과 같습니다.

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

getRecords를 사용하여 레코드를 가져온 후에 레코드의 getSequenceNumber 메서드를 호출하여 레코드의 시퀀스 번호를 가져올 수 있습니다.

record.getSequenceNumber()

또한 데이터 스트림에 레코드를 추가하는 코드는 getSequenceNumber 결과에 putRecord를 호출하여 추가된 레코드의 시퀀스 번호를 가져올 수 있습니다.

lastSequenceNumber = putRecordResult.getSequenceNumber();

시퀀스 번호를 사용하여 레코드 순서가 확실하게 증가하도록 보장할 수 있습니다. 자세한 내용은 PutRecord 예제의 코드 예제를 참조하십시오.

GetRecords 사용

샤드 반복자를 가져온 후 GetRecordsRequest 객체를 인스턴스화하십시오. setShardIterator 메서드를 사용하여 요청에 반복자를 지정하십시오.

선택적으로 setLimit 메서드를 사용하여 검색할 레코드 수를 설정할 수 있습니다. getRecords에 의해 반환된 레코드 수는 항상 이 제한보다 적거나 같습니다. 이 제한을 지정하지 않으면 getRecords가 검색된 레코드의 10MB를 반환합니다. 아래 샘플 코드에서는 제한을 레코드 25개로 설정합니다.

반환된 레코드가 없으면 샤드 반복자가 참조하는 시퀀스 번호로 이 샤드에서 현재 사용할 수 있는 데이터 레코드가 없음을 의미합니다. 이 상황에서, 애플리케이션은 스트림의 데이터 소스에 적절한 시간 동안 대기해야 합니다. 그런 다음 이전의 getRecords 호출에서 반환된 샤드 반복자를 사용하여 샤드에서 데이터를 다시 가져오십시오.

getRecordsRequestgetRecords 메서드에 전달하고 반환된 값을 getRecordsResult 객체로 캡처하십시오. 데이터 레코드를 가져오려면 getRecords 객체에서 getRecordsResult 메서드를 호출하십시오.

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

getRecords에 대한 또 다른 호출을 준비하려면 getRecordsResult에서 다음 샤드 반복자를 가져오십시오.

shardIterator = getRecordsResult.getNextShardIterator();

최상의 결과를 얻으려면 getRecords 빈도 제한을 초과하지 않도록 getRecords 호출 사이에 적어도 1초(1,000밀리초)의 대기 시간을 유지하십시오.

try { Thread.sleep(1000); } catch (InterruptedException e) {}

일반적으로 테스트 시나리오에서 단일 레코드를 검색하는 경우에도 루프에서 getRecords를 호출해야 합니다. getRecords를 한 번 호출하면 샤드가 이후의 시퀀스 번호에 더 많은 레코드를 포함하더라도 빈 레코드 목록이 반환될 수 있습니다. 이 경우 빈 레코드 목록과 함께 반환된 NextShardIterator가 샤드의 이후 시퀀스 번호를 참조하며 연속적으로 getRecords 호출하면 최종적으로 레코드가 반환됩니다. 다음 샘플은 루프의 사용을 보여줍니다.

예제: getRecords

다음 코드 예제는 루프에서 이루어지는 호출을 포함하여 이 단원에 나온 getRecords 팁을 반영합니다.

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Kinesis Client Library를 사용하는 경우 데이터를 반환하기 전에 여러 번 직접적으로 호출할 수 있습니다. 이 동작은 설계에 따른 것이며 KCL이나 데이터에 문제가 있는 것은 아닙니다.

리샤딩에 적용

getRecordsResult.getNextShardIteratornull을 반환하는 경우 이는 이 샤드와 관련된 샤드 분할 또는 병합이 발생했음을 나타냅니다. 이 샤드는 현재 CLOSED 상태이며 이 샤드에서 사용 가능한 모든 데이터 레코드를 읽었습니다.

이 시나리오에서는 getRecordsResult.childShards를 사용하여 분할 또는 병합으로 생성된 처리 중인 샤드의 새 하위 샤드에 대해 알아볼 수 있습니다. 자세한 내용은 ChildShard를 참조하세요.

이 분할의 경우 새로운 샤드 2개 모두 이전에 처리한 샤드의 샤드 ID와 동일한 parentShardId가 있습니다. 이 샤드 2개의 adjacentParentShardId 값은 null입니다.

병합으로 생성된 새 단일 샤드에는 상위 샤드 중 하나의 샤드 ID 하나와 동일한 parentShardId 및 다른 상위 샤드의 샤드 ID와 동일한 adjacentParentShardId가 있습니다. 애플리케이션은 이러한 샤드 중 하나에서 모든 데이터를 이미 읽었습니다. 이것은 getRecordsResult.getNextShardIteratornull을 반환한 샤드입니다. 애플리케이션에서 데이터의 순서가 중요한 경우 병합으로 생성된 하위 샤드에서 새로운 데이터를 읽기 전에 다른 상위 샤드의 모든 데이터를 읽으십시오.

여러 프로세서를 사용하여 스트림에서 데이터를 검색하는 경우 만약 샤드당 프로세서가 1개이고 샤드 분할 또는 병합이 발생하면 샤드 수 변경에 따라 프로세서 수를 늘리거나 줄여 조정하십시오.

CLOSED와 같은 샤드 상태 설명을 포함하여 리샤딩에 대한 자세한 내용은 스트림 리샤딩 섹션을 참조하세요.

AWS Glue 스키마 레지스트리를 사용하여 데이터와 상호 작용

Kinesis 데이터 스트림을 AWS Glue 스키마 레지스트리와 통합할 수 있습니다. AWS Glue 스키마 레지스트리를 사용하면 스키마를 중앙에서 검색, 제어 및 발전시키는 동시에 생성된 데이터가 등록된 스키마에 의해 지속적으로 검증되도록 할 수 있습니다. 스키마는 데이터 레코드의 구조와 포맷을 정의합니다. 스키마는 신뢰할 수 있는 데이터 게시, 소비 또는 저장을 위한 버전 지정 사양입니다. AWS Glue 스키마 레지스트리를 사용하면 스트리밍 애플리케이션 내에서 엔드 투 엔드 데이터 품질 및 데이터 거버넌스를 개선할 수 있습니다. 자세한 내용은 AWS Glue Schema Registry를 참조하세요. 이 통합을 설정하는 방법 중 하나는 AWS Java SDK에서 사용 가능한 GetRecords Kinesis Data Streams API를 사용하는 것입니다.

GetRecords Kinesis Data Streams API를 사용하여 Kinesis Data Streams와 스키마 레지스트리의 통합을 설정하는 방법에 대한 자세한 지침은 사용 사례: AWS Glue Schema Registry와 Amazon Kinesis Data Streams 통합의 'Kinesis Data Streams API를 사용하여 데이터와 상호 작용'을 참조하세요.