Amazon EMR 内で Flink ジョブを操作する - Amazon EMR

Amazon EMR 内で Flink ジョブを操作する

Amazon EMR 上でコンソールを通じて Flink とやりとりするにはいくつかの方法があります。ResourceManager の UI 追跡にある Flink インターフェイスと、コマンドライン上です。このいずれの場合にも、JAR ファイルを Flink アプリケーションに送信できます。送信した JAR ファイルは Flink JobManager によって管理されるジョブになります。JobManager は、Flink セッションのアプリケーションマスターデーモンをホストする YARN ノードで動作しています。

長期実行のクラスターまたは一時的なクラスターで、YARN ジョブとして Flink アプリケーションを実行できます。長期実行のジョブでは、複数の Flink ジョブを Amazon EMR で実行する 1 つの Flink クラスターに送信できます。一時的なクラスターで Flink ジョブを稼働する場合、Amazon EMR クラスターは Flink アプリケーションを実行するために必要な時間のためだけに存在します。そのため、使用したリソースと費やした時間に対してのみ課金されます。Flink ジョブを送信するには、Amazon EMR AddSteps API オペレーションによって RunJobFlow オペレーションのステップを調整するか、AWS CLI の add-steps または create-cluster コマンドを使用します。

複数のクライアントが YARN API オペレーションを介して作業を送信できる Flink アプリケーションを起動するには、クラスターを作成するか、既存のクラスターに Flink アプリケーションを追加する必要があります。新しいクラスターを作成する手順については、「Flink を使用してクラスターを作成する」を参照してください。既存のクラスターで YARN セッションを開始するには、コンソール、AWS CLI、Java SDK から次のステップに従います。

注記

Amazon EMR バージョン 5.5.0 では、実行を簡素化するための yarn-session.sh スクリプトのラッパーとして、flink-yarn-session コマンドが追加されました。以前のバージョンの Amazon EMR を使用している場合は、コンソールの引数、または AWS CLI コマンドの Argsbash -c "/usr/lib/flink/bin/yarn-session.sh -d" で置き換えます。

コンソールを使用して Flink ジョブを既存のクラスターに送信するには

flink-yarn-session コマンドを使用して、Flink セッションを既存のクラスターに送信します。

  1. Amazon EMR コンソール (https://console.aws.amazon.com/emr) を開きます。

  2. クラスターリストで、以前に起動したクラスターを選択します。

  3. クラスターの詳細ページで、[Steps (ステップ)]、[Add Step (ステップの追加)] の順に選択します。

  4. 次のガイドラインに従ってパラメータを入力し、[追加] を選択します。

    パラメータ 説明

    ステップタイプ

    カスタム JAR

    名前

    ステップを識別するのに役立つ名前。例えば、<example-flink-step-name> などです。

    Jar location

    command-runner.jar

    引数

    flink-yarn-session コマンドとアプリケーションに適切な引数。たとえば、flink-yarn-session -d は、デタッチ状態 (-d) で、 YARN クラスターで Flink セッションを開始します。引数の詳細については、最新の Flink ドキュメントの「YARN Setup (YARN の設定)」を参照してください。

AWS CLI を使用して Flink ジョブを既存のクラスターに送信するには
  • Flink ジョブを長期実行クラスターに追加するには、add-steps コマンドを使用します。次のコマンド例では、YARN クラスター内において Flink セッションが切り離された状態 (-d) で開始されるよう、Args="flink-yarn-session", "-d" を指定しています。引数の詳細については、最新の Flink ドキュメントの「YARN Setup (YARN の設定)」を参照してください。

    aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"

長期実行クラスターに既存の Flink アプリケーションがある場合は、作業を送信するために、クラスターの Flink アプリケーション ID を指定できます。アプリケーション ID を取得するには、AWS CLI で yarn application -list を実行するか、YarnClient API オペレーションを使用します。

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

この Flink セッションのアプリケーション ID は application_1473169569237_0002 であり、これを使用することで、AWS CLI または SDK からアプリケーションに操作内容を送信できます。

例 SDK for Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));
例 AWS CLI
aws emr add-steps --cluster-id <j-XXXXXXXX> \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region <region-code>

次の例では Flink ジョブを実行する一時的なクラスターを起動し、完了時に終了します。

例 SDK for Java
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash","-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
例 AWS CLI

Flink ジョブの完了時に終了する一時的なクラスターを作成するには、create-cluster サブコマンドを使用します。

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""