在 Amazon Data Firehose 中转换输入数据格式 - Amazon Data Firehose

将 Amazon Data Firehose 流传输到亚马逊 S3 中的 Apache Iceberg Tables 处于预览阶段,可能会发生变化。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Amazon Data Firehose 中转换输入数据格式

在将数据存储到JSON亚马逊 S3 ORC 之前,Amazon Data Firehose 可以将您的输入数据的格式转换为 Apache Parq uet 或 Apache。与面向行的格式相比,Parquet 和 parquet ORC 是列式数据格式,与面向行的格式相比,可以节省空间并实现更快的查询。JSON如果要转换其他输入格式JSON,例如逗号分隔值 (CSV) 或结构化文本,则可以使用 AWS Lambda 将其转换为 first JSON。有关更多信息,请参阅 在 Amazon Data Firehose 中转换源数据

先决条件

Amazon Data Firehose 需要以下三个元素来转换记录数据的格式:

重要

如果您启用记录格式转换,则无法将亚马逊数据 Firehose 目标设置为亚马逊 OpenSearch 服务、亚马逊 Redshift 或 Splunk。启用格式转换后,Amazon S3 是您的 Firehose 直播可以使用的唯一目的地。

即使您在将记录发送到 Amazon Data Firehose 之前汇总了这些记录,也可以转换数据的格式。

选择JSON反序列化器

JSON SerDe如果您的输入JSON包含以下格式的时间戳,请选择 OpenX

  • yyyy-MM-dd'T'HH:mm:ss[.S]'Z',其中小数最多有 9 位,例如:2017-02-07T15:13:01.39256Z

  • yyyy-[M]M-[d]d HH:mm:ss[.S],其中小数最多有 9 位,例如:2017-02-07 15:13:01.14

  • 秒,以纪元格式表示,例如:1518033528

  • 毫秒,以纪元格式表示,例如:1518033528123

  • 浮点秒,以纪元格式表示,例如:1518033528.123

OpenX JSON SerDe 可以将句点 (.) 转换为下划线 (_)。它还可以在反序列化JSON密钥之前将其转换为小写。有关此反序列化器通过 Amazon Data Firehose 提供的选项的更多信息,请参阅 O。penXJson SerDe

如果你不确定要选择哪个反序列化器,请使用 OpenX JSON SerDe,除非你有它不支持的时间戳。

如果您的时间戳格式与之前列出的格式不同,请使用 Apache Hiv JSON SerDe e。选择此解串器后,您可以指定要使用的时间戳格式。为此,请遵循 Joda-Time DateTimeFormat 格式字符串的模式语法。有关更多信息,请参阅类 DateTimeFormat

您还可以使用特殊值 millis 来解析时间戳(毫秒,以纪元格式表示)。如果您未指定格式,Amazon Data Firehose 会java.sql.Timestamp::valueOf默认使用该格式。

Hive JSON SerDe 不允许进行以下操作:

  • 列名称中的句点 (.)。

  • 类型为 uniontype 的字段。

  • 架构中具有数字类型但其中为字符串的字段JSON。例如,如果架构是(int),而架构JSON是{"a":"123"},则 Hive SerDe 会给出错误。

Hive SerDe 不会将嵌套JSON转换为字符串。例如,如果您有 {"a":{"inner":1}},它不会将 {"inner":1} 视为字符串。

选择序列化器

选择的串行化器取决于您的业务需求。要了解有关这两个序列化器选项的更多信息,请参阅ORC SerDe和 Par q SerDe uet。

从控制台启用记录格式转换

创建或更新 Firehose 直播时,可以在控制台上启用数据格式转换。启用数据格式转换后,Amazon S3 是您可以为 Firehose 数据流配置的唯一目标。此外,启用格式转换时,系统将禁用 Amazon S3 压缩。但是,Snappy 压缩会作为自动转换过程的一部分自动进行。Amazon Data Firehose 在本例中使用的 Snappy 框架格式与 Hadoop 兼容。这意味着,您可以使用 Snappy 压缩的结果并在 Athena 中对这些数据运行查询。有关 Hadoop 所依赖的 Snappy 取景格式,请参阅.java。BlockCompressorStream

要为数据 Firehose 流启用数据格式转换
  1. 登录并打开 Amazon Data Firehose 控制台,网址为。 AWS Management Consolehttps://console.aws.amazon.com/firehose/

  2. 选择要更新的 Firehose 直播,或者按照中的步骤创建新的 Firehose 直播。教程:从控制台创建 Firehose 直播

  3. 转换记录格式下,将记录格式转换设置为已启用

  4. 选择所需的输出格式。有关这两个选项的更多信息,请参阅 Apache Par quet 和 Apache。ORC

  5. 选择一个 AWS Glue 表,为您的源记录指定架构。设置区域、数据库、表和表版本。

管理来自 Firehose 的记录格式转换 API

如果你想让 Amazon Data Firehose 将你的输入数据格式从JSON转换为 Parquet 或者ORC,请在 extendedS3 或 extendedS DestinationConfiguration 3 中指定可选DataFormatConversionConfiguration元素。DestinationUpdate如果您指定 DataFormatConversionConfiguration,则适用以下限制。

错误处理

当 Amazon Data Firehose 无法解析或反序列化记录时(例如,当数据与架构不匹配时),它会使用错误前缀将其写入 Amazon S3。如果此写入失败,Amazon Data Firehose 将永远重试,从而阻止进一步交付。对于每条失败的记录,Amazon Data Firehose 都会编写一个包含以下架构的JSON文档:

{ "attemptsMade": long, "arrivalTimestamp": long, "lastErrorCode": string, "lastErrorMessage": string, "attemptEndingTimestamp": long, "rawData": string, "sequenceNumber": string, "subSequenceNumber": long, "dataCatalogTable": { "catalogId": string, "databaseName": string, "tableName": string, "region": string, "versionId": string, "catalogArn": string } }

示例

有关如何使用设置记录格式转换的示例 AWS CloudFormation,请参阅AWS::DataFirehose:: DeliveryStream