InfluxDB 自動化のための Amazon S3 から Timestream へのデータの取り込み InfluxDB - Amazon Timestream

Amazon Timestream for LiveAnalytics と同様の機能については、Amazon Timestream for InfluxDB を検討してください。リアルタイム分析のために、簡略化されたデータ取り込みと 1 桁ミリ秒のクエリ応答時間を提供します。詳細については、こちらを参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

InfluxDB 自動化のための Amazon S3 から Timestream へのデータの取り込み InfluxDB

Timestream for LiveAnalytics エクスポートツールがアンロードプロセスを完了すると、自動化プロセスの次のステップが開始されます。この自動化では、InfluxDB のインポートツールを使用して、データを特殊な時系列構造に転送します。このプロセスは、Timestream のデータモデルを InfluxDB の測定値、タグ、フィールドの概念と一致するように変換します。最後に、InfluxDB のラインプロトコルを使用してデータを効率的にロードします

移行を完了するためのワークフローは、次の 4 つのステージに分かれています。

  1. Timestream for LiveAnalytics エクスポートツールを使用してデータをアンロードします。

  2. データ変換: Amazon Athena を使用して LiveAnalytics データの Timestream を InfluxDB ラインプロトコル形式 (カーディナリティ評価後に定義されたスキーマに基づく) に変換します。

  3. データ取り込み: InfluxDB インスタンスの Timestream にラインプロトコルデータセットを取り込みます。

  4. 検証: オプションで、すべてのラインプロトコルポイントが取り込まれたことを検証できます (データ変換ステップ--add-validation-field true中に必要)。

データ変換

データ変換のために、Amazon Athena を使用して LiveAnalytics でエクスポートされたデータパーケット形式を InfluxDB の Line Protocol 形式に変換するスクリプトを開発しました。Amazon Athena は、サーバーレスクエリサービスと、専用のコンピューティングリソースを必要とせずに大量の時系列データを変換するコスト効率の高い方法を提供します。

スクリプトは、次を実行します。

  • LiveAnalytics のエクスポートされた Timestream データを Amazon S3 バケットから Amazon Athena テーブルにロードします。

  • Athena テーブルに保存されているデータからラインプロトコルへのデータマッピングと変換を実行し、S3 バケットに保存します。

データマッピング

次の表は、Timestream for LiveAnalytics データをラインプロトコルデータにマッピングする方法を示しています。

LiveAnalytics の Timestream の概念 ラインプロトコルの概念

テーブル名

測定

ディメンション

[タグ]

メジャー名

タグ (オプション)

メジャー

[フィールド]

時間

タイムスタンプ

前提条件とインストール

変換スクリプトの README の「前提条件とインストール」セクションを参照してください。

使用方法

example_database の Timestream for LiveAnalytics テーブル example_table からバケット example_s3_bucket に保存されているデータを変換するには、次のコマンドを実行します。

python3 transform.py \ --database-name example_database \ --tables example_table \ --s3-bucket-path example_s3_bucket \ --add-validation-field false

スクリプトが完了したら、

  • Athena では、テーブル example_database_example_table が作成され、LiveAnalytics データの Timestream が含まれます。

  • Athena では、テーブル lp_example_database_example_table が作成され、ラインプロトコルポイントに変換された LiveAnalytics データの Timestream が含まれます。

  • S3 バケット example_s3_bucket のパス 内に example_database/example_table/unload-<%Y-%m-%d-%H:%M:%S>/line-protocol-output、ラインプロトコルデータが保存されます。

レコメンデーション

スクリプトの最新の使用状況の詳細については、変換スクリプトの README を参照してください。出力は、検証など、移行の後のステップで必要です。カーディナリティを向上させるためにディメンションを除外した場合は、 --dimensions-to-fields引数を使用して特定のディメンションをフィールドに変更することで、カーディナリティを減らすようにスキーマを調整します。

検証用のフィールドの追加

検証用のフィールドを追加する方法については、変換スクリプトの README の「検証用のフィールドの追加」セクションを参照してください。

InfluxDB の Timestream へのデータ取り込み

InfluxDB 取り込みスクリプトは、圧縮されたラインプロトコルデータセットを InfluxDB の Timestream に取り込みます。gzip 圧縮ラインプロトコルファイルを含むディレクトリは、取り込み先 InfluxDB バケットとともにコマンドライン引数として渡されます。このスクリプトは、マルチ処理を使用して一度に複数のファイルを取り込むように設計されており、InfluxDB とスクリプトを実行するマシンのリソースを利用します。

スクリプトは以下を実行します。

  • 圧縮ファイルを抽出し、InfluxDB に取り込みます。

  • 再試行メカニズムとエラー処理を実装します。

  • 再開のための取り込みの成功と失敗を追跡します。

  • ラインプロトコルデータセットから読み取るときに I/O オペレーションを最適化します。

前提条件とインストール

GitHub の取り込みスクリプトの README の「前提条件とインストール」セクションを参照してください。

データ準備

取り込みに必要な zip 行のプロトコルファイルは、データ変換スクリプトによって生成されます。データを準備するには、次の手順に従います。

  1. 変換されたデータセットを保持するのに十分なストレージを持つ EC2 インスタンスを設定します。

  2. 変換されたデータを S3 バケットからローカルディレクトリに同期します。

    aws s3 sync \ s3://your-bucket-name/path/to/transformed/data \ ./data_directory
  3. データディレクトリ内のすべてのファイルへの読み取りアクセス権があることを確認します。

  4. 次の取り込みスクリプトを実行して、InfluxDB の Timestream にデータを取り込みます。

使用方法

python influxdb_ingestion.py <bucket_name> <data_directory> [options]

基本的な使用法

python influxdb_ingestion.py my_bucket ./data_files

取り込みレート

取り込みレートのテストをいくつか実行しました。10 人のワーカーで取り込みスクリプトを実行し、InfluxDB インスタンス用に最大 500 GB のラインプロトコルを 8XL Timestream に取り込む C5N.9XL EC2 インスタンスを使用した取り込みテスト: InfluxDB

  • 3K IOPS 15.86 GB/時間。

  • 12K IOPS 70.34 GB/時間。

  • 16K IOPS 71.28 GB/時間。

レコメンデーション

  • 並列処理を処理するのに十分な CPU コアを持つ EC2 インスタンスを使用します。

  • インスタンスに、変換されたデータセット全体を保持するのに十分なストレージがあり、抽出の余地があることを確認します。

    • 一度に抽出されるファイルの数は、スクリプトの実行中に設定されたワーカーの数と等しくなります。

  • レイテンシーを最小限に抑えるために、EC2 インスタンスを InfluxDB インスタンスと同じリージョンと AZ (可能な場合) に配置します。

  • C5N など、ネットワークオペレーションに最適化されたインスタンスタイプの使用を検討してください。

  • 高い取り込みレートが必要な場合は、InfluxDB インスタンスの Timestream に少なくとも 12K IOPS をお勧めします。 InfluxDB InfluxDB インスタンスサイズの Timestream に依存するスクリプトのワーカー数を増やすことで、追加の最適化を実現できます。

詳細については、取り込みスクリプトの README を参照してください。