Desenvolver um consumidor da Kinesis Client Library em Java - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolver um consumidor da Kinesis Client Library em Java

Você pode usar a Kinesis Client Library (KCL) para criar aplicações que processam dados dos fluxos de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Java. Para ver a referência de Javadoc, consulte o tópico AWSJavadoc para Classe. AmazonKinesisClient

Para baixar o Java KCL do GitHub, acesse a Kinesis Client Library (Java). Para localizar a KCL Java no Apache Maven, acesse a página de resultados da pesquisa de KCL. Para baixar o código de amostra para um aplicativo consumidor Java KCL em GitHub, acesse a página do projeto de amostra KCL for Java em. GitHub

O aplicativo de exemplo usa Apache Commons Logging. Você pode alterar a configuração do registro em log no método configure estático definido no arquivo AmazonKinesisApplicationSample.java. Para obter mais informações sobre como usar o Apache Commons Logging com Log4j e aplicações Java da AWS, consulte Logging with Log4j no Guia do desenvolvedor do AWS SDK for Java.

Você precisa concluir as seguintes tarefas ao implementar uma aplicação de consumo da KCL em Java:

Implemente os RecordProcessor métodos I

Atualmente, a KCL oferece suporte a duas versões da interface do IRecordProcessor: a interface original está disponível com a primeira versão da KCL e a versão 2 está disponível desde a versão 1.5.0. As duas interfaces têm suporte total. A escolha depende dos requisitos de cenário específicos. Consulte os Javadocs criados localmente ou o código-fonte para ver todas as diferenças. As seções a seguir descrevem a implementação mínima para os conceitos básicos.

Interface original (versão 1)

A interface IRecordProcessor original (package com.amazonaws.services.kinesis.clientlibrary.interfaces) expõe os seguintes métodos de processador de registros que o consumidor precisa implementar. O exemplo fornece implementações que você pode usar como ponto de partida (consulte AmazonKinesisApplicationSampleRecordProcessor.java).

public void initialize(String shardId) public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer) public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)
inicializar

A KCL chama o método initialize quando o processador de registros é instanciado, passando um ID de fragmento específico como um parâmetro. Esse processador de registros processa apenas esse estilhaço e, normalmente, o inverso também é verdadeiro (esse estilhaço é processado somente por esse processador de registro). No entanto, o consumidor deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. A semântica do Kinesis Data Streams é do tipo pelo menos uma vez, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador no consumidor. Para obter mais informações sobre casos em que um estilhaço específico pode ser processado por mais de um operador, consulte Reestilhaçamento, escalabilidade e processamento paralelo.

public void initialize(String shardId)
processRecords

A KCL chama o método processRecords passando uma lista de registros de dados do fragmento especificado pelo método initialize(shardId). O processador de registros processa os dados nesses registros de acordo com a semântica do consumidor. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

public void processRecords(List<Record> records, IRecordProcessorCheckpointer checkpointer)

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. A classe Record expõe os seguintes métodos que oferecem acesso aos dados do registro, número de sequência e chave de partição.

record.getData() record.getSequenceNumber() record.getPartitionKey()

No exemplo, o método privado processRecordsWithRetries tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. A KCL faz esse rastreamento para você passando um checkpointer (IRecordProcessorCheckpointer) para o processRecords. O processador de registros chama o método checkpoint nesta interface para informar a KCL sobre o progresso do processamento dos registros no fragmento. Se o operador falhar, a KCL usará essas informações para reiniciar o processamento do fragmento no último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL só começará a processar os novos fragmentos quando os processadores dos fragmentos originais chamarem checkpoint para indicar que o processamento dos fragmentos originais foi concluído.

Se você não passar um parâmetro, a KCL presumirá que a chamada para checkpoint significa que todos os registros foram processados até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar checkpoint somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar checkpoint em cada chamada para processRecords. Um processador pode, por exemplo, chamar checkpoint a cada terceira chamada para processRecords. Você pode, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para checkpoint. Nesse caso, a KCL presume que todos os registros foram processados somente até o registro especificado.

No exemplo, o método privado checkpoint mostra como chamar IRecordProcessorCheckpointer.checkpoint usando a lógica de novas tentativas e o tratamento de exceções apropriados.

A KCL depende do processRecords para lidar com qualquer exceção ocorrida no processamento dos registros de dados. Se ocorrer uma exceção em processRecords, a KCL ignorará os registros de dados passados antes da exceção. Ou seja, esses registros não serão reenviados para o processador de registros que lançou a exceção ou para qualquer outro processador de registros no consumidor.

shutdown

A KCL chama o método shutdown quando o processamento termina (o motivo do desligamento é TERMINATE) ou quando o operador não está mais respondendo (o motivo do desligamento é ZOMBIE).

public void shutdown(IRecordProcessorCheckpointer checkpointer, ShutdownReason reason)

O processamento termina quando o processador de registros não recebe mais registros do estilhaço porque ele foi dividido ou intercalado, ou o stream foi excluído.

A KCL também passa uma interface do IRecordProcessorCheckpointer para shutdown. Se o motivo do desligamento é TERMINATE, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método checkpoint nesta interface.

Interface atualizada (versão 2)

A interface IRecordProcessor atualizada (package com.amazonaws.services.kinesis.clientlibrary.interfaces.v2) expõe os seguintes métodos de processador de registros que o consumidor precisa implementar:

void initialize(InitializationInput initializationInput) void processRecords(ProcessRecordsInput processRecordsInput) void shutdown(ShutdownInput shutdownInput)

Todos os argumentos da versão original da interface podem ser acessados por meio de métodos get nos objetos de contêiner. Por exemplo, para recuperar a lista de registros em processRecords(), você pode usar processRecordsInput.getRecords().

Além das entradas fornecidas pela interface original, estas novas entradas estão disponíveis a partir da versão 2 da interface (KCL 1.5.0 e posterior):

número de sequência inicial

No objeto InitializationInput passado para a operação initialize(), o número de sequência inicial a partir do qual os registros seriam fornecidos à instância do processador de registros. Esse é o número de sequência que foi verificado pela última vez pela instância do processador de registros que processou anteriormente o mesmo estilhaço. Isso será fornecido no caso de o aplicativo precisar de informações.

número de sequência do ponto de verificação pendente

No objeto InitializationInput passado para a operação initialize(), o número de sequência de verificação pendente (se houver) que não pôde ser confirmado antes que a instância do processador de registros anterior parasse.

Implemente uma fábrica de classes para a RecordProcessor interface I

Você também precisará implementar uma fábrica para a classe que implementa os métodos do processador de registros. Quando o consumidor instancia o operador, ele passa uma referência a essa fábrica.

O exemplo implementa a classe de fábrica no arquivo AmazonKinesisApplicationSampleRecordProcessorFactory.java usando a interface de processador de registros original. Se você deseja que a fábrica da classe crie a versão 2 dos processadores de registros, use o nome do pacote com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.

public class SampleRecordProcessorFactory implements IRecordProcessorFactory { /** * Constructor. */ public SampleRecordProcessorFactory() { super(); } /** * {@inheritDoc} */ @Override public IRecordProcessor createProcessor() { return new SampleRecordProcessor(); } }

Criar um operador

Como discutido em Implemente os RecordProcessor métodos I, há duas versões da interface do processador de registros da KCL para escolha, o que afeta como você cria um operador. A interface do processador de registros original usa a seguinte estrutura de código para criar um operador:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker(recordProcessorFactory, config);

Com a versão 2 da interface do processador de registros, você pode usar Worker.Builder para criar um operador sem a necessidade de se preocupar com qual construtor usar e a ordem dos argumentos. A interface do processador de registros atualizada usa a seguinte estrutura de código para criar um operador:

final KinesisClientLibConfiguration config = new KinesisClientLibConfiguration(...) final IRecordProcessorFactory recordProcessorFactory = new RecordProcessorFactory(); final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();

Modificar as propriedades de configuração

O exemplo fornece valores padrão para propriedades de configuração. Esses dados de configuração para o operador são então consolidados em um objeto KinesisClientLibConfiguration. Esse objeto e uma referência à fábrica de classe para IRecordProcessor são passados na chamada que instancia o operador. Você pode substituir qualquer uma dessas propriedades por seus próprios valores usando um arquivo de propriedades do Java (consulte AmazonKinesisApplicationSample.java).

Nome da aplicação

A KCL exige um nome de aplicação exclusivo entre as aplicações e as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:

  • Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo stream. Esses operadores podem ser distribuídos em várias instâncias. Se você executar uma instância adicional do mesmo código da aplicação, mas com um nome diferente, a KCL tratará a segunda instância como uma aplicação totalmente independente operando no mesmo fluxo.

  • A KCL cria uma tabela do DynamoDB com o nome da aplicação e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento de operador-fragmento) da aplicação. Cada aplicação tem sua própria tabela do DynamoDB. Para ter mais informações, consulte Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL.

Configurar credenciais

Você precisa disponibilizar as credenciais da AWS para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. Por exemplo, se você estiver executando o consumidor em uma instância do EC2, recomendamos que execute a instância com um perfil do IAM. As credenciais da AWS que refletem as permissões associadas a esse perfil do IAM são disponibilizadas às aplicações na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para um consumidor em execução em uma instância do EC2.

A aplicação de exemplo primeiro tenta recuperar as credenciais do IAM nos metadados da instância:

credentialsProvider = new InstanceProfileCredentialsProvider();

Se o aplicativo de exemplo não consegue obter credenciais dos metadados da instância, ele tenta recuperar as credenciais de um arquivo de propriedades:

credentialsProvider = new ClasspathPropertiesFileCredentialsProvider();

Para obter mais informações sobre os metadados da instância, consulte Metadados da instância no Guia do usuário do Amazon EC2 para instâncias do Linux.

Usar o ID do operador para várias Instâncias

O código de inicialização de exemplo cria um ID para o operador, workerId, usando o nome do computador local e anexando um identificador exclusivo globalmente, conforme mostrado no seguinte trecho de código. Essa abordagem é compatível com o cenário de várias instâncias do aplicativo de consumidor em execução em um único computador.

String workerId = InetAddress.getLocalHost().getCanonicalHostName() + ":" + UUID.randomUUID();

Migrar para a versão 2 da interface do processador de registros

Se você quiser migrar o código que usa a interface original, além das etapas descritas anteriormente, as seguintes etapas serão necessárias:

  1. Altere a classe do processador de registros para importar a versão 2 da interface do processador de registros:

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessor;
  2. Altere as referências para as entradas para usar métodos get nos objetos de contêiner. Por exemplo, na operação shutdown(), altere "checkpointer" para "shutdownInput.getCheckpointer()".

  3. Altere a classe da fábrica do processador de registros para importar a versão 2 da interface da fábrica do processador de registros:

    import com.amazonaws.services.kinesis.clientlibrary.interfaces.v2.IRecordProcessorFactory;
  4. Altere a construção do operador para usar Worker.Builder. Por exemplo: .

    final Worker worker = new Worker.Builder() .recordProcessorFactory(recordProcessorFactory) .config(config) .build();