EMRFS S3-optimized 커밋에 대한 요구 사항 - Amazon EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

EMRFS S3-optimized 커밋에 대한 요구 사항

EMRFS S3-optimized 커밋은 다음 조건이 충족될 때 사용됩니다.

  • Spark SQL, DataFrames또는 데이터 세트를 사용하여 Amazon S3에 파일을 쓰는 Spark 작업을 실행합니다. Amazon EMR 6.4.0부터 이 커밋은 parquet, 및 텍스트 기반 형식( ORC및 포함)을 포함한 모든 일반적인 형식에 사용할 수 있습니다CSVJSON. Amazon EMR 6.4.0 이전 릴리스의 경우 Parquet 형식만 지원됩니다.

  • Amazon EMR 에서 멀티파트 업로드가 활성화됩니다. 이 값이 기본값입니다. 자세한 내용은 EMRFS S3-optimized 커밋자 및 멀티파트 업로드 단원을 참조하십시오.

  • Spark의 기본 제공 파일 형식 지원이 사용됩니다. 기본 제공 파일 형식 지원은 다음 상황에서 사용됩니다.

    • Hive 메타스토어 테이블의 경우 spark.sql.hive.convertMetastoreParquet가 Parquet 테이블에 true 대해 로 설정된 경우 또는 spark.sql.hive.convertMetastoreOrc가 Amazon 6.4.0 이상이 포함된 Orc 테이블에 true 대해 EMR 로 설정된 경우. 기본 설정입니다.

    • 예를 들어, 작업을 파일 형식 데이터 소스 또는 테이블에 쓰는 경우 USING parquet 절과 함께 대상 테이블이 생성됩니다.

    • 작업이 파티션 분할되지 않은 Hive 메타스토어 Parquet 테이블에 쓸 경우. Spark의 기본 제공 Parquet 지원은 파티션 분할된 Hive 테이블을 지원하지 않습니다. 이 제한 사항은 알려져 있습니다. 자세한 내용은 Apache Spark SQL DataFrames 및 데이터 세트 안내서의 Hive 메타스토어 Parquet 테이블 변환을 참조하세요.

  • 기본 파티션 위치(예: ${table_location}/k1=v1/k2=v2/)에 쓰는 Spark 작업 작업에서는 커미터를 사용합니다. 작업이 사용자 지정 파티션 위치에 쓰는 경우(예: ALTER TABLE SQL 명령을 사용하여 사용자 지정 파티션 위치를 설정하는 경우) 커미터는 사용되지 않습니다.

  • Spark에는 다음 값을 사용해야 합니다.

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled 속성을 true로 설정해야 합니다. Amazon EMR 5.20.0 이상의 기본 설정입니다. Amazon EMR 5.19.0에서 기본값은 입니다false. 이 값의 구성에 대한 자세한 내용은 다음(Amazon EMR5.19.0에 대한 EMRFS S3 최적화 커밋 활성화 S3-optimized )을 참조하십시오.

    • 파티셔닝되지 않은 Hive 메타스토어 테이블에 쓰는 경우 Parquet 및 Orc 파일 형식만 지원됩니다. 파티셔닝되지 않은 Parquet Hive 메타스토어 테이블에 쓰는 true 경우 를 로 설정해야 spark.sql.hive.convertMetastoreParquet 합니다. 파티셔닝되지 않은 Orc Hive 메타스토어 테이블에 쓰는 true 경우 를 로 설정해야 spark.sql.hive.convertMetastoreOrc 합니다. 기본 설정입니다.

    • spark.sql.parquet.output.committer.classcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter로 설정해야 합니다. 이것이 기본 설정입니다.

    • spark.sql.sources.commitProtocolClassorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol 또는 로 설정해야 합니다org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol. org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol는 Amazon EMR 5.x 시리즈 버전 5.30.0 이상의 기본 설정이고, 는 Amazon EMR 6.x 시리즈 버전 6.2.0 이상의 기본 설정입니다. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol는 이전 Amazon EMR 버전의 기본 설정입니다.

    • Spark 작업이 동적 파티션 열을 포함하는 파티션 분할된 Parquet 데이터 세트를 덮어쓸 경우 partitionOverwriteMode 쓰기 옵션과 spark.sql.sources.partitionOverwriteModestatic으로 설정해야 합니다. 이것이 기본 설정입니다.

      참고

      partitionOverwriteMode 쓰기 옵션은 Spark 2.4.0에서 소개되었습니다. Amazon EMR 릴리스 5.19.0에 포함된 Spark 버전 2.3.2의 경우 spark.sql.sources.partitionOverwriteMode 속성을 설정합니다.

EMRFS S3-optimized 커밋을 사용하지 않는 경우

일반적으로 EMRFS S3-optimized 커밋은 다음 상황에서 사용되지 않습니다.

상황 커미터가 사용되지 않는 이유
에 쓸 때 HDFS 커밋자는 를 사용하여 Amazon S3에 쓰기만 지원합니다EMRFS.
S3A 파일 시스템을 사용하는 경우 커밋자는 만 지원합니다EMRFS.
MapReduce 또는 Spark를 사용하는 경우 RDD API 커밋자는 Spark SQL DataFrame, 또는 데이터 세트 만 사용할 수 있습니다APIs.

다음 Scala 예제에서는 EMRFS S3-optimized 커미터가 전체적으로(첫 번째 예제) 및 부분적으로(두 번째 예제) 사용되지 않는 몇 가지 추가 상황을 보여줍니다.

예 - 동적 파티션 덮어쓰기 모드

다음 Scala 예제에서는 Spark에 EMRFS S3-optimized 커밋자 사용을 완전히 방지하는 다른 커밋 알고리즘을 사용하도록 지시합니다. 코드는 데이터를 쓰는 해당 파티션만 덮어쓰도록 partitionOverwriteMode 속성을 dynamic으로 설정합니다. 그러면 동적 파티션 열이 partitionBy에 의해 지정되고 쓰기 모드가 overwrite로 설정됩니다.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://EXAMPLE-DOC-BUCKET/output")

EMRFS S3-optimized 커미터를 사용하지 않도록 세 가지 설정을 모두 구성해야 합니다. 이렇게 하면 Spark는 Spark의 커밋 프로토콜에 지정된 다른 커밋 알고리즘을 실행합니다. Amazon EMR 5.x 릴리스가 5.30.0 이전이고 Amazon EMR 6.x 릴리스가 6.2.0 이전인 경우 커밋 프로토콜은 Spark의 스테이징 디렉터리를 사용합니다. 이 디렉터리는 로 시작하는 출력 위치 아래에 생성된 임시 디렉터리입니다.spark-staging. 알고리즘은 파티션 디렉터리의 이름을 순차적으로 변경하여 성능을 떨어뜨릴 수 있습니다. Amazon EMR 릴리스 5.30.0 이상 및 6.2.0 이상에 대한 자세한 내용은 섹션을 참조하세요EMRFS S3-optimized 커밋 프로토콜 사용.

Spark 2.4.0의 알고리즘은 다음 단계를 수행합니다.

  1. 작업 시도에서 출력을 Spark의 스테이징 디렉터리 아래의 파티션 디렉터리(예: ${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/)에 작성합니다.

  2. 작성된 각 파티션에 대해 작업 시도에서 상대 파티션 경로(예: k1=v1/k2=v2)를 추적합니다.

  3. 작업이 성공적으로 완료되면 추적된 모든 상대적 파티션 경로를 드라이버에 제공합니다.

  4. 모든 작업이 완료된 후 작업 커밋 단계에서 성공적인 작업 시도가 Spark의 스테이징 디렉터리에 쓴 모든 파티션 디렉터리를 수집합니다. Spark는 디렉터리 트리 이름 변경 작업을 사용하여 각 디렉터리의 이름을 순차적으로 최종 출력 위치로 변경합니다.

  5. 스테이징 디렉터리는 작업 커밋 단계가 완료되기 전에 삭제됩니다.

예 - 사용자 지정 파티션 위치

이 예에서 Scala 코드는 두 개의 파티션으로 삽입됩니다. 한 파티션은 사용자 지정 파티션 위치를 갖습니다. 다른 파티션은 기본 파티션 위치를 사용합니다. EMRFS S3-optimized 커밋은 기본 파티션 위치를 사용하는 파티션에 태스크 출력을 쓰는 데만 사용됩니다.

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala 코드는 다음 Amazon S3 객체를 생성합니다.

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

사용자 지정 위치의 파티션에 쓸 때 Spark는 앞의 예와 비슷한 커밋 알고리즘을 사용합니다. 이에 대해서는 아래에서 간단히 설명합니다. 앞의 예제와 같이 이 알고리즘은 이름을 순차적으로 변경하여 성능에 부정적인 영향을 줄 수 있습니다.

  1. 사용자 지정 위치의 파티션에 출력을 쓸 경우 작업은 최종 출력 위치에 생성된 Spark의 스테이징 디렉터리에 있는 파일에 씁니다. 파일 이름에는 파일 충돌을 방지하기 UUID 위한 무작위가 포함됩니다. 작업 시도는 각 파일의 추적과, 필요한 최종 출력 경로를 보존합니다.

  2. 작업이 성공적으로 완료되면 드라이버에 파일과 해당 출력 경로를 제공합니다.

  3. 모든 작업이 완료된 후 작업 커밋 단계에서는 사용자 지정 위치에 파티션에 대해 써진 모든 파일의 이름을 최종 출력 경로로 순차적으로 변경합니다.

  4. 스테이징 디렉터리는 작업 커밋 단계가 완료되기 전에 삭제됩니다.