Amazon OpenSearch Serverless コレクションへのデータの取り込み - Amazon OpenSearch サービス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon OpenSearch Serverless コレクションへのデータの取り込み

これらのセクションでは、Amazon OpenSearch Serverless コレクションへのデータインジェストでサポートされている取り込みパイプラインについて詳しく説明します。また、オペレーションを操作する OpenSearch APIために使用できるクライアントの一部についても説明します。Serverless と OpenSearch統合するには、クライアントが 2.x OpenSearch と互換性がある必要があります。

必要な最小限のアクセス許可

OpenSearch サーバーレスコレクションにデータを取り込むには、データを書き込むプリンシパルに、データアクセスポリシー で次の最小限のアクセス許可を割り当てる必要があります。

[ { "Rules":[ { "ResourceType":"index", "Resource":[ "index/target-collection/logs" ], "Permission":[ "aoss:CreateIndex", "aoss:WriteDocument", "aoss:UpdateIndex" ] } ], "Principal":[ "arn:aws:iam::123456789012:user/my-user" ] } ]

追加のインデックスに書き込む場合は、アクセス許可をより幅広く指定できます。例えば、単一のターゲットインデックスを指定するのではなく、すべてのインデックス (インデックス/target-collection/*)、またはインデックスのサブセット (index/target-collection/logs*).

使用可能な OpenSearch APIすべてのオペレーションとそれに関連するアクセス許可のリファレンスについては、「」を参照してくださいAmazon OpenSearch Serverless でサポートされているオペレーションとプラグイン

OpenSearch 取り込み

サードパーティーのクライアントを使用して OpenSearch サーバーレスコレクションに直接データを送信するのではなく、Amazon Ingestion OpenSearch を使用できます。データを Ingestion OpenSearch に送信するようにデータプロデューサーを設定すると、指定したコレクションにデータが自動的に配信されます。データを配信する前にデータを変換するように Ingestion OpenSearch を設定することもできます。詳細については、「Amazon OpenSearch Ingestion」を参照してください。

OpenSearch 取り込みパイプラインには、シンクとして設定された OpenSearch サーバーレスコレクションに書き込むためのアクセス許可が必要です。これらのアクセス許可には、コレクションを記述してHTTPリクエストを送信する機能が含まれます。Ingestion OpenSearch を使用してコレクションにデータを追加する手順については、「」を参照してくださいAmazon OpenSearch Ingestion パイプラインにコレクションへのアクセス権を付与する

取り込みを開始するには、 OpenSearch 「」を参照してくださいチュートリアル: Amazon Ingestion OpenSearch を使用してコレクションにデータを取り込む

Fluent Bit

AWS for Fluent Bit イメージOpenSearch 出力プラグインを使用して、 OpenSearch サーバーレスコレクションにデータを取り込むことができます。

注記

Serverless と統合するには、 AWS for Fluent Bit イメージのバージョン 2.30.0 OpenSearch 以降が必要です。

設定例:

設定ファイルのこのサンプル出力セクションでは、 OpenSearch サーバーレスコレクションを送信先として使用する方法を示します。重要な追加項目は AWS_Service_Name パラメータで、これは aoss です。Host はコレクションエンドポイントです。

[OUTPUT] Name opensearch Match * Host collection-endpoint.us-west-2.aoss.amazonaws.com Port 443 Index my_index Trace_Error On Trace_Output On AWS_Auth On AWS_Region <region> AWS_Service_Name aoss tls On Suppress_Type_Name On

Amazon Data Firehose

Firehose は配信先として OpenSearch Serverless をサポートしています。 OpenSearch Serverless にデータを送信する手順については、「Amazon Data Firehose デベロッパーガイド」の「Kinesis Data Firehose 配信ストリームの作成」および「送信先に OpenSearch サーバーレスを選択する」を参照してください。

配信のために Firehose に提供するIAMロールは、ターゲットコレクションのaoss:WriteDocument最小限のアクセス許可を持つデータアクセスポリシー内で指定する必要があります。また、データの送信先となる既存のインデックスが必要です。詳細については、「必要な最小限のアクセス許可」を参照してください。

OpenSearch Serverless にデータを送信する前に、データに対して変換を実行する必要がある場合があります。Lambda 関数を使用してこのタスクを実行する方法については、このガイドの「Amazon Kinesis Data Firehose データ変換」を参照してください。

Fluentd

Fluentd OpenSearch プラグインを使用して、インフラストラクチャ、コンテナ、ネットワークデバイスからデータを収集し、Serverless OpenSearch コレクションに送信できます。Calyptia は、Ruby と のすべてのダウンストリーム依存関係を含む Fluentd のディストリビューションを維持しますSSL。

Fluentd を使用して OpenSearch Serverless にデータを送信するには
  1. Calyptia Fluentd のバージョン 1.4.2 以降を https://www.fluentd.org/download からダウンロードします。このバージョンには、 OpenSearch サーバーレスをサポートする OpenSearch プラグインがデフォルトで含まれています。

  2. パッケージをインストールします。ご使用のオペレーティングシステムに基づいて、次の Fluentd ドキュメントの指示に従ってください。

  3. OpenSearch Serverless にデータを送信する設定を追加します。この設定例では、「test」というメッセージが 1 つのコレクションに送信されます。以下を実行するようにしてください。

    • にはhost、 OpenSearch サーバーレスコレクションのエンドポイントを指定します。

    • aws_service_name には、aoss を指定します。

    <source> @type sample tag test test {"hello":"world"} </source> <match test> @type opensearch host https://collection-endpoint.us-east-1.aoss.amazonaws.com port 443 index_name fluentd aws_service_name aoss </match>
  4. Calyptia Fluentd を実行して、コレクションへのデータの送信を開始します。例えば、Mac では次のコマンドを実行できます。

    sudo launchctl load /Library/LaunchDaemons/calyptia-fluentd.plist

Go

次のサンプルコードでは、Go の opensearch-go クライアントを使用して、指定された OpenSearch Serverless コレクションへの安全な接続を確立し、単一のインデックスを作成します。region および host の値を指定する必要があります。

package main import ( "context" "log" "strings" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" opensearch "github.com/opensearch-project/opensearch-go/v2" opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi" requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2" ) const endpoint = "" // serverless collection endpoint func main() { ctx := context.Background() awsCfg, err := config.LoadDefaultConfig(ctx, config.WithRegion("<AWS_REGION>"), config.WithCredentialsProvider( getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"), ), ) if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an AWS request Signer and load AWS configuration using default config folder or env vars. signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless if err != nil { log.Fatal(err) // don't log.fatal in a production-ready app } // create an opensearch client and use the request-signer client, err := opensearch.NewClient(opensearch.Config{ Addresses: []string{endpoint}, Signer: signer, }) if err != nil { log.Fatal("client creation err", err) } indexName := "go-test-index" // define index mapping mapping := strings.NewReader(`{ "settings": { "index": { "number_of_shards": 4 } } }`) // create an index createIndex := opensearchapi.IndicesCreateRequest{ Index: indexName, Body: mapping, } createIndexResponse, err := createIndex.Do(context.Background(), client) if err != nil { log.Println("Error ", err.Error()) log.Println("failed to create index ", err) log.Fatal("create response body read err", err) } log.Println(createIndexResponse) // delete the index deleteIndex := opensearchapi.IndicesDeleteRequest{ Index: []string{indexName}, } deleteIndexResponse, err := deleteIndex.Do(context.Background(), client) if err != nil { log.Println("failed to delete index ", err) log.Fatal("delete index response body read err", err) } log.Println("deleting index", deleteIndexResponse) } func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc { return func(ctx context.Context) (aws.Credentials, error) { c := &aws.Credentials{ AccessKeyID: accessKey, SecretAccessKey: secretAccessKey, SessionToken: token, } return *c, nil } }

Java

次のサンプルコードでは、Java 用の opensearch-java クライアントを使用して、指定された OpenSearch Serverless コレクションへの安全な接続を確立し、単一のインデックスを作成します。region および host の値を指定する必要があります。

OpenSearch サービスドメインとの重要な違いは、サービス名 (aoss ではなく ) ですes

// import OpenSearchClient to establish connection to OpenSearch Serverless collection import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); // create an opensearch client and use the request-signer OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); String index = "sample-index"; // create an index CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build(); CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest); System.out.println("Create index reponse: " + createIndexResponse); // delete the index DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build(); DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest); System.out.println("Delete index reponse: " + deleteIndexResponse); httpClient.close();

次のサンプルコードは、再度安全な接続を確立し、インデックスを検索します。

import org.opensearch.client.opensearch.OpenSearchClient; SdkHttpClient httpClient = ApacheHttpClient.builder().build(); OpenSearchClient client = new OpenSearchClient( new AwsSdk2Transport( httpClient, "...us-west-2.aoss.amazonaws.com", // serverless collection endpoint "aoss" // signing service name Region.US_WEST_2, // signing service region AwsSdk2TransportOptions.builder().build() ) ); Response response = client.generic() .execute( Requests.builder() .endpoint("/" + "users" + "/_search?typed_keys=true") .method("GET") .json("{" + " \"query\": {" + " \"match_all\": {}" + " }" + "}") .build()); httpClient.close();

JavaScript

次のサンプルコードでは、 の opensearch-js クライアント JavaScript を使用して、指定された OpenSearch Serverless コレクションへの安全な接続を確立し、単一のインデックスを作成し、ドキュメントを追加し、インデックスを削除します。node および region の値を指定する必要があります。

OpenSearch サービスドメインとの重要な違いは、サービス名 (aoss ではなく ) ですes

Version 3

この例では、Node.js の のバージョン 3 を使用します。 SDK JavaScript

const { defaultProvider } = require('@aws-sdk/credential-provider-node'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => { const credentialsProvider = defaultProvider(); return credentialsProvider(); }, }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();
Version 2

この例では、Node.js の のバージョン 2 を使用します。 SDK JavaScript

const AWS = require('aws-sdk'); const { Client } = require('@opensearch-project/opensearch'); const { AwsSigv4Signer } = require('@opensearch-project/opensearch/aws'); async function main() { // create an opensearch client and use the request-signer const client = new Client({ ...AwsSigv4Signer({ region: 'us-west-2', service: 'aoss', getCredentials: () => new Promise((resolve, reject) => { AWS.config.getCredentials((err, credentials) => { if (err) { reject(err); } else { resolve(credentials); } }); }), }), node: '' # // serverless collection endpoint }); const index = 'movies'; // create index if it doesn't already exist if (!(await client.indices.exists({ index })).body) { console.log((await client.indices.create({ index })).body); } // add a document to the index const document = { foo: 'bar' }; const response = await client.index({ id: '1', index: index, body: document, }); console.log(response.body); // delete the index console.log((await client.indices.delete({ index })).body); } main();

Logstash

Logstash OpenSearch プラグインを使用して、 OpenSearch Serverless コレクションにログを発行できます。

Logstash を使用して OpenSearch サーバーレスにデータを送信するには
  1. Docker または Linux を使用して、プラグインのバージョン 2.0.0 以降をインストールします。 logstash-output-opensearch

    Docker

    Docker は、 OpenSearch 出力プラグインがプリインストールされた Logstash OSSソフトウェアをホストします: opensearchproject/logstash-oss-with-opensearch-output-plugin 。他のイメージと同じように次のようにイメージをプルできます。

    docker pull opensearchproject/logstash-oss-with-opensearch-output-plugin:latest
    Linux

    まだ Logstash の最新バージョンをインストールしていない場合は、インストールします。次に、出力プラグインのバージョン 2.0.0 を次のようにインストールします。

    cd logstash-8.5.0/ bin/logstash-plugin install --version 2.0.0 logstash-output-opensearch

    プラグインが既にインストールされている場合は、次のように最新バージョンに更新します。

    bin/logstash-plugin update logstash-output-opensearch

    プラグインのバージョン 2.0.0 以降、 AWS SDKはバージョン 3 を使用します。8.4.0 より前のバージョンの Logstash を使用している場合は、プリインストールされている AWS プラグインをすべて削除し、logstash-integration-awsプラグインをインストールする必要があります。

    /usr/share/logstash/bin/logstash-plugin remove logstash-input-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-input-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-s3 /usr/share/logstash/bin/logstash-plugin remove logstash-output-sns /usr/share/logstash/bin/logstash-plugin remove logstash-output-sqs /usr/share/logstash/bin/logstash-plugin remove logstash-output-cloudwatch /usr/share/logstash/bin/logstash-plugin install --version 0.1.0.pre logstash-integration-aws
  2. OpenSearch 出力プラグインを OpenSearch Serverless と連携させるには、logstash.conf のopensearch出力セクションに次の変更を加える必要があります。

    • auth_typeservice_name として aoss を指定します。

    • hosts のコレクションエンドポイントを指定します。

    • パラメータ default_server_major_version および legacy_template を追加します。これらのパラメータは、プラグインが OpenSearch Serverless で動作するために必要です。

    output { opensearch { hosts => "collection-endpoint:443" auth_type => { ... service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }

    この設定ファイルの例は、S3 バケット内のファイルから入力を受け取り、 OpenSearch サーバーレスコレクションに送信します。

    input { s3 { bucket => "my-s3-bucket" region => "us-east-1" } } output { opensearch { ecs_compatibility => disabled hosts => "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com:443" index => my-index auth_type => { type => 'aws_iam' aws_access_key_id => 'your-access-key' aws_secret_access_key => 'your-secret-key' region => 'us-east-1' service_name => 'aoss' } default_server_major_version => 2 legacy_template => false } }
  3. 次に、次のように新しい設定で Logstash を実行してプラグインをテストします。

    bin/logstash -f config/test-plugin.conf

Python

次のサンプルコードでは、Python 用の opensearch-py クライアントを使用して、指定された OpenSearch Serverless コレクションへの安全な接続を確立し、単一のインデックスを作成し、そのインデックスを検索します。region および host の値を指定する必要があります。

OpenSearch サービスドメインとの重要な違いは、サービス名 (aoss ではなく ) ですes

from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth import boto3 host = '' # serverless collection endpoint, without https:// region = '' # e.g. us-east-1 service = 'aoss' credentials = boto3.Session().get_credentials() auth = AWSV4SignerAuth(credentials, region, service) # create an opensearch client and use the request-signer client = OpenSearch( hosts=[{'host': host, 'port': 443}], http_auth=auth, use_ssl=True, verify_certs=True, connection_class=RequestsHttpConnection, pool_maxsize=20, ) # create an index index_name = 'books-index' create_response = client.indices.create( index_name ) print('\nCreating index:') print(create_response) # index a document document = { 'title': 'The Green Mile', 'director': 'Stephen King', 'year': '1996' } response = client.index( index = 'books-index', body = document, id = '1' ) # delete the index delete_response = client.indices.delete( index_name ) print('\nDeleting index:') print(delete_response)

Ruby

opensearch-aws-sigv4 Gem は、すぐに OpenSearch サーバーレスと OpenSearch サービスへのアクセスを提供します。これには、opensearch-ruby クライアントのすべての機能が備わっています。クライアントがこの gem の依存関係であるためです。

Sigv4 署名者をインスタンス化するときは、aoss をサービス名として指定します。

require 'opensearch-aws-sigv4' require 'aws-sigv4' signer = Aws::Sigv4::Signer.new(service: 'aoss', region: 'us-west-2', access_key_id: 'key_id', secret_access_key: 'secret') # create an opensearch client and use the request-signer client = OpenSearch::Aws::Sigv4Client.new( { host: 'https://your.amz-opensearch-serverless.endpoint', log: true }, signer) # create an index index = 'prime' client.indices.create(index: index) # insert data client.index(index: index, id: '1', body: { name: 'Amazon Echo', msrp: '5999', year: 2011 }) # query the index client.search(body: { query: { match: { name: 'Echo' } } }) # delete index entry client.delete(index: index, id: '1') # delete the index client.indices.delete(index: index)

他のクライアントとのHTTPリクエストの署名

別のクライアントでリクエストを作成するときに、サーバーレスコレクションへのリクエストに署名するときは、次の要件が適用されます。 OpenSearch HTTP

  • サービス名を aoss として指定する必要があります。

  • x-amz-content-sha256 ヘッダーは、すべての AWS 署名バージョン 4 リクエストに必要です。これにより、リクエストペイロードのハッシュが指定されます。リクエストペイロードがある場合は、値を Secure Hash Algorithm (SHA) 暗号化ハッシュ () に設定しますSHA256。リクエストペイロードがない場合は、値を e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855 に設定します。これは空の文字列のハッシュです。

c でのインデックス作成URL

次のリクエスト例では、クライアントURLリクエストライブラリ (c URL) を使用して、コレクションmovies-index内の という名前のインデックスに 1 つのドキュメントを送信します。

curl -XPOST \ --user "$AWS_ACCESS_KEY_ID":"$AWS_SECRET_ACCESS_KEY" \ --aws-sigv4 "aws:amz:us-east-1:aoss" \ --header "x-amz-content-sha256: $REQUEST_PAYLOAD_SHA_HASH" \ --header "x-amz-security-token: $AWS_SESSION_TOKEN" \ "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com/movies-index/_doc" \ -H "Content-Type: application/json" -d '{"title": "Shawshank Redemption"}'

Postman でのインデックス作成

次の図は、Postman を使用してコレクションにリクエストを送信する方法を示しています。認証の手順については、Postman の AWS 「署名付き認証ワークフローによる認証」を参照してください。

JSON response showing creation of a "movies-index" with successful result and no shards.