Sélectionner vos préférences de cookies

Nous utilisons des cookies essentiels et des outils similaires qui sont nécessaires au fonctionnement de notre site et à la fourniture de nos services. Nous utilisons des cookies de performance pour collecter des statistiques anonymes afin de comprendre comment les clients utilisent notre site et d’apporter des améliorations. Les cookies essentiels ne peuvent pas être désactivés, mais vous pouvez cliquer sur « Personnaliser » ou « Refuser » pour refuser les cookies de performance.

Si vous êtes d’accord, AWS et les tiers approuvés utiliseront également des cookies pour fournir des fonctionnalités utiles au site, mémoriser vos préférences et afficher du contenu pertinent, y compris des publicités pertinentes. Pour accepter ou refuser tous les cookies non essentiels, cliquez sur « Accepter » ou « Refuser ». Pour effectuer des choix plus détaillés, cliquez sur « Personnaliser ».

Travailler avec les offres d'emploi Flink sur Amazon EMR - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Travailler avec les offres d'emploi Flink sur Amazon EMR

Il existe plusieurs manières d'interagir avec Flink sur Amazon EMR : via la console, via l'interface Flink située dans l'interface utilisateur de ResourceManager suivi et via la ligne de commande. Vous pouvez soumettre un JAR fichier à une application Flink avec n'importe lequel de ces outils. Une fois qu'un JAR fichier a été soumis, il devient une tâche gérée par le Flink JobManager. JobManager Il se trouve sur le YARN nœud qui héberge le daemon Application Master de la session Flink.

Vous pouvez exécuter une application Flink en tant que YARN tâche sur un cluster de longue durée ou sur un cluster transitoire. Sur un cluster de longue durée, vous pouvez soumettre plusieurs tâches Flink à un cluster Flink exécuté sur Amazon. EMR Si vous exécutez une tâche Flink sur un cluster temporaire, votre EMR cluster Amazon n'existe que pendant le temps nécessaire à l'exécution de l'application Flink. Vous n'êtes donc facturé que pour les ressources et le temps utilisés. Vous pouvez soumettre une tâche Flink avec l'EMRAddStepsAPIopération Amazon, en tant qu'argument d'étape de l'RunJobFlowopération et via les create-cluster commandes AWS CLI add-steps or.

Pour démarrer une application Flink à laquelle plusieurs clients peuvent soumettre du travail par le biais d'YARNAPIopérations, vous devez soit créer un cluster, soit ajouter une application Flink à un cluster existant. Pour obtenir des instructions sur la création d'un cluster, veuillez consulter Création d'un cluster avec Flink. Pour démarrer une YARN session sur un cluster existant, suivez les étapes ci-dessous depuis la console ou depuis JavaSDK. AWS CLI

Note

La flink-yarn-session commande a été ajoutée dans la EMR version 5.5.0 d'Amazon en tant que wrapper pour le yarn-session.sh script afin de simplifier l'exécution. Si vous utilisez une version antérieure d'AmazonEMR, remplacez bash -c "/usr/lib/flink/bin/yarn-session.sh -d" Arguments dans la console ou parArgs. dans la AWS CLI commande.

Pour soumettre une tâche Flink sur un cluster existant depuis la console

Soumettez la session Flink avec la commande flink-yarn-session dans un cluster existant.

  1. Ouvrez la EMR console Amazon à l'adresse https://console.aws.amazon.com/emr.

  2. Dans la liste des clusters, sélectionnez celui que vous avez lancé précédemment.

  3. Dans la page des détails de cluster, sélectionnez Steps (Étapes), Add Step (Ajouter une étape).

  4. Utilisez les directives suivantes pour saisir les paramètres, puis sélectionnez Ajouter.

    Paramètre Description

    Type d'étape

    Personnalisé JAR

    Nom

    Nom permettant d'identifier la nouvelle étape. Par exemple, <example-flink-step-name>.

    Emplacement JAR

    command-runner.jar

    Arguments

    La commande flink-yarn-session avec les arguments appropriés pour votre application. Par exemple, flink-yarn-session -d démarre une session Flink au sein de votre YARN cluster dans un état détaché (-d). Voir la YARNconfiguration dans la dernière documentation de Flink pour plus de détails sur les arguments.

Pour soumettre une tâche Flink sur un cluster existant à l'aide du AWS CLI
  • Utilisez la commande add-steps pour ajouter une tâche Flink à un cluster de longue durée. L'exemple de commande suivant indique Args="flink-yarn-session", "-d" de démarrer une session Flink au sein de votre YARN cluster dans un état détaché (-d). Voir la YARNconfiguration dans la dernière documentation de Flink pour plus de détails sur les arguments.

    aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"

Si vous possédez déjà une application Flink sur un cluster de longue durée, vous pouvez spécifier l'ID de l'application Flink du cluster afin d'y soumettre du travail. Pour obtenir l'ID de l'application, exécutez yarn application -list le AWS CLI ou via l'YarnClientAPIopération suivante :

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

L'identifiant de candidature pour cette session Flink estapplication_1473169569237_0002, que vous pouvez utiliser pour soumettre du travail à l'application depuis le AWS CLI ou unSDK.

Exemple SDKpour Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));
Exemple AWS CLI
aws emr add-steps --cluster-id <j-XXXXXXXX> \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \ --region <region-code>

Les exemples suivants lancent un cluster transitoire qui exécute une tâche Flink, puis la résilie à son terme.

Exemple SDKpour Java
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
Exemple AWS CLI

Utilisez la sous-commande create-cluster pour créer un cluster transitoire qui se termine quand la tâche Flink est terminée :

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""

Rubrique suivante :

Shell Flink Scala

Rubrique précédente :

Configuration de Flink
ConfidentialitéConditions d'utilisation du sitePréférences de cookies
© 2025, Amazon Web Services, Inc. ou ses affiliés. Tous droits réservés.