シャードの一覧表示 - Amazon Kinesis Data Streams

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

シャードの一覧表示

データストリームは 1 つ以上のシャードを持つことができます。データストリームからシャードを一覧表示 (または取得) するには、2 つの方法があります。

ListShards API-推奨

データストリームからシャードを一覧表示または取得するには、API を使用することをおすすめしますListShards。次の例では、データストリーム内のシャードを一覧表示する方法を示します。この例で使用されている主な操作と、その操作に設定できるすべてのパラメータの詳細については、を参照してください。ListShards

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import java.util.concurrent.TimeUnit; public class ShardSample { public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.builder().build(); ListShardsRequest request = ListShardsRequest .builder().streamName("myFirstStream") .build(); try { ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS); System.out.println(response.toString()); } catch (Exception e) { System.out.println(e.getMessage()); } } }

前のコード例を実行するには、次のような POM ファイルを使用できます。

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kinesis.data.streams.samples</groupId> <artifactId>shards</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>2.0.0</version> </dependency> </dependencies> </project>

ListShardsAPI では、ShardFilterパラメータを使用して API の応答を除外できます。一度に 1 つのフィルターしか指定できません。

ListShardsAPI ShardFilter を呼び出すときにパラメータを使用する場合、Typeは必須プロパティであり、指定する必要があります。AT_TRIM_HORIZONFROM_TRIM_HORIZON、または AT_LATEST タイプを指定する場合は、ShardId または Timestamp のオプションのプロパティを指定する必要はありません。

AFTER_SHARD_ID タイプを指定する場合は、オプションの ShardId プロパティの値も指定する必要があります。ShardIdこのプロパティは ListShards API ExclusiveStartShardId のパラメータと機能的には同じです。ShardId プロパティが指定されている場合、レスポンスには、指定した ShardId の直後に ID が続くシャードで始まるシャードが含まれます。

AT_TIMESTAMP または FROM_TIMESTAMP_ID タイプを指定する場合は、オプションの Timestamp プロパティの値も指定する必要があります。AT_TIMESTAMP タイプを指定する場合は、指定されたタイムスタンプで開いていたすべてのシャードが返されます。FROM_TIMESTAMP タイプを指定する場合は、TIP に指定されたタイムスタンプから始まるすべてのシャードが返されます。

重要

DescribeStreamSummary および ListShard API は、データストリームに関する情報を取得するための、よりスケーラブルな方法を提供します。具体的には、 DescribeStream API のクォータによってスロットリングが発生する可能性があります。詳細については、「クォータと制限」を参照してください。なお、DescribeStream クォータは、AWS アカウント内のすべてのデータストリームとやり取りするすべてのアプリケーションで共有されます。一方、 ListShards API のクォータは 1 つのデータストリームに固有です。そのため、 ListShards API では TPS が高くなるだけでなく、データストリームが増えるにつれてアクションのスケーラビリティも向上します。

API を呼び出すすべてのプロデューサーとコンシューマーを移行して、代わりに DescribeStream API を呼び出すことをお勧めします。 DescribeStreamSummary ListShard これらのプロデューサーとコンシューマーを特定するには、KPL と KCL のユーザーエージェントが API 呼び出しでキャプチャされるため、Athena CloudTrail を使用してログを解析することをお勧めします。

SELECT useridentity.sessioncontext.sessionissuer.username, useridentity.arn,eventname,useragent, count(*) FROM cloudtrail_logs WHERE Eventname IN ('DescribeStream') AND eventtime BETWEEN '' AND '' GROUP BY useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent ORDER BY count(*) DESC LIMIT 100

また、DescribeStream API を呼び出す Kinesis Data Streams との AWS Lambda と Amazon Firehose の統合を再設定して、統合がこの API の代わりに DescribeStreamSummaryListShards を呼び出すようにすることもお勧めします。具体的には、AWS Lambda では、イベントソースマッピングを更新する必要があります。Amazon Firehose については、対応する IAM 許可を更新して、それらに ListShards IAM 許可を含める必要があります。

DescribeStream API-非推奨

重要

以下の情報は、現在廃止されている API 経由でデータストリームからシャードを取得する方法について説明しています。 DescribeStream 現在、ListShards API を使用してデータストリームを構成するシャードを取得することを強くお勧めしています。

describeStream メソッドによって返された応答オブジェクトを使用すると、ストリームを構成するシャードについて情報を取得できます。シャードを取得するには、このオブジェクトの getShards メソッドを呼び出します。このメソッドは、1 回の呼び出しでストリームからすべてのシャードを返すとは限りません。以下のコードでは、getHasMoreShardsgetStreamDescription メソッドを使用して、返されなかったシャードがあるかどうかを確認しています。ある場合、つまり、このメソッドが true を返した場合は、ループ内で getShards の呼び出しを繰り返して、返されたシャードの新しいバッチをシャードのリストに追加していきます。getHasMoreShardsfalse を返した場合は、ループが終了します。つまり、すべてのシャードが返されたことになります。getShards は 状態のシャードを返さないことに注意してください。EXPIREDシャードの状態 (EXPIRED 状態など) の詳細については、リシャーディング後のデータのルーティング、データの永続化、シャードの状態を参照してください。

DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); List<Shard> shards = new ArrayList<>(); String exclusiveStartShardId = null; do { describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId ); DescribeStreamResult describeStreamResult = client.describeStream( describeStreamRequest ); shards.addAll( describeStreamResult.getStreamDescription().getShards() ); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while ( exclusiveStartShardId != null );