Desenvolva um consumidor da Kinesis Client Library em Java - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolva um consumidor da Kinesis Client Library em Java

Você pode usar a Kinesis Client Library (KCL) para criar aplicativos que processam dados dos seus streams de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Java. Para ver a referência de Javadoc, consulte o tópico AWS Javadoc para Classe. AmazonKinesisClient

Para baixar o Java KCL de GitHub, acesse a Kinesis Client Library (Java). Para localizar o Java KCL no Apache Maven, acesse a página de resultados da KCL pesquisa. Para baixar o código de amostra para um aplicativo KCL consumidor Java em GitHub, acesse a página do projeto de amostra KCL para Java em GitHub.

O aplicativo de exemplo usa Apache Commons Logging. Você pode alterar a configuração do registro em log no método configure estático definido no arquivo AmazonKinesisApplicationSample.java. Para obter mais informações sobre como usar o Apache Commons Logging com aplicativos Log4j e AWS Java, consulte Logging with Log4j no Guia do desenvolvedor.AWS SDK for Java

Você deve concluir as seguintes tarefas ao implementar um aplicativo KCL consumidor em Java:

Implemente os IRecordProcessor métodos

KCLAtualmente, o suporta duas versões da IRecordProcessor interface: a interface original está disponível com a primeira versão doKCL, e a versão 2 está disponível a partir da versão 1.5.0. KCL As duas interfaces têm suporte total. A escolha depende dos requisitos de cenário específicos. Consulte os Javadocs criados localmente ou o código-fonte para ver todas as diferenças. As seções a seguir descrevem a implementação mínima para os conceitos básicos.

Interface original (versão 1)

A interface IRecordProcessor original (package com.amazonaws.services.kinesis.clientlibrary.interfaces) expõe os seguintes métodos de processador de registros que o consumidor precisa implementar. O exemplo fornece implementações que você pode usar como ponto de partida (consulte AmazonKinesisApplicationSampleRecordProcessor.java).

public void initialize(String shardId) public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
inicializar

Ele KCL chama o initialize método quando o processador de registro é instanciado, passando um ID de fragmento específico como parâmetro. Esse processador de registros processa apenas esse estilhaço e, normalmente, o inverso também é verdadeiro (esse estilhaço é processado somente por esse processador de registro). No entanto, o consumidor deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. A semântica do Kinesis Data Streams é do tipo pelo menos uma vez, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador no consumidor. Para obter mais informações sobre casos em que um estilhaço específico pode ser processado por mais de um operador, consulte Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos.

public void initialize(String shardId)
processRecords

O KCL chama o processRecords método, passando uma lista de registros de dados do fragmento especificado pelo initialize(shardId) método. O processador de registros processa os dados nesses registros de acordo com a semântica do consumidor. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. A classe Record expõe os seguintes métodos que oferecem acesso aos dados do registro, número de sequência e chave de partição.

record.getData() record.getSequenceNumber() record.getPartitionKey()

No exemplo, o método privado processRecordsWithRetries tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. O KCL cuida desse rastreamento para você passando um checkpointer (IRecordProcessorCheckpointer) para. processRecords O processador de registros chama o checkpoint método nessa interface para informar o KCL quanto ele progrediu no processamento dos registros no fragmento. Se o trabalhador falhar, ele KCL usa essas informações para reiniciar o processamento do fragmento no último registro processado conhecido.

Para uma operação de divisão ou mesclagem, eles KCL não começarão a processar os novos fragmentos até que os processadores dos fragmentos originais liguem checkpoint para sinalizar que todo o processamento nos fragmentos originais foi concluído.

Se você não passar um parâmetro, KCL presume que a chamada para checkpoint significa que todos os registros foram processados, até o último registro que foi passado para o processador de registros. Portanto, o processador de registros deve chamar checkpoint somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar checkpoint em cada chamada para processRecords. Um processador pode, por exemplo, chamar checkpoint a cada terceira chamada para processRecords. Você pode, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para checkpoint. Nesse caso, KCL pressupõe que todos os registros tenham sido processados até esse registro somente.

No exemplo, o método privado checkpoint mostra como chamar IRecordProcessorCheckpointer.checkpoint usando a lógica de novas tentativas e o tratamento de exceções apropriados.

Ele KCL depende de processRecords lidar com quaisquer exceções decorrentes do processamento dos registros de dados. Se uma exceção for lançadaprocessRecords, ela KCL ignorará os registros de dados que foram passados antes da exceção. Ou seja, esses registros não serão reenviados para o processador de registros que lançou a exceção ou para qualquer outro processador de registros no consumidor.

shutdown

Ele KCL chama o shutdown método quando o processamento termina (o motivo do desligamento éTERMINATE) ou o trabalhador não está mais respondendo (o motivo do desligamento é). ZOMBIE

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

O processamento termina quando o processador de registros não recebe mais registros do estilhaço porque ele foi dividido ou intercalado, ou o stream foi excluído.

O KCL também passa uma IRecordProcessorCheckpointer interface parashutdown. Se o motivo do desligamento é TERMINATE, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método checkpoint nesta interface.

Interface atualizada (versão 2)

A interface IRecordProcessor atualizada (package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2) expõe os seguintes métodos de processador de registros que o consumidor precisa implementar:

void initialize(InitializationInput initializationInput) void processRecords(ProcessRecordsInput processRecordsInput) void shutdown(ShutdownInput shutdownInput)

Todos os argumentos da versão original da interface podem ser acessados por meio de métodos get nos objetos de contêiner. Por exemplo, para recuperar a lista de registros em processRecords(), você pode usar processRecordsInput.getRecords().

A partir da versão 2 dessa interface (KCL1.5.0 e posterior), as seguintes novas entradas estão disponíveis, além das entradas fornecidas pela interface original:

número de sequência inicial

No objeto InitializationInput passado para a operação initialize(), o número de sequência inicial a partir do qual os registros seriam fornecidos à instância do processador de registros. Esse é o número de sequência que foi verificado pela última vez pela instância do processador de registros que processou anteriormente o mesmo estilhaço. Isso será fornecido no caso de o aplicativo precisar de informações.

número de sequência do ponto de verificação pendente

No objeto InitializationInput passado para a operação initialize(), o número de sequência de verificação pendente (se houver) que não pôde ser confirmado antes que a instância do processador de registros anterior parasse.

Implemente uma fábrica de classes para a IRecordProcessor interface

Você também precisará implementar uma fábrica para a classe que implementa os métodos do processador de registros. Quando o consumidor instancia o operador, ele passa uma referência a essa fábrica.

O exemplo implementa a classe de fábrica no arquivo AmazonKinesisApplicationSampleRecordProcessorFactory.java usando a interface de processador de registros original. Se você deseja que a fábrica da classe crie a versão 2 dos processadores de registros, use o nome do pacote com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.

public class SampleRecordProcessorFactory implements IRecordProcessorFactory { /** * Constructor. */ public SampleRecordProcessorFactory() { super(); } /** * {@inheritDoc} */ @Override public IRecordProcessor createProcessor() { return new SampleRecordProcessor(); } }

Crie um trabalhador

Conforme discutido emImplemente os IRecordProcessor métodos, há duas versões da interface do processador de KCL registros para escolher, o que afeta a forma como você criaria um trabalhador. A interface do processador de registros original usa a seguinte estrutura de código para criar um operador:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker(recordProcessorFactory, config);

Com a versão 2 da interface do processador de registros, você pode usar Worker.Builder para criar um operador sem a necessidade de se preocupar com qual construtor usar e a ordem dos argumentos. A interface do processador de registros atualizada usa a seguinte estrutura de código para criar um operador:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();

Modifique as propriedades de configuração

O exemplo fornece valores padrão para propriedades de configuração. Esses dados de configuração para o operador são então consolidados em um objeto KinesisClientLibConfiguration. Esse objeto e uma referência à fábrica de classe para IRecordProcessor são passados na chamada que instancia o operador. Você pode substituir qualquer uma dessas propriedades por seus próprios valores usando um arquivo de propriedades do Java (consulte AmazonKinesisApplicationSample.java).

Nome da aplicação

Isso KCL requer um nome de aplicativo que seja exclusivo em todos os seus aplicativos e nas tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:

  • Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo stream. Esses operadores podem ser distribuídos em várias instâncias. Se você executar uma instância adicional do mesmo código de aplicativo, mas com um nome de aplicativo diferente, KCL tratará a segunda instância como um aplicativo totalmente separado que também está operando no mesmo stream.

  • O KCL cria uma tabela do DynamoDB com o nome do aplicativo e usa a tabela para manter as informações de estado (como pontos de verificação e mapeamento de fragmentos de trabalho) do aplicativo. Cada aplicação tem sua própria tabela do DynamoDB. Para ter mais informações, consulte Use uma tabela de leasing para rastrear os fragmentos processados pelo aplicativo consumidor KCL.

Configurar credenciais

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. Por exemplo, se você estiver executando seu consumidor em uma EC2 instância, recomendamos que você execute a instância com uma IAM função. AWS as credenciais que refletem as permissões associadas a essa IAM função são disponibilizadas aos aplicativos na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para um consumidor em execução em uma EC2 instância.

Primeiro, o aplicativo de amostra tenta recuperar as IAM credenciais dos metadados da instância:

credentialsProvider = new InstanceProfileCredentialsProvider();

Se o aplicativo de exemplo não consegue obter credenciais dos metadados da instância, ele tenta recuperar as credenciais de um arquivo de propriedades:

credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

Para obter mais informações sobre os metadados da instância, consulte Metadados da instância no Guia EC2do usuário da Amazon.

Use o ID do trabalhador para várias instâncias

O código de inicialização de exemplo cria um ID para o operador, workerId, usando o nome do computador local e anexando um identificador exclusivo globalmente, conforme mostrado no seguinte trecho de código. Essa abordagem é compatível com o cenário de várias instâncias do aplicativo de consumidor em execução em um único computador.

String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

Migrar para a versão 2 da interface do processador de registros

Se você quiser migrar o código que usa a interface original, além das etapas descritas anteriormente, as seguintes etapas serão necessárias:

  1. Altere a classe do processador de registros para importar a versão 2 da interface do processador de registros:

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  2. Altere as referências para as entradas para usar métodos get nos objetos de contêiner. Por exemplo, na operação shutdown(), altere "checkpointer" para "shutdownInput.getCheckpointer()".

  3. Altere a classe da fábrica do processador de registros para importar a versão 2 da interface da fábrica do processador de registros:

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  4. Altere a construção do operador para usar Worker.Builder. Por exemplo: .

    final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();