Etapa 1: criar os streamings de entrada e saída - Guia do Desenvolvedor de Amazon Kinesis Data Analytics para aplicativos SQL

Para novos projetos, recomendamos que você use o novo Managed Service for Apache Flink Studio em vez do Kinesis Data Analytics para aplicativos SQL. O Managed Service for Apache Flink Studio combina facilidade de uso com recursos analíticos avançados, permitindo que você crie aplicativos sofisticados de processamento de stream em minutos.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Etapa 1: criar os streamings de entrada e saída

Antes de criar um aplicativo Amazon Kinesis Data Analytics para o exemplo de Hotspots, crie dois fluxos de dados do Kinesis. Configure um dos streamings como a origem de streaming do aplicativo e outro como o destino em que o Kinesis Data Analytics mantém a saída do aplicativo.

Etapa 1.1: Criar os fluxos de dados do Kinesis

Nesta seção, você cria dois fluxos de dados do Kinesis: ExampleInputStream e ExampleOutputStream.

Crie esses fluxos de dados usando o console ou a AWS CLI.

  • Para criar os fluxos de dados usando o console:

    1. Faça login no AWS Management Console e abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

    2. Selecione Data Streams (Fluxos de dados) no painel de navegação.

    3. Escolha Create Kinesis stream (Criar fluxo do Kinesis) e crie um fluxo com um estilhaço chamado ExampleInputStream.

    4. Repita a etapa anterior, criando um streaming com um estilhaço denominado ExampleOutputStream.

  • Para criar fluxos de dados usando a AWS CLI:

    • Crie fluxos (ExampleInputStream e ExampleOutputStream) usando o seguinte comando Kinesis create-stream AWS CLI. Para criar o segundo streaming, que o aplicativo usará para gravar a saída, execute o mesmo comando, alterando o nome do streaming para ExampleOutputStream.

      $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Etapa 1.2: Gravação de registros de amostra no stream de entrada

Nesta etapa, você executa o código Python para gerar continuamente registros de amostra e gravar no stream ExampleInputStream.

{"x": 7.921782426109737, "y": 8.746265312709893, "is_hot": "N"} {"x": 0.722248626580026, "y": 4.648868803193405, "is_hot": "Y"}
  1. Instale o Python e o pip.

    Para obter informações sobre como instalar o Python, consulte o site do Python.

    Você pode instalar dependências usando o pip. Para obter informações sobre como instalar o pip, consulte Installation no site do pip.

  2. Execute o código do Python a seguir. Esse código faz o seguinte:

    • Gera um ponto de acesso em potencial em algum lugar do plano (X, Y).

    • Gera um conjunto de 1.000 pontos para cada ponto de acesso. Desses pontos, 20% são agrupados em torno do ponto de acesso. Os demais são gerados aleatoriamente em todo o espaço.

    • O comando put-record grava os registros JSON no streaming.

    Importante

    Não faça upload desse arquivo em um servidor web, porque ele contém suas credenciais da AWS.

    import json from pprint import pprint import random import time import boto3 STREAM_NAME = "ExampleInputStream" def get_hotspot(field, spot_size): hotspot = { "left": field["left"] + random.random() * (field["width"] - spot_size), "width": spot_size, "top": field["top"] + random.random() * (field["height"] - spot_size), "height": spot_size, } return hotspot def get_record(field, hotspot, hotspot_weight): rectangle = hotspot if random.random() < hotspot_weight else field point = { "x": rectangle["left"] + random.random() * rectangle["width"], "y": rectangle["top"] + random.random() * rectangle["height"], "is_hot": "Y" if rectangle is hotspot else "N", } return {"Data": json.dumps(point), "PartitionKey": "partition_key"} def generate( stream_name, field, hotspot_size, hotspot_weight, batch_size, kinesis_client ): """ Generates points used as input to a hotspot detection algorithm. With probability hotspot_weight (20%), a point is drawn from the hotspot; otherwise, it is drawn from the base field. The location of the hotspot changes for every 1000 points generated. """ points_generated = 0 hotspot = None while True: if points_generated % 1000 == 0: hotspot = get_hotspot(field, hotspot_size) records = [ get_record(field, hotspot, hotspot_weight) for _ in range(batch_size) ] points_generated += len(records) pprint(records) kinesis_client.put_records(StreamName=stream_name, Records=records) time.sleep(0.1) if __name__ == "__main__": generate( stream_name=STREAM_NAME, field={"left": 0, "width": 10, "top": 0, "height": 10}, hotspot_size=1, hotspot_weight=0.2, batch_size=10, kinesis_client=boto3.client("kinesis"), )

Próxima etapa

Etapa 2: criar o aplicativo Kinesis Data Analytics