Amazon EMR에서 Flink 작업 사용 - Amazon EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon EMR에서 Flink 작업 사용

Amazon EMR에서 Flink와 상호 작용하는 방법은 여러 가지가 있습니다. 콘솔, 추적 UI에 ResourceManager 있는 Flink 인터페이스 및 명령줄을 사용하는 것입니다. 이들 중 하나를 사용하여 JAR 파일을 Flink 애플리케이션에 제출할 수 있습니다. JAR 파일을 제출하면 이 작업은 Flink에서 관리하는 작업이 됩니다. JobManager JobManager 는 Flink 세션 애플리케이션 마스터 데몬을 호스팅하는 YARN 노드에 있습니다.

또한 Flink 애플리케이션을 장기 실행 YARN 작업이나 임시 클러스터로 실행할 수 있습니다. 장기 실행 클러스터에서는 Amazon EMR 클러스터에서 실행 중인 Flink 클러스터 하나로 여러 Flink 애플리케이션을 제출할 수 있습니다. Flink 작업을 임시 클러스터에서 실행할 경우 Amazon EMR 클러스터는 Flink 애플리케이션을 실행하는 데 걸리는 시간 동안만 존재하므로 사용한 리소스와 시간에 대해서만 요금이 청구됩니다. Amazon EMR AddSteps API 작업으로, RunJobFlow 작업에 대한 단계 인수로, AWS CLI add-steps 또는 create-cluster 명령을 통해 Flink 작업을 제출할 수 있습니다.

여러 클라이언트가 YARN API 작업을 통해 작업을 제출할 수 있는 Flink 애플리케이션을 시작하려면 클러스터를 생성하거나 Flink 애플리케이션을 기존 클러스터에 추가해야 합니다. 새 클러스터를 생성하는 방법에 대한 지침은 Flink를 포함하는 클러스터 생성 부분을 참조하세요. 기존 클러스터에서 YARN 세션을 시작하려면 콘솔, AWS CLI 또는 Java SDK에서 다음 단계를 수행합니다.

참고

flink-yarn-session 명령은 실행을 간소화하기 위한 yarn-session.sh 스크립트 래퍼로 Amazon EMR 버전 5.5.0에 추가되었습니다. 이전 버전의 Amazon EMR을 사용하는 경우 콘솔에서 인수에 대해 bash -c "/usr/lib/flink/bin/yarn-session.sh -d"를 대체하거나 AWS CLI 명령에서 Args를 대체합니다.

콘솔에서 기존 클러스터의 Flink 작업을 제출하는 방법

기존 클러스터에서 flink-yarn-session 명령을 사용하여 Flink 세션을 제출합니다.

  1. https://console.aws.amazon.com/emr 에서 아마존 EMR 콘솔을 엽니다.

  2. 클러스터 목록에서 이전에 시작한 클러스터를 선택합니다.

  3. 클러스터 세부 정보 페이지에서 단계, Add Step(단계 추가)을 선택합니다.

  4. 아래 지침에 따라 파라미터를 입력하고 추가를 선택합니다.

    파라미터 설명

    단계 유형

    사용자 지정 JAR

    이름

    단계 식별을 위한 이름. 예를 들면 example-flink-step-name<>입니다.

    Jar location(Jar 위치)

    command-runner.jar

    인수

    애플리케이션에 따라 여러 가지 인수를 사용하는 flink-yarn-session 명령. 예를 들어, flink-yarn-session -d는 YARN 클러스터 내에서 분리된 상태 () 로 Flink 세션을 시작합니다. -d 인수에 대한 자세한 내용은 최신 Flink 설명서에서 YARN Setup을 참조하세요.

AWS CLI를 사용하여 기존 클러스터에서 Flink 작업을 제출하는 방법
  • add-steps 명령을 사용하여 장기 실행 클러스터에 Flink 작업을 추가합니다. 다음 예제 명령은 YARN 클러스터 내에서 분리된 상태(-d)로 Flink 세션을 시작하도록 Args="flink-yarn-session", "-d"를 지정합니다. 인수에 대한 자세한 내용은 최신 Flink 설명서에서 YARN Setup을 참조하세요.

    aws emr add-steps --cluster-id <j-XXXXXXXX> --steps Type=CUSTOM_JAR,Name=<example-flink-step-name>,Jar=command-runner.jar,Args="flink-yarn-session","-d"

장기 실행 클러스터에 기존 Flink 애플리케이션이 이미 있는 경우 클러스터의 Flink 애플리케이션 ID를 지정하여 클러스터에 작업을 제출할 수 있습니다. 애플리케이션 ID를 얻으려면 다음 yarn application -list 명령에서 AWS CLI 또는 API 작업을 통해 실행하세요. YarnClient

$ yarn application -list 16/09/07 19:32:13 INFO client.RMProxy: Connecting to ResourceManager at ip-10-181-83-19.ec2.internal/10.181.83.19:8032 Total number of applications (application-types: [] and states: [SUBMITTED, ACCEPTED, RUNNING]):1 Application-Id Application-Name Application-Type User Queue State Final-State Progress Tracking-URL application_1473169569237_0002 Flink session with 14 TaskManagers (detached) Apache Flink hadoop default RUNNING UNDEFINED 100% http://ip-10-136-154-194.ec2.internal:33089

이 Flink 세션의 애플리케이션 ID는 application_1473169569237_0002이며, 이를 사용하여 AWS CLI 또는 SDK에서 애플리케이션에 작업을 제출할 수 있습니다.

예 SDK for Java
List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("flink", "run", "-m", "yarn-cluster", "-yid", "application_1473169569237_0002", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://myBucket/pg11.txt", "--output", "s3://myBucket/alice2/"); StepConfig flinkRunWordCount = new StepConfig() .withName("Flink add a wordcount step") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCount); AddJobFlowStepsResult res = emr.addJobFlowSteps(new AddJobFlowStepsRequest() .withJobFlowId("myClusterId") .withSteps(stepConfigs));
예 AWS CLI
aws emr add-steps --cluster-id <j-XXXXXXXX> \ --steps Type=CUSTOM_JAR,Name=Flink_Submit_To_Long_Running,Jar=command-runner.jar,\ Args="flink","run","-m","yarn-cluster","-yid","application_1473169569237_0002",\ "/usr/lib/flink/examples/streaming/WordCount.jar",\ "--input","s3://myBucket/pg11.txt","--output","s3://myBucket/alice2/" \ --region <region-code>

다음 예제에서는 Flink 작업을 실행한 다음 완료 시 종료하는 임시 클러스터를 시작합니다.

예 SDK for Java
import java.util.ArrayList; import java.util.List; import com.amazonaws.AmazonClientException; import com.amazonaws.auth.AWSCredentials; import com.amazonaws.auth.AWSStaticCredentialsProvider; import com.amazonaws.auth.profile.ProfileCredentialsProvider; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduce; import com.amazonaws.services.elasticmapreduce.AmazonElasticMapReduceClientBuilder; import com.amazonaws.services.elasticmapreduce.model.*; public class Main_test { public static void main(String[] args) { AWSCredentials credentials_profile = null; try { credentials_profile = new ProfileCredentialsProvider("default").getCredentials(); } catch (Exception e) { throw new AmazonClientException( "Cannot load credentials from .aws/credentials file. " + "Make sure that the credentials file exists and the profile name is specified within it.", e); } AmazonElasticMapReduce emr = AmazonElasticMapReduceClientBuilder.standard() .withCredentials(new AWSStaticCredentialsProvider(credentials_profile)) .withRegion(Regions.US_WEST_1) .build(); List<StepConfig> stepConfigs = new ArrayList<StepConfig>(); HadoopJarStepConfig flinkWordCountConf = new HadoopJarStepConfig() .withJar("command-runner.jar") .withArgs("bash", "-c", "flink", "run", "-m", "yarn-cluster", "-yn", "2", "/usr/lib/flink/examples/streaming/WordCount.jar", "--input", "s3://path/to/input-file.txt", "--output", "s3://path/to/output/"); StepConfig flinkRunWordCountStep = new StepConfig() .withName("Flink add a wordcount step and terminate") .withActionOnFailure("CONTINUE") .withHadoopJarStep(flinkWordCountConf); stepConfigs.add(flinkRunWordCountStep); Application flink = new Application().withName("Flink"); RunJobFlowRequest request = new RunJobFlowRequest() .withName("flink-transient") .withReleaseLabel("emr-5.20.0") .withApplications(flink) .withServiceRole("EMR_DefaultRole") .withJobFlowRole("EMR_EC2_DefaultRole") .withLogUri("s3://path/to/my/logfiles") .withInstances(new JobFlowInstancesConfig() .withEc2KeyName("myEc2Key") .withEc2SubnetId("subnet-12ab3c45") .withInstanceCount(3) .withKeepJobFlowAliveWhenNoSteps(false) .withMasterInstanceType("m4.large") .withSlaveInstanceType("m4.large")) .withSteps(stepConfigs); RunJobFlowResult result = emr.runJobFlow(request); System.out.println("The cluster ID is " + result.toString()); } }
예 AWS CLI

create-cluster 하위 명령을 사용하여 Flink 작업 완료 시 종료되는 임시 클러스터를 생성합니다.

aws emr create-cluster --release-label emr-5.2.1 \ --name "Flink_Transient" \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --auto-terminate --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=<YourKeyName>,InstanceProfile=EMR_EC2_DefaultRole \ --steps Type=CUSTOM_JAR,Jar=command-runner.jar,Name=Flink_Long_Running_Session,\ Args="bash","-c","\"flink run -m yarn-cluster /usr/lib/flink/examples/streaming/WordCount.jar --input s3://myBucket/pg11.txt --output s3://myBucket/alice/""