아마존에서 플링크 설정하기 EMR - 아마존 EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

아마존에서 플링크 설정하기 EMR

Amazon EMR 릴리스 6.9.0 이상은 하이브에 대한 아파치 플링크 커넥터를 사용하여 하이브 메타스토어와 AWS Glue 카탈로그를 모두 지원합니다. 이 섹션에서는 Flink를 사용하여 AWS Glue 카탈로그Hive 메타스토어를 구성하는 데 필요한 단계를 설명합니다.

  1. 릴리스 6.9.0 이상과 두 개 이상의 애플리케이션 (Hive 및 Flink) 이 포함된 EMR 클러스터를 생성하십시오.

  2. 스크립트 러너를 사용하여 다음 스크립트를 단계 함수로 실행합니다.

    hive-metastore-setup.sh

    sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.
  1. 릴리스 6.9.0 이상과 두 개 이상의 애플리케이션 (Hive 및 Flink) 이 포함된 EMR 클러스터를 생성합니다.

  2. AWS Glue 데이터 카탈로그 설정에서 Hive 테이블 메타데이터에 사용을 선택하여 클러스터에서 데이터 카탈로그를 활성화합니다.

  3. 스크립트 러너를 사용하여 다음 스크립트를 단계 함수로 실행합니다. Amazon EMR 클러스터에서 명령 및 스크립트를 실행합니다.

    glue-catalog-setup.sh

    sudo cp /usr/lib/hive/auxlib/aws-glue-datacatalog-hive3-client.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/antlr-runtime-3.5.2.jar /usr/lib/flink/lib sudo cp /usr/lib/hive/lib/hive-exec-3.1.3*.jar /lib/flink/lib sudo cp /usr/lib/hive/lib/libfb303-0.9.3.jar /lib/flink/lib sudo cp /usr/lib/flink/opt/flink-connector-hive_2.12-1.15.2.jar /lib/flink/lib sudo chmod 755 /usr/lib/flink/lib/aws-glue-datacatalog-hive3-client.jar sudo chmod 755 /usr/lib/flink/lib/antlr-runtime-3.5.2.jar sudo chmod 755 /usr/lib/flink/lib/hive-exec-3.1.3*.jar sudo chmod 755 /usr/lib/flink/lib/libfb303-0.9.3.jar sudo chmod 755 /usr/lib/flink/lib/flink-connector-hive_2.12-1.15.2.jar
    Form to add a Custom JAR step with fields for step type, name, JAR location, arguments, and failure action.

Amazon EMR 구성을 사용하여 구성 파일로 API Flink를 구성할 수 있습니다. 내에서 구성 가능한 파일은 다음과 같습니다. API

  • flink-conf.yaml

  • log4j.properties

  • flink-log4j-session

  • log4j-cli.properties

Flink용 기본 구성 파일은 flink-conf.yaml입니다.

AWS CLI를 사용하여 Flink에 사용되는 작업 슬롯 수를 구성하는 방법
  1. 다음 콘텐츠가 포함된 configurations.json 파일을 생성합니다.

    [ { "Classification": "flink-conf", "Properties": { "taskmanager.numberOfTaskSlots":"2" } } ]
  2. 다음 구성을 사용하여 클러스터를 생성합니다.

    aws emr create-cluster --release-label emr-7.2.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole
참고

APIFlink를 사용하여 일부 구성을 변경할 수도 있습니다. 자세한 내용은 Flink 설명서에서 Concepts를 참조하세요.

Amazon EMR 버전 5.21.0 이상에서는 클러스터 구성을 재정의하고 실행 중인 클러스터의 각 인스턴스 그룹에 대해 추가 구성 분류를 지정할 수 있습니다. Amazon EMR 콘솔, AWS Command Line Interface (AWS CLI) 또는 를 사용하여 이 작업을 수행할 수 AWS SDK 있습니다. 자세한 내용은 실행 중 클러스터의 인스턴스 그룹에 대해 구성 제공을 참조하십시오.

애플리케이션 소유자는 Flink 내에서 작업에 할당할 리소스를 잘 알고 있습니다. 이 설명서의 예제에서는 애플리케이션에 사용되는 태스크 인스턴스와 동일한 개수의 작업을 사용합니다. 일반적으로 이러한 사용은 초기 수준의 병렬화에 권장되지만, 작업 슬롯을 사용하여 더 세부적으로 병렬화할 수도 있습니다. 이 경우 일반적으로 인스턴스당 가상 코어 수를 초과하면 안 됩니다. Flink의 아키텍처에 대한 자세한 내용은 Flink 설명서에서 Concepts를 참조하세요.

Flink는 JobManager 여러 기본 노드가 있는 Amazon EMR 클러스터에서 기본 노드 장애 조치 프로세스 중에도 계속 사용할 수 있습니다. Amazon EMR 5.28.0부터 JobManager 고가용성도 자동으로 활성화됩니다. 수동 구성이 필요하지 않습니다.

Amazon EMR 버전 5.27.0 이하에서는 단일 장애 JobManager 지점이 발생합니다. JobManager 실패하면 모든 작업 상태가 손실되고 실행 중인 작업이 재개되지 않습니다. 다음 예제에서 볼 수 ZooKeeper 있듯이 애플리케이션 시도 횟수, 체크포인트를 구성하고 Flink용 상태 저장소로 활성화하여 JobManager 고가용성을 활성화할 수 있습니다.

[ { "Classification": "yarn-site", "Properties": { "yarn.resourcemanager.am.max-attempts": "10" } }, { "Classification": "flink-conf", "Properties": { "yarn.application-attempts": "10", "high-availability": "zookeeper", "high-availability.zookeeper.quorum": "%{hiera('hadoop::zk')}", "high-availability.storageDir": "hdfs:///user/flink/recovery", "high-availability.zookeeper.path.root": "/flink" } } ]

Flink에 대한 최대 애플리케이션 마스터 시도 횟수와 애플리케이션 시도 횟수를 모두 구성해야 YARN 합니다. 자세한 내용은 YARN클러스터 고가용성 구성을 참조하십시오. 이전에 완료된 체크포인트에서 JobManager 복구 실행 작업을 다시 시작하도록 Flink 체크포인트를 구성할 수도 있습니다. 자세한 내용은 Flink checkpointing을 참조하세요.

Flink 1.11.x를 사용하는 Amazon EMR 버전의 경우 JobManager (jobmanager.memory.process.size) 와 TaskManager (taskmanager.memory.process.size) in 모두에 대해 총 메모리 프로세스 크기를 구성해야 합니다. flink-conf.yaml 구성을 API 사용하여 클러스터를 구성하거나 를 통해 해당 필드의 주석 처리를 수동으로 제거하여 이러한 값을 설정할 수 있습니다. SSH Flink는 다음과 같은 기본값을 제공합니다.

  • jobmanager.memory.process.size: 1,600m

  • taskmanager.memory.process.size: 1,728m

JVM메타스페이스와 오버헤드를 제외하려면 대신 총 Flink 메모리 크기 () 를 사용하십시오. taskmanager.memory.flink.size taskmanager.memory.process.size taskmanager.memory.process.size의 기본값은 1,280m입니다. taskmanager.memory.process.sizetaskmanager.memory.process.size를 모두 설정하는 것은 권장되지 않습니다.

Flink 1.12.0 이상을 사용하는 모든 Amazon EMR 버전은 Flink용 오픈 소스 세트에 Amazon의 기본값으로 설정된 기본값이 있으므로 사용자가 직접 구성할 필요가 없습니다. EMR

Flink 애플리케이션 컨테이너는 .out 파일, .log 파일, .err 파일과 같은 세 가지 유형의 로그 파일을 생성하고 작성합니다. .err 파일만 압축되어 파일 시스템에서 제거되고, .log.out 로그 파일은 파일 시스템에 남습니다. 이러한 출력 파일을 관리할 수 있고 클러스터를 안정적으로 유지하기 위해 최대 파일 수를 설정하고 파일 크기를 제한하도록 log4j.properties에서 로그 로테이션을 구성할 수 있습니다.

아마존 EMR 버전 5.30.0 이상

Amazon EMR 5.30.0부터 Flink는 구성 분류 이름이 있는 log4j2 로깅 프레임워크를 사용합니다. 다음 예제 구성은 log4j2 flink-log4j. 형식을 보여줍니다.

[ { "Classification": "flink-log4j", "Properties": { "appender.main.name": "MainAppender", "appender.main.type": "RollingFile", "appender.main.append" : "false", "appender.main.fileName" : "${sys:log.file}", "appender.main.filePattern" : "${sys:log.file}.%i", "appender.main.layout.type" : "PatternLayout", "appender.main.layout.pattern" : "%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n", "appender.main.policies.type" : "Policies", "appender.main.policies.size.type" : "SizeBasedTriggeringPolicy", "appender.main.policies.size.size" : "100MB", "appender.main.strategy.type" : "DefaultRolloverStrategy", "appender.main.strategy.max" : "10" }, } ]

아마존 EMR 버전 5.29.0 및 이전

아마존 EMR 버전 5.29.0 및 이전 버전에서 Flink는 log4j 로깅 프레임워크를 사용합니다. 다음 구성 예제에서는 log4j 형식을 보여줍니다.

[ { "Classification": "flink-log4j", "Properties": { "log4j.appender.file": "org.apache.log4j.RollingFileAppender", "log4j.appender.file.append":"true", # keep up to 4 files and each file size is limited to 100MB "log4j.appender.file.MaxFileSize":"100MB", "log4j.appender.file.MaxBackupIndex":4, "log4j.appender.file.layout":"org.apache.log4j.PatternLayout", "log4j.appender.file.layout.ConversionPattern":"%d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n" }, } ]

아마존 EMR 릴리스 6.12.0 이상에서는 플링크에 대한 자바 11 런타임 지원을 제공합니다. 다음 섹션에서는 Flink에 대한 Java 11 런타임 지원을 제공하도록 클러스터를 구성하는 방법을 설명합니다.

다음 단계를 사용하여 Flink 및 Java 11 EMR 런타임으로 클러스터를 생성하십시오. Java 11 런타임 지원을 추가하는 구성 파일은 flink-conf.yaml입니다.

Console
콘솔에서 Flink 및 Java 11 런타임을 사용하여 클러스터를 만들려면
  1. 에 AWS Management Console로그인하고 https://console.aws.amazon.com/emr 에서 아마존 EMR 콘솔을 엽니다.

  2. 탐색 창에서 EMREC2on에서 클러스터를 선택한 다음 클러스터 생성을 선택합니다.

  3. Amazon EMR 릴리스 6.12.0 이상을 선택하고 Flink 애플리케이션을 설치하도록 선택합니다. 클러스터에 설치하려는 다른 애플리케이션을 모두 선택합니다.

  4. 클러스터를 계속 설정합니다. 선택적 소프트웨어 설정 섹션에서 기본 구성 입력 옵션을 사용하고 다음 구성을 입력합니다.

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  5. 계속 설정하고 클러스터를 시작합니다.

AWS CLI
에서 Flink 및 Java 11 런타임을 사용하여 클러스터를 만들려면 CLI
  1. Java 11을 사용하도록 Flink를 구성하는 구성 파일 configurations.json을 생성합니다.

    [ { "Classification": "flink-conf", "Properties": { "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" } } ]
  2. 에서 Amazon EMR 릴리스 6.12.0 이상이 포함된 새 EMR 클러스터를 생성하고 다음 예와 같이 Flink 애플리케이션을 설치합니다. AWS CLI

    aws emr create-cluster --release-label emr-6.12.0 \ --applications Name=Flink \ --configurations file://./configurations.json \ --region us-east-1 \ --log-uri s3://myLogUri \ --instance-type m5.xlarge \ --instance-count 2 \ --service-role EMR_DefaultRole_V2 \ --ec2-attributes KeyName=YourKeyName,InstanceProfile=EMR_EC2_DefaultRole

다음 단계를 사용하여 실행 중인 EMR 클러스터를 Flink 및 Java 11 런타임으로 업데이트하십시오. Java 11 런타임 지원을 추가하는 구성 파일은 flink-conf.yaml입니다.

Console
콘솔에서 Flink 및 Java 11 런타임으로 실행 중인 클러스터를 업데이트하려면
  1. 에 AWS Management Console로그인하고 https://console.aws.amazon.com/emr 에서 아마존 EMR 콘솔을 엽니다.

  2. 탐색 창에서 [EMRon] EC2 에서 [Clusters] 를 선택한 다음 업데이트하려는 클러스터를 선택합니다.

    참고

    클러스터가 Java 11을 지원하려면 Amazon EMR 릴리스 6.12.0 이상을 사용해야 합니다.

  3. 구성 탭을 선택합니다.

  4. 인스턴스 그룹 구성 섹션에서 업데이트하려는 실행 중인 인스턴스 그룹을 선택한 다음, 목록 작업 메뉴에서 재구성을 선택합니다.

  5. 다음과 같이 속성 편집 옵션을 사용하여 인스턴스 그룹을 재구성합니다. 각 항목 이후에 새 구성 추가를 선택합니다.

    분류 속성

    flink-conf

    containerized.taskmanager.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    containerized.master.env.JAVA_HOME

    /usr/lib/jvm/jre-11

    flink-conf

    env.java.home

    /usr/lib/jvm/jre-11

  6. 변경 내용 저장을 선택하여 구성을 추가합니다.

AWS CLI
Flink 및 Java 11 런타임을 사용하도록 실행 중인 클러스터를 업데이트하려면 CLI

modify-instance-groups 명령을 사용하여 실행 중인 클러스터의 인스턴스 그룹에 대해 새 구성을 지정합니다.

  1. 먼저 Java 11을 사용하도록 Flink를 구성하는 configurations.json 구성 파일을 생성합니다. 다음 예제에서는 다음을 대체합니다.ig-1xxxxxxx9 재구성하려는 인스턴스 그룹의 ID를 입력합니다. modify-instance-groups 명령을 실행하는 동일한 디렉터리에 파일을 저장합니다.

    [ { "InstanceGroupId":"ig-1xxxxxxx9", "Configurations":[ { "Classification":"flink-conf", "Properties":{ "containerized.taskmanager.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "containerized.master.env.JAVA_HOME":"/usr/lib/jvm/jre-11", "env.java.home":"/usr/lib/jvm/jre-11" }, "Configurations":[] } ] } ]
  2. 에서 AWS CLI다음 명령을 실행합니다. 재구성하려는 인스턴스 그룹의 ID를 바꿉니다.

    aws emr modify-instance-groups --cluster-id j-2AL4XXXXXX5T9 \ --instance-groups file://configurations.json

실행 중인 클러스터의 Java 런타임을 확인하려면 다음을 사용하여 기본 노드에 연결에 설명된 SSH 대로 기본 노드에 SSH 로그인합니다. 그런 다음, 다음 명령을 실행합니다.

ps -ef | grep flink

-ef 옵션과 함께 ps 명령은 시스템에서 실행 중인 모든 프로세스를 나열합니다. grep로 해당 출력을 필터링하여 flink 문자열에 대한 언급을 찾을 수 있습니다. Java 런타임 환경 (JRE) 값에 대한 출력을 jre-XX 검토하십시오. 다음 출력에서 jre-11은 런타임에 Flink용으로 Java 11이 선택되었음을 나타냅니다.

flink    19130     1  0 09:17 ?        00:00:15 /usr/lib/jvm/jre-11/bin/java -Djava.io.tmpdir=/mnt/tmp -Dlog.file=/usr/lib/flink/log/flink-flink-historyserver-0-ip-172-31-32-127.log -Dlog4j.configuration=file:/usr/lib/flink/conf/log4j.properties -Dlog4j.configurationFile=file:/usr/lib/flink/conf/log4j.properties -Dlogback.configurationFile=file:/usr/lib/flink/conf/logback.xml -classpath /usr/lib/flink/lib/flink-cep-1.17.0.jar:/usr/lib/flink/lib/flink-connector-files-1.17.0.jar:/usr/lib/flink/lib/flink-csv-1.17.0.jar:/usr/lib/flink/lib/flink-json-1.17.0.jar:/usr/lib/flink/lib/flink-scala_2.12-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-java-uber-1.17.0.jar:/usr/lib/flink/lib/flink-table-api-scala-bridge_2.12-1.17.0.

또는 를 사용하여 기본 노드에 SSH 로그인하고 명령을 flink-yarn-session -d 사용하여 Flink YARN 세션을 시작할 수도 있습니다. 출력에는 다음 java-11-amazon-corretto 예제에서 Flink용 자바 가상 머신 (JVM) 이 표시됩니다.

2023-05-29 10:38:14,129 INFO  org.apache.flink.configuration.GlobalConfiguration           [] - Loading configuration property: containerized.master.env.JAVA_HOME, /usr/lib/jvm/java-11-amazon-corretto.x86_64