샤드 나열 - Amazon Kinesis Data Streams

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

샤드 나열

데이터 스트림에는 하나 이상의 샤드가 있을 수 있습니다. 데이터 스트림에서 샤드를 나열 (또는 검색) 하는 방법에는 두 가지가 있습니다.

ListShardsAPI - 권장 사항

데이터 스트림에서 샤드를 나열하거나 검색하는 데 권장되는 방법은ListShardsAPI. 다음 예제에서는 데이터 스트림에서 샤드 목록을 가져오는 방법을 보여줍니다. 이 예제에 사용된 기본 작업과 이 작업에 설정할 수 있는 모든 파라미터에 대한 전체 설명은 을 참조하십시오.ListShards.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import java.util.concurrent.TimeUnit; public class ShardSample { public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.builder().build(); ListShardsRequest request = ListShardsRequest .builder().streamName("myFirstStream") .build(); try { ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS); System.out.println(response.toString()); } catch (Exception e) { System.out.println(e.getMessage()); } } }

이전 코드 예제를 실행하려면 다음과 같은 POM 파일을 사용할 수 있습니다.

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kinesis.data.streams.samples</groupId> <artifactId>shards</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>2.0.0</version> </dependency> </dependencies> </project>

사용ListShardsAPI로 를 사용할 수 있습니다.ShardFilterAPI의 응답을 필터링하는 매개 변수입니다. 한 번에 하나의 필터만 지정할 수 있습니다.

ShardFilter호출 시 매개 변수ListShardsAPI,Type은 필수 속성이며 를 지정해야 합니다. 를 지정하는 경우AT_TRIM_HORIZON,FROM_TRIM_HORIZON또는AT_LATEST유형, 둘 중 하나를 지정할 필요가 없습니다.ShardId또는Timestamp선택적 속성입니다.

를 지정하는 경우AFTER_SHARD_ID를 입력합니다. 선택 사항에 대한 값도 제공해야 합니다.ShardId속성입니다. 이ShardId속성이 다음과 같은 기능면에서 동일합니다.ExclusiveStartShardId매개 변수ListShardsAPI. 일시ShardId속성이 지정되면 응답에는 ID가 바로 뒤에 오는 샤드로 시작하는 샤드가 포함됩니다.ShardId귀하가 제공한 것입니다.

를 지정하는 경우AT_TIMESTAMP또는FROM_TIMESTAMP_ID를 입력합니다. 선택 사항에 대한 값도 제공해야 합니다.Timestamp속성입니다. 를 지정하는 경우AT_TIMESTAMPtype 을 입력하면 제공된 타임스탬프에서 열려 있던 모든 샤드가 반환됩니다. 를 지정하는 경우FROM_TIMESTAMP를 입력하면 제공된 타임스탬프에서 TIP까지 시작하는 모든 샤드가 반환됩니다.

중요

DescribeStreamSummaryListShardAPI는 데이터 스트림에 대한 정보를 검색할 수 있는 보다 확장 가능한 방법을 제공합니다. 보다 구체적으로, 에 대한 할당량DescribeStreamAPI는 스로틀을 일으킬 수 있습니다. 자세한 정보는 할당량과 제한을 참조하십시오. 참고 사항도DescribeStream의 모든 데이터 스트림과 상호 작용하는 모든 애플리케이션에서 할당량을 공유합니다.AWS계정. 에 대한 할당량ListShards반면 API는 단일 데이터 스트림에만 적용됩니다. 그래서 당신은 더 높은 TPS를 얻을 수있을뿐만 아니라ListShardsAPI이지만 더 많은 데이터 스트림을 만들면 동작이 더 잘 확장됩니다.

를 호출하는 모든 생산자와 소비자를 마이그레이션하는 것이 좋습니다.DescribeStream대신 호출하는 APIDescribeStream요약 및ListShardAPI. 이러한 생산자와 소비자를 식별하려면 Athena를 사용하여 구문 분석하는 것이 좋습니다.CloudTrailKPL 및 KCL에 대한 사용자 에이전트로 로그가 API 호출에서 캡처됩니다.

SELECT useridentity.sessioncontext.sessionissuer.username, useridentity.arn,eventname,useragent, count(*) FROM cloudtrail_logs WHERE Eventname IN ('DescribeStream') AND eventtime BETWEEN '' AND '' GROUP BY useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent ORDER BY count(*) DESC LIMIT 100

또한 를 사용하는 것이 좋습니다.AWSLambda 및 Kinesis Data Firehose 파이어호스와 Kinesis Data Streams 파이어호스 통합DescribeStreamAPI가 재구성되어 통합이 대신 호출됩니다.DescribeStreamSummaryListShards. 특히 를 위해AWSLambda, 이벤트 소스 매핑을 업데이트해야 합니다. Kinesis Data Firehose 경우 해당 IAM 권한을 업데이트해야 합니다.ListShardsIAM 권한.

DescribeStreamAPI - 사용 중단됨

중요

아래 정보는 데이터 스트림에서 샤드를 검색하는 현재 사용되지 않는 방법에 대해 설명합니다.DescribeStreamAPI. 현재 를 사용하는 것이 좋습니다.ListShards데이터 스트림을 구성하는 샤드를 검색하는 API입니다.

describeStream 메서드에 의해 반환되는 응답 객체를 통해 스트림을 구성하는 샤드에 대한 정보를 검색할 수 있습니다. 샤드를 검색하려면 이 객체에 대해 getShards 메서드를 호출합니다. 이 메서드는 단일 호출로 스트림에서 모든 샤드를 반환하지 않을 수 있습니다. 다음 코드에서 getHasMoreShards에 대해 getStreamDescription 메서드를 확인하여 반환되지 않은 추가 샤드가 있는지 확인합니다. 있는 경우, 즉 이 메서드가 true를 반환하면 계속해서 getShards를 반복적으로 호출하여 반환된 샤드의 새로운 배치를 각각 샤드 목록에 추가합니다. getHasMoreShardsfalse를 반환하면, 즉 모든 샤드가 반환되면 반복이 종료됩니다. getShardsEXPIRED 상태인 샤드를 반환하지 않습니다. EXPIRED 상태를 비롯한 샤드 상태에 대한 자세한 내용은 리샤딩 후 데이터 라우팅, 데이터 지속성 및 샤드 상태 단원을 참조하십시오.

DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); List<Shard> shards = new ArrayList<>(); String exclusiveStartShardId = null; do { describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId ); DescribeStreamResult describeStreamResult = client.describeStream( describeStreamRequest ); shards.addAll( describeStreamResult.getStreamDescription().getShards() ); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while ( exclusiveStartShardId != null );