Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Elenco degli shard
Un flusso di dati può avere una o più partizioni. Esistono due metodi per elencare (o recuperare) le partizioni da un flusso di dati.
ListShards API: consigliata
Il metodo consigliato per elencare o recuperare gli shard da un flusso di dati consiste nell'utilizzare l'API. ListShards L'esempio seguente mostra in che modo è possibile ottenere un elenco di partizioni di un flusso di dati. Per una descrizione completa dell'operazione principale utilizzata in questo esempio e di tutti i parametri che è possibile impostare per l'operazione, vedere. ListShards
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; import software.amazon.awssdk.services.kinesis.model.ListShardsRequest; import software.amazon.awssdk.services.kinesis.model.ListShardsResponse; import java.util.concurrent.TimeUnit; public class ShardSample { public static void main(String[] args) { KinesisAsyncClient client = KinesisAsyncClient.builder().build(); ListShardsRequest request = ListShardsRequest .builder().streamName("myFirstStream") .build(); try { ListShardsResponse response = client.listShards(request).get(5000, TimeUnit.MILLISECONDS); System.out.println(response.toString()); } catch (Exception e) { System.out.println(e.getMessage()); } } }
Per eseguire il codice di esempio precedente è possibile utilizzare un file POM come il seguente.
<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>kinesis.data.streams.samples</groupId> <artifactId>shards</artifactId> <version>1.0-SNAPSHOT</version> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> <dependencies> <dependency> <groupId>software.amazon.awssdk</groupId> <artifactId>kinesis</artifactId> <version>2.0.0</version> </dependency> </dependencies> </project>
Con l'ListShards
API, puoi utilizzare il ShardFilterparametro per filtrare la risposta dell'API. È possibile specificare un solo filtro alla volta.
Se si utilizza il ShardFilter
parametro quando si richiama l' ListShardsAPI, Type
è la proprietà richiesta e deve essere specificata. Se si specificano i tipi AT_TRIM_HORIZON
, FROM_TRIM_HORIZON
o AT_LATEST
, non è necessario specificare né la proprietà ShardId
né la proprietà Timestamp
facoltative.
Se si specifica il tipo AFTER_SHARD_ID
, sarà necessario fornire anche il valore per la proprietà ShardId
facoltativa. La ShardId
proprietà ha funzionalità identiche al ExclusiveStartShardId
parametro dell' ListShards API. Quando viene specificata la proprietà ShardId
, la risposta includerà le partizioni che iniziano con la partizione il cui ID segue immediatamente la ShardId
assegnata.
Se si specifica il tipo AT_TIMESTAMP
o FROM_TIMESTAMP_ID
, sarà necessario fornire anche il valore per la proprietà Timestamp
facoltativa. Se si specifica il tipo AT_TIMESTAMP
, verranno restituite tutte le partizioni aperte nel timestamp fornito. Se si specifica il tipo FROM_TIMESTAMP
, verranno restituite tutte le partizioni dal timestamp fornito al TIP.
Importante
Le API DescribeStreamSummary
e ListShard
forniscono un modo più scalabile per recuperare informazioni sui flussi di dati. Più specificamente, le quote per l' DescribeStream API possono causare limitazioni. Per ulteriori informazioni, consulta Quote e limiti. Tieni inoltre presente che le DescribeStream
quote sono condivise tra tutte le applicazioni che interagiscono con tutti i flussi di dati del tuo account. AWS Le quote per l' ListShards API, invece, sono specifiche per un singolo flusso di dati. Quindi non solo ottieni un TPS più elevato con l' ListShards API, ma l'azione si adatta meglio man mano che crei più flussi di dati.
Ti consigliamo di migrare tutti i produttori e i consumatori che chiamano l' DescribeStream API per richiamare invece le e le API. DescribeStreamSummary ListShard Per identificare questi produttori e consumatori, consigliamo di utilizzare Athena per analizzare i CloudTrail log man mano che gli user agent per KPL e KCL vengono acquisiti nelle chiamate API.
SELECT useridentity.sessioncontext.sessionissuer.username, useridentity.arn,eventname,useragent, count(*) FROM cloudtrail_logs WHERE Eventname IN ('DescribeStream') AND eventtime BETWEEN '' AND '' GROUP BY useridentity.sessioncontext.sessionissuer.username,useridentity.arn,eventname,useragent ORDER BY count(*) DESC LIMIT 100
Consigliamo inoltre di riconfigurare le integrazioni AWS Lambda e Amazon Firehose con Kinesis Data Streams che richiamano l'API in modo che le integrazioni richiamino DescribeStream
invece e. DescribeStreamSummary
ListShards
In particolare, per AWS Lambda, è necessario aggiornare la mappatura delle sorgenti degli eventi. Per Amazon Firehose, le autorizzazioni IAM corrispondenti devono essere aggiornate in modo da includere l'autorizzazione ListShards
IAM.
DescribeStream API: obsoleta
Importante
Le informazioni seguenti descrivono un modo attualmente obsoleto per recuperare gli shard da un flusso di dati tramite l'API. DescribeStream Attualmente si consiglia vivamente di utilizzare l'API ListShards
per recuperare le partizioni che compongono il flusso di dati.
L'oggetto di risposta restituito dal metodo describeStream
consente di recuperare informazioni sugli shard che costituiscono il flusso. Per recuperare gli shard, chiamare il metodo getShards
su questo oggetto. Questo metodo potrebbe non restituire tutte gli shard dal flusso con una una sola chiamata. Nel seguente codice, verifichiamo il metodo getHasMoreShards
su getStreamDescription
per vedere se ci sono shard aggiuntivi che non sono stati restituiti. In tal caso, se il metodo restituisce true
, si continua a chiamare getShards
in un loop, aggiungendo ogni nuovo batch di shard restituiti all'elenco di shard. Il ciclo di loop termina quando getHasMoreShards
restituisce false
: ossia quando tutti gli shard sono stati restituiti. Si noti che getShards
non restituisce shard che si trovano nello stato EXPIRED
. Per ulteriori informazioni sugli stati degli shard, tra cui lo stato EXPIRED
, vedere Routing dei dati, persistenza dei dati e stato dello shard dopo il resharding.
DescribeStreamRequest describeStreamRequest = new DescribeStreamRequest(); describeStreamRequest.setStreamName( myStreamName ); List<Shard> shards = new ArrayList<>(); String exclusiveStartShardId = null; do { describeStreamRequest.setExclusiveStartShardId( exclusiveStartShardId ); DescribeStreamResult describeStreamResult = client.describeStream( describeStreamRequest ); shards.addAll( describeStreamResult.getStreamDescription().getShards() ); if (describeStreamResult.getStreamDescription().getHasMoreShards() && shards.size() > 0) { exclusiveStartShardId = shards.get(shards.size() - 1).getShardId(); } else { exclusiveStartShardId = null; } } while ( exclusiveStartShardId != null );