本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
在 Amazon 中使用 Flink 任務 EMR
有幾種方式可以與 Amazon 上的 Flink 互動EMR:透過主控台、 ResourceManager 追蹤使用者介面上的 Flink 介面,以及命令列。您可以使用任何這些檔案將JAR檔案提交至 Flink 應用程式。提交JAR檔案後,它就會成為由 Flink 管理的任務 JobManager。 JobManager 位於託管 Flink 工作階段 Application Master 常駐程式的YARN節點上。
您可以在長時間執行的叢集或暫時性叢集上,將 Flink 應用程式作為YARN任務執行。在長時間執行的叢集上,您可以將多個 Flink 任務提交至在 Amazon 上執行的一個 Flink 叢集EMR。如果您在暫時性叢集上執行 Flink 任務,Amazon EMR叢集只會在執行 Flink 應用程式所需的時間內存在,因此您只需支付使用的資源和時間的費用。您可以使用 Amazon EMRAddSteps
API操作、作為RunJobFlow
操作的步驟引數,以及透過 AWS CLI add-steps
或 create-cluster
命令提交 Flink 任務。
啟動 Flink YARN 應用程式作為長時間執行叢集的步驟
若要啟動多個用戶端可以透過YARNAPI操作提交工作的 Flink 應用程式,您需要建立叢集或新增現有叢集的 Flink 應用程式。如需如何建立新叢集的指示,請參閱 使用 Flink 建立叢集。若要在現有叢集上啟動YARN工作階段,請使用主控台、 AWS CLI或 Java 中的下列步驟SDK。
注意
Amazon 5.5.0 EMR版中新增了 flink-yarn-session
命令做為yarn-session.sh
指令碼的包裝函式,以簡化執行。如果您使用舊版的 Amazon EMR,請在 主控台或 Args
AWS CLI 命令中bash -c "/usr/lib/flink/bin/yarn-session.sh -d"
取代 引數。
從主控台提交現有叢集上的 Flink 作業
使用現有叢集中的 flink-yarn-session
命令提交 Flink 工作階段。
在 https://console.aws.amazon.com/emr
開啟 Amazon EMR主控台。 -
在叢集清單中,選擇您先前啟動的叢集。
-
在叢集詳細資訊頁面中,選擇 Steps (步驟),然後選擇 Add Step (新增步驟)。
-
按照以下準則輸入參數,然後選擇新增。
參數 描述 Step type (步驟類型)
自訂 JAR 名稱
協助您識別步驟的名稱。例如 <example-flink-step-name>
.Jar location (Jar 位置)
command-runner.jar (command-runner.jar)
Arguments (引數)
適用於您應用程式的含引數的
flink-yarn-session
命令。例如flink-yarn-session -d
會在YARN叢集內以分離狀態 () 啟動 Flink 工作階段-d
。如需引數詳細資訊,請參閱最新 Flink 文件中的YARN設定。
使用 提交現有叢集上的 Flink 任務 AWS CLI
-
使用
add-steps
命令將 Flink 作業新增至長時間執行的叢集。下列範例命令指定在分離狀態 () 的YARN叢集內Args="flink-yarn-session", "-d"
啟動 Flink 工作階段-d
。如需引數詳細資訊,請參閱最新的 Flink 文件中的YARN設定。 aws emr add-steps --cluster-id
<j-XXXXXXXX>
--steps Type=CUSTOM_JAR,Name=<example-flink-step-name>
,Jar=command-runner.jar,Args="flink-yarn-session","-d"
將工作提交至長時間執行的叢集上的現有 Flink 應用程式
如果您在長時間執行的叢集上已有現有的 Flink 應用程式,則可以指定叢集的 Flink 應用程式 ID 以便向其提交工作。若要取得應用程式 ID,請在 yarn application -list
上執行 AWS CLI 或透過 YarnClient
$ yarn application -list
16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032
Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1
Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL
application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089
此 Flink 工作階段的應用程式 ID 為 application_1473169569237_0002
,您可以使用它從 AWS CLI 或 將工作提交至應用程式SDK。
範例 SDK 適用於 Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("
myClusterId
") .withSteps(stepConfigs));
範例 AWS CLI
aws emr add-steps --cluster-id
<j-XXXXXXXX>
\ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \ --region<region-code>
提交暫時性 Flink 作業
下列範例啟動的暫時性叢集會先執行 Flink 作業,然後在完成時終止。
範例 SDK 適用於 Java
import java.util.ArrayList;
import java.util.List;
import com.amazonaws.AmazonClientException;
import com.amazonaws.auth.AWSCredentials;
import com.amazonaws.auth.AWSStaticCredentialsProvider;
import com.amazonaws.auth.profile.ProfileCredentialsProvider;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce;
import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder;
import com.amazonaws.services.elasticmapreduce.model.*;
public class Main_test {
public static void main(String[] args) {
AWSCredentials credentials_profile = null;
try {
credentials_profile = new ProfileCredentialsProvider("default").getCredentials();
} catch (Exception e) {
throw new AmazonClientException(
"Cannot load credentials from .aws/credentials file. " +
"Make sure that the credentials file exists and the profile name is specified within it.",
e);
}
AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(credentials_profile))
.withRegion(Regions.US_WEST_1)
.build();
List<StepConfig> stepConfigs = new ArrayList<StepConfig>();
HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig()
.withJar("command-runner.jar")
.withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2",
"/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output",
"s3://path/to/output/");
StepConfig flinkRunWordCountStep = new StepConfig()
.withName("Flink add a wordcount step and terminate")
.withActionOnFailure("CONTINUE")
.withHadoopJarStep(flinkWordCountConf);
stepConfigs.add(flinkRunWordCountStep);
Application flink = new Application().withName("Flink");
RunJobFlowRequest request = new RunJobFlowRequest()
.withName("flink-transient")
.withReleaseLabel("emr-5.20.0")
.withApplications(flink)
.withServiceRole("EMR_DefaultRole")
.withJobFlowRole("EMR_EC2_DefaultRole")
.withLogUri("s3://path/to/my/logfiles")
.withInstances(new JobFlowInstancesConfig()
.withEc2KeyName("myEc2Key")
.withEc2SubnetId("subnet-12ab3c45")
.withInstanceCount(3)
.withKeepJobFlowAliveWhenNoSteps(false)
.withMasterInstanceType("m4.large")
.withSlaveInstanceType("m4.large"))
.withSteps(stepConfigs);
RunJobFlowResult result = emr.runJobFlow(request);
System.out.println("The cluster ID is " + result.toString());
}
}
範例 AWS CLI
使用 create-cluster
子命令建立暫時性 EMR 叢集,此叢集會在 Flink 作業完成時終止:
aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=
<YourKeyName>
,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""