EMRFS S3 최적화 커미터의 요구 사항 - Amazon EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

EMRFS S3 최적화 커미터의 요구 사항

EMRFS S3 최적화 커미터는 다음과 같은 상황에서 사용됩니다.

  • Spark SQL, DataFrames 또는 Dataset를 사용하여 Amazon S3 파일을 쓰는 Spark 작업을 실행할 경우. Amazon EMR 6.4.0부터 이 커미터는 파켓, ORC 및 텍스트 기반 형식 (CSV 및 JSON 포함) 을 포함한 모든 일반적인 형식에 사용할 수 있습니다. Amazon EMR 6.4.0 이전 릴리스 버전의 경우 파켓 형식만 지원됩니다.

  • Amazon EMR에서 멀티파트 업로드를 활성화한 경우 이 값이 기본값입니다. 자세한 정보는 EMRFS S3 최적화 커미터 및 멀티파트 업로드을 참조하십시오.

  • Spark의 기본 제공 Parquet 지원을 사용할 경우. 기본 제공 Parquet 지원은 다음 상황에서 사용됩니다.

    • spark.sql.hive.convertMetastoreParquettrue로 설정된 경우. 이것이 기본 설정입니다.

    • 작업이 Parquet 데이터 원본 또는 테이블에 쓸 때 (예: 대상 테이블은USING parquet절.

    • 작업이 파티션 분할되지 않은 Hive 메타스토어 Parquet 테이블에 쓸 경우. Spark의 기본 제공 Parquet 지원은 파티션 분할된 Hive 테이블을 지원하지 않습니다. 이 제한 사항은 알려져 있습니다. 자세한 내용은 단원을 참조하십시오.하이브 메타 스토어 쪽모이 세공 표 변환아파치 스파크 SQL에서 DataFrames 및 데이터세트 가이드.

  • 기본 파티션 위치에 쓰는 작업 작업 (예:${table_location}/k1=v1/k2=v2/커미터를 사용합니다. 작업 작업이 사용자 정의 파티션 위치에 쓰는 경우 (예: 사용자 정의 파티션 위치가ALTER TABLE SQL명령.

  • Spark에는 다음 값을 사용해야 합니다.

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled 속성을 true로 설정해야 합니다. 이 설정은 Amazon EMR 5.20.0 이상에서 기본 설정입니다. Amazon EMR 5.19.0의 경우 기본값은 입니다.false. 이 값의 구성에 대한 자세한 내용은 다음(Amazon EMR 5.19.0에 대해 EMRFS S3 최적화 커미터 활성화)을 참조하십시오.

    • 파티션 분할 되지 않은 Hive 메타스토어 테이블에 쓸 경우 spark.sql.hive.convertMetastoreParquettrue로 설정해야 합니다. 이것이 기본 설정입니다.

    • spark.sql.parquet.output.committer.classcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter로 설정해야 합니다. 이것이 기본 설정입니다.

    • spark.sql.sources.commitProtocolClass를 로 설정해야 합니다.org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol또는org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol.org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolEMR 5.x 시리즈 버전 5.30.0 이상과 EMR 6.x 시리즈 버전 6.2.0 이상에 대한 기본 설정입니다.org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol은 (는) 이전 EMR 버전의 기본 설정입니다.

    • Spark 작업이 동적 파티션 열을 포함하는 파티션 분할된 Parquet 데이터 세트를 덮어쓸 경우 partitionOverwriteMode 쓰기 옵션과 spark.sql.sources.partitionOverwriteModestatic으로 설정해야 합니다. 이것이 기본 설정입니다.

      참고

      partitionOverwriteMode 쓰기 옵션은 Spark 2.4.0에서 소개되었습니다. 아마존 EMR 릴리스 5.19.0에 포함된 Spark 버전 2.3.2의 경우spark.sql.sources.partitionOverwriteMode속성입니다.

EMRFS S3 최적화 커미터를 사용하지 않는 경우

일반적으로 EMRFS S3 최적화 커미터는 다음과 같은 상황에서 사용되지 않습니다.

상황 커미터가 사용되지 않는 이유
HDFS에 쓸 때 커미터는 EMRFS를 사용하는 Amazon S3 쓰기만 지원합니다.
S3A 파일 시스템을 사용할 경우 커미터는 EMRFS만 지원합니다.
사용할 경우 MapReduce 또는 Spark의 RDD API 커미터는 SparkSQL, DataFrame 또는 데이터세트 API 사용만 지원합니다.

다음 Scala 예제는 EMRFS S3 최적화 커미터를 전체적으로 (첫 번째 예) 또는 부분적으로 (두 번째 예) 사용하지 못하게하는 몇 가지 추가 상황을 보여줍니다.

예 —동적 파티션 덮어쓰기 모드

다음 스칼라 예제에서는 Spark에 다른 커밋 알고리즘을 사용하도록 지시합니다. 이 알고리즘은 EMRFS S3 최적화 커미터를 완전히 사용하지 않습니다. 코드는 다음을 설정합니다.partitionOverwriteMode속성dynamic데이터를 쓰고 있는 파티션만 덮어씁니다. 그런 다음 동적 파티션 열은 다음과 같이 지정됩니다.partitionBy를 사용하여 쓰기 모드가 로 설정됩니다.overwrite.

EMRFS S3 최적화 커미터를 사용하지 않으려면 세 가지 설정을 모두 구성해야 합니다. 이렇게 하면 Spark가 Spark의 스테이징 디렉터리를 사용하는 여러 커밋 알고리즘을 실행합니다. 이 디렉터리는 으로 시작하는 출력 위치에 생성되는 임시 디렉터리입니다..spark-staging. 알고리즘은 파티션 디렉터리의 이름을 순차적으로 변경하여 성능에 부정적인 영향을 줄 수 있습니다.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://EXAMPLE-DOC-BUCKET/output")

Spark 2.4.0의 알고리즘은 다음 단계를 수행합니다.

  1. 작업 시도는 Spark의 스테이징 디렉터리 아래의 파티션 디렉터리에 출력을 씁니다. 예를 들면 다음과 같습니다.${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/.

  2. 작성된 각 파티션에 대해 작업 시도는 상대 파티션 경로를 추적합니다 (예:k1=v1/k2=v2.

  3. 작업이 성공적으로 완료되면 추적된 모든 상대적 파티션 경로를 드라이버에 제공합니다.

  4. 모든 작업이 완료된 후 작업 커밋 단계에서 성공적인 작업 시도가 Spark의 스테이징 디렉터리에 쓴 모든 파티션 디렉터리를 수집합니다. Spark는 디렉터리 트리 이름 변경 작업을 사용하여 각 디렉터리의 이름을 순차적으로 최종 출력 위치로 변경합니다.

  5. 스테이징 디렉터리는 작업 커밋 단계가 완료되기 전에 삭제됩니다.

예 —사용자 지정 파티션 위치

이 예에서 Scala 코드는 두 개의 파티션으로 삽입됩니다. 한 파티션은 사용자 지정 파티션 위치를 갖습니다. 다른 파티션은 기본 파티션 위치를 사용합니다. EMRFS S3 최적화 커미터는 기본 파티션 위치를 사용하는 파티션에 작업 출력을 쓸 때만 사용됩니다.

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Scala 코드는 다음과 같은 Amazon S3 객체를 생성합니다.

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$

사용자 지정 위치의 파티션에 쓸 때 Spark는 앞의 예와 비슷한 커밋 알고리즘을 사용합니다. 이에 대해서는 아래에서 간단히 설명합니다. 앞의 예와 같이 이 알고리즘은 이름을 순차적으로 변경하여 성능 단계에 부정적인 영향을 줄 수 있습니다.

  1. 사용자 지정 위치의 파티션에 출력을 쓸 경우 작업은 최종 출력 위치에 생성된 Spark의 스테이징 디렉터리에 있는 파일에 씁니다. 파일 이름에는 파일 충돌을 방지하기 위해 무작위 UUID가 포함됩니다. 작업 시도는 각 파일의 추적과, 필요한 최종 출력 경로를 보존합니다.

  2. 작업이 성공적으로 완료되면 드라이버에 파일과 해당 출력 경로를 제공합니다.

  3. 모든 작업이 완료된 후 작업 커밋 단계에서는 사용자 지정 위치에 파티션에 대해 써진 모든 파일의 이름을 최종 출력 경로로 순차적으로 변경합니다.

  4. 스테이징 디렉터리는 작업 커밋 단계가 완료되기 전에 삭제됩니다.