將資料導入 Amazon OpenSearch 無伺服器集合 - Amazon OpenSearch 服務

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

將資料導入 Amazon OpenSearch 無伺服器集合

這些章節提供有關支援的擷取管道以將資料擷取至 Amazon OpenSearch 無伺服器集合的詳細資訊。它們還涵蓋了一些您可以用來與 OpenSearch API 操作進行交互的客戶端。您的用戶端應與 OpenSearch 2.x 相容,才能與 OpenSearch 無伺服器整合。

所需的最低許可

若要將資料內嵌至 OpenSearch 無伺服器集合,寫入資料的主體必須具有在資料取原則中指派下列最低權限:

[ { "Rules":[ { "ResourceType":"index", "Resource":[ "index/target-collection/logs" ], "Permission":[ "aoss:CreateIndex", "aoss:WriteDocument", "aoss:UpdateIndex" ] } ], "Principal":[ "arn:aws:iam::123456789012:user/my-user" ] } ]

如果您計劃寫入到其他索引,許可的範圍可能會更廣泛。例如,您可以允許對所有索引 (index/target-collection/*) 或索引子集合 (index/target-collection/logs*) 的許可,而不是指定單一目標索引。

如需所有可用 OpenSearch API 作業及其相關權限的參考資料,請參閱Amazon OpenSearch 無伺服器支援的操作和外掛程式

OpenSearch 攝入

您可以使用 Amazon OpenSearch 擷取,而不是使用第三方用戶端將資料直接傳送到 OpenSearch 無伺服器集合。您可以將資料生產者設定為將資料傳送至 OpenSearch 擷取,它會自動將資料傳送至您指定的集合。您也可以將 OpenSearch 擷取設定為在傳送資料之前轉換資料。如需詳細資訊,請參閱 Amazon OpenSearch 攝入

OpenSearch 擷取管線需要權限,才能寫入設定為其接收器的 OpenSearch 無伺服器集合。這些權限包括描述集合並向其傳送 HTTP 要求的能力。如需使用 OpenSearch 擷取將資料新增至集合的指示,請參閱授與 Amazon OpenSearch 擷取管道對集合的存取權

若要開始使用 OpenSearch 擷取,請參閱教學課程:使用 Amazon OpenSearch 擷取將資料擷取到集合

Fluent Bit

您可以使AWS 用 Fluent Bit 影像OpenSearch 輸出外掛程式,將資料擷取到 OpenSearch 無伺服器集合中。

注意

您必須擁有適用 AWS 於 Fluent 位元影像的 2.30.0 版或更新版本,才能與無伺服器整合 OpenSearch 。

範例組態

組態檔的這個範例輸出區段顯示如何使用 OpenSearch 無伺服器集合作為目的地。重要的補充是 AWS_Service_Name 參數,也就是 aossHost 是集合端點。

[OUTPUT] Name opensearch Match * Host collection-endpoint.us-west-2.aoss.amazonaws.com Port 443 Index my_index Trace_Error On Trace_Output On AWS_Auth On AWS_Region <region> AWS_Service_Name aoss tls On Suppress_Type_Name On

Amazon 數據 Firehose

Firehose 支援 OpenSearch 無伺服器作為傳送目的地。如需將資料傳送至 OpenSearch 無伺服器的指示,請參閱 Amazon Data Firehose 開發人員指南中的建立 Kinesis Data Firehose 交付串流為您的目的地選擇 OpenSearch 無伺服器

您提供給 Firehose 以進行交付的 IAM 角色必須在具有目標集合的aoss:WriteDocument最低權限的資料存取政策中指定,而且您必須擁有預先存在的索引才能將資料傳送至其中。如需詳細資訊,請參閱 所需的最低許可

在將資料傳送至 OpenSearch 無伺服器之前,您可能需要對資料執行轉換。如需進一步了解如何使用 Lambda 函數來執行此任務,請參閱相同指南中的 Amazon Kinesis Data Firehose 資料轉換

Fluentd

您可以使用 Fluentd OpenSearch 外掛程式從基礎架構、容器和網路裝置收集資料,並將其傳送至 OpenSearch 無伺服器集合。Calyptia 維護著一個 Fluentd 發行版,其中包含 Ruby 和 SSL 的所有下游相依性。

若要使用 Fluentd 將資料傳送至無伺服器 OpenSearch
  1. https://www.fluentd.org/download 下載版本 1.4.2 或更新版本的 Calyptia Fluentd。此版本預設包含支援 OpenSearch 無伺服器的 OpenSearch 外掛程式。

  2. 安裝套件。請根據您的作業系統,遵循 Fluentd 文件中的說明:

  3. 新增將資料傳送至 OpenSearch 無伺服器的組態。此範本組態會將訊息 "test" (測試) 傳送至單個集合。請確定執行下列操作:

    • 對於host,指定 OpenSearch 無伺服器集合的端點。

    • 對於 aws_service_name,請指定 aoss

    <source> @type sample tag test test {"hello":"world"} </source> <match test> @type opensearch host https://collection-endpoint.us-east-1.aoss.amazonaws.com port 443 index_name fluentd aws_service_name aoss </match>
  4. 執行 Calyptia Fluentd 以開始將資料傳送至集合。例如,在 Mac 上,您可執行下列命令:

    sudo launchctl load /Library/LaunchDaemons/calyptia-fluentd.plist

Go

下列範例程式碼會使用 Go 的 opensearch-go 用戶端,建立與指定的 OpenSearch 無伺服器集合的安全連線,並建立單一索引。您必須提供 regionhost 的值。

package main import ( "context" "log" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2" ) const endpoint = "" // serverless collection endpoint func main() { ctx := context.Background() awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("<AWS_REGION>"), config.WithCredentialsProvider( getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"), ), ) if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an AWS request Signer and load AWS configuration using default config folder or env vars. signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an opensearch client and use the request-signer client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { log.Fatal("client creation err", err) } indexName := "go-test-index" // define index mapping mapping := strings.NewReader(`{ "settings": { "index": { "number_of_shards": 4 } } }`) // create an index createIndex := opensearchapi.IndicesCreateRequest{ Index: indexName, Body: mapping, } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { log.Println("Error ", err.Error()) log.Println("failed to create index ", err) log.Fatal("create response body read err", err) } log.Println(createIndexResponse) // delete the index deleteIndex := opensearchapi.IndicesDeleteRequest{ Index: []string{indexName}, } deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) if err != nil { log.Println("failed to delete index ", err) log.Fatal("delete index response body read err", err) } log.Println("deleting index", deleteIndexResponse) } func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc { return func(ctx context.Context) (aws.Credentials, error) { c := &aws.Credentials{ AccessKeyID: accessKey, SecretAccessKey: secretAccessKey, SessionToken: token, } return *c, nil } }

Java

下列範例程式碼會使用 Java 的 opensearch-java 用戶端,建立與指定的 OpenSearch 無伺服器集合的安全連線,並建立單一索引。您必須提供 regionhost 的值。

與 OpenSearch 服務相比,重要的區別在於服務名稱(aoss而不是es)。

// import OpenSearchClient to establish connection to OpenSearch Serverless collection import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); // create an opensearch client and use the request-signer OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); String index = "sample-index"; // create an index CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build(); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); System.out.println("Create index reponse: " + createIndexResponse); // delete the index DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build(); DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); System.out.println("Delete index reponse: " + deleteIndexResponse); httpClient.close();

下列範例程式碼會再次建立安全連線,然後搜尋索引。

import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); Response response = client.generic() .execute( Requests.builder() .endpoint("/" + "users" + "/_search?typed_keys=true") .method("GET") .json("{" + " \"query\": {" + " \"match_all\": {}" + " }" + "}") .build()); httpClient.close();

JavaScript

下列範例程式碼會使用 opensearch-js 用戶端 JavaScript 來建立與指定的 OpenSearch 無伺服器集合的安全連線、建立單一索引、新增文件,以及刪除索引。您必須提供 noderegion 的值。

與 OpenSearch 服務相比,重要的區別在於服務名稱(aoss而不是es)。

Version 3

這個範例會 JavaScript 在 Node.js 中使用 SDK 的第 3 版

const { defaultProvider } = require('@aws-sdk/credential-provider-node'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => { const credentialsProvider = defaultProvider(); return credentialsProvider(); }, }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();
Version 2

這個範例會 JavaScript 在 Node.js 中使用 SDK 的第 2 版

const AWS = require('aws-sdk'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => new Promise((resolve, reject) => { AWS.config.getCredentials((err, credentials) => { if (err) { reject(err); } else { resolve(credentials); } }); }), }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();

Logstash

您可以使用 Logstash OpenSearch 外掛程式將記錄發佈至 OpenSearch 無伺服器集合。

若要使用 Logstash 將資料傳送至無伺服器 OpenSearch
  1. 使用泊塢視窗或 Linux 安裝logstash-output-opensearch外掛程式 2.0.0 版或更新版本。

    Docker

    碼頭窗承載 Logstash OSS 軟體,並預先安裝了輸出外掛程式:開放搜尋專案/ OpenSearch 輸出外掛程式。logstash-oss-with-opensearch您可以像任何其他映像一樣提取映像:

    docker pull opensearchproject/logstash-oss-with-opensearch-output-plugin:latest
    Linux

    首先,如果您尚未安裝最新版本的 Logstash,請先安裝。然後,安裝輸出外掛程式的 2.0.0 版本:

    cd logstash-8.5.0/ bin/logstash-plugin install --version 2.0.0 logstash-output-opensearch

    如果外掛程式已安裝,請將其更新至最新版本:

    bin/logstash-plugin update logstash-output-opensearch

    從外掛程式的 2.0.0 版開始, AWS SDK 使用版本 3。如果您使用的是 8.4.0 之前的 Logstash 版本,則必須刪除任何預先安裝的 AWS 插件並安裝插件:logstash-integration-aws

    /usr/share/logstash/bin/logstash-plugin remove logstash-input-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-input-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-output-sns /usr/share/logstash/bin/logstash-plugin remove logstash-output-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-cloudwatch /usr/share/logstash/bin/logstash-plugin install --version 0.1.0.pre logstash-integration-aws
  2. 若要讓 OpenSearch 輸出外掛程式與 OpenSearch 無伺服器搭配使用,您必須對 logstash.conf 的opensearch輸出區段進行下列修改:

    • aoss 指定為 auth_type 下的 service_name

    • 針對 hosts 指定您的集合端點。

    • 新增參數 default_server_major_versionlegacy_template。外掛程式需要這些參數才能與 OpenSearch 無伺服器搭配使用。

    output { opensearch { hosts => "collection-endpoint:443" auth_type => { ... service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }

    此範例組態檔案會從 S3 儲存貯體中的檔案取得輸入,並將其傳送至 OpenSearch 無伺服器集合:

    input { s3 { bucket => "my-s3-bucket" region => "us-east-1" } } output { opensearch { ecs_compatibility => disabled hosts => "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com:443" index => my-index auth_type => { type => 'aws_iam' aws_access_key_id => 'your-access-key' aws_secret_access_key => 'your-secret-key' region => 'us-east-1' service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }
  3. 然後,使用新的組態執行 Logstash 來測試外掛程式:

    bin/logstash -f config/test-plugin.conf

Python

下列範例程式碼會使用 Python 的 opensearch-py 用戶端,建立與指定的 OpenSearch 無伺服器集合的安全連線、建立單一索引,然後搜尋該索引。您必須提供 regionhost 的值。

與 OpenSearch 服務相比,重要的區別在於服務名稱(aoss而不是es)。

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth import boto3 host = '' # serverless collection endpoint, without https:// region = '' # e.g. us-east-1 service = 'aoss' credentials = boto3.Session().get_credentials() auth = AWSV4SignerAuth(credentials, region, service) # create an opensearch client and use the request-signer client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=auth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, pool_maxsize=20, ) # create an index index_name = 'books-index' create_response = client.indices.create( index_name ) print('\nCreating index:') print(create_response) # index a document document = { 'title': 'The Green Mile', 'director': 'Stephen King', 'year': '1996' } response = client.index( index = 'books-index', body = document, id = '1' ) # delete the index delete_response = client.indices.delete( index_name ) print('\nDeleting index:') print(delete_response)

Ruby

opensearch-aws-sigv4gem 提供對 OpenSearch 無伺服器的存取,以及開箱即用的 OpenSearch 服務。它具有 opensearch-ruby 客戶端的所有功能,因為這是此 Gem 套件的相依項目。

執行個體化 Sigv4 簽署者時,請指定 aoss 為服務名稱:

require 'opensearch-aws-sigv4' require 'aws-sigv4' signer = Aws::Sigv4::Signer.new(service: 'aoss', region: 'us-west-2', access_key_id: 'key_id', secret_access_key: 'secret') # create an opensearch client and use the request-signer client = OpenSearch::Aws::Sigv4Client.new( { host: 'https://your.amz-opensearch-serverless.endpoint', log: true }, signer) # create an index index = 'prime' client.indices.create(index: index) # insert data client.index(index: index, id: '1', body: { name: 'Amazon Echo', msrp: '5999', year: 2011 }) # query the index client.search(body: { query: { match: { name: 'Echo' } } }) # delete index entry client.delete(index: index, id: '1') # delete the index client.indices.delete(index: index)

使用其他用戶端簽署 HTTP 請求

當您與其他用戶端建構 HTTP 要求時,將要求簽署至 OpenSearch 無伺服器集合時,適用下列需求。

  • 您必須將服務名稱指定為 aoss

  • 所有 AWS Signature 版本 4 請求均需要 x-amz-content-sha256 標頭。其提供請求承載的雜湊。如果有請求承載,請將值設定為其安全雜湊演算法 (SHA) 加密雜湊 (SHA256)。如果沒有請求承載,請將值設定為 e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855,這是一個空字串的雜湊。

使用 cURL 索引

下列範例要求會使用用戶端 URL 要求程式庫 (cURL),將單一文件傳送至集合movies-index中指定的索引:

curl -XPOST \ --user "$AWS_ACCESS_KEY_ID":"$AWS_SECRET_ACCESS_KEY" \ --aws-sigv4 "aws:amz:us-east-1:aoss" \ --header "x-amz-content-sha256: $REQUEST_PAYLOAD_SHA_HASH" \ --header "x-amz-security-token: $AWS_SESSION_TOKEN" \ "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com/movies-index/_doc" \ -H "Content-Type: application/json" -d '{"title": "Shawshank Redemption"}'

與郵遞員索引

下圖顯示了如何使用郵遞員將請求發送到集合。如需驗證的指示,請參閱 Postman 中的使用 AWS 簽章驗證工作流程