本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
优化洗牌
某些操作(例如join()
和groupByKey()
)需要 Spark 执行随机播放。shuffle 是 Spark 用于重新分配数据的机制,以便在分区之间RDD对数据进行不同的分组。洗牌可以帮助修复性能瓶颈。但是,由于洗牌通常涉及在 Spark 执行器之间复制数据,因此洗牌是一项复杂且成本高昂的操作。例如,洗牌会产生以下成本:
-
磁盘 I/O:
-
在磁盘上生成大量中间文件。
-
-
网络 I/O:
-
需要多个网络连接(连接数 =
Mapper × Reducer
)。 -
由于记录会聚合到可能托管在不同的 Spark 执行器上的新RDD分区,因此数据集的很大一部分可能会通过网络在 Spark 执行器之间移动。
-
-
CPU和内存负载:
-
对值进行排序并合并数据集。这些操作是在执行者身上计划的,这给执行者带来了沉重的负担。
-
Shuffle 是导致 Spark 应用程序性能下降的最重要因素之一。在存储中间数据时,它可能会耗尽执行程序本地磁盘上的空间,从而导致 Spark 作业失败。
你可以通过 CloudWatch 指标和 Spark UI 来评估你的 shuffle 表现。
CloudWatch 指标
如果 Shuffle Bytes W rited 值与 Shuffle Bytes Re ad 相比高,则你的 Spark 作业可能会使用洗牌操作,例如或join()
groupByKey()
Spark UI
在 Spark 用户界面的舞台选项卡上,你可以查看 Shuffle 读取大小/记录值。您也可以在 “执行者” 选项卡上看到它。
在以下屏幕截图中,每个执行者与洗牌过程交换了大约 18.6GB/40200000 条记录,总随机读取大小约为 75 GB)。
Shuffle Spill(磁盘)列显示大量数据溢出到磁盘,这可能会导致磁盘已满或性能问题。
如果您观察到这些症状,并且该阶段与您的绩效目标相比花费的时间太长,或者失败并Out Of Memory
No space
left on device
出现错误,请考虑以下解决方案。
优化联接
连接表的join()
操作是最常用的洗牌操作,但它通常是性能瓶颈。由于加入是一项昂贵的操作,因此除非它对您的业务需求至关重要,否则我们建议不要使用它。通过询问以下问题,仔细检查您的数据管道是否得到有效利用:
-
您是否正在重新计算在其他可以重复使用的作业中执行的联接?
-
您是否加入是为了将外键解析为输出使用者未使用的值?
确认您的联接操作对您的业务需求至关重要,请查看以下选项,以满足您的要求来优化您的联接。
加入前使用下推
在执行联接 DataFrame 之前,过滤掉中不必要的行和列。这具有以下优点:
-
减少随机播放期间的数据传输量
-
减少 Spark 执行器中的处理量
-
减少数据扫描量
# Default df_joined = df1.join(df2, ["product_id"]) # Use Pushdown df1_select = df1.select("product_id","product_title","star_rating").filter(col("star_rating")>=4.0) df2_select = df2.select("product_id","category_id") df_joined = df1_select.join(df2_select, ["product_id"])
使用 “ DataFrame 加入”
尝试使用 Spark 高级别 APIdyf.toDF()
。正如 Apache Spark 的关键主题部分所述,这些联接操作在内部利用了 Catalyst 优化器的查询优化。
随机播放和广播哈希连接和提示
Spark 支持两种类型的加入:随机连接和广播哈希联接。广播哈希联接不需要洗牌,而且与随机连接相比,它需要的处理量更少。但是,它仅在将小表与大表连接时才适用。在加入可容纳单个 Spark 执行器内存的表时,可以考虑使用广播哈希联接。
下图显示了广播哈希联接和随机连接的高级结构和步骤。
每个联接的详细信息如下:
-
随机加入:
-
shuffle 哈希联接在不进行排序的情况下连接两个表,并在两个表之间分配联接。它适用于连接可以存储在 Spark 执行器内存中的小表。
-
排序合并联接将按键分配要连接的两个表,并在联接之前对它们进行排序。它适用于大型表的联接。
-
-
广播哈希加入:
-
广播哈希联接将较小的RDD或表推送到每个工作节点。然后它对较大的RDD或表的每个分区进行地图端合并。
当您的一个或一个表可以容纳在内存中RDDs或可以放入内存中时,它适用于联接。尽可能进行广播哈希连接是有益的,因为它不需要洗牌。您可以使用加入提示从 Spark 请求广播加入,如下所示。
# DataFrame from pySpark.sql.functions import broadcast df_joined= df_big.join(broadcast(df_small), right_df[key] == left_df[key], how='inner') -- SparkSQL SELECT /*+ BROADCAST(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;
有关联接提示的更多信息,请参阅联接提示
。
-
在 AWS Glue 3.0 及更高版本中,您可以通过启用自适应查询执行
在 AWS Glue 3.0 中,您可以通过设置启用自适应查询执行spark.sql.adaptive.enabled=true
。AWSGlue 4.0 中默认启用自适应查询执行。
您可以设置与洗牌和广播哈希联接相关的其他参数:
-
spark.sql.adaptive.localShuffleReader.enabled
-
spark.sql.adaptive.autoBroadcastJoinThreshold
有关相关参数的更多信息,请参阅将排序合并联接转换为广播联接
在 AWS Glue 3.0 及更高版本中,你可以使用 shuffle 的其他联接提示来调整你的行为。
-- Join Hints for shuffle sort merge join SELECT /*+ SHUFFLE_MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGEJOIN(t2) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; SELECT /*+ MERGE(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle hash join SELECT /*+ SHUFFLE_HASH(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key; -- Join Hints for shuffle-and-replicate nested loop join SELECT /*+ SHUFFLE_REPLICATE_NL(t1) / FROM t1 INNER JOIN t2 ON t1.key = t2.key;
使用分桶
排序合并联接需要两个阶段:洗牌和排序,然后合并。当一些执行器合并而其他执行器同时排序时,这两个阶段可能会使 Spark 执行器过载,并导致OOM性能问题。在这种情况下,也许可以通过使用分桶来高效地加入。
存储桶表可用于以下用途:
-
数据经常通过同一个密钥联接,例如
account_id
-
加载每日累积表,例如可以存储在公共列上的基表和增量表
您可以使用以下代码创建存储桶表。
df.write.bucketBy(50, "account_id").sortBy("age").saveAsTable("bucketed_table")
DataFrames 在加入之前对联接键进行重新分区
要在联接之前在联接键 DataFrames 上对两者进行重新分区,请使用以下语句。
df1_repartitioned = df1.repartition(N,"join_key") df2_repartitioned = df2.repartition(N,"join_key") df_joined = df1_repartitioned.join(df2_repartitioned,"product_id")
在启动联接之前,这将在连接键RDDs上分成两个(仍然是分开的)。如果两RDDs者使用相同的分区代码在同一个密钥上进行分区,则RDD记录在一起的计划很有可能在洗牌加入之前位于同一个工作线程上。这可能会减少加入期间的网络活动和数据偏差,从而提高性能。
克服数据偏差
数据倾斜是 Spark 作业出现瓶颈的最常见原因之一。当数据在RDD分区之间分布不均匀时,就会发生这种情况。这会导致该分区的任务比其他分区的任务花费的时间长得多,从而延迟了应用程序的总体处理时间。
要识别数据偏差,请在 Spark 用户界面中评估以下指标:
-
在 Spark 用户界面的 “舞台” 选项卡上,查看 “事件时间表” 页面。您可以在以下屏幕截图中看到任务分布不均匀。分布不均匀或运行时间过长的任务可能表明数据存在偏差。
-
另一个重要的页面是摘要指标,它显示了 Spark 任务的统计信息。以下屏幕截图显示了持续时间、GC 时间、溢出(内存)、溢出(磁盘)等的百分位数指标。
当任务均匀分布时,您将在所有百分位数中看到相似的数字。当数据出现偏差时,您将在每个百分位数中看到非常有偏差的值。在示例中,任务持续时间在最小值、第 25 个百分位、中位数和第 75 个百分位数中小于 13 秒。虽然 Max 任务处理的数据量是第 75 个百分位数的 100 倍,但其 6.4 分钟的持续时间大约长了 30 倍。这意味着至少一项任务(或最多占任务的25%)花费的时间比其余任务长得多。
如果您看到数据偏斜,请尝试以下方法:
-
如果您使用 AWS Glue 3.0,请通过设置启用自适应查询执行
spark.sql.adaptive.enabled=true
。在 AWS Glue 4.0 中,自适应查询执行功能默认处于启用状态。您还可以通过设置以下相关参数,使用自适应查询执行来处理联接引入的数据偏差:
-
spark.sql.adaptive.skewJoin.skewedPartitionFactor
-
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes
-
spark.sql.adaptive.advisoryPartitionSizeInBytes=128m (128 mebibytes or larger should be good)
-
spark.sql.adaptive.coalescePartitions.enabled=true (when you want to coalesce partitions)
有关更多信息,请参阅 Apache Spark 文档
。 -
-
使用值范围较大的键作为联接键。在 shuffle 联接中,分区是为密钥的每个哈希值确定的。如果联接键的基数太低,则哈希函数更有可能在分区之间分配数据方面做得不好。因此,如果您的应用程序和业务逻辑支持它,请考虑使用更高的基数键或复合键。
# Use Single Primary Key df_joined = df1_select.join(df2_select, ["primary_key"]) # Use Composite Key df_joined = df1_select.join(df2_select, ["primary_key","secondary_key"])
使用缓存
重复使用时 DataFrames,请使用或df.persist()
将计算结果缓存在每个 Spark 执行程序的内存和磁盘上,从而避免额外的随机处理df.cache()
或计算。Spark 还支持RDDs在磁盘上持久化或跨多个节点复制(存储级别
例如,您可以 DataFrames 通过添加来保留df.persist()
。当不再需要缓存时,您可以使用unpersist
丢弃缓存的数据。
df = spark.read.parquet("s3://<Bucket>/parquet/product_category=Books/") df_high_rate = df.filter(col("star_rating")>=4.0) df_high_rate.persist() df_joined1 = df_high_rate.join(<Table1>, ["key"]) df_joined2 = df_high_rate.join(<Table2>, ["key"]) df_joined3 = df_high_rate.join(<Table3>, ["key"]) ... df_high_rate.unpersist()
移除不需要的 Spark 动作
避免运行不必要的操作count
,例如show
、或collect
。正如 Apache Spark 的关键主题部分所讨论的那样,Spark 很懒惰。每次对其执行操作时,RDD可能会重新计算每个变换。当你使用许多 Spark 操作时,会为每个操作调用多个源代码访问、任务计算和随机运行。
如果您不需要collect()
在商业环境中执行其他操作,请考虑将其删除。
注意
尽量避免collect()
在商业环境中使用 Spark。该collect()
操作将 Spark 执行器中的所有计算结果返回给 Spark 驱动程序,这可能会导致 Spark 驱动程序返回OOM错误。为避免OOM错误,Spark spark.driver.maxResultSize = 1GB
默认设置为将返回给 Spark 驱动程序的最大数据大小限制为 1 GB。