Élagage de cloisons dynamiques - AWS Conseils prescriptifs

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Élagage de cloisons dynamiques

Spark Les versions 3.0 et ultérieures incluent Dynamic Partition Pruning (DPP). L'élagage dynamique des partitions est une technique d'optimisation dans Spark qui empêche l'analyse de partitions inutiles lors de la lecture de données. Voici quelques points essentiels à connaître au sujet du DPP :

  • Il examine les valeurs de partition demandées dans les filtres de requête et les prédicats et détermine quelles partitions sont requises pour répondre à la requête. Toutes les partitions jugées inutiles sont élaguées automatiquement et de manière transparente.

  • DPP réduit le temps de traitement et l'utilisation des ressources en évitant les partitions qui ne contiennent aucune donnée applicable. Cela aide Spark pour se concentrer uniquement sur les partitions pertinentes.

  • Il fonctionne à la fois avec les partitions statiques et les partitions générées dynamiquement qui sont ajoutées par le biais d'insertions ou de chargements incrémentiels. Spark reconnaît les nouvelles partitions et peut continuer à appliquer un élagage dynamique.

  • DPP est totalement transparent ou invisible pour les développeurs. Aucun codage spécial n'est nécessaire pour activer le DPP. Il se produit automatiquement en arrière-plan à titre d'optimisation lors de la génération du plan de requête.

Voici quelques bonnes pratiques pour garantir l'efficacité du DPP :

  • Utilisez le pushdown des prédicats en appliquant des filtres dès le début de votre Spark opérations sur les trames de données. Cela aide Spark pour éliminer les partitions de manière précoce en utilisant les métadonnées des partitions.

  • Collectez des statistiques sur vos données en ANALYZE TABLE les exécutant fréquemment. Cela réduit les statistiques au niveau des colonnes qui aident Spark pour déterminer avec plus de précision quelles partitions peuvent être ignorées.

  • Évitez de trop partitionner vos données. Un trop grand nombre de partitions peut surcharger le nœud pilote lorsqu'il collecte des statistiques. Visez 10 à 100 partitions pour chaque grande table.

  • Répartitionnez les trames de données avant les jointures. Cela permet d'éviter les jointures aléatoires qui nécessitent le déplacement de toutes les données et d'optimiser davantage la quantité de données lues.

  • Utilisez des types de colonnes de partition et des noms cohérents entre les différentes tables jointes. Cela aide Spark mieux faire correspondre les partitions pour optimiser les jointures.

  • Testez les requêtes EXPLAIN pour vous assurer que le DPP est appliqué et vérifiez si un réglage supplémentaire est nécessaire.

Dans un schéma en étoile, les tables sont divisées en deux types principaux : les tables de faits et les tables de dimensions. Les tables de dimensions ont tendance à être beaucoup plus petites que les tables de faits. Lorsque vous joignez une table de faits à une table de dimensions, DPP optimise le plan de requête. Il crée une sous-requête à partir de tous les filtres appliqués à la table de dimensions. Il diffuse cette sous-requête et construit une table de hachage à partir de celle-ci. Il applique ensuite la table de hachage à la phase d'analyse de la table de faits, avant de lire les données de la table de faits. Cela permet à DPP de réduire la quantité de données qui doivent être lues à partir de la grande table d'informations.

L'exemple de requête suivant montre DPP en action. La requête extrait le nombre de commandes depuis le pays (Inde) et inclut une jointure interne entre une table de faits (fact_orders) et une table de dimensions (nation). La fact_orders table est partitionnée par colonneo_nationkey.

- "select n.n_name as country, count(1) as no_of_orders from fact_orders o join nation n on o.o_nationkey = n.n_nationkey where n.n_name = 'INDIA' group by n.n_name"

Les étapes utilisées dans le EXPLAIN plan sont les suivantes :

  1. Scannez le plus petit tableau des dimensions (nation) et filtrez par colonnen_name = 'INDIA'.

  2. Diffusez les résultats de l'étape précédente.

  3. Créez une sous-requête qui filtre les résultats dès la première étape.

  4. Repoussez-le vers le bas de PartitionFilter manière à ce qu'il analyse uniquement les partitions nécessaires de la table des faits, au lieu d'une analyse complète de la table.

Voici le EXPLAIN plan de cette requête optimisée pour le protocole DPP.

== Physical Plan == AdaptiveSparkPlan isFinalPlan=true +- == Final Plan == *(4) HashAggregate(keys=[], functions=[count(1)], output=[count#208L]) +- ShuffleQueryStage 3 +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [id=#353] +- *(3) HashAggregate(keys=[], functions=[partial_count(1)], output=[count#212L]) +- *(3) HashAggregate(keys=[n_name#31], functions=[], output=[]) +- ShuffleQueryStage 1 +- Exchange hashpartitioning(n_name#31, 36), ENSURE_REQUIREMENTS, [id=#315] +- *(2) HashAggregate(keys=[n_name#31], functions=[], output=[n_name#31]) +- *(2) Project [n_name#31] +- *(2) BroadcastHashJoin [cast(o_nationkey#145 as bigint)], [n_nationkey#32L], Inner, BuildRight, false :- *(2) ColumnarToRow : +- FileScan parquet [o_nationkey#145] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex[s3://aws-spark-tuning/fact_orders], PartitionFilters: [isnotnull(o_nationkey#145), dynamicpruningexpression(cast(o_nationkey#145 as bigint) IN dynamicp..., PushedFilters: [], ReadSchema: struct<> : +- SubqueryBroadcast dynamicpruning#210, 0, [n_nationkey#32L], [id=#200] : +- OutputAdapter [n_name#31, n_nationkey#32L] : +- AdaptiveSparkPlan isFinalPlan=true : +- BroadcastQueryStage 2 : +- ReusedExchange [n_name#31, n_nationkey#32L], BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- BroadcastQueryStage 0 +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, false]),false), [id=#233] +- *(1) Filter ((isnotnull(n_name#31) AND (n_name#31 = INDIA)) AND isnotnull(n_nationkey#32L)) +- FileScan json [n_name#31,n_nationkey#32L] Batched: false, DataFilters: [isnotnull(n_name#31), (n_name#31 = INDIA), isnotnull(n_nationkey#32L)], Format: JSON, Location: InMemoryFileIndex[s3://aws-spark-tuning/input/demo/json/nation], PartitionFilters: [], PushedFilters: [IsNotNull(n_name), EqualTo(n_name,INDIA), IsNotNull(n_nationkey)], ReadSchema: struct<n_name:string,n_nationkey:bigint>

Même si aucun filtre direct n'est ajouté sur la o_nationkey colonne, en raison de la fonctionnalité DPP, Spark analyse automatiquement uniquement les partitions nécessaires, au lieu de l'intégralité de la table.