選取您的 Cookie 偏好設定

我們使用提供自身網站和服務所需的基本 Cookie 和類似工具。我們使用效能 Cookie 收集匿名統計資料,以便了解客戶如何使用我們的網站並進行改進。基本 Cookie 無法停用,但可以按一下「自訂」或「拒絕」以拒絕效能 Cookie。

如果您同意,AWS 與經核准的第三方也會使用 Cookie 提供實用的網站功能、記住您的偏好設定,並顯示相關內容,包括相關廣告。若要接受或拒絕所有非必要 Cookie,請按一下「接受」或「拒絕」。若要進行更詳細的選擇,請按一下「自訂」。

在 Amazon 中使用 Flink 任務 EMR - Amazon EMR

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

在 Amazon 中使用 Flink 任務 EMR

有幾種方式可以與 Amazon 上的 Flink 互動EMR:透過主控台、 ResourceManager 追蹤使用者介面上的 Flink 介面,以及命令列。您可以使用任何這些檔案將JAR檔案提交至 Flink 應用程式。提交JAR檔案後,它就會成為由 Flink 管理的任務 JobManager。 JobManager 位於託管 Flink 工作階段 Application Master 常駐程式的YARN節點上。

您可以在長時間執行的叢集或暫時性叢集上,將 Flink 應用程式作為YARN任務執行。在長時間執行的叢集上,您可以將多個 Flink 任務提交至在 Amazon 上執行的一個 Flink 叢集EMR。如果您在暫時性叢集上執行 Flink 任務,Amazon EMR叢集只會在執行 Flink 應用程式所需的時間內存在,因此您只需支付使用的資源和時間的費用。您可以使用 Amazon EMRAddStepsAPI操作、作為RunJobFlow操作的步驟引數,以及透過 AWS CLI add-stepscreate-cluster命令提交 Flink 任務。

若要啟動多個用戶端可以透過YARNAPI操作提交工作的 Flink 應用程式,您需要建立叢集或新增現有叢集的 Flink 應用程式。如需如何建立新叢集的指示,請參閱 使用 Flink 建立叢集。若要在現有叢集上啟動YARN工作階段,請使用主控台、 AWS CLI或 Java 中的下列步驟SDK。

注意

Amazon 5.5.0 EMR版中新增了 flink-yarn-session命令做為yarn-session.sh指令碼的包裝函式,以簡化執行。如果您使用舊版的 Amazon EMR,請在 主控台或 Args AWS CLI 命令中bash -c "/usr/lib/flink/bin/yarn-session.sh -d"取代 引數

從主控台提交現有叢集上的 Flink 作業

使用現有叢集中的 flink-yarn-session 命令提交 Flink 工作階段。

  1. https://console.aws.amazon.com/emr 開啟 Amazon EMR主控台。

  2. 在叢集清單中,選擇您先前啟動的叢集。

  3. 在叢集詳細資訊頁面中,選擇 Steps (步驟),然後選擇 Add Step (新增步驟)

  4. 按照以下準則輸入參數,然後選擇新增

    參數 描述

    Step type (步驟類型)

    自訂 JAR

    名稱

    協助您識別步驟的名稱。例如 <example-flink-step-name>.

    Jar location (Jar 位置)

    command-runner.jar (command-runner.jar)

    Arguments (引數)

    適用於您應用程式的含引數的 flink-yarn-session 命令。例如 flink-yarn-session -d 會在YARN叢集內以分離狀態 () 啟動 Flink 工作階段-d。如需引數詳細資訊,請參閱最新 Flink 文件中的YARN設定

使用 提交現有叢集上的 Flink 任務 AWS CLI
  • 使用 add-steps 命令將 Flink 作業新增至長時間執行的叢集。下列範例命令指定在分離狀態 () 的YARN叢集內Args="flink-yarn-session", "-d"啟動 Flink 工作階段-d。如需引數詳細資訊,請參閱最新的 Flink 文件中的YARN設定

    aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"

如果您在長時間執行的叢集上已有現有的 Flink 應用程式,則可以指定叢集的 Flink 應用程式 ID 以便向其提交工作。若要取得應用程式 ID,請在 yarn application -list上執行 AWS CLI 或透過 YarnClientAPI操作執行:

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

此 Flink 工作階段的應用程式 ID 為 application_1473169569237_0002,您可以使用它從 AWS CLI 或 將工作提交至應用程式SDK。

範例 SDK 適用於 Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://amzn-s3-demo-bucket/pg11.txt", "--output", "s3://amzn-s3-demo-bucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));
範例 AWS CLI
aws emr add-steps --cluster-id <j-XXXXXXXX> \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://amzn-s3-demo-bucket/pg11.txt","--output","s3://amzn-s3-demo-bucket/alice2/" \ --region <region-code>

下列範例啟動的暫時性叢集會先執行 Flink 作業,然後在完成時終止。

範例 SDK 適用於 Java
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
範例 AWS CLI

使用 create-cluster 子命令建立暫時性 EMR 叢集,此叢集會在 Flink 作業完成時終止:

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://amzn-s3-demo-bucket/pg11.txt --output s3://amzn-s3-demo-bucket/alice/""

下一個主題:

Flink Scala Shell

上一個主題:

設定 Flink
隱私權網站條款Cookie 偏好設定
© 2025, Amazon Web Services, Inc.或其附屬公司。保留所有權利。