GlueContext 类 - AWS Glue

GlueContext 类

包装 Apache Spark SparkContext 对象,从而提供与 Apache Spark 平台交互的机制。

__init__

__init__(sparkContext)
  • sparkContext – 要使用的 Apache Spark 上下文。

Creating

getSource

getSource(connection_type, transformation_ctx = "", **options)

创建一个 DataSource 对象,该对象可用于从外部来源读取 DynamicFrames

  • connection_type – 要使用的连接类型,例如 Amazon Simple Storage Service(Amazon S3)、Amazon Redshift 和 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracledynamodb

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • options – 可选名称/值对的集合。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

以下是使用 getSource 的示例。

>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()

create_dynamic_frame_from_rdd

create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")

返回一个从 Apache Spark 弹性分布式数据集 (RDD) 创建的 DynamicFrame

  • data – 要使用的数据源。

  • name – 要使用的数据的名称。

  • schema – 要使用的架构 (可选)。

  • sample_ratio – 要使用的采样率 (可选)。

  • transformation_ctx – 要使用的转换上下文 (可选)。

create_dynamic_frame_from_catalog

create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)

返回一个使用数据目录数据库和表名称创建的 DynamicFrame。使用此方法时,您可以通过指定的 AWS Glue Data Catalog 表中的表属性提供 format_options,并通过 additional_options 参数提供其他选项。

  • Database – 要从中读取的数据库。

  • table_name – 要从中读取的表的名称。

  • redshift_tmp_dir – 要使用的 Amazon Redshift 临时目录(可选)。

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • push_down_predicate – 筛选分区,而不必列出并读取数据集中的所有文件。有关支持的来源和限制,请参阅在 AWS Glue ETL 中使用下推优化读取。有关更多信息,请参阅 使用下推谓词进行预筛选

  • additional_options – 可选名称/值对的集合。可能选项包括 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 中列出的选项,但 endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassificationdelimiter 除外。另一个支持的选项是 catalogPartitionPredicate

    catalogPartitionPredicate – 要传递目录表达式以根据索引列进行筛选。这样会将筛选下推到服务器端。有关更多信息,请参阅 AWS Glue 分区数据。请注意,push_down_predicatecatalogPartitionPredicate 使用不同的语法。前者使用 Spark SQL 标准语法,后者使用 JSQL 解析器。

  • catalog_id – 正在访问的数据目录 ID(账户 ID)。当为 None 时,将使用调用方的默认账户 ID。

create_dynamic_frame_from_options

create_dynamic_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

返回一个使用指定连接和格式创建的 DynamicFrame

  • connection_type – 连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracledynamodb

  • connection_options – 连接选项,例如路径和数据库表(可选)。对于 s3connection_type,定义 Amazon S3 路径的列表。

    connection_options = {"paths": ["s3://aws-glue-target/temp"]}

    对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。

    警告

    不建议在脚本中存储密码。请考虑使用 boto3 从 AWS Secrets Manager 或 AWS Glue Data Catalog 检索它们。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    dbtable 属性是 JDBC 表的名称。对于在数据库中支持架构的 JDBC 数据存储,指定 schema.table-name。如果未提供架构,则使用默认的“public”架构。

    有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

  • format – 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项

  • format_options – 指定格式的格式选项。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • push_down_predicate – 筛选分区,而不必列出并读取数据集中的所有文件。有关支持的来源和限制,请参阅在 AWS Glue ETL 中使用下推优化读取。有关更多信息,请参阅使用下推谓词进行预筛选

create_sample_dynamic_frame_from_catalog

create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)

返回一个使用数据目录数据库和表名称创建的 DynamicFrame 示例。DynamicFrame 只包含来自数据源的第一个 num 记录。

  • database – 要从中读取的数据库。

  • table_name – 要从中读取的表的名称。

  • num – 返回的动态帧示例中的最大记录数。

  • redshift_tmp_dir – 要使用的 Amazon Redshift 临时目录(可选)。

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • push_down_predicate – 筛选分区,而不必列出并读取数据集中的所有文件。有关更多信息,请参阅 使用下推谓词进行预筛选

  • additional_options – 可选名称/值对的集合。可能选项包括 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 中列出的选项,但 endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassificationdelimiter 除外。

  • sample_options – 控制采样行为的参数(可选)。Amazon S3 源的当前可用参数:

    • maxSamplePartitions – 采样将读取的最大分区数。默认值为 10

    • maxSampleFilesPerPartition – 采样将在一个分区中读取的最大文件数。默认值为 10。

      这些参数有助于减少文件列表所耗费的时间。例如,假设数据集有 1000 个分区,每个分区有 10 个文件。如果您设置 maxSamplePartitions = 10,以及 maxSampleFilesPerPartition = 10,则取样不会列出所有 10,000 个文件,而是只列出并读取前 10 个分区,每个分区中的前 10 个文件:10*10 = 共 100 个文件。

  • catalog_id – 正在访问的数据目录的目录 ID(数据目录的账户 ID)。默认设置为 NoneNone 默认为服务中调用账户的目录 ID。

create_sample_dynamic_frame_from_options

create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")

返回一个使用指定连接和格式创建的 DynamicFrame 示例。DynamicFrame 只包含来自数据源的第一个 num 记录。

  • connection_type – 连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracledynamodb

  • connection_options – 连接选项,例如路径和数据库表(可选)。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

  • num – 返回的动态帧示例中的最大记录数。

  • sample_options – 控制采样行为的参数(可选)。Amazon S3 源的当前可用参数:

    • maxSamplePartitions – 采样将读取的最大分区数。默认值为 10

    • maxSampleFilesPerPartition – 采样将在一个分区中读取的最大文件数。默认值为 10。

      这些参数有助于减少文件列表所耗费的时间。例如,假设数据集有 1000 个分区,每个分区有 10 个文件。如果您设置 maxSamplePartitions = 10,以及 maxSampleFilesPerPartition = 10,则取样不会列出所有 10,000 个文件,而是只列出并读取前 10 个分区,每个分区中的前 10 个文件:10*10 = 共 100 个文件。

  • format – 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项

  • format_options – 指定格式的格式选项。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • push_down_predicate – 筛选分区,而不必列出并读取数据集中的所有文件。有关更多信息,请参阅 使用下推谓词进行预筛选

add_ingestion_time_columns

add_ingestion_time_columns(dataFrame, timeGranularity = "")

将提取时间列(例如 ingest_yearingest_monthingest_dayingest_houringest_minute)附加到输入 DataFrame。当您指定以 Amazon S3 为目标的数据目录表时,AWS Glue 生成的脚本中会自动生成此函数。此函数使用输出表上的提取时间列自动更新分区。这允许根据提取时间自动对输出数据进行分区,而不需要在输入数据中显示提取时间列。

  • dataFrame – 提取时间列要附加到的 dataFrame

  • timeGranularity – 时间列的粒度。有效值为“day”、“hour”和“minute”。例如,如果“hour”传递到函数,原始 dataFrame 将附加“ingest_year”、“ingest_month”、“ingest_day”和“ingest_hour”时间列。

在附加时间粒度列后返回数据框。

例如:

dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))

create_data_frame_from_catalog

create_data_frame_from_catalog(database, table_name, transformation_ctx = "", additional_options = {})

返回一个使用数据目录数据表中的信息创建的 DataFrame

  • database – 要从中读取数据的数据目录数据库。

  • table_name – 要从中读取数据的数据目录表的名称。

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • additional_options – 可选名称/值对的集合。可能选项包括流式传输源的 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 中列出的选项,例如 startingPositionmaxFetchTimeInMsstartingOffsets

    • useSparkDataSource — 如果设置为 true,则强制 AWS Glue 使用本机 Spark 数据源 API 来读取表。Spark 数据源 API 支持以下格式:AVRO、二进制、CSV、JSON、ORC、Parquet 和文本。在数据目录表中,您可以使用 classification 属性指定格式。要了解有关 Spark 数据源 API 的更多信息,请参阅 Apache Spark 官方文档

      create_data_frame_from_cataloguseSparkDataSource 结合使用具有以下优势:

      • 直接返回一个 DataFrame 并提供 create_dynamic_frame.from_catalog().toDF() 的替代项。

      • 支持对本机格式实施 AWS Lake Formation 表级权限控制。

      • 支持在没有 AWS Lake Formation 表级权限控制的情况下读取数据湖格式。有关更多信息,请参阅 在 AWS Glue ETL 任务中使用数据湖框架

      启用 useSparkDataSource 后,您还可以根据需要在 additional_options 中添加任一 Spark 数据源选项。AWSGlue 会将这些选项直接传递给 Spark 读取器。

    • useCatalogSchema — 如果设置为 true,AWS Glue 会对生成的 DataFrame 应用此数据目录架构。否则,读取器会从数据中推断出架构。如果启用 useCatalogSchema,则必须同时将 useSparkDataSource 设置为 true。

限制

使用 useSparkDataSource 选项时请考虑以下限制:

  • 使用 useSparkDataSource 时,AWS Glue 会在与原始 Spark 会话不同的单独 Spark 会话中创建一个新的 DataFrame

  • Spark DataFrame 分区筛选不适用于以下 AWS Glue 功能。

    要对这些功能使用分区筛选,可以使用 AWS Glue 下推谓词。有关更多信息,请参阅 使用下推谓词进行预筛选。对未分区列的筛选不受影响。

    以下示例脚本演示了使用 excludeStorageClasses 选项执行分区筛选的错误方法。

    // Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")

    以下示例脚本演示了使用下推谓词以使用 excludeStorageClasses 选项执行分区筛选的正确方法。

    // Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")

示例:使用 Spark 数据源读取器创建 CSV 表

// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=<database_name>, table_name=<table_name>, additional_options = {"useSparkDataSource": True, "sep": '\t'} )

create_data_frame_from_options

create_data_frame_from_options(connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

此 API 现已弃用。请改用 getSource() API。返回一个使用指定连接和格式创建的 DataFrame。仅将此函数与 AWS Glue 串流源结合使用。

  • connection_type – 流式传输连接类型。有效值包括 kinesiskafka

  • connection_options – 连接选项,不同于 Kinesis 和 Kafka。您可以在 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 找到每个流式传输数据源的所有连接选项列表。请注意流式传输连接选项的以下差异:

    • Kinesis 流式传输源需要 streamARNstartingPositioninferSchemaclassification

    • Kinesis 流式传输源需要 connectionNametopicNamestartingOffsetsinferSchemaclassification

  • format – 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关所支持格式的信息,请参阅AWS Glue for Spark 中的输入和输出的数据格式选项

  • format_options – 指定格式的格式选项。有关所支持的格式选项的信息,请参阅AWS Glue for Spark 中的输入和输出的数据格式选项

  • transformation_ctx – 要使用的转换上下文 (可选)。

Amazon Kinesis 流式传输源示例:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Kafka 流式传输源示例:

kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)

forEachBatch

forEachBatch(frame, batch_function, options)

将传入的 batch_function 应用于从流式传输源读取的每个微批处理。

  • frame – 包含当前微处理的 DataFrame。

  • batch_function – 应用于每个微处理的函数。

  • options – 键值对集合,其中包含有关如何处理微批处理的信息。以下选项为必填:

    • windowSize – 处理每个批处理所花费的时间量。

    • checkpointLocation – 为流式传输 ETL 任务存储检查点的位置。

    • batchMaxRetries – 在该批处理失败时重试的最大次数。默认值为 3。此选项仅适用于 Glue 版本 2.0 及更高版本。

示例

glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )

在 Amazon S3 中使用数据集

purge_table

purge_table(catalog_id=None, database="", table_name="", options={}, transformation_ctx="")

从 Amazon S3 中删除指定目录的数据库和表的文件。如果删除一个分区中的所有文件,则也会从目录中删除该分区。

如果您希望能恢复已删除的对象,可以在 Amazon S3 存储桶上启用对象版本控制。如果从未启用对象版本控制的存储桶中删除一个对象,则无法恢复该对象。有关如何恢复已启用版本控制的存储桶中的已删除对象的更多信息,请参阅 AWS Support 知识中心中的如何检索已删除的 Amazon S3 对象?

  • catalog_id – 正在访问的数据目录的目录 ID(数据目录的账户 ID)。默认设置为 NoneNone 默认为服务中调用账户的目录 ID。

  • database – 要使用的数据库。

  • table_name – 要使用的表的名称。

  • options – 用于筛选要删除的文件和清单文件生成的选项。

    • retentionPeriod – 指定保留文件的时段(以小时为单位)。比保留期更新的文件将被保留。默认情况下设置为 168 小时(7 天)。

    • partitionPredicate – 满足此谓词的分区将被删除。这些分区中保留期内的文件不会被删除。设置为 "" – 默认情况下为空。

    • excludeStorageClassesexcludeStorageClasses 集中包含存储类的文件不会被删除。默认值是 Set() – 一个空集。

    • manifestFilePath – 用于生成清单文件的可选路径。所有成功清除的文件将记录在 Success.csv 中,所有未能清除的文件将记录在 Failed.csv

  • transformation_ctx – 要使用的转换上下文 (可选)。在清单文件路径中使用。

glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

purge_s3_path

purge_s3_path(s3_path, options={}, transformation_ctx="")

以递归方式从指定的 Amazon S3 路径中删除文件。

如果您希望能恢复已删除的对象,可以在 Amazon S3 存储桶上启用对象版本控制。如果从未启用对象版本控制的存储桶中删除一个对象,则无法恢复该对象。有关如何恢复已启用版本控制的存储桶中的已删除对象的更多信息,请参阅 AWS Support 知识中心中的如何检索已删除的 Amazon S3 对象?

  • s3_path – 要删除的文件 Amazon S3 中的路径,格式为 s3://<bucket>/<prefix>/

  • options – 用于筛选要删除的文件和清单文件生成的选项。

    • retentionPeriod – 指定保留文件的时段(以小时为单位)。比保留期更新的文件将被保留。默认情况下设置为 168 小时(7 天)。

    • excludeStorageClassesexcludeStorageClasses 集中包含存储类的文件不会被删除。默认值是 Set() – 一个空集。

    • manifestFilePath – 用于生成清单文件的可选路径。所有成功清除的文件将记录在 Success.csv 中,所有未能清除的文件将记录在 Failed.csv

  • transformation_ctx – 要使用的转换上下文 (可选)。在清单文件路径中使用。

glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})

transition_table

transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)

为指定目录的数据库和表转换存储在 Amazon S3 上的文件的存储类。

可以在任意两个存储类之间转换。对于 GLACIERDEEP_ARCHIVE 存储类,您可以转换到这些类。但您可以使用 S3 RESTOREGLACIERDEEP_ARCHIVE 存储类进行转换。

如果您要运行从 Amazon S3 读取文件或分区的 AWS Glue ETL 任务,则可以排除某些 Amazon S3 存储类类型。有关更多信息,请参阅排除 Amazon S3 存储类

  • database – 要使用的数据库。

  • table_name – 要使用的表的名称。

  • transition_to – 要切换到的 Amazon S3 存储类

  • options – 用于筛选要删除的文件和清单文件生成的选项。

    • retentionPeriod – 指定保留文件的时段(以小时为单位)。比保留期更新的文件将被保留。默认情况下设置为 168 小时(7 天)。

    • partitionPredicate – 将转换满足此谓词的分区。这些分区中保留期内的文件不会被转换。设置为 "" – 默认情况下为空。

    • excludeStorageClassesexcludeStorageClasses 集中包含存储类的文件不会被转换。默认值是 Set() – 一个空集。

    • manifestFilePath – 用于生成清单文件的可选路径。所有成功转换的文件将记录在 Success.csv 中,所有未能转换的文件将记录在 Failed.csv

    • accountId – 要运行转换的 Amazon Web Services 账户 ID。对于此转换是必需的。

    • roleArn – 要运行转换的 AWS 角色。对于此转换是必需的。

  • transformation_ctx – 要使用的转换上下文 (可选)。在清单文件路径中使用。

  • catalog_id – 正在访问的数据目录的目录 ID(数据目录的账户 ID)。默认设置为 NoneNone 默认为服务中调用账户的目录 ID。

glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

transition_s3_path

transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")

以递归方式转换指定 Amazon S3 路径中的文件的存储类。

可以在任意两个存储类之间转换。对于 GLACIERDEEP_ARCHIVE 存储类,您可以转换到这些类。但您可以使用 S3 RESTOREGLACIERDEEP_ARCHIVE 存储类进行转换。

如果您要运行从 Amazon S3 读取文件或分区的 AWS Glue ETL 任务,则可以排除某些 Amazon S3 存储类类型。有关更多信息,请参阅排除 Amazon S3 存储类

  • s3_path – 要转换的文件的 Amazon S3 中的路径,格式为 s3://<bucket>/<prefix>/

  • transition_to – 要切换到的 Amazon S3 存储类

  • options – 用于筛选要删除的文件和清单文件生成的选项。

    • retentionPeriod – 指定保留文件的时段(以小时为单位)。比保留期更新的文件将被保留。默认情况下设置为 168 小时(7 天)。

    • partitionPredicate – 将转换满足此谓词的分区。这些分区中保留期内的文件不会被转换。设置为 "" – 默认情况下为空。

    • excludeStorageClassesexcludeStorageClasses 集中包含存储类的文件不会被转换。默认值是 Set() – 一个空集。

    • manifestFilePath – 用于生成清单文件的可选路径。所有成功转换的文件将记录在 Success.csv 中,所有未能转换的文件将记录在 Failed.csv

    • accountId – 要运行转换的 Amazon Web Services 账户 ID。对于此转换是必需的。

    • roleArn – 要运行转换的 AWS 角色。对于此转换是必需的。

  • transformation_ctx – 要使用的转换上下文 (可选)。在清单文件路径中使用。

glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})

提取

extract_jdbc_conf

extract_jdbc_conf(connection_name, catalog_id = None)

从数据目录中的 dict 连接对象返回具有键(包含配置属性)的 AWS Glue。

  • user - 数据库用户名。

  • password - 数据库密码。

  • vendor - 指定供应商(mysqlpostgresqloraclesqlserver、等)。

  • enforceSSL - 一个布尔字符串,指示是否需要安全连接。

  • customJDBCCert - 使用指示的 Amazon S3 路径中的特定客户端证书。

  • skipCustomJDBCCertValidation - 一个布尔字符串,指示 customJDBCCert 是否必须由 CA 验证。

  • customJDBCCertString - 关于自定义证书的其他信息,特定于驱动程序类型。

  • url -(已弃用)仅包含协议、服务器和端口的 JDBC URL。

  • fullUrl - 创建连接时输入的 JDBC URL(AWS Glue 版本 3.0 或更高版本中可用)。

检索 JDBC 配置的示例:

jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}

事务

start_transaction

start_transaction(read_only)

开启新事务。内部调用 Lake Formation startTransaction API。

  • read_only –(布尔值)指示此事务应为只读还是读写。使用只读事务 ID 进行的写入将被拒绝。只读事务不需要提交。

返回事务 ID。

commit_transaction

commit_transaction(transaction_id, wait_for_commit = True)

尝试提交指定的事务。可能在事务完成提交之前返回 commit_transaction。内部调用 Lake Formation commitTransaction API。

  • transaction_id –(字符串)要提交的事务。

  • wait_for_commit –(布尔值)确定是否立即返回 commit_transaction。默认值为 true。如果为假,则轮询 commit_transaction 并等待事务提交。等待时间限制为 1 分钟使用指数回退,最多重试 6 次。

返回布尔值,以指示是否完成提交。

cancel_transaction

cancel_transaction(transaction_id)

尝试取消指定的事务。如果事务以前已提交,则返回 TransactionCommittedException 异常。内部调用 Lake Formation CancelTransaction API。

  • transaction_id –(字符串)要取消的事务。

编写

getSink

getSink(connection_type, format = None, transformation_ctx = "", **options)

获取一个 DataSink 对象,该对象可用于将 DynamicFrames 写入外部来源。先检查 SparkSQL format 以确保获得预期的接收器。

  • connection_type – 要使用的连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroraclekinesiskafka

  • format – 要使用的 SparkSQL 格式 (可选)。

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • options – 用于指定连接选项的名称-值对的集合。一些可能的值包括:

    • userpassword:适用于授权

    • url:数据存储的端点

    • dbtable:目标表的名称

    • bulkSize:插入操作的并行度

可以指定的选项取决于连接类型。有关其他值和示例,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

例如:

>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)

write_dynamic_frame_from_options

write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None, format_options={}, transformation_ctx = "")

写入并返回一个使用指定连接和格式的 DynamicFrame

  • frame – 要编写的 DynamicFrame

  • connection_type – 连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroraclekinesiskafka

  • connection_options – 连接选项,例如路径和数据库表 (可选)。对于 connection_types3,将会定义 Amazon S3 路径。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。

    警告

    不建议在脚本中存储密码。请考虑使用 boto3 从 AWS Secrets Manager 或 AWS Glue Data Catalog 检索它们。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    dbtable 属性是 JDBC 表的名称。对于在数据库中支持架构的 JDBC 数据存储,指定 schema.table-name。如果未提供架构,则使用默认的“public”架构。

    有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

  • format – 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项

  • format_options – 指定格式的格式选项。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项

  • transformation_ctx – 要使用的转换上下文 (可选)。

write_from_options

write_from_options(frame_or_dfc, connection_type, connection_options={}, format={}, format_options={}, transformation_ctx = "")

写入并返回一个使用指定连接和格式信息创建的 DynamicFrame or DynamicFrameCollection

  • frame_or_dfc – 要写入的 DynamicFrameDynamicFrameCollection

  • connection_type – 连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括 s3mysqlpostgresqlredshiftsqlserveroracle

  • connection_options – 连接选项,例如路径和数据库表 (可选)。对于 connection_types3,将会定义 Amazon S3 路径。

    connection_options = {"path": "s3://aws-glue-target/temp"}

    对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。

    警告

    不建议在脚本中存储密码。请考虑使用 boto3 从 AWS Secrets Manager 或 AWS Glue Data Catalog 检索它们。

    connection_options = {"url": "jdbc-url/database", "user": "username", "password": passwordVariable,"dbtable": "table-name", "redshiftTmpDir": "s3-tempdir-path"}

    dbtable 属性是 JDBC 表的名称。对于在数据库中支持架构的 JDBC 数据存储,指定 schema.table-name。如果未提供架构,则使用默认的“public”架构。

    有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

  • format – 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项

  • format_options – 指定格式的格式选项。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项

  • transformation_ctx – 要使用的转换上下文 (可选)。

write_dynamic_frame_from_catalog

write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

写入并返回一个使用数据目录数据库和表中信息的 DynamicFrame

  • frame – 要编写的 DynamicFrame

  • Database – 包含表的数据目录数据库。

  • table_name – 与目标关联的数据目录表的名称。

  • redshift_tmp_dir – 要使用的 Amazon Redshift 临时目录(可选)。

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • additional_options – 可选名称/值对的集合。

  • catalog_id – 正在访问的数据目录 ID(账户 ID)。当为 None 时,将使用调用方的默认账户 ID。

write_data_frame_from_catalog

write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)

写入并返回一个使用数据目录数据库和表中信息的 DataFrame。此方法支持写入数据湖格式(Hudi、Iceberg 和 Delta Lake)。有关更多信息,请参阅 在 AWS Glue ETL 任务中使用数据湖框架

  • frame – 要编写的 DataFrame

  • Database – 包含表的数据目录数据库。

  • table_name – 与目标关联的数据目录表的名称。

  • redshift_tmp_dir – 要使用的 Amazon Redshift 临时目录(可选)。

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • additional_options – 可选名称/值对的集合。

    • useSparkDataSink — 如果设置为 true,则强制 AWS Glue 使用本机 Spark 数据接收器 API 来写入表。启用此选项后,您还可以根据需要在 additional_options 中添加任何 Spark 数据来源选项。AWSGlue 将这些选项直接传递给 Spark 写入器。

  • catalog_id – 正在访问的数据目录 ID(账户 ID)。如果您未指定值,则使用调用方的默认账户 ID。

限制

使用 useSparkDataSink 选项时请考虑以下限制:

示例:使用 Spark 数据源写入器写入 Hudi 表

hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name': <table_name>, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name': <table_name>, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database': <database_name>, 'hoodie.datasource.hive_sync.table': <table_name>, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame = <df_product_inserts>, database = <database_name>, table_name = <table_name>, additional_options = hudi_options )

write_dynamic_frame_from_jdbc_conf

write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

写入并返回一个使用指定 JDBC 和连接信息的 DynamicFrame

  • frame – 要编写的 DynamicFrame

  • catalog_connection – 要使用的目录连接。

  • connection_options – 连接选项,例如路径和数据库表 (可选)。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

  • redshift_tmp_dir – 要使用的 Amazon Redshift 临时目录(可选)。

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • catalog_id – 正在访问的数据目录 ID(账户 ID)。当为 None 时,将使用调用方的默认账户 ID。

write_from_jdbc_conf

write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)

写入并返回一个使用指定 JDBC 和连接信息的 DynamicFrame or DynamicFrameCollection

  • frame_or_dfc – 要写入的 DynamicFrameDynamicFrameCollection

  • catalog_connection – 要使用的目录连接。

  • connection_options – 连接选项,例如路径和数据库表 (可选)。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

  • redshift_tmp_dir – 要使用的 Amazon Redshift 临时目录(可选)。

  • transformation_ctx – 要使用的转换上下文 (可选)。

  • catalog_id – 正在访问的数据目录 ID(账户 ID)。当为 None 时,将使用调用方的默认账户 ID。