Étape 3 : créer et exécuter un service géré pour l'application Apache Flink - Service géré pour Apache Flink

Le service géré Amazon pour Apache Flink était auparavant connu sous le nom d’Amazon Kinesis Data Analytics pour Apache Flink.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Étape 3 : créer et exécuter un service géré pour l'application Apache Flink

Dans cet exercice, vous allez créer une application de service géré pour Apache Flink avec des flux de données comme source et comme récepteur.

Création de deux flux de données Amazon Kinesis

Avant de créer une application de service géré pour Apache Flink dans le cadre de cet exercice, commencez par créer deux flux de données Kinesis (ExampleInputStream et ExampleOutputStream). Votre application utilise ces flux pour les flux source et de destination de l’application.

Vous pouvez créer ces flux à l’aide de la console Amazon Kinesis ou de la commande AWS CLI suivante. Pour obtenir des instructions sur la console, consultez Création et mise à jour de flux de données dans le Guide du développeur Amazon Kinesis Data Streams.

Pour créer les flux de données (AWS CLI)
  1. Pour créer le premier flux (ExampleInputStream), utilisez la commande Amazon Kinesis create-stream AWS CLI suivante.

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. Pour créer le second flux utilisé par l’application pour écrire la sortie, exécutez la même commande en remplaçant le nom du flux par ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Écrire des exemples d'enregistrements dans le flux d'entrée

Dans cette section, vous utilisez un script Python pour écrire les exemples d’enregistrements dans le flux pour que l’application les traite.

Note

Cette section nécessite le kit AWS SDK for Python (Boto).

  1. Créez un fichier nommé stock.py avec le contenu suivant :

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { 'event_time': datetime.datetime.now().isoformat(), 'ticker': random.choice(['AAPL', 'AMZN', 'MSFT', 'INTC', 'TBV']), 'price': round(random.random() * 100, 2)} def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey") if __name__ == '__main__': generate(STREAM_NAME, boto3.client('kinesis', region_name='us-west-2'))
  2. Plus loin dans ce didacticiel, vous exécutez le script stock.py pour envoyer des données à l’application.

    $ python stock.py

Téléchargez et examinez le code Java de streaming d'Apache Flink

Le code de l'application Java pour cet exemple est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :

  1. Cloner le référentiel distant à l’aide de la commande suivante :

    git clone https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples.git
  2. Accédez au répertoire amazon-managed-service-for-apache-flink-examples/tree/main/java/GettingStarted.

Notez les informations suivantes à propos du code d’application :

  • Un fichier de modèle d’objet du projet (pom.xml) contient des informations sur la configuration et les dépendances de l’application, y compris les bibliothèques du service géré pour Apache Flink.

  • Le fichier BasicStreamingJob.java contient la méthode main qui définit la fonctionnalité de l’application.

  • L’application utilise une source Kinesis pour lire à partir du flux source. L’extrait de code suivant crée la source Kinesis :

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
  • Votre application crée les connecteurs source et récepteur pour accéder aux ressources externes à l’aide d’un objet StreamExecutionEnvironment.

  • L’application crée les connecteurs source et récepteur à l’aide de propriétés statiques. Pour utiliser les propriétés de l’application dynamique, utilisez les méthodes createSourceFromApplicationProperties et createSinkFromApplicationProperties pour créer les connecteurs. Ces méthodes lisent les propriétés de l’application pour configurer les connecteurs.

    Pour de plus amples informations sur les propriétés d’exécution, consultez Propriétés d'exécution.

Compilez le code de l'application

Dans cette section, vous allez utiliser le compilateur Apache Maven pour créer le code Java pour l’application. Pour obtenir des informations sur l’installation d’Apache Maven et sur le kit de développement Java (JDK), consultez Remplir les conditions préalables pour terminer les exercices.

Pour compiler le code d’application
  1. Pour utiliser votre code d’application, vous le compilez et l’intégrez dans un fichier JAR. Vous pouvez compiler et intégrer votre code de deux manières :

    • À l’aide de l’outil de ligne de commande Maven. Créez votre fichier JAR en exécutant la commande suivante dans le répertoire qui contient le fichier pom.xml :

      mvn package -Dflink.version=1.19.1
    • À l’aide de votre environnement de développement. Consultez la documentation de votre environnement de développement pour plus de détails.

      Note

      Le code source fourni repose sur les bibliothèques de Java 11.

    Vous pouvez charger votre package en tant que fichier JAR, ou compresser le package et le charger en tant que fichier ZIP. Si vous créez votre application à l'aide du AWS CLI, vous spécifiez le type de contenu de votre code (JAR ou ZIP).

  2. En cas d’erreur lors de la compilation, vérifiez que votre variable d’environnement JAVA_HOME est correctement définie.

Si la compilation de l’application aboutit, le fichier suivant est créé :

target/amazon-msf-java-stream-app-1.0.jar

Téléchargez le code Java de streaming Apache Flink

Dans cette section, vous allez créer un compartiment Amazon Simple Storage Service (Amazon S3) et charger votre code d'application.

Pour charger le code d’application
  1. Ouvrez la console Amazon S3 sur https://console.aws.amazon.com/s3/.

  2. Choisissez Créer un compartiment.

  3. Saisissez ka-app-code-<username> dans le champ Nom du compartiment. Ajoutez un suffixe au nom du compartiment, par exemple votre nom d’utilisateur, pour qu’il soit unique. Choisissez Suivant.

  4. À l’étape Configurer les options, conservez les paramètres, puis choisissez Suivant.

  5. À l’étape Définir des autorisations, conservez les paramètres, puis choisissez Suivant.

  6. Choisissez Créer un compartiment.

  7. Dans la console Amazon S3, choisissez le <username>compartiment ka-app-code-, puis Upload.

  8. À l’étape Sélectionner les fichiers, choisissez Ajouter des fichiers. Accédez au fichier amazon-msf-java-stream-app-1.0.jar que vous avez créé à l’étape précédente. Choisissez Suivant.

  9. Vous n’avez pas besoin de modifier les paramètres de l’objet, donc choisissez Charger.

Votre code d’application est désormais stocké dans un compartiment Amazon S3 auquel votre application peut accéder.

Création et exécution du service géré pour l'application Apache Flink

Vous pouvez créer et exécuter une application de service géré pour Apache Flink à l’aide de la console ou de l’interface AWS CLI.

Note

Lorsque vous créez l'application à l'aide de la console, vos ressources AWS Identity and Access Management (IAM) et Amazon CloudWatch Logs sont créées pour vous. Lorsque vous créez l'application à l'aide du AWS CLI, vous créez ces ressources séparément.

Création et exécution de l'application (console)

Suivez ces étapes pour créer, configurer, mettre à jour et exécuter l’application à l’aide de la console.

Pour créer l’application

  1. Ouvrez la console du service géré pour Apache Flink à l’adresse https://console.aws.amazon.com/flink

  2. Dans le tableau de bord du service géré pour Apache Flink, choisissez Créer une application d’analyse.

  3. Sur la page Service géré pour Apache Flink - Créer une application, fournissez les détails de l’application comme suit :

    • Pour Nom de l’application, saisissez MyApplication.

    • Pour Description, saisissez My java test app.

    • Pour Exécution, choisissez Apache Flink.

    • Laissez le menu déroulant de la version sous la forme Apache Flink version 1.19.

  4. Pour Autorisations d’accès, choisissez Créer/mettre à jour un rôle IAM) kinesis-analytics-MyApplication-us-west-2.

  5. Choisissez Créer une application.

Note

Lorsque vous créez une application de service géré pour Apache Flink à l’aide de la console, vous avez la possibilité de créer un rôle et une politique IAM pour votre application. Votre application utilise ce rôle et cette politique pour accéder à ses ressources dépendantes. Ces ressources IAM sont nommées en utilisant le nom de votre application et la région, comme suit :

  • Stratégie : kinesis-analytics-service-MyApplication-us-west-2

  • Rôle : kinesisanalytics-MyApplication-us-west-2

Modifier la politique IAM

Modifiez la politique IAM pour ajouter des autorisations afin d'accéder aux flux de données Kinesis.

  1. Ouvrez la console IAM à l’adresse https://console.aws.amazon.com/iam/.

  2. Choisissez Stratégies. Choisissez la politique kinesis-analytics-service-MyApplication-us-west-2 créée pour vous par la console dans la section précédente.

  3. Sur la page Récapitulatif, choisissez Modifier la politique. Sélectionnez l’onglet JSON.

  4. Ajoutez la section mise en surbrillance dans l’exemple de stratégie suivant à la politique. Remplacez l’exemple d’ID de compte (012345678901) par votre ID de compte.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/amazon-msf-java-stream-app-1.0.jar" ] }, { "Sid": "DescribeLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "DescribeLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutLogEvents", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configuration de l'application

  1. Sur la MyApplicationpage, choisissez Configurer.

  2. Sur la page Configurer l’application, indiquez l’emplacement du code:

    • Pour le compartiment Amazon S3, saisissez ka-app-code-<username>.

    • Pour le chemin de l'objet Amazon S3, saisissez amazon-msf-java-stream-app-1.0.jar.

  3. Sous Accéder aux ressources de l’application, pour Autorisations d’accès, choisissez Créer/mettre à jour un rôle IAM kinesis-analytics-MyApplication-us-west-2.

  4. Sous Propriétés, pour ID de groupe, saisissez ProducerConfigProperties.

  5. Entrez les valeurs et propriétés d’application suivantes :

    ID du groupe Clé Valeur
    FlinkApplicationProperties flink.inputstream.initpos LATEST
    FlinkApplicationProperties aws.region us-west-2
    FlinkApplicationProperties AggregationEnabled false
  6. Sous Surveillance, assurez-vous que Surveillance du niveau des métriques est défini sur Application.

  7. Pour la CloudWatch journalisation, cochez la case Activer.

  8. Choisissez Mettre à jour.

Note

Lorsque vous choisissez d'activer la CloudWatch journalisation Amazon, Managed Service for Apache Flink crée un groupe de journaux et un flux de journaux pour vous. Les noms de ces ressources sont les suivants :

  • Groupe de journaux : /aws/kinesis-analytics/MyApplication

  • Flux de journaux : kinesis-analytics-log-stream

Exécution de l’application

Le graphique des tâches Flink peut être visualisé en exécutant l’application, en ouvrant le tableau de bord Apache Flink et en choisissant la tâche Flink souhaitée.

Arrêt de l’application

Sur la MyApplicationpage, choisissez Stop. Confirmez l’action.

Mise à jour de l’application

À l’aide de la console, vous pouvez mettre à jour les paramètres d’application tels que les paramètres de surveillance, les propriétés d’application et l’emplacement ou le nom du fichier JAR de l’application. Vous pouvez également recharger le fichier JAR de l'application à partir du compartiment Amazon S3 si vous avez besoin de mettre à jour le code de l'application.

Sur la MyApplicationpage, choisissez Configurer. Mettez à jour les paramètres de l’application, puis choisissez Mettre à jour.

Créez et exécutez l'application (AWS CLI)

Dans cette section, vous allez utiliser le AWS CLI pour créer et exécuter l'application Managed Service for Apache Flink. Le service géré pour Apache Flink utilise la kinesisanalyticsv2 AWS CLI commande pour créer et interagir avec le service géré pour les applications Apache Flink.

Créer une stratégie d’autorisations

Note

Vous devez créer une stratégie d’autorisations et un rôle pour votre application. Si vous ne créez pas ces ressources IAM, votre application ne peut pas accéder à ses flux de données et de journaux.

Vous commencez par créer une stratégie d’autorisations avec deux instructions : une qui accorde des autorisations pour l’action read sur le flux source et une autre qui accorde des autorisations pour les actions write sur le flux récepteur. Vous attachez ensuite la politique à un rôle IAM (que vous allez créer dans la section suivante). Ainsi, lorsque le service géré pour Apache Flink assume le rôle, le service dispose des autorisations nécessaires pour lire à partir du flux source et écrire dans le flux récepteur.

Utilisez le code suivant pour créer la politique d’autorisations AKReadSourceStreamWriteSinkStream. Remplacez username par le nom d'utilisateur que vous avez utilisé pour créer le compartiment Amazon S3 pour stocker le code d'application. Remplacez l’ID de compte dans les Amazon Resource Names (ARN) (012345678901) par votre ID de compte.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Pour step-by-step obtenir des instructions sur la création d'une politique d'autorisations, voir Tutoriel : créer et joindre votre première politique gérée par le client dans le guide de l'utilisateur IAM.

Note

Pour accéder à d’autres services Amazon, vous pouvez utiliser le AWS SDK for Java. Le service géré pour Apache Flink définit automatiquement les informations d’identification requises par le kit SDK en fonction du rôle IAM d’exécution du service associé à votre application. Aucune étape supplémentaire n’est nécessaire.

Créer un rôle IAM

Dans cette section, vous créez un rôle IAM que l’application de service géré pour Apache Flink peut assumer pour lire un flux source et écrire dans le flux récepteur.

Le service géré pour Apache Flink ne peut pas accéder à votre flux sans autorisation. Vous utilisez un rôle IAM pour accorder ces autorisations. Deux politiques sont attachées à chaque rôle IAM. La politique d’approbation accorde au service géré pour Apache Flink l’autorisation d’assumer le rôle, et la politique d’autorisation détermine ce que le service géré pour Apache Flink peut faire après avoir assumé le rôle.

Vous attachez la politique d’autorisations que vous avez créée dans la section précédente à ce rôle.

Pour créer un rôle IAM
  1. Ouvrez la console IAM à l’adresse https://console.aws.amazon.com/iam/.

  2. Dans le volet de navigation, choisissez Rôles, puis Créer un rôle.

  3. Sous Sélectionner le type d'identité approuvée, choisissez Service AWS . Sous Choisir le service qui utilisera ce rôle, choisissez EC2. Sous Sélectionner votre cas d’utilisation, choisissez Kinesis Analytics.

    Sélectionnez Next: Permissions (Étape suivante : autorisations).

  4. Dans la page Attacher des stratégies d’autorisations, choisissez Suivant : vérification. Vous attachez des stratégies d’autorisations après avoir créé le rôle.

  5. Sur la page Créer un rôle, saisissez MF-stream-rw-role pour le Nom du rôle. Sélectionnez Créer un rôle.

    Vous venez de créer un nouveau rôle IAM appelé MF-stream-rw-role. Ensuite, vous mettez à jour les stratégies d’approbation et d’autorisation pour le rôle.

  6. Attachez la politique d’autorisation au rôle.

    Note

    Dans le cadre de cet exercice, le service géré pour Apache Flink assume ce rôle à la fois pour la lecture des données à partir d’un flux de données Kinesis (source) et pour l’écriture des résultats dans un autre flux de données Kinesis. Vous attachez donc la politique que vous avez créée à l’étape précédente, Créer une stratégie d’autorisations.

    1. Sur la page Récapitulatif, choisissez l’onglet Autorisations.

    2. Choisissez Attacher des stratégies.

    3. Dans la zone de recherche, saisissez AKReadSourceStreamWriteSinkStream (la politique que vous avez créée dans la section précédente).

    4. Choisissez la ReadSourceStreamWriteSinkStream politique AK, puis choisissez Attach policy.

Vous avez maintenant créé le rôle d’exécution de service que votre application utilise pour accéder aux ressources. Notez l’ARN du nouveau rôle.

Pour step-by-step obtenir des instructions sur la création d'un rôle, consultez la section Création d'un rôle IAM (console) dans le guide de l'utilisateur IAM.

Création du service géré pour l'application Apache Flink

  1. Copiez le code JSON suivant dans un fichier nommé create_request.json. Remplacez l’exemple d’ARN du rôle par l’ARN du rôle que vous avez créé précédemment. Remplacez le suffixe de l’ARN du compartiment (username) par le suffixe que vous avez choisi dans la section précédente. Remplacez l’exemple d’ID de compte (012345678901) dans le rôle d’exécution de service par votre ID de compte.

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_19", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/MF-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "amazon-msf-java-stream-app-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. Exécutez l’action CreateApplication avec la demande précédente pour créer l’application :

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

L’application est maintenant créée. Vous démarrez l’application dans l’étape suivante.

Lancez l'application

Dans cette section, vous utilisez l’action StartApplication pour démarrer l’application.

Pour démarrer l’application
  1. Copiez le code JSON suivant dans un fichier nommé start_request.json.

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. Exécutez l’action StartApplication avec la demande précédente pour démarrer l’application :

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

L’application est maintenant en cours d’exécution. Vous pouvez consulter les métriques du service géré pour Apache Flink sur la CloudWatch console Amazon pour vérifier que l'application fonctionne.

Arrêtez l'application

Dans cette section, vous allez utiliser l’action StopApplication pour arrêter l’application.

Pour arrêter l’application
  1. Copiez le code JSON suivant dans un fichier nommé stop_request.json.

    { "ApplicationName": "test" }
  2. Exécutez l’action StopApplication avec la demande suivante pour arrêter l’application :

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

L’application est maintenant arrêtée.

Ajouter une option de CloudWatch journalisation

Vous pouvez utiliser le AWS CLI pour ajouter un flux de CloudWatch journal Amazon à votre application. Pour plus d'informations sur l'utilisation de CloudWatch Logs avec votre application, consultezConfiguration de la journalisation des applications.

Mettre à jour les propriétés d'environnement

Dans cette section, vous utilisez l’action UpdateApplication pour modifier les propriétés d’environnement de l’application sans recompiler le code de l’application. Dans cet exemple, vous modifiez la région des flux source et de destination.

Pour mettre à jour des propriétés d’environnement pour l’application
  1. Copiez le code JSON suivant dans un fichier nommé update_properties_request.json.

    {"ApplicationName": "test", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "EnvironmentPropertyUpdates": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. Exécutez l’action UpdateApplication avec la demande précédente pour mettre à jour les propriétés de l’environnement :

    aws kinesisanalyticsv2 update-application --cli-input-json file://update_properties_request.json

Mise à jour du code de l’application

Lorsque vous devez mettre à jour le code de votre application avec une nouvelle version de votre package de code, vous utilisez l'UpdateApplication AWS CLI action.

Note

Pour charger une nouvelle version du code de l’application portant le même nom de fichier, vous devez spécifier la nouvelle version de l’objet. Pour de plus amples informations sur l’utilisation des versions d’objet Amazon S3, consultez Activation et désactivation de la gestion des versions.

Pour l'utiliser AWS CLI, supprimez votre ancien package de code de votre compartiment Amazon S3, téléchargez la nouvelle version et appelez UpdateApplication en spécifiant le même compartiment Amazon S3 et le même nom d'objet, ainsi que la nouvelle version de l'objet. L’application redémarrera avec le nouveau package de code.

L’exemple de demande d’action UpdateApplication suivant recharge le code de l’application et redémarre l’application. Mettez à jour l’CurrentApplicationVersionId à la version actuelle de l’application. Vous pouvez vérifier la version actuelle de l’application à l’aide des actions ListApplications ou DescribeApplication. Mettez à jour le suffixe du nom du compartiment (<username>) avec le suffixe que vous avez choisi dans la section Création de deux flux de données Amazon Kinesis.

{ "ApplicationName": "test", "CurrentApplicationVersionId": 1, "ApplicationConfigurationUpdate": { "ApplicationCodeConfigurationUpdate": { "CodeContentUpdate": { "S3ContentLocationUpdate": { "BucketARNUpdate": "arn:aws:s3:::ka-app-code-username", "FileKeyUpdate": "amazon-msf-java-stream-app-1.0.jar", "ObjectVersionUpdate": "SAMPLEUehYngP87ex1nzYIGYgfhypvDU" } } } } }

Étape suivante

Étape 4 : Nettoyer les AWS ressources