As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Desenvolva produtores usando o Amazon Kinesis API Data Streams com o AWS SDK for Java
Você pode desenvolver produtores usando o Amazon Kinesis API Data Streams AWS SDK com o for Java. Se você nunca usou o Kinesis Data Streams, comece familiarizando-se com os conceitos e a terminologia apresentados em O que é o Amazon Kinesis Data Streams? e Use o AWS CLI para realizar operações do Amazon Kinesis Data Streams.
Esses exemplos discutem o Kinesis API Data Streams e AWS SDKusam o
O código de exemplo de Java neste capítulo demonstra como realizar operações básicas do Kinesis Data API Streams e é dividido logicamente por tipo de operação. Esses exemplos não representam um código pronto para produção, pois não verificam todas as exceções possíveis nem abrangem todas as considerações de segurança ou de performance possíveis. Além disso, você pode chamar o Kinesis API Data Streams usando outras linguagens de programação. Para obter mais informações sobre todas as opções disponíveis AWS SDKs, consulte Comece a desenvolver com a Amazon Web Services
Cada tarefa tem pré-requisitos. Por exemplo, não é possível adicionar dados a um stream enquanto o stream não é criado, o que requer a criação de um cliente. Para obter mais informações, consulte Crie e gerencie streams de dados do Kinesis.
Adicionar dados a um stream
Quando um stream é criado, você pode adicionar dados a ele na forma de registros. Um registro é uma estrutura de dados que contém os dados a serem processados na forma de um blob de dados. Depois que você armazena os dados no registro, o Kinesis Data Streams não inspeciona, interpreta nem altera dados de forma alguma. Cada registro também tem um número sequencial e uma chave de partição associados.
Há duas operações diferentes no Kinesis Data Streams que adicionam API dados a um PutRecords
stream, e. PutRecord
A PutRecords
operação envia vários registros para seu stream por HTTP solicitação, e a PutRecord
operação singular envia registros para seu stream, um por vez (uma HTTP solicitação separada é necessária para cada registro). Talvez convenha usar PutRecords
para a maioria dos aplicativos, pois ele atingirá uma throughput mais alta por produtor de dados. Para obter mais informações sobre cada uma dessas operações, consulte as subseções abaixo.
Lembre-se sempre de que, como seu aplicativo de origem está adicionando dados ao stream usando o Kinesis API Data Streams, provavelmente há um ou mais aplicativos consumidores que estão processando simultaneamente dados fora do stream. Para obter informações sobre como os consumidores obtêm dados usando o Kinesis API Data StreamsObter dados de um stream, consulte.
Importante
Adicione vários registros com PutRecords
A operação PutRecords
envia vários registros ao Kinesis Data Streams em uma única solicitação. Ao usar PutRecords
, os produtores podem obter uma throughput mais alta ao enviar dados para o fluxo de dados do Kinesis. Cada solicitação PutRecords
pode oferecer suporte a até 500 registros. Cada registro na solicitação pode ter no máximo 1 MB, até um limite de 5 MB para toda a solicitação, incluindo chaves de partição. Assim como a operação única PutRecord
descrita abaixo, PutRecords
usa números de sequência e chaves de partição. No entanto, o parâmetro PutRecord
de SequenceNumberForOrdering
não é incluído em uma chamada a PutRecords
. A operação PutRecords
tenta processar todos os registros na ordem natural da solicitação.
Cada registro de dados tem um número sequencial exclusivo. O número de sequência é atribuído pelo Kinesis Data Streams depois que client.putRecords
é chamada para adicionar os registros de dados ao fluxo. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações PutRecords
, maiores ficam os números sequenciais.
nota
Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo stream. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um stream separado para cada conjunto de dados.
Uma solicitação PutRecords
pode incluir registros com diferentes chaves de partição. O escopo da solicitação é um stream; cada solicitação pode incluir qualquer combinação de chaves de partição e registros, dentro dos limites da solicitação. As solicitações feitas com diferentes chaves de partição a streams com muitos estilhaços diferentes costumam ser mais rápidas do que as solicitações com um pequeno número de chaves de partição para um pequeno número de estilhaços. O número de chaves de partição deve ser muito maior do que o número de estilhaços para reduzir a latência e maximizar a throughput.
PutRecordsexemplo
O código a seguir cria 100 registros de dados com chaves de partição sequenciais e os coloca em um stream denominado DataStream
.
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard(); clientBuilder.setRegion(regionName); clientBuilder.setCredentials(credentialsProvider); clientBuilder.setClientConfiguration(config); AmazonKinesis kinesisClient = clientBuilder.build(); PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(streamName); List <PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int i = 0; i < 100; i++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(i).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", i)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = kinesisClient.putRecords(putRecordsRequest); System.out.println("Put Result" + putRecordsResult);
A resposta a PutRecords
inclui uma matriz de resposta Records
. Cada registro na matriz de resposta se correlaciona diretamente com um registro na matriz de solicitação por ordenação natural, do início ao fim da solicitação e da resposta. A matriz de resposta de Records
sempre inclui o mesmo número de registros da matriz de solicitação.
Lidar com falhas ao usar PutRecords
Por padrão, a falha de registros individuais em uma solicitação não interrompe o processamento de registros subsequentes em uma solicitação PutRecords
. Isso significa que uma matriz de resposta Records
inclui os registros com processamento bem-sucedido e malsucedido. É preciso detectar os registros com processamento malsucedido e incluí-los em uma chamada subsequente.
Os registros bem-sucedidos incluem os valores SequenceNumber
e ShardID
, e os registros malsucedidos incluem os valores ErrorCode
e ErrorMessage
. O parâmetro ErrorCode
reflete o tipo de erro e pode ter um dos seguintes valores: ProvisionedThroughputExceededException
ou InternalFailure
. ErrorMessage
fornece informações mais detalhadas sobre a exceção ProvisionedThroughputExceededException
, incluindo o ID da conta, o nome do streaming e o ID do estilhaço do registro que foi limitado. O exemplo abaixo tem três registros em uma solicitação PutRecords
. O segundo registro falha e isso é refletido na resposta.
exemplo PutRecords Sintaxe da solicitação
{
"Records": [
{
"Data": "XzxkYXRhPl8w",
"PartitionKey": "partitionKey1"
},
{
"Data": "AbceddeRFfg12asd",
"PartitionKey": "partitionKey1"
},
{
"Data": "KFpcd98*7nd1",
"PartitionKey": "partitionKey3"
}
],
"StreamName": "myStream"
}
exemplo PutRecords Sintaxe de resposta
{
"FailedRecordCount”: 1,
"Records": [
{
"SequenceNumber": "21269319989900637946712965403778482371",
"ShardId": "shardId-000000000001"
},
{
“ErrorCode":”ProvisionedThroughputExceededException”,
“ErrorMessage": "Rate exceeded for shard shardId-000000000001 in stream exampleStreamName under account 111111111111."
},
{
"SequenceNumber": "21269319989999637946712965403778482985",
"ShardId": "shardId-000000000002"
}
]
}
Os registros com processamento malsucedido podem ser incluídos nas solicitações PutRecords
subsequentes. Primeiro, verifique o parâmetro FailedRecordCount
no putRecordsResult
para confirmar se há registros com falha na solicitação. Assim sendo, cada putRecordsEntry
com um ErrorCode
que não seja null
deve ser adicionado a uma solicitação subsequente. Para obter um exemplo desse tipo de handler, consulte o seguinte código.
exemplo PutRecords manipulador de falhas
PutRecordsRequest putRecordsRequest = new PutRecordsRequest(); putRecordsRequest.setStreamName(myStreamName); List<PutRecordsRequestEntry> putRecordsRequestEntryList = new ArrayList<>(); for (int j = 0; j < 100; j++) { PutRecordsRequestEntry putRecordsRequestEntry = new PutRecordsRequestEntry(); putRecordsRequestEntry.setData(ByteBuffer.wrap(String.valueOf(j).getBytes())); putRecordsRequestEntry.setPartitionKey(String.format("partitionKey-%d", j)); putRecordsRequestEntryList.add(putRecordsRequestEntry); } putRecordsRequest.setRecords(putRecordsRequestEntryList); PutRecordsResult putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); while (putRecordsResult.getFailedRecordCount() > 0) { final List<PutRecordsRequestEntry> failedRecordsList = new ArrayList<>(); final List<PutRecordsResultEntry> putRecordsResultEntryList = putRecordsResult.getRecords(); for (int i = 0; i < putRecordsResultEntryList.size(); i++) { final PutRecordsRequestEntry putRecordRequestEntry = putRecordsRequestEntryList.get(i); final PutRecordsResultEntry putRecordsResultEntry = putRecordsResultEntryList.get(i); if (putRecordsResultEntry.getErrorCode() != null) { failedRecordsList.add(putRecordRequestEntry); } } putRecordsRequestEntryList = failedRecordsList; putRecordsRequest.setRecords(putRecordsRequestEntryList); putRecordsResult = amazonKinesisClient.putRecords(putRecordsRequest); }
Adicione um único registro com PutRecord
Cada chamada para PutRecord
opera em um único registro. Prefira a operação PutRecords
descrita em Adicione vários registros com PutRecords, a menos que seu aplicativo precise especificamente enviar sempre registos únicos por solicitação ou algum outro motivo para o não uso de PutRecords
.
Cada registro de dados tem um número sequencial exclusivo. O número de sequência é atribuído pelo Kinesis Data Streams depois que client.putRecord
é chamada para adicionar o registro de dados ao fluxo. Os números sequenciais da mesma chave de partição geralmente aumentam com o tempo: quanto maior o período entre as solicitações PutRecord
, maiores ficam os números sequenciais.
Quando ocorrem colocações em rápida sucessão, não há garantia de que os números de sequência retornados aumentem, porque as operações put aparentam ser essencialmente simultâneas para o Kinesis Data Streams. Para garantir estritamente o aumento de números sequenciais para a mesma chave de partição, use o parâmetro SequenceNumberForOrdering
, como mostrado no código de exemplo em PutRecordexemplo.
Usando ou não SequenceNumberForOrdering
, os registros que o Kinesis Data Streams recebe por meio de uma chamada a GetRecords
são estritamente ordenados por número de sequência.
nota
Os números de sequência não podem ser usados como índices para conjuntos de dados dentro do mesmo stream. Para separar logicamente conjuntos de dados, use chaves de partição ou crie um stream separado para cada conjunto de dados.
Uma chave de partição é usada para agrupar os dados dentro de um stream. Um registro de dados é atribuído a um estilhaço dentro do stream com base em sua chave de partição. Especificamente, o Kinesis Data Streams usa a chave de partição como entrada para uma função de hash que mapeia essa chave (e os dados associados) a um determinado fragmento.
Como resultado desse mecanismo de hashing, todos os registros de dados com a mesma chave de partição são mapeados para o mesmo estilhaço no stream. No entanto, se o número de chaves de partição ultrapassar o número de estilhaços, alguns estilhaços conterão necessariamente registros com chaves de partição diferentes. Do ponto de vista do design, para garantir que todos os seus estilhaços sejam bem utilizados, o número de estilhaços (especificado pelo método setShardCount
de CreateStreamRequest
) deve ser substancialmente menor que o número de chaves de partição exclusivas, e o volume de dados que flui para uma única chave de partição deve ser substancialmente menor que a capacidade do estilhaço.
PutRecordexemplo
O código a seguir cria dez registros de dados, distribuídos entre duas chaves de partição, e os coloca em um stream denominado myStreamName
.
for (int j = 0; j < 10; j++) { PutRecordRequest putRecordRequest = new PutRecordRequest(); putRecordRequest.setStreamName( myStreamName ); putRecordRequest.setData(ByteBuffer.wrap( String.format( "testData-%d", j ).getBytes() )); putRecordRequest.setPartitionKey( String.format( "partitionKey-%d", j/5 )); putRecordRequest.setSequenceNumberForOrdering( sequenceNumberOfPreviousRecord ); PutRecordResult putRecordResult = client.putRecord( putRecordRequest ); sequenceNumberOfPreviousRecord = putRecordResult.getSequenceNumber(); }
O código de exemplo anterior usa setSequenceNumberForOrdering
para garantir estritamente o aumento da ordenação dentro de cada chave de partição. Para usar esse parâmetro de forma eficaz, defina o SequenceNumberForOrdering
do registro atual (registro n) como o número de sequência do registro anterior (registro n-1). Para obter o número sequencial de um registro que foi adicionado ao stream, chame getSequenceNumber
para o resultado de putRecord
.
O parâmetro SequenceNumberForOrdering
garante estritamente o aumento de números de sequência para a mesma chave de partição. SequenceNumberForOrdering
não fornece a ordenação de registros em várias chaves de partição.