Amazon OpenSearch 擷取中的管道功能概觀 - Amazon OpenSearch 服務

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

Amazon OpenSearch 擷取中的管道功能概觀

Amazon OpenSearch 擷取佈建管道,其中包含一個來源、一個緩衝區、零個或多個處理器,以及一或多個接收器。擷取管線由資料準備程式作為資料引擎提供支援。如需配管各種元件的概觀,請參閱重要概念

以下各節提供 Amazon OpenSearch 擷取中一些最常用功能的概觀。

注意

這不是可用於配管的特徵的詳盡清單。如需所有可用管線功能的完整文件,請參閱資料準備程式文件。請注意,「 OpenSearch 擷取」會對您可以使用的外掛程式和選項設置一些限制。如需詳細資訊,請參閱 Amazon OpenSearch 擷取管道支援的外掛程式和選項

持久緩衝

持續性緩衝區會將您的資料儲存在跨多個可用區域的磁碟型緩衝區中,以增加資料的耐久性。您可以使用持續緩衝來擷取所有支援的以推送為基礎的來源的資料,而不需要設定獨立緩衝區。其中包括 HTTP 和記錄檔、追蹤和指標的 OpenTelemetry 來源。

若要啟用持續性緩衝,請在建立或更新管線時選擇「啟用持續緩衝區」。如需詳細資訊,請參閱建立 Amazon OpenSearch 擷取管道。 OpenSearch 擷取會根據您為管線指定的擷取 OpenSearch 運算單位 (擷取 OCU),自動判斷所需的緩衝容量。

根據預設,管道會使用 a AWS 擁有的金鑰 來加密緩衝區資料。這些管道不需要管線角色的任何其他權限。或者,您可以指定客戶受管金鑰,並將下列 IAM 許可新增至管道角色:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "KeyAccess", "Effect": "Allow", "Action": [ "kms:Decrypt", "kms:GenerateDataKeyWithoutPlaintext" ], "Resource": "arn:aws:kms:{region}:{aws-account-id}:key/1234abcd-12ab-34cd-56ef-1234567890ab" } ] }

如需更多資訊,請參閱 AWS Key Management Service 開發人員指南中的客戶受管金鑰

注意

如果您停用持續性緩衝,您的管道將會更新為完全在記憶體內緩衝上執行。

調整要求承載大小上限

如果您為管線啟用持續性緩衝,請求承載大小上限預設為 1 MB。預設值可提供最佳效能。不過,如果用戶端傳送的要求超過 1 MB,您可以增加此值。若要調整最大承載大小,請在來源組態中設定max_request_length選項。就像持續緩衝一樣,此選項僅支援 HTTP 和記錄檔、追蹤和指標的 OpenTelemetry 來源。

max_request_length選項的唯一有效值是 1 MB,1.5 MB,2 MB,2.5 MB,3 MB,3.5 MB 和 4 MB。如果您指定不同的值,您會收到錯誤訊息。

下列範例示範如何在管線組態中設定最大承載大小:

... log-pipeline: source: http: path: "/${pipelineName}/logs" max_request_length: 4mb processor: ...

如果您未啟用管線的持續性緩衝,則所有來源的max_request_length選項值預設為 10 MB,且無法修改。

分割

您可以設定 OpenSearch 擷取管線,將內送事件分割為子管線,讓您可以在相同的傳入事件上執行不同類型的處理。

下列範例配管會將傳入事件分割為兩個子配管。每個子管道都使用自己的處理器來豐富和操作數據,然後將數據發送到不同的 OpenSearch 索引。

version: "2" log-pipeline: source: http: ... sink: - pipeline: name: "logs_enriched_one_pipeline" - pipeline: name: "logs_enriched_two_pipeline" logs_enriched_one_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_one_logs" logs_enriched_two_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_two_logs"

鏈接

您可以將多個子管線結在一起,以便在區塊中執行資料處理和擴充。換句話說,您可以在一個子管道中使用某些處理能力來豐富傳入的事件,然後將其發送到另一個子管道以使用不同的處理器進行額外的豐富,最後將其發送到接收器。 OpenSearch

在下列範例中,log_pipeline子管線會使用一組處理器來豐富傳入的記錄事件,然後將事件傳送至名為的 OpenSearch 索引。enriched_logs管線會將相同的事件傳送至log_advanced_pipeline子管線,子管線會處理該事件,並將其傳送至名為enriched_advanced_logs的不同 OpenSearch 索引。

version: "2" log-pipeline: source: http: ... processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_logs" - pipeline: name: "log_advanced_pipeline" log_advanced_pipeline: source: log-pipeline processor: ... sink: - opensearch: # Provide a domain or collection endpoint # Enable the 'serverless' flag if the sink is an OpenSearch Serverless collection aws: ... index: "enriched_advanced_logs"

無效字母佇列

無效字母佇列 (DLQ) 是管線無法寫入接收器之事件的目的地。在 OpenSearch 擷取中,您必須指定具有適當寫入許可的 Amazon S3 儲存貯體做為 DLQ 使用。您可以將 DLQ 組態新增至管線中的每個接收器。當管道遇到寫入錯誤時,會在設定的 S3 儲存貯體中建立 DLQ 物件。DLQ 物件以失敗事件的陣列形式存在於 JSON 檔案中。

當符合下列任一條件時,管線會將事件寫入 DLQ:

  • max_retries於 OpenSearch 水槽已經用盡。 OpenSearch 此選項至少需要 16 個擷取。

  • 由於錯誤狀況,接收器拒絕事件。

組態

若要設定子管線的無效字母佇列,請在接收器組態中指定dlq選項:opensearch

apache-log-pipeline: ... sink: opensearch: dlq: s3: bucket: "my-dlq-bucket" key_path_prefix: "dlq-files" region: "us-west-2" sts_role_arn: "arn:aws:iam::123456789012:role/dlq-role"

寫入此 S3 DLQ 的檔案將具有下列命名模式:

dlq-v${version}-${pipelineName}-${pluginId}-${timestampIso8601}-${uniqueId}

如需詳細資訊,請參閱無效字母佇列 (DL Q)。

如需設定sts_role_arn角色的指示,請參閱寫入無效字母佇列

範例

請看下面的示例 DLQ 文件:

dlq-v2-apache-log-pipeline-opensearch-2023-04-05T15:26:19.152938Z-e7eb675a-f558-4048-8566-dac15a4f8343

以下是無法寫入接收器並傳送至 DLQ S3 儲存貯體進行進一步分析的資料範例:

Record_0 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "sample log" timestamp "2023-04-14T10:36:01.070Z" Record_1 pluginId "opensearch" pluginName "opensearch" pipelineName "apache-log-pipeline" failedData index "logs" indexId null status 0 message "Number of retries reached the limit of max retries (configured value 15)" document log "another sample log" timestamp "2023-04-14T10:36:01.071Z"

索引管理

Amazon OpenSearch 擷取具有許多索引管理功能,包括下列功能。

建立索引

您可以在管線接收器中指定索引名稱,而 OpenSearch 擷取會在佈建管線時建立索引。如果索引已存在,管線會使用它來索引傳入事件。如果您停止並重新啟動管線,或者更新其 YAML 組態,則管線會嘗試建立新索引 (如果尚未存在)。管線永遠無法刪除索引。

下列範例接收器會在佈建管線時建立兩個索引:

sink: - opensearch: index: apache_logs - opensearch: index: nginx_logs

產生索引名稱和模式

您可以使用傳入事件欄位中的變數來產生動態索引名稱。在接收器配置中,使用格式string${}來表示字符串插值,並使用 JSON 指針從事件中提取字段。的選項index_typecustommanagement_disabled。由custom於 OpenSearch 網域和management_disabled OpenSearch 無伺服器集合的index_type預設值為,因此可以將其保留為未設定。

例如,下列管線會從傳入事件中選取metadataType欄位來產生索引名稱。

pipeline: ... sink: opensearch: index: "metadata-${metadataType}"

下列組態會持續每天或每小時產生一個新索引。

pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd}" pipeline: ... sink: opensearch: index: "metadata-${metadataType}-%{yyyy.MM.dd.HH}"

索引名稱也可以是具有日期時間模式作為後綴的純字符串,例如。my-index-%{yyyy.MM.dd}當接收器將數據發送到時 OpenSearch,它將替換為 UTC 時間的日期時間模式,並為每天創建一個新的索引,例如。my-index-2022.01.25如需詳細資訊,請參閱DateTimeFormatter類別。

此索引名稱也可以是格式化字串 (有無論是否有日期-時間模式尾碼),例如。my-${index}-name當接收器將數據發送到時 OpenSearch,它將替換為正在處理的事件中的值的"${index}"部分。如果格式為"${index1/index2/index3}",則會以事件中index1/index2/index3的值取代欄位。

產生文件 ID

管線可以在將文件編入索引時產生文件 ID OpenSearch。它可以從傳入事件中的欄位推斷這些文件 ID。

此範例使用來自傳入事件的uuid欄位來產生文件 ID。

pipeline: ... sink: opensearch: index_type: custom index: "metadata-${metadataType}-%{yyyy.MM.dd}" document_id_field: "uuid"

在下列範例中,「新增項目」處理器會合併欄位uuid和來other_field自傳入事件的欄位,以產生文件 ID。

create動作可確保不會覆寫具有相同 ID 的文件。管道丟棄重複的文檔,沒有任何重試或 DLQ 事件。對於使用此動作的管道作者而言,這是合理的期望,因為目標是避免更新現有文件。

pipeline: ... processor: - add_entries: entries: - key: "my_doc_id_field" format: "${uuid}-${other_field}" sink: - opensearch: ... action: "create" document_id_field: "my_doc_id_field"

您可能想要將事件的文件 ID 設定為子物件中的欄位。在下列範例中,接 OpenSearch 收器外掛程式會使用子物件info/id來產生文件 ID。

sink: - opensearch: ... document_id_field: info/id

鑑於下列事件,管線會產生_id欄位設定為的文件json001

{ "fieldA":"arbitrary value", "info":{ "id":"json001", "fieldA":"xyz", "fieldB":"def" } }

產生路由 ID

您可以使用接收 OpenSearch 器外掛程式中的routing_field選項,將文件路由屬性的值 (_routing) 設定為來自傳入事件的值。

路由支持 JSON 指針語法,因此嵌套字段也可用,而不僅僅是頂級字段。

sink: - opensearch: ... routing_field: metadata/id document_id_field: id

鑑於下列事件,外掛程式會產生_routing欄位設定為的文件abcd

{ "id":"123", "metadata":{ "id":"abcd", "fieldA":"valueA" }, "fieldB":"valueB" }

如需建立管線可在索引建立期間使用之索引範本的指示,請參閱索引範本

電子nd-to-end 確認

OpenSearch 擷取可使end-to-end用確認來追蹤資料從來源到接收器的傳遞,以確保資料的持久性和可靠性。目前,只有 S3 來源外掛程式支援 end-to-end 確認。

透過 end-to-end 確認,管線來源外掛程式會建立確認集來監視一批事件。當這些事件成功發送到接收器時,它會收到正確的確認,或者當任何事件無法發送到他們的接收器時,會收到負面的確認。

如果管線元件發生故障或損毀,或者來源無法接收確認,則來源會逾時並採取必要的動作,例如重試或記錄失敗。如果管線配置了多個接收器或多個子管線,則只有在將事件傳送至所有子管線中的所有接收器之後,才會傳送事件層級確認。如果接收器已設定 DLQ, end-to-end 確認也會追蹤寫入 DLQ 的事件。

若要啟用 end-to-end 確認,請在來源組態中包含acknowledgments選項:

s3-pipeline: source: s3: acknowledgments: true ...

源背壓

當管道忙於處理資料時,或者接收器暫時關閉或擷取資料速度緩慢時,可能會遇到背壓。 OpenSearch 根據管道使用的來源外掛程式,擷取有不同的處理背壓方式。

HTTP 來源

使用 HTTP 來源外掛程式的管道會根據哪個管線元件擁擠的不同方式處理背壓:

  • 緩衝區 — 當緩衝區已滿時,管道會開始將 HTTP 狀態 REQUEST_TIMEOUT (錯誤碼 408) 傳回來源端點。釋放緩衝區時,管線會再次開始處理 HTTP 事件。

  • 來源執行緒 — 當所有 HTTP 來源執行緒都忙於執行要求,且未處理的要求佇列大小已超過允許的要求數目上限時,管線會開始將 HTTP 狀態 TOO_MANY_REQUESTS (錯誤碼 429) 傳回來源端點。當要求佇列低於允許的最大佇列大小時,管線會再次開始處理要求。

酒店來源

當使用 OpenTelemetry 來源 (oTel 記錄檔、oTel 指標和 oTel 追蹤) 的管道的緩衝區已滿時,管線會開始將 HTTP 狀態 REQUEST_TIMEOUT (錯誤碼 408) 傳回給來源端點。隨著緩衝區被釋放,管道再次開始處理事件。

S3 來源

當具有 S3 來源的管道的緩衝區已滿時,管道會停止處理 SQS 通知。釋放緩衝區後,管道會再次開始處理通知。

如果接收器關閉或無法擷取資料,且已啟用來源的 end-to-end 確認,則管線會停止處理 SQS 通知,直到收到來自所有接收器的成功確認為止。