Desenvolva um consumidor da Kinesis Client Library em Node.js - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolva um consumidor da Kinesis Client Library em Node.js

Você pode usar a Kinesis Client Library (KCL) para criar aplicativos que processam dados dos seus streams de dados do Kinesis. A Kinesis Client Library está disponível em várias linguagens. Este tópico discute Node.js.

KCLÉ uma biblioteca Java; o suporte para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de MultiLangDaemon. Esse daemon é baseado em Java e é executado em segundo plano quando você está usando uma KCL linguagem diferente de Java. Portanto, se você instalar o KCL for Node.js e escrever seu aplicativo de consumidor inteiramente em Node.js, ainda precisará do Java instalado em seu sistema por causa do MultiLangDaemon. Além disso, MultiLangDaemon tem algumas configurações padrão que você pode precisar personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, acesse a página do KCL MultiLangDaemon projeto.

Para baixar o Node.js KCL de GitHub, acesse a Biblioteca de Cliente Kinesis (Node.js).

Downloads de códigos de exemplo

Há dois exemplos de código disponíveis KCL em Node.js:

  • basic-sample

    Usado nas seções a seguir para ilustrar os fundamentos da criação de um aplicativo de KCL consumidor no Node.js.

  • click-stream-sample

    Levemente mais avançado e usa um cenário real, para depois que você se familiarizar com o código de exemplo básico. Esse exemplo não é discutido aqui, mas tem um README arquivo com mais informações.

Você deve concluir as seguintes tarefas ao implementar um aplicativo KCL consumidor no Node.js:

Implemente o processador de registros

O consumidor mais simples possível que usa o KCL for Node.js deve implementar uma recordProcessor função que, por sua vezinitialize, contém as funçõesprocessRecords, shutdown e. O exemplo fornece uma implementação que você pode usar como ponto de partida (consulte sample_kcl_app.js).

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
inicializar

O KCL chama a initialize função quando o processador de gravação é iniciado. Esse processador de registros processa apenas o ID do estilhaço passado como initializeInput.shardId e, normalmente, o inverso também é verdadeiro (esse estilhaço é processado somente por esse processador de registro). No entanto, o consumidor deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso acontece porque a semântica do Kinesis Data Streams é do tipo pelo menos uma vez, o que significa que cada registro de dados de um fragmento é processado pelo menos uma vez por um operador no consumidor. Para obter mais informações sobre casos em que um estilhaço específico pode ser processado por mais de um operador, consulte Use refragmentação, escalonamento e processamento paralelo para alterar o número de fragmentos.

initialize: function(initializeInput, completeCallback)
processRecords

O KCL chama essa função com uma entrada que contém uma lista de registros de dados do fragmento especificado para a initialize função. O processador de registros que você implementa processa os dados nesses registros de acordo com a semântica do consumidor. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

processRecords: function(processRecordsInput, completeCallback)

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição, que o operador pode usar ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. O dicionário record expõe os seguintes pares de chave/valor para acessar os dados do registro, o número de sequência e a chave de partição:

record.data record.sequenceNumber record.partitionKey

Observe que os dados são codificados em Base64.

No exemplo básico, a função processRecords tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer que o processador de registros rastreie os registros que já foram processados em um fragmento. O KCL cuida desse rastreamento com um checkpointer objeto passado comoprocessRecordsInput.checkpointer. Seu processador de registros chama a checkpointer.checkpoint função para informar o KCL quanto ela progrediu no processamento dos registros no fragmento. Caso o trabalhador falhe, ele KCL usa essas informações quando você reinicia o processamento do fragmento para que ele continue a partir do último registro processado conhecido.

Para uma operação de divisão ou mesclagem, o processamento dos novos fragmentos KCL não é iniciado até que os processadores dos fragmentos originais sejam chamados checkpoint para sinalizar que todo o processamento nos fragmentos originais foi concluído.

Se você não passar o número de sequência para a checkpoint função, KCL presume que a chamada para checkpoint significa que todos os registros foram processados, até o último registro passado para o processador de registros. Portanto, o processador de registros deve chamar checkpoint somente após ter processado todos os registros na lista que foi passada para ele. Os processadores de registros não precisam chamar checkpoint em cada chamada para processRecords. Um processador pode, por exemplo, chamar checkpoint a cada terceira chamada, ou algum evento externo para o processador de registros, como um serviço de validação/verificação personalizado que você tiver implementado.

Você pode, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para checkpoint. Nesse caso, KCL pressupõe que todos os registros tenham sido processados até esse registro somente.

O aplicativo de exemplo básico mostra a chamada mais simples possível para a função checkpointer.checkpoint. Você pode adicionar outra lógica de verificação que precisar para o consumidor neste ponto da função.

shutdown

Ele KCL chama a shutdown função quando o processamento termina (shutdownInput.reasonéTERMINATE) ou o trabalhador não está mais respondendo (shutdownInput.reasonestáZOMBIE).

shutdown: function(shutdownInput, completeCallback)

O processamento termina quando o processador de registros não recebe mais registros do estilhaço porque ele foi dividido ou intercalado, ou o stream foi excluído.

O KCL também passa um shutdownInput.checkpointer objeto parashutdown. Se o motivo do desligamento for TERMINATE, você deverá verificar se o processador de registros terminou o processamento de todos os registros de dados e, em seguida, chamar a função checkpoint nessa interface.

Modifique as propriedades de configuração

O exemplo fornece valores padrão para as propriedades de configuração. Você pode substituir qualquer uma dessas propriedades por seus próprios valores (consulte sample.properties no exemplo básico).

Nome da aplicação

Isso KCL exige que um aplicativo seja exclusivo entre seus aplicativos e entre as tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:

  • Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo stream. Esses operadores podem ser distribuídos em várias instâncias. Se você executar uma instância adicional do mesmo código de aplicativo, mas com um nome de aplicativo diferente, KCL tratará a segunda instância como um aplicativo totalmente separado que também está operando no mesmo stream.

  • O KCL cria uma tabela do DynamoDB com o nome do aplicativo e usa a tabela para manter as informações de estado (como pontos de verificação e mapeamento de fragmentos de trabalho) do aplicativo. Cada aplicação tem sua própria tabela do DynamoDB. Para ter mais informações, consulte Use uma tabela de leasing para rastrear os fragmentos processados pelo aplicativo consumidor KCL.

Configurar credenciais

Você deve disponibilizar suas AWS credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. Você pode usar a propriedade AWSCredentialsProvider para definir um provedor de credenciais. O arquivo sample.properties precisa disponibilizar as credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. Se você estiver executando seu consumidor em uma EC2 instância da Amazon, recomendamos que você configure a instância com uma IAM função. AWS as credenciais que refletem as permissões associadas a essa IAM função são disponibilizadas aos aplicativos na instância por meio dos metadados da instância. Essa é a maneira mais segura de gerenciar as credenciais de um aplicativo consumidor em execução em uma EC2 instância.

O exemplo a seguir é configurado KCL para processar um stream de dados do Kinesis kclnodejssample chamado usando o processador de registros fornecido em: sample_kcl_app.js

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON