将数据从 Amazon S3 摄取到 Timestream 以实现 InfluxDB 自动化 - Amazon Timestream

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics,可以考虑适用于 InfluxDB 的亚马逊 Timestream。它为实时分析提供了简化的数据摄取和个位数毫秒的查询响应时间。点击此处了解更多。

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将数据从 Amazon S3 摄取到 Timestream 以实现 InfluxDB 自动化

LiveAnalytics 导出的 Timestream 工具完成卸载过程后,自动化过程的下一步就开始了。这种自动化使用 InfluxDB的导入工具将数据传输到其专门的时间序列结构中。该过程转换了 Timestream 的数据模型,使其与 InfluxDB 的测量、标签和字段概念相匹配。最后,它使用 InfluxDB的线路协议高效地加载数据。

完成迁移的工作流程分为四个阶段:

  1. 使用 Timestream LiveAnalytics 导出工具卸载数据。

  2. 数据转换:使用 Amazon Athena 将 LiveAnalytics 数据的 Timestream 转换为 InfluxDB 线路协议格式(基于基数评估后定义的架构)。

  3. 数据摄取:将线路协议数据集提取到 InfluxDB 实例的 Timestream 中。

  4. 验证:(可选)您可以验证是否已提取每个线路协议点(--add-validation-field true在数据转换步骤中需要)。

数据转换

为了进行数据转换,我们开发了一个脚本,使用Amazon Athena将 LiveAnalytics 导出的数据镶木地板格式的时间流转换为InfluxDB的线路协议格式。Amazon Athena 提供无服务器查询服务和一种经济实惠的方式,无需专用的计算资源即可转换大量时间序列数据。

脚本执行以下操作:

  • 将来自亚马逊 S3 存储桶 LiveAnalytics 的数据导出的 Timestream 加载到亚马逊 Athena 表中。

  • 执行数据映射,将存储在 Athena 表中的数据转换为线路协议,并将其存储在 S3 存储桶中。

数据映射

下表显示了如何将 LiveAnalytics 数据的 Timestream 映射到线路协议数据。

概念的时间 LiveAnalytics 流 线路协议概念

表名

测量

Dimensions

标签

度量名称

标签(可选)

措施

字段

时间

Timestamp

先决条件和安装

请参阅转换脚本的 README 中的 “先决条件” 和 “安装” 部分。

用法

要将存储在存储桶 example_s3_bucket 中的数据从 example_database 中的 LiveAnalytics 表的时间流转换为 example_table,请运行以下命令:

python3 transform.py \ --database-name example_database \ --tables example_table \ --s3-bucket-path example_s3_bucket \ --add-validation-field false

脚本完成后,

  • 在 Athena 中,将创建 example_database_example_table 表,其中包含数据的时间流。 LiveAnalytics

  • 在 Athena 中,将创建 lp_example_database_example_table 表,其中包含转换为线路协议点的数据的时间流。 LiveAnalytics

  • 在 S3 存储桶 example_s3_bucket 中,将在路径中存储线路协议 example_database/example_table/unload-<%Y-%m-%d-%H:%M:%S>/line-protocol-output数据。

建议

有关脚本最新用法的更多详细信息,请参阅转换脚本的自述文件,以及后续迁移步骤(例如验证)需要输出。如果为了提高基数而排除维度,请使用--dimensions-to-fields参数将特定维度更改为字段,从而调整架构以减少基数。

添加要验证的字段

有关如何添加用于验证的字段的信息,请参阅转换脚本自述文件中的 “添加用于验证的字段” 部分。

将数据提取到 InfluxDB 的 Timestream 中

InfluxDB 摄取脚本将压缩线路协议数据集提取到 InfluxDB 的 Timestream 中。包含 gzip 压缩行协议文件的目录作为命令行参数与摄取目标 InfluxDB 存储桶一起传入。该脚本旨在使用多处理一次提取多个文件,以利用InfluxDB和执行脚本的机器上的资源。

该脚本执行以下操作:

  • 提取压缩后的文件并将其提取到 InfluxDB 中。

  • 实现重试机制和错误处理。

  • 跟踪成功和失败的摄取以供恢复。

  • 优化从线路协议数据集读取时的 I/O 操作。

先决条件和安装

请参阅摄取脚本的 READ ME 中的 “先决条件和安装” 部分。 GitHub

数据准备

提取所需的压缩线路协议文件由数据转换脚本生成。请按照以下步骤准备数据:

  1. 设置一个具有足够存储空间的 EC2 实例,以容纳转换后的数据集。

  2. 将转换后的数据从 S3 存储桶同步到您的本地目录:

    aws s3 sync \ s3://your-bucket-name/path/to/transformed/data \ ./data_directory
  3. 确保您对数据目录中的所有文件具有读取权限。

  4. 运行以下摄取脚本,将数据提取到 InfluxDB 的 Timestream 中。

用法

python influxdb_ingestion.py <bucket_name> <data_directory> [options]

基本用法

python influxdb_ingestion.py my_bucket ./data_files

摄入率

我们已经对摄取率进行了一些测试。使用 C5N.9XL EC2 实例进行摄取测试,使用 10 个工作人员执行摄取脚本,并将大约 500 GB 的线路协议摄取到 8XL Timestream 用于 InfluxDB 实例:

  • 3K IOPS 15.86 GB/小时。

  • 12K IOPS 70.34 GB/小时。

  • 16K IOPS 71.28 GB/小时。

建议

  • 使用具有足够 CPU 内核的 EC2 实例来处理并行处理。

  • 确保实例有足够的存储空间来容纳整个转换后的数据集,并有额外的提取空间。

    • 一次提取的文件数等于脚本执行期间配置的工作程序数。

  • 将 EC2 实例放置在与 InfluxDB 实例相同的区域和可用区(如果可能),以最大限度地减少延迟。

  • 考虑使用针对网络操作进行了优化的实例类型,例如 C5N。

  • 如果需要较高的摄取速率,则建议InfluxDB实例的时间流至少使用1.2K IOPS。通过增加依赖于 Timestream 的 InfluxDB 实例大小的脚本的工作程序数量,可以获得其他优化。

有关更多信息,请参阅摄取脚本的自述文件。