Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Dynamische Partitionen beschneiden
Spark 3.0 und höher beinhalten Dynamic Partition Pruning (DPP). Das dynamische Bereinigen von Partitionen ist eine Optimierungstechnik in Spark das verhindert das Scannen unnötiger Partitionen beim Lesen von Daten. Im Folgenden sind einige wichtige Dinge aufgeführt, die Sie über DPP wissen sollten:
-
Es untersucht die in den Abfragefiltern und Prädikaten angeforderten Partitionswerte und bestimmt, welche Partitionen erforderlich sind, um die Abfrage zu erfüllen. Alle Partitionen, die als unnötig erachtet werden, werden automatisch und transparent gelöscht.
-
DPP reduziert die Verarbeitungszeit und die Ressourcennutzung, indem Partitionen übersprungen werden, die keine entsprechenden Daten enthalten. Das hilft Spark um sich nur auf relevante Partitionen zu konzentrieren.
-
Es funktioniert sowohl mit statischen Partitionen als auch mit dynamisch generierten Partitionen, die durch Einfügungen oder inkrementelles Laden hinzugefügt werden. Spark erkennt neue Partitionen und kann weiterhin dynamisches Bereinigen anwenden.
-
DPP ist für Entwickler völlig transparent oder unsichtbar. Für die Aktivierung von DPP ist keine spezielle Codierung erforderlich. Es erfolgt automatisch hinter den Kulissen als Optimierung bei der Generierung von Abfrageplänen.
Im Folgenden finden Sie einige bewährte Methoden, um sicherzustellen, dass DPP effizient funktioniert:
-
Verwenden Sie Predicate Pushdown, indem Sie Filter schon früh in Ihrem Spark Datenrahmenoperationen. Das hilft Spark um Partitionen mithilfe der Partitionsmetadaten frühzeitig zu entfernen.
-
Erfassen Sie Statistiken zu Ihren Daten, indem Sie sie
ANALYZE TABLE
häufig ausführen. Dadurch werden Statistiken auf Spaltenebene reduziert, die hilfreich sind Spark um genauer zu bestimmen, welche Partitionen ignoriert werden können. -
Vermeiden Sie eine Überpartitionierung Ihrer Daten. Zu viele Partitionen können den Treiberknoten beim Sammeln von Statistiken überlasten. Streben Sie für jede große Tabelle 10—100 Partitionen an.
-
Partitionieren Sie Datenframes neu, bevor Sie sie verknüpfen. Dadurch werden Zufallsverknüpfungen verhindert, bei denen alle Daten verschoben werden müssen, und die Menge der gelesenen Daten wird weiter optimiert.
-
Verwenden Sie konsistente Partitionsspaltentypen und Benennungen für verschiedene Tabellen, die verknüpft werden. Das hilft Spark Partitionen für die Join-Optimierung besser aufeinander abstimmen.
-
Testen Sie Abfragen mit
EXPLAIN
, um sicherzustellen, dass DPP angewendet wird, und überprüfen Sie, ob zusätzliche Optimierungen erforderlich sind.
In einem Sternschema werden Tabellen in zwei Haupttypen unterteilt: Faktentabellen und Dimensionstabellen. Dimensionstabellen sind in der Regel viel kleiner als Faktentabellen. Beim Verbinden einer Faktentabelle mit einer Dimensionstabelle optimiert DPP den Abfrageplan. Es erstellt eine Unterabfrage aus allen Filtern, die auf die Dimensionstabelle angewendet werden. Es sendet diese Unterabfrage und erstellt daraus eine Hashtabelle. Anschließend wendet es die Hashtabelle auf die Scan-Phase der Faktentabelle an, bevor die Daten der Faktentabelle gelesen werden. Auf diese Weise kann DPP die Datenmenge reduzieren, die aus der größeren Faktentabelle gelesen werden muss.
Die folgende Beispielabfrage zeigt DPP in Aktion. Die Abfrage ruft die Anzahl der Bestellungen aus dem Land (Indien) ab und beinhaltet eine innere Verknüpfung zwischen einer Faktentabelle (fact_orders
) und einer Dimensionstabelle (nation
). Die fact_orders
Tabelle ist nach der Spalte partitioniert. o_nationkey
- "select n.n_name as country, count(1) as no_of_orders from fact_orders o join nation n on o.o_nationkey = n.n_nationkey where n.n_name = 'INDIA' group by n.n_name"
Im EXPLAIN
Plan werden die folgenden Schritte verwendet:
-
Scannen Sie die Tabelle mit den kleineren Dimensionen (
nation
) und filtern Sie nach Spaltenn_name = 'INDIA'
. -
Senden Sie die Ergebnisse des vorherigen Schritts.
-
Erstellen Sie eine Unterabfrage, die nach den Ergebnissen des ersten Schritts filtert.
-
Verschieben Sie sie als eine,
PartitionFilter
sodass sie nur die Partitionen der Faktentabellen scannt, die tatsächlich benötigt werden, und nicht einen vollständigen Tabellenscan.
Im Folgenden finden Sie den EXPLAIN
Plan für diese DPP-optimierte Abfrage.
== Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(4) HashAggregate(keys=[], functions=[count(1)], output=[count#208L]) +- ShuffleQueryStage 3 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#353] +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#212L]) +- *(3) HashAggregate(keys=[n_name#31], functions=[], output=[]) +- ShuffleQueryStage 1 +- Exchange hashpartitioning(n_name#31, 36), ENSURE_REQUIREMENTS, [id=#315] +- *(2) HashAggregate(keys=[n_name#31], functions=[], output=[n_name#31]) +- *(2) Project [n_name#31] +- *(2) BroadcastHashJoin [cast(o_nationkey#145 as bigint)], [n_nationkey#32L], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet [o_nationkey#145] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[s3://aws-spark-tuning/fact_orders], PartitionFilters: [isnotnull(o_nationkey#145), dynamicpruningexpression(cast(o_nationkey#145 as bigint) IN dynamicp..., PushedFilters: [], ReadSchema: struct<> : +- SubqueryBroadcast dynamicpruning#210, 0, [n_nationkey#32L], [id=#200] : +- OutputAdapter [n_name#31, n_nationkey#32L] : +- AdaptiveSparkPlan isFinalPlan=true : +- BroadcastQueryStage 2 : +- ReusedExchange [n_name#31, n_nationkey#32L], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- BroadcastQueryStage 0 +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- *(1) Filter ((isnotnull(n_name#31) AND (n_name#31 = INDIA)) AND isnotnull(n_nationkey#32L)) +- FileScan json [n_name#31,n_nationkey#32L] Batched: false, DataFilters: [isnotnull(n_name#31), (n_name#31 = INDIA), isnotnull(n_nationkey#32L)], Format: JSON, Location: InMemoryFileIndex[s3://aws-spark-tuning/input/demo/json/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_name), EqualTo(n_name,INDIA), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,n_nationkey:bigint>
Obwohl der o_nationkey
Spalte aufgrund der DPP-Funktion kein direkter Filter hinzugefügt wurde, Spark scannt automatisch nur die Partitionen, die benötigt werden, und nicht die gesamte Tabelle.