Optimisation des performances de Spark - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Optimisation des performances de Spark

Amazon EMR propose plusieurs fonctionnalités d'optimisation des performances pour Spark. Cette rubrique explique en détail chaque fonction d'optimisation.

Pour de plus amples informations sur la définition de la configuration Spark, veuillez consulter Configuration de Spark.

Exécution de requêtes adaptative

L'exécution adaptative des requêtes est un framework permettant de réoptimiser les plans de requêtes en fonction des statistiques d'exécution. À partir d'Amazon EMR 5.30.0, les optimisations d'exécution adaptative des requêtes suivantes d'Apache Spark 3 sont disponibles sur Apache Amazon EMR Runtime pour Spark 2.

  • Conversion de jointure adaptative

  • Coalescence adaptative de partitions shuffle

Conversion de jointure adaptative

La conversion adaptative des jointures améliore les performances des requêtes en convertissant les sort-merge-join broadcast-hash-joins opérations en opérations basées sur la taille d'exécution des étapes de requête. B a roadcast-hash-joins tendance à être plus performant lorsqu'un côté de la jointure est suffisamment petit pour diffuser efficacement sa sortie sur tous les exécuteurs, évitant ainsi d'avoir à mélanger les échanges et à trier les deux côtés de la jointure. La conversion adaptative des jointures élargit l'éventail des cas dans lesquels Spark s'exécute broadcast-hash-joins automatiquement.

Cette caractéristique est activée par défaut. Il peut être désactivé en le définissant spark.sql.adaptive.enabled sur false, ce qui désactive également le cadre d'exécution adaptative des requêtes. Spark décide de sort-merge-join convertir a en a broadcast-hash-join lorsque la statistique de taille d'exécution de l'un des côtés de la jointure ne dépasse passpark.sql.autoBroadcastJoinThreshold, la valeur par défaut étant de 10 485 760 octets (10 MiB).

Coalescence adaptative de partitions réorganisées

La fusion adaptative des partitions de distribution améliore les performances des requêtes en fusionnant de petites partitions de distribution contiguës afin d'éviter la surcharge liée à un trop grand nombre de petites tâches. Cela vous permet de configurer un plus grand nombre de partitions de shuffle initiales à l'avance, puis de les réduire à une taille ciblée lors de l'exécution, ce qui augmente les chances d'avoir des partitions shuffle réparties de manière plus uniforme.

Cette fonctionnalité est activée par défaut, sauf si spark.sql.shuffle.partitions est explicitement défini. Il peut être activé en réglant spark.sql.adaptive.coalescePartitions.enabled sur true. Le nombre initial de partitions réorganisées et la taille de la partition cible peuvent être ajustés à l'aide des propriétés spark.sql.adaptive.coalescePartitions.minPartitionNum et spark.sql.adaptive.advisoryPartitionSizeInBytes respectivement. Consultez le tableau suivant pour plus de détails sur les propriétés Spark associées à cette fonctionnalité.

Propriétés de la partition de coalescence adaptative Spark
Propriété Valeur par défaut Description

spark.sql.adaptive.coalescePartitions.enabled

true, sauf si spark.sql.shuffle.partitions est explicitement défini.

Lorsque true et spark.sql.adaptive.enabled est true, Spark fusionne les partitions réorganisées contiguës en fonction de la taille cible (spécifiée par spark.sql.adaptive.advisoryPartitionSizeInBytes), afin d'éviter un trop grand nombre de petites tâches.

spark.sql.adaptive.advisoryPartitionSizeInBytes

64 Mo

Taille conseillée en octets de partition shuffle lors de la fusion. Cette configuration n'a d'effet que lorsque spark.sql.adaptive.enabled et spark.sql.adaptive.coalescePartitions.enabled sont tous deux true.

spark.sql.adaptive.coalescePartitions.minPartitionNum

25

Le nombre minimum de partitions shuffle après fusion. Cette configuration n'a d'effet que lorsque spark.sql.adaptive.enabled et spark.sql.adaptive.coalescePartitions.enabled sont tous deux true.

spark.sql.adaptive.coalescePartitions.initialPartitionNum

1 000

Nombre initial de partitions réorganisées avant la fusion. Cette configuration n'a d'effet que lorsque spark.sql.adaptive.enabled et spark.sql.adaptive.coalescePartitions.enabled sont tous deux true.

Nettoyage dynamique de partition

Le nettoyage dynamique de partition améliore les performances de tâche avec plus de précision en sélectionnant des partitions spécifiques au sein d'une table qui doivent être lues et traitées pour une requête spécifique. En réduisant la quantité de données lues et traitées, l'exécution du travail est nettement moins longue. Avec Amazon EMR 5.26.0, cette fonctionnalité est activée par défaut. Avec Amazon EMR 5.24.0 et 5.25.0, vous pouvez activer cette fonctionnalité en définissant la propriété Spark depuis Spark ou lors spark.sql.dynamicPartitionPruning.enabled de la création de clusters.

Propriétés des partitions d'élimination dynamique Spark
Propriété Valeur par défaut Description

spark.sql.dynamicPartitionPruning.enabled

true

Lorsque c'est vrai, activez l'élimination dynamique des partitions.

spark.sql.optimizer.dynamicPartitionPruning.enforceBroadcastReuse

true

Si la valeur est true, Spark effectue une vérification défensive avant l'exécution de la requête pour s'assurer que la réutilisation des échanges de diffusion dans les filtres d'élimination dynamique n'est pas interrompue par des règles de préparation ultérieures, telles que les règles en colonnes définies par l'utilisateur. Lorsque la réutilisation est interrompue et que cette configuration est true, Spark supprime les filtres d'élimination dynamique concernés afin de se prémunir contre les problèmes de performance et d'exactitude. Des problèmes d'exactitude peuvent survenir lorsque l'échange de diffusion du filtre d'élimination dynamique produit des résultats différents et incohérents par rapport à l'échange de diffusion de l'opération de jointure correspondante. La définition de cette configuration sur false doit être effectuée avec prudence ; elle permet de contourner les scénarios, par exemple lorsque la réutilisation est interrompue par des règles en colonnes définies par l'utilisateur. Lorsque l'exécution adaptative des requêtes est activée, la réutilisation des diffusions est toujours appliquée.

Cette optimisation améliore les fonctionnalités existantes de Spark 2.4.2, qui prend uniquement en charge les transmission des prédicats statiques qui peuvent être résolus au moment voulu.

Voici des exemples de prédicat statiques en mode push Spark 2.4.2.

partition_col = 5 partition_col IN (1,3,5) partition_col between 1 and 3 partition_col = 1 + 3

Le nettoyage dynamique de partition permet au moteur Spark de déduire dynamiquement au moment de l'exécution les partitions qui doivent être lues et qui peuvent être éliminées. Par exemple, la requête suivante implique deux tables : store_sales (qui contient toutes les ventes totales pour tous les magasins et est partitionnée par région) et store_regions (qui contient un mappage de régions pour chaque pays). Les tables contiennent des données sur les magasins répartis dans le monde entier, mais nous interrogeons uniquement des données pour l'Amérique du Nord.

select ss.quarter, ss.region, ss.store, ss.total_sales from store_sales ss, store_regions sr where ss.region = sr.region and sr.country = 'North America'

Sans nettoyage dynamique de partition, cette requête lit toutes les régions avant de filtrer le sous-ensemble de régions qui correspondent aux résultats de la sous-requête. Avec le nettoyage dynamique de partition, cette requête lit et traite uniquement les partitions pour les régions renvoyées dans la sous-requête. Cela permet d'économiser du temps et des ressources en lisant moins de données dans le stockage et en traitant moins d'enregistrements.

Aplatissement des sous-requêtes scalaires

Cette optimisation améliore les performances des requêtes qui ont des sous-requêtes scalaires sur la même table. Avec Amazon EMR 5.26.0, cette fonctionnalité est activée par défaut. Avec Amazon EMR 5.24.0 et 5.25.0, vous pouvez l'activer en définissant la propriété Spark depuis Spark ou lors spark.sql.optimizer.flattenScalarSubqueriesWithAggregates.enabled de la création de clusters. Lorsque cette propriété a la valeur true, l'optimiseur de requête aplatit les sous-requêtes scalaires regroupées qui utilisent la même relation si possible. Les sous-requêtes scalaires sont mises à plat en poussant tous les prédicats présents dans la sous-requête dans les fonctions d'agrégation, puis en effectuant une agrégation, avec toutes les fonctions d'agrégation, par relation.

Voici un exemple de requête qui bénéficie de cette optimisation.

select (select avg(age) from students /* Subquery 1 */ where age between 5 and 10) as group1, (select avg(age) from students /* Subquery 2 */ where age between 10 and 15) as group2, (select avg(age) from students /* Subquery 3 */ where age between 15 and 20) as group3

L'optimisation réécrit la requête précédente comme suit :

select c1 as group1, c2 as group2, c3 as group3 from (select avg (if(age between 5 and 10, age, null)) as c1, avg (if(age between 10 and 15, age, null)) as c2, avg (if(age between 15 and 20, age, null)) as c3 from students);

Notez que la requête réécrite lit la table des étudiants une seule fois et les prédicats des trois sous-requêtes sont ajoutés à la fonction avg.

DISTINCTavant INTERSECT

Cette optimisation optimise les jointures lors de l'utilisationINTERSECT. Avec Amazon EMR 5.26.0, cette fonctionnalité est activée par défaut. Avec Amazon EMR 5.24.0 et 5.25.0, vous pouvez l'activer en définissant la propriété Spark depuis Spark ou lors spark.sql.optimizer.distinctBeforeIntersect.enabled de la création de clusters. Les requêtes utilisant INTERSECT sont automatiquement converties pour utiliser une jointure semi-gauche. Lorsque cette propriété est définie sur true, l'optimiseur de requêtes redirige l'DISTINCTopérateur vers les enfants INTERSECT s'il détecte que l'DISTINCTopérateur peut associer le semi-gauche à un BroadcastHashJoin au lieu de a. SortMergeJoin

Voici un exemple de requête qui bénéficie de cette optimisation.

(select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) intersect (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id)

Sans activer cette propriété spark.sql.optimizer.distinctBeforeIntersect.enabled, la requête sera réécrite comme suit.

select distinct brand from (select item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Lorsque vous activez cette propriété spark.sql.optimizer.distinctBeforeIntersect.enabled, la requête est réécrite comme suit.

select brand from (select distinct item.brand brand from store_sales, item where store_sales.item_id = item.item_id) left semi join (select distinct item.brand cs_brand from catalog_sales, item where catalog_sales.item_id = item.item_id) on brand <=> cs_brand

Jonction de filtre Bloom

Cette optimisation peut améliorer les performances de certaines jonctions en préfiltrant un côté de la jonction à l’aide d’un filtre Bloom généré à partir des valeurs de l’autre côté de la jonction. Avec Amazon EMR 5.26.0, cette fonctionnalité est activée par défaut. Avec Amazon EMR 5.25.0, vous pouvez activer cette fonctionnalité en définissant la propriété Spark de manière spark.sql.bloomFilterJoin.enabled à ce qu'elle provienne true de Spark ou lors de la création de clusters.

L'exemple suivant est une requête qui peut bénéficier d'un filtre Bloom.

select count(*) from sales, item where sales.item_id = item.id and item.category in (1, 10, 16)

Lorsque cette fonction est activée, le filtre Bloom est construit à partir de tous les ID d'éléments dont la catégorie figure dans l'ensemble des catégories interrogées. Pendant l'analyse de la table des ventes, le filtre Bloom est utilisé pour déterminer les ventes pour les éléments qui ne sont pas dans l'ensemble défini par le filtre Bloom. Ainsi, ces ventes identifiées peuvent être filtrées dès que possible.

Réorganisation optimisée des jonctions

Cette optimisation peut améliorer les performances des requêtes en réorganisant les jonctions impliquant des tables avec des filtres. Avec Amazon EMR 5.26.0, cette fonctionnalité est activée par défaut. Avec Amazon EMR 5.25.0, vous pouvez activer cette fonctionnalité en définissant le paramètre de configuration Spark sur spark.sql.optimizer.sizeBasedJoinReorder.enabled true. Par défaut, Spark procède à la jonction des tables de gauche à droite, comme indiqué dans la requête. Cette stratégie peut ignorer des possibilités d’exécution de jonctions plus petites avec des filtres dans un premier temps, afin de bénéficier des jonctions plus coûteuses ultérieurement.

L'exemple de requête ci-dessous indique tous les éléments renvoyés à partir de tous les magasins d’un pays. Sans réorganisation optimisée des jonctions, Spark joint les deux tables volumineuses store_sales et store_returns en premier, puis les joint avec store et finalement avec item.

select ss.item_value, sr.return_date, s.name, i.desc, from store_sales ss, store_returns sr, store s, item i where ss.id = sr.id and ss.store_id = s.id and ss.item_id = i.id and s.country = 'USA'

Avec la réorganisation optimisée des jonctions, Spark joint d’abord store_sales et store, car store possède un filtre et est plus petite que store_returns et broadcastable. Ensuite, Spark joint store_returns, puis item. Si item disposait d’un filtre et pouvait être diffusée, elle pourrait bénéficier d’une réorganisation, ce qui entraînerait la jonction de store_sales avec store, puis avec item, et finalement avec store_returns.