셔플 최적화 -

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

셔플 최적화

join()및 와 같은 특정 작업의 경우 Spark에서 groupByKey() 셔플을 수행해야 합니다. 셔플은 데이터를 재분배하여 파티션 간에 다르게 그룹화되도록 하는 Spark의 메커니즘입니다. RDD 셔플링은 성능 병목 현상을 해결하는 데 도움이 될 수 있습니다. 그러나 셔플은 일반적으로 Spark 실행기 간에 데이터를 복사하는 것이므로 셔플은 복잡하고 비용이 많이 드는 작업입니다. 예를 들어 셔플을 사용하면 다음과 같은 비용이 발생합니다.

  • 디스크 I/O:

    • 디스크에 많은 수의 중간 파일을 생성합니다.

  • 네트워크 I/O:

    • 많은 네트워크 연결이 필요합니다 (연결 수 =Mapper × Reducer).

    • 레코드는 다른 Spark 실행기에서 호스팅될 수 있는 새 RDD 파티션에 집계되므로 데이터세트의 상당 부분이 네트워크를 통해 Spark 실행기 간에 이동할 수 있습니다.

  • CPU및 메모리 로드:

    • 값을 정렬하고 데이터 세트를 병합합니다. 이러한 작업은 실행자에서 계획되므로 실행자에 많은 부하가 가해집니다.

셔플은 Spark 애플리케이션 성능 저하의 가장 큰 요인 중 하나입니다. 중간 데이터를 저장하는 동안 실행기의 로컬 디스크 공간이 고갈되어 Spark 작업이 실패할 수 있습니다.

셔플 성능은 CloudWatch 메트릭과 Spark UI에서 평가할 수 있습니다.

CloudWatch 메트릭스

셔플 바이트 쓰기 값이 읽은 바이트와 비교하여 높으면 Spark 작업에서 또는 같은 셔플 연산을 사용할 수 있습니다. join() groupByKey()

쓰여진 셔플 바이트의 급증을 보여주는 실행자 간 데이터 셔플 (바이트) 그래프입니다.

Spark UI

Spark UI의 스테이지 탭에서 셔플 읽기 크기/레코드 값을 확인할 수 있습니다. Executors 탭에서도 확인할 수 있습니다.

다음 스크린샷에서 각 실행기는 셔플 프로세스를 통해 약 18.6GB/4020000개의 레코드를 교환하며 총 셔플 읽기 크기는 약 75GB입니다.

Shuffle Spill (Disk) 열에는 디스크로 유출된 많은 양의 데이터 메모리가 표시되어 디스크가 가득 차거나 성능 문제가 발생할 수 있습니다.

""

이러한 증상이 나타나고 스테이지가 성능 목표에 비해 너무 오래 Out Of Memory 걸리거나 No space left on device 오류와 함께 실패하는 경우 다음 해결 방법을 고려해 보십시오.

조인을 최적화하세요.

테이블을 조인하는 join() 작업은 가장 일반적으로 사용되는 셔플 작업이지만 성능 병목 현상이 발생하는 경우가 많습니다. 조인은 비용이 많이 드는 작업이므로 비즈니스 요구 사항에 꼭 필요한 경우가 아니면 사용하지 않는 것이 좋습니다. 다음 질문을 통해 데이터 파이프라인을 효율적으로 사용하고 있는지 다시 한 번 확인하세요.

  • 재사용할 수 있는 다른 작업에서도 수행되는 조인을 다시 계산하고 있습니까?

  • 출력 시 소비자가 사용하지 않는 값으로 외래 키를 해석하는 데 참여하고 계신가요?

조인 작업이 비즈니스 요구 사항에 필수적인지 확인한 후 요구 사항에 맞는 방식으로 조인을 최적화하기 위한 다음 옵션을 참조하십시오.

가입하기 전에 푸시다운을 사용하세요.

조인을 수행하기 DataFrame 전에 에서 불필요한 행과 열을 걸러냅니다. 여기에는 다음과 같은 이점이 있습니다.

  • 셔플 중에 전송되는 데이터 양을 줄입니다.

  • Spark 실행기의 처리량을 줄입니다.

  • 데이터 스캔량을 줄입니다.

# Default df_joined = df1.join(df2, ["product_id"]) # Use Pushdown df1_select = df1.select("product_id","product_title","star_rating").filter(col("star_rating")>=4.0) df2_select = df2.select("product_id","category_id") df_joined = df1_select.join(df2_select, ["product_id"])

DataFrame Join 사용

또는 조인 대신 SparkSQL, DataFrame, Datasets와 API 같은 Spark 상위 레벨을 사용해 보세요. RDD API DynamicFrame DataFrame 와 같은 메서드 DynamicFrame 호출을 사용하여 변환할 수 있습니다. dyf.toDF() Apache Spark의 주요 항목에서 설명한 것처럼 이러한 조인 작업은 내부적으로 Catalyst 최적화 프로그램의 쿼리 최적화를 활용합니다.

셔플 및 브로드캐스트 해시 조인 및 힌트

Spark는 셔플 조인과 브로드캐스트 해시 조인이라는 두 가지 유형의 조인을 지원합니다. 브로드캐스트 해시 조인은 셔플이 필요하지 않으며 셔플 조인보다 처리가 덜 필요할 수 있습니다. 하지만 작은 테이블을 큰 테이블에 조인하는 경우에만 적용됩니다. 단일 Spark 실행자의 메모리에 들어갈 수 있는 테이블을 조인할 때는 브로드캐스트 해시 조인을 사용하는 것이 좋습니다.

다음 다이어그램은 브로드캐스트 해시 조인과 셔플 조인의 상위 구조 및 단계를 보여줍니다.

테이블과 조인된 테이블을 직접 연결하는 브로드캐스트 조인과 테이블과 조인된 테이블 사이에 두 개의 셔플 단계가 있는 셔플 조인.

각 조인의 세부 정보는 다음과 같습니다.

  • 셔플 조인:

    • 셔플 해시 조인은 정렬 없이 두 테이블을 조인하고 두 테이블 간에 조인을 분산합니다. Spark 실행기의 메모리에 저장할 수 있는 작은 테이블을 조인하는 데 적합합니다.

    • sort-merge 조인은 조인할 두 테이블을 키로 분산하고 조인 전에 정렬합니다. 대형 테이블을 조인하는 데 적합합니다.

  • 브로드캐스트 해시 조인:

    • 브로드캐스트 해시 조인은 더 작은 RDD OR 테이블을 각 워커 노드로 푸시합니다. 그런 다음 더 큰 OR 테이블의 각 파티션과 맵 측 결합을 수행합니다. RDD

      테이블 RDDs 또는 테이블 중 하나를 메모리에 담을 수 있거나 메모리에 맞게 만들 수 있는 경우 조인에 적합합니다. 브로드캐스트 해시 조인은 셔플이 필요하지 않으므로 가능하면 수행하는 것이 좋습니다. 다음과 같이 조인 힌트를 사용하여 Spark에 브로드캐스트 조인을 요청할 수 있습니다.

      # DataFrame from pySpark.sql.functions import broadcast df_joined= df_big.join(broadcast(df_small), right_df[key] == left_df[key], how='inner') -- SparkSQL SELECT /*+ BROADCAST(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

      조인 힌트에 대한 자세한 내용은 조인 힌트를 참조하십시오.

AWS Glue 3.0 이상에서는 적응형 쿼리 실행 및 추가 매개변수를 활성화하여 브로드캐스트 해시 조인을 자동으로 활용할 수 있습니다. 적응형 쿼리 실행은 어느 쪽 조인의 런타임 통계가 적응형 브로드캐스트 해시 조인 임계값보다 작을 때 정렬-병합 조인을 브로드캐스트 해시 조인으로 변환합니다.

AWS Glue 3.0에서는 설정을 통해 적응형 쿼리 실행을 활성화할 수 있습니다. spark.sql.adaptive.enabled=true 적응형 쿼리 실행은 AWS Glue 4.0에서 기본적으로 활성화되어 있습니다.

셔플 및 브로드캐스트 해시 조인과 관련된 추가 파라미터를 설정할 수 있습니다.

  • spark.sql.adaptive.localShuffleReader.enabled

  • spark.sql.adaptive.autoBroadcastJoinThreshold

관련 파라미터에 대한 자세한 내용은 sort-merge 조인을 브로드캐스트 조인으로 변환을 참조하십시오.

AWS Glue 3.0 이상에서는 셔플용 다른 조인 힌트를 사용하여 동작을 조정할 수 있습니다.

-- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

버켓팅 사용하기

정렬-병합 조인에는 셔플과 정렬, 병합이라는 두 단계가 필요합니다. 이 두 단계는 Spark 실행기에 과부하가 걸리고 일부 실행기가 병합되고 다른 실행기가 OOM 동시에 정렬될 때 성능 문제가 발생할 수 있습니다. 이런 경우에는 버켓팅을 사용하여 효율적으로 조인할 수 있습니다. 버킷팅은 조인 키의 입력을 미리 섞고 미리 정렬한 다음 정렬된 데이터를 중간 테이블에 기록합니다. 정렬된 중간 테이블을 미리 정의하여 대형 테이블을 조인할 때 셔플 및 정렬 단계의 비용을 줄일 수 있습니다.

정렬-병합 조인에는 추가 셔플 및 정렬 단계가 있습니다.

버킷이 있는 테이블은 다음과 같은 경우에 유용합니다.

  • 데이터는 다음과 같이 동일한 키를 통해 자주 조인됩니다. account_id

  • 일별 누적 테이블 (예: 공통 열에 버킷으로 지정할 수 있는 기본 및 델타 테이블) 로드

다음 코드를 사용하여 버킷이 있는 테이블을 만들 수 있습니다.

df.write.bucketBy(50, "account_id").sortBy("age").saveAsTable("bucketed_table")

조인 전에 조인 키를 다시 DataFrames 파티셔닝합니다.

조인 전에 조인 DataFrames 키에서 두 키를 다시 분할하려면 다음 명령문을 사용하십시오.

df1_repartitioned = df1.repartition(N,"join_key") df2_repartitioned = df2.repartition(N,"join_key") df_joined = df1_repartitioned.join(df2_repartitioned,"product_id")

이렇게 하면 조인을 시작하기 전에 조인 RDDs 키에서 두 개의 파티션 (여전히 분리되어 있음) 이 분할됩니다. 두 RDDs 레코드가 동일한 키로 동일한 파티셔닝 코드를 사용하여 파티셔닝된 경우, 함께 조인하려는 RDD 레코드가 조인을 위해 섞이기 전에 동일한 워커에 같은 위치에 배치될 가능성이 높습니다. 이렇게 하면 조인 중에 네트워크 활동 및 데이터 왜곡이 줄어들어 성능이 향상될 수 있습니다.

데이터 편차를 극복하세요.

데이터 왜곡은 Spark 작업 병목 현상의 가장 일반적인 원인 중 하나입니다. 데이터가 파티션 전체에 균일하게 분산되지 않을 때 발생합니다. RDD 이로 인해 해당 파티션의 작업이 다른 파티션보다 훨씬 오래 걸리고 응용 프로그램의 전체 처리 시간이 지연됩니다.

데이터 편향을 식별하려면 Spark UI에서 다음 지표를 평가하세요.

  • Spark UI의 스테이지 탭에서 이벤트 타임라인 페이지를 살펴보세요. 다음 스크린샷에서 작업이 고르지 않게 분산되어 있는 것을 확인할 수 있습니다. 작업이 고르지 않게 분산되거나 실행 시간이 너무 오래 걸리는 것은 데이터 왜곡을 의미할 수 있습니다.

    한 작업의 실행자 계산 시간이 다른 작업보다 훨씬 더 깁니다.
  • 또 다른 중요한 페이지는 Spark 작업에 대한 통계를 보여주는 요약 지표입니다. 다음 스크린샷은 지속 시간, GC 시간, 유출 (메모리), 유출 (디스크) 등에 대한 백분위수가 포함된 메트릭을 보여줍니다.

    기간 행이 강조 표시된 요약 측정치 표

    작업이 균등하게 분배되면 모든 백분위수에서 비슷한 수치를 볼 수 있습니다. 데이터 편차가 있는 경우 각 백분위수에서 매우 편향된 값을 볼 수 있습니다. 이 예제에서 작업 지속 시간은 최소, 25번째 백분위수, 중앙값, 75번째 백분위수 단위로 13초 미만입니다. Max 작업은 75번째 백분위수보다 100배 많은 데이터를 처리했지만, 6.4분이라는 시간은 약 30배 더 깁니다. 즉, 하나 이상의 작업 (또는 작업의 최대 25%) 이 나머지 작업보다 훨씬 오래 걸렸습니다.

데이터 왜곡이 보이면 다음을 시도해 보세요.

  • AWS Glue 3.0을 사용하는 경우 설정을 spark.sql.adaptive.enabled=true 통해 적응형 쿼리 실행을 활성화하십시오. 적응형 쿼리 실행은 AWS Glue 4.0에서 기본적으로 활성화됩니다.

    다음과 같은 관련 매개변수를 설정하여 조인으로 인한 데이터 왜곡에 대해 적응형 쿼리 실행을 사용할 수도 있습니다.

    • spark.sql.adaptive.skewJoin.skewedPartitionFactor

    • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

    • spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (128 mebibytes or larger should be good)

    • spark.sql.adaptive.coalescePartitions.enabled=true (when you want to coalesce partitions)

    자세한 내용은 Apache Spark 설명서를 참조하십시오.

  • 조인 키에는 값 범위가 넓은 키를 사용하십시오. 셔플 조인에서는 키의 각 해시 값에 대해 파티션이 결정됩니다. 조인 키의 카디널리티가 너무 낮으면 해시 함수가 파티션 간에 데이터를 분산시키는 잘못된 역할을 할 가능성이 높습니다. 따라서 애플리케이션과 비즈니스 로직에서 지원하는 경우 더 높은 카디널리티 키나 복합 키를 사용하는 것이 좋습니다.

    # Use Single Primary Key df_joined = df1_select.join(df2_select, ["primary_key"]) # Use Composite Key df_joined = df1_select.join(df2_select, ["primary_key","secondary_key"])

캐시 사용

반복적으로 DataFrames 사용할 때는 계산 결과를 각 Spark 실행기의 메모리와 디스크에 df.cache() 캐시하거나 df.persist() 사용하여 추가 셔플 또는 계산을 수행하지 마십시오. 또한 Spark는 RDDs 디스크에 보관하거나 여러 노드에 걸쳐 복제하는 기능 (스토리지 수준) 을 지원합니다.

예를 들어, 추가하여 지속할 수 있습니다. DataFrames df.persist() 캐시가 더 이상 필요하지 않은 경우 를 unpersist 사용하여 캐시된 데이터를 삭제할 수 있습니다.

df = spark.read.parquet("s3://<Bucket>/parquet/product_category=Books/") df_high_rate = df.filter(col("star_rating")>=4.0) df_high_rate.persist() df_joined1 = df_high_rate.join(<Table1>, ["key"]) df_joined2 = df_high_rate.join(<Table2>, ["key"]) df_joined3 = df_high_rate.join(<Table3>, ["key"]) ... df_high_rate.unpersist()

불필요한 Spark 액션을 제거합니다.

countshow, 또는 같은 불필요한 작업은 실행하지 마세요. collect Apache Spark의 주요 항목 섹션에서 설명한 것처럼 Spark는 게으르다. 변환된 각 변환은 작업을 실행할 때마다 다시 RDD 계산될 수 있습니다. 많은 Spark 액션을 사용하는 경우 각 액션에 대한 다중 소스 액세스, 작업 계산 및 셔플 실행이 호출됩니다.

상용 환경에서 필요하지 collect() 않거나 다른 작업이 없는 경우 해당 작업을 제거하는 것이 좋습니다.

참고

상업용 collect() 환경에서는 Spark를 최대한 사용하지 마세요. 이 collect() 액션은 Spark 실행기의 모든 계산 결과를 Spark 드라이버로 반환하며, 이로 인해 Spark 드라이버에서 오류가 반환될 수 있습니다. OOM OOM오류를 방지하기 위해 Spark는 spark.driver.maxResultSize = 1GB 기본적으로 Spark 드라이버에 반환되는 최대 데이터 크기를 1GB로 제한하도록 설정합니다.