Affichage des partitions - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Affichage des partitions

Un flux de données peut comporter une ou plusieurs partitions. Il existe deux méthodes pour répertorier (ou récupérer) les partitions d'un flux de données.

ListShards API - Recommandé

La méthode recommandée pour répertorier ou récupérer les fragments d'un flux de données consiste à utiliser l'ListShardsAPI. L'exemple suivant montre comment vous pouvez obtenir une liste des partitions d'un flux de données. Pour une description complète de l'opération principale utilisée dans cet exemple et de tous les paramètres que vous pouvez définir pour l'opération, consultez ListShards.

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import java.util.concurrent.TimeUnit; public class ShardSample { public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.builder().build(); ListShardsRequest request = ListShardsRequest .builder().streamName("myFirstStream") .build(); try { ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS); System.out.println(response.toString()); } catch (Exception e) { System.out.println(e.getMessage()); } } }

Pour exécuter l'exemple de code précédent, vous pouvez utiliser un fichier POM comme celui qui suit.

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kinesis.data.streams.samples</groupId> <artifactId>shards</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>2.0.0</version> </dependency> </dependencies> </project>

Avec l'ListShardsAPI, vous pouvez utiliser le ShardFilterparamètre pour filtrer la réponse de l'API. Vous ne pouvez spécifier qu'un seul filtre à la fois.

Si vous utilisez le ShardFilter paramètre lors de l'appel de l' ListShardsAPI, Type il s'agit de la propriété requise qui doit être spécifiée. Si vous spécifiez les types AT_TRIM_HORIZON, FROM_TRIM_HORIZON ou AT_LATEST, vous n'avez pas besoin d'indiquer le ShardId ou les propriétés optionnelles Timestamp.

Si vous spécifiez le type AFTER_SHARD_ID, vous devez également indiquer la valeur de la propriété optionnelle ShardId. La fonctionnalité de la ShardId propriété est identique à celle du ExclusiveStartShardId paramètre de l' ListShards API. Lorsque la propriété ShardId est indiquée, la réponse inclut les partitions en commençant par la partition dont l'identifiant suit immédiatement le ShardId que vous avez indiqué.

Si vous spécifiez le type AT_TIMESTAMP ou FROM_TIMESTAMP_ID, vous devez également indiquer la valeur de la propriété optionnelle Timestamp. Si vous spécifiez le type AT_TIMESTAMP, toutes les partitions ouvertes à l'horodatage indiqué sont renvoyées. Si vous spécifiez le type FROM_TIMESTAMP, toutes les partitions commençant à partir de l'horodatage indiqué jusqu'à TIP sont renvoyées.

Important

Les API DescribeStreamSummary et ListShard offrent un moyen plus évolutif de récupérer des informations sur vos flux de données. Plus précisément, les quotas de l' DescribeStream API peuvent provoquer un ralentissement. Pour plus d’informations, consultez Quotas et limites. Notez également que les DescribeStream quotas sont partagés entre toutes les applications qui interagissent avec tous les flux de données de votre AWS compte. Les quotas de l' ListShards API, en revanche, sont spécifiques à un seul flux de données. Ainsi, non seulement vous obtenez un TPS plus élevé avec l' ListShards API, mais l'action s'adapte mieux à mesure que vous créez davantage de flux de données.

Nous vous recommandons de migrer tous vos producteurs et consommateurs qui appellent l' DescribeStream API pour qu'ils invoquent plutôt les API DescribeStreamSummary et les ListShard API. Pour identifier ces producteurs et consommateurs, nous recommandons d'utiliser Athena pour analyser les CloudTrail journaux, car les agents utilisateurs de KPL et KCL sont capturés dans les appels d'API.

SELECT useridentity.sessioncontext.sessionissuer.username, useridentity.arn,eventname,useragent, count(*) FROM cloudtrail_logs WHERE Eventname IN ('DescribeStream') AND eventtime BETWEEN '' AND '' GROUP BY useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent ORDER BY count(*) DESC LIMIT 100

Nous recommandons également de reconfigurer les intégrations AWS Lambda et Amazon Firehose avec Kinesis Data Streams qui invoquent DescribeStream l'API afin que les intégrations invoquent plutôt et. DescribeStreamSummary ListShards Plus précisément, pour AWS Lambda, vous devez mettre à jour le mappage de votre source d'événements. Pour Amazon Firehose, les autorisations IAM correspondantes doivent être mises à jour afin qu'elles incluent l'autorisation IAM ListShards.

DescribeStream API - Obsolète

Important

Les informations ci-dessous décrivent un moyen actuellement obsolète de récupérer des fragments d'un flux de données via l'API. DescribeStream Il est actuellement fortement recommandé d'utiliser l'API ListShards pour récupérer les partitions qui constituent le flux de données.

L'objet de réponse renvoyé par la méthode describeStream vous permet d'extraire des informations sur les partitions que comprend le flux. Pour extraire les partitions, appelez la méthode getShards sur cet objet. Cette méthode peut ne pas renvoyer toutes les partitions du flux en un seul appel. Dans le code suivant, nous vérifions la méthode getHasMoreShards sur getStreamDescription pour voir s'il y a des partitions supplémentaires qui n'ont pas été renvoyées. Si c'est le cas, c'est-à-dire si cette méthode renvoie true, nous continuons d'appeler getShards en boucle, en ajoutant chaque nouveau lot de partitions renvoyées à notre liste de partitions. La boucle s'arrête lorsque getHasMoreShards renvoie false, ce qui signifie que toutes les partitions ont été renvoyées. Notez que getShards ne renvoie pas les partitions qui ont l'état EXPIRED. Pour plus d'informations sur les états des partitions, notamment l'état EXPIRED, consultez Routage des données, persistance des données et état des partitions après un repartitionnement.

DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); List<Shard> shards = new ArrayList<>(); String exclusiveStartShardId = null; do { describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId ); DescribeStreamResult describeStreamResult = client.describeStream( describeStreamRequest ); shards.addAll( describeStreamResult.getStreamDescription().getShards() ); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while ( exclusiveStartShardId != null );