Crie e execute um serviço gerenciado para o aplicativo Apache Flink - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Crie e execute um serviço gerenciado para o aplicativo Apache Flink

Neste exercício, você cria um serviço gerenciado para o aplicativo Apache Flink com fluxos de dados do Kinesis como fonte e coletor.

Crie recursos dependentes

Antes de criar um Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes:

  • Um bucket do Amazon S3 para armazenar o código do aplicativo e gravar a saída do aplicativo.

    nota

    Este tutorial pressupõe que você esteja implantando seu aplicativo na região us-east-1. Se você usa outra região, deve adaptar todas as etapas adequadamente.

Criar um bucket do Amazon S3

Você pode criar um bucket do Amazon S3 usando o console. Para obter instruções sobre como criar esse recurso, consulte os tópicos a seguir:

  • Para obter instruções, consulte Como criar um bucket do S3? no Guia do usuário do Amazon Simple Storage Service. Dê ao bucket do Amazon S3 um nome globalmente exclusivo anexando seu nome de login.

    nota

    Certifique-se de criar o bucket na região que você usa para este tutorial. O padrão para o tutorial é us-east-1.

Outros recursos

Quando você cria seu aplicativo, o Managed Service for Apache Flink cria os seguintes CloudWatch recursos da Amazon, caso eles ainda não existam:

  • Um grupo de logs chamado /AWS/KinesisAnalytics-java/<my-application>.

  • Um fluxo de logs chamado kinesis-analytics-log-stream.

Configurar seu ambiente de desenvolvimento local

Para desenvolvimento e depuração, você pode executar o aplicativo Apache Flink em sua máquina, diretamente da sua escolha. IDE Todas as dependências do Apache Flink são tratadas como dependências normais do Java usando o Maven.

nota

Em sua máquina de desenvolvimento, você deve ter o Java JDK 11, o Maven e o Git instalados. Recomendamos que você use um ambiente de desenvolvimento como o Eclipse, Java Neon ou IntelliJ. IDEA Para verificar se você atende a todos os pré-requisitos, consulte. Cumpra os pré-requisitos para concluir os exercícios Você não precisa instalar um cluster Apache Flink em sua máquina.

Autentique sua sessão AWS

O aplicativo usa fluxos de dados do Kinesis para publicar dados. Ao executar localmente, você deve ter uma sessão AWS autenticada válida com permissões para gravar no stream de dados do Kinesis. Use as etapas a seguir para autenticar sua sessão:

  1. Se você não tiver o AWS CLI e um perfil nomeado com credencial válida configurado, consulteConfigure o AWS Command Line Interface (AWS CLI).

  2. Se você IDE tiver um plug-in com o qual se integrar AWS, poderá usá-lo para passar as credenciais para o aplicativo em execução noIDE. Para obter mais informações, consulte AWS Kit de ferramentas para IDEA IntelliJ AWS e Kit de ferramentas para compilar o aplicativo ou executar o Eclipse.

Baixe e examine o código Java de streaming do Apache Flink

O código do aplicativo para este exemplo está disponível em GitHub.

Para baixar o código de aplicativo Java
  1. Duplique o repositório remoto usando o seguinte comando:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Navegue até o diretório ./java/GettingStartedTable.

Revise os componentes do aplicativo

O aplicativo é totalmente implementado na com.amazonaws.services.msf.BasicTableJob classe. O main() método define fontes, transformações e sumidouros. A execução é iniciada por uma instrução de execução no final desse método.

nota

Para uma experiência ideal para o desenvolvedor, o aplicativo foi projetado para ser executado sem nenhuma alteração de código no Amazon Managed Service para Apache Flink e localmente, para desenvolvimento em seu. IDE

  • Para ler a configuração do tempo de execução para que ela funcione durante a execução no Amazon Managed Service para Apache Flink e no seuIDE, o aplicativo detecta automaticamente se está sendo executado de forma independente localmente no. IDE Nesse caso, o aplicativo carrega a configuração do tempo de execução de forma diferente:

    1. Quando o aplicativo detectar que está sendo executado no modo autônomo no seuIDE, forme o application_properties.json arquivo incluído na pasta de recursos do projeto. O conteúdo do arquivo é apresentado a seguir.

    2. Quando o aplicativo é executado no Amazon Managed Service para Apache Flink, o comportamento padrão carrega a configuração do aplicativo a partir das propriedades de tempo de execução que você definirá no aplicativo Amazon Managed Service para Apache Flink. Consulte Crie e configure o serviço gerenciado para o aplicativo Apache Flink.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • O main() método define o fluxo de dados do aplicativo e o executa.

    • Inicializa os ambientes de streaming padrão. Neste exemplo, mostramos como criar tanto o StreamExecutionEnvironment para usar com o DataStream API quanto o StreamTableEnvironment para usar com SQL e a TabelaAPI. Os dois objetos de ambiente são duas referências separadas ao mesmo ambiente de tempo de execução, para uso diferenteAPIs.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, EnvironmentSettings.newInstance().build());
    • Carregue os parâmetros de configuração do aplicativo. Isso os carregará automaticamente do local correto, dependendo de onde o aplicativo está sendo executado:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • O conector FileSystem coletor que o aplicativo usa para gravar resultados nos arquivos de saída do Amazon S3 quando o Flink conclui um ponto de verificação. Você deve ativar os pontos de verificação para gravar arquivos no destino. Quando o aplicativo está sendo executado no Amazon Managed Service para Apache Flink, a configuração do aplicativo controla o ponto de verificação e o ativa por padrão. Por outro lado, quando executados localmente, os pontos de verificação são desativados por padrão. O aplicativo detecta que ele é executado localmente e configura o ponto de verificação a cada 5.000 ms.

      if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
    • Esse aplicativo não recebe dados de uma fonte externa real. Ele gera dados aleatórios para serem processados por meio do DataGen conector. Esse conector está disponível para DataStream APISQL, e TabelaAPI. Para demonstrar a integração entre APIs eles, o aplicativo usa a DataStram API versão porque ela oferece mais flexibilidade. Cada registro é gerado por uma função geradora chamada StockPriceGeneratorFunction nesse caso, na qual você pode colocar uma lógica personalizada.

      DataGeneratorSource<StockPrice> source = new DataGeneratorSource<>( new StockPriceGeneratorFunction(), Long.MAX_VALUE, RateLimiterStrategy.perSecond(recordPerSecond), TypeInformation.of(StockPrice.class));
    • No DataStream API, os registros podem ter classes personalizadas. As aulas devem seguir regras específicas para que o Flink possa usá-las como registro. Para obter mais informações, consulte Tipos de dados compatíveis. Neste exemplo, a StockPrice classe é uma POJO.

    • A fonte é então anexada ao ambiente de execução, gerando um DataStream deStockPrice. Esse aplicativo não usa semântica de tempo de evento e não gera uma marca d'água. Execute a DataGenerator fonte com um paralelismo de 1, independente do paralelismo do resto do aplicativo.

      DataStream<StockPrice> stockPrices = env.fromSource( source, WatermarkStrategy.noWatermarks(), "data-generator" ).setParallelism(1);
    • O que segue no fluxo de processamento de dados é definido usando a Tabela API SQL e. Para fazer isso, convertemos o DataStream of StockPrices em uma tabela. O esquema da tabela é automaticamente inferido da StockPrice classe.

      Table stockPricesTable = tableEnv.fromDataStream(stockPrices);
    • O trecho de código a seguir mostra como definir uma visualização e uma consulta usando a tabela programática: API

      Table filteredStockPricesTable = stockPricesTable. select( $("eventTime").as("event_time"), $("ticker"), $("price"), dateFormat($("eventTime"), "yyyy-MM-dd").as("dt"), dateFormat($("eventTime"), "HH").as("hr") ).where($("price").isGreater(50)); tableEnv.createTemporaryView("filtered_stock_prices", filteredStockPricesTable);
    • Uma tabela de coletor é definida para gravar os resultados em um bucket do Amazon S3 como JSON arquivos. Para ilustrar a diferença de definir uma visualização programaticamente, com a Tabela, API a tabela coletora é definida usando. SQL

      tableEnv.executeSql("CREATE TABLE s3_sink (" + "eventTime TIMESTAMP(3)," + "ticker STRING," + "price DOUBLE," + "dt STRING," + "hr STRING" + ") PARTITIONED BY ( dt, hr ) WITH (" + "'connector' = 'filesystem'," + "'fmat' = 'json'," + "'path' = 's3a://" + s3Path + "'" + ")");
    • A última etapa do é inserir executeInsert() a visualização filtrada dos preços das ações na mesa da pia. Esse método inicia a execução do fluxo de dados que definimos até agora.

      filteredStockPricesTable.executeInsert("s3_sink");

Use o arquivo pom.xml

O arquivo pom.xml define todas as dependências exigidas pelo aplicativo e configura o plug-in Maven Shade para criar o fat-jar que contém todas as dependências exigidas pelo Flink.

  • Algumas dependências têm provided escopo. Essas dependências estão disponíveis automaticamente quando o aplicativo é executado no Amazon Managed Service para Apache Flink. Eles são necessários para a inscrição ou para o aplicativo local em seuIDE. Para obter mais informações, consulte (atualizar para a tabelaAPI)Execute seu aplicativo localmente. Certifique-se de usar a mesma versão do Flink do tempo de execução que você usará no Amazon Managed Service para Apache Flink. Para usar a Tabela API eSQL, você deve incluir o flink-table-planner-loader eflink-table-runtime-dependencies, ambos com provided escopo.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-planner-loader</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-table-runtime</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • Você deve adicionar dependências adicionais do Apache Flink ao pom com o escopo padrão. Por exemplo, o DataGen conector, o FileSystem SQLconector e o JSONformato.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-datagen</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-files</artifactId> <version>${flink.version}</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-json</artifactId> <version>${flink.version}</version> </dependency>
  • Para gravar no Amazon S3 quando executado localmente, o S3 Hadoop File System também está incluído no escopo. provided

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-s3-fs-hadoop</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • O plug-in Maven Java Compiler garante que o código seja compilado em Java 11, a JDK versão atualmente suportada pelo Apache Flink.

  • O plug-in Maven Shade empacota o fat-jar, excluindo algumas bibliotecas que são fornecidas pelo tempo de execução. Também especifica dois transformadores: e. ServicesResourceTransformer ManifestResourceTransformer O último configura a classe que contém o main método para iniciar o aplicativo. Se você renomear a classe principal, não se esqueça de atualizar esse transformador.

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

Execute seu aplicativo localmente

Você pode executar e depurar seu aplicativo Flink localmente em seu. IDE

nota

Antes de continuar, verifique se os fluxos de entrada e saída estão disponíveis. Consulte Crie dois streams de dados do Amazon Kinesis. Além disso, verifique se você tem permissão para ler e gravar nos dois fluxos. Consulte Autentique sua sessão AWS.

A configuração do ambiente de desenvolvimento local requer Java 11JDK, Apache Maven e an IDE para desenvolvimento em Java. Verifique se você atende aos pré-requisitos exigidos. Consulte Cumpra os pré-requisitos para concluir os exercícios.

Importe o projeto Java para o seu IDE

Para começar a trabalhar no aplicativo em seuIDE, você deve importá-lo como um projeto Java.

O repositório que você clonou contém vários exemplos. Cada exemplo é um projeto separado. Para este tutorial, importe o conteúdo do ./jave/GettingStartedTable subdiretório para o seuIDE.

Insira o código como um projeto Java existente usando o Maven.

nota

O processo exato para importar um novo projeto Java varia de acordo com o IDE que você está usando.

Modificar a configuração do aplicativo local

Ao ser executado localmente, o aplicativo usa a configuração no application_properties.json arquivo na pasta de recursos do projeto abaixo./src/main/resources. Para este aplicativo tutorial, os parâmetros de configuração são o nome do bucket e o caminho em que os dados serão gravados.

Edite a configuração e modifique o nome do bucket do Amazon S3 para corresponder ao bucket que você criou no início deste tutorial.

[ { "PropertyGroupId": "bucket", "PropertyMap": { "name": "<bucket-name>", "path": "output" } } ]
nota

A propriedade de configuração name deve conter somente o nome do bucket, por exemplomy-bucket-name. Não inclua nenhum prefixo, como s3:// ou uma barra final.

Se você modificar o caminho, omita as barras à esquerda ou à direita.

Defina sua configuração de IDE execução

Você pode executar e depurar o aplicativo Flink IDE diretamente de você executando a classe principalcom.amazonaws.services.msf.BasicTableJob, da mesma forma que executaria qualquer aplicativo Java. Antes de executar o aplicativo, você deve definir a configuração Executar. A configuração depende do IDE que você está usando. Por exemplo, consulte Configurações de execução/depuração na documentação do IntelliJ. IDEA Em particular, você deve configurar o seguinte:

  1. Adicione as provided dependências ao classpath. Isso é necessário para garantir que as dependências com provided escopo sejam passadas para o aplicativo quando executado localmente. Sem essa configuração, o aplicativo exibe um class not found erro imediatamente.

  2. Passe as AWS credenciais para acessar os streams do Kinesis para o aplicativo. A maneira mais rápida é usar o AWS Toolkit for IDEA IntelliJ. Usando esse IDE plug-in na configuração Executar, você pode selecionar um AWS perfil específico. AWS a autenticação acontece usando esse perfil. Você não precisa passar as AWS credenciais diretamente.

  3. Verifique se ele IDE executa o aplicativo usando JDK11.

Execute o aplicativo em seu IDE

Depois de definir a configuração Executar para oBasicTableJob, você pode executá-la ou depurá-la como um aplicativo Java comum.

nota

Você não pode executar o fat-jar gerado pelo Maven diretamente na linha java -jar ... de comando. Esse jar não contém as dependências principais do Flink necessárias para executar o aplicativo de forma independente.

Quando o aplicativo é iniciado com êxito, ele registra algumas informações sobre o minicluster autônomo e a inicialização dos conectores. Isso é seguido por vários INFO e alguns WARN registros que o Flink normalmente emite quando o aplicativo é iniciado.

21:28:34,982 INFO com.amazonaws.services.msf.BasicTableJob [] - Loading application properties from 'flink-application-properties-dev.json' 21:28:35,149 INFO com.amazonaws.services.msf.BasicTableJob [] - s3Path is ExampleBucket/my-output-bucket ...

Depois que a inicialização for concluída, o aplicativo não emitirá mais nenhuma entrada de registro. Enquanto os dados estão fluindo, nenhum registro é emitido.

Para verificar se o aplicativo está processando dados corretamente, você pode inspecionar o conteúdo do bucket de saída, conforme descrito na seção a seguir.

nota

Não emitir registros sobre o fluxo de dados é o comportamento normal de um aplicativo Flink. A emissão de registros em cada registro pode ser conveniente para depuração, mas pode adicionar uma sobrecarga considerável durante a execução em produção.

Observe o aplicativo gravando dados em um bucket S3

Esse aplicativo de exemplo gera dados aleatórios internamente e grava esses dados no bucket S3 de destino que você configurou. A menos que você tenha modificado o caminho de configuração padrão, os dados serão gravados no output caminho seguido pelo particionamento de dados e horas, no formato. ./output/<yyyy-MM-dd>/<HH>

O conector do FileSystem coletor cria novos arquivos no ponto de verificação do Flink. Ao ser executado localmente, o aplicativo executa um ponto de verificação a cada 5 segundos (5.000 milissegundos), conforme especificado no código.

if (env instanceof LocalStreamEnvironment) { env.enableCheckpointing(5000); }
Para navegar no bucket do S3 e observar o arquivo gravado pelo aplicativo
    1. Abra o console do Amazon S3 em. https://console.aws.amazon.com/s3/

  1. Escolha o bucket que você criou anteriormente.

  2. Navegue até o output caminho e, em seguida, até as pastas de data e hora que correspondem à hora atual no fuso UTC horário.

  3. Atualize periodicamente para observar novos arquivos aparecendo a cada 5 segundos.

  4. Selecione e baixe um arquivo para observar o conteúdo.

    nota

    Por padrão, os arquivos não têm extensões. O conteúdo é formatado comoJSON. Você pode abrir os arquivos com qualquer editor de texto para inspecionar o conteúdo.

Interrompa a execução local do seu aplicativo

Pare a execução do aplicativo em seuIDE. IDEGeralmente fornece uma opção de “parar”. A localização e o método exatos dependem doIDE.

Compile e empacote o código do seu aplicativo

Nesta seção, você usa o Apache Maven para compilar o código Java e empacotá-lo em um arquivo. JAR Você pode compilar e empacotar seu código usando a ferramenta de linha de comando Maven ou seu. IDE

Para compilar e empacotar usando a linha de comando do Maven

Vá para o diretório que contém o GettingStarted projeto Jave e execute o seguinte comando:

$ mvn package

Para compilar e empacotar usando seu IDE

Execute mvn package a partir da sua integração com o IDE Maven.

Em ambos os casos, o JAR arquivo target/amazon-msf-java-table-app-1.0.jar é criado.

nota

Executar um projeto de compilação a partir do seu IDE pode não criar o JAR arquivo.

Faça o upload do JAR arquivo de código do aplicativo

Nesta seção, você carrega o JAR arquivo criado na seção anterior para o bucket do Amazon S3 que você criou no início deste tutorial. Se você já fez isso, concluaCriar um bucket do Amazon S3.

Para fazer upload do código do aplicativo
  1. Abra o console do Amazon S3 em. https://console.aws.amazon.com/s3/

  2. Escolha o bucket que você criou anteriormente para o código do aplicativo.

  3. Escolha o campo Carregar.

  4. Escolha Adicionar arquivos.

  5. Navegue até o JAR arquivo gerado na seção anterior:target/amazon-msf-java-table-app-1.0.jar.

  6. Escolha Carregar sem alterar nenhuma outra configuração.

    Atenção

    Certifique-se de selecionar o JAR arquivo correto em<repo-dir>/java/GettingStarted/target/amazon/msf-java-table-app-1.0.jar.

    O diretório de destino também contém outros JAR arquivos que você não precisa carregar.

Crie e configure o serviço gerenciado para o aplicativo Apache Flink

Você pode criar e configurar um serviço gerenciado para o aplicativo Apache Flink usando o console ou o. AWS CLI Para este tutorial, você usará o console.

nota

Quando você cria o aplicativo usando o console, seus recursos AWS Identity and Access Management (IAM) e do Amazon CloudWatch Logs são criados para você. Ao criar o aplicativo usando o AWS CLI, você deve criar esses recursos separadamente.

Criar o aplicativo

  1. Abra o console do Managed Service for Apache Flink em /flink https://console.aws.amazon.com

  2. Verifique se a região correta está selecionada: Leste dos EUA (Norte da Virgínia) us-east-1.

  3. No menu à direita, escolha aplicativos Apache Flink e, em seguida, escolha Criar aplicativo de streaming. Como alternativa, escolha Criar aplicativo de streaming na seção Começar da página inicial.

  4. Na página Criar aplicativo de streaming, faça o seguinte:

    • Em Escolha um método para configurar o aplicativo de processamento de stream, escolha Criar do zero.

    • Para configuração do Apache Flink, versão do aplicativo Flink, escolha Apache Flink 1.19.

    • Na seção Configuração do aplicativo, preencha o seguinte:

      • Em Nome do aplicativo, insira MyApplication.

      • Em Descrição, insira My Java Table API test app.

      • Para Acesso aos recursos do aplicativo, escolha Create/update IAM role kinesis-analytics- MyApplication -us-east-1 com as políticas necessárias.

    • Em Modelo para configurações do aplicativo, faça o seguinte:

      • Em Modelos, escolha Desenvolvimento.

  5. Escolha Criar aplicativo de streaming.

nota

Ao criar um serviço gerenciado para o aplicativo Apache Flink usando o console, você tem a opção de criar uma IAM função e uma política para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses IAM recursos são nomeados usando o nome do aplicativo e a região da seguinte forma:

  • Política: kinesis-analytics-service-MyApplication-us-east-1

  • Função: kinesisanalytics-MyApplication-us-east-1

Edite a IAM política

Edite a IAM política para adicionar permissões para acessar o bucket do Amazon S3.

Para editar a IAM política para adicionar permissões de bucket do S3
  1. Abra o IAM console em https://console.aws.amazon.com/iam/.

  2. Selecione Policies (Políticas). Selecione a política kinesis-analytics-service-MyApplication-us-east-1 que o console criou para você na seção anterior.

  3. Escolha Editar e, em seguida, escolha a JSONguia.

  4. Adicione a seção destacada do exemplo de política a seguir à política. Substitua o ID da conta de amostra (012345678901) com o ID da sua conta e <bucket-name> com o nome do bucket do S3 que você criou.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "WriteOutputBucket", "Effect": "Allow", "Action": "s3:*", Resource": [ "arn:aws:s3:::my-bucket" ] } ] }
  5. Escolha Próximo e, em seguida, escolha Salvar alterações.

Configurar o aplicativo

Edite o aplicativo para definir o artefato do código do aplicativo.

Configurar o aplicativo
  1. Na MyApplicationpágina, escolha Configurar.

  2. Na seção Localização do código do aplicativo, escolha Configurar.

    • Para o bucket do Amazon S3, selecione o bucket que você criou anteriormente para o código do aplicativo. Escolha Procurar e selecione o bucket correto e, em seguida, escolha Escolher. Não clique no nome do bucket.

    • Em Caminho do objeto do Amazon S3, insira amazon-msf-java-table-app-1.0.jar.

  3. Para permissões de acesso, escolha Criar/atualizar IAM função kinesis-analytics-MyApplication-us-east-1.

  4. Na seção Propriedades do tempo de execução, adicione as propriedades a seguir.

  5. Escolha Adicionar novo item e adicione cada um dos seguintes parâmetros:

    ID do grupo Chave Valor
    bucket name your-bucket-name
    bucket path output
  6. Não modifique nenhuma outra configuração.

  7. Escolha Salvar alterações.

nota

Quando você opta por ativar o CloudWatch registro na Amazon, o Managed Service for Apache Flink cria um grupo de logs e um stream de logs para você. Os nomes desses recursos são os seguintes:

  • Grupo de logs: /aws/kinesis-analytics/MyApplication

  • Fluxo de logs: kinesis-analytics-log-stream

Execute o aplicativo

O aplicativo agora está configurado e pronto para ser executado.

Executar o aplicativo
  1. Volte para a página do console no Amazon Managed Service para Apache Flink e escolha. MyApplication

  2. Escolha Executar para iniciar o aplicativo.

  3. Na configuração de restauração do aplicativo, escolha Executar com o snapshot mais recente.

  4. Escolha Executar.

  5. O status nos detalhes do aplicativo muda de Ready para Starting e depois para Running após o início do aplicativo.

Quando o aplicativo está em Running status, você pode abrir o painel do Flink.

Para abrir o painel e visualizar o trabalho
  1. Escolha Abrir painel do Apache Flink. O painel é aberto em uma nova página.

  2. Na lista de trabalhos em execução, escolha o único trabalho que você pode ver.

    nota

    Se você definiu as propriedades do tempo de execução ou editou as IAM políticas incorretamente, o status do aplicativo pode mudar paraRunning, mas o painel do Flink mostra o trabalho sendo reiniciado continuamente. Esse é um cenário de falha comum quando o aplicativo está configurado incorretamente ou não tem as permissões para acessar os recursos externos.

    Quando isso acontecer, verifique a guia Exceções no painel do Flink para investigar a causa do problema.

Observe as métricas do aplicativo em execução

Na MyApplicationpágina, na seção de CloudWatch métricas da Amazon, você pode ver algumas das métricas fundamentais do aplicativo em execução.

Para ver as métricas
  1. Ao lado do botão Atualizar, selecione 10 segundos na lista suspensa.

  2. Quando o aplicativo está em execução e em bom estado, você pode ver a métrica de tempo de atividade aumentando continuamente.

  3. A métrica de reinicializações completas deve ser zero. Se estiver aumentando, a configuração pode ter problemas. Consulte a guia Exceções no painel do Flink para investigar o problema.

  4. A métrica Número de pontos de verificação com falha deve ser zero em um aplicativo saudável.

    nota

    Esse painel exibe um conjunto fixo de métricas com uma granularidade de 5 minutos. Você pode criar um painel de aplicativos personalizado com qualquer métrica no CloudWatch painel.

Observe o aplicativo gravando dados no bucket de destino

Agora você pode observar o aplicativo em execução no Amazon Managed Service para Apache Flink gravando arquivos no Amazon S3.

Para observar os arquivos, siga o mesmo processo usado para verificar os arquivos que estavam sendo gravados quando o aplicativo estava sendo executado localmente. Consulte Observe o aplicativo gravando dados em um bucket S3.

Lembre-se de que o aplicativo grava novos arquivos no ponto de verificação do Flink. Quando executados no Amazon Managed Service para Apache Flink, os pontos de verificação são habilitados por padrão e executados a cada 60 segundos. O aplicativo cria novos arquivos aproximadamente a cada 1 minuto.

Pare o aplicativo

Para interromper o aplicativo, acesse a página do console do aplicativo Managed Service for Apache Flink chamada. MyApplication

Como interromper o aplicativo
  1. Na lista suspensa Ação, escolha Parar.

  2. O status nos detalhes do aplicativo muda de Running para Stopping e depois para Ready quando o aplicativo é completamente interrompido.

    nota

    Não se esqueça também de parar de enviar dados para o stream de entrada a partir do script Python ou do Kinesis Data Generator.