Elenco degli shard - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Elenco degli shard

Un flusso di dati può avere una o più partizioni. Esistono due metodi per elencare (o recuperare) le partizioni da un flusso di dati.

ListShards API: consigliata

Il metodo consigliato per elencare o recuperare gli shard da un flusso di dati consiste nell'utilizzare l'API. ListShards L'esempio seguente mostra in che modo è possibile ottenere un elenco di partizioni di un flusso di dati. Per una descrizione completa dell'operazione principale utilizzata in questo esempio e di tutti i parametri che è possibile impostare per l'operazione, vedere. ListShards

import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import java.util.concurrent.TimeUnit; public class ShardSample { public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.builder().build(); ListShardsRequest request = ListShardsRequest .builder().streamName("myFirstStream") .build(); try { ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS); System.out.println(response.toString()); } catch (Exception e) { System.out.println(e.getMessage()); } } }

Per eseguire il codice di esempio precedente è possibile utilizzare un file POM come il seguente.

<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kinesis.data.streams.samples</groupId> <artifactId>shards</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>2.0.0</version> </dependency> </dependencies> </project>

Con l'ListShardsAPI, puoi utilizzare il ShardFilterparametro per filtrare la risposta dell'API. È possibile specificare un solo filtro alla volta.

Se si utilizza il ShardFilter parametro quando si richiama l' ListShardsAPI, Type è la proprietà richiesta e deve essere specificata. Se si specificano i tipi AT_TRIM_HORIZON, FROM_TRIM_HORIZON o AT_LATEST, non è necessario specificare né la proprietà ShardId né la proprietà Timestamp facoltative.

Se si specifica il tipo AFTER_SHARD_ID, sarà necessario fornire anche il valore per la proprietà ShardId facoltativa. La ShardId proprietà ha funzionalità identiche al ExclusiveStartShardId parametro dell' ListShards API. Quando viene specificata la proprietà ShardId, la risposta includerà le partizioni che iniziano con la partizione il cui ID segue immediatamente la ShardId assegnata.

Se si specifica il tipo AT_TIMESTAMP o FROM_TIMESTAMP_ID, sarà necessario fornire anche il valore per la proprietà Timestamp facoltativa. Se si specifica il tipo AT_TIMESTAMP, verranno restituite tutte le partizioni aperte nel timestamp fornito. Se si specifica il tipo FROM_TIMESTAMP, verranno restituite tutte le partizioni dal timestamp fornito al TIP.

Importante

Le API DescribeStreamSummary e ListShard forniscono un modo più scalabile per recuperare informazioni sui flussi di dati. Più specificamente, le quote per l' DescribeStream API possono causare limitazioni. Per ulteriori informazioni, consulta Quote e limiti. Tieni inoltre presente che le DescribeStream quote sono condivise tra tutte le applicazioni che interagiscono con tutti i flussi di dati del tuo account. AWS Le quote per l' ListShards API, invece, sono specifiche per un singolo flusso di dati. Quindi non solo ottieni un TPS più elevato con l' ListShards API, ma l'azione si adatta meglio man mano che crei più flussi di dati.

Ti consigliamo di migrare tutti i produttori e i consumatori che chiamano l' DescribeStream API per richiamare invece le e le API. DescribeStreamSummary ListShard Per identificare questi produttori e consumatori, consigliamo di utilizzare Athena per analizzare i CloudTrail log man mano che gli user agent per KPL e KCL vengono acquisiti nelle chiamate API.

SELECT useridentity.sessioncontext.sessionissuer.username, useridentity.arn,eventname,useragent, count(*) FROM cloudtrail_logs WHERE Eventname IN ('DescribeStream') AND eventtime BETWEEN '' AND '' GROUP BY useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent ORDER BY count(*) DESC LIMIT 100

Consigliamo inoltre di riconfigurare le integrazioni AWS Lambda e Amazon Firehose con Kinesis Data Streams che richiamano l'API in modo che le integrazioni richiamino DescribeStream invece e. DescribeStreamSummary ListShards In particolare, per AWS Lambda, è necessario aggiornare la mappatura delle sorgenti degli eventi. Per Amazon Firehose, le autorizzazioni IAM corrispondenti devono essere aggiornate in modo da includere l'autorizzazione ListShards IAM.

DescribeStream API: obsoleta

Importante

Le informazioni seguenti descrivono un modo attualmente obsoleto per recuperare gli shard da un flusso di dati tramite l'API. DescribeStream Attualmente si consiglia vivamente di utilizzare l'API ListShards per recuperare le partizioni che compongono il flusso di dati.

L'oggetto di risposta restituito dal metodo describeStream consente di recuperare informazioni sugli shard che costituiscono il flusso. Per recuperare gli shard, chiamare il metodo getShards su questo oggetto. Questo metodo potrebbe non restituire tutte gli shard dal flusso con una una sola chiamata. Nel seguente codice, verifichiamo il metodo getHasMoreShards su getStreamDescription per vedere se ci sono shard aggiuntivi che non sono stati restituiti. In tal caso, se il metodo restituisce true, si continua a chiamare getShards in un loop, aggiungendo ogni nuovo batch di shard restituiti all'elenco di shard. Il ciclo di loop termina quando getHasMoreShards restituisce false: ossia quando tutti gli shard sono stati restituiti. Si noti che getShards non restituisce shard che si trovano nello stato EXPIRED. Per ulteriori informazioni sugli stati degli shard, tra cui lo stato EXPIRED, vedere Routing dei dati, persistenza dei dati e stato dello shard dopo il resharding.

DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); List<Shard> shards = new ArrayList<>(); String exclusiveStartShardId = null; do { describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId ); DescribeStreamResult describeStreamResult = client.describeStream( describeStreamRequest ); shards.addAll( describeStreamResult.getStreamDescription().getShards() ); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while ( exclusiveStartShardId != null );