Mise à jour du schéma et ajout de nouvelles partitions dans le catalogue de données à l'aide de tâches AWS Glue ETL - AWS Glue

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Mise à jour du schéma et ajout de nouvelles partitions dans le catalogue de données à l'aide de tâches AWS Glue ETL

Votre travail d'extraction, de transformation et de chargement (ETL) peut créer de nouvelles partitions de table dans le magasin de données cible. Votre schéma de jeu de données peut évoluer et diverger du schéma de catalogue de données AWS Glue avec le temps. AWS Glue Les tâches ETL fournissent désormais plusieurs fonctions que vous pouvez utiliser dans votre script ETL pour mettre à jour votre schéma et vos partitions dans le catalogue de données. Ces fonctions vous permettent de voir les résultats de votre tâche ETL dans le catalogue de données, sans avoir à exécuter à nouveau l'crawler.

Nouvelles partitions

Si vous souhaitez afficher les nouvelles partitions dans le AWS Glue Data Catalog, vous pouvez effectuer l'une des opérations suivantes :

  • Une fois la tâche terminée, réexécutez l'crawler pour afficher les nouvelles partitions sur la console.

  • Une fois la tâche terminée, affichez immédiatement les nouvelles partitions sur la console, sans avoir à réexécuter l'crawler. Pour activer cette fonctionnalité, ajoutez quelques lignes de code au script ETL, comme illustré dans les exemples suivants. Le code utilise l'argument enableUpdateCatalog pour indiquer que le catalogue de données doit être mis à jour pendant l'exécution de la tâche lors de la création des partitions.

Méthode 1 :

Transmettez enableUpdateCatalog et partitionKeys dans un argument d'options.

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
Méthode 2 :

Transmettez enableUpdateCatalog et partitionKeys dans getSink() et appelez setCatalogInfo() au niveau de l'objet DataSink.

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

Maintenant, vous pouvez créer des tables de catalogue, mettre à jour les tables existantes avec le schéma modifié et ajouter de nouvelles partitions de table au catalogue de données à l'aide d'une tâche ETL AWS Glue, sans avoir besoin de relancer les crawlers.

Mise à jour du schéma de table

Si vous souhaitez remplacer le schéma de la table du catalogue de données, vous pouvez effectuer l'une des opérations suivantes :

  • Une fois la tâche terminée, exécutez à nouveau l’crawler et assurez-vous que celui-ci est configuré pour mettre à jour également la définition de la table. Affichez les nouvelles partitions sur la console ainsi que les mises à jour de schéma lorsque l'crawler se termine. Pour plus d'informations, consultez Configuration d'un crawler utilisant l'API.

  • Une fois la tâche terminée, affichez immédiatement le schéma modifié sur la console, sans avoir à exécuter de nouveau l'crawler. Pour activer cette fonctionnalité, ajoutez quelques lignes de code au script ETL, comme illustré dans les exemples suivants. Le code utilise enableUpdateCatalog défini sur true et updateBehavior défini sur UPDATE_IN_DATABASE, ce qui indique que le schéma doit être remplacé et que de nouvelles partitions doivent être ajoutées au catalogue de données pendant l'exécution de la tâche.

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

Vous pouvez également définir la valeur de updateBehavior sur LOG si vous souhaitez empêcher le remplacement du schéma de table, mais que vous souhaitez toujours ajouter les nouvelles partitions. Comme la valeur par défaut de updateBehavior est UPDATE_IN_DATABASE, si vous ne le définissez pas explicitement, le schéma de table est remplacé.

Si enableUpdateCatalog n'a pas la valeur true, quelle que soit l'option sélectionnée pour updateBehavior, la tâche ETL ne mettra pas à jour la table dans le catalogue de données.

Création de tables

Vous pouvez également utiliser les mêmes options pour créer une table dans le catalogue de données. Vous pouvez spécifier la base de données et le nouveau nom de table avec setCatalogInfo.

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

Restrictions

Prenez note des restrictions suivantes :

  • Seules des cibles Amazon Simple Storage Service (Amazon S3) sont prises en charge.

  • La fonctionnalité enableUpdateCatalog n'est pas prise en charge pour les tables gouvernées.

  • Seuls les formats suivants sont pris en charge : json, csv, avro et parquet.

  • Pour créer ou mettre à jour des tableaux avec la parquet classification, vous devez utiliser le rédacteur de parquet AWS Glue optimisé pour DynamicFrames. Cela peut être réalisé à l'aide d'un des moyens suivants :

    • Si vous mettez à jour une table existante dans le catalogue avec la classification parquet, la propriété de la table "useGlueParquetWriter" doit être définie sur true avant que vous ne la mettiez à jour. Vous pouvez définir cette propriété via les AWS Glue API/SDK, via la console ou via une instruction Athena DDL.

      Champ d'édition des propriétés de la table de catalogue dans AWS Glue la console.

      Une fois la propriété de la table de catalogue définie, vous pouvez utiliser l'extrait de code suivant pour mettre à jour la table de catalogue avec les nouvelles données :

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • Si la table n'existe pas déjà dans le catalogue, vous pouvez utiliser la méthode getSink() de votre script avec connection_type="s3" pour ajouter la table et ses partitions au catalogue, ainsi que pour écrire les données sur Amazon S3. Fournissez les partitionKeys et la compression appropriés pour votre flux de travail.

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=true) s3sink.writeFrame(frameToWrite)
    • La valeur de glueparquet format est une ancienne méthode permettant d'activer le rédacteur de AWS Glue parquet.

  • Quand updateBehavior a la valeur LOG, de nouvelles partitions sont ajoutées uniquement si le schéma DynamicFrame est équivalent ou qu'il contient un sous-ensemble des colonnes définies dans le schéma de la table du catalogue de données.

  • Les mises à jour du schéma ne sont pas prises en charge pour les tables non partitionnées (qui n'utilisent pas l'option « partitionKeys »).

  • Vos clés de partition (partitionKeys) doivent être équivalentes, et dans le même ordre, entre votre paramètre passé dans votre script ETL et les clés de partition (partitionKeys) du schéma de votre table du catalogue de données.

  • Cette fonction ne prend pas encore en charge la mise à jour/création de tables dans lesquelles les schémas de mise à jour sont imbriqués (par exemple, les tableaux à l'intérieur de structures).

Pour plus d'informations, voir Programmation de scripts Spark.