Desenvolver consumidores personalizados com throughput compartilhada usando o AWS SDK for Java - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolver consumidores personalizados com throughput compartilhada usando o AWS SDK for Java

Um dos métodos de desenvolvimento de consumidores personalizados do Kinesis Data Streams com throughput compartilhada envolve o uso das APIs do Amazon Kinesis Data Streams. Esta seção descreve o uso das APIs do Kinesis Data Streams com o AWS SDK para Java. O código de exemplo Java nesta seção demonstra como realizar operações básicas de APIs do KDS e está dividida logicamente por tipo de operação.

Esses exemplos não representam um código pronto para produção. Eles não verificam todas as exceções possíveis nem levam em conta todas as considerações de segurança ou desempenho possíveis.

Você pode chamar as APIs do Kinesis Data Streams usando outras linguagens de programação. Para obter mais informações sobre todos os SDKs da AWS disponíveis, explore o Centro do desenvolvedor da Amazon Web Services.

Importante

O método recomendado para desenvolvimento de consumidores personalizados do Kinesis Data Streams com throughput compartilhada envolve o uso da Kinesis Client Library (KCL). A KCL ajuda você a consumir e processar dados de um fluxo de dados do Kinesis lidando com muitas das tarefas complexas associadas à computação distribuída. Para obter mais informações, consulte Developing Custom Consumers with Shared Throughput Using KCL.

Como obter dados de um stream

As APIs do Kinesis Data Streams incluem os métodos getShardIterator e getRecords que você pode invocar para recuperar registros de um fluxo de dados. Esse é o modelo de pull, em que o código extrai registros de dados diretamente dos fragmentos do fluxo de dados.

Importante

Recomendamos usar o suporte do processador de registros fornecido pela KCL para recuperar registros dos fluxos de dados. Esse é o modelo push, em que você implementa o código que processa os dados. A KCL recupera registros de dados do fluxo de dados e os entrega ao código da aplicação. Além disso, a KCL fornece as funcionalidades de failover, recuperação e balanceamento de carga. Para obter mais informações, consulte Developing Custom Consumers with Shared Throughput Using KCL.

No entanto, em alguns casos, você pode preferir usar as APIs do Kinesis Data Streams. Um exemplo disso é a implementação de ferramentas personalizadas de monitoramento ou depuração dos fluxos de dados.

Importante

O Kinesis Data Streams oferece suporte a alterações do período de retenção do registro de dados no fluxo de dados. Para obter mais informações, consulte Alterar o período de retenção de dados.

Como usar iteradores de estilhaços

Você recupera registros do stream por estilhaço. Para cada estilhaço e para cada lote de registros que recupera desse estilhaço, você precisa obter um iterador de estilhaços. O iterador de estilhaços é usado no objeto getRecordsRequest para especificar o estilhaço a partir do qual os registros devem ser recuperados. O tipo associado ao iterador de estilhaços determina o ponto no estilhaço a partir do qual os registros devem ser recuperados (veja mais à frente nesta seção para obter mais detalhes). Para trabalhar com o iterador de estilhaços, você precisa recuperar o estilhaço, conforme abordado em DescribeStream API - Obsoleto.

Obtenha o iterador de estilhaços inicial usando o método getShardIterator. Obtenha iteradores de estilhaços para obter mais lotes de registros usando o método getNextShardIterator do objetogetRecordsResult retornado pelo método getRecords. Um iterador de estilhaços é válido por 5 minutos. Se usar um iterador de estilhaços enquanto ele for válido, você receberá um novo. Cada iterador de estilhaços permanecerá válido por 5 minutos, mesmo depois de ser usado.

Para obter o iterador de estilhaços inicial, instancie GetShardIteratorRequest e passe-o ao método getShardIterator. Para configurar a solicitação, especifique o stream e o ID do estilhaço. Para saber como obter os fluxos na sua conta da AWS, consulte Listar streams. Para obter informações sobre como obter os estilhaços em um stream, consulte DescribeStream API - Obsoleto.

String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();

Esse código de exemplo especifica TRIM_HORIZON como o tipo de iterador ao obter o iterador de estilhaços inicial. Esse tipo de iterador significa que o retorno dos registros deve começar a partir do primeiro registro adicionado ao fragmento, em vez de começar pelo registro adicionado mais recentemente, também conhecido como extremidade. Estes são tipos de iterador possíveis:

  • AT_SEQUENCE_NUMBER

  • AFTER_SEQUENCE_NUMBER

  • AT_TIMESTAMP

  • TRIM_HORIZON

  • LATEST

Para obter mais informações, consulte ShardIteratorType.

Alguns tipos de iterador exigem que você especifique um número de sequência, além do tipo. Por exemplo:

getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);

Depois de obter um registro usando getRecords, você pode conseguir o número de sequência do registro chamando o método getSequenceNumber do registro.

record.getSequenceNumber()

Além disso, o código que adiciona registros ao stream de dados pode obter o número de sequência de um registro adicional chamando getSequenceNumber no resultado de putRecord.

lastSequenceNumber = putRecordResult.getSequenceNumber();

Você pode usar números de sequência para garantir estritamente o ordenamento crescente do registros. Para obter mais informações, consulte o exemplo de código em Exemplo de PutRecord.

Como usar o GetRecords

Depois que você obtiver o iterador de estilhaços, instancie um objeto GetRecordsRequest. Especifique o iterador para a solicitação usando o método setShardIterator.

Opcionalmente, você também pode definir o número de registros a recuperar usando o método setLimit. O número de registros retornados por getRecords sempre é igual ou menor que esse limite. Se você não especificar esse limite, getRecords retornará 10 MB de registros recuperados. O código de exemplo a seguir define esse limite para 25 registros.

Se nenhum registro for retornado, isso significa que não há registros de dados disponíveis atualmente nesse estilhaço no número de sequência referenciado pelo iterador de estilhaços. Nessa situação, a aplicação deve aguardar o tempo adequado de acordo com as fontes de dados do fluxo. Em seguida, tente obter os dados do estilhaço novamente usando o iterador de estilhaços retornado pela chamada anterior a getRecords.

Passe o getRecordsRequest para o método getRecords e capture o valor retornado como um objeto getRecordsResult. Para obter os registros de dados, chame o método getRecords no objeto getRecordsResult.

GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();

Para preparar-se para outra chamada para getRecords, obtenha o próximo iterador de estilhaços de getRecordsResult.

shardIterator = getRecordsResult.getNextShardIterator();

Para obter os melhores resultados, aguarde pelo menos 1 segundo (1.000 milissegundos) entre as chamadas para getRecords a fim de evitar exceder o limite na frequência de getRecords.

try { Thread.sleep(1000); } catch (InterruptedException e) {}

Normalmente, você deve chamar getRecords em um loop, mesmo se estiver recuperando um único registro em um cenário de teste. Uma única chamada para getRecords pode retornar uma lista de registros vazia, mesmo quando o estilhaço contém mais registros em números de sequência subsequentes. Quando isso ocorre, o NextShardIterator retornado com a lista de registros vazia faz referência a um número de sequência subsequente no estilhaço, e as chamadas posteriores a getRecords acabam retornando os registros. O exemplo a seguir demonstra o uso de um loop.

Exemplo: getRecords

O código de exemplo a seguir reflete as dicas de getRecords nesta seção, inclusive chamadas em um loop.

// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }

Se você estiver usando a Kinesis Client Library, ela poderá fazer várias chamadas antes de retornar dados. Esse comportamento é projetado e não indica um problema com a KCL ou com seus dados.

Como adaptar a uma refragmentação

getRecordsResult.getNextShardIterator retorna null para indicar que o fragmento passou por uma divisão ou uma mesclagem. O fragmento agora está em estado de CLOSED, e você leu todos os registros de dados disponíveis nele.

Nesse cenário, você pode usar getRecordsResult.childShards para conhecer os fragmentos filho que foram criados pela divisão ou mesclagem do fragmento sendo processado. Para obter mais informações, consulte ChildsHard.

No caso de uma divisão, os dois novos estilhaços têm parentShardId igual ao ID do estilhaço que você estava processando anteriormente. O valor de adjacentParentShardId dos dois estilhaços é null.

No caso de uma fusão, o único estilhaço novo criado tem parentShardId igual ao ID de um dos estilhaços pai e adjacentParentShardId igual ao ID do outro estilhaço. Seu aplicativo já leu todos os dados de um desses estilhaços. Esse é o estilhaço para o qual getRecordsResult.getNextShardIterator retornou null. Se a ordem dos dados for importante para o aplicativo, você deverá garantir que ele também leia todos os dados de outro estilhaço pai antes de ler qualquer dado novo de estilhaço filho criado pela fusão.

Se estiver usando vários processadores para recuperar dados do streaming (digamos, um processador por estilhaço) e ocorrer uma divisão ou fusão de estilhaços, você deverá aumentar ou diminuir o número de processadores para se adaptar à alteração no número de estilhaços.

Para obter mais informações sobre a refragmentação, incluindo uma discussão sobre estados de estilhaços (como CLOSED), consulte Reestilhaçar um stream.

Interagir com os dados usando o registro de esquemas do AWS Glue

Você pode integrar os fluxos de dados do Kinesis ao registro de esquemas do AWS Glue. O registro de esquemas do AWS Glue permite detectar, controlar e evoluir esquemas centralmente, ao mesmo tempo que garante que os dados produzidos sejam validados continuamente por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou armazenamento de dados confiáveis. O registro de esquemas do AWS Glue permite que você melhore tanto a qualidade de dados de ponta a ponta como a governança dos dados nas aplicações de streaming. Para obter mais informações, consulte Registro de esquemas do AWS Glue. Uma das formas de configurar essa integração é por meio da API GetRecords do Kinesis Data Streams, disponível no AWS SDK Java.

Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o registro de esquemas usando as APIs GetRecords do Kinesis Data Streams, consulte a seção “Interagir com dados usando as APIs do Kinesis Data Streams” em Caso de uso: integração do Amazon Kinesis Data Streams ao registro de esquemas do AWS Glue.