Crie e execute um serviço gerenciado para o aplicativo Apache Flink para Python - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Crie e execute um serviço gerenciado para o aplicativo Apache Flink para Python

Nesta seção, você cria um aplicativo Managed Service for Apache Flink para Python com um stream do Kinesis como origem e coletor.

Crie recursos dependentes

Antes de criar um Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes:

  • Duas transmissões do Kinesis para entrada e saída.

  • Um bucket do Amazon S3 para armazenar o código do aplicativo.

nota

Este tutorial pressupõe que você esteja implantando seu aplicativo na região us-east-1. Se você usa outra região, deve adaptar todas as etapas adequadamente.

Crie dois streams do Kinesis

Antes de criar um aplicativo Managed Service for Apache Flink para este exercício, crie dois streams de dados do Kinesis (ExampleInputStreameExampleOutputStream) na mesma região que você usará para implantar seu aplicativo (us-east-1 neste exemplo). O aplicativo usa esses fluxos para os fluxos de origem e de destino do aplicativo.

Você pode criar esses fluxos usando o console do Amazon Kinesis ou o comando da AWS CLI a seguir. Para obter instruções sobre o console, consulte Criar e atualizar fluxos de dados no Guia do desenvolvedor do Amazon Kinesis Data Streams.

Como criar os fluxos de dados (AWS CLI)
  1. Para criar o primeiro stream (ExampleInputStream), use o seguinte comando do Amazon Kinesis create-stream AWS CLI .

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1
  2. Para criar o segundo fluxo que o aplicativo usa para gravar a saída, execute o mesmo comando, alterando o nome da transmissão para ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1

Criar um bucket do Amazon S3

Você pode criar um bucket do Amazon S3 usando o console. Para obter instruções sobre como criar esse recurso, consulte os tópicos a seguir:

  • Para obter instruções, consulte Como criar um bucket do S3? no Guia do usuário do Amazon Simple Storage Service. Dê ao bucket do Amazon S3 um nome globalmente exclusivo, por exemplo, anexando seu nome de login.

    nota

    Certifique-se de criar o bucket do S3 na região que você usa para este tutorial (us-east-1).

Outros recursos

Quando você cria seu aplicativo, o Managed Service for Apache Flink cria os seguintes CloudWatch recursos da Amazon, caso eles ainda não existam:

  • Um grupo de logs chamado /AWS/KinesisAnalytics-java/<my-application>.

  • Um fluxo de logs chamado kinesis-analytics-log-stream.

Configurar seu ambiente de desenvolvimento local

Para desenvolvimento e depuração, você pode executar o aplicativo Python Flink em sua máquina. Você pode iniciar o aplicativo a partir da linha de comando com python main.py ou em um Python IDE de sua escolha.

nota

Em sua máquina de desenvolvimento, você deve ter o Python 3.10 ou 3.11, o Java 11, o Apache Maven e o Git instalados. Recomendamos que você use um IDE como PyCharmou o Visual Studio Code. Para verificar se você atende a todos os pré-requisitos, consulte Cumpra os pré-requisitos para concluir os exercícios antes de continuar.

Para desenvolver seu aplicativo e executá-lo localmente, você deve instalar a biblioteca Flink Python.

  1. Crie um ambiente Python autônomo VirtualEnv usando o Conda ou qualquer ferramenta Python similar.

  2. Instale a PyFlink biblioteca nesse ambiente. Use a mesma versão de tempo de execução do Apache Flink que você usará no Amazon Managed Service para Apache Flink. Atualmente, o tempo de execução recomendado é 1.19.1.

    $ pip install apache-flink==1.19.1
  3. Certifique-se de que o ambiente esteja ativo ao executar seu aplicativo. Se você executar o aplicativo noIDE, verifique se o IDE está usando o ambiente como tempo de execução. O processo depende do IDE que você está usando.

    nota

    Você só precisa instalar a PyFlink biblioteca. Você não precisa instalar um cluster Apache Flink em sua máquina.

Autentique sua sessão AWS

O aplicativo usa fluxos de dados do Kinesis para publicar dados. Ao executar localmente, você deve ter uma sessão AWS autenticada válida com permissões para gravar no stream de dados do Kinesis. Use as etapas a seguir para autenticar sua sessão:

  1. Se você não tiver o AWS CLI e um perfil nomeado com credencial válida configurado, consulteConfigure o AWS Command Line Interface (AWS CLI).

  2. Verifique se o seu AWS CLI está configurado corretamente e se seus usuários têm permissões para gravar no stream de dados do Kinesis publicando o seguinte registro de teste:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. Se você IDE tiver um plug-in com o qual se integrar AWS, poderá usá-lo para passar as credenciais para o aplicativo em execução noIDE. Para obter mais informações, consulte AWS Toolkit for PyCharm, AWS Toolkit for Visual Studio Code AWS e Toolkit for IntelliJ. IDEA

Baixe e examine o código Python de streaming do Apache Flink

O código do aplicativo Python para este exemplo está disponível em. GitHub Para fazer download do código do aplicativo, faça o seguinte:

  1. Duplique o repositório remoto usando o seguinte comando:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Navegue até o diretório ./python/GettingStarted.

Revise os componentes do aplicativo

O código do aplicativo está localizado emmain.py. Usamos SQL incorporado em Python para definir o fluxo do aplicativo.

nota

Para uma experiência otimizada do desenvolvedor, o aplicativo foi projetado para ser executado sem nenhuma alteração de código no Amazon Managed Service para Apache Flink e localmente, para desenvolvimento em sua máquina. O aplicativo usa a variável de ambiente IS_LOCAL = true para detectar quando está sendo executado localmente. Você deve definir a variável IS_LOCAL = true de ambiente no seu shell ou na configuração de execução do seuIDE.

  • O aplicativo configura o ambiente de execução e lê a configuração do tempo de execução. Para funcionar no Amazon Managed Service para Apache Flink e localmente, o aplicativo verifica a IS_LOCAL variável.

    • O seguinte é o comportamento padrão quando o aplicativo é executado no Amazon Managed Service para Apache Flink:

      1. Carregue as dependências empacotadas com o aplicativo. Para obter mais informações, consulte (link)

      2. Carregue a configuração das propriedades do Runtime que você define no aplicativo Amazon Managed Service for Apache Flink. Para obter mais informações, consulte (link)

    • Quando o aplicativo detecta IS_LOCAL = true quando você executa seu aplicativo localmente:

      1. Carrega dependências externas do projeto.

      2. Carrega a configuração do application_properties.json arquivo incluído no projeto.

        ... APPLICATION_PROPERTIES_FILE_PATH = "/etc/flink/application_properties.json" ... is_local = ( True if os.environ.get("IS_LOCAL") else False ) ... if is_local: APPLICATION_PROPERTIES_FILE_PATH = "application_properties.json" CURRENT_DIR = os.path.dirname(os.path.realpath(__file__)) table_env.get_config().get_configuration().set_string( "pipeline.jars", "file:///" + CURRENT_DIR + "/target/pyflink-dependencies.jar", )
  • O aplicativo define uma tabela de origem com uma CREATE TABLE declaração, usando o Kinesis Connector. Essa tabela lê dados do stream de entrada do Kinesis. O aplicativo usa o nome do fluxo, a região e a posição inicial da configuração do tempo de execução.

    table_env.execute_sql(f""" CREATE TABLE prices ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3), WATERMARK FOR event_time AS event_time - INTERVAL '5' SECOND ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{input_stream_name}', 'aws.region' = '{input_stream_region}', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' ) """)
  • O aplicativo também define uma tabela de coletor usando o Kinesis Connector neste exemplo. Essa tabela envia dados para o stream de saída do Kinesis.

    table_env.execute_sql(f""" CREATE TABLE output ( ticker VARCHAR(6), price DOUBLE, event_time TIMESTAMP(3) ) PARTITIONED BY (ticker) WITH ( 'connector' = 'kinesis', 'stream' = '{output_stream_name}', 'aws.region' = '{output_stream_region}', 'sink.partitioner-field-delimiter' = ';', 'sink.batch.max-size' = '100', 'format' = 'json', 'json.timestamp-format.standard' = 'ISO-8601' )""")
  • Por fim, o aplicativo executa uma SQL tabela coletora INSERT INTO... a partir da tabela de origem. Em um aplicativo mais complexo, é provável que você tenha etapas adicionais para transformar os dados antes de gravar no coletor.

    table_result = table_env.execute_sql("""INSERT INTO output SELECT ticker, price, event_time FROM prices""")
  • Você deve adicionar outra etapa no final da main() função para executar o aplicativo localmente:

    if is_local: table_result.wait()

    Sem essa instrução, o aplicativo é encerrado imediatamente quando você o executa localmente. Você não deve executar essa declaração ao executar seu aplicativo no Amazon Managed Service para Apache Flink.

Gerenciar JAR dependências

Um PyFlink aplicativo geralmente requer um ou mais conectores. O aplicativo neste tutorial usa o Kinesis Connector. Como o Apache Flink é executado em JavaJVM, os conectores são distribuídos como JAR arquivos, independentemente de você implementar seu aplicativo em Python. Você deve empacotar essas dependências com o aplicativo ao implantá-lo no Amazon Managed Service para Apache Flink.

Neste exemplo, mostramos como usar o Apache Maven para buscar as dependências e empacotar o aplicativo para ser executado no Managed Service para Apache Flink.

nota

Existem formas alternativas de buscar e empacotar dependências. Este exemplo demonstra um método que funciona corretamente com um ou mais conectores. Ele também permite que você execute o aplicativo localmente, para desenvolvimento e no Managed Service for Apache Flink sem alterações no código.

Use o arquivo pom.xml

O Apache Maven usa o pom.xml arquivo para controlar dependências e pacotes de aplicativos.

Todas JAR as dependências são especificadas no pom.xml arquivo do <dependencies>...</dependencies> bloco.

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> ... <dependencies> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>4.3.0-1.19</version> </dependency> </dependencies> ...

Para encontrar o artefato e a versão corretos do conector a serem usados, consulteUsando conectores Apache Flink. Certifique-se de consultar a versão do Apache Flink que você está usando. Neste exemplo, usamos o conector Kinesis. Para o Apache Flink 1.19, a versão do conector é. 4.3.0-1.19

nota

Se você estiver usando o Apache Flink 1.19, não há uma versão de conector lançada especificamente para essa versão. Use os conectores lançados para 1.18.

Dependências de download e empacotamento

Use o Maven para baixar as dependências definidas no pom.xml arquivo e empacotá-las para o aplicativo Python Flink.

  1. Navegue até o diretório que contém o projeto Python Getting Started chamado. python/GettingStarted

  2. Execute o seguinte comando:

$ mvn package

O Maven cria um novo arquivo chamado./target/pyflink-dependencies.jar. Quando você está desenvolvendo localmente em sua máquina, o aplicativo Python procura esse arquivo.

nota

Se você esquecer de executar este comando, ao tentar executar seu aplicativo, ele falhará com o erro: Não foi possível encontrar nenhuma fábrica para o identificador “kinesis”.

Grave registros de amostra no fluxo de entrada

Nesta seção, você enviará registros de amostra ao stream para o aplicativo processar. Você tem duas opções para gerar dados de amostra, usando um script Python ou o Kinesis Data Generator.

Gere dados de amostra usando um script Python

Você pode usar um script Python para enviar registros de amostra para o stream.

nota

Para executar esse script Python, você deve usar o Python 3.x e ter a biblioteca for AWS SDKPython (Boto) instalada.

Para começar a enviar dados de teste para o stream de entrada do Kinesis:

  1. Baixe o script stock.py Python do gerador de dados no repositório do gerador GitHub de dados.

  2. Execute o script stock.py:

    $ python stock.py

Mantenha o script em execução enquanto você conclui o resto do tutorial. Agora você pode executar seu aplicativo Apache Flink.

Gere dados de amostra usando o Kinesis Data Generator

Como alternativa ao script Python, você pode usar o Kinesis Data Generator, também disponível em uma versão hospedada, para enviar dados de amostra aleatórios para o stream. O Kinesis Data Generator é executado no seu navegador e você não precisa instalar nada na sua máquina.

Para configurar e executar o Kinesis Data Generator:

  1. Siga as instruções na documentação do Kinesis Data Generator para configurar o acesso à ferramenta. Você executará um AWS CloudFormation modelo que configura um usuário e uma senha.

  2. Acesse o Kinesis Data Generator por meio do URL gerado pelo CloudFormation modelo. Você pode encontrar o URL na guia Saída após a conclusão do CloudFormation modelo.

  3. Configure o gerador de dados:

    • Região: selecione a região que você está usando para este tutorial: us-east-1

    • Stream/stream de entrega: selecione o stream de entrada que o aplicativo usará: ExampleInputStream

    • Registros por segundo: 100

    • Modelo de registro: copie e cole o seguinte modelo:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. Teste o modelo: escolha Modelo de teste e verifique se o registro gerado é semelhante ao seguinte:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Inicie o gerador de dados: escolha Selecionar Enviar dados.

O Kinesis Data Generator agora está enviando dados para o. ExampleInputStream

Execute seu aplicativo localmente

Você pode testar o aplicativo localmente, executando a partir da linha de comando com python main.py ou a partir do seuIDE.

Para executar seu aplicativo localmente, você deve ter a versão correta da PyFlink biblioteca instalada conforme descrito na seção anterior. Para obter mais informações, consulte (link)

nota

Antes de continuar, verifique se os fluxos de entrada e saída estão disponíveis. Consulte Crie dois streams de dados do Amazon Kinesis. Além disso, verifique se você tem permissão para ler e gravar nos dois fluxos. Consulte Autentique sua sessão AWS.

Importe o projeto Python para o seu IDE

Para começar a trabalhar no aplicativo em seuIDE, você deve importá-lo como um projeto Python.

O repositório que você clonou contém vários exemplos. Cada exemplo é um projeto separado. Para este tutorial, importe o conteúdo do ./python/GettingStarted subdiretório para o seuIDE.

Importe o código como um projeto Python existente.

nota

O processo exato para importar um novo projeto em Python varia de acordo com o IDE que você está usando.

Verifique a configuração do aplicativo local

Ao ser executado localmente, o aplicativo usa a configuração no application_properties.json arquivo na pasta de recursos do projeto abaixo./src/main/resources. Você pode editar esse arquivo para usar diferentes nomes ou regiões de stream do Kinesis.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Execute seu aplicativo Python localmente

Você pode executar seu aplicativo localmente, seja na linha de comando, como um script Python normal, ou no. IDE

Para executar seu aplicativo a partir da linha de comando
  1. Certifique-se de que o ambiente autônomo do Python, como o Conda VirtualEnv ou onde você instalou a biblioteca Python Flink, esteja ativo no momento.

  2. Certifique-se de correr pelo mvn package menos uma vez.

  3. Defina a IS_LOCAL = true variável de ambiente:

    $ export IS_LOCAL=true
  4. Execute o aplicativo como um script Python normal.

    $python main.py
Para executar o aplicativo de dentro do IDE
  1. Configure seu IDE para executar o main.py script com a seguinte configuração:

    1. Use o ambiente Python independente, como o Conda VirtualEnv ou onde você instalou a biblioteca. PyFlink

    2. Use as AWS credenciais para acessar os streams de dados de entrada e saída do Kinesis.

    3. Defina IS_LOCAL = true.

  2. O processo exato para definir a configuração de execução depende de você IDE e varia.

  3. Depois de configurar o seuIDE, execute o script Python e use as ferramentas fornecidas por você IDE enquanto o aplicativo estiver em execução.

Inspecione os registros do aplicativo localmente

Quando executado localmente, o aplicativo não mostra nenhum registro no console, exceto algumas linhas impressas e exibidas quando o aplicativo é iniciado. PyFlink grava registros em um arquivo no diretório em que a biblioteca Python Flink está instalada. O aplicativo imprime a localização dos registros quando é iniciado. Você também pode executar o comando a seguir para encontrar os registros:

$ python -c "import pyflink;import os;print(os.path.dirname(os.path.abspath(pyflink.__file__))+'/log')"
  1. Liste os arquivos no diretório de registro. Normalmente, você encontra um único .log arquivo.

  2. Acompanhe o arquivo enquanto o aplicativo estiver em execução:tail -f <log-path>/<log-file>.log.

Observe os dados de entrada e saída nos streams do Kinesis

Você pode observar os registros enviados ao stream de entrada pelo (gerando o Python de amostra) ou pelo Kinesis Data Generator (link) usando o Visualizador de dados no console do Amazon Kinesis.

Para observar os registros:

Interrompa a execução local do seu aplicativo

Pare a execução do aplicativo em seuIDE. IDEGeralmente fornece uma opção de “parar”. A localização e o método exatos dependem doIDE.

Package o código do seu aplicativo

Nesta seção, você usa o Apache Maven para empacotar o código do aplicativo e todas as dependências necessárias em um arquivo.zip.

Execute o comando do pacote Maven novamente:

$ mvn package

Esse comando gera o arquivotarget/managed-flink-pyflink-getting-started-1.0.0.zip.

Faça o upload do pacote do aplicativo em um bucket do Amazon S3

Nesta seção, você carrega o arquivo.zip criado na seção anterior para o bucket do Amazon Simple Storage Service (Amazon S3) que você criou no início deste tutorial. Se você não concluiu essa etapa, consulte (link).

Para fazer o upload do JAR arquivo de código do aplicativo
  1. Abra o console do Amazon S3 em. https://console.aws.amazon.com/s3/

  2. Escolha o bucket que você criou anteriormente para o código do aplicativo.

  3. Escolha Carregar.

  4. Escolha Adicionar arquivos.

  5. Navegue até o arquivo.zip gerado na etapa anterior:target/managed-flink-pyflink-getting-started-1.0.0.zip.

  6. Escolha Carregar sem alterar nenhuma outra configuração.

Crie e configure o serviço gerenciado para o aplicativo Apache Flink

Você pode criar e configurar um serviço gerenciado para o aplicativo Apache Flink usando o console ou o. AWS CLI Para este tutorial, usaremos o console.

Criar o aplicativo

  1. Abra o console do Managed Service for Apache Flink em https://console.aws.amazon.com/flink

  2. Verifique se a região correta está selecionada: Leste dos EUA (Norte da Virgínia) us-east-1.

  3. Abra o menu do lado direito e escolha aplicativos Apache Flink e, em seguida, Criar aplicativo de streaming. Como alternativa, escolha Criar aplicativo de streaming na seção Começar da página inicial.

  4. Na página Criar aplicativos de streaming:

    • Em Escolha um método para configurar o aplicativo de processamento de fluxo, escolha Criar do zero.

    • Para configuração do Apache Flink, versão do aplicativo Flink, escolha Apache Flink 1.19.

    • Para configuração do aplicativo:

      • Em Nome do aplicativo, insira MyApplication.

      • Em Descrição, insira My Python test app.

      • Em Acesso aos recursos do aplicativo, escolha Create/update IAM role kinesis-analytics- MyApplication -us-east-1 com as políticas necessárias.

    • Para o modelo para configurações de aplicativos:

      • Em Modelos, escolha Desenvolvimento.

    • Escolha Criar aplicativo de streaming.

nota

Ao criar um serviço gerenciado para o aplicativo Apache Flink usando o console, você tem a opção de criar uma IAM função e uma política para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses IAM recursos são nomeados usando o nome do aplicativo e a região da seguinte forma:

  • Política: kinesis-analytics-service-MyApplication-us-west-2

  • Função: kinesisanalytics-MyApplication-us-west-2

O Amazon Managed Service para Apache Flink era conhecido anteriormente como Kinesis Data Analytics. O nome dos recursos que são gerados automaticamente é prefixado kinesis-analytics para fins de compatibilidade com versões anteriores.

Edite a IAM política

Edite a IAM política para adicionar permissões para acessar o bucket do Amazon S3.

Para editar a IAM política para adicionar permissões de bucket do S3
  1. Abra o console do IAM em https://console.aws.amazon.com/iam/.

  2. Selecione Policies (Políticas). Selecione a política kinesis-analytics-service-MyApplication-us-east-1 que o console criou para você na seção anterior.

  3. Escolha Editar e, em seguida, escolha a JSONguia.

  4. Adicione a seção destacada do exemplo de política a seguir à política. Substitua a conta de amostra IDs (012345678901) com o ID da sua conta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. Escolha Próximo e, em seguida, escolha Salvar alterações.

Configurar o aplicativo

Edite a configuração do aplicativo para definir o artefato do código do aplicativo.

Configurar o aplicativo
  1. Na MyApplicationpágina, escolha Configurar.

  2. Na seção Localização do código do aplicativo:

    • Para o bucket do Amazon S3, selecione o bucket que você criou anteriormente para o código do aplicativo. Escolha Procurar e selecione o bucket correto e, em seguida, escolha Escolher. Não selecione o nome do bucket.

    • Em Caminho do objeto do Amazon S3, insira managed-flink-pyflink-getting-started-1.0.0.zip.

  3. Para permissões de acesso, escolha Criar/atualizar IAM função kinesis-analytics-MyApplication-us-east-1 com as políticas necessárias.

  4. Vá para as propriedades de tempo de execução e mantenha os valores padrão para todas as outras configurações.

  5. Escolha Adicionar novo item e adicione cada um dos seguintes parâmetros:

    ID do grupo Chave Valor
    InputStream0 stream.name ExampleInputStream
    InputStream0 flink.stream.initpos LATEST
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
    kinesis.analytics.flink.run.options python main.py
    kinesis.analytics.flink.run.options jarfile lib/pyflink-dependencies.jar
  6. Não modifique nenhuma das outras seções e escolha Salvar alterações.

nota

Quando você opta por ativar o CloudWatch registro na Amazon, o Managed Service for Apache Flink cria um grupo de logs e um stream de logs para você. Os nomes desses recursos são os seguintes:

  • Grupo de logs: /aws/kinesis-analytics/MyApplication

  • Fluxo de logs: kinesis-analytics-log-stream

Execute o aplicativo

O aplicativo agora está configurado e pronto para ser executado.

Executar o aplicativo
  1. No console do Amazon Managed Service para Apache Flink, escolha Meu aplicativo e escolha Executar.

  2. Na próxima página, na página de configuração de restauração do aplicativo, escolha Executar com o snapshot mais recente e, em seguida, escolha Executar.

    O status nos detalhes do aplicativo muda de Ready para Starting e depois para Running quando o aplicativo é iniciado.

Quando o aplicativo está no Running status, agora você pode abrir o painel do Flink.

Para abrir o painel do
  1. Escolha Abrir painel do Apache Flink. O painel é aberto em uma nova página.

  2. Na lista de trabalhos em execução, escolha o único trabalho que você pode ver.

    nota

    Se você definiu as propriedades do Runtime ou editou as IAM políticas incorretamente, o status do aplicativo pode se transformar emRunning, mas o painel do Flink mostra que o trabalho está sendo reiniciado continuamente. Esse é um cenário de falha comum se o aplicativo estiver configurado incorretamente ou não tiver permissões para acessar os recursos externos.

    Quando isso acontecer, verifique a guia Exceções no painel do Flink para ver a causa do problema.

Observe as métricas do aplicativo em execução

Na MyApplicationpágina, na seção de CloudWatch métricas da Amazon, você pode ver algumas das métricas fundamentais do aplicativo em execução.

Para ver as métricas
  1. Ao lado do botão Atualizar, selecione 10 segundos na lista suspensa.

  2. Quando o aplicativo está em execução e em bom estado, você pode ver a métrica de tempo de atividade aumentando continuamente.

  3. A métrica de reinicializações completas deve ser zero. Se estiver aumentando, a configuração pode ter problemas. Para investigar o problema, revise a guia Exceções no painel do Flink.

  4. A métrica Número de pontos de verificação com falha deve ser zero em um aplicativo saudável.

    nota

    Esse painel exibe um conjunto fixo de métricas com uma granularidade de 5 minutos. Você pode criar um painel de aplicativos personalizado com qualquer métrica no CloudWatch painel.

Observe os dados de saída nos streams do Kinesis

Verifique se você ainda está publicando dados na entrada, usando o script Python ou o Kinesis Data Generator.

Agora você pode observar a saída do aplicativo em execução no Managed Service for Apache Flink usando o Visualizador de dados no https://console.aws.amazon.com/kinesis/, da mesma forma que você já fez anteriormente.

Para ver a saída
  1. Abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

  2. Verifique se a região é a mesma que você está usando para executar este tutorial. Por padrão, é US-East-1US East (Norte da Virgínia). Altere a região, se necessário.

  3. Escolha Fluxos de dados.

  4. Selecione o stream que você deseja observar. Para este tutorial, use ExampleOutputStream.

  5. Escolha a guia Visualizador de dados.

  6. Selecione qualquer fragmento, mantenha Último como posição inicial e escolha Obter registros. Talvez você veja o erro “nenhum registro encontrado para esta solicitação”. Em caso afirmativo, escolha Tentar obter registros novamente. Os registros mais recentes publicados no stream são exibidos.

  7. Selecione o valor na coluna Dados para inspecionar o conteúdo do registro em JSON formato.

Pare o aplicativo

Para interromper o aplicativo, acesse a página do console do aplicativo Managed Service for Apache Flink chamada. MyApplication

Como interromper o aplicativo
  1. Na lista suspensa Ação, escolha Parar.

  2. O status nos detalhes do aplicativo muda de Running para Stopping e depois para Ready quando o aplicativo é completamente interrompido.

    nota

    Não se esqueça também de parar de enviar dados para o stream de entrada a partir do script Python ou do Kinesis Data Generator.

Próxima etapa

Limpe AWS os recursos