Mise en route (Scala) - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Mise en route (Scala)

Note

À partir de la version 1.15, Flink est gratuit avec Scala. Les applications peuvent désormais utiliser l’API Java depuis n’importe quelle version de Scala. Flink utilise toujours Scala dans quelques composants clés en interne, mais n'expose pas Scala dans le chargeur de classes de code utilisateur. Pour cette raison, vous devez ajouter des dépendances Scala dans vos archives JAR.

Pour plus d’informations sur les modifications apportées à Scala dans Flink 1.15, consultez Scala Free in One Fifteen.

Dans cet exercice, vous allez créer une application de service géré pour Apache Flink dédiée à Scala avec un flux Kinesis comme source et comme récepteur.

Création de ressources dépendantes

Avant de créer une application de service géré pour Apache Flink dans le cadre de cet exercice, vous commencez par créer les ressources dépendantes suivantes :

  • Deux flux Kinesis pour l’entrée et la sortie.

  • Un compartiment Amazon S3 pour stocker le code de l’application (ka-app-code-<username>)

Vous pouvez créer les flux Kinesis et un compartiment S3 à l’aide de la console. Pour obtenir des instructions sur la création de ces ressources, consultez les rubriques suivantes :

  • Création et mise à jour de flux de données dans le Guide du développeur Amazon Kinesis Data Streams. Nommez vos flux de données ExampleInputStream et ExampleOutputStream.

    Pour créer les flux de données (AWS CLI)

    • Pour créer le premier stream (ExampleInputStream), utilisez la commande Amazon Kinesis AWS CLI create-stream suivante.

      aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
    • Pour créer le second flux utilisé par l’application pour écrire la sortie, exécutez la même commande en remplaçant le nom du flux par ExampleOutputStream.

      aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  • Comment créer un compartiment S3 ? dans le Guide de l’utilisateur de la console Amazon Simple Storage Service. Donnez au compartiment Amazon S3 un nom unique au monde en ajoutant votre nom de connexion, tel que ka-app-code-<username>.

Autres ressources

Lorsque vous créez votre application, Managed Service for Apache Flink crée les CloudWatch ressources Amazon suivantes si elles n'existent pas déjà :

  • Un groupe de journaux appelé /AWS/KinesisAnalytics-java/MyApplication

  • Un flux de journaux appelé kinesis-analytics-log-stream

Écrire des exemples d'enregistrements dans le flux d'entrée

Dans cette section, vous utilisez un script Python pour écrire les exemples d’enregistrements dans le flux pour que l’application les traite.

Note

Cette section nécessite le kit AWS SDK for Python (Boto).

Note

Le script Python de cette section utilise l’interface AWS CLI. Vous devez configurer votre compte AWS CLI pour utiliser les informations d'identification de votre compte et la région par défaut. Pour configurer votre AWS CLI, entrez les informations suivantes :

aws configure
  1. Créez un fichier nommé stock.py avec le contenu suivant :

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Exécutez le script stock.py :

    $ python stock.py

    Maintenez le script en cours d’exécution pendant que vous terminez le reste du didacticiel.

Téléchargez et examinez le code de l'application

Le code de l'application Python pour cet exemple est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :

  1. Installez le client Git si vous ne l’avez pas déjà fait. Pour plus d’informations, consultez Installation de Git.

  2. Cloner le référentiel distant à l’aide de la commande suivante :

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Accédez au répertoire amazon-kinesis-data-analytics-java-examples/scala/GettingStarted.

Notez les informations suivantes à propos du code d’application :

  • Un fichier build.sbt contient des informations sur la configuration et les dépendances de l’application, y compris les bibliothèques du service géré pour Apache Flink.

  • Le fichier BasicStreamingJob.scala contient la méthode principale qui définit la fonctionnalité de l’application.

  • L’application utilise une source Kinesis pour lire à partir du flux source. L’extrait de code suivant crée la source Kinesis :

    private def createSource: FlinkKinesisConsumer[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val inputProperties = applicationProperties.get("ConsumerConfigProperties") new FlinkKinesisConsumer[String](inputProperties.getProperty(streamNameKey, defaultInputStreamName), new SimpleStringSchema, inputProperties) }

    L’application utilise également un récepteur Kinesis pour écrire dans le flux de résultats. L’extrait de code suivant crée le récepteur Kinesis :

    private def createSink: KinesisStreamsSink[String] = { val applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties val outputProperties = applicationProperties.get("ProducerConfigProperties") KinesisStreamsSink.builder[String] .setKinesisClientProperties(outputProperties) .setSerializationSchema(new SimpleStringSchema) .setStreamName(outputProperties.getProperty(streamNameKey, defaultOutputStreamName)) .setPartitionKeyGenerator((element: String) => String.valueOf(element.hashCode)) .build }
  • L'application crée des connecteurs source et récepteur pour accéder à des ressources externes à l'aide d'un StreamExecutionEnvironment objet.

  • L’application crée les connecteurs source et récepteur à l’aide de propriétés d’application dynamiques. Les propriétés d’exécution de l’application sont lues pour configurer les connecteurs. Pour de plus amples informations sur les propriétés d’exécution, consultez Runtime Properties.

Compiler et charger le code d’application

Dans cette section, vous allez compiler et charger votre code d’application dans le compartiment Amazon S3 que vous avez créé dans la section Création de ressources dépendantes.

Compilation du code d’application

Dans cette section, vous utilisez l’outil de construction SBT pour créer le code Scala de l’application. Pour installer SBT, consultez Install sbt with cs setup. Vous devez également installer le kit de développement Java (JDK). Consultez Prerequisites for Completing the Exercises.

  1. Pour utiliser votre code d’application, vous le compilez et l’intégrez dans un fichier JAR. Vous pouvez compiler et empaqueter votre code avec SBT :

    sbt assembly
  2. Si la compilation de l’application aboutit, le fichier suivant est créé :

    target/scala-3.2.0/getting-started-scala-1.0.jar
Chargement du code Scala Apache Flink

Dans cette section, vous allez créer un compartiment Amazon S3 et charger votre code d’application.

  1. Ouvrez la console Amazon S3 sur https://console.aws.amazon.com/s3/.

  2. Choisissez Créer un compartiment.

  3. Saisissez ka-app-code-<username> dans le champ Nom du compartiment. Ajoutez un suffixe au nom du compartiment, par exemple votre nom d’utilisateur, pour qu’il soit unique. Choisissez Suivant.

  4. À l’étape Configurer les options, conservez les paramètres, puis choisissez Suivant.

  5. À l’étape Définir des autorisations, conservez les paramètres, puis choisissez Suivant.

  6. Choisissez Créer un compartiment.

  7. Ouvrez le compartiment ka-app-code-<username>, puis choisissez Charger.

  8. À l’étape Sélectionner les fichiers, choisissez Ajouter des fichiers. Accédez au fichier getting-started-scala-1.0.jar que vous avez créé à l’étape précédente.

  9. Vous n’avez pas besoin de modifier les paramètres de l’objet, donc choisissez Charger.

Votre code d’application est désormais stocké dans un compartiment Amazon S3 auquel votre application peut accéder.