Etapa 3: Criar e executar uma aplicação Flink do Managed Service for Apache Flink - Amazon Kinesis Data Streams

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Etapa 3: Criar e executar uma aplicação Flink do Managed Service for Apache Flink

Neste exercício, você cria uma aplicação Flink do Managed Service for Apache Flink com fluxos de dados como fonte e coletor.

Criar dois fluxos de dados do Amazon Kinesis Data Streams

Antes de criar uma aplicação Flink do Managed Service for Apache Flink para este exercício, crie dois fluxos de dados do Kinesis(ExampleInputStream e ExampleOutputStream). O aplicativo usa esses fluxos para os fluxos de origem e de destino do aplicativo.

Você pode criar esses fluxos usando o console do Amazon Kinesis ou o comando da AWS CLI a seguir. Para instruções do console, consulte Criar e atualizar streamings de dados.

Como criar os fluxos de dados (AWS CLI)
  1. Para criar o primeiro fluxo (ExampleInputStream), use o comando create-stream AWS CLI do Amazon Kinesis a seguir.

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. Para criar o segundo fluxo que o aplicativo usa para gravar a saída, execute o mesmo comando, alterando o nome da transmissão para ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Gravação de registros de amostra no fluxo de entrada

Nesta seção, você usa um script Python para gravar registros de amostra no fluxo para o aplicativo processar.

nota

Essa seção requer AWS SDK for Python (Boto).

  1. Crie um arquivo denominado stock.py com o conteúdo a seguir:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
  2. Mais adiante neste tutorial, você executa o script stock.py para enviar dados para o aplicativo.

    $ python stock.py

Baixar e examinar o código Java Apache Flink Streaming

O código do aplicativo Java para esses exemplos está disponível em GitHub. Para fazer download do código do aplicativo, faça o seguinte:

  1. Duplique o repositório remoto com o seguinte comando:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. Navegue até o diretório GettingStarted.

O código do aplicativo está localizado nos arquivos CloudWatchLogSink.java e CustomSinkStreamingJob.java. Observe o seguinte sobre o código do aplicativo:

  • A aplicação usa uma origem do Kinesis para ler o fluxo de origem. O trecho a seguir cria o coletor do Kinesis:

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

Compilar o código do aplicativo

Nesta seção, você usa o compilador do Apache Maven para criar o código Java para o aplicativo. Para obter informações sobre como instalar o Apache Maven e o Java Development Kit (JDK), consulte Pré-requisitos para concluir os exercícios.

Seu aplicativo Java requer os seguintes componentes:

  • Um arquivo Project Object Model (pom.xml). Este arquivo contém informações sobre a configuração e as dependências da aplicação, incluindo as bibliotecas do Managed Service for Apache Flink para aplicações Flink.

  • Um método main que contém a lógica do aplicativo.

nota

Para usar o conector do Kinesis para a aplicação a seguir, você precisa baixar o código-fonte do conector e compilá-lo como descrito na documentação do Apache Flink.

Como criar e compilar o código do aplicativo
  1. Crie um aplicativo Java/Maven em seu ambiente de desenvolvimento. Para obter informações sobre como criar um aplicativo, consulte a documentação do seu ambiente de desenvolvimento:

  2. Use o código a seguir para um arquivo chamado StreamingJob.java.

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    Observe o seguinte sobre o exemplo de código anterior:

    • Este arquivo contém o método main que define a funcionalidade do aplicativo.

    • Seu aplicativo cria conectores de origem e de destino para acessar recursos externos usando um objeto StreamExecutionEnvironment.

    • O aplicativo cria conectores de origem e de destino usando propriedades estáticas. Para usar as propriedades do aplicativo dinâmico, use os métodos createSinkFromApplicationProperties e createSourceFromApplicationProperties para criar os conectores. Esses métodos leem as propriedades do aplicativo para configurar os conectores.

  3. Para usar o seu código de aplicativo, compile-o e empacote-o em um arquivo JAR. Há duas formas de compilar e empacotar o código:

    • Use a ferramenta de linha de comando do Maven. Crie seu arquivo JAR executando o seguinte comando no diretório que contém o arquivo pom.xml:

      mvn package
    • Use o ambiente de desenvolvimento. Consulte a documentação de seu ambiente de desenvolvimento para obter mais detalhes.

    Você pode carregar o pacote como um arquivo JAR, ou pode compactar o pacote e carregá-lo como um arquivo ZIP. Se você criar seu aplicativo usando a AWS CLI, especifique o tipo de conteúdo de código (JAR ou ZIP).

  4. Se houver erros durante a compilação, verifique se sua variável de ambiente JAVA_HOME está definida corretamente.

Se o aplicativo for compilado com êxito, o arquivo a seguir é criado:

target/java-getting-started-1.0.jar

faça o upload do código Java Apache Flink Streaming

Nesta seção, você cria um bucket do Amazon Simple Storage Service (Amazon S3) e faz upload do código do aplicativo.

Para fazer upload do código do aplicativo
  1. Abra o console do Amazon S3 em https://console.aws.amazon.com/s3/.

  2. Selecione Criar bucket.

  3. Insira ka-app-code-<username> no campo Nome do bucket. Adicione um sufixo para o nome do bucket, como o nome do usuário, para torná-lo globalmente exclusivo. Selecione Next (Próximo).

  4. Na etapa Configure options (Configurar opções), mantenha as configurações como estão e selecione Next (Próximo).

  5. Na etapa Set permissions (Definir permissões), mantenha as configurações como estão e selecione Next (Próximo).

  6. Selecione Criar bucket.

  7. No console do Amazon S3, escolha o ka-app-code- bucket e escolha Upload. <username>

  8. Na etapa Select files (Selecionar arquivos), selecione Add files (Adicionar arquivos). Navegue até o arquivo java-getting-started-1.0.jar que você criou na etapa anterior. Selecione Next (Próximo).

  9. Na etapa Definir permissões, mantenha as configurações como estão. Escolha Próximo.

  10. Na etapa Definir propriedades, mantenha as configurações como estão. Escolha Carregar.

O código passa a ser armazenado em um bucket do Amazon S3 que pode ser acessado pela aplicação.

Criar e executar a aplicação do Managed Service for Apache Flink

Você pode criar e executar uma aplicação Flink do Managed Service for Apache Flink usando o console ou a AWS CLI.

nota

Quando você cria o aplicativo usando o console, seus recursos AWS Identity and Access Management (IAM) e do Amazon CloudWatch Logs são criados para você. Quando cria o aplicativo usando a AWS CLI, você cria esses recursos separadamente.

Criar e executar o aplicativo (console)

Siga estas etapas para criar, configurar, atualizar e executar o aplicativo usando o console.

Criar o aplicativo

  1. Abra o console do Kinesis em https://console.aws.amazon.com/kinesis.

  2. No painel do Amazon Kinesis, escolha Criar aplicativo de análise.

  3. Na página Kinesis Analytics - Create application (Kinesis Analytics – Criar aplicativo), forneça os detalhes do aplicativo da seguinte forma:

    • Em Nome do aplicativo, insira MyApplication.

    • Em Descrição, insira My java test app.

    • Em Runtime (Tempo de execução), escolha Apache Flink 1.6.

  4. Em Permissões de acesso, escolha Criar/atualizar o perfil do IAM kinesis-analytics-MyApplication-us-west-2.

  5. Selecione Create application (Criar aplicativo).

nota

Ao criar uma aplicação Flink do Managed Service for Apache Flink usando o console, você pode criar um perfil do IAM e uma política com a aplicação. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses recursos do IAM são nomeados usando o nome do aplicativo e a região da seguinte forma:

  • Política: kinesis-analytics-service-MyApplication-us-west-2

  • Função: kinesis-analytics-MyApplication-us-west-2

Editar a política do IAM

Edite a política do IAM para adicionar permissões de acesso aos fluxos de dados do Kinesis.

  1. Abra o console IAM em https://console.aws.amazon.com/iam/.

  2. Selecione Policies (Políticas). Selecione a política kinesis-analytics-service-MyApplication-us-west-2 que o console criou para você na seção anterior.

  3. Na página Summary (Resumo), selecione Edit policy (Editar política). Selecione a guia JSON.

  4. Adicione a seção destacada do exemplo de política a seguir à política. Substitua os exemplos de IDs de conta (012345678901) pelo ID da conta.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configurar o aplicativo

  1. Na MyApplicationpágina, escolha Configurar.

  2. Na página Configurar aplicativo, forneça o Local do código:

    • Em Bucket do Amazon S3, insira ka-app-code-<username>.

    • Em Caminho do objeto do Amazon S3, insira java-getting-started-1.0.jar.

  3. Na seção ‭‬Acesso aos recursos do aplicativo‭‬, em ‬Permissões de acesso‭‬, escolha ‭‬Criar/atualizar o perfil do IAM kinesis-analytics-MyApplication-us-west-2.

  4. Em Properties (Propriedades), Group ID (ID do grupo), insira ProducerConfigProperties.

  5. Insira as seguintes propriedades e valores de aplicativo:

    Chave Valor
    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. Em Monitoramento, confirme se Nível de monitoramento de métricas está definido como Aplicativo.

  7. Para CloudWatch registrar, marque a caixa de seleção Ativar.

  8. Escolha Atualizar.

nota

Quando você opta por ativar o CloudWatch registro, o Managed Service for Apache Flink cria um grupo de registros e um fluxo de registros para você. Os nomes desses recursos são os seguintes:

  • Grupo de logs: /aws/kinesis-analytics/MyApplication

  • Fluxo de logs: kinesis-analytics-log-stream

Executar o aplicativo

  1. Na MyApplicationpágina, escolha Executar. Confirme a ação.

  2. Quando o aplicativo estiver em execução, atualize a página. O console mostra o Gráfico do aplicativo.

Interromper o aplicativo

Na MyApplicationpágina, escolha Parar. Confirme a ação.

Atualize o aplicativo

Usando o console, você pode atualizar configurações do aplicativo, como as propriedades do aplicativo, as configurações de monitoramento e a localização ou o nome do arquivo JAR do aplicativo. Você também pode recarregar o JAR do aplicativo do bucket do Amazon S3 se precisar atualizar o código do aplicativo.

Na MyApplicationpágina, escolha Configurar. Atualize as configurações do aplicativo e selecione Update (Atualizar).

Criar e executar o aplicativo (AWS CLI)

Nesta seção, você usa a AWS CLI para criar e executar a aplicação Flink do Managed Service for Apache Flink. O Managed Service for Apache Flink para aplicações Flink usa o comando kinesisanalyticsv2 da AWS CLI para criar e interagir com aplicações do Managed Service for Apache Flink.

Criar uma política de permissões

Primeiro, crie uma política de permissões com duas instruções: uma que concede permissões para a ação read no fluxo de origem, e outra que concede permissões para ações write no fluxo de destino. Em seguida, você anexa a política a um perfil do IAM (que criará na próxima seção). Assim, ao assumir o perfil, o serviço Managed Service for Apache Flink terá as permissões necessárias para ler o fluxo de origem e gravar no fluxo de coleta.

Use o código a seguir para criar a política de permissões KAReadSourceStreamWriteSinkStream. Substitua username pelo nome de usuário que você usou para criar o bucket do Amazon S3 e armazenar o código do aplicativo. Substitua o ID da conta nos Nomes de recurso da Amazon (ARNs) (012345678901) pelo ID da conta.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Para step-by-step obter instruções sobre como criar uma política de permissões, consulte Tutorial: Criar e anexar sua primeira política gerenciada pelo cliente no Guia do usuário do IAM.

nota

Para acessar outros serviços da AWS, você pode usar o AWS SDK for Java. O Managed Service for Apache Flink define automaticamente as credenciais exigidas pelo SDK como as credenciais do perfil do IAM associado a sua aplicação. Não é necessária nenhuma etapa adicional.

Criar uma perfil do IAM

Nesta seção, você cria um perfil do IAM que a aplicação Flink do Managed Service for Apache Flink pode assumir para ler um fluxo de origem e gravar no fluxo de coleta.

O Managed Service for Apache Flink não pode acessar seu fluxo sem permissões. Essas permissões são concedidas usando um perfil do IAM. Cada perfil do IAM tem duas políticas anexadas. A política de confiança concede ao Managed Service for Apache Flink permissão para assumir o perfil, e a política de permissões determina o que o serviço pode fazer depois de assumir a função.

Você anexa a política de permissões que criou na seção anterior a essa função.

Para criar uma perfil do IAM
  1. Abra o console do IAM em https://console.aws.amazon.com/iam/.

  2. No painel de navegação, selecione Roles (Funções) e Create Role (Criar função).

  3. Em Selecionar tipo de identidade de confiança, selecione Serviço da AWS. Em Choose the service that will use this role (Selecionar o serviço que usará esta função), selecione Kinesis. Em Select your use case (Selecionar seu caso de uso), selecione Kinesis Analytics.

    Selecione Next: Permissions (Próximo: permissões).

  4. Na página Attach permissions policies, selecione Next: Review. Você pode anexar políticas de permissões depois de criar a função.

  5. Na página Create role (Criar função), insira KA-stream-rw-role para o Role name (Nome da função). Selecione Criar função.

    Você criou um perfil do IAM chamado KA-stream-rw-role. Em seguida, você atualiza as políticas de confiança e de permissões para a função.

  6. Anexe a política de permissões à função.

    nota

    Para este exercício, o Managed Service for Apache Flink assume esse perfil para ler dados de um fluxo de dados do Kinesis (origem) e gravar a saída em outro fluxo de dados do Kinesis. Depois, você anexa a política que criou na etapa anterior, Criar uma política de permissões.

    1. Na página Summary (Resumo), selecione a guia Permissions (Permissões).

    2. Selecione Attach Policies.

    3. Na caixa de pesquisa, insira KAReadSourceStreamWriteSinkStream (a política que você criou na seção anterior).

    4. Escolha a ReadInputStreamWriteOutputStream política KA e escolha Anexar política.

Agora você criou a função de execução de serviço que seu aplicativo usa para acessar os recursos. Anote o ARN da nova função.

Para step-by-step obter instruções sobre como criar uma função, consulte Como criar uma função do IAM (console) no Guia do usuário do IAM.

Criar o aplicativo do Managed Service for Apache Flink

  1. Salve o seguinte código JSON em um arquivo chamado create_request.json. Substitua o ARN da função de amostra pelo ARN da função que você criou anteriormente. Substitua o sufixo do ARN do bucket (username) pelo sufixo que você selecionou na seção anterior. Substitua o ID da conta de exemplo (012345678901) na função de execução do serviço pelo ID da conta.

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. Execute a ação CreateApplication com a solicitação anterior para criar o aplicativo:

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

O aplicativo agora é criado. Você inicia o aplicativo na próxima etapa.

Iniciar o aplicativo

Nesta seção, você usa a ação StartApplication para iniciar o aplicativo.

Para iniciar o aplicativo
  1. Salve o seguinte código JSON em um arquivo chamado start_request.json.

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. Execute a ação StartApplication com a solicitação anterior para iniciar o aplicativo:

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

O aplicativo agora está em execução. Você pode verificar as métricas do Managed Service for Apache Flink no CloudWatch console da Amazon para verificar se o aplicativo está funcionando.

Interromper o aplicativo

Nesta seção, você usa a ação StopApplication para interromper o aplicativo.

Como interromper o aplicativo
  1. Salve o seguinte código JSON em um arquivo chamado stop_request.json.

    {"ApplicationName": "test" }
  2. Execute a ação StopApplication com a seguinte solicitação para interromper o aplicativo:

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

O aplicativo agora está interrompido.