Executar operações básicas de fluxo de dados do Kinesis usando a AWS CLI - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Executar operações básicas de fluxo de dados do Kinesis usando a AWS CLI

Esta seção descreve o uso básico de um stream de dados do Kinesis a partir da linha de comando usando aAWS CLI. Certifique-se de estar familiarizado com os conceitos discutidos em Terminologia e conceitos do Amazon Kinesis Data Streams.

nota

Depois de criar um stream, sua conta incorre em cobranças nominais pelo uso do Kinesis Data Streams, pois o Kinesis Data Streams não está qualificado para oAWSnível gratuito. Quando você concluir este tutorial, exclua seuAWSrecursos para parar de incorrer em cobranças. Para obter mais informações, consulte Etapa 4: Limpar.

Etapa 1: Criar um fluxo

Sua primeira etapa é criar um stream e verificar se ele foi criado com êxito. Use o comando a seguir para criar um stream denominado "Foo":

aws kinesis create-stream --stream-name Foo

Em seguida, emita o comando a seguir para verificar o andamento da criação do stream:

aws kinesis describe-stream-summary --stream-name Foo

Você deve obter uma saída semelhante ao exemplo a seguir:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "CREATING", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Neste exemplo, o stream tem o status CREATING, o que significa que não está pronto para uso. Verifique novamente em alguns instantes para ver uma saída semelhante ao exemplo a seguir:

{ "StreamDescriptionSummary": { "StreamName": "Foo", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/Foo", "StreamStatus": "ACTIVE", "RetentionPeriodHours": 48, "StreamCreationTimestamp": 1572297168.0, "EnhancedMonitoring": [ { "ShardLevelMetrics": [] } ], "EncryptionType": "NONE", "OpenShardCount": 3, "ConsumerCount": 0 } }

Há informações nessa saída com que você não precisa se preocupar neste tutorial. O principal agora é "StreamStatus": "ACTIVE", que mostra que o stream está pronto para uso, e as informações sobre o único estilhaço que você solicitou. Você também pode verificar a existência do novo stream usando o comando list-streams, como mostrado aqui:

aws kinesis list-streams

Resultado:

{ "StreamNames": [ "Foo" ] }

Etapa 2: Inserir um registro

Agora que tem um stream ativo, você está pronto para colocar alguns dados. Neste tutorial, você usará o comando mais simples possível, put-record, que coloca um único registro de dados contendo o texto "testdata" no stream:

aws kinesis put-record --stream-name Foo --partition-key 123 --data testdata

Se bem-sucedido, esse comando gerará uma saída semelhante ao seguinte exemplo:

{ "ShardId": "shardId-000000000000", "SequenceNumber": "49546986683135544286507457936321625675700192471156785154" }

Parabéns, você adicionou dados a um stream! Em seguida, você verá como obter dados do stream.

Etapa 3: Obter o registro

GetShardIterador

Antes de obter dados do stream, você precisa obter o iterador referente ao estilhaço do seu interesse. Um iterador de estilhaços representa a posição do stream e do estilhaço da qual o consumidor (neste caso, o comando get-record) lerá. Você usará o comando get-shard-iterator da seguinte forma:

aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo

Lembre-se de que oaws kinesisOs comandos têm o suporte de uma API do Kinesis Data Streams, portanto, caso você tenha curiosidade sobre algum parâmetro mostrado, leia sobre ele naGetShardIteratorTópico de referência da API. A execução bem-sucedida gerará uma saída semelhante ao exemplo a seguir (role no sentido horizontal para ver a saída inteira):

{ "ShardIterator": "AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=" }

A string longa de caracteres aparentemente aleatórios é o iterador de estilhaços (a sua será diferente). Você precisará copiar/colar o iterador de estilhaços no comando get, mostrado a seguir. Os iteradores de estilhaços têm um ciclo de vida de 300 segundos, tempo que deve ser suficiente para você copiar/colar o iterador de estilhaços no próximo comando. Você precisará remover quaisquer novas linhas do seu iterador de estilhaços antes de colá-lo no próximo comando. Se você receber uma mensagem de erro de que o iterador de estilhaços não é mais válido, basta executar o comando get-shard-iterator novamente.

GetRecords

Oget-recordsO comando recebe dados do stream, resultando em uma chamada para oGetRecordsna API do Kinesis Data Streams. O iterador de estilhaços especifica a posição, no estilhaço, de onde você quer começar a ler os registros de dados em sequência. Se não houver registros disponíveis na parte do estilhaço para onde o iterador aponta, GetRecords retornará uma lista vazia. Poderá demorar várias chamadas para se chegar a uma parte do estilhaço que contenha registros.

No exemplo a seguir do comando get-records (role horizontalmente para ver o comando inteiro):

aws kinesis get-records --shard-iterator AAAAAAAAAAHSywljv0zEgPX4NyKdZ5wryMzP9yALs8NeKbUjp1IxtZs1Sp+KEd9I6AJ9ZG4lNR1EMi+9Md/nHvtLyxpfhEzYvkTZ4D9DQVz/mBYWRO6OTZRKnW9gd+efGN2aHFdkH1rJl4BL9Wyrk+ghYG22D2T1Da2EyNSH1+LAbK33gQweTJADBdyMwlo5r6PqcP2dzhg=

Se você estiver executando este tutorial a partir de um processador de comando do tipo Unix, como bash, poderá automatizar a aquisição de iterador de estilhaços usando um comando aninhado, como este (role horizontalmente para ver o comando inteiro):

SHARD_ITERATOR=$(aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo --query 'ShardIterator') aws kinesis get-records --shard-iterator $SHARD_ITERATOR

Se você estiver executando este tutorial a partir de um sistema compatívelPowerShell, você pode automatizar a aquisição do iterador de estilhaços usando um comando como este (role horizontalmente para ver o comando inteiro):

aws kinesis get-records --shard-iterator ((aws kinesis get-shard-iterator --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --stream-name Foo).split('"')[4])

O resultado bem-sucedido do comando get-records solicitará registros do seu stream para o estilhaço que você especificou quando obteve o iterador de estilhaços, como no exemplo a seguir (role horizontalmente para ver a saída inteira):

{ "Records":[ { "Data":"dGVzdGRhdGE=", "PartitionKey":"123”, "ApproximateArrivalTimestamp": 1.441215410867E9, "SequenceNumber":"49544985256907370027570885864065577703022652638596431874" } ], "MillisBehindLatest":24000, "NextShardIterator":"AAAAAAAAAAEDOW3ugseWPE4503kqN1yN1UaodY8unE0sYslMUmC6lX9hlig5+t4RtZM0/tALfiI4QGjunVgJvQsjxjh2aLyxaAaPr+LaoENQ7eVs4EdYXgKyThTZGPcca2fVXYJWL3yafv9dsDwsYVedI66dbMZFC8rPMWc797zxQkv4pSKvPOZvrUIudb8UkH3VMzx58Is=" }

Observe que get-records é descrito acima como uma solicitação, ou seja, você pode receber zero ou mais registros mesmo que haja registros em seu stream, e os registros retornados podem não representar todos os registros que existem atualmente no stream. Isso é perfeitamente normal, e o código de produção simplesmente sondará o stream quanto aos registros em intervalos apropriados (essa velocidade de sondagem varia de acordo com os requisitos de design específicos do aplicativo).

A primeira coisa que você provavelmente verá sobre o seu registro nesta parte do tutorial é que os dados parecem lixo, e não o em texto limpotestdataenviamos. Isso ocorre devido ao modo como put-record usa a codificação Base64 para permitir que você envie dados binários. No entanto, o Kinesis Data Streams oferece suporte noAWS CLInão fornece Base64decodificandoComo essa decodificação para conteúdo binário bruto impresso em stdout acarretará comportamento indesejado e possíveis problemas de segurança em algumas plataformas e terminais. Se você usar um decodificador Base64 (por exemplo, https://www.base64decode.org/) para decodificar dGVzdGRhdGE= manualmente, verá que ele é, na verdade, testdata. Isso é suficiente para os fins deste tutorial, pois, na prática, a AWS CLI raramente é usada para consumir dados. Ela é usada com mais frequência para monitorar o estado do stream e obter informações, como mostrado anteriormente (describe-stream e list-streams). Tutoriais futuros mostrarão como criar aplicativos de consumidor de qualidade de produção usando a Kinesis Client Library (KCL), que se encarregará da Base64 para você. Para obter mais informações sobre o KCL, consulteDesenvolver consumidores personalizados com taxa de transferência compartilhada usando a KCL.

Nem sempre get-records retornará todos os registros no stream/estilhaço especificado. Quando isso acontecer, use o NextShardIterator a partir do último resultado para obter o próximo conjunto de registros. Portanto, se mais dados estavam sendo colocados no stream (situação normal em aplicativos de produção), você pode continuar sondando dados usando get-records todas as vezes. No entanto, se você não chamar get-records usando o próximo iterador de estilhaços dentro do tempo de vida de 300 segundos do iterador de estilhaços, receberá uma mensagem de erro e precisará usar o comando get-shard-iterator para obter um novo iterador de estilhaços.

Essa saída também fornece MillisBehindLatest, que é o número de milissegundos em que a resposta da operação GetRecords está da ponta do stream, indicando o atraso do consumidor em relação ao tempo atual. Um valor zero indica que o processamento de registros foi alcançado e não há nenhum registro novo para processar no momento. No caso deste tutorial, se você está lendo tudo conforme avança, poderá ver um número muito grande. Isso não é problema, pois por padrão, os registros de dados permanecem em um stream por 24 horas, aguardando você recuperá-los. Esse período, chamado de período de retenção, é configurável para até 365 dias.

Observe que um resultado bem-sucedido de get-records sempre terá um NextShardIterator, mesmo que não haja mais nenhum registro atualmente no stream. Este é um modelo de sondagem que assume que um produtor colocará mais registros no stream em um determinado momento. Você pode criar suas próprias rotinas de sondagem, porém, se usar a KCL mencionada anteriormente para desenvolver aplicativos de consumidor, ela se encarregará dessa sondagem para você.

Se você chamar get-records até que não haja mais registros no stream e o estilhaço do qual você está extraindo, verá uma saída com registros vazios, semelhante ao exemplo a seguir (role horizontalmente para ver a saída inteira):

{ "Records": [], "NextShardIterator": "AAAAAAAAAAGCJ5jzQNjmdhO6B/YDIDE56jmZmrmMA/r1WjoHXC/kPJXc1rckt3TFL55dENfe5meNgdkyCRpUPGzJpMgYHaJ53C3nCAjQ6s7ZupjXeJGoUFs5oCuFwhP+Wul/EhyNeSs5DYXLSSC5XCapmCAYGFjYER69QSdQjxMmBPE/hiybFDi5qtkT6/PsZNz6kFoqtDk=" }

Etapa 4: Limpar

Por fim, convém excluir seu stream para liberar recursos e evitar cobranças indesejadas à sua conta, como observado anteriormente. Faça isso efetivamente sempre que tiver criado um stream que não será usado, pois as cobranças incidem por stream mesmo que ele não seja usado para colocar e obter dados. O comando de limpeza é simples:

aws kinesis delete-stream --stream-name Foo

Como o êxito do comando não gera saída, você pode usar describe-stream para verificar o andamento da exclusão:

aws kinesis describe-stream-summary --stream-name Foo

Se você executar este comando imediatamente após o comando de exclusão, provavelmente verá que parte de saída é semelhante ao exemplo a seguir:

{ "StreamDescriptionSummary": { "StreamName": "samplestream", "StreamARN": "arn:aws:kinesis:us-west-2:123456789012:stream/samplestream", "StreamStatus": "ACTIVE",

Após a exclusão total do stream, describe-stream gerará um erro "não encontrado":

A client error (ResourceNotFoundException) occurred when calling the DescribeStreamSummary operation: Stream Foo under account 123456789012 not found.