Listado de fragmentos - Amazon Kinesis Data Streams

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Listado de fragmentos

Un flujo de datos puede tener una o varias particiones. Hay dos métodos para listar (o recuperar) las particiones de un flujo de datos.

ListShards API: recomendada

El método recomendado para enumerar o recuperar los fragmentos de un flujo de datos es utilizar la ListShardsAPI. El siguiente ejemplo muestra cómo obtener una lista de las particiones de un flujo de datos. Para obtener una descripción completa de la operación principal utilizada en este ejemplo y de todos los parámetros que puede configurar para la operación, consulte. ListShards

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import java.util.concurrent.TimeUnit; public class ShardSample { public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.builder().build(); ListShardsRequest request = ListShardsRequest .builder().streamName("myFirstStream") .build(); try { ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS); System.out.println(response.toString()); } catch (Exception e) { System.out.println(e.getMessage()); } } }

Para ejecutar el ejemplo de código anterior puede utilizar un archivo POM como el siguiente.

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kinesis.data.streams.samples</groupId> <artifactId>shards</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>2.0.0</version> </dependency> </dependencies> </project>

Con la ListShards API, puede usar el ShardFilterparámetro para filtrar la respuesta de la API. Solo puede especificar un filtro a la vez.

Si utilizas el ShardFilter parámetro al invocar la ListShards API, Type es la propiedad obligatoria y debes especificarla. Si especifica los tipos AT_TRIM_HORIZON, FROM_TRIM_HORIZON o AT_LATEST, no necesita especificar las propiedades opcionales ShardId o Timestamp.

Si especifica el tipo AFTER_SHARD_ID, también deberá proporcionar el valor de la propiedad opcional ShardId. La ShardId propiedad tiene una funcionalidad idéntica a la del ExclusiveStartShardId parámetro de la ListShards API. Cuando se especifica la propiedad ShardId, la respuesta incluye las particiones que comienzan con la partición cuyo ID sigue inmediatamente al ShardId proporcionado.

Si especifica el tipo AT_TIMESTAMP o FROM_TIMESTAMP_ID, también deberá proporcionar el valor de la propiedad opcional Timestamp. Si especifica el tipo AT_TIMESTAMP, se devolverán todas las particiones que estaban abiertas en la marca de tiempo proporcionada. Si se especifica el tipo FROM_TIMESTAMP, se devolverán todas las particiones a partir de la fecha y hora indicadas hasta TIP.

importante

Las API DescribeStreamSummary y ListShard proporcionan una forma más escalable de recuperar información sobre sus flujos de datos. Más específicamente, las cuotas de la DescribeStream API pueden provocar limitaciones. Para obtener más información, consulte Cuotas y límites. Ten en cuenta también que DescribeStream las cuotas se comparten entre todas las aplicaciones que interactúan con todos los flujos de datos de tu AWS cuenta. Las cuotas de la ListShards API, por otro lado, son específicas para un único flujo de datos. Por lo tanto, no solo se obtiene un TPS más alto con la ListShards API, sino que la acción se amplía mejor a medida que se crean más flujos de datos.

Le recomendamos que migre a todos los productores y consumidores que utilizan la DescribeStream API para que, en su lugar, invoquen la API DescribeStreamSummary y las ListShard API. Para identificar a estos productores y consumidores, recomendamos utilizar Athena para analizar los CloudTrail registros, ya que los agentes de usuario de KPL y KCL se capturan en las llamadas a la API.

SELECT useridentity.sessioncontext.sessionissuer.username, useridentity.arn,eventname,useragent, count(*) FROM cloudtrail_logs WHERE Eventname IN ('DescribeStream') AND eventtime BETWEEN '' AND '' GROUP BY useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent ORDER BY count(*) DESC LIMIT 100

También recomendamos volver a configurar las integraciones de AWS Lambda y Amazon Firehose con Kinesis Data Streams que invocan la API para que, en su lugar, DescribeStream las integraciones invoquen y. DescribeStreamSummary ListShards En concreto, en el caso de AWS Lambda, debe actualizar el mapeo de la fuente de eventos. En el caso de Amazon Firehose, deben actualizarse los permisos de IAM correspondientes para que incluyan el permiso de IAM de ListShards.

DescribeStream API: obsoleta

importante

La siguiente información describe una forma actualmente obsoleta de recuperar fragmentos de un flujo de datos a través de la API. DescribeStream En la actualidad, se recomienda utilizar la API ListShards para recuperar las particiones que componen el flujo de datos.

El objeto de respuesta que devuelve el método describeStream le permite recuperar información sobre los fragmentos que componen la secuencia. Para recuperar los fragmentos, llame al método getShards de este objeto. Este método podría no devolver todos los fragmentos de la secuencia en una única llamada. En el siguiente código, comprobaremos el método getHasMoreShards en getStreamDescription para ver si hay fragmentos adicionales que no se hayan devuelto. Si es así, es decir, si este método devuelve true, seguiremos insistiendo en llamar a getShards en bucle, añadiendo cada nuevo lote de fragmentos devueltos a nuestra lista de fragmentos. El bucle se cierra cuando getHasMoreShards devuelve false; es decir, cuando todos los fragmentos se han devuelto. Tenga en cuenta que getShards no devolverá fragmentos que se encuentren en el estado EXPIRED. Para obtener más información sobre los estados de los fragmentos, incluido el estado EXPIRED, consulte Direccionamiento de datos, persistencia de datos y estado de fragmentos tras los cambios en los fragmentos.

DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); List<Shard> shards = new ArrayList<>(); String exclusiveStartShardId = null; do { describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId ); DescribeStreamResult describeStreamResult = client.describeStream( describeStreamRequest ); shards.addAll( describeStreamResult.getStreamDescription().getShards() ); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while ( exclusiveStartShardId != null );