Kinesis Data Streams API를 사용하여 향상된 팬아웃 소비자 개발 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Kinesis Data Streams API를 사용하여 향상된 팬아웃 소비자 개발

향상된 팬아웃Amazon Kinesis Data Streams 기능은 소비자가 샤드당 1초에 최대 2MB의 전용 처리량으로 데이터 스트림으로부터 레코드를 수신할 수 있도록 하는 Amazon Kinesis Data Streams 기능입니다. 향상된 팬아웃을 사용하는 소비자는 스트림으로부터 데이터를 수신하는 다른 소비자와 경쟁할 필요가 없습니다. 자세한 정보는 전용 처리량으로 사용자 지정 소비자 개발(향상된 팬아웃)을 참조하십시오.

API 작업을 사용하여 Kinesis Data Streams Streams의 향상된 팬아웃을 사용하는 소비자를 만들 수 있습니다.

Kinesis Data Streams API를 사용하여 향상된 팬아웃을 사용하는 소비자를 등록하려면

  1. 전화RegisterStream소비자을 사용하여 애플리케이션을 향상된 팬아웃을 사용하는 소비자로 애플리케이션을 등록합니다. Kinesis Data Streams 소비자에 대해 Amazon 리소스 이름 (ARN) 을 생성하여 응답으로 반환합니다.

  2. 특정 샤드 청취를 시작하려면 다음 호출에 소비자 ARN을 전달하십시오.SubscribeTo샤드. 그러면 Kinesis Data Streams 유형의 이벤트 형식으로 해당 샤드의 레코드를 사용자에게 푸시하기 시작합니다.SubscribeToShardEventHTTP/2 연결을 통해 이 연결은 최대 5분 동안 활성화됩니다. 전화SubscribeTo샤드다시 한 번 다음 샤드에서 레코드를 계속 받으려면future에 대한 호출로 반환되는 경우SubscribeTo샤드정상적으로 또는 예외적으로 완료됩니다.

    참고

    SubscribeToShardAPI는 또한 현재 샤드의 끝에 도달하면 현재 샤드의 자식 샤드 목록을 반환합니다.

  3. 향상된 팬아웃을 사용하는 소비자의 등록을 해제하려면DeregisterStream소비자.

다음 코드는 소비자를 샤드에 등록하고, 등록을 정기적으로 갱신하고, 이벤트를 처리하는 방법을 보여 주는 예제입니다.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

다음의 경우,event.ContinuationSequenceNumber보고null, 이 샤드와 관련된 샤드 분할 또는 병합이 발생했음을 나타냅니다. 이 샤드는 이제CLOSEDstate이고 이 샤드에서 사용 가능한 모든 데이터 레코드를 읽었습니다. 이 시나리오에서는 위의 예에 따라 다음을 사용할 수 있습니다.event.childShards을 사용하여 분할 또는 병합으로 생성된 샤드의 새 하위 샤드에 대해 알아봅니다. 자세한 내용은 단원을 참조하십시오.ChildShard.