Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.
Informazioni su KCL 1.x e 2.x
Nota
Le versioni 1.x e 2.x di Kinesis Client Library (KCL) sono obsolete. Consigliamo di effettuare la migrazione alla versione 3.x di KCL, che offre prestazioni migliorate e nuove funzionalità. Per la documentazione e la guida alla migrazione più recenti di KCL, consulta. Usa la libreria client Kinesis
Uno dei metodi per sviluppare applicazioni consumer personalizzate in grado di elaborare i dati dai flussi di dati KDS consiste nell'utilizzare la Kinesis Client Library (KCL).
Nota
Sia per KCL 1.x che per KCL 2.x, si consiglia di eseguire l'aggiornamento alla versione più recente di KCL 1.x o KCL 2.x, a seconda dello scenario di utilizzo. Sia KCL 1.x che KCL 2.x vengono regolarmente aggiornate con versioni più recenti che includono le ultime patch di dipendenza e sicurezza, correzioni di bug e nuove funzionalità retrocompatibili. Per ulteriori informazioni, consulta https://github.com/awslabs/amazon-kinesis-client/releases.
Informazioni su KCL (versioni precedenti)
KCL ti aiuta a consumare ed elaborare i dati da un flusso di dati Kinesis occupandoti di molte delle attività complesse associate al calcolo distribuito. Queste includono il bilanciamento del carico su più istanze di applicazioni consumer, la risposta agli errori delle istanze delle applicazioni consumer, il checkpoint dei record elaborati e la reazione al ripartizionamento. La KCL si occupa di tutte queste attività secondarie in modo che tu possa concentrare i tuoi sforzi sulla scrittura della tua logica di elaborazione dei record personalizzata.
Il KCL è diverso dai Kinesis Data APIs Streams disponibili in. AWS SDKs Kinesis APIs Data Streams ti aiuta a gestire molti aspetti di Kinesis Data Streams, tra cui la creazione di stream, il resharding e l'inserimento e l'acquisizione di record. La KCL fornisce un livello di astrazione su tutte queste sottoattività, in particolare per consentirti di concentrarti sulla logica di elaborazione dei dati personalizzata dell'applicazione consumer. Per ulteriori informazioni sulle API del flusso di dati Kinesis, consulta la Documentazione di riferimento delle API di Amazon Kinesis.
Importante
La KCL è una libreria Java. Il supporto per linguaggi diversi da Java viene fornito utilizzando un'interfaccia multilingue chiamata. MultiLangDaemon Questo daemon è basato su Java e viene eseguito in background quando si utilizza un linguaggio KCL diverso da Java. Ad esempio, se installi KCL per Python e scrivi la tua applicazione consumer interamente in Python, avrai comunque bisogno che Java sia installato sul tuo sistema a causa di. MultiLangDaemon Inoltre, MultiLangDaemon ha alcune impostazioni predefinite che potresti dover personalizzare in base al tuo caso d'uso, ad esempio la AWS regione a cui si connette. Per ulteriori informazioni su MultiLangDaemon on GitHub, vedere il MultiLangDaemon progetto KCL
La KCL funge da intermediario tra la tua logica di elaborazione di record e il flusso di dati Kinesis.
Versioni precedenti di KCL
Al momento per creare applicazioni consumer personalizzate puoi utilizzare una delle seguenti versioni supportate di KCL:
-
KCL 1.x
Per ulteriori informazioni, consulta Sviluppa i consumatori di KCL 1.x
-
KCL 2.x
Per ulteriori informazioni, consulta Sviluppa KCL 2.x Consumers
È possibile utilizzare KCL 1.x o KCL 2.x per creare applicazioni consumer che utilizzano una velocità di trasmissione effettiva condivisa. Per ulteriori informazioni, consulta Sviluppa consumatori personalizzati con un throughput condiviso utilizzando KCL.
Per creare applicazioni consumer che utilizzano una velocità di trasmissione effettiva dedicata (utenti fan-out avanzati), è possibile utilizzare solo KCL 2.x. Per ulteriori informazioni, consulta Sviluppa consumatori con fan-out migliorati con un throughput dedicato.
Per informazioni sulle differenze tra KCL 1.x e KCL 2.x e le istruzioni su come passare da KCL 1.x a KCL 2.x, consulta Migra i consumatori da KCL 1.x a KCL 2.x.
Concetti KCL (versioni precedenti)
-
Applicazione consumer KCL: un'applicazione personalizzata che utilizza KCL e progettata per leggere ed elaborare i record dai flussi di dati.
-
Istanza di applicazioni consumer: le applicazioni consumer KCL sono generalmente distribuite, con una o più istanze applicative eseguite contemporaneamente per coordinarsi in caso di guasti e bilanciare dinamicamente l'elaborazione dei record di dati.
-
Worker: una classe di livello superiore utilizzata da un'istanza di applicazione consumer KCL per iniziare l'elaborazione dei dati.
Importante
Ogni istanza dell'applicazione consumer KCL ha un worker.
Il worker inizializza e supervisiona varie attività, tra cui la sincronizzazione delle informazioni sulle partizioni e sui lease, il monitoraggio delle assegnazioni delle partizioni e l'elaborazione dei dati dalle partizioni. Un worker fornisce a KCL le informazioni di configurazione per l'applicazione consumer, ad esempio il nome del flusso di dati i cui record di dati verranno elaborati dall'applicazione consumer KCL e le AWS credenziali necessarie per accedere a questo flusso di dati. Il worker avvia inoltre quella specifica istanza dell'applicazione consumer KCL per fornire i record di dati dal flusso di dati ai processori di record.
Importante
In KCL 1.x questa classe si chiama Worker. Per ulteriori informazioni, (questi sono i repository Java KCL), vedere/.java. https://github.com/awslabs/ amazon-kinesis-client blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/Worker
In KCL 2.x questa classe si chiama Scheduler. Lo scopo di Scheduler in KCL 2.x è identico allo scopo di Worker in KCL 1.x. Per ulteriori informazioni sulla classe Scheduler in KCL 2.x, vedete/.java. https://github.com/awslabs/ amazon-kinesis-client blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/coordinator/Scheduler -
Lease: dati che definiscono l'associazione tra un worker e una partizione. Le applicazioni consumer KCL distribuite utilizzano i lease per suddividere l'elaborazione dei record di dati tra un parco istanze di worker. In qualsiasi momento, ogni partizione di record di dati è legato a un determinato worker tramite un lease identificato dalla variabile leaseKey.
Per impostazione predefinita, un lavoratore può detenere uno o più contratti di locazione (in base al valore della variabile maxLeasesForWorker) contemporaneamente.
Importante
Ogni worker si impegna a detenere tutti i lease disponibili per tutte le partizioni disponibili in un flusso di dati. Ma solo un worker alla volta si aggiudicherà con successo ogni lease.
Ad esempio, se si dispone di un'istanza di applicazione consumer A con worker A che elabora un flusso di dati con 4 partizioni, il worker A può detenere i lease per le partizioni 1, 2, 3 e 4 contemporaneamente. Tuttavia, se si dispone di due istanze di applicazioni consumer, A e B, con worker A e worker B, e queste istanze elaborano un flusso di dati con 4 partizioni, il worker A e il worker B non possono entrambi detenere il lease per la partizione 1 contemporaneamente. Un worker detiene il lease di una particolare partizione finché non è pronto a interrompere l'elaborazione dei record di dati della partizione o fino a quando non si verifica un guasto. Quando un worker smette di detenere il lease, un altro worker lo riprende e lo mantiene.
Per ulteriori informazioni (questi sono i repository Java KCL), vedere https://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/Lease.java per KCL 1.x e https://github.com/awslabs/amazon-kinesis-client/.java
per KCL 2.x. blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/leases/Lease -
Tabella di lease: una tabella univoca di Amazon DynamoDB utilizzata per tenere traccia delle partizioni in un flusso di dati KDS sottoposte a lease ed elaborate dai worker dell'applicazione consumer KCL. La tabella di lease deve rimanere sincronizzata (all'interno di un worker e tra tutti i worker) con le ultime informazioni sulle partizioni provenienti dal flusso di dati mentre l'applicazione consumer KCL è in esecuzione. Per ulteriori informazioni, consulta Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL.
-
Processore di record: la logica che definisce il modo in cui l'applicazione consumer KCL elabora i dati che riceve dai flussi di dati. Durante il runtime, un'istanza dell'applicazione consumer KCL istanzia un worker e questo worker istanzia un processore di record per ogni partizione su cui detiene un lease.
Utilizza una tabella di leasing per tenere traccia degli shard elaborati dall'applicazione consumer KCL
Argomenti
Cos'è una tabella di leasing
Tabella di lease: per ogni applicazione Flusso di dati Amazon Kinesis, la KCL utilizza una tabella di lease univoca (archiviata in una tabella di Amazon DynamoDB) per tenere traccia delle partizioni in un flusso di dati KDS sottoposte a lease ed elaborate dai worker dell'applicazione consumer KCL.
Importante
La KCL utilizza il nome dell'applicazione consumer per creare il nome della tabella di lease utilizzata da questa applicazione consumer, pertanto il nome di ogni applicazione consumer deve essere univoco.
È possibile visualizzare la tabella di lease utilizzando la console Amazon DynamoDB mentre l'applicazione è in esecuzione.
Se la tabella di lease per l'applicazione consumer KCL non esiste all'avvio dell'applicazione, uno dei worker la crea per l'applicazione.
Importante
Il tuo account sarà addebitato per i costi associati alla tabella DynamoDB, oltre ai costi associati al flusso di dati Kinesis stesso.
Ogni riga della tabella di lease rappresenta una partizione che viene elaborata dalla tua applicazione consumer. Quando l'applicazione consumer KCL esistente è configurata per elaborare un solo flusso di dati, allora leaseKey
(che è la chiave hash per la tabella di lease) è l'ID della partizione. Se sei Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x per Java, allora la struttura di leaseKey avrà il seguente aspetto: account-id:StreamName:streamCreationTimestamp:ShardId
. Ad esempio 111111111:multiStreamTest-1:12345:shardId-000000000336
.
Oltre all'ID dello shard, ogni riga include anche i seguenti dati:
-
checkpoint: il numero di sequenza di checkpoint più recente per lo shard. Questo valore è univoco per tutte le partizioni nel flusso di dati.
-
checkpointSubSequenceNumero: quando si utilizza la funzione di aggregazione della Kinesis Producer Library, si tratta di un'estensione del checkpoint che tiene traccia dei record dei singoli utenti all'interno del record Kinesis.
-
leaseCounter: utilizzato per la funzione Versioni multiple del lease, in modo tale da permettere ai lavoratori di rilevare che il loro lease è stato preso da un altro lavoratore.
-
leaseKey: un identificatore univoco per un lease. Ogni lease è specifico di una partizione nel flusso di dati ed è detenuto da un worker alla volta.
-
leaseOwner: il lavoratore che detiene questo lease.
-
ownerSwitchesSinceCheckpoint: quante volte questo contratto di locazione ha cambiato lavoratori dall'ultima volta che è stato scritto un checkpoint.
-
parentShardId: Utilizzato per garantire che lo shard principale sia completamente elaborato prima che inizi l'elaborazione sui frammenti secondari. In questo modo, ci si assicura che i record siano elaborati nello stesso ordine in cui sono stati introdotti nel flusso.
-
hashrange: utilizzato dalla
PeriodicShardSyncManager
per eseguire sincronizzazioni periodiche per trovare le partizioni mancanti nella tabella di lease e creare lease per esse, se necessario.Nota
Questi dati sono presenti nella tabella di lease per ogni partizione a partire da KCL 1.14 e KCL 2.3. Per ulteriori informazioni su
PeriodicShardSyncManager
e sulla sincronizzazione periodica tra lease e partizioni, consulta Come viene sincronizzata una tabella di leasing con gli shard in un flusso di dati Kinesis. -
childshards: utilizzato da
LeaseCleanupManager
per esaminare lo stato di elaborazione della partizione secondaria e decidere se la partizione principale può essere eliminata dalla tabella di lease.Nota
Questi dati sono presenti nella tabella di lease per ogni partizione a partire da KCL 1.14 e KCL 2.3.
-
shardID: l'ID della partizione.
Nota
Questi dati sono presenti nella tabella dei lease solo se sei Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x per Java. È supportato solo in KCL 2.x per Java, a partire da KCL 2.3 per Java e versioni successive.
-
nome del flusso: l'identificatore del flusso di dati nel seguente formato:
account-id:StreamName:streamCreationTimestamp
.Nota
Questi dati sono presenti nella tabella dei lease solo se sei Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x per Java. È supportato solo in KCL 2.x per Java, a partire da KCL 2.3 per Java e versioni successive.
Prestazioni
Se l'applicazione Flusso di dati Amazon Kinesis riceve eccezioni di velocità di trasmissione effettiva assegnata, dovrai aumentare la velocità di trasmissione effettiva assegnata per la tabella DynamoDB. La KCL crea la tabella con una velocità di trasmissione effettiva assegnata di 10 letture al secondo e 10 scritture al secondo, ma questo potrebbe non essere sufficiente per l'applicazione. Ad esempio, se la tua applicazione Flusso di dati Amazon Kinesis crea frequentemente dei checkpoint o opera in un flusso che è composto da molte partizioni, potrebbe essere necessaria una velocità di trasmissione effettiva maggiore.
Per informazioni sulla velocità di trasmissione effettiva assegnata in DynamoDB, consulta Modalità capacità di lettura/scrittura e Utilizzo di tabelle e dati nella Guida per gli sviluppatori di Amazon DynamoDB.
Come viene sincronizzata una tabella di leasing con gli shard in un flusso di dati Kinesis
I worker delle applicazioni consumer KCL utilizzano i lease per elaborare le partizioni di un determinato flusso di dati. Le informazioni su quale worker sta eseguendo il lease di una partizione in un dato momento vengono archiviate in una tabella di lease. La tabella di lease deve rimanere sincronizzata con le ultime informazioni sulle partizioni provenienti dal flusso di dati mentre l'applicazione consumer KCL è in esecuzione. La KCL sincronizza la tabella di lease con le informazioni sulle partizioni acquisite dal servizio del flusso di dati Kinesis durante l'avvio dell'applicazione consumer (quando l'applicazione consumer viene inizializzata o riavviata) e anche ogni volta che una partizione in fase di elaborazione raggiunge il suo termine (ripartizionamento). In altre parole, i worker o un'applicazione consumer KCL vengono sincronizzati con il flusso di dati che stanno elaborando durante l'avvio iniziale dell'applicazione consumer e ogni volta che l'applicazione consumer incontra un evento di ripartizionamento del flusso di dati.
Argomenti
Sincronizzazione in KCL 1.0 - 1.13 e KCL 2.0 - 2.2
In KCL 1.0 - 1.13 e KCL 2.0 - 2.2, durante l'avvio dell'applicazione consumer e anche durante ogni evento di reshard del flusso di dati, KCL sincronizza la tabella di lease con le informazioni sugli shard acquisite dal servizio Kinesis Data Streams richiamando l'or the discovery. ListShards
DescribeStream
APIs In tutte le versioni KCL sopra elencate, ogni worker di un'applicazione consumer KCL completa i seguenti passaggi per eseguire il processo di sincronizzazione lease/partizione durante l'avvio dell'applicazione consumer e in occasione di ogni evento di ripartizionamento del flusso:
-
Recupera tutte le partizioni per i dati elaborati dal flusso
-
Recupera tutte i lease delle partizioni dalla tabella di lease
-
Filtra ogni partizione aperta che non ha un lease nella tabella di lease
-
Itera su tutte le partizioni aperte trovate e per ogni partizione aperta senza un elemento principale aperto:
-
Attraversa l'albero gerarchico lungo il percorso dei suoi antenati per determinare se la partizione è un discendente. Una partizione è considerata un discendente se una partizione antenata è in fase di elaborazione (la voce di lease relativa alla partizione antenata esiste nella tabella di lease) o se è necessario elaborare una partizione antenata (ad esempio, se la posizione iniziale è
TRIM_HORIZON
oAT_TIMESTAMP
) -
Se la partizione aperta nel contesto è un discendente, la KCL controlla la partizione in base alla posizione iniziale e crea dei lease per i suoi elementi principali, se necessario
-
Sincronizzazione in KCL 2.x, a partire da KCL 2.3 e versioni successive
A partire dalle ultime versioni supportate di KCL 2.x (KCL 2.3) e successive, la libreria ora supporta le seguenti modifiche al processo di sincronizzazione. Queste modifiche alla sincronizzazione lease/partizione riducono significativamente il numero di chiamate API effettuate dalle applicazioni consumer KCL al servizio del flusso di dati Kinesis e ottimizzano la gestione dei lease nell'applicazione consumer KCL.
-
Durante l'avvio dell'applicazione, se la tabella di lease è vuota, la KCL utilizza l'opzione di filtro dell'API
ListShard
(il parametro di richiestaShardFilter
facoltativo) per recuperare e creare lease solo per uno snapshot di partizioni aperte nel momento specificato dal parametroShardFilter
. Il parametroShardFilter
consente di filtrare la risposta dell'APIListShards
. L'unica proprietà richiesta del parametroShardFilter
èType
. KCL utilizza la proprietà di filtroType
e i seguenti valori validi per identificare e restituire uno snapshot delle partizioni aperte che potrebbero richiedere nuovi lease:-
AT_TRIM_HORIZON
: la risposta include tutte le partizioni che erano aperte inTRIM_HORIZON
. -
AT_LATEST
: la risposta include solo le partizioni del flusso di dati correntemente aperte. -
AT_TIMESTAMP
: la risposta include tutte le partizioni il cui timestamp di inizio è inferiore o uguale al timestamp specificato e il timestamp di fine è maggiore o uguale al timestamp specificato o sono ancora aperte.
ShardFilter
viene utilizzato durante la creazione di lease per una tabella di lease vuota per inizializzare i lease per uno snapshot delle partizioni specificate inRetrievalConfig#initialPositionInStreamExtended
.Per ulteriori informazioni su
ShardFilter
, consulta https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
Invece che tutti i lavoratori eseguano la sincronizzazione. lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard
-
KCL 2.3 utilizza il parametro
ChildShards
return diGetRecords
and theSubscribeToShard
APIs per eseguire la sincronizzazione lease/shard che avviene per gli shard chiusi, consentendo a un worker KCL di creare leasing soloSHARD_END
per i frammenti secondari dello shard che ha terminato l'elaborazione. Per le applicazioni consumer con velocità di trasmissione effettiva condivisa, questa ottimizzazione della sincronizzazione lease/partizione utilizza il parametroChildShards
dell'APIGetRecords
. Per le applicazioni consumer (fan-out migliorato) con velocità di trasmissione effettiva condivisa, questa ottimizzazione della sincronizzazione lease/partizione utilizza il parametroChildShards
dell'APISubscribeToShard
. Per ulteriori informazioni, consulta GetRecords, SubscribeToShards e ChildShard. -
Con le modifiche di cui sopra, il comportamento della KCL sta passando dal modello in cui tutti i worker apprendono tutte le partizioni esistenti a un modello in cui i worker apprendono solo le partizioni secondarie delle partizioni di proprietà di ogni worker. Pertanto, oltre alla sincronizzazione che avviene durante gli eventi di avvio e ripartizionamento delle applicazioni consumer, la KCL ora esegue anche ulteriori scansioni periodiche di partizione/lease per identificare eventuali potenziali buchi nella tabella di lease (in altre parole, per conoscere tutte le nuove partizioni) per garantire l'elaborazione dell'intervallo hash completo del flusso di dati e creare i lease, se necessario.
PeriodicShardSyncManager
è il componente responsabile dell'esecuzione di scansioni periodiche di lease/partizione.In KCL 2.3, sono disponibili nuove opzioni di configurazione per configurare
PeriodicShardSyncManager
inLeaseManagementConfig
:Nome Valore predefinito Descrizione leasesRecoveryAuditorExecutionFrequencyMillis 120.000 (2 minuti)
La frequenza (in millisecondi) del lavoro del revisore per la ricerca di lease parziali nella tabella di lease. Se il revisore rileva un buco nei lease relativi a uno stream, attiva la sincronizzazione delle partizioni in base a
leasesRecoveryAuditorInconsistencyConfidenceThreshold
.leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
Soglia di confidenza per il lavoro periodico del revisore volto a determinare se i lease per un flusso di dati nella tabella di lease non sono coerenti. Se il revisore rileva lo stesso insieme di incongruenze consecutivamente per un flusso di dati per questo numero di volte, attiva una sincronizzazione delle partizioni.
Ora vengono inoltre emesse nuove CloudWatch metriche per monitorare lo stato di.
PeriodicShardSyncManager
Per ulteriori informazioni, consulta PeriodicShardSyncManager. -
Inclusa un'ottimizzazione a
HierarchicalShardSyncer
per creare lease solo per un livello di partizioni.
Sincronizzazione in KCL 1.x, a partire da KCL 1.14 e versioni successive
A partire dalle ultime versioni supportate di KCL 1.x (KCL 1.14) e successive, la libreria ora supporta le seguenti modifiche al processo di sincronizzazione. Queste modifiche alla sincronizzazione lease/partizione riducono significativamente il numero di chiamate API effettuate dalle applicazioni consumer KCL al servizio del flusso di dati Kinesis e ottimizzano la gestione dei lease nell'applicazione consumer KCL.
-
Durante l'avvio dell'applicazione, se la tabella di lease è vuota, la KCL utilizza l'opzione di filtro dell'API
ListShard
(il parametro di richiestaShardFilter
facoltativo) per recuperare e creare lease solo per uno snapshot di partizioni aperte nel momento specificato dal parametroShardFilter
. Il parametroShardFilter
consente di filtrare la risposta dell'APIListShards
. L'unica proprietà richiesta del parametroShardFilter
èType
. KCL utilizza la proprietà di filtroType
e i seguenti valori validi per identificare e restituire uno snapshot delle partizioni aperte che potrebbero richiedere nuovi lease:-
AT_TRIM_HORIZON
: la risposta include tutte le partizioni che erano aperte inTRIM_HORIZON
. -
AT_LATEST
: la risposta include solo le partizioni del flusso di dati correntemente aperte. -
AT_TIMESTAMP
: la risposta include tutte le partizioni il cui timestamp di inizio è inferiore o uguale al timestamp specificato e il timestamp di fine è maggiore o uguale al timestamp specificato o sono ancora aperte.
ShardFilter
viene utilizzato durante la creazione di lease per una tabella di lease vuota per inizializzare i lease per uno snapshot delle partizioni specificate inKinesisClientLibConfiguration#initialPositionInStreamExtended
.Per ulteriori informazioni su
ShardFilter
, consulta https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html. -
-
Invece che tutti i lavoratori eseguano la sincronizzazione. lease/shard synchronization to keep the lease table up to date with the latest shards in the data stream, a single elected worker leader performs the lease/shard
-
KCL 1.14 utilizza il parametro
ChildShards
return diGetRecords
and theSubscribeToShard
APIs per eseguire la sincronizzazione lease/shard che avviene per gli shard chiusi, consentendo a un worker KCL di creare leasing soloSHARD_END
per i frammenti secondari dello shard che ha terminato l'elaborazione. Per ulteriori informazioni, consulta GetRecords e ChildShard. -
Con le modifiche di cui sopra, il comportamento della KCL sta passando dal modello in cui tutti i worker apprendono tutte le partizioni esistenti a un modello in cui i worker apprendono solo le partizioni secondarie delle partizioni di proprietà di ogni worker. Pertanto, oltre alla sincronizzazione che avviene durante gli eventi di avvio e ripartizionamento delle applicazioni consumer, la KCL ora esegue anche ulteriori scansioni periodiche di partizione/lease per identificare eventuali potenziali buchi nella tabella di lease (in altre parole, per conoscere tutte le nuove partizioni) per garantire l'elaborazione dell'intervallo hash completo del flusso di dati e creare i lease, se necessario.
PeriodicShardSyncManager
è il componente responsabile dell'esecuzione di scansioni periodiche di lease/partizione.Quando
KinesisClientLibConfiguration#shardSyncStrategyType
è impostato suShardSyncStrategyType.SHARD_END
,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold
viene utilizzato per determinare la soglia per il numero di scansioni consecutive contenenti buchi nella tabella di lease, dopodiché imporre la sincronizzazione delle partizioni. QuandoKinesisClientLibConfiguration#shardSyncStrategyType
è impostato suShardSyncStrategyType.PERIODIC
,leasesRecoveryAuditorInconsistencyConfidenceThreshold
viene ignorato.In KCL 1.14, è disponibile una nuova opzione di configurazione per configurare
PeriodicShardSyncManager
inLeaseManagementConfig
:Nome Valore predefinito Descrizione leasesRecoveryAuditorInconsistencyConfidenceThreshold 3
Soglia di confidenza per il lavoro periodico del revisore volto a determinare se i lease per un flusso di dati nella tabella di lease non sono coerenti. Se il revisore rileva lo stesso insieme di incongruenze consecutivamente per un flusso di dati per questo numero di volte, attiva una sincronizzazione delle partizioni.
Ora vengono inoltre emesse nuove CloudWatch metriche per monitorare lo stato di.
PeriodicShardSyncManager
Per ulteriori informazioni, consulta PeriodicShardSyncManager. -
KCL 1.14 ora supporta anche la pulizia differita dei lease. I lease vengono eliminati in modo asincrono da
LeaseCleanupManager
quando viene raggiuntoSHARD_END
o quando una partizione è scaduta, superando il periodo di conservazione del flusso di dati o è stata chiusa a seguito di un'operazione di ripartizionamento.Sono disponibili nuove opzioni di configurazione per configurare
LeaseCleanupManager
.Nome Valore predefinito Descrizione leaseCleanupIntervalMillis 1 minuto
Intervallo in cui eseguire il thread di pulizia dei lease.
completedLeaseCleanupIntervalMillis 5 minuti Intervallo in cui verificare se un lease è stato completato o meno.
garbageLeaseCleanupIntervalMillis 30 minuti Intervallo durante il quale verificare se un lease è inutile (ossia se è stato interrotto oltre il periodo di conservazione del flusso di dati) o meno.
-
Inclusa un'ottimizzazione a
KinesisShardSyncer
per creare lease solo per un livello di partizioni.
Elabora più flussi di dati con la stessa applicazione consumer KCL 2.x per Java
Questa sezione descrive le seguenti modifiche in KCL 2.x per Java che consentono di creare applicazioni consumer KCL in grado di elaborare più di un flusso di dati contemporaneamente.
Importante
L'elaborazione multistream è supportata solo in KCL 2.x per Java, a partire da KCL 2.3 per Java e versioni successive.
L'elaborazione multistream NON è supportata per altri linguaggi in cui è possibile implementare KCL 2.x.
L'elaborazione multistream NON è supportata in nessuna versione di KCL 1.x.
-
MultistreamTracker interfaccia
Per creare un'applicazione consumer in grado di elaborare più flussi contemporaneamente, è necessario implementare una nuova interfaccia denominata MultistreamTracker
. Questa interfaccia include il metodo streamConfigList
che restituisce l'elenco dei flussi di dati e le relative configurazioni che devono essere elaborati dall'applicazione consumer KCL. Si noti che i flussi di dati in fase di elaborazione possono essere modificati durante il runtime dell'applicazione consumer.streamConfigList
viene chiamato periodicamente dalla KCL per conoscere le modifiche nei flussi di dati da elaborare.Il
streamConfigList
metodo compila l'elenco. StreamConfigpackage software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }
Nota che
StreamIdentifier
eInitialPositionInStreamExtended
sono obbligatori, mentreconsumerArn
è facoltativo. È necessario fornireconsumerArn
solo se si utilizza KCL 2.x per implementare un'applicazione consumer fan-out migliorata.Per ulteriori informazioni su
StreamIdentifier
, vedere https://github.com/awslabs/amazon-kinesis-client/blob/v2.5.8/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L129.Per creare un StreamIdentifier
, ti consigliamo di creare un'istanza multistream dastreamArn
and the disponibile nella versionestreamCreationEpoch
2.5.0 e successive. In KCL v2.3 e v2.4, che non supportanostreamArm
, crea un'istanza multistream utilizzando il formato.account-id:StreamName:streamCreationTimestamp
Questo formato sarà obsoleto e non sarà più supportato a partire dalla prossima versione principale.MultistreamTracker
include anche una strategia per eliminare i lease di vecchi flussi nella tabella dei lease (formerStreamsLeasesDeletionStrategy
). Si noti che la strategia NON PUÒ essere modificata durante il runtime dell'applicazione consumer. Per ulteriori informazioni, consulta https://github.com/awslabs/ amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ .java amazon-kinesis-client src/main/java/software/amazon/kinesis/processor/FormerStreamsLeasesDeletionStrategy -
ConfigsBuilder
è una classe a livello di applicazione che puoi utilizzare per specificare tutte le impostazioni di configurazione di KCL 2.x da utilizzare durante la creazione dell'applicazione consumer KCL. ConfigsBuilder
la classe ora supporta l'interfaccia.MultistreamTracker
Puoi inizializzarli ConfigsBuilder entrambi con il nome dell'unico flusso di dati da cui consumare i record da:/** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
Oppure puoi inizializzare ConfigsBuilder con
MultiStreamTracker
se desideri implementare un'applicazione consumer KCL che elabora più flussi contemporaneamente.* Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
-
Con il supporto multistream implementato per la tua applicazione consumer KCL, ogni riga della tabella di lease dell'applicazione ora contiene l'ID della partizione e il nome del flusso dei molteplici flussi di dati elaborati da questa applicazione.
-
Quando viene implementato il supporto multistream per la tua applicazione consumer KCL, leaseKey assume la seguente struttura:
account-id:StreamName:streamCreationTimestamp:ShardId
. Ad esempio111111111:multiStreamTest-1:12345:shardId-000000000336
.Importante
Quando l'applicazione consumer KCL esistente è configurata per elaborare un solo flusso di dati, leaseKey (che è la chiave hash per la tabella di lease) è l'ID della partizione. Se riconfiguri questa applicazione consumer KCL esistente per elaborare più flussi di dati, la tabella di lease viene interrotta perché con il supporto multistream, la struttura leaseKey deve essere
account-id:StreamName:StreamCreationTimestamp:ShardId
.
Usa KCL con lo Schema Registry AWS Glue
Puoi integrare i tuoi flussi di dati Kinesis con lo Schema Registry. AWS Glue Lo AWS Glue Schema Registry ti consente di scoprire, controllare ed evolvere centralmente gli schemi, garantendo al contempo che i dati prodotti siano convalidati continuamente da uno schema registrato. Uno schema definisce la struttura e il formato di un registro di dati. Uno schema è una specifica con versioni per la pubblicazione, il consumo o l'archiviazione dei dati in modo affidabile. Lo AWS Glue Schema Registry consente di migliorare la qualità e la governance end-to-end dei dati all'interno delle applicazioni di streaming. Per ulteriori informazioni, consulta Registro degli schemi di AWS Glue. Uno dei modi per configurare questa integrazione è tramite la KCL in Java.
Importante
Attualmente, l'integrazione tra Kinesis Data AWS Glue Streams e Schema Registry è supportata solo per i flussi di dati Kinesis che utilizzano i consumatori KCL 2.3 implementati in Java. Il supporto multilingue non viene fornito. I consumer KCL 1.0 non sono supportati. I consumer KCL 2.x precedenti a KCL 2.3 non sono supportati.
Per istruzioni dettagliate su come configurare l'integrazione di Kinesis Data Streams con Schema Registry utilizzando KCL, consulta la sezione «Interazione con i dati utilizzando le librerie KPL/KCL» in Caso d'uso: integrazione di Amazon Kinesis Data Streams con il registro dello schema Glue. AWS