GlueContext 类
包装 Apache Spark SparkContext
__init__
__init__(sparkContext)
sparkContext
– 要使用的 Apache Spark 上下文。
Creating
getSource
getSource(connection_type, transformation_ctx = "", **options)
创建一个 DataSource
对象,该对象可用于从外部来源读取 DynamicFrames
。
connection_type
– 要使用的连接类型,例如 Amazon Simple Storage Service(Amazon S3)、Amazon Redshift 和 JDBC。有效值包括s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
和dynamodb
。transformation_ctx
– 要使用的转换上下文 (可选)。options
– 可选名称/值对的集合。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项。
以下是使用 getSource
的示例。
>>> data_source = context.getSource("file", paths=["/in/path"]) >>> data_source.setFormat("json") >>> myFrame = data_source.getFrame()
create_dynamic_frame_from_rdd
create_dynamic_frame_from_rdd(data, name, schema=None, sample_ratio=None, transformation_ctx="")
返回一个从 Apache Spark 弹性分布式数据集 (RDD) 创建的 DynamicFrame
。
data
– 要使用的数据源。name
– 要使用的数据的名称。schema
– 要使用的架构 (可选)。sample_ratio
– 要使用的采样率 (可选)。transformation_ctx
– 要使用的转换上下文 (可选)。
create_dynamic_frame_from_catalog
create_dynamic_frame_from_catalog(database, table_name, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, catalog_id = None)
返回一个使用数据目录数据库和表名称创建的 DynamicFrame
。使用此方法时,您可以通过指定的 AWS Glue Data Catalog 表中的表属性提供 format_options
,并通过 additional_options
参数提供其他选项。
Database
– 要从中读取的数据库。table_name
– 要从中读取的表的名称。redshift_tmp_dir
– 要使用的 Amazon Redshift 临时目录(可选)。transformation_ctx
– 要使用的转换上下文 (可选)。push_down_predicate
– 筛选分区,而不必列出并读取数据集中的所有文件。有关支持的来源和限制,请参阅在 AWS Glue ETL 中使用下推优化读取。有关更多信息,请参阅 使用下推谓词进行预筛选。additional_options
– 可选名称/值对的集合。可能选项包括 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 中列出的选项,但endpointUrl
、streamName
、bootstrap.servers
、security.protocol
、topicName
、classification
和delimiter
除外。另一个支持的选项是catalogPartitionPredicate
:catalogPartitionPredicate
– 要传递目录表达式以根据索引列进行筛选。这样会将筛选下推到服务器端。有关更多信息,请参阅 AWS Glue 分区数据。请注意,push_down_predicate
和catalogPartitionPredicate
使用不同的语法。前者使用 Spark SQL 标准语法,后者使用 JSQL 解析器。catalog_id
– 正在访问的数据目录 ID(账户 ID)。当为 None 时,将使用调用方的默认账户 ID。
create_dynamic_frame_from_options
create_dynamic_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
返回一个使用指定连接和格式创建的 DynamicFrame
。
connection_type
– 连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
和dynamodb
。connection_options
– 连接选项,例如路径和数据库表(可选)。对于s3
的connection_type
,定义 Amazon S3 路径的列表。connection_options = {"paths": ["
s3://aws-glue-target/temp
"]}对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。
警告
不建议在脚本中存储密码。请考虑使用
boto3
从 AWS Secrets Manager 或 AWS Glue Data Catalog 检索它们。connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"}dbtable
属性是 JDBC 表的名称。对于在数据库中支持架构的 JDBC 数据存储,指定schema.table-name
。如果未提供架构,则使用默认的“public”架构。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项。
format
– 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。format_options
– 指定格式的格式选项。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。transformation_ctx
– 要使用的转换上下文 (可选)。push_down_predicate
– 筛选分区,而不必列出并读取数据集中的所有文件。有关支持的来源和限制,请参阅在 AWS Glue ETL 中使用下推优化读取。有关更多信息,请参阅使用下推谓词进行预筛选。
create_sample_dynamic_frame_from_catalog
create_sample_dynamic_frame_from_catalog(database, table_name, num, redshift_tmp_dir, transformation_ctx = "", push_down_predicate= "", additional_options = {}, sample_options = {}, catalog_id = None)
返回一个使用数据目录数据库和表名称创建的 DynamicFrame
示例。DynamicFrame
只包含来自数据源的第一个 num
记录。
-
database
– 要从中读取的数据库。 -
table_name
– 要从中读取的表的名称。 -
num
– 返回的动态帧示例中的最大记录数。 redshift_tmp_dir
– 要使用的 Amazon Redshift 临时目录(可选)。-
transformation_ctx
– 要使用的转换上下文 (可选)。 push_down_predicate
– 筛选分区,而不必列出并读取数据集中的所有文件。有关更多信息,请参阅 使用下推谓词进行预筛选。-
additional_options
– 可选名称/值对的集合。可能选项包括 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 中列出的选项,但endpointUrl
、streamName
、bootstrap.servers
、security.protocol
、topicName
、classification
和delimiter
除外。 -
sample_options
– 控制采样行为的参数(可选)。Amazon S3 源的当前可用参数:maxSamplePartitions
– 采样将读取的最大分区数。默认值为 10maxSampleFilesPerPartition
– 采样将在一个分区中读取的最大文件数。默认值为 10。这些参数有助于减少文件列表所耗费的时间。例如,假设数据集有 1000 个分区,每个分区有 10 个文件。如果您设置
maxSamplePartitions
= 10,以及maxSampleFilesPerPartition
= 10,则取样不会列出所有 10,000 个文件,而是只列出并读取前 10 个分区,每个分区中的前 10 个文件:10*10 = 共 100 个文件。
-
catalog_id
– 正在访问的数据目录的目录 ID(数据目录的账户 ID)。默认设置为None
。None
默认为服务中调用账户的目录 ID。
create_sample_dynamic_frame_from_options
create_sample_dynamic_frame_from_options(connection_type, connection_options={}, num, sample_options={}, format=None, format_options={}, transformation_ctx = "")
返回一个使用指定连接和格式创建的 DynamicFrame
示例。DynamicFrame
只包含来自数据源的第一个 num
记录。
connection_type
– 连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
和dynamodb
。connection_options
– 连接选项,例如路径和数据库表(可选)。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项。-
num
– 返回的动态帧示例中的最大记录数。 -
sample_options
– 控制采样行为的参数(可选)。Amazon S3 源的当前可用参数:maxSamplePartitions
– 采样将读取的最大分区数。默认值为 10maxSampleFilesPerPartition
– 采样将在一个分区中读取的最大文件数。默认值为 10。这些参数有助于减少文件列表所耗费的时间。例如,假设数据集有 1000 个分区,每个分区有 10 个文件。如果您设置
maxSamplePartitions
= 10,以及maxSampleFilesPerPartition
= 10,则取样不会列出所有 10,000 个文件,而是只列出并读取前 10 个分区,每个分区中的前 10 个文件:10*10 = 共 100 个文件。
format
– 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。format_options
– 指定格式的格式选项。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。-
transformation_ctx
– 要使用的转换上下文 (可选)。 push_down_predicate
– 筛选分区,而不必列出并读取数据集中的所有文件。有关更多信息,请参阅 使用下推谓词进行预筛选。
add_ingestion_time_columns
add_ingestion_time_columns(dataFrame, timeGranularity = "")
将提取时间列(例如 ingest_year
、ingest_month
、ingest_day
、ingest_hour
、ingest_minute
)附加到输入 DataFrame
。当您指定以 Amazon S3 为目标的数据目录表时,AWS Glue 生成的脚本中会自动生成此函数。此函数使用输出表上的提取时间列自动更新分区。这允许根据提取时间自动对输出数据进行分区,而不需要在输入数据中显示提取时间列。
-
dataFrame
– 提取时间列要附加到的dataFrame
。 -
timeGranularity
– 时间列的粒度。有效值为“day
”、“hour
”和“minute
”。例如,如果“hour
”传递到函数,原始dataFrame
将附加“ingest_year
”、“ingest_month
”、“ingest_day
”和“ingest_hour
”时间列。
在附加时间粒度列后返回数据框。
例如:
dynamic_frame = DynamicFrame.fromDF(glueContext.add_ingestion_time_columns(dataFrame, "hour"))
create_data_frame_from_catalog
create_data_frame_from_catalog(database, table_name, transformation_ctx = "",
additional_options = {})
返回一个使用数据目录数据表中的信息创建的 DataFrame
。
-
database
– 要从中读取数据的数据目录数据库。 -
table_name
– 要从中读取数据的数据目录表的名称。 -
transformation_ctx
– 要使用的转换上下文 (可选)。 -
additional_options
– 可选名称/值对的集合。可能选项包括流式传输源的 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 中列出的选项,例如startingPosition
、maxFetchTimeInMs
和startingOffsets
。-
useSparkDataSource
— 如果设置为 true,则强制 AWS Glue 使用本机 Spark 数据源 API 来读取表。Spark 数据源 API 支持以下格式:AVRO、二进制、CSV、JSON、ORC、Parquet 和文本。在数据目录表中,您可以使用classification
属性指定格式。要了解有关 Spark 数据源 API 的更多信息,请参阅 Apache Spark 官方文档。 create_data_frame_from_catalog
与useSparkDataSource
结合使用具有以下优势:-
直接返回一个
DataFrame
并提供create_dynamic_frame.from_catalog().toDF()
的替代项。 -
支持对本机格式实施 AWS Lake Formation 表级权限控制。
-
支持在没有 AWS Lake Formation 表级权限控制的情况下读取数据湖格式。有关更多信息,请参阅 在 AWS Glue ETL 任务中使用数据湖框架。
启用
useSparkDataSource
后,您还可以根据需要在additional_options
中添加任一 Spark 数据源选项。AWSGlue 会将这些选项直接传递给 Spark 读取器。 -
-
useCatalogSchema
— 如果设置为 true,AWS Glue 会对生成的DataFrame
应用此数据目录架构。否则,读取器会从数据中推断出架构。如果启用useCatalogSchema
,则必须同时将useSparkDataSource
设置为 true。
-
限制
使用 useSparkDataSource
选项时请考虑以下限制:
-
使用
useSparkDataSource
时,AWS Glue 会在与原始 Spark 会话不同的单独 Spark 会话中创建一个新的DataFrame
。 -
Spark DataFrame 分区筛选不适用于以下 AWS Glue 功能。
要对这些功能使用分区筛选,可以使用 AWS Glue 下推谓词。有关更多信息,请参阅 使用下推谓词进行预筛选。对未分区列的筛选不受影响。
以下示例脚本演示了使用
excludeStorageClasses
选项执行分区筛选的错误方法。// Incorrect partition filtering using Spark filter with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Suppose year and month are partition keys. // Filtering on year and month won't work, the filtered_df will still // contain data with other year/month values. filtered_df = read_df.filter("year == '2017 and month == '04' and 'state == 'CA'")
以下示例脚本演示了使用下推谓词以使用
excludeStorageClasses
选项执行分区筛选的正确方法。// Correct partition filtering using the AWS Glue pushdown predicate // with excludeStorageClasses read_df = glueContext.create_data_frame.from_catalog( database=database_name, table_name=table_name, // Use AWS Glue pushdown predicate to perform partition filtering push_down_predicate = "(year=='2017' and month=='04')" additional_options = { "useSparkDataSource": True, "excludeStorageClasses" : ["GLACIER", "DEEP_ARCHIVE"] } ) // Use Spark filter only on non-partitioned columns filtered_df = read_df.filter("state == 'CA'")
示例:使用 Spark 数据源读取器创建 CSV 表
// Read a CSV table with '\t' as separator read_df = glueContext.create_data_frame.from_catalog( database=
<database_name>
, table_name=<table_name>
, additional_options = {"useSparkDataSource": True, "sep": '\t'} )
create_data_frame_from_options
create_data_frame_from_options(connection_type, connection_options={},
format=None, format_options={}, transformation_ctx = "")
此 API 现已弃用。请改用 getSource()
API。返回一个使用指定连接和格式创建的 DataFrame
。仅将此函数与 AWS Glue 串流源结合使用。
-
connection_type
– 流式传输连接类型。有效值包括kinesis
和kafka
。 -
connection_options
– 连接选项,不同于 Kinesis 和 Kafka。您可以在 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 找到每个流式传输数据源的所有连接选项列表。请注意流式传输连接选项的以下差异:-
Kinesis 流式传输源需要
streamARN
、startingPosition
、inferSchema
和classification
。 -
Kinesis 流式传输源需要
connectionName
、topicName
、startingOffsets
、inferSchema
和classification
。
-
-
format
– 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关所支持格式的信息,请参阅AWS Glue for Spark 中的输入和输出的数据格式选项。 -
format_options
– 指定格式的格式选项。有关所支持的格式选项的信息,请参阅AWS Glue for Spark 中的输入和输出的数据格式选项。 -
transformation_ctx
– 要使用的转换上下文 (可选)。
Amazon Kinesis 流式传输源示例:
kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Kafka 流式传输源示例:
kafka_options = { "connectionName": "ConfluentKafka", "topicName": "kafka-auth-topic", "startingOffsets": "earliest", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kafka", connection_options=kafka_options)
forEachBatch
forEachBatch(frame, batch_function, options)
将传入的 batch_function
应用于从流式传输源读取的每个微批处理。
-
frame
– 包含当前微处理的 DataFrame。 -
batch_function
– 应用于每个微处理的函数。 -
options
– 键值对集合,其中包含有关如何处理微批处理的信息。以下选项为必填:-
windowSize
– 处理每个批处理所花费的时间量。 -
checkpointLocation
– 为流式传输 ETL 任务存储检查点的位置。 -
batchMaxRetries
– 在该批处理失败时重试的最大次数。默认值为 3。此选项仅适用于 Glue 版本 2.0 及更高版本。
-
示例:
glueContext.forEachBatch( frame = data_frame_datasource0, batch_function = processBatch, options = { "windowSize": "100 seconds", "checkpointLocation": "s3://kafka-auth-dataplane/confluent-test/output/checkpoint/" } ) def processBatch(data_frame, batchId): if (data_frame.count() > 0): datasource0 = DynamicFrame.fromDF( glueContext.add_ingestion_time_columns(data_frame, "hour"), glueContext, "from_data_frame" ) additionalOptions_datasink1 = {"enableUpdateCatalog": True} additionalOptions_datasink1["partitionKeys"] = ["ingest_yr", "ingest_mo", "ingest_day"] datasink1 = glueContext.write_dynamic_frame.from_catalog( frame = datasource0, database = "tempdb", table_name = "kafka-auth-table-output", transformation_ctx = "datasink1", additional_options = additionalOptions_datasink1 )
在 Amazon S3 中使用数据集
purge_table
purge_table(catalog_id=None, database="", table_name="", options={},
transformation_ctx="")
从 Amazon S3 中删除指定目录的数据库和表的文件。如果删除一个分区中的所有文件,则也会从目录中删除该分区。
如果您希望能恢复已删除的对象,可以在 Amazon S3 存储桶上启用对象版本控制。如果从未启用对象版本控制的存储桶中删除一个对象,则无法恢复该对象。有关如何恢复已启用版本控制的存储桶中的已删除对象的更多信息,请参阅 AWS Support 知识中心中的如何检索已删除的 Amazon S3 对象?
-
catalog_id
– 正在访问的数据目录的目录 ID(数据目录的账户 ID)。默认设置为None
。None
默认为服务中调用账户的目录 ID。 database
– 要使用的数据库。table_name
– 要使用的表的名称。options
– 用于筛选要删除的文件和清单文件生成的选项。retentionPeriod
– 指定保留文件的时段(以小时为单位)。比保留期更新的文件将被保留。默认情况下设置为 168 小时(7 天)。partitionPredicate
– 满足此谓词的分区将被删除。这些分区中保留期内的文件不会被删除。设置为""
– 默认情况下为空。excludeStorageClasses
–excludeStorageClasses
集中包含存储类的文件不会被删除。默认值是Set()
– 一个空集。manifestFilePath
– 用于生成清单文件的可选路径。所有成功清除的文件将记录在Success.csv
中,所有未能清除的文件将记录在Failed.csv
中
transformation_ctx
– 要使用的转换上下文 (可选)。在清单文件路径中使用。
glueContext.purge_table("database", "table", {"partitionPredicate": "(month=='march')", "retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})
purge_s3_path
purge_s3_path(s3_path, options={}, transformation_ctx="")
以递归方式从指定的 Amazon S3 路径中删除文件。
如果您希望能恢复已删除的对象,可以在 Amazon S3 存储桶上启用对象版本控制。如果从未启用对象版本控制的存储桶中删除一个对象,则无法恢复该对象。有关如何恢复已启用版本控制的存储桶中的已删除对象的更多信息,请参阅 AWS Support 知识中心中的如何检索已删除的 Amazon S3 对象?
s3_path
– 要删除的文件 Amazon S3 中的路径,格式为s3://<
bucket
>/<prefix
>/options
– 用于筛选要删除的文件和清单文件生成的选项。retentionPeriod
– 指定保留文件的时段(以小时为单位)。比保留期更新的文件将被保留。默认情况下设置为 168 小时(7 天)。excludeStorageClasses
–excludeStorageClasses
集中包含存储类的文件不会被删除。默认值是Set()
– 一个空集。manifestFilePath
– 用于生成清单文件的可选路径。所有成功清除的文件将记录在Success.csv
中,所有未能清除的文件将记录在Failed.csv
中
transformation_ctx
– 要使用的转换上下文 (可选)。在清单文件路径中使用。
glueContext.purge_s3_path("s3://bucket/path/", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/"})
transition_table
transition_table(database, table_name, transition_to, options={}, transformation_ctx="", catalog_id=None)
为指定目录的数据库和表转换存储在 Amazon S3 上的文件的存储类。
可以在任意两个存储类之间转换。对于 GLACIER
和 DEEP_ARCHIVE
存储类,您可以转换到这些类。但您可以使用 S3 RESTORE
从 GLACIER
和 DEEP_ARCHIVE
存储类进行转换。
如果您要运行从 Amazon S3 读取文件或分区的 AWS Glue ETL 任务,则可以排除某些 Amazon S3 存储类类型。有关更多信息,请参阅排除 Amazon S3 存储类。
database
– 要使用的数据库。table_name
– 要使用的表的名称。transition_to
– 要切换到的 Amazon S3 存储类。options
– 用于筛选要删除的文件和清单文件生成的选项。retentionPeriod
– 指定保留文件的时段(以小时为单位)。比保留期更新的文件将被保留。默认情况下设置为 168 小时(7 天)。partitionPredicate
– 将转换满足此谓词的分区。这些分区中保留期内的文件不会被转换。设置为""
– 默认情况下为空。excludeStorageClasses
–excludeStorageClasses
集中包含存储类的文件不会被转换。默认值是Set()
– 一个空集。manifestFilePath
– 用于生成清单文件的可选路径。所有成功转换的文件将记录在Success.csv
中,所有未能转换的文件将记录在Failed.csv
中accountId
– 要运行转换的 Amazon Web Services 账户 ID。对于此转换是必需的。roleArn
– 要运行转换的 AWS 角色。对于此转换是必需的。
transformation_ctx
– 要使用的转换上下文 (可选)。在清单文件路径中使用。catalog_id
– 正在访问的数据目录的目录 ID(数据目录的账户 ID)。默认设置为None
。None
默认为服务中调用账户的目录 ID。
glueContext.transition_table("database", "table", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})
transition_s3_path
transition_s3_path(s3_path, transition_to, options={}, transformation_ctx="")
以递归方式转换指定 Amazon S3 路径中的文件的存储类。
可以在任意两个存储类之间转换。对于 GLACIER
和 DEEP_ARCHIVE
存储类,您可以转换到这些类。但您可以使用 S3 RESTORE
从 GLACIER
和 DEEP_ARCHIVE
存储类进行转换。
如果您要运行从 Amazon S3 读取文件或分区的 AWS Glue ETL 任务,则可以排除某些 Amazon S3 存储类类型。有关更多信息,请参阅排除 Amazon S3 存储类。
s3_path
– 要转换的文件的 Amazon S3 中的路径,格式为s3://<
bucket
>/<prefix
>/transition_to
– 要切换到的 Amazon S3 存储类。options
– 用于筛选要删除的文件和清单文件生成的选项。retentionPeriod
– 指定保留文件的时段(以小时为单位)。比保留期更新的文件将被保留。默认情况下设置为 168 小时(7 天)。partitionPredicate
– 将转换满足此谓词的分区。这些分区中保留期内的文件不会被转换。设置为""
– 默认情况下为空。excludeStorageClasses
–excludeStorageClasses
集中包含存储类的文件不会被转换。默认值是Set()
– 一个空集。manifestFilePath
– 用于生成清单文件的可选路径。所有成功转换的文件将记录在Success.csv
中,所有未能转换的文件将记录在Failed.csv
中accountId
– 要运行转换的 Amazon Web Services 账户 ID。对于此转换是必需的。roleArn
– 要运行转换的 AWS 角色。对于此转换是必需的。
transformation_ctx
– 要使用的转换上下文 (可选)。在清单文件路径中使用。
glueContext.transition_s3_path("s3://bucket/prefix/", "STANDARD_IA", {"retentionPeriod": 1, "excludeStorageClasses": ["STANDARD_IA"], "manifestFilePath": "s3://bucketmanifest/", "accountId": "12345678901", "roleArn": "arn:aws:iam::123456789012:user/example-username"})
提取
extract_jdbc_conf
extract_jdbc_conf(connection_name, catalog_id = None)
从数据目录中的 dict
连接对象返回具有键(包含配置属性)的 AWS Glue。
user
- 数据库用户名。password
- 数据库密码。vendor
- 指定供应商(mysql
、postgresql
、oracle
、sqlserver
、等)。enforceSSL
- 一个布尔字符串,指示是否需要安全连接。customJDBCCert
- 使用指示的 Amazon S3 路径中的特定客户端证书。skipCustomJDBCCertValidation
- 一个布尔字符串,指示customJDBCCert
是否必须由 CA 验证。customJDBCCertString
- 关于自定义证书的其他信息,特定于驱动程序类型。url
-(已弃用)仅包含协议、服务器和端口的 JDBC URL。fullUrl
- 创建连接时输入的 JDBC URL(AWS Glue 版本 3.0 或更高版本中可用)。
检索 JDBC 配置的示例:
jdbc_conf = glueContext.extract_jdbc_conf(connection_name="your_glue_connection_name") print(jdbc_conf) >>> {'enforceSSL': 'false', 'skipCustomJDBCCertValidation': 'false', 'url': 'jdbc:mysql://myserver:3306', 'fullUrl': 'jdbc:mysql://myserver:3306/mydb', 'customJDBCCertString': '', 'user': 'admin', 'customJDBCCert': '', 'password': '1234', 'vendor': 'mysql'}
事务
start_transaction
start_transaction(read_only)
开启新事务。内部调用 Lake Formation startTransaction API。
read_only
–(布尔值)指示此事务应为只读还是读写。使用只读事务 ID 进行的写入将被拒绝。只读事务不需要提交。
返回事务 ID。
commit_transaction
commit_transaction(transaction_id, wait_for_commit = True)
尝试提交指定的事务。可能在事务完成提交之前返回 commit_transaction
。内部调用 Lake Formation commitTransaction API。
transaction_id
–(字符串)要提交的事务。wait_for_commit
–(布尔值)确定是否立即返回commit_transaction
。默认值为 true。如果为假,则轮询commit_transaction
并等待事务提交。等待时间限制为 1 分钟使用指数回退,最多重试 6 次。
返回布尔值,以指示是否完成提交。
cancel_transaction
cancel_transaction(transaction_id)
尝试取消指定的事务。如果事务以前已提交,则返回 TransactionCommittedException
异常。内部调用 Lake Formation CancelTransaction API。
-
transaction_id
–(字符串)要取消的事务。
编写
getSink
getSink(connection_type, format = None, transformation_ctx = "", **options)
获取一个 DataSink
对象,该对象可用于将 DynamicFrames
写入外部来源。先检查 SparkSQL format
以确保获得预期的接收器。
connection_type
– 要使用的连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
、kinesis
和kafka
。format
– 要使用的 SparkSQL 格式 (可选)。transformation_ctx
– 要使用的转换上下文 (可选)。options
– 用于指定连接选项的名称-值对的集合。一些可能的值包括:-
user
和password
:适用于授权 -
url
:数据存储的端点 -
dbtable
:目标表的名称 -
bulkSize
:插入操作的并行度
-
可以指定的选项取决于连接类型。有关其他值和示例,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项。
例如:
>>> data_sink = context.getSink("s3") >>> data_sink.setFormat("json"), >>> data_sink.writeFrame(myFrame)
write_dynamic_frame_from_options
write_dynamic_frame_from_options(frame, connection_type, connection_options={}, format=None,
format_options={}, transformation_ctx = "")
写入并返回一个使用指定连接和格式的 DynamicFrame
。
frame
– 要编写的DynamicFrame
。connection_type
– 连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括s3
、mysql
、postgresql
、redshift
、sqlserver
、oracle
、kinesis
和kafka
。connection_options
– 连接选项,例如路径和数据库表 (可选)。对于connection_type
的s3
,将会定义 Amazon S3 路径。connection_options = {"path": "
s3://aws-glue-target/temp
"}对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。
警告
不建议在脚本中存储密码。请考虑使用
boto3
从 AWS Secrets Manager 或 AWS Glue Data Catalog 检索它们。connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"}dbtable
属性是 JDBC 表的名称。对于在数据库中支持架构的 JDBC 数据存储,指定schema.table-name
。如果未提供架构,则使用默认的“public”架构。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项。
format
– 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。format_options
– 指定格式的格式选项。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。transformation_ctx
– 要使用的转换上下文 (可选)。
write_from_options
write_from_options(frame_or_dfc, connection_type,
connection_options={}, format={}, format_options={}, transformation_ctx = "")
写入并返回一个使用指定连接和格式信息创建的 DynamicFrame
or DynamicFrameCollection
。
frame_or_dfc
– 要写入的DynamicFrame
或DynamicFrameCollection
。connection_type
– 连接类型,例如 Amazon S3、Amazon Redshift 和 JDBC。有效值包括s3
、mysql
、postgresql
、redshift
、sqlserver
和oracle
。connection_options
– 连接选项,例如路径和数据库表 (可选)。对于connection_type
的s3
,将会定义 Amazon S3 路径。connection_options = {"path": "
s3://aws-glue-target/temp
"}对于 JDBC 连接,必须定义多个属性。请注意,数据库名称必须是 URL 的一部分。它可以选择性地包含在连接选项中。
警告
不建议在脚本中存储密码。请考虑使用
boto3
从 AWS Secrets Manager 或 AWS Glue Data Catalog 检索它们。connection_options = {"url": "
jdbc-url/database
", "user": "username
", "password":passwordVariable
,"dbtable": "table-name
", "redshiftTmpDir": "s3-tempdir-path
"}dbtable
属性是 JDBC 表的名称。对于在数据库中支持架构的 JDBC 数据存储,指定schema.table-name
。如果未提供架构,则使用默认的“public”架构。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项。
format
– 格式规范。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。format_options
– 指定格式的格式选项。有关支持的格式,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项。transformation_ctx
– 要使用的转换上下文 (可选)。
write_dynamic_frame_from_catalog
write_dynamic_frame_from_catalog(frame, database, table_name, redshift_tmp_dir, transformation_ctx = "", additional_options = {}, catalog_id = None)
写入并返回一个使用数据目录数据库和表中信息的 DynamicFrame
。
frame
– 要编写的DynamicFrame
。Database
– 包含表的数据目录数据库。table_name
– 与目标关联的数据目录表的名称。redshift_tmp_dir
– 要使用的 Amazon Redshift 临时目录(可选)。transformation_ctx
– 要使用的转换上下文 (可选)。-
additional_options
– 可选名称/值对的集合。 catalog_id
– 正在访问的数据目录 ID(账户 ID)。当为 None 时,将使用调用方的默认账户 ID。
write_data_frame_from_catalog
write_data_frame_from_catalog(frame, database, table_name, redshift_tmp_dir,
transformation_ctx = "", additional_options = {}, catalog_id = None)
写入并返回一个使用数据目录数据库和表中信息的 DataFrame
。此方法支持写入数据湖格式(Hudi、Iceberg 和 Delta Lake)。有关更多信息,请参阅 在 AWS Glue ETL 任务中使用数据湖框架。
frame
– 要编写的DataFrame
。Database
– 包含表的数据目录数据库。table_name
– 与目标关联的数据目录表的名称。redshift_tmp_dir
– 要使用的 Amazon Redshift 临时目录(可选)。transformation_ctx
– 要使用的转换上下文 (可选)。-
additional_options
– 可选名称/值对的集合。-
useSparkDataSink
— 如果设置为 true,则强制 AWS Glue 使用本机 Spark 数据接收器 API 来写入表。启用此选项后,您还可以根据需要在additional_options
中添加任何 Spark 数据来源选项。AWSGlue 将这些选项直接传递给 Spark 写入器。
-
catalog_id
– 正在访问的数据目录 ID(账户 ID)。如果您未指定值,则使用调用方的默认账户 ID。
限制
使用 useSparkDataSink
选项时请考虑以下限制:
-
使用
useSparkDataSink
选项时不支持 enableUpdateCatalog 选项。
示例:使用 Spark 数据源写入器写入 Hudi 表
hudi_options = { 'useSparkDataSink': True, 'hoodie.table.name':
<table_name>
, 'hoodie.datasource.write.storage.type': 'COPY_ON_WRITE', 'hoodie.datasource.write.recordkey.field': 'product_id', 'hoodie.datasource.write.table.name':<table_name>
, 'hoodie.datasource.write.operation': 'upsert', 'hoodie.datasource.write.precombine.field': 'updated_at', 'hoodie.datasource.write.hive_style_partitioning': 'true', 'hoodie.upsert.shuffle.parallelism': 2, 'hoodie.insert.shuffle.parallelism': 2, 'hoodie.datasource.hive_sync.enable': 'true', 'hoodie.datasource.hive_sync.database':<database_name>
, 'hoodie.datasource.hive_sync.table':<table_name>
, 'hoodie.datasource.hive_sync.use_jdbc': 'false', 'hoodie.datasource.hive_sync.mode': 'hms'} glueContext.write_data_frame.from_catalog( frame =<df_product_inserts>
, database =<database_name>
, table_name =<table_name>
, additional_options = hudi_options )
write_dynamic_frame_from_jdbc_conf
write_dynamic_frame_from_jdbc_conf(frame, catalog_connection, connection_options={},
redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
写入并返回一个使用指定 JDBC 和连接信息的 DynamicFrame
。
frame
– 要编写的DynamicFrame
。catalog_connection
– 要使用的目录连接。connection_options
– 连接选项,例如路径和数据库表 (可选)。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项。redshift_tmp_dir
– 要使用的 Amazon Redshift 临时目录(可选)。transformation_ctx
– 要使用的转换上下文 (可选)。catalog_id
– 正在访问的数据目录 ID(账户 ID)。当为 None 时,将使用调用方的默认账户 ID。
write_from_jdbc_conf
write_from_jdbc_conf(frame_or_dfc, catalog_connection, connection_options={}, redshift_tmp_dir = "", transformation_ctx = "", catalog_id = None)
写入并返回一个使用指定 JDBC 和连接信息的 DynamicFrame
or DynamicFrameCollection
。
frame_or_dfc
– 要写入的DynamicFrame
或DynamicFrameCollection
。catalog_connection
– 要使用的目录连接。connection_options
– 连接选项,例如路径和数据库表 (可选)。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项。redshift_tmp_dir
– 要使用的 Amazon Redshift 临时目录(可选)。transformation_ctx
– 要使用的转换上下文 (可选)。catalog_id
– 正在访问的数据目录 ID(账户 ID)。当为 None 时,将使用调用方的默认账户 ID。