Crie um notebook Studio com a Amazon MSK - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Crie um notebook Studio com a Amazon MSK

Este tutorial descreve como criar um notebook Studio que usa um MSK cluster da Amazon como fonte.

Configurar um MSK cluster da Amazon

Para este tutorial, você precisa de um MSK cluster da Amazon que permita acesso em texto sem formatação. Se você ainda não tiver um MSK cluster da Amazon configurado, siga o MSK tutorial Como começar a usar a Amazon para criar uma AmazonVPC, um MSK cluster da Amazon, um tópico e uma instância EC2 cliente da Amazon.

Ao seguir o tutorial, faça o seguinte:

Adicione um NAT gateway ao seu VPC

Se você criou um MSK cluster da Amazon seguindo o MSK tutorial Getting Started Using Amazon, ou se sua Amazon existente ainda VPC não tem um NAT gateway para suas sub-redes privadas, você deve adicionar um NAT Gateway à sua Amazon. VPC O diagrama a seguir mostra a arquitetura.

AWS VPC architecture with public and private subnets, NAT gateway, and Glue Data Catalog integration.

Para criar um NAT gateway para sua AmazonVPC, faça o seguinte:

  1. Abra o VPC console da Amazon em https://console.aws.amazon.com/vpc/.

  2. Escolha NATGateways na barra de navegação esquerda.

  3. Na página NATGateways, escolha Create NAT Gateway.

  4. Na página Create NAT Gateway, forneça os seguintes valores:

    Nome - opcional ZeppelinGateway
    Sub-rede AWS KafkaTutorialSubnet1
    ID de alocação de IP elástico Escolha um IP elástico disponível. Se não houver nenhum elástico IPs disponível, escolha Alocar IP elástico e, em seguida, escolha o IP elástico que o console cria.

    Escolha Criar NAT gateway.

  5. Na barra de navegação à esquerda, escolha Tabelas de rotas.

  6. Escolha Criar tabela de rotas.

  7. Na página Criar tabela de rotas, forneça as seguintes informações:

    • Nome da tag: ZeppelinRouteTable

    • VPC: Escolha o seu VPC (por exemplo AWS KafkaTutorialVPC).

    Escolha Criar.

  8. Na lista de tabelas de rotas, escolha ZeppelinRouteTable. Escolha a guia Rotas e selecione Editar rotas.

  9. Na página Editar rotas, selecione Adicionar rota.

  10. Em Para Destino, insira 0.0.0.0/0. Para Target, escolha NATGateway, ZeppelinGateway. Selecione Salvar rotas. Escolha Fechar.

  11. Na página Tabelas de rotas, com a ZeppelinRouteTableopção selecionada, escolha a guia Associações de sub-rede. Selecione Editar associações de sub-rede.

  12. Na página Editar associações de sub-rede, escolha AWS KafkaTutorialSubnet2 e AWS KafkaTutorialSubnet3. Escolha Salvar.

Crie uma AWS Glue conexão e uma tabela

Seu notebook Studio usa um AWS Gluebanco de dados para metadados sobre sua fonte de MSK dados da Amazon. Nesta seção, você cria uma AWS Glue conexão que descreve como acessar seu MSK cluster Amazon e uma AWS Glue tabela que descreve como apresentar os dados em sua fonte de dados para clientes como seu notebook Studio.

Criar uma conexão
  1. Faça login no AWS Management Console e abra o AWS Glue console em https://console.aws.amazon.com/glue/.

  2. Se você ainda não tiver um AWS Glue banco de dados, escolha Bancos de dados na barra de navegação à esquerda. Selecione Adicionar banco de dados. Na janela Adicionar banco de dados, insira default para Nome do banco de dados. Escolha Criar.

  3. Selecione Conexões na barra de navegação à esquerda. Selecione Adicionar conexão.

  4. Na janela Adicionar conexão, forneça os seguintes valores:

    • Em Nome da conexão, insira ZeppelinConnection.

    • Em Tipo de conexão, escolha Kafka.

    • Para o servidor bootstrap Kafka URLs, forneça a string do broker bootstrap para seu cluster. Você pode obter os corretores de bootstrap no MSK console ou digitando o seguinte comando: CLI

      aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn ClusterArn
    • Desmarque a caixa de seleção Exigir SSL conexão.

    Escolha Próximo.

  5. Na VPCpágina, forneça os seguintes valores:

    • Para VPC, escolha o nome do seu VPC (por exemplo AWS KafkaTutorialVPC.)

    • Em Sub-rede, escolha AWS KafkaTutorialSubnet2.

    • Em Grupos de segurança, escolha todos os grupos disponíveis.

    Escolha Próximo.

  6. Na página Propriedades da conexão / Acesso à conexão, selecione Concluir.

Criar uma tabela
nota

Você pode criar manualmente a tabela conforme descrito nas etapas a seguir ou usar o código do conector de criação de tabela para o Managed Service for Apache Flink em seu notebook no Apache Zeppelin para criar sua tabela por meio de uma instrução. DDL Em seguida, você pode fazer AWS Glue o check-in para verificar se a tabela foi criada corretamente.

  1. Na barra de navegação à esquerda, selecione Tabelas. Na página Tabelas, selecione Adicionar tabelas, Adicionar tabela manualmente.

  2. Na página Configurar propriedades da tabela, insira stock para o Nome da tabela. Certifique-se de selecionar o banco de dados que você criou anteriormente. Escolha Próximo.

  3. Na página Adicionar um armazenamento de dados, selecione Kafka. Para o nome do tópico, insira o nome do tópico (por exemplo AWS KafkaTutorialTopic). Para Conexão, escolha ZeppelinConnection.

  4. Na página Classificação, escolha JSON. Escolha Próximo.

  5. Na página Definir um esquema, selecione Adicionar coluna para adicionar uma coluna. Adicione colunas com as seguintes propriedades:

    Nome da coluna Tipo de dados
    ticker string
    price double

    Escolha Próximo.

  6. Na próxima página, verifique suas configurações e selecione Concluir.

  7. Selecione a tabela recém-criada na lista de tabelas.

  8. Escolha Editar tabela e adicione as seguintes propriedades:

    • chave:managed-flink.proctime, valor: proctime

    • chave:flink.properties.group.id, valor: test-consumer-group

    • chave:flink.properties.auto.offset.reset, valor: latest

    • chave:classification, valor: json

    Sem esses pares de chave/valor, o notebook Flink apresenta um erro.

  9. Selecione Apply (Aplicar).

Crie um notebook Studio com a Amazon MSK

Agora que você criou os recursos que o seu aplicativo usa, crie seu bloco de anotações do Studio.

Você pode criar seu aplicativo usando o AWS Management Console ou AWS CLI o.
nota

Você também pode criar um notebook Studio a partir do MSK console da Amazon escolhendo um cluster existente e, em seguida, escolhendo Processar dados em tempo real.

Crie um notebook Studio usando o AWS Management Console

  1. Abra o serviço gerenciado para o console Apache Flink em casa https://console.aws.amazon.com/managed-flink/? region=us-east-1#/aplicativos/painel.

  2. Na página Aplicativos do Managed Service for Apache Flink, selecione a guia Studio. Selecione Criar bloco de anotações do Studio.

    nota

    Para criar um notebook Studio a partir dos consoles Amazon MSK ou Kinesis Data Streams, selecione seu cluster MSK Amazon de entrada ou stream de dados do Kinesis e escolha Processar dados em tempo real.

  3. Na página ‬Criar bloco de anotações do Studio, forneça as seguintes informações:

    • Insira MyNotebook para Nome do bloco de anotações do Studio.

    • Selecione o padrão para o banco de dados AWS Glue.

    Selecione Criar bloco de anotações do Studio.

  4. Na MyNotebookpágina, escolha a guia Configuração. Na seção Redes, selecione Editar.

  5. Na MyNotebook página Editar rede para, escolha a VPCconfiguração com base no MSK cluster da Amazon. Escolha seu MSK cluster da Amazon para o Amazon MSK Cluster. Escolha Salvar alterações.

  6. Na MyNotebookpágina, escolha Executar. Aguarde até que o Status mostre Em execução.

Crie um notebook Studio usando o AWS CLI

Para criar seu notebook Studio usando o AWS CLI, faça o seguinte:

  1. Verifique se você tem as informações a seguir. Você precisa desses valores para criar seu aplicativo.

    • O ID da sua conta.

    • A sub-rede IDs e o ID do grupo de segurança da Amazon VPC que contém seu MSK cluster da Amazon.

  2. Crie um arquivo chamado create.json com o conteúdo a seguir. Substitua os valores de espaço reservado por suas próprias informações.

    { "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::AccountID:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1", "SubnetID 2", "SubnetID 3" ], "SecurityGroupIds": [ "VPC Security Group ID" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID:database/default" } } } } }
  3. Para criar o seu aplicativo, execute o comando a seguir:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
  4. Quando o comando for concluído, você deverá ver um resultado semelhante ao apresentado a seguir, mostrando os detalhes do seu novo bloco de anotações do Studio:

    { "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
  5. Execute o comando a seguir para iniciar o aplicativo. Substitua o valor do exemplo pelo ID de sua conta.

    aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook\

Envie dados para o seu MSK cluster da Amazon

Nesta seção, você executa um script Python em seu EC2 cliente Amazon para enviar dados para sua fonte de dados da AmazonMSK.

  1. Conecte-se ao seu EC2 cliente Amazon.

  2. Execute os comandos a seguir para instalar o Python versão 3, o Pip e o pacote Kafka para Python e confirme as ações:

    sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
  3. Configure o AWS CLI em sua máquina cliente digitando o seguinte comando:

    aws configure

    Forneça as credenciais da sua conta e us-east-1 para region.

  4. Crie um arquivo chamado stock.py com o conteúdo a seguir. Substitua o valor da amostra pela string Bootstrap Brokers do seu MSK cluster Amazon e atualize o nome do tópico se o tópico não for: AWS KafkaTutorialTopic

    from kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "<<Bootstrap Broker List>>" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())
  5. Execute o script com o comando a seguir:

    $ python3 stock.py
  6. Deixe o script em execução enquanto você conclui a seção a seguir.

Teste seu bloco de anotações do Studio

Nesta seção, você usa seu notebook Studio para consultar dados do seu MSK cluster da Amazon.

  1. Abra o serviço gerenciado para o console Apache Flink em casa https://console.aws.amazon.com/managed-flink/? region=us-east-1#/aplicativos/painel.

  2. Na página Aplicativos do Managed Service for Apache Flink, selecione a guia bloco de anotações do Studio. Escolha MyNotebook.

  3. Na MyNotebookpágina, escolha Abrir no Apache Zeppelin.

    A interface do Apache Zeppelin é aberta em uma nova guia.

  4. Na página Bem-vindo ao Zeppelin!, selecione Nova anotação do Zeppelin.

  5. Na página Anotação do Zeppelin, insira a seguinte consulta em uma nova anotação:

    %flink.ssql(type=update) select * from stock

    Selecione o ícone de execução.

    O aplicativo exibe dados do MSK cluster da Amazon.

Para abrir o painel do Apache Flink para que seu aplicativo visualize aspectos operacionais, escolha. FLINKJOB Para obter mais informações sobre o painel do Flink, consulte o painel do Apache Flink no Guia do desenvolvedor do Managed Service for Apache Flink.

Para obter mais exemplos de SQL consultas do Flink Streaming, consulte Consultas na documentação do Apache Flink.