Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.
As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Criar um bloco de anotações do Studio com o Amazon MSK
Este tutorial descreve como criar um bloco de anotações do Studio que usa um cluster do Amazon MSK como fonte.
Este tutorial contém as seguintes seções:
Configuração
Para este tutorial, você precisa de um cluster do Amazon MSK que permita o acesso em texto sem formatação. Se você ainda não tiver um cluster Amazon MSK configurado, siga o tutorial Conceitos básicos do uso do Amazon MSK para criar uma Amazon VPC, um cluster Amazon MSK, um tópico e uma instância cliente do Amazon EC2.
Ao seguir o tutorial, faça o seguinte:
Na Etapa 3: Criar um cluster Amazon MSK, na etapa 4, altere o valor
ClientBroker
deTLS
paraPLAINTEXT
.
Adicione um gateway NAT à sua VPC
Se você criou um cluster Amazon MSK seguindo o tutorial Conceitos básicos do uso do Amazon MSK, ou se sua Amazon VPC existente ainda não tem um gateway NAT para suas sub-redes privadas, você deve adicionar um gateway NAT à sua Amazon VPC. O diagrama a seguir mostra a arquitetura.
Para criar um gateway NAT para sua Amazon VPC, faça o seguinte:
Abra o console do Amazon VPC em https://console.aws.amazon.com/vpc/
. Selecione Gateways NAT na barra de navegação à esquerda.
Na página Gateways NATs, escolha Criar gateway NAT.
Na página Criar gateway NAT, entre os valores a seguir:
Nome - opcional ZeppelinGateway
Sub-rede AWS KafkaTutorialSubnet1 ID de alocação de IP elástico Escolha um IP elástico disponível. Se não houver IPs elásticos disponíveis, escolha Alocar IP elástico e, em seguida, escolha o IP elástico que o console cria. Escolha Criar um gateway NAT.
Na barra de navegação à esquerda, escolha Tabelas de rotas.
Escolha Criar tabela de rotas.
Na página Criar tabela de rotas, forneça as seguintes informações:
Nome da tag:
ZeppelinRouteTable
VPC: escolha sua VPC (por exemplo, VPC).AWS KafkaTutorial
Escolha Criar.
Na lista de tabelas de rotas, escolha ZeppelinRouteTable. Escolha a guia Rotas e selecione Editar rotas.
Na página Editar rotas, selecione Adicionar rota.
Em Para Destino, insira
0.0.0.0/0
. Para Target, escolha NAT Gateway, ZeppelinGateway. Selecione Salvar rotas. Escolha Fechar.Na página Tabelas de rotas, com a ZeppelinRouteTableopção selecionada, escolha a guia Associações de sub-rede. Selecione Editar associações de sub-rede.
Na página Editar associações de sub-rede, escolha AWS KafkaTutorialSubnet2 e AWS KafkaTutorialSubnet3. Escolha Salvar.
Crie uma AWS Glue conexão e uma tabela
Seu bloco de anotações do Studio usa um banco de dados AWS Glue para metadados sobre sua fonte de dados Amazon MSK. Nesta seção, você cria uma AWS Glue conexão que descreve como acessar seu cluster Amazon MSK e uma AWS Glue tabela que descreve como apresentar os dados em sua fonte de dados para clientes como seu notebook Studio.
Criar uma conexão
Faça login no AWS Management Console e abra o AWS Glue console em https://console.aws.amazon.com/glue/
. Se você ainda não tiver um AWS Glue banco de dados, escolha Bancos de dados na barra de navegação à esquerda. Selecione Adicionar banco de dados. Na janela Adicionar banco de dados, insira
default
para Nome do banco de dados. Selecione Create (Criar).Selecione Conexões na barra de navegação à esquerda. Selecione Adicionar conexão.
Na janela Adicionar conexão, forneça os seguintes valores:
Em Nome da conexão, insira
ZeppelinConnection
.Em Tipo de conexão, escolha Kafka.
Para URLs do servidor bootstrap Kafka, forneça a sequência de caracteres do corretor bootstrap para seu cluster. Você pode obter os corretores de bootstrap no console do MSK ou digitando o seguinte comando da CLI:
aws kafka get-bootstrap-brokers --region us-east-1 --cluster-arn
ClusterArn
Desmarque a caixa de seleção Exigir conexão SSL.
Escolha Avançar.
Na página VPC, forneça os valores a seguir:
Para VPC, escolha o nome da sua VPC (por exemplo, VPC). AWS KafkaTutorial
Em Sub-rede, escolha AWS KafkaTutorialSubnet2.
Em Grupos de segurança, escolha todos os grupos disponíveis.
Escolha Avançar.
Na página Propriedades da conexão / Acesso à conexão, selecione Concluir.
Criar uma tabela
nota
Você pode criar manualmente a tabela conforme descrito nas etapas a seguir ou usar o código do conector de criação de tabela para o Managed Service for Apache Flink em seu bloco de anotações no Apache Zeppelin para criar sua tabela por meio de uma instrução DDL. Em seguida, você pode fazer AWS Glue o check-in para verificar se a tabela foi criada corretamente.
Na barra de navegação à esquerda, selecione Tabelas. Na página Tabelas, selecione Adicionar tabelas, Adicionar tabela manualmente.
Na página Configurar propriedades da tabela, insira
stock
como nome da tabela. Certifique-se de selecionar o banco de dados que você criou anteriormente. Selecione Next (Próximo).Na página Adicionar um armazenamento de dados, selecione Kafka. Para o nome do tópico, insira o nome do tópico (por exemplo AWS KafkaTutorialTopic). Para Conexão, escolha ZeppelinConnection.
Na página Classificação, selecione JSON. Selecione Next (Próximo).
Na página Definir um esquema, selecione Adicionar coluna para adicionar uma coluna. Adicione colunas com as seguintes propriedades:
Nome da coluna Tipo de dados ticker
string
price
double
Selecione Next (Próximo).
Na próxima página, verifique suas configurações e selecione Concluir.
-
Selecione a tabela recém-criada na lista de tabelas.
-
Selecione Editar tabela e adicione uma propriedade com a chave
managed-flink.proctime
e o valorproctime
. -
Escolha Aplicar.
Crie um bloco de anotações do Studio com o Amazon MSK
Agora que você criou os recursos que o seu aplicativo usa, crie seu bloco de anotações do Studio.
Você pode criar seu aplicativo usando o AWS Management Console ou AWS CLI o.
nota
Você também pode criar um bloco de anotações do Studio a partir do console Amazon MSK escolhendo um cluster existente e, em seguida, escolhendo Processar dados em tempo real.
Crie um notebook Studio usando o AWS Management Console
Na página dos Aplicativos do Managed Service for Apache Flink, selecione a guia Studio. Selecione Criar bloco de anotações do Studio.
nota
Para criar um bloco de anotações do Studio a partir dos consoles Amazon MSK ou Kinesis Data Streams, selecione seu cluster Amazon MSK de entrada ou fluxo de dados do Kinesis e escolha Processar dados em tempo real.
Na página Criar instância de bloco de anotações, forneça as seguintes informações:
-
Insira
MyNotebook
para Nome do bloco de anotações do Studio. Selecione o padrão para o banco de dados AWS Glue.
Selecione Criar bloco de anotações do Studio.
-
Na MyNotebookpágina, escolha a guia Configuração. Na seção Redes, selecione Editar.
Na MyNotebook página Editar rede para, escolha a configuração de VPC com base no cluster Amazon MSK. Selecione seu cluster Amazon MSK para Cluster Amazon MSK. Escolha Salvar alterações.
Na MyNotebookpágina, escolha Executar. Aguarde até que o Status mostre Em execução.
Crie um notebook Studio usando o AWS CLI
Para criar seu notebook Studio usando o AWS CLI, faça o seguinte:
Verifique se você tem as informações a seguir. Você precisa desses valores para criar seu aplicativo.
O ID da sua conta.
Os IDs de sub-rede e o ID do grupo de segurança da Amazon VPC que contém o cluster Amazon MSK.
Crie um arquivo chamado
create.json
com o conteúdo a seguir. Substitua os valores de espaço reservado por suas próprias informações.{ "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::
AccountID
:role/ZeppelinRole", "ApplicationConfiguration": { "ApplicationSnapshotConfiguration": { "SnapshotsEnabled": false }, "VpcConfigurations": [ { "SubnetIds": [ "SubnetID 1
", "SubnetID 2
", "SubnetID 3
" ], "SecurityGroupIds": [ "VPC Security Group ID
" ] } ], "ZeppelinApplicationConfiguration": { "CatalogConfiguration": { "GlueDataCatalogConfiguration": { "DatabaseARN": "arn:aws:glue:us-east-1:AccountID
:database/default" } } } } }Para criar o seu aplicativo, execute o comando a seguir:
aws kinesisanalyticsv2 create-application --cli-input-json file://create.json
Quando o comando for concluído, você deverá ver um resultado semelhante ao apresentado a seguir, mostrando os detalhes do seu novo bloco de anotações do Studio:
{ "ApplicationDetail": { "ApplicationARN": "arn:aws:kinesisanalyticsus-east-1:012345678901:application/MyNotebook", "ApplicationName": "MyNotebook", "RuntimeEnvironment": "ZEPPELIN-FLINK-3_0", "ApplicationMode": "INTERACTIVE", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/ZeppelinRole", ...
Execute o comando a seguir para iniciar o aplicativo. Substitua o valor de amostra pelo ID de sua conta.
aws kinesisanalyticsv2 start-application --application-arn arn:aws:kinesisanalyticsus-east-1:
012345678901
:application/MyNotebook\
Envie dados para seu cluster Amazon MSK
Nesta seção, você executa um script Python em seu cliente Amazon EC2 para enviar dados para sua fonte de dados Amazon MSK.
Conecte-se ao seu cliente do Amazon EC2.
Execute os comandos a seguir para instalar o Python versão 3, o Pip e o pacote Kafka para Python e confirme as ações:
sudo yum install python37 curl -O https://bootstrap.pypa.io/get-pip.py python3 get-pip.py --user pip install kafka-python
Configure o AWS CLI em sua máquina cliente digitando o seguinte comando:
aws configure
Forneça as credenciais da sua conta e
us-east-1
pararegion
.Crie um arquivo chamado
stock.py
com o conteúdo a seguir. Substitua o valor da amostra pela string Bootstrap Brokers do seu cluster Amazon MSK e atualize o nome do tópico se o tópico não for: AWS KafkaTutorialTopicfrom kafka import KafkaProducer import json import random from datetime import datetime BROKERS = "
<<Bootstrap Broker List>>
" producer = KafkaProducer( bootstrap_servers=BROKERS, value_serializer=lambda v: json.dumps(v).encode('utf-8'), retry_backoff_ms=500, request_timeout_ms=20000, security_protocol='PLAINTEXT') def getStock(): data = {} now = datetime.now() str_now = now.strftime("%Y-%m-%d %H:%M:%S") data['event_time'] = str_now data['ticker'] = random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']) price = random.random() * 100 data['price'] = round(price, 2) return data while True: data =getStock() # print(data) try: future = producer.send("AWSKafkaTutorialTopic", value=data) producer.flush() record_metadata = future.get(timeout=10) print("sent event to Kafka! topic {} partition {} offset {}".format(record_metadata.topic, record_metadata.partition, record_metadata.offset)) except Exception as e: print(e.with_traceback())Execute o script com o comando a seguir:
$ python3 stock.py
Deixe o script em execução enquanto você conclui a seção a seguir.
Teste seu bloco de anotações do Studio
Nesta seção, você usa seu bloco de anotações do Studio para consultar dados do seu cluster Amazon MSK.
Abra o console do Managed Service for Apache Flink em https://console.aws.amazon.com/managed-flink/home?region=us-east-1#/applications/dashboard
. Na página Aplicativos do Managed Service for Apache Flink, selecione a guia bloco de anotações do Studio. Escolha MyNotebook.
Na MyNotebookpágina, escolha Abrir no Apache Zeppelin.
A interface do Apache Zeppelin é aberta em uma nova guia.
Na página Bem-vindo ao Zeppelin!, selecione Nova anotação do Zeppelin.
Na página Anotação do Zeppelin, insira a seguinte consulta em uma nova anotação:
%flink.ssql(type=update) select * from stock
Selecione o ícone de execução.
O aplicativo exibe dados do cluster Amazon MSK.
Para abrir o painel do Apache Flink para que seu aplicativo visualize aspectos operacionais, selecione TRABALHO FLINK. Para obter mais informações sobre o painel do Flink, consulte o painel do Apache Flink no Guia do desenvolvedor do Managed Service for Apache Flink.
Para obter mais exemplos de consultas SQL do Flink Streaming, veja Consultas