Working with Flink Jobs in Amazon EMR

There are several ways to interact with Flink on Amazon EMR: through Amazon EMR steps, through the Flink interface available on the ResourceManager Tracking UI, and at the command line. You can use any of these methods to submit a JAR file of a Flink application to run.

Additionally, you can run Flink as a long-running YARN job or as a transient cluster. With a long-running job, you can submit multiple Flink applications to one Flink cluster running on Amazon EMR. If you run Flink as a transient job, your Amazon EMR cluster exists only for the time it takes to run the Flink application, so you are charged only for the resources and time used. In either case, you can submit a Flink job using the Amazon EMR AddSteps API operation, or as a step argument to the RunJobFlow API operation or the AWS CLI create-cluster command.

You may want to start a long-running Flink job that multiple clients can submit to through YARN API operations. You start a Flink YARN session and submit jobs to the Flink JobManager, which is located on the YARN node that hosts the Flink session Application Master daemon. To start a YARN session, use the following steps from the console, AWS CLI, or Java SDK.

To submit a long-running job using the console

Submit the long-running Flink session using the flink-yarn-session command in an existing cluster.

Note

The flink-yarn-session command was added in Amazon EMR version 5.5.0 as a wrapper for the yarn-session.sh script to simplify execution. If you use an earlier version of Amazon EMR, substitute bash -c "/usr/lib/flink/bin/yarn-session.sh -n 2 -d" for Arguments in the steps that follow.

  1. Open the Amazon EMR console at https://console.aws.amazon.com/elasticmapreduce/.

  2. In the cluster list, select the cluster you previously launched.

  3. In the cluster details page, choose Steps, Add Step.

  4. Enter parameters using the guidelines that follow and then choose Add.

    Step type
      Custom JAR

    Name
      A name to help you identify the step. For example, Flink_Long_Running_Session.

    Jar location
      command-runner.jar

    Arguments
      The flink-yarn-session command with arguments appropriate for your application. For example, flink-yarn-session -n 2 -d starts a long-running Flink session within your YARN cluster in a detached state (-d) with two task managers (-n 2). See YARN Setup in the latest Flink documentation for argument details.

      Note
      With Amazon EMR versions earlier than 5.5.0, you must specify the full path to the yarn-session.sh script instead of flink-yarn-session. For example, bash -c "/usr/lib/flink/bin/yarn-session.sh -n 2 -d".

To submit a long-running Flink job using the AWS CLI

  • To launch a long-running Flink cluster within EMR, use the create-cluster command:

    aws emr create-cluster --release-label emr-5.17.0 \
    --applications Name=Flink \
    --configurations file://./configurations.json \
    --region us-east-1 \
    --log-uri s3://myLogUri \
    --instance-type m4.large \
    --instance-count 2 \
    --service-role EMR_DefaultRole \
    --ec2-attributes KeyName=MyKeyName,InstanceProfile=EMR_EC2_DefaultRole \
    --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,Args="flink-yarn-session -n 2 -d"
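
To submit a long-running Flink job using the SDK for Java

You can also submit the flink-yarn-session step to a running cluster with the SDK for Java. The following is a minimal sketch rather than an official example; it assumes an AmazonElasticMapReduce client named emr and uses myClusterId as a placeholder for your cluster ID, mirroring the step parameters shown above.

List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
HadoopJarStepConfig flinkSessionConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    // Same arguments as in the console and AWS CLI examples: two task managers, detached mode.
    .withArgs("flink-yarn-session", "-n", "2", "-d");

StepConfig flinkSessionStep = new StepConfig()
    .withName("Flink_Long_Running_Session")
    .withActionOnFailure("CONTINUE")
    .withHadoopJarStep(flinkSessionConf);
stepConfigs.add(flinkSessionStep);

// Assumes emr is an AmazonElasticMapReduce client that has already been created,
// for example with AmazonElasticMapReduceClientBuilder.defaultClient().
AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
    .withJobFlowId("myClusterId")
    .withSteps(stepConfigs));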

You can submit work through the command line, but you can also use Flink's native interface proxied through the YARN ResourceManager. To submit a job through an Amazon EMR step using the Flink CLI, specify the long-running Flink cluster's YARN application ID. To find the application ID, run yarn application -list on the EMR command line or call the YarnClient API operation:

$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
                Application-Id                                  Application-Name     Application-Type      User     Queue     State      Final-State     Progress     Tracking-URL
application_1473169569237_0002     Flink session with 14 TaskManagers (detached)         Apache Flink    hadoop   default   RUNNING       UNDEFINED         100%     http://ip-10-136-154-194.ec2.internal:33089
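
If you call the YarnClient API operation instead of parsing the command output, a minimal sketch might look like the following. The class name FindFlinkSession is illustrative; the code assumes it runs on the master node so that YarnConfiguration resolves the cluster's ResourceManager address, and it filters on the Apache Flink application type shown in the sample output above.

import java.util.EnumSet;
import org.apache.hadoop.yarn.api.records.ApplicationReport;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class FindFlinkSession {
    public static void main(String[] args) throws Exception {
        // List running YARN applications and print the ID of the detached Flink session.
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();
        for (ApplicationReport report :
                yarnClient.getApplications(EnumSet.of(YarnApplicationState.RUNNING))) {
            if ("Apache Flink".equals(report.getApplicationType())) {
                // For example, application_1473169569237_0002
                System.out.println(report.getApplicationId());
            }
        }
        yarnClient.stop();
    }
}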

Example SDK for Java

List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2",
        "/usr/lib/flink/examples/streaming/WordCount.jar",
        "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/");

StepConfig flinkRunWordCount = new StepConfig()
    .withName("Flink add a wordcount step")
    .withActionOnFailure("CONTINUE")
    .withHadoopJarStep(flinkWordCountConf);
stepConfigs.add(flinkRunWordCount);

AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
    .withJobFlowId("myClusterId")
    .withSteps(stepConfigs));

Example AWS CLI

Use the add-steps subcommand to submit new jobs to an existing Flink cluster:

aws emr add-steps --cluster-id myClusterId \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \
--region myRegion

The following examples launch the Flink WordCount example, either by adding a step to an existing cluster or by creating a cluster that terminates when the job completes.

Example Console

In the console, on the details page for an existing cluster, add the step by choosing Add Step in the Steps section.

Example SDK for Java

The following examples illustrate two approaches to running a Flink job. The first example submits a Flink job to a running cluster. The second example creates a cluster that runs a Flink job and then terminates on completion.

List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yn", "2",
        "/usr/lib/flink/examples/streaming/WordCount.jar",
        "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice/");

StepConfig flinkRunWordCount = new StepConfig()
    .withName("Flink add a wordcount step")
    .withActionOnFailure("CONTINUE")
    .withHadoopJarStep(flinkWordCountConf);
stepConfigs.add(flinkRunWordCount);

AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
    .withJobFlowId("myClusterId")
    .withSteps(stepConfigs));
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    // With bash -c, pass the entire flink run command as a single string.
    .withArgs("bash", "-c", "flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar "
        + "--input s3://myBucket/pg11.txt --output s3://myBucket/alice/");

StepConfig flinkRunWordCountStep = new StepConfig()
    .withName("Flink add a wordcount step and terminate")
    .withActionOnFailure("CONTINUE")
    .withHadoopJarStep(flinkWordCountConf);
stepConfigs.add(flinkRunWordCountStep);

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("flink-transient")
    .withReleaseLabel("emr-5.2.1")
    .withApplications(myApps)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri("s3://myLogBucket")
    .withInstances(
        new JobFlowInstancesConfig()
            .withEc2KeyName("myKeyName")
            .withInstanceCount(2)
            .withKeepJobFlowAliveWhenNoSteps(false)
            .withMasterInstanceType("m4.large")
            .withSlaveInstanceType("m4.large"))
    .withSteps(stepConfigs);

RunJobFlowResult result = emr.runJobFlow(request);

Example AWS CLI

Use the add-steps subcommand to submit new jobs to an existing Flink cluster:

aws emr add-steps --cluster-id myClusterId \
--steps Type=CUSTOM_JAR,Name=Flink_Transient_No_Terminate,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \
--region myRegion

Use the create-cluster subcommand to create a transient EMR cluster that terminates when the Flink job completes:

aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate \
--instance-type m4.large \
--instance-count 2 \
--service-role EMR_DefaultRole \
--ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Transient,\
Args="bash","-c","\"flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/\""
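
Both create-cluster examples in this section reference a configurations.json file whose contents depend on your application. As an illustrative sketch only, a file that sets the number of task slots per TaskManager through the flink-conf configuration classification might look like the following; the property and value shown are assumptions, not recommendations.

[
  {
    "Classification": "flink-conf",
    "Properties": {
      "taskmanager.numberOfTaskSlots": "2"
    }
  }
]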