Desenvolver produtores usando a API do Amazon Kinesis Data Streams com oAWS SDK for Java - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Desenvolver produtores usando a API do Amazon Kinesis Data Streams com oAWS SDK for Java

Você pode desenvolver produtores usando a API do Amazon Kinesis Data Streams com oAWSSDK for Java. Se você não estiver familiarizado com o Kinesis Data Streams, comece familiarizando-se com os conceitos e a terminologia apresentados emO que é o Amazon Kinesis Data Streams?ePrimeiros passos com o Amazon Kinesis Data Streams.

Esses exemplos discutem oAPI Kinesis Data Streamse use oAWSSDK for JavaPara adicionar (colocar) dados a um stream. Contudo, na maioria dos casos de uso, é melhor usar a biblioteca KPL do Kinesis Data Streams. Para obter mais informações, consulte Desenvolvendo produtores usando a biblioteca de produtores do Amazon Kinesis.

O código Java de exemplo neste capítulo demonstra como executar operações básicas da API do Kinesis Data Streams e está dividido logicamente por tipo de operação. Esses exemplos não representam um código pronto para produção, pois não verificam todas as exceções possíveis nem abrangem todas as considerações de segurança ou de performance possíveis. Além disso, você pode chamar oAPI Kinesis Data Streamsusando outras linguagens de programação. Para obter mais informações sobre todos os disponíveisAWSSDKs, consulteComece a desenvolver com a Amazon Web Services.

Cada tarefa tem pré-requisitos. Por exemplo, não é possível adicionar dados a um stream enquanto o stream não é criado, o que requer a criação de um cliente. Para obter mais informações, consulte Criar e gerenciar streamings.

Adicionar dados a um stream

Quando um stream é criado, você pode adicionar dados a ele na forma de registros. Um registro é uma estrutura de dados que contém os dados a serem processados na forma de um blob de dados. Depois de armazenar dados no registro, o Kinesis Data Streams não inspeciona, interpreta ou altera dados de forma alguma. Cada registro também tem um número sequencial e uma chave de partição associados.

Há duas operações diferentes na API do Kinesis Data Streams que adicionam dados a um stream,PutRecordsePutRecord. A operação PutRecords envia vários registros ao stream por solicitação HTTP e a operação singular PutRecord envia registros ao stream um por vez (uma solicitação HTTP separada é necessária para cada registro). Talvez convenha usar PutRecords para a maioria dos aplicativos, pois ele atingirá uma taxa de transferência mais alta por produtor de dados. Para obter mais informações sobre cada uma dessas operações, consulte as subseções abaixo.

Sempre lembre-se de que, como o aplicativo de origem está adicionando dados ao stream usando a API do Kinesis Data Streams, provavelmente há um ou mais aplicativos de consumidor que estão processando dados fora do stream simultaneamente. Para obter informações sobre como os consumidores obtêm dados usando a API do Kinesis Data Streams, consulteComo obter dados de um stream.

Adicionar vários registros comPutRecords

OPutRecordsA operação envia vários registros ao Kinesis Data Streams em uma única solicitação. Ao usarPutRecordsOs produtores podem alcançar uma taxa de transferência mais alta ao enviar dados para seu stream de dados do Kinesis. EACHPutRecordsA solicitação pode oferecer suporte a até 500 registros. Cada registro na solicitação pode ter no máximo 1 MB, até um limite de 5 MB para toda a solicitação, incluindo chaves de partição. Tal como acontece com o singlePutRecordoperação descrita abaixo,PutRecordsO usa números sequenciais e chaves de partição. No entanto, o parâmetro PutRecord de SequenceNumberForOrdering não é incluído em uma chamada a PutRecords. A operação PutRecords tenta processar todos os registros na ordem natural da solicitação.

Cada registro de dados tem um número sequencial exclusivo. O número sequência é atribuído pelo Kinesis Data Streams após a chamadaclient.putRecordsPara adicionar os registros de dados ao stream. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações PutRecords, maiores ficam os números sequenciais.

nota

Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo stream. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um stream separado para cada conjunto de dados.

Uma solicitação PutRecords pode incluir registros com diferentes chaves de partição. O escopo da solicitação é um stream; cada solicitação pode incluir qualquer combinação de chaves de partição e registros, dentro dos limites da solicitação. As solicitações feitas com diferentes chaves de partição a streams com muitos estilhaços diferentes costumam ser mais rápidas do que as solicitações com um pequeno número de chaves de partição para um pequeno número de estilhaços. O número de chaves de partição deve ser muito maior do que o número de estilhaços para reduzir a latência e maximizar a taxa de transferência.

PutRecordsExemplo

O código a seguir cria 100 registros de dados com chaves de partição sequenciais e os coloca em um stream denominado DataStream.

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);

A resposta a PutRecords inclui uma matriz de resposta Records. Cada registro na matriz de resposta se correlaciona diretamente com um registro na matriz de solicitação por ordenação natural, do início ao fim da solicitação e da resposta. A matriz de resposta de Records sempre inclui o mesmo número de registros da matriz de solicitação.

Tratamento de falhas ao usarPutRecords

Por padrão, a falha de registros individuais em uma solicitação não interrompe o processamento de registros subsequentes em uma solicitação PutRecords. Isso significa que uma matriz de resposta Records inclui os registros com processamento bem-sucedido e malsucedido. É preciso detectar os registros com processamento malsucedido e incluí-los em uma chamada subsequente.

Os registros bem-sucedidos incluem os valores SequenceNumber e ShardID, e os registros malsucedidos incluem os valores ErrorCode e ErrorMessage. O parâmetro ErrorCode reflete o tipo de erro e pode ter um dos seguintes valores: ProvisionedThroughputExceededException ou InternalFailure. ErrorMessage fornece informações mais detalhadas sobre a exceção ProvisionedThroughputExceededException, incluindo o ID da conta, o nome do streaming e o ID do estilhaço do registro que foi limitado. O exemplo abaixo tem três registros em uma solicitação PutRecords. O segundo registro falha e isso é refletido na resposta.

exemplo PutRecordsSintaxe da solicitação
{ "Records": [ { "Data": "XzxkYXRhPl8w", "PartitionKey": "partitionKey1" }, { "Data": "AbceddeRFfg12asd", "PartitionKey": "partitionKey1" }, { "Data": "KFpcd98*7nd1", "PartitionKey": "partitionKey3" } ], "StreamName": "myStream" }
exemplo PutRecordsSintaxe da resposta
{ "FailedRecordCount”: 1, "Records": [ { "SequenceNumber": "21269319989900637946712965403778482371", "ShardId": "shardId-000000000001" }, { “ErrorCode":”ProvisionedThroughputExceededException”, “ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111." }, { "SequenceNumber": "21269319989999637946712965403778482985", "ShardId": "shardId-000000000002" } ] }

Os registros com processamento malsucedido podem ser incluídos nas solicitações PutRecords subsequentes. Primeiro, verifique o parâmetro FailedRecordCount no putRecordsResult para confirmar se há registros com falha na solicitação. Assim sendo, cada putRecordsEntry com um ErrorCode que não seja null deve ser adicionado a uma solicitação subsequente. Para obter um exemplo desse tipo de handler, consulte o seguinte código.

exemplo PutRecordsmanipulador de falhas
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }

Adicionar um único registro comPutRecord

Cada chamada para PutRecord opera em um único registro. Prefira a operação PutRecords descrita em Adicionar vários registros comPutRecords, a menos que seu aplicativo precise especificamente enviar sempre registos únicos por solicitação ou algum outro motivo para o não uso de PutRecords.

Cada registro de dados tem um número sequencial exclusivo. O número sequência é atribuído pelo Kinesis Data Streams após a chamadaclient.putRecordPara adicionar o registro de dados ao stream. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações PutRecord, maiores ficam os números sequenciais.

Quando ocorrem colocações em rápida sucessão, não há garantia de que os números sequenciais retornados aumentem, porque as operações put aparentam ser essencialmente simultâneas ao Kinesis Data Streams. Para garantir estritamente o aumento de números sequenciais para a mesma chave de partição, use o parâmetro SequenceNumberForOrdering, como mostrado no código de exemplo em PutRecordExemplo.

Se você usa ou nãoSequenceNumberForOrdering, registra que o Kinesis Data Streams recebe por meio de umGetRecordschamadas são estritamente ordenadas por número de sequência.

nota

Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo stream. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um stream separado para cada conjunto de dados.

Uma chave de partição é usada para agrupar os dados dentro de um stream. Um registro de dados é atribuído a um estilhaço dentro do stream com base em sua chave de partição. Especificamente, o Kinesis Data Streams usa a chave de partição como entrada para uma função de hash que mapeia a chave de partição (e dados associados) para um determinado estilhaço.

Como resultado desse mecanismo de hashing, todos os registros de dados com a mesma chave de partição são mapeados para o mesmo estilhaço no stream. No entanto, se o número de chaves de partição ultrapassar o número de estilhaços, alguns estilhaços conterão necessariamente registros com chaves de partição diferentes. Do ponto de vista do design, para garantir que todos os seus estilhaços sejam bem utilizados, o número de estilhaços (especificado pelo método setShardCount de CreateStreamRequest) deve ser substancialmente menor que o número de chaves de partição exclusivas, e o volume de dados que flui para uma única chave de partição deve ser substancialmente menor que a capacidade do estilhaço.

PutRecordExemplo

O código a seguir cria dez registros de dados, distribuídos entre duas chaves de partição, e os coloca em um stream denominado myStreamName.

for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }

O código de exemplo anterior usa setSequenceNumberForOrdering para garantir estritamente o aumento da ordenação dentro de cada chave de partição. Para usar esse parâmetro de forma eficaz, defina o SequenceNumberForOrdering do registro atual (registro n) como o número de sequência do registro anterior (registro n-1). Para obter o número sequencial de um registro que foi adicionado ao stream, chame getSequenceNumber para o resultado de putRecord.

O parâmetro SequenceNumberForOrdering garante estritamente o aumento de números de sequência para a mesma chave de partição. SequenceNumberForOrdering não fornece a ordenação de registros em várias chaves de partição.

Interagir com dados usando oAWSRegistro de esquemas

Você pode integrar seus fluxos de dados do Kinesis com oAWSRegistro de esquema de Glue. OAWSGlue Registro de esquemas do permite detectar, controlar e evoluir centralmente esquemas, garantindo que dados produzidos sejam continuamente validados por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou armazenamento de dados confiáveis. OAWSO Glue Schema Registry permite melhorarend-to-endqualidade de dados e governança de dados em seus aplicativos de streaming. Para obter mais informações, consulteAWSRegistro de esquemas. Uma das maneiras de configurar essa integração é através doPutRecordsePutRecordAs APIs do Kinesis Data Streams disponíveis noAWSSDK do Java.

Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com oPutRecordsePutRecordConsulte as APIs do Kinesis Data Streams, consulte a seção “Interagir com dados usando as APIs do Kinesis Data Streams” noCaso de uso: Integrar o Amazon Kinesis Data Streams com oAWSRegistro de esquemas.