Sélectionner vos préférences de cookies

Nous utilisons des cookies essentiels et des outils similaires qui sont nécessaires au fonctionnement de notre site et à la fourniture de nos services. Nous utilisons des cookies de performance pour collecter des statistiques anonymes afin de comprendre comment les clients utilisent notre site et d’apporter des améliorations. Les cookies essentiels ne peuvent pas être désactivés, mais vous pouvez cliquer sur « Personnaliser » ou « Refuser » pour refuser les cookies de performance.

Si vous êtes d’accord, AWS et les tiers approuvés utiliseront également des cookies pour fournir des fonctionnalités utiles au site, mémoriser vos préférences et afficher du contenu pertinent, y compris des publicités pertinentes. Pour accepter ou refuser tous les cookies non essentiels, cliquez sur « Accepter » ou « Refuser ». Pour effectuer des choix plus détaillés, cliquez sur « Personnaliser ».

Exigences du valideur EMRFS optimisé pour S3 - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exigences du valideur EMRFS optimisé pour S3

Le valideur EMRFS optimisé pour S3 est utilisé lorsque les conditions suivantes sont réunies :

  • Vous exécutez des tâches Spark qui utilisent Spark ou Datasets pour écrire des fichiers sur Amazon S3. DataFrames À partir d'Amazon EMR 6.4.0, ce validateur peut être utilisé pour tous les formats courants, notamment parquet, ORC et les formats basés sur le texte (notamment CSV et JSON). Pour les versions antérieures à Amazon EMR 6.4.0, seul le format Parquet est pris en charge.

  • Les chargements partitionnés sont activés dans Amazon EMR. Il s’agit de l’option par défaut. Pour de plus amples informations, veuillez consulter Le valideur EMRFS optimisé pour S3 et les chargements partitionnés.

  • Le support de format de fichier intégré à Spark est utilisé. La prise en charge du format de fichier intégré est utilisée dans les circonstances suivantes :

    • Pour les tables de métastore Hive, lorsque spark.sql.hive.convertMetastoreParquet est défini sur true pour les tables Parquet, ou lorsque spark.sql.hive.convertMetastoreOrc est défini sur true pour les tables Orc avec Amazon EMR 6.4.0 ou version ultérieure. Il s'agit des paramètres par défaut.

    • Lorsque les tâches écrivent dans des sources de données ou des tables au format de fichier – par exemple, la table cible est créée avec la clause USING parquet.

    • Lorsque des tâches écrivent dans des tableaux Parquet de metastore Hive non partitionnées. Le support Parquet intégré à Spark ne prend pas en charge avec les tableaux Hive partitionnés. Il s'agit d'une limitation connue. Pour plus d'informations, consultez la section Conversion des tables Hive Metastore Parquet dans le guide Apache Spark DataFrames et Datasets.

  • Les opérations de tâche Spark qui écrivent dans un emplacement de partition par défaut (par exemple, ${table_location}/k1=v1/k2=v2/) utilisent le valideur. Le valideur n'est pas utilisé si une opération de tâche écrit dans un emplacement de partition personnalisé, par exemple, si un emplacement de partition personnalisé est défini à l'aide de la commande ALTER TABLE SQL.

  • Les valeurs suivantes pour Spark doivent être utilisées :

    • La propriété spark.sql.parquet.fs.optimized.committer.optimization-enabled doit être définie sur true. Il s'agit du paramètre par défaut avec Amazon EMR 5.20.0 ou version suivante. Avec Amazon EMR 5.19.0, la valeur par défaut est false. Pour plus d'informations sur la configuration de cette valeur, consultez Activer le validateur EMRFS S3 pour Amazon EMR 5.19.0.

    • Si vous écrivez dans des tables de métastore Hive non partitionnées, seuls les formats de fichier Parquet et Orc sont pris en charge. spark.sql.hive.convertMetastoreParquetdoit être défini sur true si vous écrivez sur des tables de métastore Parquet Hive non partitionnées. spark.sql.hive.convertMetastoreOrcdoit être défini sur true si vous écrivez dans des tables de métastore Orc Hive non partitionnées. Il s'agit des paramètres par défaut.

    • spark.sql.parquet.output.committer.class doit être défini sur com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter. Il s’agit du paramètre par défaut.

    • spark.sql.sources.commitProtocolClass doit être défini sur org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol ou org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol. org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol est le paramètre par défaut pour les versions 5.30.0 et ultérieures de la série Amazon EMR 5.x, et pour les versions 6.2.0 et ultérieures de la série Amazon EMR 6.x. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol est le paramètre par défaut pour les versions précédentes d’Amazon EMR.

    • Si des tâches Spark remplacent des ensembles de données Parquet partitionnés par des colonnes de partition dynamique, l'option d'écriture partitionOverwriteMode et spark.sql.sources.partitionOverwriteMode doivent être définis sur static. Il s’agit du paramètre par défaut.

      Note

      L'option d'écriture partitionOverwriteMode a été introduite dans Spark 2.4.0. Pour Spark version 2.3.2, inclus avec Amazon EMR version 5.19.0, définissez la propriété spark.sql.sources.partitionOverwriteMode.

Cas dans lesquels le valideur EMRFS optimisé pour S3 n’est pas utilisé

En général, le valideur EMRFS S3 optimisé n’est pas utilisé dans les situations suivantes.

Situation Pourquoi le validateur n'est pas utilisé
Lorsque vous écrivez dans HDFS Le validateur prend uniquement en charge l'écriture sur Amazon S3 à l'aide d'EMRFS.
Lorsque vous utilisez le système de fichiers S3A Le validateur ne prend en charge que EMRFS.
Lorsque vous utilisez l' MapReduce API RDD de Spark Le committer prend uniquement en charge l'utilisation de SparkSQL ou DataFrame Dataset. APIs

Les exemples en Scala suivants illustrent quelques situations supplémentaires qui empêchent d'utiliser le validateur optimisé pour EMRFS S3 en totalité (premier exemple) et en partie (deuxième exemple).

Exemple – Mode de remplacement de partition dynamique

L'exemple Scala suivant indique à Spark d'utiliser un algorithme de validation différent, ce qui empêche complètement l'utilisation du validateur optimisé pour EMRFS S3. Le code définit la propriété partitionOverwriteMode sur dynamic pour n'écraser que les partitions sur lesquelles vous écrivez des données. Ensuite, les colonnes de partition dynamique sont spécifiées par partitionBy, et le mode d'écriture est défini sur overwrite.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://amzn-s3-demo-bucket1/output")

Vous devez configurer les trois paramètres pour éviter d'utiliser le validateur EMRFS S3. Lorsque vous le faites, Spark exécute un algorithme de validation différent qui est spécifié dans le protocole de validation de Spark. Pour les versions d'Amazon EMR 5.x antérieures à la version 5.30.0 et pour les versions d'Amazon EMR 6.x antérieures à la version 6.2.0, le protocole de validation utilise le répertoire de préparation de Spark, qui est un répertoire temporaire créé sous l'emplacement de sortie commençant par .spark-staging. L'algorithme renomme séquentiellement les répertoires de partition, ce qui peut avoir un impact négatif sur les performances. Pour de plus amples informations sur Amazon EMR versions 5.30.0 et versions ultérieures, ainsi que 6.2.0 et versions ultérieures, veuillez consulter Utiliser le protocole de validation optimisé pour EMRFS S3.

L'algorithme dans Spark 2.4.0 exécute les étapes suivantes :

  1. Les tentatives de tâches écrivent leur résultat dans les répertoires de partition situés dans le répertoire de préparation de Spark, par exemple, ${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/.

  2. Pour chaque partition écrite, la tâche tente de suivre les chemins de partition relatifs, par exemple, k1=v1/k2=v2.

  3. Lorsqu'une tâche se termine avec succès elle fournit le pilote avec tous les chemins de partition relatifs qu'elle a suivis.

  4. Une fois toutes les tâches terminées, la phase de validation de tâche collecte tous les répertoires de partition que les tentatives de tâche réussies ont écrit dans le répertoire intermédiaire de Spark. Spark renomme de manière séquentielle chacun de ces répertoires dans son emplacement de sortie final à l'aide d'opérations attribution de nouveau nom de l'arborescence.

  5. Le répertoire intermédiaire est supprimé avant que la phase de validation de tâche soit terminée.

Exemple – Emplacement de partition personnalisé

Dans cet exemple, le code Scala s'insère dans deux partitions. Une partition possède un emplacement de partition personnalisé. L'autre partition utilise l'emplacement de partition par défaut. Le valideur EMRFS optimisé pour S3 est utilisé uniquement pour écrire la sortie de tâche dans la partition qui utilise l'emplacement de partition par défaut.

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Le code Scala crée les objets Amazon S3 suivants :

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

Lorsque vous écrivez des partitions dans des emplacements personnalisés, Spark utilise un algorithme de validation similaire à celui de l'exemple précédent, qui est décrit ci-dessous. Comme dans l'exemple précédent, l'algorithme se traduit par des attributions séquentielles de nouveaux noms, ce qui peut avoir un impact négatif sur les performances.

  1. Lors de l'écriture de la sortie d'une partition dans un emplacement personnalisé, les tâches écrivent un fichier dans le répertoire intermédiaire de Spark, qui est créé sous l'emplacement de sortie final. Le nom du fichier comprend un UUID aléatoire pour éviter les conflits de fichier. La tentative de tâche suit chaque fichier, ainsi que le chemin de la sortie souhaité final.

  2. Lorsqu'une tâche se termine avec succès, elle fournit au pilote les fichiers et les chemins de sortie souhaités finaux.

  3. Une fois toutes les tâches terminées, la phase de validation de tâche renomme de manière séquentielle tous les fichiers qui ont été écrits pour les partitions dans les emplacements personnalisés en leurs chemins de sortie finaux.

  4. Le répertoire intermédiaire est supprimé avant que la phase de validation de tâche soit terminée.

ConfidentialitéConditions d'utilisation du sitePréférences de cookies
© 2025, Amazon Web Services, Inc. ou ses affiliés. Tous droits réservés.