在 Amazon EMR 中使用 Flink 作業 - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Amazon EMR 中使用 Flink 作業

您有多種方式可以在 Amazon EMR 上與 Flink 互動,例如:透過主控台、ResourceManager 追蹤 UI 上的 Flink 介面,或在命令列。您可以使用其中任何方式將 JAR 檔案提交至 Flink 應用程式。一旦提交 JAR 檔案,它就會變成由 Flink JobManager 管理的作業。JobManager 位於託管 Flink 作業階段 Application Master 常駐程式的 YARN 節點上。

您可以在長時間執行的叢集或暫時性叢集上將 Flink 應用程式作為 YARN 作業執行。在長時間執行的叢集上,您可以將多個 Flink 作業提交至在 Amazon EMR 上執行的一個 Flink 叢集。如果您在暫時性叢集上執行 Flink 作業,則您的 Amazon EMR 叢集僅會在需要執行 Flink 應用程式的時間裡存在,因此僅需要支付實際使用的資源和時間的費用。您可以使用 Amazon EMR AddSteps API 操作提交 Flink 任務,做為RunJobFlow操作的步驟引數,以及透過 AWS CLI add-stepscreate-cluster命令提交。

若要啟動多個用戶端可以透過 YARN API 操作向其提交工作的 Flink 應用程式,您需要建立叢集或將 Flink 應用程式新增至現有叢集。如需如何建立新叢集的指示,請參閱 使用 Flink 建立叢集。若要在現有叢集上啟動 YARN 工作階段,請從主控台、 AWS CLI或 Java SDK 按下列步驟進行。

注意

Amazon EMR 5.5.0 版中新增了 flink-yarn-session 命令,用於做為 yarn-session.sh 指令碼的包裝函式,以簡化執行作業。如果您使用較早版本的 Amazon EMR,請用 bash -c "/usr/lib/flink/bin/yarn-session.sh -d" 替代主控台中的引數 或 AWS CLI 命令中的 Args.。

從主控台提交現有叢集上的 Flink 作業

使用現有叢集中的 flink-yarn-session 命令提交 Flink 工作階段。

  1. https://console.aws.amazon.com/emr:// 開啟 Amazon EMR 主控台。

  2. 在叢集清單中,選擇您先前啟動的叢集。

  3. 在叢集詳細資訊頁面中,選擇 Steps (步驟),然後選擇 Add Step (新增步驟)

  4. 按照以下準則輸入參數,然後選擇新增

    參數 描述

    Step type (步驟類型)

    Custom JAR (自訂 JAR)

    名稱

    協助您識別步驟的名稱。例如,<example-flink-step-name>

    Jar location (Jar 位置)

    command-runner.jar (command-runner.jar)

    Arguments (引數)

    適用於您應用程式的含引數的 flink-yarn-session 命令。例如,flink-yarn-session -d 會在 YARN 叢集中以分離狀態 (-d) 啟動 Flink 工作階段。如需引數詳細資訊,請參閱最新 Flink 文件中的 YARN 設定

使用 在現有叢集上提交 Flink 任務 AWS CLI
  • 使用 add-steps 命令將 Flink 作業新增至長時間執行的叢集。下列範例命令指定 Args="flink-yarn-session", "-d" 在 YARN 叢集中以分離狀態 (-d) 啟動 Flink 工作階段。如需引數詳細資訊,請參閱最新 Flink 文件中的 YARN 設定

    aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"

如果您在長時間執行的叢集上已有現有的 Flink 應用程式,則可以指定叢集的 Flink 應用程式 ID 以便向其提交工作。若要取得應用程式 ID,請在 yarn application -list上執行, AWS CLI 或透過 YarnClient API 操作執行:

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

此 Flink 工作階段的應用程式 ID 為 application_1473169569237_0002,您可以使用它從 AWS CLI 或 SDK 將工作提交至應用程式。

範例 適用於 Java 的開發套件
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));
範例 AWS CLI
aws emr add-steps --cluster-id <j-XXXXXXXX> \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \ --region <region-code>

下列範例啟動的暫時性叢集會先執行 Flink 作業,然後在完成時終止。

範例 適用於 Java 的開發套件
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
範例 AWS CLI

使用 create-cluster 子命令建立暫時性 EMR 叢集,此叢集會在 Flink 作業完成時終止:

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""