Amazon Kinesis Data Streams API를 사용하여 생산자 개발AWS SDK for Java - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon Kinesis Data Streams API를 사용하여 생산자 개발AWS SDK for Java

Amazon Kinesis Data Streams API를 사용하여 생산자를 개발할 수 있습니다.AWSSDK for Java. Kinesis Data Streams 처음 사용하는 경우 먼저 에 있는 개념과 용어를 알아 두십시오.Amazon Kinesis Data StreamsAmazon Kinesis Data Streams 시작하기.

이 예제에서는Kinesis Data Streams API다음을 사용하십시오.AWSSDK for Java스트림에 데이터를 추가 (넣기) 합니다. 그러나 대부분의 사용 사례에서는 Kinesis Data Streams KPL 라이브러리를 우선적으로 사용해야 합니다. 자세한 정보는 Amazon Kinesis Producer 라이브러리를 사용하여 생산자 개발을 참조하십시오.

이 장의 Java 예제 코드는 기본 Kinesis Data Streams API 작업을 수행하는 방법을 설명하며, 작업 유형에 따라 논리적으로 나뉘어집니다. 이 예제는 가능한 모든 예외를 확인하지 않거나 가능한 모든 보안 및 성능 고려 사항을 감안하지 않는다는 점에서 프로덕션 지원 코드가 아닙니다. 또한 를 호출할 수 있습니다.Kinesis Data Streams API다른 프로그래밍 언어 사용. 사용 가능한 모든 항목에 대한 자세한 내용AWSSDK, 참조Amazon Web Services Services로 개발 시작.

각 작업에는 사전 요구 사항이 있습니다. 예를 들어, 스트림을 생성하기 전에는 데이터를 스트림에 추가할 수 없으므로 클라이언트를 만들어야 합니다. 자세한 정보는 스트림 생성 및 관리을 참조하십시오.

스트림에 데이터 추가

스트림이 생성되면 레코드의 형태로 데이터를 스트림에 추가할 수 있습니다. 레코드는 데이터 BLOB 형식으로 처리할 데이터가 포함된 데이터 구조입니다. 데이터를 레코드에 저장한 후에는 Kinesis Data Streams 어떤 식으로도 데이터를 검사, 해석 또는 변경하지 않습니다. 또한 레코드마다 연결된 시퀀스 번호와 파티션 키가 있습니다.

스트림에 데이터를 추가하는 Kinesis Data Streams API에는 두 가지 작업이 있습니다.PutRecordsPutRecord. PutRecords 작업은 HTTP 요청마다 스트림에 여러 레코드를 보내고, 단수인 PutRecord 작업은 한 번에 하나씩 스트림에 레코드를 보냅니다(레코드마다 별도의 HTTP 요청 필요). 데이터 생산자마다 더 높은 처리량을 보관하므로 대부분의 애플리케이션에 PutRecords를 우선 사용해야 합니다. 각 작업에 대한 자세한 내용은 아래에 나와 있는 별도의 하위 단원을 참조하십시오.

소스 애플리케이션이 Kinesis Data Streams API를 사용하여 스트림에 데이터를 추가할 때는 스트림의 데이터를 동시에 처리하는 소비자 애플리케이션이 하나 이상일 가능성이 높습니다. Kinesis Data Streams API를 사용하여 소비자가 데이터를 가져오는 방법에 대한 자세한 내용은 를 참조하십시오.스트림에서 데이터 가져오기.

를 사용하여 여러 레코드 추가PutRecords

PutRecords작업은 단일 요청에서 여러 레코드를 Kinesis Data Streams Streams에 보냅니다. 사용 사용PutRecords를 사용하여 생산자는 Kinesis Data Streams에 데이터를 전송할 때 더 높은 처리량을 얻을 수 있습니다 각PutRecords요청은 최대 500개의 레코드를 지원할 수 있습니다. 요청에 포함되는 각 레코드 크기의 상한은 1MB이며, 파티션 키를 포함해 전체 요청당 최대 5MB로 제한됩니다. 싱글과 마찬가지로PutRecord아래에 설명된 작업PutRecords에서는 시퀀스 번호와 파티션 키를 사용합니다. 그러나 PutRecord 파라미터 SequenceNumberForOrderingPutRecords 호출에 포함되지 않습니다. PutRecords 작업은 요청의 일반 순서에 따라 모든 레코드를 처리하려고 합니다.

데이터 레코드마다 고유한 시퀀스 번호가 있습니다. 를 호출한 후 Kinesis Data Streams 에서 시퀀스 번호를 할당합니다.client.putRecords스트림에 데이터 레코드를 추가합니다. 동일한 파티션 키에 대한 시퀀스 번호는 일반적으로 시간이 지남에 따라 증가합니다. PutRecords 요청 기간이 길어질수록 시퀀스 번호도 커집니다.

참고

같은 스트림에 있는 데이터 세트의 인덱스로 시퀀스 번호를 사용할 수 없습니다. 데이터 세트를 논리적으로 분리하려면 파티션 키를 사용하거나 데이터 세트마다 별도의 스트림을 만드십시오.

PutRecords 요청은 다른 파티션 키를 가진 레코드를 포함할 수 있습니다. 요청의 범위는 스트림이며, 요청은 요청 제한 이내에서 파티션 키와 레코드 조합을 포함할 수 있습니다. 일반적으로 여러 다른 파티션 키로 만들어져 여러 다른 샤드를 가진 스트림으로 전송되는 요청은 소수의 파티션 키를 가지고 있고 소수의 샤드로 전송되는 요청보다 빠릅니다. 파티션 키의 수가 샤드 수보다 훨씬 커야 지연 시간을 줄이고 처리량을 최대화할 수 있습니다.

PutRecords예

다음 코드는 파티션 키가 순차적인 데이터 레코드 100개를 만들어 DataStream이라는 스트림에 넣습니다.

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

PutRecords 응답에는 응답 Records의 어레이가 포함됩니다. 응답 어레이의 각 레코드는 요청 및 응답의 일반 순서(위에서 아래로)를 이용해 요청 어레이의 레코드와 직접 연관됩니다. 응답 Records 어레이에는 항상 요청 어레이와 같은 수의 레코드가 포함됩니다.

사용 시 실패 처리PutRecords

기본적으로 요청에 있는 개별 레코드가 실패해도 PutRecords 요청에 있는 후속 레코드의 처리가 중단되지 않습니다. 즉, 응답 Records 어레이에 성공적으로 처리된 레코드와 그렇지 않은 레코드가 모두 포함됩니다. 따라서 성공적으로 처리되지 않은 레코드를 찾아 후속 호출에 포함해야 합니다.

성공한 레코드에는 SequenceNumberShardID 값이 포함되며 성공하지 못한 레코드에는 ErrorCodeErrorMessage 값이 포함됩니다. ErrorCode 파라미터는 오류 유형을 반영하며 ProvisionedThroughputExceededException 값 또는 InternalFailure 값 중 하나 일 수 있습니다. ErrorMessage는 병목 현상이 발생한 레코드의 계정 ID, 스트림 이름 및 샤드 ID를 포함하여 ProvisionedThroughputExceededException 예외에 대한 세부 정보를 제공합니다. 아래 예제에서는 PutRecords 요청에 레코드 3개가 있습니다. 두 번째 레코드가 실패하고 응답에 반영됩니다.

예 PutRecords요청 구문

{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }

예 PutRecords응답 구문

{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

성공적으로 처리되지 않은 레코드를 후속 PutRecords 요청에 포함할 수 있습니다. 먼저, putRecordsResult에서 FailedRecordCount 파라미터를 확인해서 요청에 처리에 실패한 레코드가 있는지 확인합니다. 그런 레코드가 있으면 putRecordsEntry이 아닌 ErrorCode가 있는 각 null를 후속 요청에 추가해야 합니다. 이러한 유형의 핸들러 예제는 다음 코드를 참조하세요.

예 PutRecords실패 핸들러

PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

를 사용하여 단일 레코드 추가PutRecord

PutRecord 호출은 레코드 1개에 적용됩니다. 특별히 애플리케이션이 항상 요청당 레코드를 1개만 보내야 하거나 다른 어떤 이유로 PutRecords를 사용할 수 없는 경우가 아니라면 를 사용하여 여러 레코드 추가PutRecords에서 설명하는 PutRecords 작업을 우선 사용하십시오.

데이터 레코드마다 고유한 시퀀스 번호가 있습니다. 를 호출한 후 Kinesis Data Streams 에서 시퀀스 번호를 할당합니다.client.putRecord스트림에 데이터 레코드를 추가합니다. 동일한 파티션 키에 대한 시퀀스 번호는 일반적으로 시간이 지남에 따라 증가합니다. PutRecord 요청 기간이 길어질수록 시퀀스 번호도 커집니다.

넣기 작업이 Kinesis Data Streams 와 동시에 나타나기 때문에 넣기 작업이 연달아 빠르게 발생할 때는 반환된 시퀀스 번호가 증가한다는 보장은 없습니다. 같은 파티션 키의 시퀀스 번호가 증가하도록 확실하게 보장하려면 SequenceNumberForOrdering 코드 샘플에 나온 대로 PutRecord예 파라미터를 사용하십시오.

사용 여부SequenceNumberForOrdering, Kinesis Data Streams 스트림이GetRecords호출은 시퀀스 번호별로 엄격하게 정렬됩니다.

참고

같은 스트림에 있는 데이터 세트의 인덱스로 시퀀스 번호를 사용할 수 없습니다. 데이터 세트를 논리적으로 분리하려면 파티션 키를 사용하거나 데이터 세트마다 별도의 스트림을 만드십시오.

파티션 키는 스트림에서 데이터를 그룹화하는 데 사용됩니다. 해당 파티션 키에 따라 스트림 내의 샤드에 데이터 레코드가 할당됩니다. 특히 Kinesis Data Streams 파티션 키 및 연결된 데이터를 특정 샤드에 매핑하는 해시 함수에 대한 입력으로 파티션 키를 사용합니다.

이 해시 메커니즘의 결과로 같은 파티션 키를 가진 모든 데이터 레코드가 스트림에 있는 동일한 샤드에 매핑됩니다. 그러나 파티션 키의 수가 샤드 수를 초과하면 일부 샤드에 서로 다른 파티션 키를 가진 레코드가 반드시 포함됩니다. 설계 면에서 모든 샤드를 잘 활용하려면 setShardCountCreateStreamRequest 메서드로 지정된 샤드 수가 고유한 파티션 수보다 상당히 적고, 단일 파티션 키로 전송되는 데이터의 양이 샤드의 용량보다 상당히 적어야 합니다.

PutRecord예

다음 코드는 파티션 키 2개에 배포되는 데이터 레코드 10개를 만들어 myStreamName이라는 스트림에 넣습니다.

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

앞에 나온 코드 샘플에서는 setSequenceNumberForOrdering을 사용하여 각 파티션 키 내에서 순서가 증가하도록 확실하게 보장합니다. 이 매개 변수를 효과적으로 사용하려면SequenceNumberForOrdering현재 레코드 (레코드)n) 이전 레코드의 시퀀스 번호 (레코드)n-1). 스트림에 추가된 레코드의 시퀀스 번호를 가져오려면 putRecord의 결과에 getSequenceNumber를 호출하십시오.

SequenceNumberForOrdering 파라미터는 동일한 파티션 키에 대한 시퀀스 번호가 확실하게 증가하도록 보장합니다. SequenceNumberForOrdering은 여러 파티션 키에 대한 레코드의 순서를 지정하지 않습니다.

를 사용하여 데이터와 상호 작용AWSGlue 스키마 레지스트리

Kinesis 데이터 스트림을AWSGlue 스키마 레지스트리. 이AWSSchema Registry를 사용하면 스키마를 중앙에서 검색, 제어 및 발전시킬 수 있으며, 생성된 데이터가 등록된 스키마에 의해 지속적으로 검증될 수 있습니다. 스키마는 데이터 레코드의 구조와 포맷을 정의합니다. 스키마는 신뢰할 수 있는 데이터 게시, 소비 또는 저장을 위한 버전 지정 사양입니다. 이AWSGlue 스키마 레지스트리를 통해 개선할 수 있습니다.end-to-end스트리밍 애플리케이션 내의 데이터 품질 및 데이터 거버넌스 자세한 내용은 단원을 참조하십시오.AWSGlue 스키마 레지스트리. 이 통합을 설정하는 방법 중 하나는PutRecordsPutRecordKinesis Data Streams APIs는AWSJava SDK.

를 사용하여 Kinesis Data Streams를 Schema Registry와 통합하는 방법을 자세히 알아보려면PutRecords과PutRecordKinesis Data Streams API, 의 “Kinesis Data Streams API를 사용하여 데이터와 상호 작용” 섹션을 참조하십시오.사용 사례: Amazon Kinesis Data Streams 통합AWSGlue 스키마 레지스트리.