Kinesis Data Streams를 사용하여 향상된 팬아웃 소비자 개발 API - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Kinesis Data Streams를 사용하여 향상된 팬아웃 소비자 개발 API

향상된 팬아웃은 소비자가 샤드당 1초에 최대 2MB의 전용 처리량으로 데이터 스트림으로부터 레코드를 수신할 수 있도록 하는 Amazon Kinesis Data Streams 기능입니다. 향상된 팬아웃을 사용하는 소비자는 스트림으로부터 데이터를 수신하는 다른 소비자와 경쟁할 필요가 없습니다. 자세한 내용은 전용 처리량 (향상된 팬아웃) 으로 맞춤형 소비자 개발 단원을 참조하십시오.

API작업을 사용하여 Kinesis Data Streams에서 향상된 팬아웃을 사용하는 소비자를 구축할 수 있습니다.

Kinesis Data Streams를 사용하여 향상된 팬아웃으로 소비자를 등록하려면 API
  1. 애플리케이션을 향상된 팬아웃을 사용하는 소비자로 RegisterStreamConsumer등록하려면 호출하십시오. Kinesis Data Streams는 소비자에 대한 Amazon 리소스 이름 ARN () 을 생성하여 응답에 반환합니다.

  2. 특정 샤드의 청취를 시작하려면 소비자에게 ARN 호출을 전달하십시오. SubscribeToShard 그러면 Kinesis Data Streams는 해당 샤드의 레코드를 /2 연결을 HTTP 통해 SubscribeToShardEvent유형의 이벤트 형태로 사용자에게 푸시하기 시작합니다. 이 연결은 최대 5분 동안 활성화됩니다. 호출에서 정상적으로 또는 예외적으로 완료하라는 메시지가 반환된 후에도 샤드로부터 레코드를 계속 수신하려면 SubscribeToShardSubscribeToShard다시 호출하십시오. future

    참고

    SubscribeToShardAPI또한 현재 샤드의 끝에 도달하면 현재 샤드의 하위 샤드 목록을 반환합니다.

  3. 향상된 팬아웃을 사용하는 소비자의 등록을 취소하려면 전화하십시오. DeregisterStreamConsumer

다음 코드는 소비자를 샤드에 등록하고, 등록을 정기적으로 갱신하고, 이벤트를 처리하는 방법을 보여 주는 예제입니다.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ShardIteratorType; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; import java.util.concurrent.CompletableFuture; /** * See https://github.com/awsdocs/aws-doc-sdk-examples/blob/master/javav2/example_code/kinesis/src/main/java/com/example/kinesis/KinesisStreamEx.java * for complete code and more examples. */ public class SubscribeToShardSimpleImpl { private static final String CONSUMER_ARN = "arn:aws:kinesis:us-east-1:123456789123:stream/foobar/consumer/test-consumer:1525898737"; private static final String SHARD_ID = "shardId-000000000000"; public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.create(); SubscribeToShardRequest request = SubscribeToShardRequest.builder() .consumerARN(CONSUMER_ARN) .shardId(SHARD_ID) .startingPosition(s -> s.type(ShardIteratorType.LATEST)).build(); // Call SubscribeToShard iteratively to renew the subscription periodically. while(true) { // Wait for the CompletableFuture to complete normally or exceptionally. callSubscribeToShardWithVisitor(client, request).join(); } // Close the connection before exiting. // client.close(); } /** * Subscribes to the stream of events by implementing the SubscribeToShardResponseHandler.Visitor interface. */ private static CompletableFuture<Void> callSubscribeToShardWithVisitor(KinesisAsyncClient client, SubscribeToShardRequest request) { SubscribeToShardResponseHandler.Visitor visitor = new SubscribeToShardResponseHandler.Visitor() { @Override public void visit(SubscribeToShardEvent event) { System.out.println("Received subscribe to shard event " + event); } }; SubscribeToShardResponseHandler responseHandler = SubscribeToShardResponseHandler .builder() .onError(t -> System.err.println("Error during stream - " + t.getMessage())) .subscriber(visitor) .build(); return client.subscribeToShard(request, responseHandler); } }

event.ContinuationSequenceNumbernull을 반환하는 경우 이는 이 샤드와 관련된 샤드 분할 또는 병합이 발생했음을 나타냅니다. 이 샤드는 현재 CLOSED 상태이며 이 샤드에서 사용 가능한 모든 데이터 레코드를 읽었습니다. 이 시나리오에서는 위의 예에 따라 event.childShards를 사용하여 분할 또는 병합으로 생성된 처리 중인 샤드의 새 하위 샤드에 대해 알아볼 수 있습니다. 자세한 내용은 ChildShard를 참조하세요.