Usar a biblioteca de cliente Kinesis - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usar a biblioteca de cliente Kinesis

Um dos métodos de desenvolvimento de aplicativos de consumidor personalizados que podem processar dados de fluxos de dados do KDS é usar a Biblioteca de Cliente Kinesis (KCL).

nota

Tanto para o KCL 1.x quanto para o KCL 2.x, é recomendável que você atualize para a versão mais recente do KCL 1.x ou KCL 2.x, dependendo do seu cenário de uso. Tanto o KCL 1.x quanto o KCL 2.x são atualizados regularmente com versões mais recentes que incluem os patches de dependência e segurança mais recentes, correções de bugs e novos recursos compatíveis com versões anteriores. Para obter mais informações, consultehttps://github.com/awslabs/amazon-kinesis-client/releases.

O que é a biblioteca de cliente Kinesis?

A KCL ajuda você a consumir e processar dados de um stream de dados do Kinesis cuidando de muitas das tarefas complexas associadas à computação distribuída. Isso inclui balanceamento de carga entre várias instâncias de aplicativos de consumidor, resposta a falhas de instância de aplicativo de consumidor, ponto de verificação de registros processados e reação ao reestilhaçamento. A KCL cuida de todas essas subtarefas para que você possa concentrar seus esforços em escrever sua lógica personalizada de processamento de registros.

O KCL é diferente das APIs do Kinesis Data Streams que estão disponíveis naAWSSDKs. As APIs do Kinesis Data Streams ajudam você a gerenciar muitos aspectos do Kinesis Data Streams, inclusive a criação de streams, reestilhaçamento e colocação e obtenção de registros. O KCL fornece uma camada de abstração em torno de todas essas subtarefas, especificamente para que você possa se concentrar na lógica de processamento de dados personalizada do aplicativo do consumidor. Para obter informações sobre a API do Kinesis Data Streams, consulte aReferência da API do Amazon Kinesis.

Importante

O KCL é uma biblioteca Java. O Support para idiomas diferentes do Java é fornecido usando uma interface multilíngue chamadaMultiLangDaemon. Este daemon é baseado em Java e é executado em segundo plano quando você estiver usando uma linguagem KCL diferente de Java. Por exemplo, se você instalar o KCL para Python e escrever o aplicativo de consumidor inteiramente em Python, ainda precisará do Java instalado no sistema por causa doMultiLangDaemon. Além disso,MultiLangDaemon tem algumas configurações padrão que você pode personalizar para o caso de uso. Por exemplo, oAWSregião à qual ele se conecta. Para obter mais informações sobre oMultiLangDaemon doGitHub, consulteKCLMultiLangProjeto daemon.

A KCL atua como um intermediário entre a lógica de processamento de registro e o Kinesis Data Streams. A KCL executa as tarefas a seguir:

  • Conecta-se ao stream de dados

  • Enumera os estilhaços no stream de dados

  • Usa arrendamentos para coordenar associações de fragmentos com seus trabalhadores

  • Cria uma instância de um processador de registro para cada estilhaço que gerencia

  • Extrai registros de dados do stream de dados

  • Envia os registros ao processador de registros correspondente

  • Registros processados pelos pontos de verificação

  • Equilibra associações de shard-worker (locações) quando a contagem de instâncias do trabalhador muda ou quando o fluxo de dados é reconfigurado (os fragmentos são divididos ou mesclados)

Versões disponíveis do KCL

Atualmente, você pode usar uma das versões com suporte a seguir do KCL para criar seus aplicativos de consumidor personalizados:

Você pode usar o KCL 1.x ou o KCL 2.x para criar aplicativos de consumo que usam taxa de transferência compartilhada. Para obter mais informações, consulte Desenvolver consumidores personalizados com taxa de transferência compartilhada usando a KCL.

Para criar aplicativos de consumo que usam taxa de transferência dedicada (consumidores de fan-out aprimorados), você só pode usar o KCL 2.x. Para obter mais informações, consulte Desenvolver consumidores personalizados com taxa de transferência dedicada (distribuição avançada).

Para obter informações sobre as diferenças entre KCL 1.x e KCL 2.x e instruções sobre como migrar do KCL 1.x para o KCL 2.x, consulteMigrar consumidores do KCL 1.x para o KCL 2.x.

Conceitos da KCL

  • Aplicativo para consumidores da KCL— um aplicativo criado sob medida usando o KCL e projetado para ler e processar registros de streams de dados.

  • Instância de aplicativo do consumidor- Os aplicativos do consumidor KCL geralmente são distribuídos, com uma ou mais instâncias de aplicativos em execução simultaneamente para coordenar falhas e balancear dinamicamente o processamento de registros de dados de dados.

  • Operador— uma classe de alto nível que uma instância de aplicativo consumidor KCL usa para iniciar o processamento de dados.

    Importante

    Cada instância de aplicativo do consumidor KCL tem um trabalhador.

    O trabalhador inicializa e supervisiona várias tarefas, incluindo sincronização de informações de fragmentos e leasing, rastreamento de atribuições de fragmentos e processamento de dados dos fragmentos. Um trabalhador fornece à KCL as informações de configuração para o aplicativo consumidor, como o nome do fluxo de dados cujos dados registram esse aplicativo consumidor da KCL processará e oAWScredenciais necessárias para acessar esse fluxo de dados. O trabalhador também inicia essa instância específica do aplicativo do consumidor KCL para entregar registros de dados do fluxo de dados para os processadores de registro.

    Importante

    No KCL 1.x, essa classe é chamadaOperador. Para obter mais informações, (estes são os repositórios Java KCL), consultehttps://github.com/awslabs/amazon-kinesis-client/Blob/v1.x/SRC/Main/Java/com/Amazonaws/Services/Kinesis/ClientLibrary/Lib/Worker/Worker.java. No KCL 2.x, essa classe é chamadaAgendadora. O propósito do agendador no KCL 2.x é idêntico ao propósito do trabalhador no KCL 1.x. Para obter mais informações sobre a classe do Scheduler no KCL 2.x, consultehttps://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-clientArquivo /src/main/java/software/amazon/kinesis/coordinator/Scheduler.java.

  • Arrendamento— dados que definem a vinculação entre um trabalhador e um fragmento. Aplicativos de consumo KCL distribuídos usam locações para particionar o processamento de registros de dados em uma frota de trabalhadores. A qualquer momento, cada fragmento de registros de dados é vinculado a um determinado trabalhador por uma locação identificada peloleaseKeyVariável.

    Por padrão, um trabalhador pode manter um ou mais arrendamentos (sujeito ao valor domaxLeasesForOperadorVariável) ao mesmo tempo.

    Importante

    Todo trabalhador lutará por manter todos os arrendamentos disponíveis para todos os fragmentos disponíveis em um fluxo de dados. Mas apenas um trabalhador manterá com êxito cada locação a qualquer momento.

    Por exemplo, se você tiver uma instância de aplicativo do consumidor A com o trabalhador A que esteja processando um fluxo de dados com 4 fragmentos, o trabalhador A poderá reter locações para fragmentos 1, 2, 3 e 4 ao mesmo tempo. Mas se você tiver duas instâncias de aplicativos do consumidor: A e B com o trabalhador A e o trabalhador B, e essas instâncias estão processando um fluxo de dados com 4 fragmentos, o trabalhador A e o trabalhador B não podem manter o leasing para o fragmento 1 ao mesmo tempo. Um trabalhador mantém a locação em um fragmento específico até que ele esteja pronto para interromper o processamento dos registros de dados desse fragmento ou até que ele falhe. Quando um trabalhador deixa de manter a locação, outro trabalhador ocupa e mantém a locação.

    Para obter mais informações, (estes são os repositórios Java KCL), consultehttps://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/Amazonaws/serviços/kinesis/locações/impl/lease.javapara KCL 1.x ehttps://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-clientArquivo /src/main/java/software/amazon/kinesis/leases/Lease.javapara KCL 2.x.

  • Tabela de arrendamento- uma tabela exclusiva do Amazon DynamoDB que é usada para acompanhar os fragmentos em um fluxo de dados do KDS que está sendo alugado e processado pelos trabalhadores do aplicativo consumidor KCL. A tabela de leasing deve permanecer sincronizada (dentro de um trabalhador e em todos os trabalhadores) com as informações mais recentes do fragmento do fluxo de dados enquanto o aplicativo consumidor KCL está em execução. Para obter mais informações, consulte Usando uma tabela de leasing para rastrear os fragmentos processados pelo aplicativo do consumidor KCL.

  • Processador de registros— a lógica que define como seu aplicativo consumidor KCL processa os dados que ele obtém dos fluxos de dados. Em tempo de execução, uma instância de aplicativo do consumidor KCL instancia um trabalhador e esse trabalhador instancia um processador de registro para cada fragmento para o qual ele detém uma locação.

Usando uma tabela de leasing para rastrear os fragmentos processados pelo aplicativo do consumidor KCL

O que é uma tabela de locação

Para cada aplicativo do Amazon Kinesis Data Streams, a KCL usa uma tabela de leasing exclusiva (armazenada em uma tabela do Amazon DynamoDB) para acompanhar os fragmentos em um fluxo de dados do KDS que estão sendo alugados e processados pelos trabalhadores do aplicativo consumidor KCL.

Importante

A KCL usa o nome do aplicativo de consumidor para criar o nome da tabela de arrendamento que esse aplicativo de consumidor usa, portanto, cada nome de aplicativo de consumidor precisa ser exclusivo.

Você pode visualizar a tabela de leasing usando oConsole do Amazon DynamoDBenquanto o aplicativo consumidor estiver em execução.

Se a tabela de arrendamento do seu aplicativo de consumidor do KCL não existir quando o aplicativo for iniciado, um dos operadores criará a tabela de arrendamento para esse aplicativo.

Importante

Sua conta é cobrada pelos custos associados à tabela do DynamoDB, além dos custos associados ao próprio Kinesis Data Streams.

Cada linha na tabela de arrendamento representa um estilhaço que está sendo processado pelos operadores do seu aplicativo de consumidor. Se seu aplicativo consumidor KCL processar apenas um fluxo de dados, entãoleaseKeyque é a chave de hash da tabela de arrendamento é o ID do estilhaço. Se você forProcessando vários fluxos de dados com o mesmo aplicativo KCL 2.x para consumidor Java, então a estrutura da leaseKey se parece com isso: account-id:StreamName:streamCreationTimestamp:ShardId. Por exemplo,111111111:multiStreamTest-1:12345:shardId-000000000336.

Além do ID do estilhaço, cada linha também inclui os seguintes dados:

  • Ponto de verificação: Número de sequência do ponto de verificação mais recente do estilhaço. Esse valor é exclusivo entre todos os estilhaços do stream de dados.

  • checkpointSubSequence: Ao usar o recurso de agregação da Kinesis Producer Library, esta é uma extensão paraponto de verificaçãoque rastreia registros de usuários individuais dentro do registro Kinesis.

  • leaseCounter: Usado para versionamento de arrendamento para que os operadores possam detectar que o arrendamento foi feito por outro operador.

  • leaseKey: Um identificador exclusivo para uma locação. Cada arrendamento é específico a um estilhaço no stream de dados e é mantido por um operador por vez.

  • leaseOwner: O operador que está retendo esse arrendamento.

  • ownerSwitchesSincePonto de verificação: Quantas vezes esse arrendamento trocou de operador desde a última vez em que um ponto de verificação foi gravado.

  • parentShardId: Usado para garantir que o estilhaço pai seja totalmente processado antes do início do processamento nos estilhaços filhos. Isso garante que os registros sejam processados na mesma ordem em que foram colocados no stream.

  • hashrange: Usado peloPeriodicShardSyncManagerpara executar sincronizações periódicas para encontrar fragmentos ausentes na tabela de leasing e criar arrendamentos para eles, se necessário.

    nota

    Esses dados estão presentes na tabela de locação para cada fragmento começando com KCL 1.14 e KCL 2.3. Para obter mais informações sobrePeriodicShardSyncManagere sincronização periódica entre locações e fragmentos, consulteComo uma tabela de leasing é sincronizada com os fragmentos em um fluxo de dados do KDS.

  • estilhaços infantis: Usado peloLeaseCleanupManagerpara revisar o status de processamento do fragmento filho e decidir se o fragmento pai pode ser excluído da tabela de leasing.

    nota

    Esses dados estão presentes na tabela de locação para cada fragmento começando com KCL 1.14 e KCL 2.3.

  • ShardId: O ID do fragmento.

    nota

    Esses dados só estarão presentes na tabela de leasing se você estiverProcessando vários fluxos de dados com o mesmo aplicativo KCL 2.x para consumidor Java. Isso só é suportado no KCL 2.x para Java, começando com o KCL 2.3 para Java e além.

  • nome do streamO identificador do fluxo de dados no seguinte formato:account-id:StreamName:streamCreationTimestamp.

    nota

    Esses dados só estarão presentes na tabela de leasing se você estiverProcessando vários fluxos de dados com o mesmo aplicativo KCL 2.x para consumidor Java. Isso só é suportado no KCL 2.x para Java, começando com o KCL 2.3 para Java e além.

Taxa de transferência

Se o aplicativo do Amazon Kinesis Data Streams recebe exceções de taxa de transferência provisionada, você deve aumentar a taxa de transferência provisionada para a tabela do DynamoDB. A KCL cria a tabela com uma taxa de transferência provisionada de 10 leituras por segundo e 10 gravações por segundo, mas isso pode não ser suficiente para o seu aplicativo. Por exemplo, se o aplicativo Amazon Kinesis Data Streams fizer pontos de verificação com frequência ou operar em um stream que é composto por vários estilhaços, você pode precisar de uma taxa de transferência maior.

Para obter informações sobre a taxa de transferência provisionada no DynamoDB, consulteModo de capacidade de leitura/gravaçãoeTrabalho com tabelas e dadosnoGuia do desenvolvedor do Amazon DynamoDB.

Como uma tabela de leasing é sincronizada com os fragmentos em um fluxo de dados do KDS

Trabalhadores em aplicativos de consumo da KCL usam locações para processar fragmentos de um determinado fluxo de dados. As informações sobre qual trabalhador está alugando qual fragmento a qualquer momento é armazenado em uma tabela de locação. A tabela de leasing deve permanecer em sincronia com as informações mais recentes do fragmento do fluxo de dados enquanto o aplicativo consumidor KCL estiver em execução. O KCL sincroniza a tabela de leasing com as informações de fragmentos adquiridas do serviço Kinesis Data Streams durante a inicialização do aplicativo do consumidor (quando o aplicativo consumidor é inicializado ou reiniciado) e também sempre que um fragmento que está sendo processado chega ao fim (refragmentação). Em outras palavras, os trabalhadores ou um aplicativo consumidor KCL são sincronizados com o fluxo de dados que estão processando durante o bootstrap inicial do aplicativo do consumidor e sempre que o aplicativo consumidor encontrar um evento de reshard stream de dados.

Sincronização em KCL 1.0 - 1,13 e KCL 2.0 - 2.2

No KCL 1.0 - 1.13 e KCL 2.0 - 2.2, durante a inicialização do aplicativo do consumidor e também durante cada evento de reshard stream de dados, a KCL sincroniza a tabela de leasing com as informações de fragmentos adquiridas do serviço Kinesis Data Streams invocando oListShardsou oDescribeStreamAPIs de descoberta. Em todas as versões do KCL listadas acima, cada trabalhador de um aplicativo consumidor KCL conclui as etapas a seguir para executar o processo de sincronização de locação/fragmento durante o bootstrapping do aplicativo consumidor e em cada evento de reshard stream:

  • Obtém todos os fragmentos para dados do fluxo que está sendo processado

  • Obtém todos os arrendamentos de fragmentos da tabela de locação

  • Filtra cada fragmento aberto que não tem locação na tabela de locação

  • Iera sobre todos os fragmentos abertos encontrados e para cada fragmento aberto sem pai aberto:

    • Atravessa a árvore de hierarquia pelo caminho de seus antepassados para determinar se o fragmento é descendente. Um fragmento é considerado descendente, se um estilhaço ancestral estiver sendo processado (entrada de leasing para estilhaço ancestral existe na tabela de leasing) ou se um estilhaço ancestral deve ser processado (por exemplo, se a posição inicial forTRIM_HORIZONouAT_TIMESTAMP)

    • Se o fragmento aberto no contexto for descendente, a KCL verifica o fragmento com base na posição inicial e cria arrendamentos para seus pais, se necessário

Sincronização no KCL 2.x, começando com KCL 2.3 e Beyond

Começando com as versões mais recentes suportadas do KCL 2.x (KCL 2.3) e além, a biblioteca agora suporta as seguintes alterações no processo de sincronização. Essas alterações de sincronização de lease/shard reduzem significativamente o número de chamadas de API feitas pelos aplicativos do consumidor KCL para o serviço Kinesis Data Streams e otimizam o gerenciamento de leasing em seu aplicativo de consumidor KCL.

  • Durante a inicialização do aplicativo, se a tabela de leasing estiver vazia, a KCL utilizará oListShardOpção de filtragem da API (oShardFilterparâmetro de solicitação opcional) para recuperar e criar arrendamentos somente para um instantâneo de fragmentos abertos no momento especificado peloShardFilterparâmetro . OShardFilterparâmetro permite filtrar a resposta doListShardsAPI. A única propriedade necessária doShardFilterO parâmetro éType. A KCL usa oTypepropriedade filter e os seguintes valores válidos para identificar e retornar um instantâneo de fragmentos abertos que podem exigir novos arrendamentos:

    • AT_TRIM_HORIZON- a resposta inclui todos os fragmentos que estavam abertos emTRIM_HORIZON.

    • AT_LATEST- a resposta inclui apenas os fragmentos atualmente abertos do fluxo de dados.

    • AT_TIMESTAMP- a resposta inclui todos os fragmentos cujo carimbo de data/hora inicial é menor ou igual ao carimbo de data/hora fornecido e o carimbo de data/hora final é maior ou igual ao carimbo de data/hora fornecido ou ainda aberto.

    ShardFilteré usado ao criar arrendamentos para uma tabela de leasing vazia para inicializar locações para um instantâneo de fragmentos especificados emRetrievalConfig#initialPositionInStreamExtended.

    Para obter mais informações sobre o ShardFilter, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Em vez de todos os trabalhadores realizarem a sincronização de locação/fragmento para manter a tabela de locação atualizada com os fragmentos mais recentes no fluxo de dados, um único líder de trabalhador eleito executa a sincronização de locação/fragmento.

  • O KCL 2.3 usa oChildShardsparâmetro de retorno doGetRecordsO e aSubscribeToShardAPIs para executar a sincronização de locação/fragmento que acontece emSHARD_ENDpara fragmentos fechados, permitindo que um trabalhador da KCL crie apenas arrendamentos para os fragmentos filhos do fragmento que concluiu o processamento. Para aplicativos compartilhados em todos os consumidores, essa otimização da sincronização de locação/fragmento usa oChildShardsdoGetRecordsAPI. Para os aplicativos de consumo dedicados de throughput (fan-out aprimorado), essa otimização da sincronização de locação/fragmento usa oChildShardsdoSubscribeToShardAPI. Para obter mais informações, consulteGetRecords,SubscribeToEstilhaços, eChildShard.

  • Com as mudanças acima, o comportamento da KCL está se movendo do modelo de todos os trabalhadores aprendendo sobre todos os fragmentos existentes para o modelo de trabalhadores aprendendo apenas sobre os fragmentos infantis dos fragmentos que cada trabalhador possui. Portanto, além da sincronização que acontece durante os eventos de inicialização e de reenvio de aplicativos do consumidor, a KCL agora também executa varreduras periódicas adicionais de fragmento/locação para identificar quaisquer possíveis falhas na tabela de leasing (em outras palavras, para aprender sobre todos os novos fragmentos) para garantir a completa o intervalo de hash do fluxo de dados está sendo processado e cria arrendamentos para eles, se necessário.PeriodicShardSyncManageré o componente responsável pela execução de varreduras periódicas de locação/fragmento.

    Para obter mais informações sobrePeriodicShardSyncManagerno KCL 2.3, consultehttps://github.com/awslabs/amazon-kinesis-client/blob/master/amazon-kinesis-client/src/main/java/software/amazon/kinesis/locações/LeaseManagementConfig.java #L201 -L213.

    No KCL 2.3, novas opções de configuração estão disponíveis para configuraçãoPeriodicShardSyncManageremLeaseManagementConfig:

    Name (Nome) Valor padrão Descrição
    leasesRecoveryAuditorExecutionFrequencyMillis

    120000 (2 minutos)

    Frequência (em millis) do trabalho do auditor para verificar locações parciais na tabela de locação. Se o auditor detectar qualquer furo nos arrendamentos de um fluxo, ele acionaria a sincronização de fragmentos com base emleasesRecoveryAuditorInconsistencyConfidenceThreshold.

    leasesRecoveryAuditorInconsistencyConfidenceLimite

    3

    Limite de confiança para o trabalho periódico do auditor para determinar se os arrendamentos para um fluxo de dados na tabela de leasing são inconsistentes. Se o auditor encontrar o mesmo conjunto de inconsistências consecutivamente para um fluxo de dados muitas vezes, ele acionaria uma sincronização de fragmentos.

    NovoCloudWatchmétricas também são emitidas agora para monitorar a saúde doPeriodicShardSyncManager. Para obter mais informações, consulte PeriodicShardSyncManager.

  • Incluindo uma otimização paraHierarchicalShardSyncerpara criar apenas arrendamentos para uma camada de fragmentos.

Sincronização no KCL 1.x, começando com o KCL 1.14 e além

Começando com as versões mais recentes suportadas do KCL 1.x (KCL 1.14) e além, a biblioteca agora suporta as seguintes alterações no processo de sincronização. Essas alterações de sincronização de lease/shard reduzem significativamente o número de chamadas de API feitas pelos aplicativos do consumidor KCL para o serviço Kinesis Data Streams e otimizam o gerenciamento de leasing em seu aplicativo de consumidor KCL.

  • Durante a inicialização do aplicativo, se a tabela de leasing estiver vazia, a KCL utilizará oListShardOpção de filtragem da API (oShardFilterparâmetro de solicitação opcional) para recuperar e criar arrendamentos somente para um instantâneo de fragmentos abertos no momento especificado peloShardFilterparâmetro . OShardFilterparâmetro permite filtrar a resposta doListShardsAPI. A única propriedade necessária doShardFilterO parâmetro éType. A KCL usa oTypepropriedade filter e os seguintes valores válidos para identificar e retornar um instantâneo de fragmentos abertos que podem exigir novos arrendamentos:

    • AT_TRIM_HORIZON- a resposta inclui todos os fragmentos que estavam abertos emTRIM_HORIZON.

    • AT_LATEST- a resposta inclui apenas os fragmentos atualmente abertos do fluxo de dados.

    • AT_TIMESTAMP- a resposta inclui todos os fragmentos cujo carimbo de data/hora inicial é menor ou igual ao carimbo de data/hora fornecido e o carimbo de data/hora final é maior ou igual ao carimbo de data/hora fornecido ou ainda aberto.

    ShardFilteré usado ao criar arrendamentos para uma tabela de leasing vazia para inicializar locações para um instantâneo de fragmentos especificados emKinesisClientLibConfiguration#initialPositionInStreamExtended.

    Para obter mais informações sobre o ShardFilter, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Em vez de todos os trabalhadores realizarem a sincronização de locação/fragmento para manter a tabela de locação atualizada com os fragmentos mais recentes no fluxo de dados, um único líder de trabalhador eleito executa a sincronização de locação/fragmento.

  • O KCL 1.14 usa oChildShardsparâmetro de retorno doGetRecordsO e aSubscribeToShardAPIs para executar a sincronização de locação/fragmento que acontece emSHARD_ENDpara fragmentos fechados, permitindo que um trabalhador da KCL crie apenas arrendamentos para os fragmentos filhos do fragmento que concluiu o processamento. Para obter mais informações, consulteGetRecordseChildShard.

  • Com as mudanças acima, o comportamento da KCL está se movendo do modelo de todos os trabalhadores aprendendo sobre todos os fragmentos existentes para o modelo de trabalhadores aprendendo apenas sobre os fragmentos infantis dos fragmentos que cada trabalhador possui. Portanto, além da sincronização que acontece durante os eventos de inicialização e de reenvio de aplicativos do consumidor, a KCL agora também executa varreduras periódicas adicionais de fragmento/locação para identificar quaisquer possíveis falhas na tabela de leasing (em outras palavras, para aprender sobre todos os novos fragmentos) para garantir a completa o intervalo de hash do fluxo de dados está sendo processado e cria arrendamentos para eles, se necessário.PeriodicShardSyncManageré o componente responsável pela execução de varreduras periódicas de locação/fragmento.

    QuandoKinesisClientLibConfiguration#shardSyncStrategyTypeé definido comoShardSyncStrategyType.SHARD_END,PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThresholdé usado para determinar o limite para o número de varreduras consecutivas contendo furos na tabela de leasing após o qual impor uma sincronização de fragmentos. QuandoKinesisClientLibConfiguration#shardSyncStrategyTypeé definido comoShardSyncStrategyType.PERIODIC,leasesRecoveryAuditorInconsistencyConfidenceThresholdé ignorada.

    Para obter mais informações sobrePeriodicShardSyncManagerno KCL 1.14, consultehttps://github.com/awslabs/amazon-kinesis-client/blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/KinesisClientLibConfiguration.java #L987 -L999.

    No KCL 1.14, nova opção de configuração está disponível para configuraçãoPeriodicShardSyncManageremLeaseManagementConfig:

    Name (Nome) Valor padrão Descrição
    leasesRecoveryAuditorInconsistencyConfidenceLimite

    3

    Limite de confiança para o trabalho periódico do auditor para determinar se os arrendamentos para um fluxo de dados na tabela de leasing são inconsistentes. Se o auditor encontrar o mesmo conjunto de inconsistências consecutivamente para um fluxo de dados muitas vezes, ele acionaria uma sincronização de fragmentos.

    NovoCloudWatchmétricas também são emitidas agora para monitorar a saúde doPeriodicShardSyncManager. Para obter mais informações, consulte PeriodicShardSyncManager.

  • O KCL 1.14 agora também oferece suporte à limpeza de leasing adiada. Os arrendamentos são excluídos de forma assíncrona porLeaseCleanupManagerao atingirSHARD_END, quando um fragmento expirou após o período de retenção do fluxo de dados ou foi fechado como resultado de uma operação de recompartilhamento.

    Novas opções de configuração estão disponíveis para configuraçãoLeaseCleanupManager.

    Name (Nome) Valor padrão Descrição
    leaseCleanupIntervalMillis

    1 minuto

    Intervalo no qual executar o segmento de limpeza de leasing.

    completedLeaseCleanupIntervalMillis 5 minutos

    Intervalo no qual verificar se um contrato de locação foi concluído ou não.

    garbageLeaseCleanupIntervalMillis 30 minutos

    Intervalo no qual verificar se um leasing é lixo (ou seja, cortado após o período de retenção do fluxo de dados) ou não.

  • Incluindo uma otimização paraKinesisShardSyncerpara criar apenas arrendamentos para uma camada de fragmentos.

Processando vários fluxos de dados com o mesmo aplicativo KCL 2.x para consumidor Java

Esta seção descreve as seguintes alterações no KCL 2.x para Java que permitem criar aplicativos de consumidor KCL que podem processar mais de um fluxo de dados ao mesmo tempo.

Importante

O processamento multistream só é suportado no KCL 2.x para Java, começando com o KCL 2.3 para Java e além.

O processamento multistream NÃO é suportado para nenhum outro idioma em que o KCL 2.x possa ser implementado.

O processamento multistream NÃO é suportado em nenhuma versão do KCL 1.x.

  • Interface do MultistreamTracker

    Para criar um aplicativo de consumidor que possa processar vários fluxos ao mesmo tempo, você deve implementar uma nova interface chamadaMultistreamTracker. Essa interface inclui ostreamConfigListmétodo que retorna a lista de fluxos de dados e suas configurações a serem processadas pelo aplicativo consumidor KCL. Observe que os fluxos de dados que estão sendo processados podem ser alterados durante o tempo de execução do aplicativo do consumidor.streamConfigListé chamado periodicamente pelo KCL para saber mais sobre as mudanças nos fluxos de dados a serem processados.

    OstreamConfigListPreenche oStreamConfig.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    Observe que oStreamIdentifiereInitialPositionInStreamExtendedsão campos obrigatórios, enquantoconsumerArné opcional. É necessário fornecer oconsumerArnsomente se você estiver usando o KCL 2.x para implementar um aplicativo de consumidor de fan-out aprimorado.

    Para obter mais informações sobreStreamIdentifier, consultehttps://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/common/StreamIdentifier.java #L29. Você pode criar uma instância de multistream para oStreamIdentifierdo identificador de fluxo serializado. O identificador de fluxo serializado deve ter o seguinte formato:account-id:StreamName:streamCreationTimestamp.

    * @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }

    MultistreamTrackertambém inclui uma estratégia para excluir arrendamentos de fluxos antigos na tabela de leasing (formerStreamsLeasesDeletionStrategy). Observe que a estratégia NÃO PODE ser alterada durante o tempo de execução do aplicativo do consumidor. Para obter mais informações, consultehttps://github.com/awslabs/amazon-kinesis-client/blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/amazon-kinesis-client/src/main/java/software/amazon/kinesis/processador/FormerStreamsLeasesDeletionArquivo Strategy.java

  • ConfigsBuilderé uma classe em todo o aplicativo que você pode usar para especificar todas as configurações do KCL 2.x a serem usadas ao criar seu aplicativo consumidor KCL.ConfigsBuilderclass agora tem suporte para oMultistreamTrackerInterface. Você pode inicializarConfigsBuildercom o nome do único fluxo de dados para consumir registros de:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    Ou você pode inicializarConfigsBuildercomMultiStreamTrackerse você quiser implementar um aplicativo consumidor KCL que processa vários fluxos ao mesmo tempo.

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Com o suporte multistream implementado para seu aplicativo consumidor KCL, cada linha da tabela de leasing do aplicativo agora contém a ID do fragmento e o nome do fluxo dos vários fluxos de dados que esse aplicativo processa.

  • Quando o suporte multistream para seu aplicativo consumidor KCL é implementado, o leaseKey assume a seguinte estrutura: account-id:StreamName:streamCreationTimestamp:ShardId. Por exemplo,111111111:multiStreamTest-1:12345:shardId-000000000336.

    Importante

    Quando seu aplicativo consumidor KCL existente está configurado para processar apenas um fluxo de dados, a leaseKey (que é a chave de hash para a tabela de leasing) é a ID do fragmento. Se você reconfigurar esse aplicativo de consumidor KCL existente para processar vários fluxos de dados, ele quebrará sua tabela de leasing, pois com o suporte multistream, a estrutura do leaseKey deve ser a seguinte:account-id:StreamName:StreamCreationTimestamp:ShardId.

Usando a biblioteca de cliente Kinesis com aAWSRegistro de esquemas de Glue

Você pode integrar seus fluxos de dados do Kinesis com oAWSRegistro de esquema de Glue. OAWSO registro de esquemas de Glue permite detectar, controlar e evoluir centralmente esquemas, ao mesmo tempo em que garante que os dados produzidos sejam continuamente validados por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou armazenamento de dados confiáveis. OAWSO Glue Schema Registry permite melhorarend-to-endqualidade de dados e governança de dados em seus aplicativos de streaming. Para obter mais informações, consulteAWSRegistro de esquemas de Glue. Uma das maneiras de configurar essa integração é por meio do KCL em Java.

Importante

Atualmente, Kinesis Data Streams eAWSA integração do registro do esquema de Glue só é suportada para os fluxos de dados do Kinesis que usam consumidores do KCL 2.3 implementados em Java. Suporte a várias linguagens não é fornecido. Os consumidores do KCL 1.0 não são suportados. Os consumidores KCL 2.x anteriores ao KCL 2.3 não são suportados.

Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o Schema Registry usando o KCL, consulte a seção “Interagindo com dados usando as bibliotecas KPL/KCL” noCaso de uso: Integrar o Amazon Kinesis Data Streams com oAWSRegistro de esquemas de Glue.