As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Etapa 3: Criar e executar uma aplicação Flink do Managed Service for Apache Flink
Neste exercício, você cria uma aplicação Flink do Managed Service for Apache Flink com fluxos de dados como fonte e coletor.
Esta seção contém as seguintes etapas:
- Criar dois fluxos de dados do Amazon Kinesis Data Streams
- Gravação de registros de amostra no fluxo de entrada
- Baixar e examinar o código Java Apache Flink Streaming
- Compilar o código do aplicativo
- faça o upload do código Java Apache Flink Streaming
- Criar e executar a aplicação do Managed Service for Apache Flink
Criar dois fluxos de dados do Amazon Kinesis Data Streams
Antes de criar uma aplicação Flink do Managed Service for Apache Flink para este exercício, crie dois fluxos de dados do Kinesis(ExampleInputStream
e ExampleOutputStream
). O aplicativo usa esses fluxos para os fluxos de origem e de destino do aplicativo.
Você pode criar esses fluxos usando o console do Amazon Kinesis ou o comando da AWS CLI a seguir. Para instruções do console, consulte Criar e atualizar streamings de dados.
Como criar os fluxos de dados (AWS CLI)
-
Para criar o primeiro fluxo (
ExampleInputStream
), use o comandocreate-stream
AWS CLI do Amazon Kinesis a seguir.$ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
-
Para criar o segundo fluxo que o aplicativo usa para gravar a saída, execute o mesmo comando, alterando o nome da transmissão para
ExampleOutputStream
.$ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
Gravação de registros de amostra no fluxo de entrada
Nesta seção, você usa um script Python para gravar registros de amostra no fluxo para o aplicativo processar.
nota
Essa seção requer AWS SDK for Python (Boto)
-
Crie um arquivo denominado
stock.py
com o conteúdo a seguir:import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
-
Mais adiante neste tutorial, você executa o script
stock.py
para enviar dados para o aplicativo.$ python stock.py
Baixar e examinar o código Java Apache Flink Streaming
O código do aplicativo Java para esses exemplos está disponível em GitHub. Para fazer download do código do aplicativo, faça o seguinte:
-
Duplique o repositório remoto com o seguinte comando:
git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
-
Navegue até o diretório
GettingStarted
.
O código do aplicativo está localizado nos arquivos CloudWatchLogSink.java
e CustomSinkStreamingJob.java
. Observe o seguinte sobre o código do aplicativo:
-
A aplicação usa uma origem do Kinesis para ler o fluxo de origem. O trecho a seguir cria o coletor do Kinesis:
return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
Compilar o código do aplicativo
Nesta seção, você usa o compilador do Apache Maven para criar o código Java para o aplicativo. Para obter informações sobre como instalar o Apache Maven e o Java Development Kit (JDK), consulte Pré-requisitos para concluir os exercícios.
Seu aplicativo Java requer os seguintes componentes:
-
Um arquivo Project Object Model (pom.xml)
. Este arquivo contém informações sobre a configuração e as dependências da aplicação, incluindo as bibliotecas do Managed Service for Apache Flink para aplicações Flink. -
Um método
main
que contém a lógica do aplicativo.
nota
Para usar o conector do Kinesis para a aplicação a seguir, você precisa baixar o código-fonte do conector e compilá-lo como descrito na documentação do Apache Flink
Como criar e compilar o código do aplicativo
-
Crie um aplicativo Java/Maven em seu ambiente de desenvolvimento. Para obter informações sobre como criar um aplicativo, consulte a documentação do seu ambiente de desenvolvimento:
-
Use o código a seguir para um arquivo chamado
StreamingJob.java
.package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }
Observe o seguinte sobre o exemplo de código anterior:
-
Este arquivo contém o método
main
que define a funcionalidade do aplicativo. -
Seu aplicativo cria conectores de origem e de destino para acessar recursos externos usando um objeto
StreamExecutionEnvironment
. -
O aplicativo cria conectores de origem e de destino usando propriedades estáticas. Para usar as propriedades do aplicativo dinâmico, use os métodos
createSinkFromApplicationProperties
ecreateSourceFromApplicationProperties
para criar os conectores. Esses métodos leem as propriedades do aplicativo para configurar os conectores.
-
-
Para usar o seu código de aplicativo, compile-o e empacote-o em um arquivo JAR. Há duas formas de compilar e empacotar o código:
-
Use a ferramenta de linha de comando do Maven. Crie seu arquivo JAR executando o seguinte comando no diretório que contém o arquivo
pom.xml
:mvn package
-
Use o ambiente de desenvolvimento. Consulte a documentação de seu ambiente de desenvolvimento para obter mais detalhes.
Você pode carregar o pacote como um arquivo JAR, ou pode compactar o pacote e carregá-lo como um arquivo ZIP. Se você criar seu aplicativo usando a AWS CLI, especifique o tipo de conteúdo de código (JAR ou ZIP).
-
-
Se houver erros durante a compilação, verifique se sua variável de ambiente
JAVA_HOME
está definida corretamente.
Se o aplicativo for compilado com êxito, o arquivo a seguir é criado:
target/java-getting-started-1.0.jar
faça o upload do código Java Apache Flink Streaming
Nesta seção, você cria um bucket do Amazon Simple Storage Service (Amazon S3) e faz upload do código do aplicativo.
Para fazer upload do código do aplicativo
Abra o console do Amazon S3 em https://console.aws.amazon.com/s3/
. -
Selecione Criar bucket.
-
Insira
ka-app-code-
no campo Nome do bucket. Adicione um sufixo para o nome do bucket, como o nome do usuário, para torná-lo globalmente exclusivo. Selecione Next (Próximo).<username>
-
Na etapa Configure options (Configurar opções), mantenha as configurações como estão e selecione Next (Próximo).
-
Na etapa Set permissions (Definir permissões), mantenha as configurações como estão e selecione Next (Próximo).
-
Selecione Criar bucket.
-
No console do Amazon S3, escolha o ka-app-code-
bucket e escolha Upload. <username>
-
Na etapa Select files (Selecionar arquivos), selecione Add files (Adicionar arquivos). Navegue até o arquivo
java-getting-started-1.0.jar
que você criou na etapa anterior. Selecione Next (Próximo). -
Na etapa Definir permissões, mantenha as configurações como estão. Escolha Próximo.
-
Na etapa Definir propriedades, mantenha as configurações como estão. Escolha Carregar.
O código passa a ser armazenado em um bucket do Amazon S3 que pode ser acessado pela aplicação.
Criar e executar a aplicação do Managed Service for Apache Flink
Você pode criar e executar uma aplicação Flink do Managed Service for Apache Flink usando o console ou a AWS CLI.
nota
Quando você cria o aplicativo usando o console, seus recursos AWS Identity and Access Management (IAM) e do Amazon CloudWatch Logs são criados para você. Quando cria o aplicativo usando a AWS CLI, você cria esses recursos separadamente.
Criar e executar o aplicativo (console)
Siga estas etapas para criar, configurar, atualizar e executar o aplicativo usando o console.
Criar o aplicativo
Abra o console do Kinesis em https://console.aws.amazon.com/kinesis
. -
No painel do Amazon Kinesis, escolha Criar aplicativo de análise.
-
Na página Kinesis Analytics - Create application (Kinesis Analytics – Criar aplicativo), forneça os detalhes do aplicativo da seguinte forma:
-
Em Nome do aplicativo, insira
MyApplication
. -
Em Descrição, insira
My java test app
. -
Em Runtime (Tempo de execução), escolha Apache Flink 1.6.
-
-
Em Permissões de acesso, escolha Criar/atualizar o perfil do IAM
kinesis-analytics-MyApplication-us-west-2
. -
Selecione Create application (Criar aplicativo).
nota
Ao criar uma aplicação Flink do Managed Service for Apache Flink usando o console, você pode criar um perfil do IAM e uma política com a aplicação. O aplicativo usa essa função e política para acessar os recursos dependentes. Esses recursos do IAM são nomeados usando o nome do aplicativo e a região da seguinte forma:
-
Política:
kinesis-analytics-service-
MyApplication
-us-west-2
-
Função:
kinesis-analytics-
MyApplication
-us-west-2
Editar a política do IAM
Edite a política do IAM para adicionar permissões de acesso aos fluxos de dados do Kinesis.
Abra o console IAM em https://console.aws.amazon.com/iam/
. -
Selecione Policies (Políticas). Selecione a política
kinesis-analytics-service-MyApplication-us-west-2
que o console criou para você na seção anterior. -
Na página Summary (Resumo), selecione Edit policy (Editar política). Selecione a guia JSON.
-
Adicione a seção destacada do exemplo de política a seguir à política. Substitua os exemplos de IDs de conta (
012345678901
) pelo ID da conta.{ "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-
username
/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" }
] }
Configurar o aplicativo
-
Na MyApplicationpágina, escolha Configurar.
-
Na página Configurar aplicativo, forneça o Local do código:
-
Em Bucket do Amazon S3, insira
ka-app-code-
.<username>
-
Em Caminho do objeto do Amazon S3, insira
java-getting-started-1.0.jar
.
-
-
Na seção Acesso aos recursos do aplicativo, em Permissões de acesso, escolha Criar/atualizar o perfil do IAM
kinesis-analytics-MyApplication-us-west-2
. -
Em Properties (Propriedades), Group ID (ID do grupo), insira
ProducerConfigProperties
. -
Insira as seguintes propriedades e valores de aplicativo:
Chave Valor flink.inputstream.initpos
LATEST
aws:region
us-west-2
AggregationEnabled
false
-
Em Monitoramento, confirme se Nível de monitoramento de métricas está definido como Aplicativo.
-
Para CloudWatch registrar, marque a caixa de seleção Ativar.
-
Escolha Atualizar.
nota
Quando você opta por ativar o CloudWatch registro, o Managed Service for Apache Flink cria um grupo de registros e um fluxo de registros para você. Os nomes desses recursos são os seguintes:
-
Grupo de logs:
/aws/kinesis-analytics/MyApplication
-
Fluxo de logs:
kinesis-analytics-log-stream
Executar o aplicativo
-
Na MyApplicationpágina, escolha Executar. Confirme a ação.
-
Quando o aplicativo estiver em execução, atualize a página. O console mostra o Gráfico do aplicativo.
Interromper o aplicativo
Na MyApplicationpágina, escolha Parar. Confirme a ação.
Atualize o aplicativo
Usando o console, você pode atualizar configurações do aplicativo, como as propriedades do aplicativo, as configurações de monitoramento e a localização ou o nome do arquivo JAR do aplicativo. Você também pode recarregar o JAR do aplicativo do bucket do Amazon S3 se precisar atualizar o código do aplicativo.
Na MyApplicationpágina, escolha Configurar. Atualize as configurações do aplicativo e selecione Update (Atualizar).
Criar e executar o aplicativo (AWS CLI)
Nesta seção, você usa a AWS CLI para criar e executar a aplicação Flink do Managed Service for Apache Flink. O Managed Service for Apache Flink para aplicações Flink usa o comando kinesisanalyticsv2
da AWS CLI para criar e interagir com aplicações do Managed Service for Apache Flink.
Criar uma política de permissões
Primeiro, crie uma política de permissões com duas instruções: uma que concede permissões para a ação read
no fluxo de origem, e outra que concede permissões para ações write
no fluxo de destino. Em seguida, você anexa a política a um perfil do IAM (que criará na próxima seção). Assim, ao assumir o perfil, o serviço Managed Service for Apache Flink terá as permissões necessárias para ler o fluxo de origem e gravar no fluxo de coleta.
Use o código a seguir para criar a política de permissões KAReadSourceStreamWriteSinkStream
. Substitua
pelo nome de usuário que você usou para criar o bucket do Amazon S3 e armazenar o código do aplicativo. Substitua o ID da conta nos Nomes de recurso da Amazon (ARNs) (username
) pelo ID da conta.012345678901
{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-
username
", "arn:aws:s3:::ka-app-code-username
/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901
:stream/ExampleOutputStream" } ] }
Para step-by-step obter instruções sobre como criar uma política de permissões, consulte Tutorial: Criar e anexar sua primeira política gerenciada pelo cliente no Guia do usuário do IAM.
nota
Para acessar outros serviços da AWS, você pode usar o AWS SDK for Java. O Managed Service for Apache Flink define automaticamente as credenciais exigidas pelo SDK como as credenciais do perfil do IAM associado a sua aplicação. Não é necessária nenhuma etapa adicional.
Criar uma perfil do IAM
Nesta seção, você cria um perfil do IAM que a aplicação Flink do Managed Service for Apache Flink pode assumir para ler um fluxo de origem e gravar no fluxo de coleta.
O Managed Service for Apache Flink não pode acessar seu fluxo sem permissões. Essas permissões são concedidas usando um perfil do IAM. Cada perfil do IAM tem duas políticas anexadas. A política de confiança concede ao Managed Service for Apache Flink permissão para assumir o perfil, e a política de permissões determina o que o serviço pode fazer depois de assumir a função.
Você anexa a política de permissões que criou na seção anterior a essa função.
Para criar uma perfil do IAM
Abra o console do IAM em https://console.aws.amazon.com/iam/
. -
No painel de navegação, selecione Roles (Funções) e Create Role (Criar função).
-
Em Selecionar tipo de identidade de confiança, selecione Serviço da AWS. Em Choose the service that will use this role (Selecionar o serviço que usará esta função), selecione Kinesis. Em Select your use case (Selecionar seu caso de uso), selecione Kinesis Analytics.
Selecione Next: Permissions (Próximo: permissões).
-
Na página Attach permissions policies, selecione Next: Review. Você pode anexar políticas de permissões depois de criar a função.
-
Na página Create role (Criar função), insira
KA-stream-rw-role
para o Role name (Nome da função). Selecione Criar função.Você criou um perfil do IAM chamado
KA-stream-rw-role
. Em seguida, você atualiza as políticas de confiança e de permissões para a função. -
Anexe a política de permissões à função.
nota
Para este exercício, o Managed Service for Apache Flink assume esse perfil para ler dados de um fluxo de dados do Kinesis (origem) e gravar a saída em outro fluxo de dados do Kinesis. Depois, você anexa a política que criou na etapa anterior, Criar uma política de permissões.
-
Na página Summary (Resumo), selecione a guia Permissions (Permissões).
-
Selecione Attach Policies.
-
Na caixa de pesquisa, insira
KAReadSourceStreamWriteSinkStream
(a política que você criou na seção anterior). -
Escolha a ReadInputStreamWriteOutputStream política KA e escolha Anexar política.
-
Agora você criou a função de execução de serviço que seu aplicativo usa para acessar os recursos. Anote o ARN da nova função.
Para step-by-step obter instruções sobre como criar uma função, consulte Como criar uma função do IAM (console) no Guia do usuário do IAM.
Criar o aplicativo do Managed Service for Apache Flink
-
Salve o seguinte código JSON em um arquivo chamado
create_request.json
. Substitua o ARN da função de amostra pelo ARN da função que você criou anteriormente. Substitua o sufixo do ARN do bucket (
) pelo sufixo que você selecionou na seção anterior. Substitua o ID da conta de exemplo (username
) na função de execução do serviço pelo ID da conta.012345678901
{ "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::
012345678901
:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username
", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } } -
Execute a ação
CreateApplication
com a solicitação anterior para criar o aplicativo:aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json
O aplicativo agora é criado. Você inicia o aplicativo na próxima etapa.
Iniciar o aplicativo
Nesta seção, você usa a ação StartApplication
para iniciar o aplicativo.
Para iniciar o aplicativo
-
Salve o seguinte código JSON em um arquivo chamado
start_request.json
.{ "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
-
Execute a ação
StartApplication
com a solicitação anterior para iniciar o aplicativo:aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json
O aplicativo agora está em execução. Você pode verificar as métricas do Managed Service for Apache Flink no CloudWatch console da Amazon para verificar se o aplicativo está funcionando.
Interromper o aplicativo
Nesta seção, você usa a ação StopApplication
para interromper o aplicativo.
Como interromper o aplicativo
-
Salve o seguinte código JSON em um arquivo chamado
stop_request.json
.{"ApplicationName": "test" }
-
Execute a ação
StopApplication
com a seguinte solicitação para interromper o aplicativo:aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json
O aplicativo agora está interrompido.