Crie e execute um serviço gerenciado para o aplicativo Apache Flink - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Crie e execute um serviço gerenciado para o aplicativo Apache Flink

Nesta etapa, você cria um serviço gerenciado para o aplicativo Apache Flink com fluxos de dados do Kinesis como fonte e coletor.

Crie recursos dependentes

Antes de criar um aplicativo Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes:

  • Dois streams de dados do Kinesis para entrada e saída

  • Um bucket Amazon S3 para armazenar o código do aplicativo

    nota

    Este tutorial pressupõe que você esteja implantando seu aplicativo na região us-east-1 Leste dos EUA (Norte da Virgínia). Se você usa outra região, adapte todas as etapas adequadamente.

Crie dois streams de dados do Amazon Kinesis

Antes de criar um aplicativo do Managed Service for Apache Flink para este exercício, crie dois fluxos de dados do Kinesis (ExampleInputStream e ExampleOutputStream). O aplicativo usa esses fluxos para os fluxos de origem e de destino do aplicativo.

Você pode criar esses streams usando o console do Amazon Kinesis ou o comando a AWS CLI seguir. Para obter instruções sobre o console, consulte Criar e atualizar fluxos de dados no Guia do desenvolvedor do Amazon Kinesis Data Streams. Para criar os fluxos usando o AWS CLI, use os comandos a seguir, ajustando-se à região que você usa para seu aplicativo.

Como criar os fluxos de dados (AWS CLI)
  1. Para criar o primeiro stream (ExampleInputStream), use o seguinte comando do Amazon Kinesis create-stream AWS CLI :

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-east-1 \
  2. Para criar o segundo fluxo que o aplicativo usa para gravar a saída, execute o mesmo comando, alterando o nome do fluxo paraExampleOutputStream:

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-east-1 \

Crie um bucket Amazon S3 para o código do aplicativo

Você pode criar um bucket do Amazon S3 usando o console. Para saber como criar um bucket do Amazon S3 usando o console, consulte Criação de um bucket no Guia do usuário do Amazon S3. Nomeie o bucket do Amazon S3 usando um nome globalmente exclusivo, por exemplo, anexando seu nome de login.

nota

Certifique-se de criar o bucket na região que você usa para este tutorial (us-east-1).

Outros recursos

Quando você cria seu aplicativo, o Managed Service for Apache Flink cria automaticamente os seguintes CloudWatch recursos da Amazon, caso eles ainda não existam:

  • Um grupo de logs chamado /AWS/KinesisAnalytics-java/<my-application>

  • Um fluxo de logs chamado kinesis-analytics-log-stream

Configurar seu ambiente de desenvolvimento local

Para desenvolvimento e depuração, você pode executar o aplicativo Apache Flink em sua máquina diretamente da sua escolha. IDE Todas as dependências do Apache Flink são tratadas como dependências Java regulares usando o Apache Maven.

nota

Em sua máquina de desenvolvimento, você deve ter o Java JDK 11, o Maven e o Git instalados. Recomendamos que você use um ambiente de desenvolvimento como o Eclipse, Java Neon ou IntelliJ. IDEA Para verificar se você atende a todos os pré-requisitos, consulte. Cumpra os pré-requisitos para concluir os exercícios Você não precisa instalar um cluster Apache Flink em sua máquina.

Autentique sua sessão AWS

O aplicativo usa fluxos de dados do Kinesis para publicar dados. Ao executar localmente, você deve ter uma sessão AWS autenticada válida com permissões para gravar no stream de dados do Kinesis. Use as etapas a seguir para autenticar sua sessão:

  1. Se você não tiver o AWS CLI e um perfil nomeado com credencial válida configurado, consulteConfigure o AWS Command Line Interface (AWS CLI).

  2. Verifique se o seu AWS CLI está configurado corretamente e se seus usuários têm permissões para gravar no stream de dados do Kinesis publicando o seguinte registro de teste:

    $ aws kinesis put-record --stream-name ExampleOutputStream --data TEST --partition-key TEST
  3. Se você IDE tiver um plug-in com o qual se integrar AWS, poderá usá-lo para passar as credenciais para o aplicativo em execução noIDE. Para obter mais informações, consulte AWS Toolkit for IDEA AWS IntelliJ e Toolkit for Eclipse.

Baixe e examine o código Java de streaming do Apache Flink

O código do aplicativo Java para este exemplo está disponível em GitHub. Para fazer download do código do aplicativo, faça o seguinte:

  1. Duplique o repositório remoto usando o seguinte comando:

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Navegue até o diretório amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted.

Analise os componentes do aplicativo

O aplicativo é totalmente implementado na com.amazonaws.services.msf.BasicStreamingJob classe. O main() método define o fluxo de dados para processar os dados de streaming e executá-los.

nota

Para uma experiência de desenvolvedor otimizada, o aplicativo foi projetado para ser executado sem nenhuma alteração de código no Amazon Managed Service para Apache Flink e localmente, para desenvolvimento em seu. IDE

  • Para ler a configuração de tempo de execução para que ela funcione ao ser executada no Amazon Managed Service para Apache Flink e no seuIDE, o aplicativo detecta automaticamente se está sendo executado de forma independente localmente no. IDE Nesse caso, o aplicativo carrega a configuração do tempo de execução de forma diferente:

    1. Quando o aplicativo detectar que está sendo executado no modo autônomo no seuIDE, forme o application_properties.json arquivo incluído na pasta de recursos do projeto. O conteúdo do arquivo segue.

    2. Quando o aplicativo é executado no Amazon Managed Service para Apache Flink, o comportamento padrão carrega a configuração do aplicativo a partir das propriedades de tempo de execução que você definirá no aplicativo Amazon Managed Service para Apache Flink. Consulte Crie e configure o serviço gerenciado para o aplicativo Apache Flink.

      private static Map<String, Properties> loadApplicationProperties(StreamExecutionEnvironment env) throws IOException { if (env instanceof LocalStreamEnvironment) { LOGGER.info("Loading application properties from '{}'", LOCAL_APPLICATION_PROPERTIES_RESOURCE); return KinesisAnalyticsRuntime.getApplicationProperties( BasicStreamingJob.class.getClassLoader() .getResource(LOCAL_APPLICATION_PROPERTIES_RESOURCE).getPath()); } else { LOGGER.info("Loading application properties from Amazon Managed Service for Apache Flink"); return KinesisAnalyticsRuntime.getApplicationProperties(); } }
  • O main() método define o fluxo de dados do aplicativo e o executa.

    • Inicializa os ambientes de streaming padrão. Neste exemplo, mostramos como criar o StreamExecutionEnvironment a ser usado com o DataSteam API e o StreamTableEnvironment a ser usado com SQL e a TabelaAPI. Os dois objetos de ambiente são duas referências separadas ao mesmo ambiente de tempo de execução, para uso diferenteAPIs.

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    • Carregue os parâmetros de configuração do aplicativo. Isso os carregará automaticamente do local correto, dependendo de onde o aplicativo está sendo executado:

      Map<String, Properties> applicationParameters = loadApplicationProperties(env);
    • O aplicativo define uma fonte usando o conector Kinesis Consumer para ler dados do stream de entrada. A configuração do fluxo de entrada é definida no PropertyGroupId =InputStream0. O nome e a região do fluxo estão nas propriedades nomeadas stream.name eaws.region, respectivamente. Para simplificar, essa fonte lê os registros como uma string.

      private static FlinkKinesisConsumer<String> createSource(Properties inputProperties) { String inputStreamName = inputProperties.getProperty("stream.name"); return new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties); } ... public static void main(String[] args) throws Exception { ... SourceFunction<String> source = createSource(applicationParameters.get("InputStream0")); DataStream<String> input = env.addSource(source, "Kinesis Source"); ... }
    • Em seguida, o aplicativo define um coletor usando o conector Kinesis Streams Sink para enviar dados para o stream de saída. O nome e a região do fluxo de saída são definidos em PropertyGroupId =OutputStream0, semelhante ao fluxo de entrada. O coletor é conectado diretamente ao interno DataStream que está recebendo dados da fonte. Em um aplicativo real, você tem alguma transformação entre fonte e coletor.

      private static KinesisStreamsSink<String> createSink(Properties outputProperties) { String outputStreamName = outputProperties.getProperty("stream.name"); return KinesisStreamsSink.<String>builder() .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema()) .setStreamName(outputStreamName) .setPartitionKeyGenerator(element -> String.valueOf(element.hashCode())) .build(); } ... public static void main(String[] args) throws Exception { ... Sink<String> sink = createSink(applicationParameters.get("OutputStream0")); input.sinkTo(sink); ... }
    • Por fim, você executa o fluxo de dados que acabou de definir. Essa deve ser a última instrução do main() método, depois de definir todos os operadores que o fluxo de dados exige:

      env.execute("Flink streaming Java API skeleton");

Use o arquivo pom.xml

O arquivo pom.xml define todas as dependências exigidas pelo aplicativo e configura o plug-in Maven Shade para criar o fat-jar que contém todas as dependências exigidas pelo Flink.

  • Algumas dependências têm provided escopo. Essas dependências estão disponíveis automaticamente quando o aplicativo é executado no Amazon Managed Service para Apache Flink. Eles são necessários para compilar o aplicativo ou para executá-lo localmente em seuIDE. Para obter mais informações, consulte Execute seu aplicativo localmente. Certifique-se de usar a mesma versão do Flink do tempo de execução que você usará no Amazon Managed Service para Apache Flink.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-clients</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-streaming-java</artifactId> <version>${flink.version}</version> <scope>provided</scope> </dependency>
  • Você deve adicionar dependências adicionais do Apache Flink ao pom com o escopo padrão, como o conector Kinesis usado por esse aplicativo. Para obter mais informações, consulte Use conectores Apache Flink com o Managed Service para Apache Flink. Você também pode adicionar quaisquer dependências Java adicionais exigidas pelo seu aplicativo.

    <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-connector-kinesis</artifactId> <version>${aws.connector.version}</version> </dependency>
  • O plug-in Maven Java Compiler garante que o código seja compilado em Java 11, a JDK versão atualmente suportada pelo Apache Flink.

  • O plug-in Maven Shade empacota o fat-jar, excluindo algumas bibliotecas que são fornecidas pelo tempo de execução. Também especifica dois transformadores: e. ServicesResourceTransformer ManifestResourceTransformer O último configura a classe que contém o main método para iniciar o aplicativo. Se você renomear a classe principal, não se esqueça de atualizar esse transformador.

  • <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-shade-plugin</artifactId> ... <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer"> <mainClass>com.amazonaws.services.msf.BasicStreamingJob</mainClass> </transformer> ... </plugin>

Grave registros de amostra no fluxo de entrada

Nesta seção, você enviará registros de amostra ao stream para o aplicativo processar. Você tem duas opções para gerar dados de amostra, usando um script Python ou o Kinesis Data Generator.

Gere dados de amostra usando um script Python

Você pode usar um script Python para enviar registros de amostra para o stream.

nota

Para executar esse script Python, você deve usar o Python 3.x e ter a biblioteca for AWS SDKPython (Boto) instalada.

Para começar a enviar dados de teste para o stream de entrada do Kinesis:

  1. Baixe o script stock.py Python do gerador de dados no repositório do gerador GitHub de dados.

  2. Execute o script stock.py:

    $ python stock.py

Mantenha o script em execução enquanto você conclui o resto do tutorial. Agora você pode executar seu aplicativo Apache Flink.

Gere dados de amostra usando o Kinesis Data Generator

Como alternativa ao script Python, você pode usar o Kinesis Data Generator, também disponível em uma versão hospedada, para enviar dados de amostra aleatórios para o stream. O Kinesis Data Generator é executado no seu navegador e você não precisa instalar nada na sua máquina.

Para configurar e executar o Kinesis Data Generator:

  1. Siga as instruções na documentação do Kinesis Data Generator para configurar o acesso à ferramenta. Você executará um AWS CloudFormation modelo que configura um usuário e uma senha.

  2. Acesse o Kinesis Data Generator por meio do URL gerado pelo CloudFormation modelo. Você pode encontrar o URL na guia Saída após a conclusão do CloudFormation modelo.

  3. Configure o gerador de dados:

    • Região: selecione a região que você está usando para este tutorial: us-east-1

    • Stream/stream de entrega: selecione o stream de entrada que o aplicativo usará: ExampleInputStream

    • Registros por segundo: 100

    • Modelo de registro: copie e cole o seguinte modelo:

      { "event_time" : "{{date.now("YYYY-MM-DDTkk:mm:ss.SSSSS")}}, "ticker" : "{{random.arrayElement( ["AAPL", "AMZN", "MSFT", "INTC", "TBV"] )}}", "price" : {{random.number(100)}} }
  4. Teste o modelo: escolha Modelo de teste e verifique se o registro gerado é semelhante ao seguinte:

    { "event_time" : "2024-06-12T15:08:32.04800, "ticker" : "INTC", "price" : 7 }
  5. Inicie o gerador de dados: escolha Selecionar Enviar dados.

O Kinesis Data Generator agora está enviando dados para o. ExampleInputStream

Execute seu aplicativo localmente

Você pode executar e depurar seu aplicativo Flink localmente em seu. IDE

nota

Antes de continuar, verifique se os fluxos de entrada e saída estão disponíveis. Consulte Crie dois streams de dados do Amazon Kinesis. Além disso, verifique se você tem permissão para ler e gravar nos dois fluxos. Consulte Autentique sua sessão AWS.

A configuração do ambiente de desenvolvimento local requer Java 11JDK, Apache Maven e e IDE para desenvolvimento em Java. Verifique se você atende aos pré-requisitos exigidos. Consulte Cumpra os pré-requisitos para concluir os exercícios.

Importe o projeto Java para o seu IDE

Para começar a trabalhar no aplicativo em seuIDE, você deve importá-lo como um projeto Java.

O repositório que você clonou contém vários exemplos. Cada exemplo é um projeto separado. Para este tutorial, importe o conteúdo do ./java/GettingStarted subdiretório para o seuIDE.

Insira o código como um projeto Java existente usando o Maven.

nota

O processo exato para importar um novo projeto Java varia de acordo com o IDE que você está usando.

Verifique a configuração do aplicativo local

Ao ser executado localmente, o aplicativo usa a configuração no application_properties.json arquivo na pasta de recursos do projeto abaixo./src/main/resources. Você pode editar esse arquivo para usar diferentes nomes ou regiões de stream do Kinesis.

[ { "PropertyGroupId": "InputStream0", "PropertyMap": { "stream.name": "ExampleInputStream", "flink.stream.initpos": "LATEST", "aws.region": "us-east-1" } }, { "PropertyGroupId": "OutputStream0", "PropertyMap": { "stream.name": "ExampleOutputStream", "aws.region": "us-east-1" } } ]

Defina sua configuração de IDE execução

Você pode executar e depurar o aplicativo Flink IDE diretamente de você executando a classe principalcom.amazonaws.services.msf.BasicStreamingJob, da mesma forma que executaria qualquer aplicativo Java. Antes de executar o aplicativo, você deve definir a configuração Executar. A configuração depende do IDE que você está usando. Por exemplo, consulte Configurações de execução/depuração na documentação do IntelliJ. IDEA Em particular, você deve configurar o seguinte:

  1. Adicione as provided dependências ao classpath. Isso é necessário para garantir que as dependências com provided escopo sejam passadas para o aplicativo quando executado localmente. Sem essa configuração, o aplicativo exibe um class not found erro imediatamente.

  2. Passe as AWS credenciais para acessar os streams do Kinesis para o aplicativo. A maneira mais rápida é usar o AWS Toolkit for IDEA IntelliJ. Usando esse IDE plug-in na configuração Executar, você pode selecionar um AWS perfil específico. AWS a autenticação acontece usando esse perfil. Você não precisa passar as AWS credenciais diretamente.

  3. Verifique se ele IDE executa o aplicativo usando JDK11.

Execute o aplicativo em seu IDE

Depois de definir a configuração Executar para oBasicStreamingJob, você pode executá-la ou depurá-la como um aplicativo Java comum.

nota

Você não pode executar o fat-jar gerado pelo Maven diretamente na linha java -jar ... de comando. Esse jar não contém as dependências principais do Flink necessárias para executar o aplicativo de forma independente.

Quando o aplicativo é iniciado com êxito, ele registra algumas informações sobre o minicluster autônomo e a inicialização dos conectores. Isso é seguido por vários INFO e alguns WARN registros que o Flink normalmente emite quando o aplicativo é iniciado.

13:43:31,405 INFO com.amazonaws.services.msf.BasicStreamingJob [] - Loading application properties from 'flink-application-properties-dev.json' 13:43:31,549 INFO org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer [] - Flink Kinesis Consumer is going to read the following streams: ExampleInputStream, 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.cpu.cores required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.task.off-heap.size required for local execution is not set, setting it to the maximal possible value. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.min required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.network.max required for local execution is not set, setting it to its default value 64 mb. 13:43:31,676 INFO org.apache.flink.runtime.taskexecutor.TaskExecutorResourceUtils [] - The configuration option taskmanager.memory.managed.size required for local execution is not set, setting it to its default value 128 mb. 13:43:31,677 INFO org.apache.flink.runtime.minicluster.MiniCluster [] - Starting Flink Mini Cluster ....

Depois que a inicialização for concluída, o aplicativo não emitirá mais nenhuma entrada de registro. Enquanto os dados estão fluindo, nenhum registro é emitido.

Para verificar se o aplicativo está processando dados corretamente, você pode inspecionar os streams de entrada e saída do Kinesis, conforme descrito na seção a seguir.

nota

Não emitir registros sobre o fluxo de dados é o comportamento normal de um aplicativo Flink. A emissão de registros em cada registro pode ser conveniente para depuração, mas pode adicionar uma sobrecarga considerável durante a execução em produção.

Observe os dados de entrada e saída nos streams do Kinesis

Você pode observar os registros enviados ao stream de entrada pelo (gerando o Python de amostra) ou pelo Kinesis Data Generator (link) usando o Visualizador de dados no console do Amazon Kinesis.

Para observar registros
  1. Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.

  2. Verifique se a região é a mesma em que você está executando este tutorial, que é us-east-1 Leste dos EUA (Norte da Virgínia) por padrão. Altere a região se ela não corresponder.

  3. Escolha fluxos de dados.

  4. Selecione o fluxo que você deseja observar, ExampleInputStream ou ExampleOutputStream.

  5. Escolha a guia Visualizador de dados.

  6. Escolha qualquer fragmento, mantenha Último como posição inicial e escolha Obter registros. Talvez você veja o erro “Nenhum registro encontrado para esta solicitação”. Em caso afirmativo, escolha Tentar obter registros novamente. Os registros mais recentes publicados no stream são exibidos.

  7. Escolha o valor na coluna Dados para inspecionar o conteúdo do registro em JSON formato.

Pare de executar seu aplicativo localmente

Pare a execução do aplicativo em seuIDE. IDEGeralmente fornece uma opção de “parar”. A localização e o método exatos dependem do IDE que você está usando.

Compile e empacote o código do seu aplicativo

Nesta seção, você usa o Apache Maven para compilar o código Java e empacotá-lo em um arquivo. JAR Você pode compilar e empacotar seu código usando a ferramenta de linha de comando Maven ou seu. IDE

Para compilar e empacotar usando a linha de comando do Maven:

Vá para o diretório que contém o GettingStarted projeto Java e execute o seguinte comando:

$ mvn package

Para compilar e empacotar usando seuIDE:

Execute mvn package a partir da sua integração com o IDE Maven.

Em ambos os casos, o seguinte JAR arquivo é criado:target/amazon-msf-java-stream-app-1.0.jar.

nota

Executar um “projeto de compilação” a partir do seu IDE pode não criar o JAR arquivo.

Faça o upload do JAR arquivo de código do aplicativo

Nesta seção, você carrega o JAR arquivo criado na seção anterior para o bucket do Amazon Simple Storage Service (Amazon S3) que você criou no início deste tutorial. Se você não concluiu essa etapa, consulte (link).

Para carregar o JAR arquivo de código do aplicativo
  1. Abra o console do Amazon S3 em. https://console.aws.amazon.com/s3/

  2. Escolha o bucket que você criou anteriormente para o código do aplicativo.

  3. Escolha Carregar.

  4. Escolha Adicionar arquivos.

  5. Navegue até o JAR arquivo gerado na etapa anterior:target/amazon-msf-java-stream-app-1.0.jar.

  6. Escolha Carregar sem alterar nenhuma outra configuração.

Atenção

Certifique-se de selecionar o JAR arquivo correto em<repo-dir>/java/GettingStarted/target/amazon-msf-java-stream-app-1.0.jar.

O target diretório também contém outros JAR arquivos que você não precisa carregar.

Crie e configure o serviço gerenciado para o aplicativo Apache Flink

Você pode criar e executar um aplicativo Managed Service for Apache Flink usando o console ou a AWS CLI. Para este tutorial, você usará o console.

nota

Quando você cria o aplicativo usando o console, seus recursos AWS Identity and Access Management (IAM) e do Amazon CloudWatch Logs são criados para você. Ao criar o aplicativo usando o AWS CLI, você cria esses recursos separadamente.

Criar o aplicativo

Para criar o aplicativo
  1. Abra o console do Managed Service for Apache Flink em /flink https://console.aws.amazon.com

  2. Verifique se a região correta está selecionada: us-east-1 Leste dos EUA (Norte da Virgínia)

  3. Abra o menu à direita e escolha aplicativos Apache Flink e, em seguida, Criar aplicativo de streaming. Como alternativa, escolha Criar aplicativo de streaming no contêiner Começar da página inicial.

  4. Na página Criar aplicativo de streaming:

    • Escolha um método para configurar o aplicativo de processamento de fluxo: escolha Criar do zero.

    • Configuração do Apache Flink, versão do aplicativo Flink: escolha Apache Flink 1.19.

  5. Configure seu aplicativo

    • Nome do aplicativo: enterMyApplication.

    • Descrição: enterMy java test app.

    • Acesso aos recursos do aplicativo: escolha Criar/atualizar IAM função kinesis-analytics-MyApplication-us-east-1 com as políticas necessárias.

  6. Configure seu modelo para as configurações do aplicativo

    • Modelos: escolha Desenvolvimento.

  7. Escolha Criar aplicativo de streaming na parte inferior da página.

nota

Ao criar um serviço gerenciado para o aplicativo Apache Flink usando o console, você tem a opção de criar uma IAM função e uma política para seu aplicativo. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses IAM recursos são nomeados usando o nome do aplicativo e a região da seguinte forma:

  • Política: kinesis-analytics-service-MyApplication-us-east-1

  • Função: kinesisanalytics-MyApplication-us-east-1

O Amazon Managed Service para Apache Flink era conhecido anteriormente como Kinesis Data Analytics. O nome dos recursos que são criados automaticamente é prefixado com kinesis-analytics- para fins de compatibilidade com versões anteriores.

Edite a IAM política

Edite a IAM política para adicionar permissões para acessar os fluxos de dados do Kinesis.

Para editar a política
  1. Abra o IAM console em https://console.aws.amazon.com/iam/.

  2. Selecione Policies (Políticas). Selecione a política kinesis-analytics-service-MyApplication-us-east-1 que o console criou para você na seção anterior.

  3. Escolha Editar e, em seguida, escolha a JSONguia.

  4. Adicione a seção destacada do exemplo de política a seguir à política. Substitua a conta de amostra IDs (012345678901) com o ID da sua conta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::my-bucket/kinesis-analytics-placeholder-s3-object" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-east-1:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-east-1:012345678901:stream/ExampleOutputStream" } ] }
  5. Escolha Avançar na parte inferior da página e escolha Salvar alterações.

Configurar o aplicativo

Edite a configuração do aplicativo para definir o artefato do código do aplicativo.

Para editar a configuração
  1. Na MyApplicationpágina, escolha Configurar.

  2. Na seção Localização do código do aplicativo:

    • Para o bucket do Amazon S3, selecione o bucket que você criou anteriormente para o código do aplicativo. Escolha Procurar e selecione o bucket correto e, em seguida, selecione Escolher. Não clique no nome do bucket.

    • Em Caminho do objeto do Amazon S3, insira amazon-msf-java-stream-app-1.0.jar.

  3. Para permissões de acesso, escolha Criar/atualizar IAM função kinesis-analytics-MyApplication-us-east-1 com as políticas necessárias.

  4. Na seção Propriedades do tempo de execução, adicione as propriedades a seguir.

  5. Escolha Adicionar novo item e adicione cada um dos seguintes parâmetros:

    ID do grupo Chave Valor
    InputStream0 stream.name ExampleInputStream
    InputStream0 aws.region us-east-1
    OutputStream0 stream.name ExampleOutputStream
    OutputStream0 aws.region us-east-1
  6. Não modifique nenhuma das outras seções.

  7. Escolha Salvar alterações.

nota

Quando você opta por ativar o CloudWatch registro na Amazon, o Managed Service for Apache Flink cria um grupo de logs e um stream de logs para você. Os nomes desses recursos são os seguintes:

  • Grupo de logs: /aws/kinesis-analytics/MyApplication

  • Fluxo de logs: kinesis-analytics-log-stream

Execute o aplicativo

O aplicativo agora está configurado e pronto para ser executado.

Executar o aplicativo
  1. No console do Amazon Managed Service para Apache Flink, escolha Meu aplicativo e escolha Executar.

  2. Na próxima página, na página de configuração de restauração do aplicativo, escolha Executar com o snapshot mais recente e, em seguida, escolha Executar.

    O status nos detalhes do aplicativo muda de Ready para Starting e depois para Running quando o aplicativo é iniciado.

Quando o aplicativo está no Running status, agora você pode abrir o painel do Flink.

Para abrir o painel do
  1. Escolha Abrir painel do Apache Flink. O painel é aberto em uma nova página.

  2. Na lista de trabalhos em execução, escolha o único trabalho que você pode ver.

    nota

    Se você definiu as propriedades do Runtime ou editou as IAM políticas incorretamente, o status do aplicativo pode se transformar emRunning, mas o painel do Flink mostra que o trabalho está sendo reiniciado continuamente. Esse é um cenário de falha comum se o aplicativo estiver configurado incorretamente ou não tiver permissões para acessar os recursos externos.

    Quando isso acontecer, verifique a guia Exceções no painel do Flink para ver a causa do problema.

Observe as métricas do aplicativo em execução

Na MyApplicationpágina, na seção de CloudWatch métricas da Amazon, você pode ver algumas das métricas fundamentais do aplicativo em execução.

Para ver as métricas
  1. Ao lado do botão Atualizar, selecione 10 segundos na lista suspensa.

  2. Quando o aplicativo está em execução e em bom estado, você pode ver a métrica de tempo de atividade aumentando continuamente.

  3. A métrica de reinicializações completas deve ser zero. Se estiver aumentando, a configuração pode ter problemas. Para investigar o problema, revise a guia Exceções no painel do Flink.

  4. A métrica Número de pontos de verificação com falha deve ser zero em um aplicativo saudável.

    nota

    Esse painel exibe um conjunto fixo de métricas com uma granularidade de 5 minutos. Você pode criar um painel de aplicativos personalizado com qualquer métrica no CloudWatch painel.

Observe os dados de saída nos streams do Kinesis

Verifique se você ainda está publicando dados na entrada, usando o script Python ou o Kinesis Data Generator.

Agora você pode observar a saída do aplicativo em execução no Managed Service for Apache Flink usando o Visualizador de dados no https://console.aws.amazon.com/kinesis/, da mesma forma que você já fez anteriormente.

Para ver a saída
  1. Abra o console do Kinesis em https://console.aws.amazon.com /kinesis.

  2. Verifique se a região é a mesma que você está usando para executar este tutorial. Por padrão, é US-East-1US East (Norte da Virgínia). Altere a região, se necessário.

  3. Escolha fluxos de dados.

  4. Selecione o stream que você deseja observar. Para este tutorial, use ExampleOutputStream.

  5. Escolha a guia Visualizador de dados.

  6. Selecione qualquer fragmento, mantenha Último como posição inicial e escolha Obter registros. Talvez você veja o erro “nenhum registro encontrado para esta solicitação”. Em caso afirmativo, escolha Tentar obter registros novamente. Os registros mais recentes publicados no stream são exibidos.

  7. Selecione o valor na coluna Dados para inspecionar o conteúdo do registro em JSON formato.

Pare o aplicativo

Para interromper o aplicativo, acesse a página do console do aplicativo Managed Service for Apache Flink chamada. MyApplication

Como interromper o aplicativo
  1. Na lista suspensa Ação, escolha Parar.

  2. O status nos detalhes do aplicativo muda de Running para Stopping e depois para Ready quando o aplicativo é completamente interrompido.

    nota

    Não se esqueça também de parar de enviar dados para o stream de entrada a partir do script Python ou do Kinesis Data Generator.

Próxima etapa

Limpe AWS os recursos