As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Desenvolver produtores usando a API do Amazon Kinesis Data Streams com oAWS SDK for Java
Você pode desenvolver produtores usando a API do Amazon Kinesis Data Streams com oAWSSDK for Java. Se você não estiver familiarizado com o Kinesis Data Streams, comece familiarizando-se com os conceitos e a terminologia apresentados emO que é o Amazon Kinesis Data Streams?ePrimeiros passos com o Amazon Kinesis Data Streams.
Esses exemplos discutem oAPI Kinesis Data Streamse use oAWSSDK for Java
O código Java de exemplo neste capítulo demonstra como executar operações básicas da API do Kinesis Data Streams e está dividido logicamente por tipo de operação. Esses exemplos não representam um código pronto para produção, pois não verificam todas as exceções possíveis nem abrangem todas as considerações de segurança ou de performance possíveis. Além disso, você pode chamar oAPI Kinesis Data Streamsusando outras linguagens de programação. Para obter mais informações sobre todos os disponíveisAWSSDKs, consulteComece a desenvolver com a Amazon Web Services
Cada tarefa tem pré-requisitos. Por exemplo, não é possível adicionar dados a um stream enquanto o stream não é criado, o que requer a criação de um cliente. Para obter mais informações, consulte Criar e gerenciar streamings.
Adicionar dados a um stream
Quando um stream é criado, você pode adicionar dados a ele na forma de registros. Um registro é uma estrutura de dados que contém os dados a serem processados na forma de um blob de dados. Depois de armazenar dados no registro, o Kinesis Data Streams não inspeciona, interpreta ou altera dados de forma alguma. Cada registro também tem um número sequencial e uma chave de partição associados.
Há duas operações diferentes na API do Kinesis Data Streams que adicionam dados a um stream,PutRecords
ePutRecord
. A operação PutRecords
envia vários registros ao stream por solicitação HTTP e a operação singular PutRecord
envia registros ao stream um por vez (uma solicitação HTTP separada é necessária para cada registro). Talvez convenha usar PutRecords
para a maioria dos aplicativos, pois ele atingirá uma taxa de transferência mais alta por produtor de dados. Para obter mais informações sobre cada uma dessas operações, consulte as subseções abaixo.
Sempre lembre-se de que, como o aplicativo de origem está adicionando dados ao stream usando a API do Kinesis Data Streams, provavelmente há um ou mais aplicativos de consumidor que estão processando dados fora do stream simultaneamente. Para obter informações sobre como os consumidores obtêm dados usando a API do Kinesis Data Streams, consulteComo obter dados de um stream.
Importante
Adicionar vários registros comPutRecords
OPutRecords
A operação envia vários registros ao Kinesis Data Streams em uma única solicitação. Ao usarPutRecords
Os produtores podem alcançar uma taxa de transferência mais alta ao enviar dados para seu stream de dados do Kinesis. EACHPutRecords
A solicitação pode oferecer suporte a até 500 registros. Cada registro na solicitação pode ter no máximo 1 MB, até um limite de 5 MB para toda a solicitação, incluindo chaves de partição. Tal como acontece com o singlePutRecord
operação descrita abaixo,PutRecords
O usa números sequenciais e chaves de partição. No entanto, o parâmetro PutRecord
de SequenceNumberForOrdering
não é incluído em uma chamada a PutRecords
. A operação PutRecords
tenta processar todos os registros na ordem natural da solicitação.
Cada registro de dados tem um número sequencial exclusivo. O número sequência é atribuído pelo Kinesis Data Streams após a chamadaclient.putRecords
Para adicionar os registros de dados ao stream. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações PutRecords
, maiores ficam os números sequenciais.
nota
Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo stream. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um stream separado para cada conjunto de dados.
Uma solicitação PutRecords
pode incluir registros com diferentes chaves de partição. O escopo da solicitação é um stream; cada solicitação pode incluir qualquer combinação de chaves de partição e registros, dentro dos limites da solicitação. As solicitações feitas com diferentes chaves de partição a streams com muitos estilhaços diferentes costumam ser mais rápidas do que as solicitações com um pequeno número de chaves de partição para um pequeno número de estilhaços. O número de chaves de partição deve ser muito maior do que o número de estilhaços para reduzir a latência e maximizar a taxa de transferência.
PutRecordsExemplo
O código a seguir cria 100 registros de dados com chaves de partição sequenciais e os coloca em um stream denominado DataStream
.
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);
A resposta a PutRecords
inclui uma matriz de resposta Records
. Cada registro na matriz de resposta se correlaciona diretamente com um registro na matriz de solicitação por ordenação natural, do início ao fim da solicitação e da resposta. A matriz de resposta de Records
sempre inclui o mesmo número de registros da matriz de solicitação.
Tratamento de falhas ao usarPutRecords
Por padrão, a falha de registros individuais em uma solicitação não interrompe o processamento de registros subsequentes em uma solicitação PutRecords
. Isso significa que uma matriz de resposta Records
inclui os registros com processamento bem-sucedido e malsucedido. É preciso detectar os registros com processamento malsucedido e incluí-los em uma chamada subsequente.
Os registros bem-sucedidos incluem os valores SequenceNumber
e ShardID
, e os registros malsucedidos incluem os valores ErrorCode
e ErrorMessage
. O parâmetro ErrorCode
reflete o tipo de erro e pode ter um dos seguintes valores: ProvisionedThroughputExceededException
ou InternalFailure
. ErrorMessage
fornece informações mais detalhadas sobre a exceção ProvisionedThroughputExceededException
, incluindo o ID da conta, o nome do streaming e o ID do estilhaço do registro que foi limitado. O exemplo abaixo tem três registros em uma solicitação PutRecords
. O segundo registro falha e isso é refletido na resposta.
exemplo PutRecordsSintaxe da solicitação
{
"Records": [
{
"Data": "XzxkYXRhPl8w",
"PartitionKey": "partitionKey1"
},
{
"Data": "AbceddeRFfg12asd",
"PartitionKey": "partitionKey1"
},
{
"Data": "KFpcd98*7nd1",
"PartitionKey": "partitionKey3"
}
],
"StreamName": "myStream"
}
exemplo PutRecordsSintaxe da resposta
{
"FailedRecordCount”: 1,
"Records": [
{
"SequenceNumber": "21269319989900637946712965403778482371",
"ShardId": "shardId-000000000001"
},
{
“ErrorCode":”ProvisionedThroughputExceededException”,
“ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
},
{
"SequenceNumber": "21269319989999637946712965403778482985",
"ShardId": "shardId-000000000002"
}
]
}
Os registros com processamento malsucedido podem ser incluídos nas solicitações PutRecords
subsequentes. Primeiro, verifique o parâmetro FailedRecordCount
no putRecordsResult
para confirmar se há registros com falha na solicitação. Assim sendo, cada putRecordsEntry
com um ErrorCode
que não seja null
deve ser adicionado a uma solicitação subsequente. Para obter um exemplo desse tipo de handler, consulte o seguinte código.
exemplo PutRecordsmanipulador de falhas
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }
Adicionar um único registro comPutRecord
Cada chamada para PutRecord
opera em um único registro. Prefira a operação PutRecords
descrita em Adicionar vários registros comPutRecords, a menos que seu aplicativo precise especificamente enviar sempre registos únicos por solicitação ou algum outro motivo para o não uso de PutRecords
.
Cada registro de dados tem um número sequencial exclusivo. O número sequência é atribuído pelo Kinesis Data Streams após a chamadaclient.putRecord
Para adicionar o registro de dados ao stream. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações PutRecord
, maiores ficam os números sequenciais.
Quando ocorrem colocações em rápida sucessão, não há garantia de que os números sequenciais retornados aumentem, porque as operações put aparentam ser essencialmente simultâneas ao Kinesis Data Streams. Para garantir estritamente o aumento de números sequenciais para a mesma chave de partição, use o parâmetro SequenceNumberForOrdering
, como mostrado no código de exemplo em PutRecordExemplo.
Se você usa ou nãoSequenceNumberForOrdering
, registra que o Kinesis Data Streams recebe por meio de umGetRecords
chamadas são estritamente ordenadas por número de sequência.
nota
Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo stream. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um stream separado para cada conjunto de dados.
Uma chave de partição é usada para agrupar os dados dentro de um stream. Um registro de dados é atribuído a um estilhaço dentro do stream com base em sua chave de partição. Especificamente, o Kinesis Data Streams usa a chave de partição como entrada para uma função de hash que mapeia a chave de partição (e dados associados) para um determinado estilhaço.
Como resultado desse mecanismo de hashing, todos os registros de dados com a mesma chave de partição são mapeados para o mesmo estilhaço no stream. No entanto, se o número de chaves de partição ultrapassar o número de estilhaços, alguns estilhaços conterão necessariamente registros com chaves de partição diferentes. Do ponto de vista do design, para garantir que todos os seus estilhaços sejam bem utilizados, o número de estilhaços (especificado pelo método setShardCount
de CreateStreamRequest
) deve ser substancialmente menor que o número de chaves de partição exclusivas, e o volume de dados que flui para uma única chave de partição deve ser substancialmente menor que a capacidade do estilhaço.
PutRecordExemplo
O código a seguir cria dez registros de dados, distribuídos entre duas chaves de partição, e os coloca em um stream denominado myStreamName
.
for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }
O código de exemplo anterior usa setSequenceNumberForOrdering
para garantir estritamente o aumento da ordenação dentro de cada chave de partição. Para usar esse parâmetro de forma eficaz, defina o SequenceNumberForOrdering
do registro atual (registro n) como o número de sequência do registro anterior (registro n-1). Para obter o número sequencial de um registro que foi adicionado ao stream, chame getSequenceNumber
para o resultado de putRecord
.
O parâmetro SequenceNumberForOrdering
garante estritamente o aumento de números de sequência para a mesma chave de partição. SequenceNumberForOrdering
não fornece a ordenação de registros em várias chaves de partição.
Interagir com dados usando oAWSRegistro de esquemas
Você pode integrar seus fluxos de dados do Kinesis com oAWSRegistro de esquema de Glue. OAWSGlue Registro de esquemas do permite detectar, controlar e evoluir centralmente esquemas, garantindo que dados produzidos sejam continuamente validados por um esquema registrado. O esquema define a estrutura e o formato de um registro de dados. Um esquema é uma especificação versionada para publicação, consumo ou armazenamento de dados confiáveis. OAWSO Glue Schema Registry permite melhorarend-to-endqualidade de dados e governança de dados em seus aplicativos de streaming. Para obter mais informações, consulteAWSRegistro de esquemas. Uma das maneiras de configurar essa integração é através doPutRecords
ePutRecord
As APIs do Kinesis Data Streams disponíveis noAWSSDK do Java.
Para obter instruções detalhadas sobre como configurar a integração do Kinesis Data Streams com oPutRecordsePutRecordConsulte as APIs do Kinesis Data Streams, consulte a seção “Interagir com dados usando as APIs do Kinesis Data Streams” noCaso de uso: Integrar o Amazon Kinesis Data Streams com oAWSRegistro de esquemas.