Exigences relatives au transmetteur EMRFS optimisé pour S3 - Amazon EMR

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exigences relatives au transmetteur EMRFS optimisé pour S3

Le EMRFS validateur optimisé pour S3 est utilisé lorsque les conditions suivantes sont remplies :

  • Vous exécutez des tâches Spark qui utilisent Spark ou Datasets pour écrire des fichiers sur Amazon S3. DataFrames À partir d'Amazon EMR 6.4.0, ce validateur peut être utilisé pour tous les formats courantsORC, y compris le parquet et les formats basés sur le texte (y compris et). CSV JSON Pour les versions antérieures à Amazon EMR 6.4.0, seul le format Parquet est pris en charge.

  • Les téléchargements partitionnés sont activés sur Amazon. EMR Il s’agit de l’option par défaut. Pour de plus amples informations, veuillez consulter Le validateur EMRFS optimisé pour S3 et les téléchargements partitionnés.

  • Le support de format de fichier intégré à Spark est utilisé. La prise en charge du format de fichier intégré est utilisée dans les circonstances suivantes :

    • Pour les tables Hive Metastore, le paramètre spark.sql.hive.convertMetastoreParquet est défini sur true pour les tables Parquet ou spark.sql.hive.convertMetastoreOrc sur les tables Orc true avec Amazon EMR 6.4.0 ou version ultérieure. Il s'agit des paramètres par défaut.

    • Lorsque les tâches écrivent dans des sources de données ou des tables au format de fichier – par exemple, la table cible est créée avec la clause USING parquet.

    • Lorsque des tâches écrivent dans des tableaux Parquet de metastore Hive non partitionnées. Le support Parquet intégré à Spark ne prend pas en charge avec les tableaux Hive partitionnés. Il s'agit d'une limitation connue. Pour plus d'informations, consultez la section Conversion des tables Hive Metastore Parquet dans le guide Apache Spark DataFrames et Datasets.

  • Les opérations de tâche Spark qui écrivent dans un emplacement de partition par défaut (par exemple, ${table_location}/k1=v1/k2=v2/) utilisent le valideur. Le valideur n'est pas utilisé si une opération de tâche écrit dans un emplacement de partition personnalisé, par exemple, si un emplacement de partition personnalisé est défini à l'aide de la commande ALTER TABLE SQL.

  • Les valeurs suivantes pour Spark doivent être utilisées :

    • La propriété spark.sql.parquet.fs.optimized.committer.optimization-enabled doit être définie sur true. Il s'agit du paramètre par défaut d'Amazon EMR 5.20.0 et versions ultérieures. Avec Amazon EMR 5.19.0, la valeur par défaut est. false Pour plus d'informations sur la configuration de cette valeur, consultez Activer le validateur EMRFS optimisé pour S3 pour Amazon 5.19.0 EMR.

    • Si vous écrivez dans des tables de métastore Hive non partitionnées, seuls les formats de fichier Parquet et Orc sont pris en charge. spark.sql.hive.convertMetastoreParquetdoit être défini sur true si vous écrivez sur des tables de métastore Parquet Hive non partitionnées. spark.sql.hive.convertMetastoreOrcdoit être défini sur true si vous écrivez dans des tables de métastore Orc Hive non partitionnées. Il s'agit des paramètres par défaut.

    • spark.sql.parquet.output.committer.class doit être défini sur com.amazon.emr.committer.EmrOptimizedSparkSqlParquetOutputCommitter. Il s’agit du paramètre par défaut.

    • spark.sql.sources.commitProtocolClassdoit être réglé sur org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocol ouorg.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocol. org.apache.spark.sql.execution.datasources.SQLEmrOptimizedCommitProtocolest le paramètre par défaut pour la série Amazon EMR 5.x version 5.30.0 et supérieure, et pour la série Amazon EMR 6.x version 6.2.0 et supérieure. org.apache.spark.sql.execution.datasources.SQLHadoopMapReduceCommitProtocolest le paramètre par défaut pour les EMR versions précédentes d'Amazon.

    • Si des tâches Spark remplacent des ensembles de données Parquet partitionnés par des colonnes de partition dynamique, l'option d'écriture partitionOverwriteMode et spark.sql.sources.partitionOverwriteMode doivent être définis sur static. Il s’agit du paramètre par défaut.

      Note

      L'option d'écriture partitionOverwriteMode a été introduite dans Spark 2.4.0. Pour la version 2.3.2 de Spark, incluse dans la EMR version 5.19.0 d'Amazon, définissez la propriété. spark.sql.sources.partitionOverwriteMode

Occasions où le EMRFS validateur optimisé pour S3 n'est pas utilisé

En général, le EMRFS validateur optimisé pour S3 n'est pas utilisé dans les situations suivantes.

Situation Pourquoi le validateur n'est pas utilisé
Lorsque vous écrivez à HDFS Le committer prend uniquement en charge l'écriture sur Amazon S3 en utilisantEMRFS.
Lorsque vous utilisez le système de fichiers S3A Le comité ne fait que soutenirEMRFS.
Lorsque vous utilisez MapReduce ou Spark RDD API Le validateur prend uniquement en charge l'utilisation de Spark SQL ou APIs Dataset. DataFrame

Les exemples Scala suivants illustrent certaines situations supplémentaires qui empêchent le commit EMRFS optimisé pour S3 d'être utilisé en totalité (le premier exemple) et en partie (le second exemple).

Exemple – Mode de remplacement de partition dynamique

L'exemple Scala suivant indique à Spark d'utiliser un algorithme de validation différent, ce qui empêche complètement l'utilisation du validateur optimisé pour EMRFS S3. Le code définit la propriété partitionOverwriteMode sur dynamic pour n'écraser que les partitions sur lesquelles vous écrivez des données. Ensuite, les colonnes de partition dynamique sont spécifiées par partitionBy, et le mode d'écriture est défini sur overwrite.

val dataset = spark.range(0, 10) .withColumn("dt", expr("date_sub(current_date(), id)")) dataset.write.mode("overwrite") .option("partitionOverwriteMode", "dynamic") .partitionBy("dt") .parquet("s3://amzn-s3-demo-bucket1/output")

Vous devez configurer les trois paramètres pour éviter d'utiliser le validateur EMRFS optimisé pour S3. Lorsque vous le faites, Spark exécute un algorithme de validation différent qui est spécifié dans le protocole de validation de Spark. Pour les versions d'Amazon EMR 5.x antérieures à la version 5.30.0 et pour les versions d'Amazon EMR 6.x antérieures à la version 6.2.0, le protocole de validation utilise le répertoire de préparation de Spark, qui est un répertoire temporaire créé sous l'emplacement de sortie commençant par. .spark-staging L'algorithme renomme séquentiellement les répertoires de partition, ce qui peut avoir un impact négatif sur les performances. Pour plus d'informations sur les EMR versions 5.30.0 et ultérieures d'Amazon et 6.2.0 et versions ultérieures, consultez. Utilisez le protocole de EMRFS validation optimisé pour S3

L'algorithme dans Spark 2.4.0 exécute les étapes suivantes :

  1. Les tentatives de tâches écrivent leur résultat dans les répertoires de partition situés dans le répertoire de préparation de Spark, par exemple, ${outputLocation}/spark-staging-${jobID}/k1=v1/k2=v2/.

  2. Pour chaque partition écrite, la tâche tente de suivre les chemins de partition relatifs, par exemple, k1=v1/k2=v2.

  3. Lorsqu'une tâche se termine avec succès elle fournit le pilote avec tous les chemins de partition relatifs qu'elle a suivis.

  4. Une fois toutes les tâches terminées, la phase de validation de tâche collecte tous les répertoires de partition que les tentatives de tâche réussies ont écrit dans le répertoire intermédiaire de Spark. Spark renomme de manière séquentielle chacun de ces répertoires dans son emplacement de sortie final à l'aide d'opérations attribution de nouveau nom de l'arborescence.

  5. Le répertoire intermédiaire est supprimé avant que la phase de validation de tâche soit terminée.

Exemple – Emplacement de partition personnalisé

Dans cet exemple, le code Scala s'insère dans deux partitions. Une partition possède un emplacement de partition personnalisé. L'autre partition utilise l'emplacement de partition par défaut. Le validateur EMRFS optimisé pour S3 est uniquement utilisé pour écrire le résultat de la tâche sur la partition qui utilise l'emplacement de partition par défaut.

val table = "dataset" val location = "s3://bucket/table" spark.sql(s""" CREATE TABLE $table (id bigint, dt date) USING PARQUET PARTITIONED BY (dt) LOCATION '$location' """) // Add a partition using a custom location val customPartitionLocation = "s3://bucket/custom" spark.sql(s""" ALTER TABLE $table ADD PARTITION (dt='2019-01-28') LOCATION '$customPartitionLocation' """) // Add another partition using default location spark.sql(s"ALTER TABLE $table ADD PARTITION (dt='2019-01-29')") def asDate(text: String) = lit(text).cast("date") spark.range(0, 10) .withColumn("dt", when($"id" > 4, asDate("2019-01-28")).otherwise(asDate("2019-01-29"))) .write.insertInto(table)

Le code Scala crée les objets Amazon S3 suivants :

custom/part-00001-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
custom_$folder$
table/_SUCCESS
table/dt=2019-01-29/part-00000-035a2a9c-4a09-4917-8819-e77134342402.c000.snappy.parquet
table/dt=2019-01-29_$folder$
table_$folder$

Lorsque vous écrivez des partitions dans des emplacements personnalisés, Spark utilise un algorithme de validation similaire à celui de l'exemple précédent, qui est décrit ci-dessous. Comme dans l'exemple précédent, l'algorithme se traduit par des attributions séquentielles de nouveaux noms, ce qui peut avoir un impact négatif sur les performances.

  1. Lors de l'écriture de la sortie d'une partition dans un emplacement personnalisé, les tâches écrivent un fichier dans le répertoire intermédiaire de Spark, qui est créé sous l'emplacement de sortie final. Le nom du fichier inclut un caractère aléatoire UUID pour éviter les collisions de fichiers. La tentative de tâche suit chaque fichier, ainsi que le chemin de la sortie souhaité final.

  2. Lorsqu'une tâche se termine avec succès, elle fournit au pilote les fichiers et les chemins de sortie souhaités finaux.

  3. Une fois toutes les tâches terminées, la phase de validation de tâche renomme de manière séquentielle tous les fichiers qui ont été écrits pour les partitions dans les emplacements personnalisés en leurs chemins de sortie finaux.

  4. Le répertoire intermédiaire est supprimé avant que la phase de validation de tâche soit terminée.