Desenvolver produtores usando a API do Amazon Kinesis Data Streams com o AWS SDK for Java - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolver produtores usando a API do Amazon Kinesis Data Streams com o AWS SDK for Java

Você pode desenvolver produtores usando a API do Amazon Kinesis Data Streams com o AWS SDL para Java. Se você nunca usou o Kinesis Data Streams, comece familiarizando-se com os conceitos e a terminologia apresentados em O que é o Amazon Kinesis Data Streams? e Conceitos básicos do Amazon Kinesis Data Streams.

Esses exemplos discutem a API do Kinesis Data Streams e usam o AWS SDK para Java para adicionar (colocar) dados a um fluxo. Contudo, na maioria dos casos de uso, é melhor usar a biblioteca KPL do Kinesis Data Streams. Para obter mais informações, consulte Desenvolver produtores usando a Amazon Kinesis Producer Library.

O código Java de exemplo neste capítulo demonstra como executar operações básicas da API do Kinesis Data Streams e está dividido logicamente por tipo de operação. Esses exemplos não representam um código pronto para produção, pois não verificam todas as exceções possíveis nem abrangem todas as considerações de segurança ou de performance possíveis. Você também pode chamar a API do Kinesis Data Streams usando outras linguagens de programação. Para obter mais informações sobre todos os SDKs da AWS disponíveis, explore o Centro do desenvolvedor da Amazon Web Services.

Cada tarefa tem pré-requisitos. Por exemplo, não é possível adicionar dados a um stream enquanto o stream não é criado, o que requer a criação de um cliente. Para obter mais informações, consulte Criar e gerenciar streamings.

Adicionar dados a um stream

Quando um stream é criado, você pode adicionar dados a ele na forma de registros. Um registro é uma estrutura de dados que contém os dados a serem processados na forma de um blob de dados. Depois que você armazena os dados no registro, o Kinesis Data Streams não inspeciona, interpreta nem altera dados de forma alguma. Cada registro também tem um número sequencial e uma chave de partição associados.

Há duas operações diferentes na API do Kinesis Data Streams que adicionam dados a um fluxo, PutRecords e PutRecord. A operação PutRecords envia vários registros ao stream por solicitação HTTP e a operação singular PutRecord envia registros ao stream um por vez (uma solicitação HTTP separada é necessária para cada registro). Talvez convenha usar PutRecords para a maioria dos aplicativos, pois ele atingirá uma throughput mais alta por produtor de dados. Para obter mais informações sobre cada uma dessas operações, consulte as subseções abaixo.

Sempre tenha em mente que, como a aplicação de origem está adicionando dados ao fluxo usando a API do Kinesis Data Streams, provavelmente há uma ou mais aplicações de consumo processando dados fora do fluxo simultaneamente. Para obter informações sobre como os consumidores obtêm dados usando a API do Kinesis Data Streams, consulte Como obter dados de um stream.

Adicionar vários registros com PutRecords

A operação PutRecords envia vários registros ao Kinesis Data Streams em uma única solicitação. Ao usar PutRecords, os produtores podem obter uma throughput mais alta ao enviar dados para o fluxo de dados do Kinesis. Cada solicitação PutRecords pode oferecer suporte a até 500 registros. Cada registro na solicitação pode ter no máximo 1 MB, até um limite de 5 MB para toda a solicitação, incluindo chaves de partição. Assim como a operação única PutRecord descrita abaixo, PutRecords usa números de sequência e chaves de partição. No entanto, o parâmetro PutRecord de SequenceNumberForOrdering não é incluído em uma chamada a PutRecords. A operação PutRecords tenta processar todos os registros na ordem natural da solicitação.

Cada registro de dados tem um número sequencial exclusivo. O número de sequência é atribuído pelo Kinesis Data Streams depois que client.putRecords é chamada para adicionar os registros de dados ao fluxo. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações PutRecords, maiores ficam os números sequenciais.

nota

Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo stream. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um stream separado para cada conjunto de dados.

Uma solicitação PutRecords pode incluir registros com diferentes chaves de partição. O escopo da solicitação é um stream; cada solicitação pode incluir qualquer combinação de chaves de partição e registros, dentro dos limites da solicitação. As solicitações feitas com diferentes chaves de partição a streams com muitos estilhaços diferentes costumam ser mais rápidas do que as solicitações com um pequeno número de chaves de partição para um pequeno número de estilhaços. O número de chaves de partição deve ser muito maior do que o número de estilhaços para reduzir a latência e maximizar a throughput.

Exemplo de PutRecords

O código a seguir cria 100 registros de dados com chaves de partição sequenciais e os coloca em um stream denominado DataStream.

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

A resposta a PutRecords inclui uma matriz de resposta Records. Cada registro na matriz de resposta se correlaciona diretamente com um registro na matriz de solicitação por ordenação natural, do início ao fim da solicitação e da resposta. A matriz de resposta de Records sempre inclui o mesmo número de registros da matriz de solicitação.

Tratar falhas ao usar PutRecords

Por padrão, a falha de registros individuais em uma solicitação não interrompe o processamento de registros subsequentes em uma solicitação PutRecords. Isso significa que uma matriz de resposta Records inclui os registros com processamento bem-sucedido e malsucedido. É preciso detectar os registros com processamento malsucedido e incluí-los em uma chamada subsequente.

Os registros bem-sucedidos incluem os valores SequenceNumber e ShardID, e os registros malsucedidos incluem os valores ErrorCode e ErrorMessage. O parâmetro ErrorCode reflete o tipo de erro e pode ter um dos seguintes valores: ProvisionedThroughputExceededException ou InternalFailure. ErrorMessage fornece informações mais detalhadas sobre a exceção ProvisionedThroughputExceededException, incluindo o ID da conta, o nome do streaming e o ID do estilhaço do registro que foi limitado. O exemplo abaixo tem três registros em uma solicitação PutRecords. O segundo registro falha e isso é refletido na resposta.

exemplo Sintaxe da solicitação PutRecords
{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }
exemplo Sintaxe da resposta a PutRecords
{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

Os registros com processamento malsucedido podem ser incluídos nas solicitações PutRecords subsequentes. Primeiro, verifique o parâmetro FailedRecordCount no putRecordsResult para confirmar se há registros com falha na solicitação. Assim sendo, cada putRecordsEntry com um ErrorCode que não seja null deve ser adicionado a uma solicitação subsequente. Para obter um exemplo desse tipo de handler, consulte o seguinte código.

exemplo Manipulador de falhas PutRecords
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

Adicionar um único registro com PutRecord

Cada chamada para PutRecord opera em um único registro. Prefira a operação PutRecords descrita em Adicionar vários registros com PutRecords, a menos que seu aplicativo precise especificamente enviar sempre registos únicos por solicitação ou algum outro motivo para o não uso de PutRecords.

Cada registro de dados tem um número sequencial exclusivo. O número de sequência é atribuído pelo Kinesis Data Streams depois que client.putRecord é chamada para adicionar o registro de dados ao fluxo. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações PutRecord, maiores ficam os números sequenciais.

Quando ocorrem colocações em rápida sucessão, não há garantia de que os números de sequência retornados aumentem, porque as operações put aparentam ser essencialmente simultâneas para o Kinesis Data Streams. Para garantir estritamente o aumento de números sequenciais para a mesma chave de partição, use o parâmetro SequenceNumberForOrdering, como mostrado no código de exemplo em Exemplo de PutRecord.

Usando ou não SequenceNumberForOrdering, os registros que o Kinesis Data Streams recebe por meio de uma chamada a GetRecords são estritamente ordenados por número de sequência.

nota

Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo stream. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um stream separado para cada conjunto de dados.

Uma chave de partição é usada para agrupar os dados dentro de um stream. Um registro de dados é atribuído a um estilhaço dentro do stream com base em sua chave de partição. Especificamente, o Kinesis Data Streams usa a chave de partição como entrada para uma função de hash que mapeia essa chave (e os dados associados) a um determinado fragmento.

Como resultado desse mecanismo de hashing, todos os registros de dados com a mesma chave de partição são mapeados para o mesmo estilhaço no stream. No entanto, se o número de chaves de partição ultrapassar o número de estilhaços, alguns estilhaços conterão necessariamente registros com chaves de partição diferentes. Do ponto de vista do design, para garantir que todos os seus estilhaços sejam bem utilizados, o número de estilhaços (especificado pelo método setShardCount de CreateStreamRequest) deve ser substancialmente menor que o número de chaves de partição exclusivas, e o volume de dados que flui para uma única chave de partição deve ser substancialmente menor que a capacidade do estilhaço.

Exemplo de PutRecord

O código a seguir cria dez registros de dados, distribuídos entre duas chaves de partição, e os coloca em um stream denominado myStreamName.

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

O código de exemplo anterior usa setSequenceNumberForOrdering para garantir estritamente o aumento da ordenação dentro de cada chave de partição. Para usar esse parâmetro de forma eficaz, defina o SequenceNumberForOrdering do registro atual (registro n) como o número de sequência do registro anterior (registro n-1). Para obter o número sequencial de um registro que foi adicionado ao stream, chame getSequenceNumber para o resultado de putRecord.

O parâmetro SequenceNumberForOrdering garante estritamente o aumento de números de sequência para a mesma chave de partição. SequenceNumberForOrdering não fornece a ordenação de registros em várias chaves de partição.

Interagir com os dados usando o registro de esquemas do AWS Glue

Você pode integrar os fluxos de dados do Kinesis ao registro de esquemas do AWS Glue. O registro de esquemas do AWS Glue permite detectar, controlar e evoluir esquemas centralmente, ao mesmo tempo que garante que os dados produzidos sejam validados continuamente por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou armazenamento de dados confiáveis. O registro de esquemas do AWS Glue permite que você melhore tanto a qualidade de dados de ponta a ponta como a governança dos dados nas aplicações de streaming. Para obter mais informações, consulte Registro de esquemas do AWS Glue. Uma das formas de configurar essa integração é por meio das APIs PutRecords e PutRecord do Kinesis Data Streams, disponíveis no AWS SDK Java.

Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com o registro de esquemas usando as APIs PutRecords e PutRecord do Kinesis Data Streams, consulte a seção “Interagir com dados usando as APIs do Kinesis Data Streams” em Caso de uso: integração do Amazon Kinesis Data Streams ao registro de esquemas do AWS Glue.