Desenvolver um consumidor da Kinesis Client Library em Node.js - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolver um consumidor da Kinesis Client Library em Node.js

Você pode usar a Kinesis Client Library (KCL) para criar aplicativos que processam dados dos streamings de dados do Kinesis. A Kinesis Client Library está disponível em vários idiomas. Este tópico discute Node.js.

O KCL é uma biblioteca Java; o suporte para idiomas diferentes do Java é fornecido usando uma interface multilíngue chamadaMultiLangDaemon do. Este daemon é baseado em Java e é executado em segundo plano quando você estiver usando uma linguagem KCL diferente de Java. Portanto, se você instalar o KCL para Node.js e escrever o aplicativo de consumidor inteiramente em Node.js, ainda precisará do Java instalado no sistema por causa daMultiLangDaemon do. Além disso,MultiLangO Daemon tem algumas configurações padrão que você pode personalizar para o caso de uso. Por exemplo, aAWSRegião à qual ele se conecta. Para obter mais informações sobre oMultiLangDaemon doGitHub, consulte oKCLMultiLangProjeto daemon do.

Para fazer download da KCL do Node.js emGitHub, consulteKinesis Client Library (Node.js).

Downloads de códigos de exemplo

Há dois exemplos de código disponíveis para KCL em Node.js:

  • basic-sample

    Usado nas seções a seguir para ilustrar os conceitos básicos de criação de um aplicativo de consumidor da KCL em Node.js.

  • click-stream-sample

    Levemente mais avançado e usa um cenário real, para depois que você se familiarizar com o código de exemplo básico. Esse exemplo não é discutido aqui, mas há um arquivo README com mais informações.

Você precisa concluir as tarefas a seguir ao implementar um aplicativo de consumidor da KCL em Node.js:

Implementar o processador de registros

O consumidor mais simples possível usando a KCL para Node.js precisa implementar umrecordProcessorfunção, que por sua vez contém as funçõesinitialize,processRecords, eshutdown. O exemplo fornece uma implementação que você pode usar como ponto de partida (consulte sample_kcl_app.js).

function recordProcessor() { // return an object that implements initialize, processRecords and shutdown functions.}
initialize

O KCL chama oinitializeFunção quando o processador de registros é iniciado. Esse processador de registros processa apenas o ID do estilhaço passado como initializeInput.shardId e, normalmente, o inverso também é verdadeiro (esse estilhaço é processado somente por esse processador de registro). No entanto, o consumidor deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso ocorre porque o Kinesis Data Streams tempelo menos uma vezO é semântica, o que significa que cada registro de dados de um estilhaço é processado pelo menos uma vez por um operador no consumidor. Para obter mais informações sobre casos em que um estilhaço específico pode ser processado por mais de um operador, consulte Reestilhaçamento, escalabilidade e processamento paralelo.

initialize: function(initializeInput, completeCallback)
processRecords

A KCL chama essa função com entrada que contém uma lista de registros de dados do estilhaço especificado para ainitializefunção. O processador de registros que você implementa processa os dados nesses registros de acordo com a semântica do consumidor. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).

processRecords: function(processRecordsInput, completeCallback)

Além dos dados em si, o registro também contém um número de sequência e uma chave de partição, que o operador pode usar ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. O dicionário record expõe os seguintes pares de chave/valor para acessar os dados do registro, o número de sequência e a chave de partição:

record.data record.sequenceNumber record.partitionKey

Observe que os dados são codificados em Base64.

No exemplo básico, a função processRecords tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.

O Kinesis Data Streams requer o processador de registros para rastrear os registros que já foram processados em um estilhaço. A KCL cuida desse rastreamento com umcheckpointerobjeto passado comoprocessRecordsInput.checkpointer. Seu processador de registro chama ocheckpointer.checkpointPara informar à KCL sobre o progresso do processamento dos registros no estilhaço. Caso o operador falhe, a KCL usa essas informações quando você reinicia o processamento do estilhaço para que ele continue a partir do último registro processado conhecido.

Em uma operação de divisão ou mesclagem, a KCL não começa a processar os novos estilhaços até que os processadores dos estilhaços originais tenham chamadocheckpointpara sinalizar que todo o processamento nos fragmentos originais está completo.

Se você não passar o número da sequência para ocheckpointfunção, o KCL assume que a chamada paracheckpointSignifica que todos os registros foram processados, até o último registro que foi passado ao processador de registros. Portanto, o processador de registros deve chamar checkpoint somente após ter processado todos os registros na lista que foi passada para ele. Os processadores de registros não precisam chamar checkpoint em cada chamada para processRecords. Um processador pode, por exemplo, chamar checkpoint a cada terceira chamada, ou algum evento externo para o processador de registros, como um serviço de validação/verificação personalizado que você tiver implementado.

Você pode, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para checkpoint. Nesse caso, a KCL assume que todos os registros foram processados somente até aquele registro.

O aplicativo de exemplo básico mostra a chamada mais simples possível para a função checkpointer.checkpoint. Você pode adicionar outra lógica de verificação que precisar para o consumidor neste ponto da função.

shutdown

O KCL chama oshutdownfunção quando o processamento termina (shutdownInput.reasonéTERMINATE) ou o trabalhador não está mais respondendo (shutdownInput.reasonéZOMBIE).

shutdown: function(shutdownInput, completeCallback)

O processamento termina quando o processador de registros não recebe mais registros do estilhaço porque ele foi dividido ou intercalado, ou o stream foi excluído.

O KCL também passa por umshutdownInput.checkpointerobjeto parashutdown. Se o motivo do desligamento for TERMINATE, você deverá verificar se o processador de registros terminou o processamento de todos os registros de dados e, em seguida, chamar a função checkpoint nessa interface.

Modificar as propriedades de configuração

O exemplo fornece valores padrão para as propriedades de configuração. Você pode substituir qualquer uma dessas propriedades por seus próprios valores (consulte sample.properties no exemplo básico).

Nome do aplicativo

A KCL exige um aplicativo exclusivo entre seus aplicativos e entre suas tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:

  • Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo stream. Esses operadores podem ser distribuídos em várias instâncias. Se você executa uma instância adicional do mesmo código de aplicativo, mas com um nome de aplicativo diferente, a KCL trata a segunda instância como um aplicativo totalmente separado que também opera no mesmo stream.

  • A KCL cria uma tabela do DynamoDB com o nome do aplicativo e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento operador-estilhaço) referentes ao aplicativo. Cada aplicativo tem sua própria tabela do DynamoDB. Para obter mais informações, consulte Usando uma tabela de leasing para rastrear os fragmentos processados pelo aplicativo do consumidor KCL.

Configurar credenciais

Você deve fazer o seuAWSAs credenciais disponíveis para um dos provedores de credenciais na cadeia de provedores de credencial padrão. Você pode usar a propriedade AWSCredentialsProvider para definir um provedor de credenciais. O arquivo sample.properties precisa disponibilizar as credenciais para um dos provedores de credenciais na cadeia de provedores de credenciais padrão. Se você estiver executando o consumidor em uma instância do Amazon EC2, recomendamos que configure a instância com uma função do IAM.AWSAs credenciais da AWS que refletem as permissões associadas à função do IAM são disponibilizadas aos aplicativos na instância por meio de metadados da instância. Essa é a maneira mais segura de gerenciar credenciais para um aplicativo de consumidor em execução em uma instância do EC2.

O exemplo a seguir configura a KCL para processar um stream de dados do Kinesis chamadokclnodejssampleUsando o processador de registros fornecido emsample_kcl_app.js:

# The Node.js executable script executableName = node sample_kcl_app.js # The name of an Amazon Kinesis stream to process streamName = kclnodejssample # Unique KCL application name applicationName = kclnodejssample # Use default AWS credentials provider chain AWSCredentialsProvider = DefaultAWSCredentialsProviderChain # Read from the beginning of the stream initialPositionInStream = TRIM_HORIZON