Sviluppo di app Consumer Kinesis Client Library in Java - Flusso di dati Amazon Kinesis

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Sviluppo di app Consumer Kinesis Client Library in Java

È possibile utilizzare la Kinesis Client Library (KCL) per creare applicazioni che elaborano dati dai tuoi flussi di dati Kinesis. La Kinesis Client Library è disponibile in più linguaggi. In questo argomento viene discusso Java. Per visualizzare il riferimento a Javadoc, consultate l'argomento AWS Javadoc per Class. AmazonKinesisClient

Per scaricare Java KCL da GitHub, vai a Kinesis Client Library (Java). Per individuare la KCL Java su Apache Maven, vai alla pagina Risultati di ricerca di KCL. Per scaricare il codice di esempio per un'applicazione consumer Java KCL da GitHub, vai alla pagina del progetto di esempio KCL for Java su. GitHub

L'applicazione di esempio utilizza Apache Commons Logging. È possibile modificare la configurazione di registro nel metodo statico configure definito nel file AmazonKinesisApplicationSample.java. Per ulteriori informazioni su come utilizzare Apache Commons Logging con Log4j e le applicazioni AWS Java, consulta Logging with Log4j nella Developer Guide.AWS SDK for Java

È necessario completare le seguenti attività durante l'implementazione di un'applicazione consumer KCL in Java:

RecordProcessor Implementa i metodi I

La KCL supporta attualmente due versioni dell'interfaccia IRecordProcessor: l'interfaccia originale è disponibile con la prima versione della KCL e la versione 2 è disponibile a partire da KCL versione 1.5.0. Entrambe le interfacce sono completamente supportate. La scelta dipende dai tuoi requisiti specifici di scenario. Fai riferimento ai tuoi Javadocs locali o al codice sorgente per visualizzare tutte le differenze. Le seguenti sezioni delineano l'implementazione minima per iniziare.

Interfaccia originale (Versione 1)

L'interfaccia IRecordProcessor originale (package com.amazonaws.services.kinesis.clientlibrary.interfaces) espone i seguenti metodi di processore del record che il tuo consumer deve implementare. L'esempio fornisce implementazioni che è possibile utilizzare come punto di partenza (consulta AmazonKinesisApplicationSampleRecordProcessor.java).

public void initialize(String shardId) public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
initialize

La KCL chiama il metodo initialize quando viene creata un'istanza del processore di record, passando un ID della partizione specifico come parametro. Questo processore di record elabora esclusivamente questo shard e, in genere, è vero anche il contrario (questo shard è elaborato solo da questo processore di record). Tuttavia, il tuo consumer deve tenere conto della possibilità che un record di dati possa essere elaborato più di una volta. Il flusso di dati Kinesis ha una semantica almeno una volta, il che significa che ogni record di dati da una partizione viene elaborato almeno una volta da un worker nel tuo consumer. Per ulteriori informazioni sui casi in cui un determinato shard può essere elaborato da più di un lavoratore, consulta Resharding, dimensionamento ed elaborazione parallela.

public void initialize(String shardId)
processRecords

La KCL chiama il metodo processRecords e passa un elenco di record di dati dalla partizione specificata dal metodo initialize(shardId). Il processore di record elabora i dati in questi record in base alla semantica del consumer. Ad esempio, il worker potrebbe eseguire una trasformazione dei dati e, successivamente, archiviare il risultato in un bucket Amazon Simple Storage Service (Amazon S3).

public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)

Oltre ai dati stessi, il record contiene anche un numero di sequenza e una chiave di partizione. Il lavoratore può utilizzare questi valori quando elabora i dati. Ad esempio, il lavoratore può scegliere il bucket S3 in cui archiviare i dati in base al valore della chiave di partizione. La classe Record espone i seguenti metodi che forniscono l'accesso ai dati del record, al numero di sequenza e alla chiave di partizione.

record.getData() record.getSequenceNumber() record.getPartitionKey()

Nell'esempio, il metodo privato processRecordsWithRetries ha un codice che mostra in che modo un lavoratore può accedere ai dati del record, al numero di sequenza e alla chiave di partizione.

Il flusso di dati Kinesis richiede che il processore di record tenga traccia dei record che sono già stati elaborati in una partizione. La KCL si occupa di questo monitoraggio per te, passando un checkpointer (IRecordProcessorCheckpointer) a processRecords. Il processore di record chiama il metodo checkpoint in questa interfaccia per comunicare alla KCL quanto si è progredito nell'elaborazione dei record nella partizione. In caso di errore del worker, la KCL utilizza queste informazioni per riavviare l'elaborazione della partizione nell'ultimo record elaborato conosciuto.

Per le operazioni di divisione o unione, la KCL non avvierà l'elaborazione delle nuove partizioni fino a quando i processori delle partizioni originali non avranno chiamato checkpoint per segnalare che l'intera elaborazione delle partizioni originali è completa.

Se non viene passato un parametro, la KCL suppone che la chiamata a checkpoint significa che tutti i record sono stati elaborati, fino all'ultimo record passato al processore di record. Pertanto, il processore di record deve chiamare checkpoint solo dopo aver elaborato tutti i record nell'elenco passato al processore. I processori di record non devono chiamare checkpoint in ciascuna chiamata a processRecords. Un processore potrebbe, per esempio, chiamare checkpoint in ogni terza chiamata a processRecords. Puoi specificare, in modo facoltativo, il numero di sequenza esatto di un record come parametro per checkpoint. In questo caso, la KCL presuppone che tutti i record siano stati elaborati esclusivamente fino a tale record.

Nell'esempio, il metodo privato checkpoint mostra come effettuare la chiamata a IRecordProcessorCheckpointer.checkpoint utilizzando la gestione delle eccezioni e la logica dei nuovi tentativi appropriate.

La KCL si basa su processRecords per gestire eventuali eccezioni generate dall'elaborazione dei record di dati. Se viene generata un'eccezione da processRecords, la KCL omette i record di dati passati prima dell'eccezione. Ciò significa che questi record non sono inviati nuovamente al processore di record che ha generato l'eccezione o a qualsiasi altro processore di record nel consumer.

shutdown

La KCL chiama il metodo shutdown sia al termine dell'elaborazione (il motivo dell'arresto è TERMINATE) che quando il worker non risponde più (il motivo dell'arresto è ZOMBIE).

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

L'elaborazione termina quando il processore di record non riceve ulteriori record dallo shard, perché lo shard è stato frazionato o fuso o perché il flusso è stato eliminato.

La KCL trasferisce inoltre un'interfaccia IRecordProcessorCheckpointer a shutdown. Se il motivo dell'arresto è TERMINATE, il processore di record deve terminare l'elaborazione di qualsiasi record di dati e, di seguito, chiamare il metodo checkpoint in questa interfaccia.

Interfaccia aggiornata (versione 2)

L'interfaccia IRecordProcessor aggiornata (package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2) espone i seguenti metodi di processore del record che il tuo consumer deve implementare:

void initialize(InitializationInput initializationInput) void processRecords(ProcessRecordsInput processRecordsInput) void shutdown(ShutdownInput shutdownInput)

Tutti gli argomenti dalla versione originale dell'interfaccia sono accessibili tramite metodi get negli oggetti del container. Ad esempio, per recuperare l'elenco dei record in processRecords(), è possibile utilizzare processRecordsInput.getRecords().

A partire dalla versione 2 di questa interfaccia (KCL 1.5.0 e versioni successive), i seguenti nuovi input sono disponibili in aggiunta agli input forniti dall'interfaccia originale:

Numero di sequenza di partenza

Nell'oggetto InitializationInput passato all'operazione initialize(), il numero di sequenza iniziale a partire da cui i record verrebbero forniti all'istanza del processore di record. Questo è l'ultimo numero di sequenza in cui è stato eseguito il checkpoint dall'istanza del processore di record che aveva precedentemente elaborato lo stesso shard. Questi dati sono forniti nel caso in cui la tua applicazione necessiti di queste informazioni.

Numero di sequenza di checkpoint in sospeso

Nell'oggetto InitializationInput passato all'operazione initialize(), il numero di sequenza di checkpoint in sospeso (se del caso) che non è stato possibile confermare prima dell'arresto dell'istanza precedente del processore di record.

Implementa una Class Factory per l'RecordProcessor interfaccia I

È inoltre necessario implementare un generatore per la classe che implementa i metodi del processore di record. Quando il tuo consumer avvia un'istanza del lavoratore, passa un riferimento a questo generatore.

Il campione implementa il generatore di classe nel file AmazonKinesisApplicationSampleRecordProcessorFactory.java utilizzando l'interfaccia del processore di record originale. Se si desidera che il generatore di classe crei processori di record della versione 2, utilizzi il nome del pacchetto com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.

public class SampleRecordProcessorFactory implements IRecordProcessorFactory { /** * Constructor. */ public SampleRecordProcessorFactory() { super(); } /** * {@inheritDoc} */ @Override public IRecordProcessor createProcessor() { return new SampleRecordProcessor(); } }

Creazione di un lavoratore

Come discusso nella RecordProcessor Implementa i metodi I, ci sono due versioni dell'interfaccia del processore di record KCL da cui scegliere; ciò influenza il modo in cui è possibile creare un worker. L'interfaccia del processore di record originale utilizza la seguente struttura di codice per creare un lavoratore:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker(recordProcessorFactory, config);

Con la versione 2 dell'interfaccia del processore di record, è possibile utilizzare Worker.Builder per creare un lavoratore senza dover preoccuparsi di quale costruttore utilizzare e dell'ordine degli argomenti. L'interfaccia del processore di record aggiornata utilizza la seguente struttura di codice per creare un lavoratore:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();

Modifica delle proprietà di configurazione

L'esempio fornisce valori di default per le proprietà di configurazione. Questi dati di configurazione per il lavoratore sono poi consolidati in un oggetto KinesisClientLibConfiguration. Questo oggetto e un riferimento al generatore di classe per IRecordProcessor sono passati nella chiamata che avvia un'istanza del lavoratore. È possibile sostituire una qualsiasi di queste proprietà con i tuoi valori utilizzando un file di proprietà Java (consulta AmazonKinesisApplicationSample.java).

Nome applicazione

La KCL richiede un nome dell'applicazione univoco per tutte le applicazioni e per tutte le tabelle Amazon DynamoDB nella stessa Regione. La biblioteca utilizza il valore di configurazione del nome dell'applicazione nei seguenti modi:

  • Si suppone che tutti i lavoratori associati con questo nome dell'applicazione stiano lavorando insieme nello stesso flusso. Questi lavoratori potrebbero essere distribuiti su più istanze. Se si esegue un'istanza aggiuntiva dello stesso codice dell'applicazione, ma con un nome dell'applicazione diverso, la KCL tratta la seconda istanza come un'applicazione completamente separata che opera anch'essa nello stesso flusso.

  • La KCL crea una tabella DynamoDB con il nome dell'applicazione e la utilizza per mantenere le informazioni sullo stato (ad esempio, checkpoint e mappatura worker-partizione) per l'applicazione. Ogni applicazione ha la propria tabella DynamoDB. Per ulteriori informazioni, consulta Utilizzo di una tabella di lease per tenere traccia delle partizioni elaborate dall'applicazione consumer della KCL.

Configurazione delle credenziali

È necessario rendere disponibili le AWS credenziali a uno dei provider di credenziali nella catena di provider di credenziali predefinita. Ad esempio, se l'applicazione consumer è in esecuzione su un'istanza Amazon EC2, consigliamo di avviare l'istanza con un ruolo IAM. Le credenziali AWS che riflettono le autorizzazioni associate a questo ruolo IAM vengono rese disponibili alle applicazioni sull'istanza tramite i relativi metadati dell'istanza. Questo è il modo più sicuro per gestire le credenziali per un consumer in esecuzione in un'istanza EC2.

L'applicazione di esempio prova prima a recuperare le credenziali IAM dai metadati dell'istanza:

credentialsProvider = new InstanceProfileCredentialsProvider();

Se l'applicazione di esempio non è in grado di ottenere le credenziali dai metadati dell'istanza, tenta di recuperare le credenziali da un file proprietà:

credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

Per ulteriori informazioni sui metadati delle istanze, consulta Instance Metadata nella Amazon EC2 User Guide.

Utilizzo di un ID del lavoratore per più istanze

L'esempio di codice di inizializzazione crea un ID per il lavoratore, workerIdutilizzando il nome del computer locale e aggiungendo un identificatore univoco globale come illustrato nel seguente frammento di codice. Questo approccio supporta lo scenario di più istanze dell'applicazione di consumo in esecuzione in un singolo computer.

String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

Migrazione alla versione 2 dell'interfaccia del processore di record

Se si desidera migrare il codice che utilizza l'interfaccia originale, in aggiunta ai passaggi descritti in precedenza, sono necessari i seguenti passaggi:

  1. Cambia la classe del tuo processore di record per importare la versione 2 dell'interfaccia del processore di record:

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  2. Cambia i riferimenti per gli input per utilizzare i metodi get negli oggetti del container. Ad esempio, nell'operazione shutdown(), cambia "checkpointer" con "shutdownInput.getCheckpointer()".

  3. Cambia la classe del generatore del processore di record per importare la versione 2 dell'interfaccia del generatore del processore di record:

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  4. Cambia la costruzione del lavoratore per utilizzare Worker.Builder. Per esempio:

    final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();