翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
Amazon OpenSearch インジェストのパイプライン機能の概要
Amazon OpenSearch Ingestion は、ソース、バッファ、0 個以上のプロセッサ、1 つ以上のシンクで構成されるパイプラインをプロビジョニングします。取り込みパイプラインは、データエンジンとして Data Prepper を利用しています。パイプラインのさまざまなコンポーネントの概要については、「主要なコンセプト」を参照してください。
以下のセクションでは、Amazon OpenSearch Ingestion で最も一般的に使用される機能の概要を説明します。
注記
これは、パイプラインで利用可能な機能をすべて網羅したリストではありません。使用可能なすべてのパイプライン機能に関する包括的なドキュメントについては、「Data Prepper のドキュメント
永続的バッファリング
永続バッファは、データの耐久性を高めるため、複数のアベイラビリティーゾーンにまたがるディスクベースのバッファにデータを格納します。永続バッファリングを使用すると、スタンドアロンバッファを設定しなくても、サポートされているすべてのプッシュベースソースのデータを取り込むことができます。これらには HTTP のほか、ログ、トレース、 OpenTelemetry メトリクスのソースが含まれます。
永続的バッファリングを有効にするには、パイプラインを作成または更新するときに [永続的バッファを有効にする] を選択します。詳細については、「」を参照してくださいAmazon OpenSearch Ingestion パイプラインの作成。 OpenSearch インジェストは、 OpenSearch パイプラインに指定したインジェストコンピュートユニット (Ingestion OCU) に基づいて、必要なバッファリング容量を自動的に決定します。
デフォルトでは、パイプラインはを使用してバッファーデータを暗号化します。 AWS 所有のキー これらのパイプラインには、パイプラインロールに追加の権限は必要ありません。または、カスタマー管理キーを指定して、パイプラインロールに次の IAM 権限を追加することもできます。
{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:
{region}
:{aws-account-id}
:key/1234abcd-12ab-34cd-56ef-1234567890ab
" } ] }
詳細については、「AWS Key Management Service デベロッパーガイド」の「カスタマーマネージドキー」を参照してください。
注記
永続的バッファリングを無効にすると、パイプラインが更新され、メモリ内バッファリングのみで実行されるようになります。
リクエストペイロードの最大サイズのチューニング
パイプラインの永続的バッファリングを有効にすると、リクエストペイロードの最大サイズはデフォルトで 1 MB になります。デフォルト値が最も高いパフォーマンスを発揮します。ただし、クライアントが 1 MB を超えるリクエストを送信する場合は、この値を増やすことができます。最大ペイロードサイズを調整するには、max_request_length
ソース設定でオプションを設定します。永続的バッファリングと同様に、このオプションは HTTP とログ、トレース、 OpenTelemetry メトリクスのソースでのみサポートされます。
max_request_length
このオプションで有効な値は、1 MB、1.5 MB、2 MB、2.5 MB、3 MB、3.5 MB、4 MB だけです。別の値を指定すると、エラーが表示されます。
次の例は、パイプライン設定内の最大ペイロードサイズを設定する方法を示しています。
... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length:
4mb
processor: ...
パイプラインの永続的バッファリングを有効にしない場合、max_request_length
オプションの値はすべてのソースでデフォルトで 10 MB になり、変更できません。
分割
受信イベントをサブパイプラインに分割するように OpenSearch Ingestion パイプラインを設定すると、同じ受信イベントに対して異なる種類の処理を実行できます。
次のパイプラインの例では、受信イベントを 2 つのサブパイプラインに分割します。各サブパイプラインは独自のプロセッサを使用してデータを強化および操作し、データを異なるインデックスに送信します。 OpenSearch
version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"
チェーン
複数のサブパイプラインを連結し、データ処理とエンリッチメントをチャンク単位で実行できます。つまり、受信イベントをあるサブパイプラインの特定の処理機能で強化し、それを別のサブパイプラインに送信して別のプロセッサでさらに強化し、最後にシンクに送信できます。 OpenSearch
次の例では、log_pipeline
サブパイプラインは受信ログイベントを一連のプロセッサで強化し、そのイベントをという名前のインデックスに送信します。 OpenSearch enriched_logs
パイプラインは同じイベントをサブパイプラインに送信し、log_advanced_pipeline
サブパイプラインはそれを処理して、という名前の別のインデックスに送信します。 OpenSearch enriched_advanced_logs
version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"
デッドレターキュー
デッドレターキュー (DLQ) とは、パイプラインがシンクへの書き込みに失敗したイベントの送信先です。 OpenSearch Ingestion では、DLQ として使用する適切な書き込み権限を持つ Amazon S3 バケットを指定する必要があります。パイプライン内のすべてのシンクに DLQ 設定を追加できます。パイプラインで書き込みエラーが発生すると、設定された S3 バケットに DLQ オブジェクトが作成されます。DLQ オブジェクトは、失敗したイベントの配列として JSON ファイル内に存在します。
次のいずれかの条件が満たされたとき、パイプラインは DLQ にイベントを書き込みます。
-
max_retries
OpenSearch シンク用のは使い果たされました。 OpenSearch このオプションでは、取り込みには最低でも 16 個必要です。 -
エラー状態のため、イベントがシンクによって拒否されたとき。
構成
サブパイプラインのデッドレターキューを設定するには、opensearch
シンク設定内で dlq
オプションを指定します。
apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"
この S3 DLQ に書き込まれたファイルには、次の命名パターンが付けられます。
dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}
詳細については、「デッドレターキュー (DLQ)
sts_role_arn
ロールを設定する手順については、「デッドレターキューへの書き込み」を参照してください。
例
次の DLQ ファイルの例を考えてみます。
dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343
次は、シンクへの書き込みに失敗したデータの例です。これは、さらなる分析のために DLQ S3 バケットに送信されます。
Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"
インデックス管理
Amazon OpenSearch Ingestion には、次のような多数のインデックス管理機能があります。
インデックスの作成
パイプラインシンクでインデックス名を指定でき、 OpenSearch Igestion はパイプラインをプロビジョニングするときにインデックスを作成します。インデックスが既に存在する場合、パイプラインはそれを使用して受信イベントのインデックスを作成します。インデックスがまだ存在しない場合、パイプラインを停止して再起動する、または YAML 設定を更新すると、パイプラインは新しいインデックスの作成を試みます。パイプラインではインデックスを一切削除できません。
次のシンクのサンプルでは、パイプラインがプロビジョニングされるときに 2 つのインデックスを作成します。
sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs
インデックス名とパターンの生成
受信イベントのフィールドにある変数を使用すると、動的なインデックス名を生成できます。シンク設定では、形式 string${}
を使用して文字列補間を示し、JSON ポインタを使用してイベントからフィールドを抽出します。index_type
のオプションは custom
または management_disabled
です。index_type
OpenSearch management_disabled
OpenSearch ドメインとサーバーレスコレクションではデフォルトになるため、custom
未設定のままでかまいません。
例えば、次のパイプラインは、受信イベントから metadataType
フィールドを選択してインデックス名を生成します。
pipeline: ... sink: opensearch: index: "metadata-${metadataType}"
次の設定では、1 日または 1 時間ごとに新しいインデックスを生成し続けます。
pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"
インデックス名は、my-index-%{yyyy.MM.dd}
のように、サフィックスとして日付/時刻パターンを持つプレーン文字列にすることもできます。シンクがにデータを送信すると OpenSearch、日時パターンが UTC 時間に置き換えられ、日ごとに新しいインデックス (など) が作成されます。my-index-2022.01.25
詳細については、クラスを参照してください。DateTimeFormatter
このインデックス名は、my-${index}-name
のように形式化された文字列にすることもできます (日付/時刻パターンのサフィックスの有無にかかわらず)。シンクはにデータを送信すると OpenSearch、"${index}"
その部分を処理中のイベントの値に置き換えます。形式が "${index1/index2/index3}"
の場合、フィールド index1/index2/index3
をイベント内の値に置き換えます。
ドキュメント ID の生成
パイプラインは、ドキュメントのインデックスを作成する際にドキュメント ID を生成できます。 OpenSearchまた、それらのドキュメント ID を受信イベント内のフィールドから推測することも可能です。
次の例では、受信イベントの uuid
フィールドを使用してドキュメント ID を生成します。
pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"
次の例では、Add entriesuuid
フィールドと other_field
フィールドをマージしてドキュメント ID を生成します。
create
アクションは、同じ ID のドキュメントが上書きされないようにします。パイプラインは再試行や DLQ イベントを必要とせずに、重複したドキュメントを削除します。ここでの目的は、既存ドキュメントの更新を避けることなので、このアクションを使用するパイプライン作成者にとっては当然想定されるものです。
pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"
イベントのドキュメント ID をサブオブジェクトのフィールドに設定したい場合があります。次の例では、 OpenSearch info/id
シンクプラグインはサブオブジェクトを使用してドキュメント ID を生成します。
sink: - opensearch: ... document_id_field: info/id
次のイベントが発生すると、パイプラインは _id
フィールドに json001
を設定したドキュメントを生成します。
{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }
ルーティング ID の生成
routing_field
OpenSearch シンクプラグイン内のオプションを使用して、ドキュメントルーティングプロパティ (_routing
) の値を受信イベントの値に設定できます。
ルーティングは JSON ポインタ構文をサポートしているため、最上位のフィールドだけでなく、ネストされたフィールドも使用できます。
sink: - opensearch: ... routing_field: metadata/id document_id_field: id
次のイベントが発生すると、プラグインは _routing
フィールドに abcd
を設定したドキュメントを生成します。
{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }
インデックスを作成するときに、パイプラインで使用できるインデックステンプレートを作成する手順については、「インデックステンプレート
E nd-to-end 確認応答
OpenSearch 取り込みでは、確認応答を使用してステートレスパイプラインのソースからシンクへのデータ配信を追跡することで、データの耐久性と信頼性を確保します。end-to-end現在、確認をサポートしているのは S3 ソースプラグインだけです。
end-to-end 承認されると、パイプラインソースプラグインは受信確認セットを作成してイベントのバッチを監視します。イベントがシンクに正常に送信された場合は肯定応答を受け取り、いずれかのイベントがシンクに送信できなかった場合は否定応答を受け取ります。
パイプラインコンポーネントに障害またはクラッシュが発生した場合、またはソースが確認応答を受け取れなかった場合、ソースはタイムアウトし、再試行や障害のログ記録などの必要なアクションを実行します。パイプラインに複数のシンクまたは複数のサブパイプラインが設定されている場合、イベントレベルの確認応答は、イベントが全サブパイプラインの全シンクに送信された後にのみ送信されます。シンクに DLQ が設定されている場合、 end-to-end 確認応答は DLQ に書き込まれたイベントも追跡します。
end-to-end 確認を有効にするには、ソース設定に以下のオプションを含めてください。acknowledgments
s3-pipeline: source: s3: acknowledgments: true ...
ソースバックプレッシャー
パイプラインは、データ処理が忙しい場合や、シンクが一時的にダウンしていたり、データの取り込みが遅い場合に、バックプレッシャーが発生する可能性があります。 OpenSearch インジェストでは、パイプラインが使用しているソースプラグインによって、バックプレッシャの処理方法が異なります。
HTTP ソース
HTTP ソース
-
バッファ - バッファがいっぱいになると、パイプラインはエラーコード 408 の HTTP ステータス
REQUEST_TIMEOUT
をソースエンドポイントに返し始めます。バッファが解放されると、パイプラインは HTTP イベントの処理を再開します。 -
ソーススレッド - すべての HTTP ソーススレッドがリクエストを実行中で負荷がかかっており、未処理のリクエストキューサイズがリクエストの最大許容数を超えた場合、パイプラインはエラーコード 429 の HTTP ステータス
TOO_MANY_REQUESTS
をソースエンドポイントに返し始めます。リクエストキューが最大許容キューサイズを下回ると、パイプラインはリクエストの処理を再開します。
OTel ソース
OpenTelemetry ソース (oTel ログ、OTel メトリクス、OTelREQUEST_TIMEOUT
ステータスをソースエンドポイントに返し始めます。バッファが解放されると、パイプラインはイベントの処理を再開します。
S3 ソース
S3
シンクがダウンしているか、データを取り込めず、 end-to-end ソースの確認が有効になっている場合、パイプラインはすべてのシンクから正常に受信確認を受け取るまで SQS 通知の処理を停止します。