将 Amazon Data Firehose 流传输到亚马逊 S3 中的 Apache Iceberg Tables 处于预览阶段,可能会发生变化。
本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
在 Amazon Data Firehose 中转换输入数据格式
在将数据存储到JSON亚马逊 S3 ORC 之前,Amazon Data Firehose 可以将您的输入数据的格式转换为 Apache Parq
先决条件
Amazon Data Firehose 需要以下三个元素来转换记录数据的格式:
-
用于读取输入数据的反序列化器 — 您可以选择两种类型的反序列化器之一:Apache Hive 或 OpenX。JSON JSON SerDe JSON SerDe
注意
将多个JSON文档合并成同一条记录时,请确保您的输入仍以支持的JSON格式显示。JSON文档数组不是有效的输入。
例如,这是正确的输入:
{"a":1}{"a":2}
这是错误的输入:
[{"a":1}, {"a":2}]
-
用于确定如何解释该数据的架构:使用 AWS Glue 在 AWS Glue Data Catalog中创建架构。然后,Amazon Data Firehose 会引用该架构并使用它来解释您的输入数据。您可以使用相同的架构来配置 Amazon Data Firehose 和您的分析软件。有关更多信息,请参阅《AWS Glue 开发者指南》中的 “填充 AWS Glue 数据目录”。
注意
在 AWS Glue 数据目录中创建的架构应与输入数据结构相匹配。否则,转换后的数据将不会包含架构中未指定的属性。如果您使用嵌套JSON,请在架构中使用一种能够镜像JSON数据结构的STRUCT类型。有关如何处理嵌套STRUCT类型的信息JSON,请参阅此示例。
-
用于将@@ 数据转换为目标列式存储格式的序列化器(Parquet 或ORC)— 您可以选择两种类型的序列化器之一:或 Parquet。ORC SerDe
SerDe
重要
如果您启用记录格式转换,则无法将亚马逊数据 Firehose 目标设置为亚马逊 OpenSearch 服务、亚马逊 Redshift 或 Splunk。启用格式转换后,Amazon S3 是您的 Firehose 直播可以使用的唯一目的地。
即使您在将记录发送到 Amazon Data Firehose 之前汇总了这些记录,也可以转换数据的格式。
选择JSON反序列化器
JSON SerDe如果您的输入JSON包含以下格式的时间戳,请选择 OpenX
-
yyyy-MM-dd'T'HH:mm:ss[.S]'Z',其中小数最多有 9 位,例如:
2017-02-07T15:13:01.39256Z
。 -
yyyy-[M]M-[d]d HH:mm:ss[.S],其中小数最多有 9 位,例如:
2017-02-07 15:13:01.14
。 -
秒,以纪元格式表示,例如:
1518033528
。 -
毫秒,以纪元格式表示,例如:
1518033528123
。 -
浮点秒,以纪元格式表示,例如:
1518033528.123
。
OpenX JSON SerDe 可以将句点 (.
) 转换为下划线 (_
)。它还可以在反序列化JSON密钥之前将其转换为小写。有关此反序列化器通过 Amazon Data Firehose 提供的选项的更多信息,请参阅 O。penXJson SerDe
如果你不确定要选择哪个反序列化器,请使用 OpenX JSON SerDe,除非你有它不支持的时间戳。
如果您的时间戳格式与之前列出的格式不同,请使用 Apache Hiv JSON SerDe eDateTimeFormat
格式字符串的模式语法。有关更多信息,请参阅类 DateTimeFormat
您还可以使用特殊值 millis
来解析时间戳(毫秒,以纪元格式表示)。如果您未指定格式,Amazon Data Firehose 会java.sql.Timestamp::valueOf
默认使用该格式。
Hive JSON SerDe 不允许进行以下操作:
-
列名称中的句点 (
.
)。 -
类型为
uniontype
的字段。 -
架构中具有数字类型但其中为字符串的字段JSON。例如,如果架构是(int),而架构JSON是
{"a":"123"}
,则 Hive SerDe 会给出错误。
Hive SerDe 不会将嵌套JSON转换为字符串。例如,如果您有 {"a":{"inner":1}}
,它不会将 {"inner":1}
视为字符串。
选择序列化器
选择的串行化器取决于您的业务需求。要了解有关这两个序列化器选项的更多信息,请参阅ORC SerDe
从控制台启用记录格式转换
创建或更新 Firehose 直播时,可以在控制台上启用数据格式转换。启用数据格式转换后,Amazon S3 是您可以为 Firehose 数据流配置的唯一目标。此外,启用格式转换时,系统将禁用 Amazon S3 压缩。但是,Snappy 压缩会作为自动转换过程的一部分自动进行。Amazon Data Firehose 在本例中使用的 Snappy 框架格式与 Hadoop 兼容。这意味着,您可以使用 Snappy 压缩的结果并在 Athena 中对这些数据运行查询。有关 Hadoop 所依赖的 Snappy 取景格式,请参阅.java。BlockCompressorStream
要为数据 Firehose 流启用数据格式转换
-
登录并打开 Amazon Data Firehose 控制台,网址为。 AWS Management Consolehttps://console.aws.amazon.com/firehose/
-
选择要更新的 Firehose 直播,或者按照中的步骤创建新的 Firehose 直播。教程:从控制台创建 Firehose 直播
-
在转换记录格式下,将记录格式转换设置为已启用。
-
选择一个 AWS Glue 表,为您的源记录指定架构。设置区域、数据库、表和表版本。
管理来自 Firehose 的记录格式转换 API
如果你想让 Amazon Data Firehose 将你的输入数据格式从JSON转换为 Parquet 或者ORC,请在 extendedS3 或 extendedS DestinationConfiguration 3 中指定可选DataFormatConversionConfiguration元素。DestinationUpdate如果您指定 DataFormatConversionConfiguration,则适用以下限制。
-
在中 BufferingHints,如果启用记录格式转换,则不能
SizeInMBs
将值设置为小于 64。此外,如果未启用格式转换,则默认值为 5。在启用格式转换后,该值将变为 128。 -
你必须
CompressionFormat
在 extendedS3 DestinationConfiguration 或 extendes3 中将设置为。DestinationUpdateUNCOMPRESSED
CompressionFormat
的默认值为UNCOMPRESSED
。因此,您也可以在 ext en DestinationConfiguration dedS3 中将其保留为未指定。默认情况下,数据将使用 Snappy 压缩来作为串行化过程的一部分得到压缩。Amazon Data Firehose 在本例中使用的 Snappy 框架格式与 Hadoop 兼容。这意味着,您可以使用 Snappy 压缩的结果并在 Athena 中对这些数据运行查询。有关 Hadoop 所依赖的 Snappy 取景格式,请参阅.java。BlockCompressorStream当配置串行化器时,您可以选择其他类型的压缩。
错误处理
当 Amazon Data Firehose 无法解析或反序列化记录时(例如,当数据与架构不匹配时),它会使用错误前缀将其写入 Amazon S3。如果此写入失败,Amazon Data Firehose 将永远重试,从而阻止进一步交付。对于每条失败的记录,Amazon Data Firehose 都会编写一个包含以下架构的JSON文档:
{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }
示例
有关如何使用设置记录格式转换的示例 AWS CloudFormation,请参阅AWS::DataFirehose:: DeliveryStream。