Primeros pasos (Scala) - Managed Service para Apache Flink

Amazon Managed Service para Apache Flink Amazon se denominaba anteriormente Amazon Kinesis Data Analytics para Apache Flink.

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Primeros pasos (Scala)

nota

A partir de la versión 1.15, Flink es gratuito para Scala. Las aplicaciones ahora pueden usar la API de Java desde cualquier versión de Scala. Flink todavía usa Scala internamente en algunos componentes clave, pero no lo expone al cargador de clases de código de usuario. Por eso, debes añadir las dependencias de Scala a tus archivos JAR.

Para obtener más información sobre los cambios de Scala en Flink 1.15, consulte Scala Free in One Fifteen.

En este ejercicio, se creará una aplicación de Managed Service para Apache Flink para Scala con un flujo de Kinesis como origen y recepción.

Cree recursos dependientes

Antes de crear una aplicación de Managed Service para Apache Flink para este ejercicio, debe crear los siguientes recursos dependientes:

  • Dos flujos de Kinesis para entrada y salida.

  • Un bucket de Amazon S3 para almacenar el código de la aplicación (ka-app-code-<username>)

Puede crear los flujos de Kinesis y el bucket de Amazon S3 usando la consola. Si desea obtener instrucciones para crear estos recursos, consulte los siguientes temas:

  • Creación y actualización de flujos de datos en la Guía para desarrolladores de Amazon Kinesis Data Streams. Asigne un nombre a sus flujos de datos ExampleInputStream y ExampleOutputStream.

    Cómo crear flujos de datos (AWS CLI)

    • Para crear la primera transmisión (ExampleInputStream), utilice el siguiente comando AWS CLI create-stream de Amazon Kinesis.

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • Para crear el segundo flujo que la aplicación utilizará para escribir la salida, ejecute el mismo comando, cambiando el nombre a ExampleOutputStream.

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • ¿Cómo se puede crear un bucket de S3? en la Guía de usuario de Amazon Simple Storage Service. Asigne al bucket de Amazon S3 un nombre único globalmente añadiendo su nombre de inicio de sesión, por ejemplo, ka-app-code-<username>.

Otros recursos

Al crear la aplicación, Managed Service for Apache Flink crea los siguientes CloudWatch recursos de Amazon si aún no existen:

  • Un grupo de registro llamado /AWS/KinesisAnalytics-java/MyApplication

  • Un flujo de registro llamado kinesis-analytics-log-stream

Escriba registros de muestra en el flujo de entrada

En esta sección, se utiliza un script de Python para escribir registros de muestra en el flujo para que la aplicación los procese.

nota

Esta sección requiere AWS SDK for Python (Boto).

nota

El script de Python en esta sección usa AWS CLI. Debe configurarlas AWS CLI para usar las credenciales de su cuenta y su región predeterminada. Para configurar la suya AWS CLI, introduzca lo siguiente:

aws configure
  1. Cree un archivo denominado stock.py con el siguiente contenido:

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Ejecute el script stock.py:

    $ python stock.py

    Mantenga el script en ejecución mientras completa el resto del tutorial.

Descargue y examine el código de la aplicación

El código de la aplicación Python para este ejemplo está disponible en GitHub. Para descargar el código de la aplicación, haga lo siguiente:

  1. Si aún no lo ha hecho, instale el cliente Git. Para obtener más información, consulte Installing Git.

  2. Clone el repositorio remoto con el siguiente comando:

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Vaya al directorio amazon-kinesis-data-analytics-java-examples/scala/GettingStarted.

Tenga en cuenta lo siguiente en relación con el código de la aplicación:

  • Un archivo build.sbt contiene información sobre la configuración y las dependencias de la aplicación, incluidas las bibliotecas de Managed Service para Apache Flink.

  • El archivo BasicStreamingJob.scala contiene el método principal que define la funcionalidad de la aplicación.

  • La aplicación utiliza un origen de Kinesis para leer datos del flujo de origen. El siguiente fragmento crea el origen de Kinesis:

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    La aplicación también utiliza un receptor de Kinesis para escribir en el flujo de resultado. El siguiente fragmento crea el receptor de Kinesis:

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • La aplicación crea conectores de origen y receptor para acceder a recursos externos mediante un StreamExecutionEnvironment objeto.

  • La aplicación crea conectores de origen y recepción mediante las propiedades dinámicas de la aplicación. Las propiedades de tiempo de ejecución de la aplicación se leen para configurar los conectores. Para obtener más información sobre las propiedades de tiempo de ejecución, consulte Runtime Properties.

Compilación y carga del código de la aplicación

En esta sección, compilará y cargará su código de aplicación en el bucket de Amazon S3 que creó en la sección Cree recursos dependientes.

Compilación del código de la aplicación

En esta sección, utilizará la herramienta de creación SBT para crear el código Scala para la aplicación. Para instalar SBT, consulte Install sbt with cs setup. También deberá instalar el Kit de desarrollo de Java (JDK). Consulte Prerequisites for Completing the Exercises.

  1. Para utilizar el código de la aplicación, compile y empaquete el código en un archivo JAR. Puede compilar y empaquetar su código con SBT:

    sbt assembly
  2. Si la aplicación se compila correctamente, se crea el siguiente archivo:

    target/scala-3.2.0/getting-started-scala-1.0.jar
Carga del código de Scala de streaming de Apache Flink

En esta sección, creará un bucket de Amazon S3 y cargará el código de la aplicación.

  1. Abra la consola de Amazon S3 en https://console.aws.amazon.com/s3.

  2. Elija Crear bucket

  3. Escriba ka-app-code-<username> en el campo Nombre del bucket. Añada un sufijo al nombre del bucket, como su nombre de usuario, para que sea único a nivel global. Elija Siguiente.

  4. En el paso Configurar opciones, deje los ajustes tal y como están y elija Siguiente.

  5. En el paso Establecer permisos, deje los ajustes tal y como están y elija Siguiente.

  6. Elija Crear bucket.

  7. Abra el bucket ka-app-code-<username> y elija Cargar.

  8. En el paso Seleccionar archivos, elija Añadir archivos. Vaya al archivo getting-started-scala-1.0.jar que creó en el paso anterior.

  9. No es necesario cambiar ninguno de los ajustes del objeto, por lo tanto, elija Cargar.

El código de la aplicación ya está almacenado en un bucket de Amazon S3 al que la aplicación puede acceder.