AWS SDK for Java와 Amazon Kinesis Data Streams API를 사용하여 생산자 개발 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS SDK for Java와 Amazon Kinesis Data Streams API를 사용하여 생산자 개발

AWS SDK for Java와 함께 Amazon Kinesis Data Streams API를 사용하여 생산자를 개발할 수 있습니다. Kinesis Data Streams를 처음 사용하는 경우, 먼저 Amazon Kinesis Data Streams란?Amazon Kinesis Data Streams 시작하기에 있는 개념과 용어를 알아 두세요.

이 예제에서는 Kinesis Data Streams API를 설명하고 AWS SDK for Java를 사용하여 스트림에 데이터를 추가(넣기)합니다. 그러나 대부분의 사용 사례에서는 Kinesis Data Streams KPL 라이브러리를 우선적으로 사용해야 합니다. 자세한 내용은 Amazon Kinesis Producer Library를 사용하여 생산자 개발 섹션을 참조하세요.

이 장의 Java 예제 코드는 기본 Kinesis Data Streams API 작업을 수행하는 방법을 설명하며, 작업 유형에 따라 논리적으로 나뉘어집니다. 이 예제는 가능한 모든 예외를 확인하지 않거나 가능한 모든 보안 및 성능 고려 사항을 감안하지 않는다는 점에서 프로덕션 지원 코드가 아닙니다. 또한 다른 프로그래밍 언어를 사용하여 Kinesis Data Streams API를 직접적으로 호출할 수 있습니다. 사용 가능한 모든 AWS SDK에 대한 자세한 내용은 Amazon Web Services로 개발 시작을 참조하세요.

각 작업에는 사전 요구 사항이 있습니다. 예를 들어, 스트림을 생성하기 전에는 데이터를 스트림에 추가할 수 없으므로 클라이언트를 만들어야 합니다. 자세한 내용은 스트림 생성 및 관리 섹션을 참조하세요.

스트림에 데이터 추가

스트림이 생성되면 레코드의 형태로 데이터를 스트림에 추가할 수 있습니다. 레코드는 데이터 BLOB 형식으로 처리할 데이터가 포함된 데이터 구조입니다. 데이터를 레코드에 저장한 후에는 Kinesis Data Streams가 어떤 식으로도 데이터를 검사하거나 해석하거나 변경하지 않습니다. 또한 레코드마다 연결된 시퀀스 번호와 파티션 키가 있습니다.

Kinesis Data Streams API에는 스트림에 데이터를 추가하는 두 가지 작업, PutRecordsPutRecord가 있습니다. PutRecords 작업은 HTTP 요청마다 스트림에 여러 레코드를 보내고, 단수인 PutRecord 작업은 한 번에 하나씩 스트림에 레코드를 보냅니다(레코드마다 별도의 HTTP 요청 필요). 데이터 생산자마다 더 높은 처리량을 보관하므로 대부분의 애플리케이션에 PutRecords를 우선 사용해야 합니다. 각 작업에 대한 자세한 내용은 아래에 나와 있는 별도의 하위 단원을 참조하십시오.

소스 애플리케이션이 Kinesis Data Streams API를 사용하여 스트림에 데이터를 추가할 때는 스트림의 데이터를 동시에 처리하는 소비자 애플리케이션이 하나 이상일 가능성이 높습니다. Kinesis Data Streams API를 사용하여 소비자가 데이터를 가져오는 방법에 대한 자세한 내용은 스트림에서 데이터 가져오기 섹션을 참조하세요.

PutRecords를 사용하여 여러 레코드 추가

PutRecords 작업은 단일 요청에서 여러 레코드를 Kinesis Data Streams에 보냅니다. 데이터를 Kinesis 데이터 스트림에 보낼 때 PutRecords를 사용하여 생산자가 더 높은 처리량을 보관할 수 있습니다. 각 PutRecords 요청은 최대 500개의 레코드를 지원할 수 있습니다. 요청에 포함되는 각 레코드 크기의 상한은 1MB이며, 파티션 키를 포함해 전체 요청당 최대 5MB로 제한됩니다. 아래에서 설명하는 단일 PutRecord 작업과 마찬가지로 PutRecords도 시퀀스 번호와 파티션 키를 사용합니다. 그러나 PutRecord 파라미터 SequenceNumberForOrderingPutRecords 호출에 포함되지 않습니다. PutRecords 작업은 요청의 일반 순서에 따라 모든 레코드를 처리하려고 합니다.

데이터 레코드마다 고유한 시퀀스 번호가 있습니다. 스트림에 데이터 레코드를 추가하기 위해 client.putRecords를 직접적으로 호출한 후 Kinesis Data Streams에서 시퀀스 번호를 할당합니다. 동일한 파티션 키에 대한 시퀀스 번호는 일반적으로 시간이 지남에 따라 증가합니다. PutRecords 요청 기간이 길어질수록 시퀀스 번호도 커집니다.

참고

같은 스트림에 있는 데이터 세트의 인덱스로 시퀀스 번호를 사용할 수 없습니다. 데이터 세트를 논리적으로 분리하려면 파티션 키를 사용하거나 데이터 세트마다 별도의 스트림을 만드십시오.

PutRecords 요청은 다른 파티션 키를 가진 레코드를 포함할 수 있습니다. 요청의 범위는 스트림이며, 요청은 요청 제한 이내에서 파티션 키와 레코드 조합을 포함할 수 있습니다. 일반적으로 여러 다른 파티션 키로 만들어져 여러 다른 샤드를 가진 스트림으로 전송되는 요청은 소수의 파티션 키를 가지고 있고 소수의 샤드로 전송되는 요청보다 빠릅니다. 파티션 키의 수가 샤드 수보다 훨씬 커야 지연 시간을 줄이고 처리량을 최대화할 수 있습니다.

PutRecords 예제

다음 코드는 파티션 키가 순차적인 데이터 레코드 100개를 만들어 DataStream이라는 스트림에 넣습니다.

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

PutRecords 응답에는 응답 Records의 어레이가 포함됩니다. 응답 어레이의 각 레코드는 요청 및 응답의 일반 순서(위에서 아래로)를 이용해 요청 어레이의 레코드와 직접 연관됩니다. 응답 Records 어레이에는 항상 요청 어레이와 같은 수의 레코드가 포함됩니다.

PutRecords 사용 시 실패 처리

기본적으로 요청에 있는 개별 레코드가 실패해도 PutRecords 요청에 있는 후속 레코드의 처리가 중단되지 않습니다. 즉, 응답 Records 어레이에 성공적으로 처리된 레코드와 그렇지 않은 레코드가 모두 포함됩니다. 따라서 성공적으로 처리되지 않은 레코드를 찾아 후속 호출에 포함해야 합니다.

성공한 레코드에는 SequenceNumberShardID 값이 포함되며 성공하지 못한 레코드에는 ErrorCodeErrorMessage 값이 포함됩니다. ErrorCode 파라미터는 오류 유형을 반영하며 ProvisionedThroughputExceededException 값 또는 InternalFailure 값 중 하나 일 수 있습니다. ErrorMessage는 병목 현상이 발생한 레코드의 계정 ID, 스트림 이름 및 샤드 ID를 포함하여 ProvisionedThroughputExceededException 예외에 대한 세부 정보를 제공합니다. 아래 예제에서는 PutRecords 요청에 레코드 3개가 있습니다. 두 번째 레코드가 실패하고 응답에 반영됩니다.

예 PutRecords 요청 구문
{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }
예 PutRecords 응답 구문
{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

성공적으로 처리되지 않은 레코드를 후속 PutRecords 요청에 포함할 수 있습니다. 먼저, putRecordsResult에서 FailedRecordCount 파라미터를 확인해서 요청에 처리에 실패한 레코드가 있는지 확인합니다. 그런 레코드가 있으면 putRecordsEntry이 아닌 ErrorCode가 있는 각 null를 후속 요청에 추가해야 합니다. 이러한 유형의 핸들러 예제는 다음 코드를 참조하세요.

예 PutRecords 실패 핸들러
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

PutRecord를 사용하여 단일 레코드 추가

PutRecord 호출은 레코드 1개에 적용됩니다. 특별히 애플리케이션이 항상 요청당 레코드를 1개만 보내야 하거나 다른 어떤 이유로 PutRecords를 사용할 수 없는 경우가 아니라면 PutRecords를 사용하여 여러 레코드 추가에서 설명하는 PutRecords 작업을 우선 사용하십시오.

데이터 레코드마다 고유한 시퀀스 번호가 있습니다. 스트림에 데이터 레코드를 추가하기 위해 client.putRecord를 직접적으로 호출한 후 Kinesis Data Streams에서 시퀀스 번호를 할당합니다. 동일한 파티션 키에 대한 시퀀스 번호는 일반적으로 시간이 지남에 따라 증가합니다. PutRecord 요청 기간이 길어질수록 시퀀스 번호도 커집니다.

넣기 작업은 본질적으로 Kinesis Data Streams와 동시에 나타나기 때문에 넣기 작업이 연달아 빠르게 발생할 때는 반환된 시퀀스 번호가 증가한다는 보장은 없습니다. 같은 파티션 키의 시퀀스 번호가 증가하도록 확실하게 보장하려면 SequenceNumberForOrdering 코드 샘플에 나온 대로 PutRecord 예제 파라미터를 사용하십시오.

SequenceNumberForOrdering 사용 여부와 관계없이 Kinesis Data Streams가 GetRecords 호출을 통해 검색하는 레코드는 시퀀스 번호대로 엄격하게 지정됩니다.

참고

같은 스트림에 있는 데이터 세트의 인덱스로 시퀀스 번호를 사용할 수 없습니다. 데이터 세트를 논리적으로 분리하려면 파티션 키를 사용하거나 데이터 세트마다 별도의 스트림을 만드십시오.

파티션 키는 스트림에서 데이터를 그룹화하는 데 사용됩니다. 해당 파티션 키에 따라 스트림 내의 샤드에 데이터 레코드가 할당됩니다. 특히 Kinesis Data Streams는 파티션 키 및 연결된 데이터를 특정 샤드에 매핑하는 해시 함수에 대한 입력으로 파티션 키를 사용합니다.

이 해시 메커니즘의 결과로 같은 파티션 키를 가진 모든 데이터 레코드가 스트림에 있는 동일한 샤드에 매핑됩니다. 그러나 파티션 키의 수가 샤드 수를 초과하면 일부 샤드에 서로 다른 파티션 키를 가진 레코드가 반드시 포함됩니다. 설계 면에서 모든 샤드를 잘 활용하려면 setShardCountCreateStreamRequest 메서드로 지정된 샤드 수가 고유한 파티션 수보다 상당히 적고, 단일 파티션 키로 전송되는 데이터의 양이 샤드의 용량보다 상당히 적어야 합니다.

PutRecord 예제

다음 코드는 파티션 키 2개에 배포되는 데이터 레코드 10개를 만들어 myStreamName이라는 스트림에 넣습니다.

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

앞에 나온 코드 샘플에서는 setSequenceNumberForOrdering을 사용하여 각 파티션 키 내에서 순서가 증가하도록 확실하게 보장합니다. 이 파라미터를 효과적으로 사용하려면 현재 레코드(레코드 n)의 SequenceNumberForOrdering을 이전 레코드(레코드 n-1)의 시퀀스 번호로 설정합니다. 스트림에 추가된 레코드의 시퀀스 번호를 가져오려면 putRecord의 결과에 getSequenceNumber를 호출하십시오.

SequenceNumberForOrdering 파라미터는 동일한 파티션 키에 대한 시퀀스 번호가 확실하게 증가하도록 보장합니다. SequenceNumberForOrdering은 여러 파티션 키에 대한 레코드의 순서를 지정하지 않습니다.

AWS Glue 스키마 레지스트리를 사용하여 데이터와 상호 작용

Kinesis 데이터 스트림을 AWS Glue 스키마 레지스트리와 통합할 수 있습니다. AWS Glue 스키마 레지스트리를 사용하면 스키마를 중앙에서 검색, 제어 및 발전시키는 동시에 생성된 데이터가 등록된 스키마에 의해 지속적으로 검증되도록 할 수 있습니다. 스키마는 데이터 레코드의 구조와 포맷을 정의합니다. 스키마는 신뢰할 수 있는 데이터 게시, 소비 또는 저장을 위한 버전 지정 사양입니다. AWS Glue 스키마 레지스트리를 사용하면 스트리밍 애플리케이션 내에서 엔드 투 엔드 데이터 품질 및 데이터 거버넌스를 개선할 수 있습니다. 자세한 내용은 AWS Glue Schema Registry를 참조하세요. 이 통합을 설정하는 방법 중 하나는 AWS Java SDK에서 사용 가능한 PutRecordsPutRecord Kinesis Data Streams API를 사용하는 것입니다.

Kinesis Data Streams API를 사용하여 Kinesis Data Streams와 스키마 레지스트리의 통합을 설정하는 방법에 대한 자세한 지침은 사용 사례: AWS Glue Schema Registry와 Amazon Kinesis Data Streams 통합의 'Kinesis Data Streams API를 사용하여 데이터와 상호 작용'을 참조하세요.