翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon EMR 内で Flink ジョブを操作する
Amazon EMR 上でコンソールを通じて Flink とやりとりするにはいくつかの方法があります。 ResourceManager トラッキング UI 追跡にある Flink インターフェイスと、コマンドライン上です。これらはすべてで、Flink アプリケーションに JAR ファイルを送信して実行できます。送信されると、JAR ファイルは Flink によって管理されるジョブになります。Flink JobManager は Flink セッションの Application Master デーモンをホストする YARN ノードに配置されています。
長期実行のクラスターまたは一時的なクラスターで、YARN ジョブとして Flink アプリケーションを実行できます。長期実行のジョブでは、複数の Flink ジョブを Amazon EMR で実行する 1 つの Flink クラスターに送信できます。一時的なクラスターで Flink ジョブを稼働する場合、Amazon EMR クラスターは Flink アプリケーションを実行するために必要な時間のためだけに存在します。そのため、使用したリソースと費やした時間に対してのみ課金されます。Amazon EMR AddSteps
API オペレーションを使用して、RunJobFlow
オペレーションに対するステップの調整として、AWS CLI、add-steps
、create-cluster
またはコマンドを介して Flink ジョブを送信できます。
長期実行のクラスターのステップとして、Flink YARN アプリケーションを起動します
複数のクライアントが YARN API オペレーションを介して作業を送信できる Flink アプリケーションを起動するには、クラスターを作成するか、既存のクラスターに Flink アプリケーションを追加する必要があります。新しいクラスターを作成する手順については、「Flink を使用してクラスターを作成する」を参照してください。既存のクラスターで YARN セッションを開始するには、コンソール、AWS CLI、Java SDK から次のステップに従います。
Amazon EMR バージョン 5.5.0 では、実行を簡素化するための yarn-session.sh
スクリプトのラッパーとして、flink-yarn-session
コマンドが追加されました。以前のバージョンの Amazon EMR を使用している場合は、コンソールの引数、または AWS CLI コマンドの Args
を bash -c
"/usr/lib/flink/bin/yarn-session.sh -d"
で置き換えます。
コンソールを使用して既存のクラスターで Flink ジョブを送信するには
既存のクラスターで flink-yarn-session
コマンドを使って、Flink セッションを送信します。
Amazon EMR コンソール (https://console.aws.amazon.com/emr
) を開きます。 -
クラスターリストで、以前に起動したクラスターを選択します。
-
クラスターの詳細ページで、[Steps (ステップ)]、[Add Step (ステップの追加)] の順に選択します。
-
以下のガイドラインを使ってパラメータを入力し、[Add (追加)] を選択します。
パラメータ 説明 ステップタイプ
カスタム JAR 名前
ステップを識別するのに役立つ名前。たとえば、 <example-flink-step-name >
。Jar location
command-runner.jar
引数
flink-yarn-session
コマンドとアプリケーションに適切な引数。たとえば、flink-yarn-session -d
は、デタッチ状態 (-d
) で、YARN クラスターで Flink セッションを開始します。引数の詳細については、最新の Flink ドキュメントの「YARN Setup (YARN の設定)」を参照してください。
AWS CLI を使用して Flink ジョブを既存のクラスターに送信するには
-
add-steps
コマンドを使用して、長期実行クラスターに Flink ジョブを追加します。次のコマンド例では、デタッチ状態 (-d
) で、YARN クラスターで FlinkArgs="flink-yarn-session", "-d"
セッションを開始するように指定します。引数の詳細については、最新の Flink ドキュメントの「YARN Setup (YARN の設定)」を参照してください。 aws emr add-steps --cluster-id
<j-XXXXXXXX>
--steps Type=CUSTOM_JAR,Name=<example-flink-step-name>
,Jar=command-runner.jar,Args="flink-yarn-session","-d"
長期実行クラスター上の既存の Flink アプリケーションに作業を送信する
長期実行クラスターに既存の Flink アプリケーションがある場合は、作業を送信するために、クラスターの Flink アプリケーション ID を指定できます。アプリケーション ID を取得するには、AWS CLIまたは YarnClientyarn application -list
オペレーションを使用して実行します。
$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089
この Flink セッションのアプリケーション ID は application_1473169569237_0002
です。これを使用して、AWS CLI または SDK でアプリケーションに作業を送信できます。
例 SDK for Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("
myClusterId
") .withSteps(stepConfigs));
例 AWS CLI
aws emr add-steps --cluster-id
<j-XXXXXXXX>
\ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region<region-code>
一時的な Flink ジョブを送信する
次の例では Flink ジョブを実行する一時的なクラスターを起動し、完了時に終了します。
例 SDK for Java
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash","-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
例 AWS CLI
Flink ジョブの完了時に終了する一時的なクラスターを作成するには、create-cluster
サブコマンドを使用します。
aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=
<YourKeyName>
,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""