Optimieren der von von von der von - Amazon EMR

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Optimieren der von von von der von

Amazon EMR bietet mehrere Funktionen zur Leistungsoptimierung für Spark. In diesem Thema werden die einzelnen Optimierungsfunktionen im Detail erläutert.

Weitere Informationen zum Festlegen der Spark-Konfiguration finden Sie unter Konfigurieren von Spark.

Ausführung von adaptiven Abfragen

Die adaptive Abfrageausführung ist ein Framework zur Reoptimierung von Abfrageplänen auf der Grundlage von Laufzeitstatistiken. Ab Amazon EMR 5.30.0 sind die folgenden adaptiven Optimierungen der Abfrageausführung von Apache Spark 3 auf Apache EMR Runtime for Spark 2 verfügbar.

  • Konvertierung von Adaptiven Joins

  • Adaptives Zusammenfügen von Shuffle-Partitionen

Adaptive Join-Konvertierung

Die adaptive Join-Konvertierung verbessert die Abfrageleistung, indem sort-merge-join Operationen in Operationen umgewandelt werden, die auf den Laufzeitgrößen der Abfragestufen basieren. broadcast-hash-joins Broadcast-hash-joins schneidet tendenziell besser ab, wenn eine Seite des Joins klein genug ist, um seine Ausgabe effizient an alle Executors zu übertragen. Dadurch entfällt die Notwendigkeit, den Austausch zu mischen und beide Seiten des Joins zu sortieren. Die adaptive Join-Konvertierung erweitert das Spektrum der Fälle, in denen Spark automatisch ausgeführt wird broadcast-hash-joins.

Diese Funktion ist standardmäßig aktiviert. Es kann deaktiviert werden, indem Siespark.sql.adaptive.enabled auf setzenfalse, wodurch auch das Framework zur adaptiven Abfrageausführung deaktiviert wird. Spark beschließt, a in a sort-merge-join zu konvertieren, broadcast-hash-join wenn die Laufzeitgrößenstatistik einer der Join-Seiten den Wert nicht überschreitetspark.sql.autoBroadcastJoinThreshold, was standardmäßig 10.485.760 Byte (10 MiB) beträgt.

Adaptives Zusammenfügen von Shuffle-Partitionen

Adaptives Zusammenfügen von Shuffle-Partitionen verbessert die Abfrageleistung, indem kleine zusammenhängende Shuffle-Partitionen zusammengeführt werden, um den Overhead zu vieler kleiner Aufgaben zu vermeiden. Auf diese Weise können Sie im Voraus eine höhere Anzahl von initialen Shuffle-Partitionen konfigurieren, die dann zur Laufzeit auf eine angestrebte Größe reduziert werden, was die Wahrscheinlichkeit erhöht, dass Shuffle-Partitionen gleichmäßiger verteilt werden.

Diese Funktion ist standardmäßig aktiviert, sofern diese Funktion nicht explizit festgelegtspark.sql.shuffle.partitions ist. Es kann aktiviert werden, indem Siespark.sql.adaptive.coalescePartitions.enabled auf einstellentrue. Sowohl die anfängliche Anzahl der Shuffle-Partitionen als auch die Zielpartitionsgröße können jeweils mit denspark.sql.adaptive.advisoryPartitionSizeInBytes Eigenschaftenspark.sql.adaptive.coalescePartitions.minPartitionNum und eingestellt werden. In der folgenden Tabelle finden Sie weitere Informationen zu den zugehörigen Spark-Eigenschaften für dieses Feature.

Eigenschaften der adaptiven Koaleszenzpartition von Spark
Property (Eigenschaft) Standardwert Beschreibung

spark.sql.adaptive.coalescePartitions.enabled

wahr, es sei denn, esspark.sql.shuffle.partitions ist explizit gesetzt

Wenn true und spark.sql.adaptive.enabled den Wert true haben, führt Spark zusammenhängende Shuffle-Partitionen entsprechend der Zielgröße (angegeben vonspark.sql.adaptive.advisoryPartitionSizeInBytes) zusammen, um zu viele kleine Aufgaben zu vermeiden.

spark.sql.adaptive.advisoryPartitionSizeInBytes

64 MB

Die empfohlene Größe der Shuffle-Partition für die Koaleszenz. Diese Konfiguration wirkt sich nur aus, wennspark.sql.adaptive.enabled undspark.sql.adaptive.coalescePartitions.enabled beides isttrue.

spark.sql.adaptive.coalescePartitions.minPartitionNum

25

Die Mindestanzahl von Shuffle-Partitionen nach der Koaleszenz. Diese Konfiguration wirkt sich nur aus, wennspark.sql.adaptive.enabled undspark.sql.adaptive.coalescePartitions.enabled beides isttrue.

spark.sql.adaptive.coalescePartitions.initialPartitionNum

1000

Die anfängliche Anzahl der Shuffle-Partitionen vor dem Zusammenfügen. Diese Konfiguration wirkt sich nur aus, wennspark.sql.adaptive.enabled undspark.sql.adaptive.coalescePartitions.enabled beides isttrue.

Beschneiden von Partitionen

Die dynamische Partitionsbereinigung verbessert die Auftragsleistung durch eine sorgfältigere Auswahl der spezifischen Partitionen in einer Tabelle, die für eine bestimmte Abfrage gelesen und verarbeitet werden müssen. Indem Sie die Datenmenge reduzieren, die für die Ausführung eines Auftrags gelesen und verarbeitet werden muss, können Sie viel Zeit sparen. In Amazon EMR 5.26.0 ist diese Funktion standardmäßig aktiviert. Mit Amazon EMR 5.24.0 und 5.25.0 können Sie diese Funktion aktivieren, indem Sie die Spark-Eigenschaft in Spark oder beim Erstellenspark.sql.dynamicPartitionPruning.enabled von Clustern festlegen.

Eigenschaften von Dynamic Partition Pruning in Spark
Property (Eigenschaft) Standardwert Beschreibung

spark.sql.dynamicPartitionPruning.enabled

true

Wenn der Wert true ist, aktivieren Sie das dynamische Pruning von Partitionen.

spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse

true

Wann führt Spark vor der Ausführung der Abfrage eine Abwehrprüfung durchtrue, um sicherzustellen, dass die Wiederverwendung von Broadcast-Exchanges in dynamischen Pruning-Filtern nicht durch spätere Vorbereitungsregeln, wie z. B. benutzerdefinierte Spaltenregeln, beeinträchtigt wird. Wenn die Wiederverwendung unterbrochen ist und diese Konfiguration nicht funktionierttrue, entfernt Spark die betroffenen dynamischen Pruning-Filter, um Leistungs- und Korrektheitsproblemen vorzubeugen. Probleme mit der Richtigkeit können auftreten, wenn der Broadcast-Austausch des dynamischen Pruning-Filters zu unterschiedlichen, inkonsistenten Ergebnissen führt, wenn der Übertragungsaustausch des entsprechenden Join-Vorgangs zu unterschiedlichen, inkonsistenten Ergebnissen führt. Die Einstellung dieser Konfigurationfalse sollte mit Vorsicht erfolgen. Sie ermöglicht die Umgehung von Szenarien, z. B. wenn die Wiederverwendung durch benutzerdefinierte Spaltenregeln unterbrochen wird. Wenn die Adaptive Query-Ausführung aktiviert ist, wird die Wiederverwendung von Sendungen immer erzwungen.

Diese Optimierung verbessert die vorhandenen Funktionen von Spark 2.4.2, das nur die Weitergabe statischer Prädikate unterstützt, um diese zu geplanten Zeiten aufzulösen.

Im Folgenden finden Sie Beispiele für die Weitergabe statischer Prädikate in Sparke 2.4.2.

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

Die dynamische Partitionsbereinigung ermöglicht es der Spark-Engine, während der Laufzeit dynamisch abzuleiten, welche Partitionen gelesen werden müssen und welche problemlos eliminiert werden können. Die folgende Abfrage beinhaltet beispielsweise zwei Tabellen: die Tabelle store_sales, die den Gesamtumsatz aller Geschäfte enthält und nach Regionen partitioniert ist, sowie die Tabelle store_regions, die für die einzelnen Länder eine Zuweisung nach Regionen enthält. Die Tabellen enthalten Daten für Geschäfte, die auf der ganzen Welt verteilt sind, allerdings benötigen wir nur die Daten für Nordamerika.

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

Ohne die dynamische Partitionsbereinigung, liest diese Anfrage alle Regionen, bevor Sie die Untergruppe an Regionen herausfiltert, die mit den Ergebnissen der Unterabfrage übereinstimmen. Mit der dynamischen Partitionsbereinigung liest und verarbeitet diese Abfrage nur die Partitionen für die Regionen, die in der Unterabfrage zurückgegeben wurden. Da weniger Daten im Speicher gelesen und weniger Datensätze verarbeitet werden müssen, spart dies Zeit und Ressourcen.

Reduzieren von skalare Unterabfragen

Diese Optimierung verbessert die Leistung von Abfragen, die in der gleichen Tabelle skalare Unterabfragen ausführen. In Amazon EMR 5.26.0 ist diese Funktion standardmäßig aktiviert. Mit Amazon EMR 5.24.0 und 5.25.0 können Sie es aktivieren, indem Sie die Spark-Eigenschaft in Spark oder beim Erstellenspark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled von Clustern festlegen. Wenn diese Eigenschaft auf „true“ festgelegt ist, flacht der Abfrageoptimierer aggregierte skalare Unterabfragen, die, wenn möglich, dieselbe Relation verwenden, ab. Die skalaren Unterabfragen werden abgeflacht, indem alle in der Unterabfrage vorhandenen Prädikate an die Aggregationsfunktionen weitergegeben werden. Im Anschluss daran wird pro Relation eine Aggregation mit allen Aggregatfunktionen ausgeführt.

Im Folgenden finden Sie ein Beispiel für eine Abfrage, die von dieser Optimierung profitiert.

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

Die Optimierung schreibt die vorherige Abfrage folgendermaßen um:

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

Beachten Sie, dass die umgeschriebene Abfrage die Studententabelle nur einmal liest und die Prädikate der drei Unterabfragen per Push-Verfahren in die avg-Funktion weitergegeben werden.

DISTINCT vor INTERSECT

Diese Optimierung verbessert Joins, wenn INTERSECT verwendet wird. In Amazon EMR 5.26.0 ist diese Funktion standardmäßig aktiviert. Mit Amazon EMR 5.24.0 und 5.25.0 können Sie es aktivieren, indem Sie die Spark-Eigenschaft in Spark oder beim Erstellenspark.sql.optimizer.distinctBeforeIntersect.enabled von Clustern festlegen. Abfragen mit INTERSECT werden automatisch so konvertiert, dass sie einen linken halben Join verwenden. Wenn diese Eigenschaft auf true gesetzt ist, gibt der Abfrageoptimierer den DISTINCT-Operator an die untergeordneten Elemente von INTERSECT weiter, falls er feststellt, dass der DISTINCT-Operator die Links-Semi-Verknüpfung zu a BroadcastHashJoin statt zu a machen kann SortMergeJoin.

Im Folgenden finden Sie ein Beispiel für eine Abfrage, die von dieser Optimierung profitiert.

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

Wenn die Eigenschaft spark.sql.optimizer.distinctBeforeIntersect.enabled, nicht aktiviert ist, wird die Abfrage folgendermaßen neu geschrieben.

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Wenn die Eigenschaft spark.sql.optimizer.distinctBeforeIntersect.enabled, nicht aktiviert ist, wird die Abfrage folgendermaßen neu geschrieben.

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Join

Durch diese Optimierung kann die Leistung einiger Joins verbessert werden, da eine Seite eines Joins mit einem Bloomfilter vorgefiltert wird, der aus den Werten von der anderen Seite des Joins generiert wird. In Amazon EMR 5.26.0 ist diese Funktion standardmäßig aktiviert. Mit Amazon EMR 5.25.0 können Sie diese Funktion aktivieren, indem Sie die Spark-Eigenschaft in Spark oder beim Erstellentrue von Clusternspark.sql.bloomFilterJoin.enabled auf Aus setzen.

Im Folgenden finden Sie eine Beispielabfrage, für die die Anwendung eines Bloomfilters geeignet wäre.

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

Wenn diese Funktion aktiviert ist, wird der Bloomfilter aus allen Artikel-IDs erstellt, deren Kategorie sich innerhalb des Kategoriensatzes befindet, der abgefragt wird. Beim Scannen der Tabelle SALES wird der Bloomfilter verwendet, um zu ermitteln, welche Verkäufe für Artikel gelten, die sich definitiv nicht im vom Bloomfilter definierten Satz befinden. So können diese ermittelten Verkäufe so früh wie möglich herausgefiltert werden.

Optimierte Neureihenfolge von Verknüpfungen

Durch diese Optimierung kann die Abfrageleistung verbessert werden, indem bei Tabellen mit Filtern die Joins neu angeordnet werden. In Amazon EMR 5.26.0 ist diese Funktion standardmäßig aktiviert. Mit Amazon EMR 5.25.0 können Sie diese Funktion aktivieren, indem Sie den Spark-Konfigurationsparameterspark.sql.optimizer.sizeBasedJoinReorder.enabled auf true setzen. Das Standardverhalten von Spark ist, dass Tabellen von links nach rechts verknüpft werden, wie in der Abfrage aufgeführt. Dabei kann die Chance verpasst werden, zuerst kleinere Joins mit Filtern auszuführen, was später teureren Joins zugute kommen würde.

Bei der Beispielabfrage unten werden alle zurückgegebenen Artikel aus allen Läden in einem Land ausgewertet. Ohne optimierte Join-Neuanordnung verknüpft Spark zuerst die zwei großen Tabellen store_sales und store_returns und verknüpft sie dann mit store und schließlich mit item.

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

Bei der optimierten Join-Neuanordnung verknüpft Spark zuerst store_sales mit store, da store über einen Filter verfügt und kleiner ist als store_returns und broadcastable. Danach führt Spark einen Join mit store_returns und schließlich mit item durch. Wenn item über einen Filter verfügen würde und sendungsfähig wäre, würde es sich ebenfalls für die Neuanordnung eignen, was dazu führen würde, dass store_sales mit store, dann mit item und schließlich mit store_returns verknüpft würde.