Usar a Kinesis Client Library - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Usar a Kinesis Client Library

Um dos métodos de desenvolvimento de aplicações de consumo personalizadas que podem processar dados de fluxos de dados do KDS é usar a Kinesis Client Library (KCL).

nota

Recomenda-se que você use a versão mais recente da KCL 1.x ou da KCL 2.x, dependendo do cenário de uso. Ambas as versões da KCL, tanto 1.x como a 2.x, são atualizadas regularmente para incluir os patches de dependência e segurança e as correções de bugs mais recentes, além de novos recursos compatíveis com versões anteriores. Para obter mais informações, consulte https://github.com/awslabs/ amazon-kinesis-client /releases.

O que é a Kinesis Client Library?

A KCL ajuda você a consumir e processar dados de um fluxo de dados do Kinesis lidando com muitas das tarefas complexas associadas à computação distribuída. Isso inclui balanceamento de carga em várias instâncias de aplicações de consumo, resposta a falhas nas instâncias de aplicações de consumo, verificação de registros processados e reação à refragmentação. A KCL cuida de todas essas subtarefas para que você possa concentrar seus esforços em escrever sua lógica personalizada de processamento de registros.

A KCL é diferente das APIs do Kinesis Data Streams que estão disponíveis nos SDKs da AWS. As APIs do Kinesis Data Streams ajudam você a lidar com vários aspectos desse serviço, incluindo criação de fluxos, refragmentação e colocação e obtenção de registros. A KCL fornece uma camada de abstração em torno de todas essas subtarefas, especificamente para que você possa se concentrar na lógica de processamento de dados personalizada da sua aplicação de consumo. Para obter informações sobre a API do Kinesis Data Streams, consulte a Referência de APIs do Amazon Kinesis.

Importante

A KCL é uma biblioteca Java. Support para linguagens diferentes de Java é fornecido usando uma interface multilíngue chamada de MultiLangDaemon. Esse daemon baseado em Java é executado em segundo plano quando você usa uma linguagem de KCL diferente de Java. Por exemplo, se você instalar o KCL para Python e escrever seu aplicativo de consumidor inteiramente em Python, ainda precisará do Java instalado em seu sistema por causa do. MultiLangDaemon Além disso, MultiLangDaemon tem algumas configurações padrão que talvez você precise personalizar para seu caso de uso, por exemplo, a AWS região à qual ele se conecta. Para obter mais informações sobre o MultiLangDaemon on GitHub, consulte o MultiLangDaemon projeto KCL.

A KCL atua como um intermediário entre a lógica de processamento de registros e o Kinesis Data Streams. A KCL executa as seguintes tarefas:

  • Conecta-se ao fluxo de dados

  • Enumera os fragmentos no fluxo de dados

  • Usa concessões para coordenar as associações de fragmentos e seus operadores

  • Cria uma instância de um processador de registro para cada estilhaço que gerencia

  • Extrai registros de dados do fluxo de dados

  • Envia os registros ao processador de registros correspondente

  • Registros processados pelos pontos de verificação

  • Equilibra as associações (concessões) entre fragmentos e operadores quando a contagem de instâncias de operador muda ou quando o fluxo de dados é refragmentado (os fragmentos são divididos ou mesclados)

Versões da KCL disponíveis

Atualmente, você pode usar qualquer uma destas versões compatíveis da KCL para criar aplicações de consumo personalizadas:

Você pode usar a KCL 1.x ou a KCL 2.x para criar aplicações de consumo que usam throughput compartilhada. Para ter mais informações, consulte Desenvolver consumidores personalizados com throughput compartilhada usando a KCL.

Para criar aplicações de consumo que usam throughput dedicada (consumidores de distribuição avançada), você só pode usar a KCL 2.x. Para ter mais informações, consulte Desenvolver consumidores personalizados com throughput dedicada (distribuição avançada).

Para obter informações sobre as diferenças entre a KCL 1.x e a KCL 2.x e instruções sobre como migrar da KCL 1.x para a KCL 2.x, consulte Migrar consumidores do KCL 1.x para o KCL 2.x.

Conceitos da KCL

  • Aplicação de consumidor da KCL: uma aplicação personalizada que usa a KCL e é projetada para ler e processar registros de fluxos de dados.

  • Instância de aplicação de consumo: as aplicações de consumo da KCL normalmente são distribuídas, com uma ou mais instâncias executadas simultaneamente para coordenar falhas e balancear dinamicamente a carga de processamento dos registros de dados.

  • Operador: uma classe de alto nível que uma instância de aplicação de consumo da KCL usa para começar a processar dados.

    Importante

    Cada instância da aplicação de consumo da KCL tem um operador.

    O operador inicializa e supervisiona várias tarefas, incluindo a sincronização de informações de fragmentos e concessões, o monitoramento de atribuições de fragmentos e o processamento dos dados dos fragmentos. Um operador fornece à KCL as informações de configuração da aplicação de consumo, como o nome do fluxo de dados cujos registros de dados essa aplicação processará e as credenciais da AWS necessárias para acessar o fluxo de dados. O operador também inicia a instância específica da aplicação de consumo da KCL para entregar registros de dados do fluxo de dados aos processadores de registros.

    Importante

    Na KCL 1.x, essa classe é chamada de operador. Para obter mais informações (esses são os repositórios Java KCL), consulte https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/worker.java. Na KCL 2.x, essa classe é chamada de programador. A finalidade do programador na KCL 2.x é idêntica à finalidade do operador na KCL 1.x. Para obter mais informações sobre a classe Scheduler no KCL 2.x, consulte https://github.com/awslabs/ amazon-kinesis-client /blob/master/ /src/main/java/software/amazon/kinesis/coordinator/Scheduler.java. amazon-kinesis-client

  • Concessão: dado que define a ligação entre um operador e um fragmento. As aplicações de consumo distribuídas da KCL usam concessões para particionar o processamento de registros de dados em uma frota de operadores. A qualquer momento, cada fragmento de registros de dados é associado a um determinado operador por uma concessão identificada pela variável leaseKey.

    Por padrão, um trabalhador pode manter um ou mais arrendamentos (sujeito ao valor da variável maxLeasesForWorker) ao mesmo tempo.

    Importante

    Os operadores competem para manter todas as concessões disponíveis para todos os fragmentos disponíveis em um fluxo de dados. Mas apenas um operador consegue manter uma concessão de cada vez.

    Por exemplo, se você tiver uma instância da aplicação de consumo A com o operador A que está processando um fluxo de dados com quatro fragmentos, o operador A poderá reter as concessões aos fragmentos 1, 2, 3 e 4 ao mesmo tempo. Mas se você tiver duas instâncias de aplicações de consumo A e B com os operadores A e B, e essas instâncias estiverem processando um fluxo de dados com quatro fragmentos, o operador A e o operador B não poderão reter a concessão ao fragmento 1 ao mesmo tempo. Um operador retém a concessão a um fragmento específico até estar pronto para parar de processar os registros de dados do fragmento ou até que uma falha ocorra. Quando um operador libera a concessão, outro operador a assume e a retém.

    Para obter mais informações (esses são os repositórios Java KCL), consulte https://github.com/awslabs/ amazon-kinesis-client /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/leases/impl/lease.java para KCL 1.x e https://github.com/awslabs/ /blob/master/ /src/main/java/software/amazon/kinesis/leases/Lease.java para KCL 2.x. amazon-kinesis-client amazon-kinesis-client

  • Tabela de concessões: uma tabela exclusiva do Amazon DynamoDB usada para monitorar os fragmentos em um fluxo de dados do KDS vinculados a uma concessão e sendo processados pelos operadores da aplicação de consumo da KCL. A tabela de concessões precisa permanecer sincronizada (em um operador e entre todos os operadores) com as informações mais recentes do fragmento do fluxo de dados enquanto a aplicação de consumo da KCL está em execução. Para ter mais informações, consulte Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL.

  • Processador de registros: a lógica que define como a aplicação de consumo da KCL processa os dados obtidos dos fluxos de dados. Em runtime, uma instância da aplicação de consumo da KCL inicia um operador, que, por sua vez, inicia um processador de registros para cada fragmento cuja concessão retém.

Usar uma tabela de concessões para monitorar os fragmentos processados pela aplicação de consumo da KCL

O que é uma tabela de concessões

Em cada aplicação do Amazon Kinesis Data Streams, a KCL usa uma tabela de concessões exclusiva (armazenada em uma tabela do Amazon DynamoDB) para monitorar os fragmentos em um fluxo de dados do KDS vinculados a uma concessão e sendo processados pelos operadores da aplicação de consumo da KCL.

Importante

Como a KCL usa o nome da aplicação de consumo para criar o nome da tabela de concessões que a aplicação usa, cada aplicação de consumo deve ter um nome exclusivo.

Você pode visualizar a tabela usando o console do Amazon DynamoDB enquanto a aplicação de consumo está em execução.

Se a tabela de concessões da aplicação de consumo da KCL não existir quando a aplicação for inicializada, um dos operadores a criará.

Importante

Sua conta é cobrada pelos custos associados à tabela do DynamoDB, além dos custos associados ao próprio Kinesis Data Streams.

Cada linha na tabela de concessões representa um fragmento que está sendo processado pelos operadores da aplicação de consumo. Se sua aplicação de consumo da KCL processar somente um fluxo de dados, a chave de hash da tabela de concessões, leaseKey, será o ID do fragmento. Se você estiver Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java, a estrutura da leaseKey será semelhante a account-id:StreamName:streamCreationTimestamp:ShardId. Por exemplo, 111111111:multiStreamTest-1:12345:shardId-000000000336.

Além do ID do estilhaço, cada linha também inclui os seguintes dados:

  • checkpoint: número de sequência do ponto de verificação mais recente do estilhaço. Esse valor é exclusivo entre todos os fragmentos no fluxo de dados.

  • checkpointSubSequenceNúmero: ao usar o recurso de agregação da Kinesis Producer Library, essa é uma extensão do ponto de verificação que rastreia registros individuais de usuários dentro do registro do Kinesis.

  • leaseCounter: usado para versionamento de arrendamento, para que os operadores possam detectar se o próprio arrendamento foi assumido por outro operador.

  • leaseKey: um identificador exclusivo para um arrendamento. Cada concessão é específica a um fragmento no fluxo de dados e é retida por um operador por vez.

  • leaseOwner: o operador que está retendo esse arrendamento.

  • ownerSwitchesSincePonto de verificação: Quantas vezes esse contrato mudou de trabalhadores desde a última vez que um posto de controle foi escrito.

  • parentShardId: usado para garantir que o fragmento principal seja totalmente processado antes do início do processamento nos fragmentos secundários. Isso garante que os registros sejam processados na mesma ordem em que foram colocados no stream.

  • hashrange: usado pelo PeriodicShardSyncManager para executar sincronizações periódicas a fim de encontrar fragmentos ausentes na tabela de concessões e criar concessões para eles, se necessário.

    nota

    A partir da KCL 1.14 e da KCL 2.3, esse dado está presente na tabela de concessões de cada fragmento. Para obter mais informações sobre PeriodicShardSyncManager e a sincronização periódica entre concessões e fragmentos, consulte Como a tabela de concessões é sincronizada com fragmentos em um fluxo de dados do KDS.

  • childshards: usado por LeaseCleanupManager para revisar o status de processamento do fragmento filho e decidir se o fragmento pai pode ser excluído da tabela de concessões.

    nota

    A partir da KCL 1.14 e da KCL 2.3, esse dado está presente na tabela de concessões de cada fragmento.

  • shardID: o ID do fragmento.

    nota

    Esse dado só estará presente na tabela de concessões se você estiver Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java. Isso só tem suporte na KCL 2.x para Java, a partir da KCL 2.3 para Java e versões superiores.

  • stream name o identificador do fluxo de dados no formato account-id:StreamName:streamCreationTimestamp.

    nota

    Esse dado só estará presente na tabela de concessões se você estiver Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java. Isso só tem suporte na KCL 2.x para Java, a partir da KCL 2.3 para Java e versões superiores.

Throughput

Se sua aplicação do Amazon Kinesis Data Streams receber exceções de throughput provisionada, você deverá aumentar a throughput provisionada para a tabela do DynamoDB. A KCL cria a tabela com uma throughput provisionada de 10 leituras por segundo e 10 gravações por segundo, mas isso pode não ser suficiente para a aplicação. Por exemplo, se sua aplicação do Amazon Kinesis Data Streams definir pontos de verificação ou usar operadores com frequência em um fluxo de dados composto por vários fragmentos, você poderá precisar de uma throughput maior.

Para obter informações sobre a throughput provisionada no DynamoDB, consulte Modo de capacidade de leitura/gravação e Trabalhar com tabelas e dados no DynamoDB no Guia do desenvolvedor do Amazon DynamoDB.

Como a tabela de concessões é sincronizada com fragmentos em um fluxo de dados do KDS

Os operadores das aplicações de consumo da KCL usam concessões para processar fragmentos de um determinado fluxo de dados. As informações sobre qual operador usa a concessão a um fragmento em um momento determinado são armazenadas em uma tabela de concessões. A tabela de concessões precisa permanecer sincronizada com as informações mais recentes do fragmento do fluxo de dados enquanto a aplicação de consumo da KCL está em execução. A KCL sincroniza a tabela de concessões com as informações de fragmentos adquiridas do serviço Kinesis Data Streams durante a inicialização ou o reinício da aplicação de consumo e sempre que um fragmento sendo processado chega ao fim (refragmentação). Em outras palavras, os operadores ou uma aplicação de consumo da KCL são sincronizados com o fluxo de dados que estão processando durante a inicialização da aplicação e sempre que a aplicação encontra um evento de refragmentação do fluxo de dados.

Sincronização na KCL 1.0 - 1.13 e na KCL 2.0 - 2.2

Na KCL 1.0 - 1.13 e na KCL 2.0 - 2.2, durante a inicialização da aplicação de consumo e em cada evento de refragmentação do fluxo de dados, a KCL sincroniza a tabela de concessões com as informações de fragmentos adquiridas do serviço Kinesis Data Streams invocando as APIs de descoberta ListShards ou DescribeStream. Em todas as versões da KCL listadas acima, cada operador de uma aplicação de consumo da KCL conclui as seguintes etapas para realizar a sincronização de concessão/fragmento durante a inicialização da aplicação e em cada evento de refragmentação do fluxo:

  • Busca todos os fragmentos de dados do fluxo sendo processado

  • Busca todas as concessões do fragmento da tabela de concessões

  • Filtra cada fragmento aberto sem uma concessão na tabela de concessões

  • Itera em todos os fragmentos abertos encontrados e, para cada fragmento aberto sem pai aberto:

    • Percorre a árvore hierárquica no caminho dos ancestrais para determinar se o fragmento é um descendente. Um fragmento será considerado descendente se um fragmento ancestral estiver sendo processado (a entrada de concessão do fragmento ancestral existe na tabela de concessões) ou se houver um fragmento ancestral que deve ser processado (por exemplo, a posição inicial é TRIM_HORIZON ou AT_TIMESTAMP).

    • Se o fragmento aberto for descendente, a KCL verificará sua posição inicial e criará concessões para seus pais, se necessário.

Sincronização na KCL 2.x a partir da KCL 2.3 e em versões posteriores

A partir das versões mais recentes compatíveis da KCL 2.x (KCL 2.3) e posteriores, a biblioteca oferece suporte às alterações no processo de sincronização listadas a seguir. Essas mudanças na sincronização de concessão/fragmento reduzem significativamente o número de chamadas de API feitas pelas aplicações de consumo da KCL ao serviço Kinesis Data Streams e otimizam o gerenciamento de concessões nessa aplicação.

  • Na inicialização da aplicação, se a tabela de concessões estiver vazia, a KCL utilizará a opção de filtragem da API ListShard (o parâmetro de solicitação ShardFilter opcional) para recuperar e criar concessões somente para um instantâneo dos fragmentos abertos no momento especificado pelo parâmetro ShardFilter. O parâmetro ShardFilter permite filtrar a resposta da API ListShards. A única propriedade obrigatória do parâmetro ShardFilter é Type. A KCL usa a propriedade de filtro Type e os seguintes valores válidos para identificar e retornar um instantâneo dos fragmentos abertos que podem exigir novas concessões:

    • AT_TRIM_HORIZON: a resposta inclui todos os fragmentos abertos emTRIM_HORIZON.

    • AT_LATEST: a resposta inclui somente os fragmentos do fluxo de dados abertos no momento.

    • AT_TIMESTAMP: a resposta inclui todos os fragmentos com timestamp inicial menor ou igual ao timestamp fornecido e timestamp final maior ou igual ao timestamp fornecido ou ainda abertos.

    ShardFilter é usado ao criar uma concessão em uma tabela de concessões vazia para inicializar concessões para um instantâneo dos fragmentos especificados em RetrievalConfig#initialPositionInStreamExtended.

    Para obter mais informações sobre o ShardFilter, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Em vez de ter todos os operadores realizando a sincronização de concessão/fragmento para manter a tabela de concessões atualizada com os fragmentos mais recentes no fluxo de dados, um único operador é eleito como líder para executar a sincronização.

  • A KCL 2.3 usa o parâmetro de retorno ChildShards das APIs GetRecords e SubscribeToShard para realizar a sincronização de concessão/fragmento que ocorre em SHARD_END para fragmentos fechados, permitindo que um operador da KCL crie concessões somente para os fragmentos filho do fragmento cujo processamento concluiu. Em aplicações de consumo com throughput compartilhada, a otimização da sincronização de concessão/fragmento usa o parâmetro ChildShards da API GetRecords. Em aplicações de consumo com throughput dedicada (distribuição avançada), a otimização da sincronização de concessão/fragmento usa o parâmetro ChildShards da API SubscribeToShard. Para obter mais informações GetRecords, consulte SubscribeToShards, ChildSharde.

  • Com as mudanças acima, o comportamento da KCL está passando de um modelo no qual todos os operadores obtêm informações de todos os fragmentos existentes para um modelo em que os operadores só obtêm informações dos filhos do fragmento que possui. Portanto, além da sincronização que ocorre durante a inicialização da aplicação de consumo e nos eventos de refragmentação, a KCL agora também realiza varreduras periódicas adicionais de fragmento/concessão para identificar possíveis falhas na tabela de concessões (em outras palavras, para identificar todos os novos fragmentos), confirmando que o intervalo de hash completo do fluxo de dados está sendo processado e criar concessões para eles, se necessário. PeriodicShardSyncManager é o componente responsável pela execução periódica de varreduras de concessão/fragmento.

    Para obter mais informações sobre PeriodicShardSyncManager o KCL 2.3, consulte https://github.com/awslabs/ amazon-kinesis-client /blob/master/ /src/main/java/software/amazon/kinesis/leases/ amazon-kinesis-client .java #L201 -L213. LeaseManagementConfig

    A KCL 2.3 tem novas opções disponíveis para configuração de PeriodicShardSyncManager em LeaseManagementConfig:

    Nome Valor padrão Descrição
    leasesRecoveryAuditorExecutionFrequencyMillis

    120.000 (2 minutos)

    Frequência (em milissegundos) do trabalho do auditor para verificar concessões parciais na tabela de concessões. Se detectar alguma falha nas concessões de um fluxo, o auditor acionará a sincronização de fragmentos com base em leasesRecoveryAuditorInconsistencyConfidenceThreshold.

    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    Limite de confiança no trabalho periódico do auditor para determinar se as concessões de um fluxo de dados na tabela de concessões são inconsistentes. Se encontrar consecutivamente o mesmo conjunto de inconsistências em um fluxo de dados pelo número de vezes definido, o auditor acionará uma sincronização de fragmentos.

    Agora, novas CloudWatch métricas também são emitidas para monitorar a integridade doPeriodicShardSyncManager. Para ter mais informações, consulte PeriodicShardSyncManager.

  • Inclui uma otimização de HierarchicalShardSyncer para criar apenas concessões em uma camada de fragmentos.

Sincronização na KCL 1.x a partir da KCL 1.14 e em versões posteriores

A partir das versões mais recentes compatíveis da KCL 1.x (KCL 1.14) e posteriores, a biblioteca oferece suporte às alterações no processo de sincronização listadas a seguir. Essas mudanças na sincronização de concessão/fragmento reduzem significativamente o número de chamadas de API feitas pelas aplicações de consumo da KCL ao serviço Kinesis Data Streams e otimizam o gerenciamento de concessões nessa aplicação.

  • Na inicialização da aplicação, se a tabela de concessões estiver vazia, a KCL utilizará a opção de filtragem da API ListShard (o parâmetro de solicitação ShardFilter opcional) para recuperar e criar concessões somente para um instantâneo dos fragmentos abertos no momento especificado pelo parâmetro ShardFilter. O parâmetro ShardFilter permite filtrar a resposta da API ListShards. A única propriedade obrigatória do parâmetro ShardFilter é Type. A KCL usa a propriedade de filtro Type e os seguintes valores válidos para identificar e retornar um instantâneo dos fragmentos abertos que podem exigir novas concessões:

    • AT_TRIM_HORIZON: a resposta inclui todos os fragmentos abertos emTRIM_HORIZON.

    • AT_LATEST: a resposta inclui somente os fragmentos do fluxo de dados abertos no momento.

    • AT_TIMESTAMP: a resposta inclui todos os fragmentos com timestamp inicial menor ou igual ao timestamp fornecido e timestamp final maior ou igual ao timestamp fornecido ou ainda abertos.

    ShardFilter é usado ao criar uma concessão em uma tabela de concessões vazia para inicializar concessões para um instantâneo dos fragmentos especificados em KinesisClientLibConfiguration#initialPositionInStreamExtended.

    Para obter mais informações sobre o ShardFilter, consulte https://docs.aws.amazon.com/kinesis/latest/APIReference/API_ShardFilter.html.

  • Em vez de ter todos os operadores realizando a sincronização de concessão/fragmento para manter a tabela de concessões atualizada com os fragmentos mais recentes no fluxo de dados, um único operador é eleito como líder para executar a sincronização.

  • A KCL 1.14 usa o parâmetro de retorno ChildShards das APIs GetRecords e SubscribeToShard para realizar a sincronização de concessão/fragmento que ocorre em SHARD_END para fragmentos fechados, permitindo que um operador da KCL crie concessões somente para os fragmentos filho do fragmento cujo processamento concluiu. Para obter mais informações, consulte GetRecords e ChildShard.

  • Com as mudanças acima, o comportamento da KCL está passando de um modelo no qual todos os operadores obtêm informações de todos os fragmentos existentes para um modelo em que os operadores só obtêm informações dos filhos do fragmento que possui. Portanto, além da sincronização que ocorre durante a inicialização da aplicação de consumo e nos eventos de refragmentação, a KCL agora também realiza varreduras periódicas adicionais de fragmento/concessão para identificar possíveis falhas na tabela de concessões (em outras palavras, para identificar todos os novos fragmentos), confirmando que o intervalo de hash completo do fluxo de dados está sendo processado e criar concessões para eles, se necessário. PeriodicShardSyncManager é o componente responsável pela execução periódica de varreduras de concessão/fragmento.

    Quando KinesisClientLibConfiguration#shardSyncStrategyType é definido como ShardSyncStrategyType.SHARD_END, PeriodicShardSync leasesRecoveryAuditorInconsistencyConfidenceThreshold é usado para determinar o limite do número de varreduras consecutivas contendo lacunas na tabela de concessões após o qual é necessário impor uma sincronização de fragmentos. Quando KinesisClientLibConfiguration#shardSyncStrategyType é definido como ShardSyncStrategyType.PERIODIC, leasesRecoveryAuditorInconsistencyConfidenceThreshold é ignorado.

    Para obter mais informações sobre o KCL 1.14, consulte https://github.com/awslabs/ PeriodicShardSyncManager amazon-kinesis-client KinesisClientLibConfiguration /blob/v1.x/src/main/java/com/amazonaws/services/kinesis/clientlibrary/lib/worker/ .java #L987 -L999.

    A KCL 1.14 tem uma nova opção disponível para configuração de PeriodicShardSyncManager em LeaseManagementConfig:

    Nome Valor padrão Descrição
    leasesRecoveryAuditorInconsistencyConfidenceThreshold

    3

    Limite de confiança no trabalho periódico do auditor para determinar se as concessões de um fluxo de dados na tabela de concessões são inconsistentes. Se encontrar consecutivamente o mesmo conjunto de inconsistências em um fluxo de dados pelo número de vezes definido, o auditor acionará uma sincronização de fragmentos.

    Agora, novas CloudWatch métricas também são emitidas para monitorar a integridade doPeriodicShardSyncManager. Para ter mais informações, consulte PeriodicShardSyncManager.

  • A KCL 1.14 agora também oferece suporte à limpeza adiada de concessões. As concessões são excluídas de forma assíncrona por LeaseCleanupManager ao chegar ao SHARD_END quando um fragmento ultrapassar o período de retenção do fluxo de dados ou quando for fechado por uma operação de refragmentação.

    Novas opções disponíveis para configuração de LeaseCleanupManager:

    Nome Valor padrão Descrição
    leaseCleanupIntervalMillis

    1 minuto

    Intervalo de execução do thread de limpeza de concessões.

    completedLeaseCleanupIntervalMillis 5 minutos

    Intervalo de verificação de conclusão da concessão.

    garbageLeaseCleanupIntervalMillis 30 minutos

    Intervalo de verificação do estado de lixo de uma concessão (ou seja, reduzida após o período de retenção do fluxo de dados).

  • Inclui uma otimização de KinesisShardSyncer para criar apenas concessões em uma camada de fragmentos.

Processar vários fluxos de dados com a mesma aplicação de consumo da KCL 2.x para Java

Esta seção descreve as seguintes alterações no KCL 2.x para Java que permitem criar aplicativos de consumo KCL que podem processar mais de um fluxo de dados ao mesmo tempo.

Importante

O processamento multifluxo só tem suporte na KCL 2.x para Java, a partir da KCL 2.3 para Java e versões superiores.

O processamento multifluxo NÃO tem suporte em nenhuma outra linguagem na qual a KCL 2.x possa ser implementada.

O processamento multifluxo NÃO tem suporte em nenhuma versão da KCL 1.x.

  • MultistreamTracker interface

    Para criar um aplicativo de consumidor que possa processar vários fluxos ao mesmo tempo, você deve implementar uma nova interface chamada MultistreamTracker. Essa interface inclui o método streamConfigList, que retorna a lista de fluxos de dados, e suas configurações, a serem processados pela aplicação de consumo da KCL. Observe que os fluxos de dados sendo processados podem ser alterados durante o runtime da aplicação de consumo. streamConfigList é chamado periodicamente pela KCL para obter informações das mudanças nos fluxos de dados a serem processados.

    O streamConfigList método preenche a StreamConfiglista.

    package software.amazon.kinesis.common; import lombok.Data; import lombok.experimental.Accessors; @Data @Accessors(fluent = true) public class StreamConfig { private final StreamIdentifier streamIdentifier; private final InitialPositionInStreamExtended initialPositionInStreamExtended; private String consumerArn; }

    Observe que os campos StreamIdentifier e InitialPositionInStreamExtended são obrigatórios, enquanto consumerArn é opcional. Você só deverá fornecer consumerArn se estiver usando a KCL 2.x para implementar uma aplicação de consumo de distribuição avançada.

    Para obter mais informaçõesStreamIdentifier, consulte https://github.com/awslabs/ amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ amazon-kinesis-client /src/main/java/software/amazon/kinesis/common/ .java #L29. StreamIdentifier Você pode criar uma instância multifluxo para o StreamIdentifier partir do identificador de fluxo serializado. O identificador de fluxo serializado deve ter o formato account-id:StreamName:streamCreationTimestamp.

    * @param streamIdentifierSer * @return StreamIdentifier */ public static StreamIdentifier multiStreamInstance(String streamIdentifierSer) { if (PATTERN.matcher(streamIdentifierSer).matches()) { final String[] split = streamIdentifierSer.split(DELIMITER); return new StreamIdentifier(split[0], split[1], Long.parseLong(split[2])); } else { throw new IllegalArgumentException("Unable to deserialize StreamIdentifier from " + streamIdentifierSer); } }

    MultistreamTracker também inclui uma estratégia para excluir concessões de fluxos antigos na tabela de concessões (formerStreamsLeasesDeletionStrategy). Observe que a estratégia NÃO PODE ser alterada durante o runtime da aplicação de consumo. Para obter mais informações, consulte https://github.com/awslabs/ amazon-kinesis-client /blob/0c5042dadf794fe988438436252a5a8fe70b6b0b/ /src/main/java/software/amazon/kinesis/processor/ amazon-kinesis-client .java FormerStreamsLeasesDeletionStrategy

  • ConfigsBuilderé uma classe de todo o aplicativo que você pode usar para especificar todas as configurações do KCL 2.x a serem usadas ao criar seu aplicativo consumidor KCL. ConfigsBuildera classe agora tem suporte para a MultistreamTracker interface. Você pode inicializar ConfigsBuilder com o nome do único fluxo de dados do qual consumir registros:

    /** * Constructor to initialize ConfigsBuilder with StreamName * @param streamName * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull String streamName, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.right(streamName); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }

    Ou você pode inicializar ConfigsBuilder com MultiStreamTracker se quiser implementar um aplicativo consumidor KCL que processe vários fluxos ao mesmo tempo.

    * Constructor to initialize ConfigsBuilder with MultiStreamTracker * @param multiStreamTracker * @param applicationName * @param kinesisClient * @param dynamoDBClient * @param cloudWatchClient * @param workerIdentifier * @param shardRecordProcessorFactory */ public ConfigsBuilder(@NonNull MultiStreamTracker multiStreamTracker, @NonNull String applicationName, @NonNull KinesisAsyncClient kinesisClient, @NonNull DynamoDbAsyncClient dynamoDBClient, @NonNull CloudWatchAsyncClient cloudWatchClient, @NonNull String workerIdentifier, @NonNull ShardRecordProcessorFactory shardRecordProcessorFactory) { this.appStreamTracker = Either.left(multiStreamTracker); this.applicationName = applicationName; this.kinesisClient = kinesisClient; this.dynamoDBClient = dynamoDBClient; this.cloudWatchClient = cloudWatchClient; this.workerIdentifier = workerIdentifier; this.shardRecordProcessorFactory = shardRecordProcessorFactory; }
  • Com o suporte multifluxo implementado na aplicação de consumo da KCL, cada linha da tabela de concessões da aplicação contém o ID do fragmento e o nome do fluxo que a aplicação processa.

  • Quando o suporte multifluxo para sua aplicação de consumo da KCL é implementado, leaseKey assume a estrutura account-id:StreamName:streamCreationTimestamp:ShardId. Por exemplo, 111111111:multiStreamTest-1:12345:shardId-000000000336.

    Importante

    Quando sua aplicação de consumo da KCL está configurada para processar somente um fluxo de dados, leaseKey (a chave de hash da tabela de concessões) é o ID do fragmento. Se você reconfigurar a aplicação de consumo da KCL existente para processar vários fluxos de dados, a tabela de concessões será quebrada, pois no suporte multifluxo, a estrutura leaseKey deve ser a account-id:StreamName:StreamCreationTimestamp:ShardId.

Usar a Kinesis Client Library com o registro de esquemas do AWS Glue

Você pode integrar os fluxos de dados do Kinesis ao registro de esquemas do AWS Glue. O registro de esquemas do AWS Glue permite detectar, controlar e evoluir esquemas centralmente, ao mesmo tempo que garante que os dados produzidos sejam validados continuamente por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou armazenamento de dados confiáveis. O AWS Glue Schema Registry permite que você end-to-end melhore a qualidade e a governança de dados em seus aplicativos de streaming. Para obter mais informações, consulte Registro de esquemas do AWS Glue. Uma das formas de configurar essa integração é usando a KCL em Java.

Importante

Atualmente, a integração entre o Kinesis Data Streams e o registro de esquemas do AWS Glue só é compatível com os fluxos de dados do Kinesis que usam consumidores da KCL 2.3 implementados em Java. Suporte a várias linguagens não é fornecido. Os consumidores da KCL 1.0 não são compatíveis. Os consumidores da KCL 2.x anteriores à KCL 2.3 não são compatíveis.

Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o registro de esquemas usando a KCL, consulte a seção “Interagir com dados usando as bibliotecas KPL/KCL” em Caso de uso: integração do Amazon Kinesis Data Streams ao registro de esquemas do AWS Glue.