Étape 3 : créer et exécuter un service géré pour l'application Apache Flink - Amazon Kinesis Data Streams

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Étape 3 : créer et exécuter un service géré pour l'application Apache Flink

Dans cet exercice, vous allez créer une application de service géré pour Apache Flink avec des flux de données comme source et comme récepteur.

Création de deux flux de données Amazon Kinesis

Avant de créer un Amazon Managed Service pour Apache Flink dans le cadre de cet exercice, créez deux flux de données Kinesis ExampleInputStream (ExampleOutputStreamet). Votre application utilise ces flux pour les flux source et de destination de l’application.

Vous pouvez créer ces flux à l'aide de la console Amazon Kinesis ou de la commande AWS CLI suivante. Pour de plus amples informations, veuillez consulter Création et mise à jour des flux de données.

Pour créer les flux de données (AWS CLI)
  1. Pour créer le premier flux (ExampleInputStream), utilisez la commande Amazon Kinesis create-stream AWS CLI suivante.

    $ aws kinesis create-stream \ --stream-name ExampleInputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser
  2. Pour créer le second flux utilisé par l’application pour écrire la sortie, exécutez la même commande en remplaçant le nom du flux par ExampleOutputStream.

    $ aws kinesis create-stream \ --stream-name ExampleOutputStream \ --shard-count 1 \ --region us-west-2 \ --profile adminuser

Écriture d’exemples d’enregistrements dans le flux d’entrée

Dans cette section, vous utilisez un script Python pour écrire les exemples d’enregistrements dans le flux pour que l’application les traite.

Note

Cette section nécessite le kit AWS SDK for Python (Boto).

  1. Créez un fichier nommé stock.py avec le contenu suivant :

    import datetime import json import random import boto3 STREAM_NAME = "ExampleInputStream" def get_data(): return { "EVENT_TIME": datetime.datetime.now().isoformat(), "TICKER": random.choice(["AAPL", "AMZN", "MSFT", "INTC", "TBV"]), "PRICE": round(random.random() * 100, 2), } def generate(stream_name, kinesis_client): while True: data = get_data() print(data) kinesis_client.put_record( StreamName=stream_name, Data=json.dumps(data), PartitionKey="partitionkey" ) if __name__ == "__main__": generate(STREAM_NAME, boto3.client("kinesis"))
  2. Plus loin dans ce didacticiel, vous exécutez le script stock.py pour envoyer des données à l’application.

    $ python stock.py

Téléchargez et examinez le code Java de streaming d'Apache Flink

Le code d'application Java pour ces exemples est disponible sur GitHub. Pour télécharger le code d’application, procédez comme suit :

  1. Cloner le référentiel distant à l’aide de la commande suivante :

    git clone https://github.com/aws-samples/amazon-kinesis-data-analytics-java-examples.git
  2. Accédez au répertoire GettingStarted.

Le code d'application est situé dans les fichiers CustomSinkStreamingJob.java et CloudWatchLogSink.java. Notez les informations suivantes à propos du code d’application :

  • L'application utilise une source Kinesis pour lire à partir du flux source. L'extrait de code suivant crée le récepteur Kinesis :

    return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));

Compilez le code de l'application

Dans cette section, vous allez utiliser le compilateur Apache Maven pour créer le code Java pour l’application. Pour plus d'informations sur l'installation d'Apache Maven et du kit de développement Java (JDK), consultezConditions préalables pour terminer les exercices.

Votre application Java nécessite les composants suivants :

  • Un fichier de modèle d'objet du projet (pom.xml). Ce fichier contient des informations sur la configuration et les dépendances de l'application, y compris le service Amazon Managed Service pour les bibliothèques Apache Flink.

  • Une méthode main qui contient la logique de l'application.

Note

Pour utiliser le connecteur Kinesis pour l'application suivante, vous devez télécharger le code source du connecteur et le créer comme décrit dans la documentation Apache Flink.

Pour créer et compiler le code d'application
  1. Créez une application Java/Maven dans votre environnement de développement. Pour de plus amples informations sur la création d'une application, veuillez consultez la documentation relative à votre environnement de développement :

  2. Utilisez le code suivant dans un fichier nommé StreamingJob.java.

    package com.amazonaws.services.kinesisanalytics; import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime; import org.apache.flink.api.common.serialization.SimpleStringSchema; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer; import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer; import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants; import java.io.IOException; import java.util.Map; import java.util.Properties; public class StreamingJob { private static final String region = "us-east-1"; private static final String inputStreamName = "ExampleInputStream"; private static final String outputStreamName = "ExampleOutputStream"; private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) { Properties inputProperties = new Properties(); inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST"); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties)); } private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env) throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), applicationProperties.get("ConsumerConfigProperties"))); } private static FlinkKinesisProducer<String> createSinkFromStaticConfig() { Properties outputProperties = new Properties(); outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region); outputProperties.setProperty("AggregationEnabled", "false"); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), outputProperties); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } private static FlinkKinesisProducer<String> createSinkFromApplicationProperties() throws IOException { Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties(); FlinkKinesisProducer<String> sink = new FlinkKinesisProducer<>(new SimpleStringSchema(), applicationProperties.get("ProducerConfigProperties")); sink.setDefaultStream(outputStreamName); sink.setDefaultPartition("0"); return sink; } public static void main(String[] args) throws Exception { // set up the streaming execution environment final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); /* * if you would like to use runtime configuration properties, uncomment the * lines below * DataStream<String> input = createSourceFromApplicationProperties(env); */ DataStream<String> input = createSourceFromStaticConfig(env); /* * if you would like to use runtime configuration properties, uncomment the * lines below * input.addSink(createSinkFromApplicationProperties()) */ input.addSink(createSinkFromStaticConfig()); env.execute("Flink Streaming Java API Skeleton"); } }

    Notez les informations suivantes à propos de l'exemple de code précédent :

    • Ce fichier contient la méthode main qui définit la fonctionnalité de l'application.

    • Votre application crée les connecteurs source et récepteur pour accéder aux ressources externes à l’aide d’un objet StreamExecutionEnvironment.

    • L’application crée les connecteurs source et récepteur à l’aide de propriétés statiques. Pour utiliser les propriétés de l’application dynamique, utilisez les méthodes createSourceFromApplicationProperties et createSinkFromApplicationProperties pour créer les connecteurs. Ces méthodes lisent les propriétés de l’application pour configurer les connecteurs.

  3. Pour utiliser le code de votre application, vous devez le compiler et le regrouper dans un JAR fichier. Vous pouvez compiler et intégrer votre code de deux manières :

    • À l'aide de l'outil de ligne de commande Maven. Créez votre JAR fichier en exécutant la commande suivante dans le répertoire qui contient le pom.xml fichier :

      mvn package
    • À l’aide de votre environnement de développement. Consultez la documentation de votre environnement de développement pour plus de détails.

    Vous pouvez soit télécharger votre package sous forme de JAR fichier, soit compresser votre package et le télécharger sous forme de ZIP fichier. Si vous créez votre application à l'aide du AWS CLI, vous spécifiez le type de contenu de votre code (JARouZIP).

  4. En cas d’erreur lors de la compilation, vérifiez que votre variable d’environnement JAVA_HOME est correctement définie.

Si la compilation de l’application aboutit, le fichier suivant est créé :

target/java-getting-started-1.0.jar

Téléchargez le code Java de streaming Apache Flink

Dans cette section, vous allez créer un compartiment Amazon Simple Storage Service (Amazon S3) et charger votre code d'application.

Pour charger le code d’application
  1. Ouvrez la console Amazon S3 à l'adresse https://console.aws.amazon.com/s3/.

  2. Choisissez Créer un compartiment.

  3. Saisissez ka-app-code-<username> dans le champ Nom du compartiment. Ajoutez un suffixe au nom du compartiment, par exemple votre nom d’utilisateur, pour qu’il soit unique. Choisissez Suivant.

  4. À l’étape Configurer les options, conservez les paramètres, puis choisissez Suivant.

  5. À l’étape Définir des autorisations, conservez les paramètres, puis choisissez Suivant.

  6. Choisissez Créer un compartiment.

  7. Dans la console Amazon S3, choisissez le ka-app-code -<username>bucket, puis choisissez Upload.

  8. À l’étape Sélectionner les fichiers, choisissez Ajouter des fichiers. Accédez au fichier java-getting-started-1.0.jar que vous avez créé à l’étape précédente. Choisissez Suivant.

  9. À l'étape Définir des autorisations, conservez les paramètres. Choisissez Suivant.

  10. À l'étape Définir les propriétés, conservez les paramètres. Sélectionnez Charger.

Votre code d'application est désormais stocké dans un compartiment Amazon S3 auquel votre application peut accéder.

Création et exécution du service géré pour l'application Apache Flink

Vous pouvez créer et exécuter une application de service géré pour Apache Flink à l’aide de la console ou de l’interface AWS CLI.

Note

Lorsque vous créez l'application à l'aide de la console, vos ressources AWS Identity and Access Management (IAM) et Amazon CloudWatch Logs sont créées pour vous. Lorsque vous créez l'application à l'aide du AWS CLI, vous créez ces ressources séparément.

Création et exécution de l'application (console)

Suivez ces étapes pour créer, configurer, mettre à jour et exécuter l’application à l’aide de la console.

Pour créer l’application

  1. Ouvrez la console Kinesis à l’adresse https://console.aws.amazon.com/kinesis.

  2. Dans le tableau de bord Amazon Kinesis, choisissez Création d'une application d'analyse.

  3. Sur la page Kinesis Analytics - Créer une application, fournissez les détails de l'application comme suit :

    • Pour Nom de l’application, saisissez MyApplication.

    • Pour Description, saisissez My java test app.

    • Pour Runtime (Exécution), choisissez Apache Flink 1.6.

  4. Pour les autorisations d'accès, choisissez Create/update IAM role kinesis-analytics-MyApplication-us-west-2.

  5. Choisissez Créer une application.

Note

Lorsque vous créez une application Amazon Managed Service pour Apache Flink à l'aide de la console, vous avez la possibilité de créer un IAM rôle et une politique pour votre application. Votre application utilise ce rôle et cette politique pour accéder à ses ressources dépendantes. Ces IAM ressources sont nommées à l'aide du nom de votre application et de votre région, comme suit :

  • Stratégie : kinesis-analytics-service-MyApplication-us-west-2

  • Rôle : kinesis-analytics-MyApplication-us-west-2

Modifier la IAM politique

Modifiez la IAM politique pour ajouter des autorisations d'accès aux flux de données Kinesis.

  1. Ouvrez la console IAM à l’adresse https://console.aws.amazon.com/iam/.

  2. Choisissez Stratégies. Choisissez la politique kinesis-analytics-service-MyApplication-us-west-2 créée pour vous par la console dans la section précédente.

  3. Sur la page Récapitulatif, choisissez Modifier la politique. Cliquez sur l'onglet JSON.

  4. Ajoutez la section mise en surbrillance dans l’exemple de stratégie suivant à la politique. Remplacez le compte d'exemple IDs (012345678901) avec votre identifiant de compte.

    { "Version": "2012-10-17", "Statement": [ { "Sid": "ReadCode", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": [ "arn:aws:s3:::ka-app-code-username/java-getting-started-1.0.jar" ] }, { "Sid": "ListCloudwatchLogGroups", "Effect": "Allow", "Action": [ "logs:DescribeLogGroups" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:*" ] }, { "Sid": "ListCloudwatchLogStreams", "Effect": "Allow", "Action": [ "logs:DescribeLogStreams" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:*" ] }, { "Sid": "PutCloudwatchLogs", "Effect": "Allow", "Action": [ "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-2:012345678901:log-group:/aws/kinesis-analytics/MyApplication:log-stream:kinesis-analytics-log-stream" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Configuration de l'application

  1. Sur la MyApplicationpage, choisissez Configurer.

  2. Sur la page Configurer l’application, indiquez l’emplacement du code:

    • Pour le compartiment Amazon S3, saisissez ka-app-code-<username>.

    • Pour le chemin de l'objet Amazon S3, saisissez java-getting-started-1.0.jar.

  3. Sous Accès aux ressources de l'application, pour les autorisations d'accès, choisissez Create/update IAM role kinesis-analytics-MyApplication-us-west-2.

  4. Sous Propriétés, pour ID de groupe, saisissez ProducerConfigProperties.

  5. Entrez les valeurs et propriétés d’application suivantes :

    Clé Valeur
    flink.inputstream.initpos LATEST
    aws:region us-west-2
    AggregationEnabled false
  6. Sous Surveillance, assurez-vous que Surveillance du niveau des métriques est défini sur Application.

  7. Pour la CloudWatch journalisation, cochez la case Activer.

  8. Choisissez Mettre à jour.

Note

Lorsque vous choisissez d'activer la CloudWatch journalisation, le service géré pour Apache Flink crée un groupe de journaux et un flux de journaux pour vous. Les noms de ces ressources sont les suivants :

  • Groupe de journaux : /aws/kinesis-analytics/MyApplication

  • Flux de journaux : kinesis-analytics-log-stream

Exécutez l'application

  1. Sur la MyApplicationpage, choisissez Exécuter. Confirmez l’action.

  2. Lorsque l’application est en cours d’exécution, actualisez la page. La console affiche le graphique de l’application.

Arrêtez l'application

Sur la MyApplicationpage, choisissez Stop. Confirmez l’action.

Mise à jour de l'application

À l'aide de la console, vous pouvez mettre à jour les paramètres de l'application, tels que les propriétés de l'application, les paramètres de surveillance, ainsi que l'emplacement ou le nom de fichier de l'applicationJAR. Vous pouvez également recharger l'application JAR depuis le compartiment Amazon S3 si vous devez mettre à jour le code de l'application.

Sur la MyApplicationpage, choisissez Configurer. Mettez à jour les paramètres de l’application, puis choisissez Mettre à jour.

Créez et exécutez l'application (AWS CLI)

Dans cette section, vous allez utiliser le AWS CLI pour créer et exécuter l'application Managed Service for Apache Flink. Le service géré pour Apache Flink utilise la kinesisanalyticsv2 AWS CLI commande pour créer et interagir avec le service géré pour les applications Apache Flink.

Créer une stratégie d’autorisations

Vous commencez par créer une stratégie d’autorisations avec deux instructions : une qui accorde des autorisations pour l’action read sur le flux source et une autre qui accorde des autorisations pour les actions write sur le flux récepteur. Vous associez ensuite la politique à un IAM rôle (que vous créez dans la section suivante). Ainsi, lorsque le service géré pour Apache Flink assume le rôle, le service dispose des autorisations nécessaires pour lire à partir du flux source et écrire dans le flux récepteur.

Utilisez le code suivant pour créer la politique d’autorisations KAReadSourceStreamWriteSinkStream. Remplacez username par le nom d’utilisateur que vous avez utilisé pour créer le compartiment Amazon S3 pour stocker le code d’application. Remplacez l'ID de compte dans Amazon Resource Names (ARNs) (012345678901) par votre identifiant de compte.

{ "Version": "2012-10-17", "Statement": [ { "Sid": "S3", "Effect": "Allow", "Action": [ "s3:GetObject", "s3:GetObjectVersion" ], "Resource": ["arn:aws:s3:::ka-app-code-username", "arn:aws:s3:::ka-app-code-username/*" ] }, { "Sid": "ReadInputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleInputStream" }, { "Sid": "WriteOutputStream", "Effect": "Allow", "Action": "kinesis:*", "Resource": "arn:aws:kinesis:us-west-2:012345678901:stream/ExampleOutputStream" } ] }

Pour step-by-step obtenir des instructions sur la création d'une politique d'autorisations, voir Tutoriel : créer et joindre votre première politique gérée par le client dans le guide de IAM l'utilisateur.

Note

Pour accéder à d'autres AWS services, vous pouvez utiliser le AWS SDK for Java. Le service géré pour Apache Flink définit automatiquement les informations d'identification requises par le SDK en fonction du IAM rôle d'exécution du service associé à votre application. Aucune étape supplémentaire n’est nécessaire.

Créez un rôle IAM.

Dans cette section, vous allez créer un IAM rôle que Managed Service for Apache Flink peut assumer pour lire un flux source et écrire dans le flux récepteur.

Le service géré pour Apache Flink ne peut pas accéder à votre flux sans autorisation. Vous accordez ces autorisations via un IAM rôle. Deux politiques sont associées à chaque IAM rôle. La politique d’approbation accorde au service géré pour Apache Flink l’autorisation d’assumer le rôle, et la politique d’autorisation détermine ce que le service géré pour Apache Flink peut faire après avoir assumé le rôle.

Vous attachez la politique d’autorisations que vous avez créée dans la section précédente à ce rôle.

Pour créer un rôle IAM
  1. Ouvrez la console IAM à l’adresse https://console.aws.amazon.com/iam/.

  2. Dans le volet de navigation, choisissez Rôles, puis Créer un rôle.

  3. Sous Sélectionner le type d'identité approuvée, choisissez Service AWS . Sous Choisir le service qui utilisera ce rôle, choisissez EC2. Sous Sélectionner votre cas d’utilisation, choisissez Kinesis Analytics.

    Sélectionnez Next: Permissions (Étape suivante : autorisations).

  4. Dans la page Attacher des stratégies d’autorisations, choisissez Suivant : vérification. Vous attachez des stratégies d’autorisations après avoir créé le rôle.

  5. Sur la page Créer un rôle, saisissez KA-stream-rw-role pour le Nom du rôle. Sélectionnez Créer un rôle.

    Vous avez maintenant créé un nouveau IAM rôle appeléKA-stream-rw-role. Ensuite, vous mettez à jour les stratégies d’approbation et d’autorisation pour le rôle.

  6. Attachez la politique d’autorisation au rôle.

    Note

    Dans le cadre de cet exercice, le service géré pour Apache Flink assume ce rôle à la fois pour la lecture des données à partir d’un flux de données Kinesis (source) et pour l’écriture des résultats dans un autre flux de données Kinesis. Vous attachez donc la politique que vous avez créée à l’étape précédente, Créer une stratégie d’autorisations.

    1. Sur la page Récapitulatif, choisissez l’onglet Autorisations.

    2. Choisissez Attacher des stratégies.

    3. Dans la zone de recherche, saisissez KAReadSourceStreamWriteSinkStream (la politique que vous avez créée dans la section précédente).

    4. Choisissez la KAReadInputStreamWriteOutputStreampolitique, puis choisissez Attacher la politique.

Vous avez maintenant créé le rôle d’exécution de service que votre application utilise pour accéder aux ressources. Prenez note ARN du nouveau rôle.

Pour step-by-step obtenir des instructions sur la création d'un rôle, consultez la section Création d'un IAM rôle (console) dans le guide de IAM l'utilisateur.

Création de l’application de service géré pour Apache Flink

  1. Enregistrez le JSON code suivant dans un fichier nommécreate_request.json. Remplacez le rôle ARN d'exemple par le rôle que vous avez créé précédemment. ARN Remplacez le ARN suffixe du bucket (username) par le suffixe que vous avez choisi dans la section précédente. Remplacez l’exemple d’ID de compte (012345678901) dans le rôle d’exécution de service par votre ID de compte.

    { "ApplicationName": "test", "ApplicationDescription": "my java test app", "RuntimeEnvironment": "FLINK-1_6", "ServiceExecutionRole": "arn:aws:iam::012345678901:role/KA-stream-rw-role", "ApplicationConfiguration": { "ApplicationCodeConfiguration": { "CodeContent": { "S3ContentLocation": { "BucketARN": "arn:aws:s3:::ka-app-code-username", "FileKey": "java-getting-started-1.0.jar" } }, "CodeContentType": "ZIPFILE" }, "EnvironmentProperties": { "PropertyGroups": [ { "PropertyGroupId": "ProducerConfigProperties", "PropertyMap" : { "flink.stream.initpos" : "LATEST", "aws.region" : "us-west-2", "AggregationEnabled" : "false" } }, { "PropertyGroupId": "ConsumerConfigProperties", "PropertyMap" : { "aws.region" : "us-west-2" } } ] } } }
  2. Exécutez l’action CreateApplication avec la demande précédente pour créer l’application :

    aws kinesisanalyticsv2 create-application --cli-input-json file://create_request.json

L’application est maintenant créée. Vous démarrez l’application dans l’étape suivante.

Démarrage de l’application

Dans cette section, vous utilisez l’action StartApplication pour démarrer l’application.

Pour démarrer l’application
  1. Enregistrez le JSON code suivant dans un fichier nomméstart_request.json.

    { "ApplicationName": "test", "RunConfiguration": { "ApplicationRestoreConfiguration": { "ApplicationRestoreType": "RESTORE_FROM_LATEST_SNAPSHOT" } } }
  2. Exécutez l’action StartApplication avec la demande précédente pour démarrer l’application :

    aws kinesisanalyticsv2 start-application --cli-input-json file://start_request.json

L’application est maintenant en cours d’exécution. Vous pouvez consulter les métriques du service géré pour Apache Flink sur la CloudWatch console Amazon pour vérifier que l'application fonctionne.

Arrêt de l’application

Dans cette section, vous allez utiliser l’action StopApplication pour arrêter l’application.

Pour arrêter l’application
  1. Enregistrez le JSON code suivant dans un fichier nomméstop_request.json.

    {"ApplicationName": "test" }
  2. Exécutez l’action StopApplication avec la demande suivante pour arrêter l’application :

    aws kinesisanalyticsv2 stop-application --cli-input-json file://stop_request.json

L’application est maintenant arrêtée.