本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
修剪动态分区
Spark 3.0 及更高版本包括动态分区修剪 (DPP)。动态分区修剪是一种优化技术 Spark 这样可以防止在读取数据时扫描不必要的分区。以下是关于 DPP 的一些重要须知:
-
它检查查询过滤器和谓词中请求的分区值,并确定需要哪些分区来满足查询。任何被认为不必要的分区都会自动且透明地被修剪。
-
DPP 通过跳过不包含适用数据的分区来减少处理时间和资源利用率。这有帮助 Spark 只关注相关的分区。
-
它既适用于静态分区,也适用于通过插入或增量加载添加的动态生成的分区。Spark 识别新分区并可以继续应用动态修剪。
-
DPP 对开发者完全透明或不可见。无需特殊编码即可启用 DPP。在生成查询计划期间,它会作为优化自动在幕后发生。
以下是确保 DPP 高效工作的一些最佳实践:
-
通过在你的早期应用过滤器来使用谓词下推 Spark 数据框操作。这有帮助 Spark 使用分区元数据尽早消除分区。
-
通过
ANALYZE TABLE
经常运行来收集有关数据的统计信息。这减少了有帮助的列级统计信息 Spark 以更准确地确定哪些分区可以忽略。 -
避免对数据进行过度分区。当驱动程序节点收集统计数据时,分区过多可能会使它过载。目标是为每个大表设置 10—100 个分区。
-
在联接之前对数据帧进行重新分区。这样可以防止需要移动所有数据的随机联接,并进一步优化读取的数据量。
-
对要联接的不同表使用一致的分区列类型和命名。这有帮助 Spark 更好地匹配分区以进行联接优化。
-
使用测试查询
EXPLAIN
以确保正在应用 DPP,并验证是否需要进行附加调整。
在星型架构中,表主要分为两种类型:事实表和维度表。维度表往往比事实表小得多。将事实表与维度表联接时,DPP 会优化查询计划。它根据应用于维度表的所有筛选器创建子查询。它广播这个子查询并从中生成一个哈希表。然后,在读取事实表数据之前,它将哈希表应用于事实表的扫描阶段。这有助于 DPP 减少必须从较大的事实表中读取的数据量。
以下示例查询显示了 DPP 的运行情况。该查询从该国家(印度)获取订单数量,并包括事实表()和维度表(fact_orders
nation
)之间的内部联接。fact_orders
表按列o_nationkey
分区。
- "select n.n_name as country, count(1) as no_of_orders from fact_orders o join nation n on o.o_nationkey = n.n_nationkey where n.n_name = 'INDIA' group by n.n_name"
以下是EXPLAIN
计划中使用的步骤:
-
扫描较小的维度表 (
nation
) 并按列筛选n_name = 'INDIA'
。 -
广播上一步的结果。
-
创建一个子查询,根据第一步的结果进行筛选。
-
将其按下
PartitionFilter
推,使其仅扫描所需的事实表分区,而不是全表扫描。
以下是这个 DPP 优化查询的EXPLAIN
计划。
== Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(4) HashAggregate(keys=[], functions=[count(1)], output=[count#208L]) +- ShuffleQueryStage 3 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#353] +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#212L]) +- *(3) HashAggregate(keys=[n_name#31], functions=[], output=[]) +- ShuffleQueryStage 1 +- Exchange hashpartitioning(n_name#31, 36), ENSURE_REQUIREMENTS, [id=#315] +- *(2) HashAggregate(keys=[n_name#31], functions=[], output=[n_name#31]) +- *(2) Project [n_name#31] +- *(2) BroadcastHashJoin [cast(o_nationkey#145 as bigint)], [n_nationkey#32L], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet [o_nationkey#145] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[s3://aws-spark-tuning/fact_orders], PartitionFilters: [isnotnull(o_nationkey#145), dynamicpruningexpression(cast(o_nationkey#145 as bigint) IN dynamicp..., PushedFilters: [], ReadSchema: struct<> : +- SubqueryBroadcast dynamicpruning#210, 0, [n_nationkey#32L], [id=#200] : +- OutputAdapter [n_name#31, n_nationkey#32L] : +- AdaptiveSparkPlan isFinalPlan=true : +- BroadcastQueryStage 2 : +- ReusedExchange [n_name#31, n_nationkey#32L], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- BroadcastQueryStage 0 +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- *(1) Filter ((isnotnull(n_name#31) AND (n_name#31 = INDIA)) AND isnotnull(n_nationkey#32L)) +- FileScan json [n_name#31,n_nationkey#32L] Batched: false, DataFilters: [isnotnull(n_name#31), (n_name#31 = INDIA), isnotnull(n_nationkey#32L)], Format: JSON, Location: InMemoryFileIndex[s3://aws-spark-tuning/input/demo/json/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_name), EqualTo(n_name,INDIA), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,n_nationkey:bigint>
尽管o_nationkey
列上没有添加直接筛选器,但由于 DPP 功能,Spark 仅自动扫描所需的分区,而不是整个表。