Création d’une application à l’aide d’Apache Beam - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Création d’une application à l’aide d’Apache Beam

Dans cet exercice, vous allez créer une application de service géré pour Apache Flink qui transforme les données à l’aide d’Apache Beam. Apache Beam est un modèle de programmation pour le traitement des données de streaming. Pour obtenir des informations sur l’utilisation d’Apache Beam avec le service géré pour Apache Flink, consultez Utilisation d’Apache Beam.

Note

Pour configurer les prérequis requis pour cet exercice, commencez par terminer l’exercice Mise en route (DataStream API).

Création de ressources dépendantes

Avant de créer une application de service géré pour Apache Flink dans le cadre de cet exercice, vous commencez par créer les ressources dépendantes suivantes :

  • Deux flux de données Kinesis (ExampleInputStream et ExampleOutputStream)

  • Un compartiment Amazon S3 pour stocker le code de l’application (ka-app-code-<username>)

Vous pouvez créer les flux Kinesis et un compartiment S3 à l’aide de la console. Pour obtenir des instructions sur la création de ces ressources, consultez les rubriques suivantes :

  • Création et mise à jour de flux de données dans le Guide du développeur Amazon Kinesis Data Streams. Nommez vos flux de données ExampleInputStream et ExampleOutputStream.

  • Comment créer un compartiment S3 ? dans le Guide de l’utilisateur de la console Amazon Simple Storage Service. Donnez au compartiment Amazon S3 un nom unique au monde en ajoutant votre nom de connexion, tel que ka-app-code-<username>.

Écrire des exemples d'enregistrements dans le flux d'entrée

Dans cette section, vous utilisez un script Python pour écrire des chaînes aléatoires dans le flux pour que l’application les traite.

Note

Cette section nécessite le kit AWS SDK for Python (Boto).

  1. Créez un fichier nommé ping.py avec le contenu suivant :

    import json import boto3 import random kinesis = boto3.client('kinesis') while True: data = random.choice(['ping', 'telnet', 'ftp', 'tracert', 'netstat']) print(data) kinesis.put_record( StreamName="ExampleInputStream", Data=data, PartitionKey="partitionkey")
  2. Exécutez le script ping.py :

    $ python ping.py

    Maintenez le script en cours d’exécution pendant que vous terminez le reste du didacticiel.

Téléchargez et examinez le code de l'application

Le code de l'application Java pour cet exemple est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :

  1. Installez le client Git si vous ne l’avez pas déjà fait. Pour plus d’informations, consultez Installation de Git.

  2. Cloner le référentiel distant à l’aide de la commande suivante :

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-examples.git
  3. Accédez au répertoire amazon-kinesis-data-analytics-java-examples/Beam.

Le code d’application est situé dans le fichier BasicBeamStreamingJob.java. Notez les informations suivantes à propos du code d’application :

  • L'application utilise Apache Beam ParDopour traiter les enregistrements entrants en invoquant une fonction de transformation personnalisée appeléePingPongFn.

    Le code pour invoquer la fonction PingPongFn est le suivant :

    .apply("Pong transform", ParDo.of(new PingPongFn())
  • Les applications de service géré pour Apache Flink qui utilisent Apache Beam requièrent les composants suivants. Si vous n’incluez pas ces composants et versions dans votre fichier pom.xml, votre application charge des versions incorrectes à partir des dépendances de l’environnement, et comme les versions ne correspondent pas, votre application se bloque au moment de l’exécution.

    <jackson.version>2.10.2</jackson.version> ... <dependency> <groupId>com.fasterxml.jackson.module</groupId> <artifactId>jackson-module-jaxb-annotations</artifactId> <version>2.10.2</version> </dependency>
  • La fonction de transformation PingPongFn transmet les données d’entrée dans le flux de sortie, sauf si les données d’entrée sont un ping, auquel cas elle émet la chaîne pong\n vers le flux de sortie.

    Le code de la fonction de transformation est le suivant :

    private static class PingPongFn extends DoFn<KinesisRecord, byte[]> { private static final Logger LOG = LoggerFactory.getLogger(PingPongFn.class); @ProcessElement public void processElement(ProcessContext c) { String content = new String(c.element().getDataAsBytes(), StandardCharsets.UTF_8); if (content.trim().equalsIgnoreCase("ping")) { LOG.info("Ponged!"); c.output("pong\n".getBytes(StandardCharsets.UTF_8)); } else { LOG.info("No action for: " + content); c.output(c.element().getDataAsBytes()); } } }

Compilez le code de l'application

Pour compiler l’application, procédez comme suit :

  1. Installez Java et Maven si ce n’est pas déjà fait. Pour plus d’informations, consultez Prérequis dans le didacticiel Mise en route (DataStream API).

  2. Compilez l’application à l’aide de la commande suivante :

    mvn package -Dflink.version=1.15.2 -Dflink.version.minor=1.8
    Note

    Le code source fourni repose sur les bibliothèques de Java 11.

La compilation de l’application crée le fichier JAR de l’application (target/basic-beam-app-1.0.jar).

Téléchargez le code Java de streaming Apache Flink

Dans cette section, vous allez charger votre code d’application dans le compartiment Amazon S3 que vous avez créé dans la section Création de ressources dépendantes.

  1. Dans la console Amazon S3, choisissez le <username>compartiment ka-app-code-, puis Upload.

  2. À l’étape Sélectionner les fichiers, choisissez Ajouter des fichiers. Accédez au fichier basic-beam-app-1.0.jar que vous avez créé à l’étape précédente.

  3. Vous n’avez pas besoin de modifier les paramètres de l’objet, donc choisissez Charger.

Votre code d’application est désormais stocké dans un compartiment Amazon S3 auquel votre application peut accéder.

Création et exécution du service géré pour l'application Apache Flink

Suivez ces étapes pour créer, configurer, mettre à jour et exécuter l’application à l’aide de la console.

Pour créer l’application

  1. Ouvrez la console du service géré pour Apache Flink à l’adresse https://console.aws.amazon.com/flink

  2. Dans le tableau de bord du service géré pour Apache Flink, choisissez Créer une application d’analyse.

  3. Sur la page Service géré pour Apache Flink - Créer une application, fournissez les détails de l’application comme suit :

    • Pour Nom de l’application, saisissez MyApplication.

    • Pour Exécution, choisissez Apache Flink.

      Note

      Apache Beam n'est actuellement pas compatible avec Apache Flink version 1.19 ou ultérieure.

    • Sélectionnez Apache Flink version 1.15 dans le menu déroulant des versions.

  4. Pour Autorisations d’accès, choisissez Créer/mettre à jour un rôle IAM) kinesis-analytics-MyApplication-us-west-2.

  5. Choisissez Créer une application.

Note

Lorsque vous créez une application de service géré pour Apache Flink à l’aide de la console, vous avez la possibilité de créer un rôle et une politique IAM pour votre application. Votre application utilise ce rôle et cette politique pour accéder à ses ressources dépendantes. Ces ressources IAM sont nommées en utilisant le nom de votre application et la région, comme suit :

  • Stratégie : kinesis-analytics-service-MyApplication-us-west-2

  • Rôle : kinesis-analytics-MyApplication-us-west-2

Modifier la politique IAM

Modifiez la politique IAM pour ajouter des autorisations afin d'accéder aux flux de données Kinesis.

  1. Ouvrez la console IAM à l’adresse https://console.aws.amazon.com/iam/.

  2. Choisissez Stratégies. Choisissez la politique kinesis-analytics-service-MyApplication-us-west-2 créée pour vous par la console dans la section précédente.

  3. Sur la page Récapitulatif, choisissez Modifier la politique. Sélectionnez l’onglet JSON.

  4. Ajoutez la section mise en surbrillance dans l’exemple de stratégie suivant à la politique. Remplacez l’exemple d’ID de compte (012345678901) par votre ID de compte.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "logs:DescribeLogGroups", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*", "arn:aws:s3:::ka-app-code-<username>/basic-beam-app-1.0.jar" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": "logs:DescribeLogStreams", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": "logs:PutLogEvents", "Resource": "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configuration de l'application

  1. Sur la MyApplicationpage, choisissez Configurer.

  2. Sur la page Configurer l’application, indiquez l’emplacement du code:

    • Pour le compartiment Amazon S3, saisissez ka-app-code-<username>.

    • Pour le chemin de l'objet Amazon S3, saisissez basic-beam-app-1.0.jar.

  3. Sous Accéder aux ressources de l’application, pour Autorisations d’accès, choisissez Créer/mettre à jour un rôle IAM kinesis-analytics-MyApplication-us-west-2.

  4. Saisissez :

    ID du groupe Clé Valeur
    BeamApplicationProperties InputStreamName ExampleInputStream
    BeamApplicationProperties OutputStreamName ExampleOutputStream
    BeamApplicationProperties AwsRegion us-west-2
  5. Sous Surveillance, assurez-vous que Surveillance du niveau des métriques est défini sur Application.

  6. Pour la CloudWatch journalisation, cochez la case Activer.

  7. Choisissez Mettre à jour.

Note

Lorsque vous choisissez d'activer la CloudWatch journalisation, le service géré pour Apache Flink crée un groupe de journaux et un flux de journaux pour vous. Les noms de ces ressources sont les suivants :

  • Groupe de journaux : /aws/kinesis-analytics/MyApplication

  • Flux de journaux : kinesis-analytics-log-stream

Ce flux de journaux est utilisé pour surveiller l’application. Il ne s’agit pas du même flux de journaux que celui utilisé par l’application pour envoyer les résultats.

Exécutez l'application

Le graphique des tâches Flink peut être visualisé en exécutant l’application, en ouvrant le tableau de bord Apache Flink et en choisissant la tâche Flink souhaitée.

Vous pouvez vérifier les métriques du service géré pour Apache Flink sur la CloudWatch console pour vérifier que l'application fonctionne.

Nettoyer les AWS ressources

Cette section inclut les procédures de nettoyage AWS des ressources créées dans le didacticiel Tumbling Window.

Supprimer votre application Managed Service for Apache Flink

  1. Ouvrez la console du service géré pour Apache Flink à l’adresse https://console.aws.amazon.com/flink

  2. dans le panneau Managed Service for Apache Flink, sélectionnez MyApplication.

  3. Sur la page de l’application, choisissez Supprimer, puis confirmez la suppression.

Supprimer vos flux de données Kinesis

  1. Ouvrez la console Kinesis à l’adresse https://console.aws.amazon.com/kinesis.

  2. Dans le panneau Kinesis Data Streams, ExampleInputStreamsélectionnez.

  3. Sur la ExampleInputStreampage, choisissez Supprimer Kinesis Stream, puis confirmez la suppression.

  4. Sur la page Kinesis Streams, choisissez le ExampleOutputStream, choisissez Actions, choisissez Supprimer, puis confirmez la suppression.

Supprimer votre objet et votre compartiment Amazon S3

  1. Ouvrez la console Amazon S3 sur https://console.aws.amazon.com/s3/.

  2. Choisissez le compartiment ka-app-code -. <username>

  3. Choisissez Supprimer, puis saisissez le nombre du compartiment pour confirmer la suppression.

Supprimer vos ressources IAM

  1. Ouvrez la console IAM à l’adresse https://console.aws.amazon.com/iam/.

  2. Dans la barre de navigation, choisissez Stratégies.

  3. Dans le contrôle du filtre, saisissez kinesis.

  4. Choisissez la politique kinesis-analytics-service- MyApplication -us-west-2.

  5. Choisissez Actions de stratégie, puis Supprimer.

  6. Dans la barre de navigation, choisissez Rôles.

  7. Choisissez le rôle kinesis-analytics- MyApplication -us-west-2.

  8. Choisissez Supprimer le rôle, puis confirmez la suppression.

Supprimer vos CloudWatch ressources

  1. Ouvrez la CloudWatch console à l'adresse https://console.aws.amazon.com/cloudwatch/.

  2. Dans la barre de navigation, choisissez Journaux.

  3. Choisissez le groupe de journaux MyApplication/aws/kinesis-analytics/.

  4. Choisissez Supprimer le groupe de journaux, puis confirmez la suppression.

Étapes suivantes

Maintenant que vous avez créé et exécuté une application basique de service géré pour Apache Flink qui transforme les données à l’aide d’Apache Beam, consultez l’application suivante pour un exemple de solution plus avancée de service géré pour Apache Flink.