Amazon OpenSearch インジェストのパイプライン機能の概要 - Amazon OpenSearch サービス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon OpenSearch インジェストのパイプライン機能の概要

Amazon OpenSearch Ingestion は、ソース、バッファ、0 個以上のプロセッサ、1 つ以上のシンクで構成されるパイプラインをプロビジョニングします。取り込みパイプラインは、データエンジンとして Data Prepper を利用しています。パイプラインのさまざまなコンポーネントの概要については、「主要なコンセプト」を参照してください。

以下のセクションでは、Amazon OpenSearch Ingestion で最も一般的に使用される機能の概要を説明します。

注記

これは、パイプラインで利用可能な機能をすべて網羅したリストではありません。使用可能なすべてのパイプライン機能に関する包括的なドキュメントについては、「Data Prepper のドキュメント」を参照してください。 OpenSearch Ingestion では、使用できるプラグインとオプションにいくつかの制約があることに注意してください。詳細については、「Amazon Ingestion OpenSearch パイプラインでサポートされているプラグインとオプション」を参照してください。

永続的バッファリング

永続バッファは、データの耐久性を高めるため、複数のアベイラビリティーゾーンにまたがるディスクベースのバッファにデータを格納します。永続バッファリングを使用すると、スタンドアロンバッファを設定しなくても、サポートされているすべてのプッシュベースソースのデータを取り込むことができます。これらには HTTP のほか、ログ、トレース、 OpenTelemetry メトリクスのソースが含まれます。

永続的バッファリングを有効にするには、パイプラインを作成または更新するときに [永続的バッファを有効にする] を選択します。詳細については、「」を参照してくださいAmazon OpenSearch Ingestion パイプラインの作成。 OpenSearch インジェストは、 OpenSearch パイプラインに指定したインジェストコンピュートユニット (Ingestion OCU) に基づいて、必要なバッファリング容量を自動的に決定します。

デフォルトでは、パイプラインはを使用してバッファーデータを暗号化します。 AWS 所有のキー これらのパイプラインには、パイプラインロールに追加の権限は必要ありません。または、カスタマー管理キーを指定して、パイプラインロールに次の IAM 権限を追加することもできます。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:{region}:{aws-account-id}:key/1234abcd-12ab-34cd-56ef-1234567890ab" } ] }

詳細については、「AWS Key Management Service デベロッパーガイド」の「カスタマーマネージドキー」を参照してください。

注記

永続的バッファリングを無効にすると、パイプラインが更新され、メモリ内バッファリングのみで実行されるようになります。

リクエストペイロードの最大サイズのチューニング

パイプラインの永続的バッファリングを有効にすると、リクエストペイロードの最大サイズはデフォルトで 1 MB になります。デフォルト値が最も高いパフォーマンスを発揮します。ただし、クライアントが 1 MB を超えるリクエストを送信する場合は、この値を増やすことができます。最大ペイロードサイズを調整するには、max_request_lengthソース設定でオプションを設定します。永続的バッファリングと同様に、このオプションは HTTP とログ、トレース、 OpenTelemetry メトリクスのソースでのみサポートされます。

max_request_lengthこのオプションで有効な値は、1 MB、1.5 MB、2 MB、2.5 MB、3 MB、3.5 MB、4 MB だけです。別の値を指定すると、エラーが表示されます。

次の例は、パイプライン設定内の最大ペイロードサイズを設定する方法を示しています。

... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length: 4mb processor: ...

パイプラインの永続的バッファリングを有効にしない場合、max_request_lengthオプションの値はすべてのソースでデフォルトで 10 MB になり、変更できません。

分割

受信イベントをサブパイプラインに分割するように OpenSearch Ingestion パイプラインを設定すると、同じ受信イベントに対して異なる種類の処理を実行できます。

次のパイプラインの例では、受信イベントを 2 つのサブパイプラインに分割します。各サブパイプラインは独自のプロセッサを使用してデータを強化および操作し、データを異なるインデックスに送信します。 OpenSearch

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

チェーン

複数のサブパイプラインを連結し、データ処理とエンリッチメントをチャンク単位で実行できます。つまり、受信イベントをあるサブパイプラインの特定の処理機能で強化し、それを別のサブパイプラインに送信して別のプロセッサでさらに強化し、最後にシンクに送信できます。 OpenSearch

次の例では、log_pipelineサブパイプラインは受信ログイベントを一連のプロセッサで強化し、そのイベントをという名前のインデックスに送信します。 OpenSearch enriched_logsパイプラインは同じイベントをサブパイプラインに送信し、log_advanced_pipelineサブパイプラインはそれを処理して、という名前の別のインデックスに送信します。 OpenSearch enriched_advanced_logs

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

デッドレターキュー

デッドレターキュー (DLQ) とは、パイプラインがシンクへの書き込みに失敗したイベントの送信先です。 OpenSearch Ingestion では、DLQ として使用する適切な書き込み権限を持つ Amazon S3 バケットを指定する必要があります。パイプライン内のすべてのシンクに DLQ 設定を追加できます。パイプラインで書き込みエラーが発生すると、設定された S3 バケットに DLQ オブジェクトが作成されます。DLQ オブジェクトは、失敗したイベントの配列として JSON ファイル内に存在します。

次のいずれかの条件が満たされたとき、パイプラインは DLQ にイベントを書き込みます。

  • max_retries OpenSearch シンク用のは使い果たされました。 OpenSearch このオプションでは、取り込みには最低でも 16 個必要です。

  • エラー状態のため、イベントがシンクによって拒否されたとき。

構成

サブパイプラインのデッドレターキューを設定するには、opensearch シンク設定内で dlq オプションを指定します。

apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"

この S3 DLQ に書き込まれたファイルには、次の命名パターンが付けられます。

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

詳細については、「デッドレターキュー (DLQ)」を参照してください。

sts_role_arn ロールを設定する手順については、「デッドレターキューへの書き込み」を参照してください。

次の DLQ ファイルの例を考えてみます。

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

次は、シンクへの書き込みに失敗したデータの例です。これは、さらなる分析のために DLQ S3 バケットに送信されます。

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

インデックス管理

Amazon OpenSearch Ingestion には、次のような多数のインデックス管理機能があります。

インデックスの作成

パイプラインシンクでインデックス名を指定でき、 OpenSearch Igestion はパイプラインをプロビジョニングするときにインデックスを作成します。インデックスが既に存在する場合、パイプラインはそれを使用して受信イベントのインデックスを作成します。インデックスがまだ存在しない場合、パイプラインを停止して再起動する、または YAML 設定を更新すると、パイプラインは新しいインデックスの作成を試みます。パイプラインではインデックスを一切削除できません。

次のシンクのサンプルでは、パイプラインがプロビジョニングされるときに 2 つのインデックスを作成します。

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

インデックス名とパターンの生成

受信イベントのフィールドにある変数を使用すると、動的なインデックス名を生成できます。シンク設定では、形式 string${} を使用して文字列補間を示し、JSON ポインタを使用してイベントからフィールドを抽出します。index_type のオプションは custom または management_disabled です。index_type OpenSearch management_disabled OpenSearch ドメインとサーバーレスコレクションではデフォルトになるため、custom未設定のままでかまいません。

例えば、次のパイプラインは、受信イベントから metadataType フィールドを選択してインデックス名を生成します。

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

次の設定では、1 日または 1 時間ごとに新しいインデックスを生成し続けます。

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

インデックス名は、my-index-%{yyyy.MM.dd} のように、サフィックスとして日付/時刻パターンを持つプレーン文字列にすることもできます。シンクがにデータを送信すると OpenSearch、日時パターンが UTC 時間に置き換えられ、日ごとに新しいインデックス (など) が作成されます。my-index-2022.01.25詳細については、クラスを参照してください。DateTimeFormatter

このインデックス名は、my-${index}-name のように形式化された文字列にすることもできます (日付/時刻パターンのサフィックスの有無にかかわらず)。シンクはにデータを送信すると OpenSearch、"${index}"その部分を処理中のイベントの値に置き換えます。形式が "${index1/index2/index3}" の場合、フィールド index1/index2/index3 をイベント内の値に置き換えます。

ドキュメント ID の生成

パイプラインは、ドキュメントのインデックスを作成する際にドキュメント ID を生成できます。 OpenSearchまた、それらのドキュメント ID を受信イベント内のフィールドから推測することも可能です。

次の例では、受信イベントの uuid フィールドを使用してドキュメント ID を生成します。

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"

次の例では、Add entries プロセッサを使用し、受信イベントから uuid フィールドと other_field フィールドをマージしてドキュメント ID を生成します。

create アクションは、同じ ID のドキュメントが上書きされないようにします。パイプラインは再試行や DLQ イベントを必要とせずに、重複したドキュメントを削除します。ここでの目的は、既存ドキュメントの更新を避けることなので、このアクションを使用するパイプライン作成者にとっては当然想定されるものです。

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"

イベントのドキュメント ID をサブオブジェクトのフィールドに設定したい場合があります。次の例では、 OpenSearch info/idシンクプラグインはサブオブジェクトを使用してドキュメント ID を生成します。

sink: - opensearch: ... document_id_field: info/id

次のイベントが発生すると、パイプラインは _id フィールドに json001 を設定したドキュメントを生成します。

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

ルーティング ID の生成

routing_field OpenSearch シンクプラグイン内のオプションを使用して、ドキュメントルーティングプロパティ (_routing) の値を受信イベントの値に設定できます。

ルーティングは JSON ポインタ構文をサポートしているため、最上位のフィールドだけでなく、ネストされたフィールドも使用できます。

sink: - opensearch: ... routing_field: metadata/id document_id_field: id

次のイベントが発生すると、プラグインは _routing フィールドに abcd を設定したドキュメントを生成します。

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

インデックスを作成するときに、パイプラインで使用できるインデックステンプレートを作成する手順については、「インデックステンプレート」を参照してください。

E nd-to-end 確認応答

OpenSearch 取り込みでは、確認応答を使用してステートレスパイプラインのソースからシンクへのデータ配信を追跡することで、データの耐久性と信頼性を確保します。end-to-end現在、確認をサポートしているのは S3 ソースプラグインだけです。 end-to-end

end-to-end 承認されると、パイプラインソースプラグインは受信確認セットを作成してイベントのバッチを監視します。イベントがシンクに正常に送信された場合は肯定応答を受け取り、いずれかのイベントがシンクに送信できなかった場合は否定応答を受け取ります。

パイプラインコンポーネントに障害またはクラッシュが発生した場合、またはソースが確認応答を受け取れなかった場合、ソースはタイムアウトし、再試行や障害のログ記録などの必要なアクションを実行します。パイプラインに複数のシンクまたは複数のサブパイプラインが設定されている場合、イベントレベルの確認応答は、イベントが全サブパイプラインの全シンクに送信された後にのみ送信されます。シンクに DLQ が設定されている場合、 end-to-end 確認応答は DLQ に書き込まれたイベントも追跡します。

end-to-end 確認を有効にするには、ソース設定に以下のオプションを含めてください。acknowledgments

s3-pipeline: source: s3: acknowledgments: true ...

ソースバックプレッシャー

パイプラインは、データ処理が忙しい場合や、シンクが一時的にダウンしていたり、データの取り込みが遅い場合に、バックプレッシャーが発生する可能性があります。 OpenSearch インジェストでは、パイプラインが使用しているソースプラグインによって、バックプレッシャの処理方法が異なります。

HTTP ソース

HTTP ソースプラグインを使用するパイプラインでは、混雑しているパイプラインコンポーネントによってバックプレッシャーの処理方法が異なります。

  • バッファ - バッファがいっぱいになると、パイプラインはエラーコード 408 の HTTP ステータス REQUEST_TIMEOUT をソースエンドポイントに返し始めます。バッファが解放されると、パイプラインは HTTP イベントの処理を再開します。

  • ソーススレッド - すべての HTTP ソーススレッドがリクエストを実行中で負荷がかかっており、未処理のリクエストキューサイズがリクエストの最大許容数を超えた場合、パイプラインはエラーコード 429 の HTTP ステータス TOO_MANY_REQUESTS をソースエンドポイントに返し始めます。リクエストキューが最大許容キューサイズを下回ると、パイプラインはリクエストの処理を再開します。

OTel ソース

OpenTelemetry ソース (oTel ログ、OTel メトリクス、OTel トレース) を使用するパイプラインのバッファーがいっぱいになると、パイプラインはエラーコード 408 の HTTP REQUEST_TIMEOUT ステータスをソースエンドポイントに返し始めます。バッファが解放されると、パイプラインはイベントの処理を再開します。

S3 ソース

S3 ソースを使用するパイプラインのバッファがいっぱいになると、パイプラインは SQS 通知の処理を停止します。バッファが解放されると、パイプラインは通知の処理を再開します。

シンクがダウンしているか、データを取り込めず、 end-to-end ソースの確認が有効になっている場合、パイプラインはすべてのシンクから正常に受信確認を受け取るまで SQS 通知の処理を停止します。