Começando (Scala) - Managed Service for Apache Flink

Anteriormente, o Amazon Managed Service for Apache Flink era conhecido como Amazon Kinesis Data Analytics for Apache Flink.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Começando (Scala)

nota

A partir da versão 1.15, o Flink é gratuito para Scala. Agora, os aplicativos podem usar a API Java de qualquer versão do Scala. O Flink ainda usa o Scala em alguns componentes principais internamente, mas não expõe o Scala no carregador de classes do código do usuário. Por isso, você deve adicionar dependências do Scala aos seus arquivos JAR.

Para obter mais informações sobre as mudanças do Scala no Flink 1.15, consulte Sem o Scala na versão 1.15.

Neste exercício, você cria um aplicativo do Managed Service for Apache Flink para o Scala com fluxos do Kinesi como fonte e coletor.

Crie recursos dependentes

Antes de criar um aplicativo Managed Service for Apache Flink para este exercício, você cria os seguintes recursos dependentes:

  • Duas transmissões do Kinesis para entrada e saída.

  • Um bucket do Amazon S3 para armazenar o código do aplicativo (ka-app-code-<username>)

Você pode criar os fluxos do Kinesis e o bucket do Amazon S3 usando o console. Para obter instruções sobre como criar esses recursos, consulte os tópicos a seguir:

  • Criando e atualizando fluxos de dados no Guia do desenvolvedor do Amazon Kinesis Data Streams. Nomeie seus fluxos de dados ExampleInputStream e ExampleOutputStream.

    Como criar os fluxos de dados (AWS CLI)

    • Para criar o primeiro stream (ExampleInputStream), use o seguinte comando AWS CLI create-stream do Amazon Kinesis.

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • Para criar o segundo fluxo que o aplicativo usa para gravar a saída, execute o mesmo comando, alterando o nome da transmissão para ExampleOutputStream.

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • Para obter instruções, consulte Como criar um bucket do S3? no Guia do usuário do Amazon Simple Storage Service. Dê ao bucket do Amazon S3 um nome globalmente exclusivo anexando seu nome de login, como ka-app-code-<username>.

Outros recursos

Quando você cria seu aplicativo, o Managed Service for Apache Flink cria os seguintes CloudWatch recursos da Amazon, caso eles ainda não existam:

  • Um grupo de logs chamado /AWS/KinesisAnalytics-java/MyApplication

  • Um fluxo de logs chamado kinesis-analytics-log-stream

Grave registros de amostra no fluxo de entrada

Nesta seção, você usa um script Python para gravar registros de amostra no fluxo para o aplicativo processar.

nota

Essa seção requer AWS SDK for Python (Boto).

nota

O script do Python nesta seção usa o AWS CLI. Você deve configurar seu AWS CLI para usar as credenciais da sua conta e a região padrão. Para configurar seu AWS CLI, digite o seguinte:

aws configure
  1. Crie um arquivo denominado stock.py com o conteúdo a seguir:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Execute o script stock.py:

    $ python stock.py

    Mantenha o script em execução enquanto você conclui o restante do tutorial.

Baixe e examine o código do aplicativo

O código do aplicativo Python para este exemplo está disponível em. GitHub Para fazer download do código do aplicativo, faça o seguinte:

  1. Instale o cliente do Git se você ainda não tiver feito isso. Para obter mais informações, consulte Instalando o Git.

  2. Duplique o repositório remoto com o seguinte comando:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Navegue até o diretório amazon-kinesis-data-analytics-java-examples/scala/GettingStarted.

Observe o seguinte sobre o código do aplicativo:

  • Um arquivo build.sbt contém informações sobre a configuração e as dependências do aplicativo, incluindo as bibliotecas do Managed Service for Apache Flink.

  • O arquivo BasicStreamingJob.scala contém o método principal que define a funcionalidade do aplicativo.

  • O aplicativo usa uma origem do Kinesis para ler o fluxo de origem. O trecho a seguir cria a origem do Kinesis:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    O aplicativo também usa um coletor do Kinesis para gravar no fluxo de resultados. O trecho a seguir cria o coletor do Kinesis:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • O aplicativo cria conectores de origem e coletor para acessar recursos externos usando um StreamExecutionEnvironment objeto.

  • O aplicativo cria conectores de origem e de coletores usando propriedades dinâmicas do aplicativo. As propriedades de runtime do aplicativo para ler e configurar os conectores. Para obter mais informações sobre as propriedades de runtime, consulte Propriedades de runtime.

Compile e faça o upload do código do aplicativo

Nesta seção, você compila e carrega o código do seu aplicativo no bucket do Amazon S3 que você criou na seção Crie recursos dependentes.

Compilar o código do aplicativo

Nesta seção, você usa a ferramenta de compilação SBT para criar o código do Scala para o aplicativo. Para instalar o SBT, consulte Instalar o sbt com a configuração cs. Você também precisa instalar o Java Development Kit (JDK). Consulte Pré-requisitos para concluir os exercícios.

  1. Para usar o seu código de aplicativo, compile-o e empacote-o em um arquivo JAR. Você pode compilar e empacotar seu código com o SBT:

    sbt assembly
  2. Se o aplicativo for compilado com êxito, o arquivo a seguir é criado:

    target/scala-3.2.0/getting-started-scala-1.0.jar
Faça o upload do código Scala do Apache Flink Streaming

Nesta seção, você cria um bucket do Amazon S3 e faz upload do código do seu aplicativo.

  1. Abra o console do Amazon S3 em https://console.aws.amazon.com/s3/.

  2. Selecione Create bucket (Criar bucket)

  3. Insira ka-app-code-<username> no campo Nome do bucket. Adicione um sufixo para o nome do bucket, como o nome do usuário, para torná-lo globalmente exclusivo. Selecione Next (Próximo).

  4. Na etapa Configurar opções, mantenha as configurações como estão e selecione Próximo.

  5. Na etapa Definir permissões, mantenha as configurações como estão e selecione Próximo.

  6. Selecione Criar bucket.

  7. Selecione o bucket ka-app-code-<username> e, em seguida, selecione Upload.

  8. Na etapa Selecionar arquivos, selecione Adicionar arquivos. Navegue até o arquivo getting-started-scala-1.0.jar que você criou na etapa anterior.

  9. Você não precisa alterar nenhuma das configurações para o objeto, em seguida, selecione Upload.

O código passa a ser armazenado em um bucket do Amazon S3 que pode ser acessado pelo aplicativo.