优化洗牌 -

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

优化洗牌

某些操作(例如join()groupByKey())需要 Spark 执行随机播放。shuffle 是 Spark 用于重新分配数据的机制,以便在分区之间RDD对数据进行不同的分组。洗牌可以帮助修复性能瓶颈。但是,由于洗牌通常涉及在 Spark 执行器之间复制数据,因此洗牌是一项复杂且成本高昂的操作。例如,洗牌会产生以下成本:

  • 磁盘 I/O:

    • 在磁盘上生成大量中间文件。

  • 网络 I/O:

    • 需要多个网络连接(连接数 =Mapper × Reducer)。

    • 由于记录会聚合到可能托管在不同的 Spark 执行器上的新RDD分区,因此数据集的很大一部分可能会通过网络在 Spark 执行器之间移动。

  • CPU和内存负载:

    • 对值进行排序并合并数据集。这些操作是在执行者身上计划的,这给执行者带来了沉重的负担。

Shuffle 是导致 Spark 应用程序性能下降的最重要因素之一。在存储中间数据时,它可能会耗尽执行程序本地磁盘上的空间,从而导致 Spark 作业失败。

你可以通过 CloudWatch 指标和 Spark UI 来评估你的 shuffle 表现。

CloudWatch 指标

如果 Shuffle Bytes W rited 值与 Shuffle Bytes Re ad 相比高,则你的 Spark 作业可能会使用洗牌操作,例如或join() groupByKey()

数据跨执行器洗牌(字节)图显示了写入的随机字节的峰值。

Spark UI

在 Spark 用户界面的舞台选项卡上,你可以查看 Shuffle 读取大小/记录值。您也可以在 “执行者” 选项卡上看到它。

在以下屏幕截图中,每个执行者与洗牌过程交换了大约 18.6GB/40200000 条记录,总随机读取大小约为 75 GB)。

Shuffle Spill(磁盘)列显示大量数据溢出到磁盘,这可能会导致磁盘已满或性能问题。

""

如果您观察到这些症状,并且该阶段与您的绩效目标相比花费的时间太长,或者失败并Out Of MemoryNo space left on device出现错误,请考虑以下解决方案。

优化联接

连接表的join()操作是最常用的洗牌操作,但它通常是性能瓶颈。由于加入是一项昂贵的操作,因此除非它对您的业务需求至关重要,否则我们建议不要使用它。通过询问以下问题,仔细检查您的数据管道是否得到有效利用:

  • 您是否正在重新计算在其他可以重复使用的作业中执行的联接?

  • 您是否加入是为了将外键解析为输出使用者未使用的值?

确认您的联接操作对您的业务需求至关重要,请查看以下选项,以满足您的要求来优化您的联接。

加入前使用下推

在执行联接 DataFrame 之前,过滤掉中不必要的行和列。这具有以下优点:

  • 减少随机播放期间的数据传输量

  • 减少 Spark 执行器中的处理量

  • 减少数据扫描量

# Default df_joined = df1.join(df2, ["product_id"]) # Use Pushdown df1_select = df1.select("product_id","product_title","star_rating").filter(col("star_rating")>=4.0) df2_select = df2.select("product_id","category_id") df_joined = df1_select.join(df2_select, ["product_id"])

使用 “ DataFrame 加入”

尝试使用 Spark 高级别 API(例如 Spark SQL、 DataFrame、和数据集)来代替RDDAPI或 DynamicFrame 联接。您可以使用诸如 DynamicFrame 之类 DataFrame 的方法调用转换为dyf.toDF()。正如 Apache Spark 的关键主题部分所述,这些联接操作在内部利用了 Catalyst 优化器的查询优化。

随机播放和广播哈希连接和提示

Spark 支持两种类型的加入:随机连接和广播哈希联接。广播哈希联接不需要洗牌,而且与随机连接相比,它需要的处理量更少。但是,它仅在将小表与大表连接时才适用。在加入可容纳单个 Spark 执行器内存的表时,可以考虑使用广播哈希联接。

下图显示了广播哈希联接和随机连接的高级结构和步骤。

广播联接使用表和联接的表之间的直接连接,在表和联接的表之间有两个随机播放阶段。

每个联接的详细信息如下:

  • 随机加入:

    • shuffle 哈希联接在不进行排序的情况下连接两个表,并在两个表之间分配联接。它适用于连接可以存储在 Spark 执行器内存中的小表。

    • 排序合并联接将按键分配要连接的两个表,并在联接之前对它们进行排序。它适用于大型表的联接。

  • 广播哈希加入:

    • 广播哈希联接将较小的RDD或表推送到每个工作节点。然后它对较大的RDD或表的每个分区进行地图端合并。

      当您的一个或一个表可以容纳在内存中RDDs或可以放入内存中时,它适用于联接。尽可能进行广播哈希连接是有益的,因为它不需要洗牌。您可以使用加入提示从 Spark 请求广播加入,如下所示。

      # DataFrame from pySpark.sql.functions import broadcast df_joined= df_big.join(broadcast(df_small), right_df[key] == left_df[key], how='inner') -- SparkSQL SELECT /*+ BROADCAST(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

      有关联接提示的更多信息,请参阅联接提示

在 AWS Glue 3.0 及更高版本中,您可以通过启用自适应查询执行和其他参数来自动利用广播哈希联接。当任一联接方的运行时统计数据小于自适应广播哈希联接阈值时,自适应查询执行会将排序合并联接转换为广播哈希联接。

在 AWS Glue 3.0 中,您可以通过设置启用自适应查询执行spark.sql.adaptive.enabled=true。AWSGlue 4.0 中默认启用自适应查询执行。

您可以设置与洗牌和广播哈希联接相关的其他参数:

  • spark.sql.adaptive.localShuffleReader.enabled

  • spark.sql.adaptive.autoBroadcastJoinThreshold

有关相关参数的更多信息,请参阅将排序合并联接转换为广播联接

在 AWS Glue 3.0 及更高版本中,你可以使用 shuffle 的其他联接提示来调整你的行为。

-- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;

使用分桶

排序合并联接需要两个阶段:洗牌和排序,然后合并。当一些执行器合并而其他执行器同时排序时,这两个阶段可能会使 Spark 执行器过载,并导致OOM性能问题。在这种情况下,也许可以通过使用分桶来高效地加入。Bucketing 将对连接键上的输入进行预洗和预排序,然后将排序后的数据写入中间表。通过预先定义排序的中间表,可以降低联接大型表时洗牌和排序步骤的成本。

排序合并联接还有额外的洗牌和排序步骤。

存储桶表可用于以下用途:

  • 数据经常通过同一个密钥联接,例如 account_id

  • 加载每日累积表,例如可以存储在公共列上的基表和增量表

您可以使用以下代码创建存储桶表。

df.write.bucketBy(50, "account_id").sortBy("age").saveAsTable("bucketed_table")

DataFrames 在加入之前对联接键进行重新分区

要在联接之前在联接键 DataFrames 上对两者进行重新分区,请使用以下语句。

df1_repartitioned = df1.repartition(N,"join_key") df2_repartitioned = df2.repartition(N,"join_key") df_joined = df1_repartitioned.join(df2_repartitioned,"product_id")

在启动联接之前,这将在连接键RDDs上分成两个(仍然是分开的)。如果两RDDs者使用相同的分区代码在同一个密钥上进行分区,则RDD记录在一起的计划很有可能在洗牌加入之前位于同一个工作线程上。这可能会减少加入期间的网络活动和数据偏差,从而提高性能。

克服数据偏差

数据倾斜是 Spark 作业出现瓶颈的最常见原因之一。当数据在RDD分区之间分布不均匀时,就会发生这种情况。这会导致该分区的任务比其他分区的任务花费的时间长得多,从而延迟了应用程序的总体处理时间。

要识别数据偏差,请在 Spark 用户界面中评估以下指标:

  • 在 Spark 用户界面的 “舞台” 选项卡上,查看 “事件时间表” 页面。您可以在以下屏幕截图中看到任务分布不均匀。分布不均匀或运行时间过长的任务可能表明数据存在偏差。

    一项任务的执行者计算时间比其他任务长得多。
  • 另一个重要的页面是摘要指标,它显示了 Spark 任务的统计信息。以下屏幕截图显示了持续时间、GC 时间溢出(内存)、溢出(磁盘)等的百分位数指标。

    摘要指标表,突出显示了 “持续时间” 行。

    当任务均匀分布时,您将在所有百分位数中看到相似的数字。当数据出现偏差时,您将在每个百分位数中看到非常有偏差的值。在示例中,任务持续时间在小值、第 25 个百分位、中位数和第 75 个百分位数中小于 13 秒。虽然 Max 任务处理的数据量是第 75 个百分位数的 100 倍,但其 6.4 分钟的持续时间大约长了 30 倍。这意味着至少一项任务(或最多占任务的25%)花费的时间比其余任务长得多。

如果您看到数据偏斜,请尝试以下方法:

  • 如果您使用 AWS Glue 3.0,请通过设置启用自适应查询执行spark.sql.adaptive.enabled=true。在 AWS Glue 4.0 中,自适应查询执行功能默认处于启用状态。

    您还可以通过设置以下相关参数,使用自适应查询执行来处理联接引入的数据偏差:

    • spark.sql.adaptive.skewJoin.skewedPartitionFactor

    • spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

    • spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (128 mebibytes or larger should be good)

    • spark.sql.adaptive.coalescePartitions.enabled=true (when you want to coalesce partitions)

    有关更多信息,请参阅 Apache Spark 文档

  • 使用值范围较大的键作为联接键。在 shuffle 联接中,分区是为密钥的每个哈希值确定的。如果联接键的基数太低,则哈希函数更有可能在分区之间分配数据方面做得不好。因此,如果您的应用程序和业务逻辑支持它,请考虑使用更高的基数键或复合键。

    # Use Single Primary Key df_joined = df1_select.join(df2_select, ["primary_key"]) # Use Composite Key df_joined = df1_select.join(df2_select, ["primary_key","secondary_key"])

使用缓存

重复使用时 DataFrames,请使用或df.persist()将计算结果缓存在每个 Spark 执行程序的内存和磁盘上,从而避免额外的随机处理df.cache()或计算。Spark 还支持RDDs在磁盘上持久化或跨多个节点复制(存储级别)。

例如,您可以 DataFrames 通过添加来保留df.persist()。当不再需要缓存时,您可以使用unpersist丢弃缓存的数据。

df = spark.read.parquet("s3://<Bucket>/parquet/product_category=Books/") df_high_rate = df.filter(col("star_rating")>=4.0) df_high_rate.persist() df_joined1 = df_high_rate.join(<Table1>, ["key"]) df_joined2 = df_high_rate.join(<Table2>, ["key"]) df_joined3 = df_high_rate.join(<Table3>, ["key"]) ... df_high_rate.unpersist()

移除不需要的 Spark 动作

避免运行不必要的操作count,例如show、或collect。正如 Apache Spark 的关键主题部分所讨论的那样,Spark 很懒惰。每次对其执行操作时,RDD可能会重新计算每个变换。当你使用许多 Spark 操作时,会为每个操作调用多个源代码访问、任务计算和随机运行。

如果您不需要collect()在商业环境中执行其他操作,请考虑将其删除。

注意

尽量避免collect()在商业环境中使用 Spark。该collect()操作将 Spark 执行器中的所有计算结果返回给 Spark 驱动程序,这可能会导致 Spark 驱动程序返回OOM错误。为避免OOM错误,Spark spark.driver.maxResultSize = 1GB 默认设置为将返回给 Spark 驱动程序的最大数据大小限制为 1 GB。