Amazon EMR Release Guide

Apache Flink

Apache Flink is a streaming dataflow engine that makes it easy to run real-time stream processing on high-throughput data sources. It supports event time semantics for out-of-order events, exactly-once semantics, backpressure control, and APIs optimized for writing both streaming and batch applications.

Additionally, Flink has connectors for third-party data sources, such as Amazon Kinesis Streams and Apache Kafka.

Currently, Amazon EMR supports Flink as a YARN application so that you can manage resources along with other applications within a cluster. With Flink on YARN, you can submit transient Flink jobs, or you can create a long-running cluster that accepts multiple jobs and allocates resources according to the overall YARN reservation.

Note

Support for the FlinkKinesisConsumer class was added in Amazon EMR version 5.2.1.

Release Information

Application: Flink 1.2.1

Amazon EMR Release Label: emr-5.6.0

Components installed with this application: emrfs, hadoop-client, hadoop-mapred, hadoop-hdfs-datanode, hadoop-hdfs-library, hadoop-hdfs-namenode, hadoop-httpfs-server, hadoop-kms-server, hadoop-yarn-nodemanager, hadoop-yarn-resourcemanager, hadoop-yarn-timeline-server, flink-client

Creating a Cluster with Flink

Clusters can be launched using the AWS Management Console, AWS CLI, or an AWS SDK.

To launch a cluster with Flink installed using the console

  1. Open the Amazon EMR console at https://console.aws.amazon.com/elasticmapreduce/.

  2. Choose Create cluster, Go to advanced options.

  3. For Software Configuration, choose EMR Release emr-5.1.0 or later.

  4. Choose Flink as an application, along with any others to install.

  5. Select other options as necessary and choose Create cluster.

To launch a cluster with Flink using the AWS CLI

  • Create the cluster with the following command:

    aws emr create-cluster --name "Cluster with Flink" --release-label emr-5.6.0 \
    --applications Name=Flink --ec2-attributes KeyName=myKey \
    --instance-type m3.xlarge --instance-count 3 --use-default-roles

    Note

    Linux line continuation characters (\) are included for readability. In Linux commands, you can either keep or remove them. In Windows commands, remove them or replace them with a caret (^).

Configuring Flink

You can configure Flink using a configuration file. The main configuration file for Flink is called flink-conf.yaml, and it is configurable through the Amazon EMR configuration API: when you start your cluster, you supply a configuration that Amazon EMR applies to the file.

To configure the number of task slots used for Flink using the AWS CLI

  1. Create a file, configurations.json, with the following content:

    [
      {
        "Classification": "flink-conf",
        "Properties": {
          "taskmanager.numberOfTaskSlots": "2"
        }
      }
    ]
  2. Next, create a cluster with the following configuration:

    aws emr create-cluster --release-label emr-5.6.0 \
    --applications Name=Flink \
    --configurations file://./configurations.json \
    --region us-east-1 \
    --log-uri s3://myLogUri \
    --instance-type m3.xlarge \
    --instance-count 2 \
    --service-role EMR_DefaultRole \
    --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
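Before running create-cluster, it can help to sanity-check the classification file, since a malformed file causes the cluster launch to fail. The following is a minimal sketch; it assumes python3 is available, as it is on recent Amazon Linux images:

```shell
# Write the classification file referenced by --configurations above.
cat > configurations.json <<'EOF'
[
  {
    "Classification": "flink-conf",
    "Properties": {
      "taskmanager.numberOfTaskSlots": "2"
    }
  }
]
EOF

# Validate that the file is well-formed JSON before passing it to create-cluster.
python3 -m json.tool configurations.json > /dev/null \
  && echo "configurations.json is valid JSON"
```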

Note

It is also possible to change some configurations using the Flink API. For more information, see Basic API Concepts in the Flink documentation.

Parallelism Options

As the owner of your application, you know best what resources should be assigned to tasks within Flink. For the purposes of the examples in this documentation, use the same number of tasks as the slave instances that you use for the application. We generally recommend this for the initial level of parallelism but you can also increase the granularity of parallelism using task slots, which should generally not exceed the number of virtual cores per instance. For more information about Flink’s architecture, see Concepts in the Flink documentation.
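As a rough illustration of that guidance, the following sketch works through the arithmetic for a hypothetical two-node core fleet of m3.xlarge instances (4 vCPUs each); the instance counts and vCPU figures are assumptions for the example, not EMR defaults:

```shell
# Illustrative arithmetic only: the numbers below are assumptions for a
# hypothetical 2-node m3.xlarge core fleet (4 vCPUs per instance).
CORE_INSTANCES=2
VCORES_PER_INSTANCE=4

# Initial level of parallelism: one task per slave (core) instance.
INITIAL_PARALLELISM=$CORE_INSTANCES

# Task slots should generally not exceed the vCPUs per instance, so total
# slots across the cluster tops out at instances * vcores.
MAX_TOTAL_SLOTS=$((CORE_INSTANCES * VCORES_PER_INSTANCE))

echo "initial parallelism: $INITIAL_PARALLELISM"
echo "max total task slots: $MAX_TOTAL_SLOTS"
```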

Configurable Files

Currently, the files that are configurable within the Amazon EMR configuration API are:

  • flink-conf.yaml

  • log4j.properties

  • log4j-yarn-session.properties

  • log4j-cli.properties

Working with Flink Jobs in Amazon EMR

There are several ways to interact with Flink on Amazon EMR: through Amazon EMR steps, the Flink interface found on the ResourceManager Tracking UI, and at the command line. All of these also allow you to submit a JAR file of a Flink application to run.

Additionally, you can run Flink applications as a long-running YARN job or as a transient cluster. In a long-running job, you can submit multiple Flink applications to one Flink cluster running on Amazon EMR. If you run Flink as a transient job, your Amazon EMR cluster exists only for the time it takes to run the Flink application, so you are only charged for the resources and time used. In either case, you can submit a Flink job using the Amazon EMR AddSteps API operation, or as a step argument to the RunJobFlow operation or AWS CLI create-cluster command.

Start a Flink Long-Running YARN Job as a Step

You may want to start a long-running Flink job that multiple clients can submit to through YARN API operations. You start a Flink YARN session and submit jobs to the Flink JobManager, which is located on the YARN node that hosts the Flink session Application Master daemon. To start a YARN session, use the following steps from the console, AWS CLI, or Java SDK.

To submit a long-running job using the console

Submit the long-running Flink session using the flink-yarn-session command in an existing cluster.

Note

The flink-yarn-session command was added in Amazon EMR version 5.5.0 as a wrapper for the yarn-session.sh script to simplify execution. If you use an earlier version of Amazon EMR, substitute bash -c "/usr/lib/flink/bin/yarn-session.sh -n 2 -d" for Argument in the steps that follow.

  1. Open the Amazon EMR console at https://console.aws.amazon.com/elasticmapreduce/.

  2. In the cluster list, select the cluster you previously launched.

  3. In the cluster details page, choose Steps, Add Step.

  4. Enter parameters using the guidelines that follow and then choose Add.

    Parameters and descriptions:

    Step type: Custom JAR

    Name: A name to help you identify the step. For example, Flink_Long_Running_Session.

    Jar location: command-runner.jar

    Arguments: The flink-yarn-session command with arguments appropriate for your application. For example, flink-yarn-session -n 2 -d starts a long-running Flink session within your YARN cluster in a detached state (-d) with two task managers (-n 2). See YARN Setup in the latest Flink documentation for argument details.

    Note

    With Amazon EMR versions earlier than 5.5.0, you must specify the Flink script yarn-session.sh directly instead of flink-yarn-session, specifying the full path to the script. For example, bash -c "/usr/lib/flink/bin/yarn-session.sh -n 2 -d".

To submit a long-running Flink job using the AWS CLI

  • To launch a long-running Flink cluster within EMR, use the create-cluster command:

    aws emr create-cluster --release-label emr-5.6.0 \
    --applications Name=Flink \
    --configurations file://./configurations.json \
    --region us-east-1 \
    --log-uri s3://myLogUri \
    --instance-type m3.xlarge \
    --instance-count 2 \
    --service-role EMR_DefaultRole \
    --ec2-attributes KeyName=MyKeyName,InstanceProfile=EMR_EC2_DefaultRole \
    --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\
    Args="flink-yarn-session -n 2 -d"

Submit Work to an Existing, Long-Running Flink YARN Job

You can submit work using a command-line option, but you can also use Flink's native interface proxied through the YARN ResourceManager. To submit through an Amazon EMR step using the Flink CLI, specify the long-running Flink cluster's YARN application ID. To find the application ID, run yarn application -list on the EMR command line or call the YarnClient API operation:

$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id                  Application-Name                                Application-Type  User    Queue    State    Final-State  Progress  Tracking-URL
application_1473169569237_0002  Flink session with 14 TaskManagers (detached)  Apache Flink      hadoop  default  RUNNING  UNDEFINED    100%      http://ip-10-136-154-194.ec2.internal:33089
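If you are scripting the submission, you can pull the application ID out of this listing rather than copying it by hand. The following is a sketch that assumes the default session name, which contains the phrase "Flink session" as in the output above:

```shell
# Extract the YARN application ID of a running Flink session from the output
# of "yarn application -list". Assumes the session name contains the phrase
# "Flink session" (the default for sessions started by flink-yarn-session).
extract_flink_app_id() {
  awk '/Flink session/ {print $1; exit}'
}

# Usage on a live cluster:
#   APP_ID=$(yarn application -list 2>/dev/null | extract_flink_app_id)
#   flink run -m yarn-cluster -yid "$APP_ID" /path/to/your-job.jar
```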

SDK for Java

List<StepConfig> stepConfigs = new ArrayList<StepConfig>();

HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster",
        "-yid", "application_1473169569237_0002", "-yn", "2",
        "/usr/lib/flink/examples/streaming/WordCount.jar",
        "--input", "s3://myBucket/pg11.txt",
        "--output", "s3://myBucket/alice2/");

StepConfig flinkRunWordCount = new StepConfig()
    .withName("Flink add a wordcount step")
    .withActionOnFailure("CONTINUE")
    .withHadoopJarStep(flinkWordCountConf);
stepConfigs.add(flinkRunWordCount);

AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
    .withJobFlowId("myClusterId")
    .withSteps(stepConfigs));

AWS CLI

Use the add-steps subcommand to submit new jobs to an existing Flink cluster:

aws emr add-steps --cluster-id myClusterId \
--steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \
--region myRegion

Submit a Transient Flink Job

The following example launches the Flink WordCount example by adding a step to an existing cluster.

Console

In the console details page for an existing cluster, add the step by choosing Add Step for the Steps field.

SDK for Java

The following examples illustrate two approaches to running a Flink job. The first example submits a Flink job to a running cluster. The second example creates a cluster that runs a Flink job and then terminates on completion.

List<StepConfig> stepConfigs = new ArrayList<StepConfig>();

HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("flink", "run", "-m", "yarn-cluster", "-yn", "2",
        "/usr/lib/flink/examples/streaming/WordCount.jar",
        "--input", "s3://myBucket/pg11.txt",
        "--output", "s3://myBucket/alice/");

StepConfig flinkRunWordCount = new StepConfig()
    .withName("Flink add a wordcount step")
    .withActionOnFailure("CONTINUE")
    .withHadoopJarStep(flinkWordCountConf);
stepConfigs.add(flinkRunWordCount);

AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest()
    .withJobFlowId("myClusterId")
    .withSteps(stepConfigs));
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();

// With bash -c, the entire flink command must be passed as a single argument string.
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
    .withJar("command-runner.jar")
    .withArgs("bash", "-c",
        "flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar "
        + "--input s3://myBucket/pg11.txt --output s3://myBucket/alice/");

StepConfig flinkRunWordCountStep = new StepConfig()
    .withName("Flink add a wordcount step and terminate")
    .withActionOnFailure("CONTINUE")
    .withHadoopJarStep(flinkWordCountConf);
stepConfigs.add(flinkRunWordCountStep);

RunJobFlowRequest request = new RunJobFlowRequest()
    .withName("flink-transient")
    .withReleaseLabel("emr-5.2.1")
    .withApplications(myApps)
    .withServiceRole("EMR_DefaultRole")
    .withJobFlowRole("EMR_EC2_DefaultRole")
    .withLogUri("s3://myLogBucket")
    .withInstances(
        new JobFlowInstancesConfig()
            .withEc2KeyName("myKeyName")
            .withInstanceCount(2)
            .withKeepJobFlowAliveWhenNoSteps(false)
            .withMasterInstanceType("m3.xlarge")
            .withSlaveInstanceType("m3.xlarge"))
    .withSteps(stepConfigs);

RunJobFlowResult result = emr.runJobFlow(request);

AWS CLI

Use the add-steps subcommand to submit new jobs to an existing Flink cluster:

aws emr add-steps --cluster-id myClusterId \
--steps Type=CUSTOM_JAR,Name=Flink_Transient_No_Terminate,Jar=command-runner.jar,\
Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002","-yn","2",\
"/usr/lib/flink/examples/streaming/WordCount.jar",\
"--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \
--region myRegion

Use the create-cluster subcommand to create a transient EMR cluster that terminates when the Flink job completes:

aws emr create-cluster --release-label emr-5.2.1 \
--name "Flink_Transient" \
--applications Name=Flink \
--configurations file://./configurations.json \
--region us-east-1 \
--log-uri s3://myLogUri \
--auto-terminate \
--instance-type m3.xlarge \
--instance-count 2 \
--service-role EMR_DefaultRole \
--ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole \
--steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Transient,\
Args="bash","-c","flink run -m yarn-cluster -yn 2 /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/"

Using the Scala Shell

Currently, the Flink Scala shell for EMR clusters is only configured to start new YARN sessions. You can use the Scala shell by following the procedure below.

Using the Flink Scala shell on the master node

  1. Log in to the master node using SSH as described in Connect to the Master Node using SSH.

  2. Type the following to start a shell:

    In Amazon EMR version 5.5.0 and later, you can use:

    % flink-scala-shell yarn -n 1

    In earlier versions of Amazon EMR, use:

    % /usr/lib/flink/bin/start-scala-shell.sh yarn -n 1

    This starts the Flink Scala shell so you can use Flink interactively. As with the other interfaces and options, you can scale the -n option value in this example based on the number of tasks you want to run from the shell.

Finding the Flink Web Interface

The Application Master that belongs to the Flink application hosts the Flink web interface, which is an alternative way to submit a JAR as a job or to view the current status of other jobs. The Flink web interface is active as long as you have a Flink session running. If you have a long-running YARN job already active, follow the instructions in the Connect to the Master Node Using SSH topic in the Amazon EMR Management Guide to connect to the YARN ResourceManager. For example, if you have set up an SSH tunnel and activated a proxy in your browser, choose the ResourceManager link under Connections on your EMR cluster details page.

After you find the ResourceManager, select the YARN application that’s hosting a Flink session. Choose the link under the Tracking UI column.

In the Flink web interface, you can view configuration, submit your own custom JAR as a job, or monitor jobs in progress.