As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Desenvolver um consumidor da Kinesis Client Library em .NET
Você pode usar a Kinesis Client Library (KCL) para criar aplicativos que processam dados dos streamings de dados do Kinesis. A Kinesis Client Library está disponível em vários idiomas. Este tópico discute .NET.
O KCL é uma biblioteca Java; o suporte para idiomas diferentes do Java é fornecido usando uma interface multilíngue chamadaMultiLangDaemon do. Este daemon é baseado em Java e é executado em segundo plano quando você estiver usando uma linguagem KCL diferente de Java. Portanto, se você instalar o KCL para .NET e escrever o aplicativo de consumidor inteiramente em .NET, ainda precisará do Java instalado no sistema por causa daMultiLangDaemon do. Além disso,MultiLangO Daemon tem algumas configurações padrão que você pode personalizar para o caso de uso. Por exemplo, aAWSRegião à qual ele se conecta. Para obter mais informações sobre oMultiLangDaemon doGitHub, consulte oKCLMultiLangProjeto daemon do
Para baixar o .NET KCL doGitHub, consulteKinesis Client Library (.NET)
Você precisa concluir as tarefas a seguir ao implementar um aplicativo de consumidor da KCL em .NET:
Tarefas
Implemente o IRecordProcessorClasse Methods (Métodos)
O consumidor precisa implementar os seguintes métodos para IRecordProcessor
. O consumidor de exemplo fornece implementações que você pode usar como ponto de partida (consulte a classe SampleRecordProcessor
em SampleConsumer/AmazonKinesisSampleConsumer.cs
).
public void Initialize(InitializationInput input)
public void ProcessRecords(ProcessRecordsInput input)
public void Shutdown(ShutdownInput input)
Inicializar
A KCL chama esse método quando o processador de registros é instanciado, passando um ID de estilhaço específico nainput
parâmetro (input.ShardId
). Esse processador de registros processa apenas esse estilhaço e, normalmente, o inverso também é verdadeiro (esse estilhaço é processado somente por esse processador de registro). No entanto, o consumidor deve considerar a possibilidade de que um registro de dados pode ser processado mais de uma vez. Isso ocorre porque o Kinesis Data Streams tempelo menos uma vezO é semântica, o que significa que cada registro de dados de um estilhaço é processado pelo menos uma vez por um operador no consumidor. Para obter mais informações sobre casos em que um estilhaço específico pode ser processado por mais de um operador, consulte Reestilhaçamento, escalabilidade e processamento paralelo.
public void Initialize(InitializationInput input)
ProcessRecords
A KCL chama esse método passando uma lista de registros de dados nainput
parâmetro (input.Records
) do fragmento especificado peloInitialize
. O processador de registros que você implementa processa os dados nesses registros de acordo com a semântica do consumidor. Por exemplo, o operador pode executar uma transformação nos dados e, em seguida, armazenar o resultado em um bucket do Amazon Simple Storage Service (Amazon S3).
public void ProcessRecords(ProcessRecordsInput input)
Além dos dados em si, o registro também contém um número de sequência e uma chave de partição. O operador pode usar esses valores ao processar os dados. Por exemplo, o operador pode escolher o bucket do S3 no qual armazenar os dados com base no valor da chave de partição. A classe Record
expõe os seguintes itens para acessar os dados do registro, o número de sequência e a chave de partição:
byte[] Record.Data
string Record.SequenceNumber
string Record.PartitionKey
No exemplo, o método ProcessRecordsWithRetries
tem código que mostra como um operador pode acessar os dados do registro, o número de sequência e a chave de partição.
O Kinesis Data Streams requer o processador de registros para rastrear os registros que já foram processados em um estilhaço. A KCL cuida desse rastreamento para você passando umCheckpointer
objeto paraProcessRecords
(input.Checkpointer
). O processador de registro chama oCheckpointer.Checkpoint
Para informar à KCL sobre o progresso do processamento dos registros no estilhaço. Caso o operador falhe, a KCL usa essas informações para reiniciar o processamento do estilhaço no último registro processado conhecido.
Em uma operação de divisão ou mesclagem, a KCL não começa a processar os novos estilhaços até que os processadores dos estilhaços originais tenham chamadoCheckpointer.Checkpoint
para sinalizar que todo o processamento nos fragmentos originais está completo.
Se você não passar um parâmetro, o KCL pressupõe que a chamada paraCheckpointer.Checkpoint
Significa que todos os registros foram processados, até o último registro que foi passado ao processador de registros. Portanto, o processador de registros deve chamar Checkpointer.Checkpoint
somente após ter processado todos os registros na lista que foi passada a ele. Os processadores de registros não precisam chamar Checkpointer.Checkpoint
em cada chamada para ProcessRecords
. Um processador pode, por exemplo, chamar Checkpointer.Checkpoint
a cada terceira ou quarta chamada. Você pode, opcionalmente, especificar o número de sequência exato de um registro como um parâmetro para Checkpointer.Checkpoint
. Nesse caso, a KCL assume que os registros foram processados somente até aquele registro.
No exemplo, o método privado Checkpoint(Checkpointer checkpointer)
mostra como chamar o método Checkpointer.Checkpoint
usando a lógica de novas tentativas e o tratamento de exceções apropriados.
A KCL para .NET lida com exceções de maneira diferente das outras bibliotecas de idioma da KCL, pois não lida com nenhuma exceção que surge do processamento de registros de dados. As exceções não detectadas do código do usuário causam uma falha no programa.
Desligamento
O KCL chama oShutdown
método quando o processamento termina (o motivo do desligamento éTERMINATE
) ou quando o operador não está mais respondendo (o desligamentoinput.Reason
O valor éZOMBIE
).
public void Shutdown(ShutdownInput input)
O processamento termina quando o processador de registros não recebe mais registros do estilhaço porque ele foi dividido ou intercalado, ou o stream foi excluído.
O KCL também passa por umCheckpointer
objeto parashutdown
. Se o motivo do desligamento é TERMINATE
, o processador de registros deve terminar o processamento de todos os registros de dados e, em seguida, chamar o método checkpoint
nesta interface.
Modificar as propriedades de configuração
O consumidor de exemplo fornece valores padrão para as propriedades de configuração. Você pode substituir qualquer uma dessas propriedades por seus próprios valores (consulte SampleConsumer/kcl.properties
).
Nome do aplicativo
A KCL exige um aplicativo exclusivo entre seus aplicativos e entre suas tabelas do Amazon DynamoDB na mesma região. Ela usa o valor de configuração de nome de aplicativo das seguintes formas:
-
Presume-se que todos os operadores associados com esse nome de aplicativo estejam trabalhando juntos no mesmo stream. Esses operadores podem ser distribuídos em várias instâncias. Se você executa uma instância adicional do mesmo código de aplicativo, mas com um nome de aplicativo diferente, a KCL trata a segunda instância como um aplicativo totalmente separado que também opera no mesmo stream.
-
A KCL cria uma tabela do DynamoDB com o nome do aplicativo e usa essa tabela para manter informações de estado (como pontos de verificação e mapeamento operador-estilhaço) referentes ao aplicativo. Cada aplicativo tem sua própria tabela do DynamoDB. Para obter mais informações, consulte Usando uma tabela de leasing para rastrear os fragmentos processados pelo aplicativo do consumidor KCL.
Configurar credenciais
Você deve fazer o seuAWSAs credenciais disponíveis para um dos provedores de credenciais na cadeia de provedores de credencial padrão. Você pode usar a propriedade AWSCredentialsProvider
para definir um provedor de credenciais. As sample.properties
O arquivo de propriedades do exemplo configura a KCL para processar um stream de dados do Kinesis chamado “words” usando o processador de registros fornecido emAmazonKinesisSampleConsumer.cs
.