As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Desenvolver consumidores personalizados com taxa de transferência compartilhada usando o AWS SDK for Java
Um dos métodos para desenvolver consumidores personalizados do Kinesis Data Streams com conteúdo compartilhado é usar as APIs do Amazon Kinesis Data Streams. Esta seção descreve o uso das APIs Kinesis Data Streams comAWS SDK for Java. O código de exemplo Java nesta seção demonstra como realizar operações básicas da API KDS e é dividido logicamente por tipo de operação.
Esses exemplos não representam um código pronto para produção. Eles não verificam todas as exceções possíveis nem levam em conta todas as considerações de segurança ou desempenho possíveis.
Você pode chamar as APIs do Kinesis Data Streams usando outras linguagens de programação diferentes. Para obter mais informações sobre todos osAWS SDKs disponíveis, consulte Comece a desenvolver com a Amazon Web Services
Importante
O método recomendado para evoluir personalizados Kinesis Data Streams com fluxo de dados compartilhado é usar a Kinesis Client Library (KCL) Kinesis Client Library (KCL). A KCL ajuda você a consumir e processar dados de um stream de dados do Kinesis cuidando de muitas das tarefas complexas associadas à computação distribuída. Para obter mais informações, consulte Desenvolvendo consumidores personalizados com produtividade compartilhada usando o KCL.
Tópicos
Como obter dados de um stream
As APIs do Kinesis Data Streams incluem osgetRecords
métodosgetShardIterator
e que você pode invocar para recuperar registros de um fluxo de dados. Esse é o modelo pull, em que seu código extrai registros de dados diretamente dos fragmentos do fluxo de dados.
Importante
Recomendamos que você use o suporte de processador de registros fornecido pela KCL para recuperar registros de seus fluxos de dados. Esse é o modelo push, em que você implementa o código que processa os dados. O KCL recupera registros de dados do fluxo de dados e os entrega ao código do seu aplicativo. Além disso, o KCL fornece funcionalidade de failover, recuperação e balanceamento de carga. Para obter mais informações, consulte Desenvolvendo consumidores personalizados com produtividade compartilhada usando o KCL.
No entanto, em alguns casos, você pode preferir usar as APIs Kinesis Data Streams. Por exemplo, para implementar ferramentas personalizadas para monitorar ou depurar seus fluxos de dados.
Importante
O Kinesis Data Streams suporta alterações no período de retenção de registros de dados do seu stream de dados. Para obter mais informações, consulte Alterar o período de retenção de dados.
Como usar iteradores de estilhaços
Você recupera registros do stream por estilhaço. Para cada estilhaço e para cada lote de registros que recupera desse estilhaço, você precisa obter um iterador de estilhaços. O iterador de estilhaços é usado no objeto getRecordsRequest
para especificar o estilhaço a partir do qual os registros devem ser recuperados. O tipo associado ao iterador de estilhaços determina o ponto no estilhaço a partir do qual os registros devem ser recuperados (veja mais à frente nesta seção para obter mais detalhes). Para trabalhar com o iterador de estilhaços, você precisa recuperar o estilhaço, conforme abordado em DescribeStream API - Obsoleto.
Obtenha o iterador de estilhaços inicial usando o método getShardIterator
. Obtenha iteradores de estilhaços para obter mais lotes de registros usando o método getNextShardIterator
do objetogetRecordsResult
retornado pelo método getRecords
. Um iterador de estilhaços é válido por 5 minutos. Se usar um iterador de estilhaços enquanto ele for válido, você receberá um novo. Cada iterador de estilhaços permanecerá válido por 5 minutos, mesmo depois de ser usado.
Para obter o iterador de estilhaços inicial, instancie GetShardIteratorRequest
e passe-o ao método getShardIterator
. Para configurar a solicitação, especifique o stream e o ID do estilhaço. Para obter informações sobre como obter fluxo em sua conta, consulte Para obter informações sobre como obter transmissões em suaAWS conta, consulteListar streams Rastrereamento em Para obter informações sobre como obter os estilhaços em um stream, consulte DescribeStream API - Obsoleto.
String shardIterator; GetShardIteratorRequest getShardIteratorRequest = new GetShardIteratorRequest(); getShardIteratorRequest.setStreamName(myStreamName); getShardIteratorRequest.setShardId(shard.getShardId()); getShardIteratorRequest.setShardIteratorType("TRIM_HORIZON"); GetShardIteratorResult getShardIteratorResult = client.getShardIterator(getShardIteratorRequest); shardIterator = getShardIteratorResult.getShardIterator();
Esse código de exemplo especifica TRIM_HORIZON
como o tipo de iterador ao obter o iterador de estilhaços inicial. Esse tipo de iterador significa que os registros devem ser retornados começando com o primeiro registro adicionado ao fragmento, em vez de começar com o registro adicionado mais recentemente, também conhecido como ponta. Estes são tipos de iterador possíveis:
-
AT_SEQUENCE_NUMBER
-
AFTER_SEQUENCE_NUMBER
-
AT_TIMESTAMP
-
TRIM_HORIZON
-
LATEST
Para obter mais informações, consulte ShardIteratorType.
Alguns tipos de iterador exigem que você especifique um número de sequência, além do tipo. Por exemplo:
getShardIteratorRequest.setShardIteratorType("AT_SEQUENCE_NUMBER"); getShardIteratorRequest.setStartingSequenceNumber(specialSequenceNumber);
Depois de obter um registro usando getRecords
, você pode conseguir o número de sequência do registro chamando o método getSequenceNumber
do registro.
record.getSequenceNumber()
Além disso, o código que adiciona registros ao stream de dados pode obter o número de sequência de um registro adicional chamando getSequenceNumber
no resultado de putRecord
.
lastSequenceNumber = putRecordResult.getSequenceNumber();
Você pode usar números de sequência para garantir estritamente o ordenamento crescente do registros. Para obter mais informações, consulte o exemplo de código em PutRecordExemplo.
Usando GetRecords
Depois que você obtiver o iterador de estilhaços, instancie um objeto GetRecordsRequest
. Especifique o iterador para a solicitação usando o método setShardIterator
.
Opcionalmente, você também pode definir o número de registros a recuperar usando o método setLimit
. O número de registros retornados por getRecords
sempre é igual ou menor que esse limite. Se você não especificar esse limite,getRecords
retornará 10 MB de registros recuperados. O código de exemplo a seguir define esse limite para 25 registros.
Se nenhum registro for retornado, isso significa que não há registros de dados disponíveis atualmente nesse estilhaço no número de sequência referenciado pelo iterador de estilhaços. Nessa situação, seu aplicativo deve esperar por um período de tempo apropriado para as fontes de dados do stream. Em seguida, tente obter os dados do estilhaço novamente usando o iterador de estilhaços retornado pela chamada anterior a getRecords
.
Passe o getRecordsRequest
para o método getRecords
e capture o valor retornado como um objeto getRecordsResult
. Para obter os registros de dados, chame o método getRecords
no objeto getRecordsResult
.
GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult getRecordsResult = client.getRecords(getRecordsRequest); List<Record> records = getRecordsResult.getRecords();
Para preparar-se para outra chamada para getRecords
, obtenha o próximo iterador de estilhaços de getRecordsResult
.
shardIterator = getRecordsResult.getNextShardIterator();
Para obter os melhores resultados, aguarde pelo menos 1 segundo (1.000 milissegundos) entre as chamadas para getRecords
a fim de evitar exceder o limite na frequência de getRecords
.
try { Thread.sleep(1000); } catch (InterruptedException e) {}
Normalmente, você deve chamar getRecords
em um loop, mesmo se estiver recuperando um único registro em um cenário de teste. Uma única chamada para getRecords
pode retornar uma lista de registros vazia, mesmo quando o estilhaço contém mais registros em números de sequência subsequentes. Quando isso ocorre, o NextShardIterator
retornado com a lista de registros vazia faz referência a um número de sequência subsequente no estilhaço, e as chamadas posteriores a getRecords
acabam retornando os registros. O exemplo a seguir demonstra o uso de um loop.
Exemplo: getRecords
O código de exemplo a seguir reflete as dicas de getRecords
nesta seção, inclusive chamadas em um loop.
// Continuously read data records from a shard List<Record> records; while (true) { // Create a new getRecordsRequest with an existing shardIterator // Set the maximum records to return to 25 GetRecordsRequest getRecordsRequest = new GetRecordsRequest(); getRecordsRequest.setShardIterator(shardIterator); getRecordsRequest.setLimit(25); GetRecordsResult result = client.getRecords(getRecordsRequest); // Put the result into record list. The result can be empty. records = result.getRecords(); try { Thread.sleep(1000); } catch (InterruptedException exception) { throw new RuntimeException(exception); } shardIterator = result.getNextShardIterator(); }
Se você estiver usando a Biblioteca de Cliente Kinesis, ela poderá fazer várias chamadas antes de retornar os dados. Esse comportamento é intencional e não indica um problema com o KCL ou com seus dados.
Como adaptar a uma refragmentação
SegetRecordsResult.getNextShardIterator
retornarnull
, isso indica que ocorreu uma divisão ou mesclagem de fragmentos que envolveu esse fragmento. Esse fragmento agora está em umCLOSED
estado e você leu todos os registros de dados disponíveis desse fragmento.
Nesse cenário, você pode usargetRecordsResult.childShards
para aprender sobre os novos fragmentos secundários do fragmento que está sendo processado e que foram criados pela divisão ou mesclagem. Para obter mais informações, consulte Para obter mais informações, consulte ChildShard.
No caso de uma divisão, os dois novos estilhaços têm parentShardId
igual ao ID do estilhaço que você estava processando anteriormente. O valor de adjacentParentShardId
dos dois estilhaços é null
.
No caso de uma fusão, o único estilhaço novo criado tem parentShardId
igual ao ID de um dos estilhaços pai e adjacentParentShardId
igual ao ID do outro estilhaço. Seu aplicativo já leu todos os dados de um desses estilhaços. Esse é o estilhaço para o qual getRecordsResult.getNextShardIterator
retornou null
. Se a ordem dos dados for importante para o aplicativo, você deverá garantir que ele também leia todos os dados de outro estilhaço pai antes de ler qualquer dado novo de estilhaço filho criado pela fusão.
Se estiver usando vários processadores para recuperar dados do streaming (digamos, um processador por estilhaço) e ocorrer uma divisão ou fusão de estilhaços, você deverá aumentar ou diminuir o número de processadores para se adaptar à alteração no número de estilhaços.
Para obter mais informações sobre a refragmentação, incluindo uma discussão sobre estados de estilhaços (como CLOSED
), consulte Reestilhaçar um stream.
Interagindo com dados usando oAWS Glue Schema Registry
Você pode integrar seus fluxos de dados do Kinesis com o registro do esquemaAWS Glue. O registro de esquema doAWS Glue permite detectar, controlar e evoluir centralmente dados produzidos são validados continuamente por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou armazenamento de dados confiáveis. OAWS Glue Schema Registry permite que você melhore a qualidade end-to-end dos dados e a governança de dados em seus aplicativos de streaming. Para obter mais informações, consulte Registro de esquemas do AWSGlue Schema Registry. Uma das maneiras de configurar essa integração é por meio da APIGetRecords
Kinesis Data Streams disponível noAWS Java SDK.
Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o Schema Registry usando as APIs doGetRecords
Kinesis Data Streams, consulte a seção “Interagindo com dados usando as APIs do Kinesis Data Streams” em Caso de uso: Integração do Amazon Kinesis Data Streams com oAWS Glue Schema Registry.