シャードの一覧表示 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

シャードの一覧表示

データストリームは 1 つ以上のシャードを持つことができます。データストリームからシャードを一覧表示 (または取得) するには、2 つの方法があります。

ListShards API - 推奨

データストリームからシャードを一覧表示または取得するための推奨方法は、 ListShards API を使用することです。次の例では、データストリーム内のシャードを一覧表示する方法を示します。この例で使用されているメインオペレーションと、オペレーションに設定できるすべてのパラメータの詳細については、「」を参照してくださいListShards

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import java.util.concurrent.TimeUnit; public class ShardSample { public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.builder().build(); ListShardsRequest request = ListShardsRequest .builder().streamName("myFirstStream") .build(); try { ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS); System.out.println(response.toString()); } catch (Exception e) { System.out.println(e.getMessage()); } } }

前のコード例を実行するには、次のような POM ファイルを使用できます。

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kinesis.data.streams.samples</groupId> <artifactId>shards</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>2.0.0</version> </dependency> </dependencies> </project>

ListShards API では、 ShardFilterパラメータを使用して API のレスポンスを除外できます。一度に 1 つのフィルターしか指定できません。

API を呼び出す ListShardsときに ShardFilterパラメータを使用する場合、 Typeは必須プロパティであり、指定する必要があります。AT_TRIM_HORIZONFROM_TRIM_HORIZON、または AT_LATEST タイプを指定する場合は、ShardId または Timestamp のオプションのプロパティを指定する必要はありません。

AFTER_SHARD_ID タイプを指定する場合は、オプションの ShardId プロパティの値も指定する必要があります。ShardId プロパティは、 ListShards API の ExclusiveStartShardIdパラメータと同じ機能です。ShardId プロパティが指定されている場合、レスポンスには、指定した ShardId の直後に ID が続くシャードで始まるシャードが含まれます。

AT_TIMESTAMP または FROM_TIMESTAMP_ID タイプを指定する場合は、オプションの Timestamp プロパティの値も指定する必要があります。AT_TIMESTAMP タイプを指定する場合は、指定されたタイムスタンプで開いていたすべてのシャードが返されます。FROM_TIMESTAMP タイプを指定する場合は、TIP に指定されたタイムスタンプから始まるすべてのシャードが返されます。

重要

DescribeStreamSummary および ListShard API は、データストリームに関する情報を取得するための、よりスケーラブルな方法を提供します。具体的には、 DescribeStream API のクォータによってスロットリングが発生する可能性があります。詳細については、「クォータと制限」を参照してください。また、DescribeStreamクォータは AWS 、アカウント内のすべてのデータストリームとやり取りするすべてのアプリケーション間で共有されることに注意してください。一方、 ListShards API のクォータは、単一のデータストリームに固有です。そのため、 ListShards API を使用すると TPS が高くなるだけでなく、データストリームをさらに作成するにつれてアクションのスケーリングも向上します。

API を呼び出すすべてのプロデューサーとコンシューマーを移行して、代わりに DescribeStream DescribeStreamSummary と ListShard APIsを呼び出すことをお勧めします。これらのプロデューサーとコンシューマーを特定するには、KPL と KCL のユーザーエージェントが API コールにキャプチャされるため、Athena を使用して CloudTrail ログを解析することをお勧めします。

SELECT useridentity.sessioncontext.sessionissuer.username, useridentity.arn,eventname,useragent, count(*) FROM cloudtrail_logs WHERE Eventname IN ('DescribeStream') AND eventtime BETWEEN '' AND '' GROUP BY useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent ORDER BY count(*) DESC LIMIT 100

また、DescribeStreamAPI を呼び出す AWS Lambda および Amazon Firehose と Kinesis Data Streams の統合は、統合が代わりに DescribeStreamSummaryおよび を呼び出すように再設定することをお勧めしますListShards。具体的には、 AWS Lambda の場合は、イベントソースマッピングを更新する必要があります。Amazon Firehose については、対応する IAM 許可を更新して、それらに ListShards IAM 許可を含める必要があります。

DescribeStream API - 非推奨

重要

以下の情報は、 DescribeStream API を介してデータストリームからシャードを取得する現在非推奨の方法について説明します。現在、ListShards API を使用してデータストリームを構成するシャードを取得することを強くお勧めしています。

describeStream メソッドによって返された応答オブジェクトを使用すると、ストリームを構成するシャードについて情報を取得できます。シャードを取得するには、このオブジェクトの getShards メソッドを呼び出します。このメソッドは、1 回の呼び出しでストリームからすべてのシャードを返すとは限りません。以下のコードでは、getHasMoreShardsgetStreamDescription メソッドを使用して、返されなかったシャードがあるかどうかを確認しています。ある場合、つまり、このメソッドが true を返した場合は、ループ内で getShards の呼び出しを繰り返して、返されたシャードの新しいバッチをシャードのリストに追加していきます。getHasMoreShardsfalse を返した場合は、ループが終了します。つまり、すべてのシャードが返されたことになります。getShards は 状態のシャードを返さないことに注意してください。EXPIREDシャードの状態 (EXPIRED 状態など) の詳細については、リシャーディング後のデータのルーティング、データの永続化、シャードの状態を参照してください。

DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); List<Shard> shards = new ArrayList<>(); String exclusiveStartShardId = null; do { describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId ); DescribeStreamResult describeStreamResult = client.describeStream( describeStreamRequest ); shards.addAll( describeStreamResult.getStreamDescription().getShards() ); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while ( exclusiveStartShardId != null );