EMRFS S3 - Amazon EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

EMRFS S3

EMRFS S3 최적화 커미터는 다음과 같은 상황에서 사용됩니다.

  • Spark SQL 또는 데이터 세트를 사용하여 Amazon S3에 파일을 쓰는 Spark 작업을 실행합니다. DataFrames Amazon EMR 6.4.0부터 이 커미터는 쪽모이 세공 마루, ORC 및 텍스트 기반 형식 (CSV 및 JSON 포함) 을 비롯한 모든 일반 형식에 사용할 수 있습니다. Amazon EMR 6.4.0 이전 릴리스 버전의 경우 파켓 형식만 지원됩니다.

  • Amazon EMR에서는 멀티파트 업로드를 사용할 수 있습니다. 이 값이 기본값입니다. 자세한 내용은 EMRFS S3에 최적화된 커미터 및 멀티파트 업로드을 참조하세요.

  • Spark의 기본 제공 Parquet 지원을 사용할 경우. 기본 제공 Parquet 지원은 다음 상황에서 사용됩니다.

    • spark.sql.hive.convertMetastoreParquettrue로 설정된 경우. 이것이 기본 설정입니다.

    • 예를 들어 Parquet 데이터 소스 또는 테이블에 작업을 기록하면USING parquet 절과 함께 대상 테이블이 생성됩니다.

    • 작업이 파티션 분할되지 않은 Hive 메타스토어 Parquet 테이블에 쓸 경우. Spark의 기본 제공 Parquet 지원은 파티션 분할된 Hive 테이블을 지원하지 않습니다. 이 제한 사항은 알려져 있습니다. Apache Spark SQL. DataFrames

  • 기본 파티션 위치에 쓰는 Spark 작업 (예:${table_location}/k1=v1/k2=v2/) 은 커미터를 사용합니다. 커미터는 작업 작업이 사용자 지정 파티션 위치에 쓰는 경우 (예:ALTER TABLE SQL 명령을 사용하여 사용자 지정 파티션 위치를 설정한 경우) 사용되지 않습니다.

  • Spark에는 다음 값을 사용해야 합니다.

    • spark.sql.parquet.fs.optimized.committer.optimization-enabled 속성을 true로 설정해야 합니다. Amazon EMR 5.20.0. Amazon EMR 5.19.0의 경우 기본값은 입니다false. 이 값의 구성에 대한 자세한 내용은 다음(Amazon EMRFS S3)을 참조하십시오.

    • 파티션 분할 되지 않은 Hive 메타스토어 테이블에 쓸 경우 spark.sql.hive.convertMetastoreParquettrue로 설정해야 합니다. 이것이 기본 설정입니다.

    • spark.sql.parquet.output.committer.classcom.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter로 설정해야 합니다. 이것이 기본 설정입니다.

    • spark.sql.sources.commitProtocolClassorg.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol또는 로 설정해야org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol 합니다. org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolEMR 5.x 시리즈 버전 5.30.0 이상 및 EMR 6.x 시리즈 버전 6.2.0 이상의 기본 설정입니다. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol이전 EMR 버전의 기본 설정입니다.

    • Spark 작업이 동적 파티션 열을 포함하는 파티션 분할된 Parquet 데이터 세트를 덮어쓸 경우 partitionOverwriteMode 쓰기 옵션과 spark.sql.sources.partitionOverwriteModestatic으로 설정해야 합니다. 이것이 기본 설정입니다.

      참고

      partitionOverwriteMode 쓰기 옵션은 Spark 2.4.0에서 소개되었습니다. Amazon EMR 릴리스 5.19.0에 포함된 스파크 버전 2.3.2의 경우spark.sql.sources.partitionOverwriteMode 속성을 설정하십시오.

EMRFS S3

EMRFS S3

상황 커미터가 사용되지 않는 이유
HDFS에 데이터를 쓰는 경우 커미터는 EMRFS를 사용하여 Amazon S3에 쓰는 것만 지원합니다.
S3A 파일 시스템을 사용하는 경우 커미터는 EMRFS만 지원합니다.
스파크의 RDD API를 사용하는 MapReduce 경우 커미터는 SparkSQL 또는 데이터세트 API 사용만 지원합니다. DataFrame

다음 Scala 예제는 EMRFS S3에 최적화된 커미터가 전체 (첫 번째 예제) 및 일부 (두 번째 예제) 사용되지 못하게 하는 몇 가지 추가 상황을 보여줍니다.

예 —동적 파티션 덮어쓰기 모드

다음 Scala 예제는 Spark에 다른 커밋 알고리즘을 사용하도록 지시하여 EMRFS S3에 최적화된 커밋터를 전혀 사용하지 못하도록 합니다. 코드는 데이터를 쓰고 있는 파티션만dynamic 덮어쓰도록partitionOverwriteMode 속성을 로 설정합니다. 그런 다음 동적 파티션 열을 로partitionBy 지정하고 쓰기 모드를 로 설정합니다overwrite.

EMRFS S3 이렇게 하면 Spark는 Spark의 스테이징 디렉터리를 사용하는 다른 커밋 알고리즘을 실행합니다. 이 디렉터리는 로 시작되는 출력 위치 아래에 생성되는 임시 디렉터리입니다.spark-staging. 알고리즘은 파티션 디렉터리의 이름을 순차적으로 바꾸므로 성능에 부정적인 영향을 미칠 수 있습니다.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://EXAMPLE-DOC-BUCKET/output")

Spark 2.4.0의 알고리즘은 다음 단계를 수행합니다.

  1. 작업 시도는 출력을 Spark의 스테이징 디렉터리 아래에 있는 파티션 디렉터리에 기록합니다 (예:)${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/.

  2. 작성된 각 파티션에 대해 작업 시도는 상대 파티션 경로를 추적합니다 (예:)k1=v1/k2=v2.

  3. 작업이 성공적으로 완료되면 추적된 모든 상대적 파티션 경로를 드라이버에 제공합니다.

  4. 모든 작업이 완료된 후 작업 커밋 단계에서 성공적인 작업 시도가 Spark의 스테이징 디렉터리에 쓴 모든 파티션 디렉터리를 수집합니다. Spark는 디렉터리 트리 이름 변경 작업을 사용하여 각 디렉터리의 이름을 순차적으로 최종 출력 위치로 변경합니다.

  5. 스테이징 디렉터리는 작업 커밋 단계가 완료되기 전에 삭제됩니다.

예 —사용자 지정 파티션 위치

이 예에서 Scala 코드는 두 개의 파티션으로 삽입됩니다. 한 파티션은 사용자 지정 파티션 위치를 갖습니다. 다른 파티션은 기본 파티션 위치를 사용합니다. EMRFS S3 최적화 커미터는 기본 파티션 위치를 사용하는 파티션에 작업 출력을 쓸 때만 사용됩니다.

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

스칼라 코드는 다음과 같은 Amazon S3 객체를 생성합니다.

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet custom_$folder$ table/_SUCCESS table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet table/dt=2019-01-29_$folder$ table_$folder$

사용자 지정 위치의 파티션에 쓸 때 Spark는 앞의 예와 비슷한 커밋 알고리즘을 사용합니다. 이에 대해서는 아래에서 간단히 설명합니다. 앞의 예와 같이 이 알고리즘은 이름을 순차적으로 변경하여 성능 단계에 부정적인 영향을 줄 수 있습니다.

  1. 사용자 지정 위치의 파티션에 출력을 쓸 경우 작업은 최종 출력 위치에 생성된 Spark의 스테이징 디렉터리에 있는 파일에 씁니다. 파일 이름에는 파일 충돌을 방지하기 위해 무작위 UUID가 포함됩니다. 작업 시도는 각 파일의 추적과, 필요한 최종 출력 경로를 보존합니다.

  2. 작업이 성공적으로 완료되면 드라이버에 파일과 해당 출력 경로를 제공합니다.

  3. 모든 작업이 완료된 후 작업 커밋 단계에서는 사용자 지정 위치에 파티션에 대해 써진 모든 파일의 이름을 최종 출력 경로로 순차적으로 변경합니다.

  4. 스테이징 디렉터리는 작업 커밋 단계가 완료되기 전에 삭제됩니다.