Ingesting data into Amazon OpenSearch Serverless collections
The following sections describe the supported pipelines for ingesting data into Amazon OpenSearch Serverless collections. They also cover some of the clients that you can use to interact with the OpenSearch API operations. Your clients should be compatible with OpenSearch 2.x to integrate with OpenSearch Serverless.
Minimum required permissions
In order to ingest data into an OpenSearch Serverless collection, the principal that is writing the data must have the following minimum permissions assigned in a data access policy:
[
   {
      "Rules":[
         {
            "ResourceType":"index",
            "Resource":[
               "index/target-collection/*"
            ],
            "Permission":[
               "aoss:CreateIndex",
               "aoss:WriteDocument",
               "aoss:UpdateIndex"
            ]
         }
      ],
      "Principal":[
         "arn:aws:iam::123456789012:user/my-user"
      ]
   }
]
The permissions can be broader if you plan to write to additional indexes. For example, rather than specifying a single target index, you can grant permission to all indexes (index/target-collection/*), or to a subset of indexes (index/target-collection/logs*).
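As an illustration of how the resource pattern controls the scope of the policy, the following Python sketch builds the same policy document for an arbitrary index pattern. The function names build_ingest_policy and create_ingest_policy are hypothetical helpers, not part of any AWS library. The optional second function assumes that boto3 is installed and that credentials are configured, and it is not called in this example.

```python
import json


def build_ingest_policy(collection, principals, index_pattern="*"):
    """Return a data access policy document with the minimum ingest permissions."""
    return [
        {
            "Rules": [
                {
                    "ResourceType": "index",
                    # "*" covers every index; "logs*" covers only matching indexes.
                    "Resource": [f"index/{collection}/{index_pattern}"],
                    "Permission": [
                        "aoss:CreateIndex",
                        "aoss:WriteDocument",
                        "aoss:UpdateIndex",
                    ],
                }
            ],
            "Principal": list(principals),
        }
    ]


def create_ingest_policy(name, policy):
    """Create the data access policy with boto3 (needs AWS credentials)."""
    import boto3  # imported lazily so the builder above works without boto3

    client = boto3.client("opensearchserverless")
    return client.create_access_policy(
        name=name, type="data", policy=json.dumps(policy)
    )


policy = build_ingest_policy(
    "target-collection",
    ["arn:aws:iam::123456789012:user/my-user"],
    index_pattern="logs*",
)
print(json.dumps(policy, indent=2))
```

Keeping the pattern as a parameter makes it straightforward to start with a narrow scope such as logs* and widen it only when a principal needs to write to more indexes.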
For a reference of all available OpenSearch API operations and their associated permissions, see Supported operations and plugins in Amazon OpenSearch Serverless.
Amazon Kinesis Data Firehose
Kinesis Data Firehose supports OpenSearch Serverless as a delivery destination. For instructions to send data into OpenSearch Serverless, see Creating a Kinesis Data Firehose Delivery Stream and Choose OpenSearch Serverless for Your Destination in the Amazon Kinesis Data Firehose Developer Guide.
The IAM role that you provide to Kinesis Data Firehose for delivery must be specified within a data access policy with the aoss:WriteDocument minimum permission for the target collection. For more information, see Minimum required permissions.
Before you send data to OpenSearch Serverless, you might need to perform transforms on the data. To learn more about using Lambda functions to perform this task, see Amazon Kinesis Data Firehose Data Transformation in the same guide.
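To make the transformation step more concrete, here is a minimal sketch of a Lambda handler in Python. It assumes that each incoming record is a JSON object, and the ingested_by field it adds is a hypothetical enrichment chosen only for illustration. The handler follows the general Firehose transformation contract: each record arrives with a recordId and base64-encoded data, and is returned with the same recordId, a result status, and base64-encoded data.

```python
import base64
import json


def lambda_handler(event, context):
    """Add a field to each JSON record and return it in the shape Firehose expects."""
    output = []
    for record in event["records"]:
        try:
            payload = json.loads(base64.b64decode(record["data"]))
            payload["ingested_by"] = "firehose-transform"  # hypothetical enrichment
            data = base64.b64encode(json.dumps(payload).encode("utf-8")).decode("utf-8")
            result = "Ok"
        except (ValueError, TypeError):
            # Not valid base64 or JSON, or not a JSON object: return it unchanged.
            data = record["data"]
            result = "ProcessingFailed"
        output.append({"recordId": record["recordId"], "result": result, "data": data})
    return {"records": output}
```

Records marked ProcessingFailed are returned unchanged rather than dropped, so a malformed record does not cause the whole batch to fail.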
Fluentd
You can use the Fluentd OpenSearch plugin to send data to OpenSearch Serverless collections.
To use Fluentd to send data to OpenSearch Serverless
- Download version 1.4.2 or later of Calyptia Fluentd from https://www.fluentd.org/download. This version includes the OpenSearch plugin by default, which supports OpenSearch Serverless.
- Install the package. Follow the instructions in the Fluentd documentation for your operating system.
- Add a configuration that sends data to OpenSearch Serverless. This sample configuration sends the message "test" to a single collection. Make sure to do the following:
  - For host, specify the endpoint of your OpenSearch Serverless collection.
  - For aws_service_name, specify aoss.

  <source>
    @type sample
    tag test
    test {"hello":"world"}
  </source>

  <match test>
    @type opensearch
    host https://collection-endpoint.us-east-1.aoss.amazonaws.com
    port 443
    index_name fluentd
    aws_service_name aoss
  </match>
- Run Calyptia Fluentd to start sending data to the collection. For example, on Mac you can run the following command:

  sudo launchctl load /Library/LaunchDaemons/calyptia-fluentd.plist
Go
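The following sample code uses the opensearch-go client for Go to establish a secure connection to the specified OpenSearch Serverless collection, create a single index, and then delete it. You must provide values for region and host.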
package main

import (
	"context"
	"log"
	"strings"

	"github.com/aws/aws-sdk-go-v2/aws"
	"github.com/aws/aws-sdk-go-v2/config"
	opensearch "github.com/opensearch-project/opensearch-go/v2"
	opensearchapi "github.com/opensearch-project/opensearch-go/v2/opensearchapi"
	requestsigner "github.com/opensearch-project/opensearch-go/v2/signer/awsv2"
)

const endpoint = "" // serverless collection endpoint

func main() {
	ctx := context.Background()

	// Load the AWS configuration with the Region and explicit credentials.
	awsCfg, err := config.LoadDefaultConfig(ctx,
		config.WithRegion("<AWS_REGION>"),
		config.WithCredentialsProvider(
			getCredentialProvider("<AWS_ACCESS_KEY>", "<AWS_SECRET_ACCESS_KEY>", "<AWS_SESSION_TOKEN>"),
		),
	)
	if err != nil {
		log.Fatal(err) // Do not log.Fatal in a production-ready app.
	}

	// Create an AWS request signer.
	signer, err := requestsigner.NewSignerWithService(awsCfg, "aoss") // "aoss" for Amazon OpenSearch Serverless
	if err != nil {
		log.Fatal(err) // Do not log.Fatal in a production-ready app.
	}

	// Create an OpenSearch client that uses the request signer.
	client, err := opensearch.NewClient(opensearch.Config{
		Addresses: []string{endpoint},
		Signer:    signer,
	})
	if err != nil {
		log.Fatal("client creation err: ", err)
	}

	indexName := "go-test-index"

	// Define the index settings.
	mapping := strings.NewReader(`{
		"settings": {
			"index": {
				"number_of_shards": 4
			}
		}
	}`)

	// Create an index with non-default settings.
	createIndex := opensearchapi.IndicesCreateRequest{
		Index: indexName,
		Body:  mapping,
	}
	createIndexResponse, err := createIndex.Do(ctx, client)
	if err != nil {
		log.Fatal("failed to create index: ", err)
	}
	log.Println(createIndexResponse)

	// Delete the previously created index.
	deleteIndex := opensearchapi.IndicesDeleteRequest{
		Index: []string{indexName},
	}
	deleteIndexResponse, err := deleteIndex.Do(ctx, client)
	if err != nil {
		log.Fatal("failed to delete index: ", err)
	}
	log.Println("deleting index", deleteIndexResponse)
}

// getCredentialProvider returns a provider that serves the given static credentials.
func getCredentialProvider(accessKey, secretAccessKey, token string) aws.CredentialsProviderFunc {
	return func(ctx context.Context) (aws.Credentials, error) {
		return aws.Credentials{
			AccessKeyID:     accessKey,
			SecretAccessKey: secretAccessKey,
			SessionToken:    token,
		}, nil
	}
}
Java
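The following sample code uses the opensearch-java client for Java to establish a secure connection to the specified OpenSearch Serverless collection, create a single index, and then delete it. You must provide values for region and host.
The important difference compared to OpenSearch Service domains is the service name (aoss instead of es).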
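import org.opensearch.client.opensearch.OpenSearchClient;
import org.opensearch.client.opensearch.indices.CreateIndexRequest;
import org.opensearch.client.opensearch.indices.CreateIndexResponse;
import org.opensearch.client.opensearch.indices.DeleteIndexRequest;
import org.opensearch.client.opensearch.indices.DeleteIndexResponse;
import org.opensearch.client.transport.aws.AwsSdk2Transport;
import org.opensearch.client.transport.aws.AwsSdk2TransportOptions;
import software.amazon.awssdk.http.SdkHttpClient;
import software.amazon.awssdk.http.apache.ApacheHttpClient;
import software.amazon.awssdk.regions.Region;

SdkHttpClient httpClient = ApacheHttpClient.builder().build();

OpenSearchClient client = new OpenSearchClient(
    new AwsSdk2Transport(
        httpClient,
        "collection-endpoint.us-west-2.aoss.amazonaws.com", // OpenSearch Serverless collection endpoint, without https://
        "aoss",           // signing service name
        Region.US_WEST_2, // signing service region
        AwsSdk2TransportOptions.builder().build()
    )
);

String index = "sample-index";

// Create the index.
CreateIndexRequest createIndexRequest = new CreateIndexRequest.Builder().index(index).build();
CreateIndexResponse createIndexResponse = client.indices().create(createIndexRequest);
System.out.println("Create index response: " + createIndexResponse);

// Delete the index.
DeleteIndexRequest deleteIndexRequest = new DeleteIndexRequest.Builder().index(index).build();
DeleteIndexResponse deleteIndexResponse = client.indices().delete(deleteIndexRequest);
System.out.println("Delete index response: " + deleteIndexResponse);

httpClient.close();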
JavaScript
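The following sample code uses the opensearch-js client for JavaScript to establish a secure connection to the specified OpenSearch Serverless collection. You must provide values for node and region.
The important difference compared to OpenSearch Service domains is the service name (aoss instead of es).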
Logstash
You must use version 2.0.0 or later of the logstash-output-opensearch plugin.
Docker installation
Docker hosts the Logstash OSS software with the OpenSearch output plugin preinstalled: opensearchproject/logstash-oss-with-opensearch-output-plugin.
You can pull the image just like any other image:
docker pull opensearchproject/logstash-oss-with-opensearch-output-plugin:latest
Linux installation
First, install the latest version of Logstash.
Then, install version 2.0.0 of the output plugin:
cd logstash-8.5.0/
bin/logstash-plugin install --version 2.0.0 logstash-output-opensearch
If the plugin is already installed, update it to the latest version:
bin/logstash-plugin update logstash-output-opensearch
Starting with version 2.0.0, the plugin uses version 3 of the AWS SDK. If you're using a Logstash version earlier than 8.4.0, you must remove any pre-installed AWS plugins and install the logstash-integration-aws plugin:
/usr/share/logstash/bin/logstash-plugin remove logstash-input-s3
/usr/share/logstash/bin/logstash-plugin remove logstash-input-sqs
/usr/share/logstash/bin/logstash-plugin remove logstash-output-s3
/usr/share/logstash/bin/logstash-plugin remove logstash-output-sns
/usr/share/logstash/bin/logstash-plugin remove logstash-output-sqs
/usr/share/logstash/bin/logstash-plugin remove logstash-output-cloudwatch
/usr/share/logstash/bin/logstash-plugin install --version 0.1.0.pre logstash-integration-aws
Configuring the OpenSearch output plugin
For the OpenSearch output plugin to work with OpenSearch Serverless, you must make the following modifications to the opensearch output section of logstash.conf:
- Specify aoss as the service_name under auth_type.
- Specify your collection endpoint for hosts.
- Add the parameters default_server_major_version and legacy_template. These parameters are required for the plugin to work with OpenSearch Serverless.

output {
  opensearch {
    hosts => "collection-endpoint:443"
    auth_type => {
      ...
      service_name => 'aoss'
    }
    default_server_major_version => 2
    legacy_template => false
  }
}
This example configuration file takes its input from files in an S3 bucket and sends them to an OpenSearch Serverless collection:
input {
  s3 {
    bucket => "my-s3-bucket"
    region => "us-east-1"
  }
}

output {
  opensearch {
    ecs_compatibility => disabled
    hosts => "https://my-collection-endpoint.us-east-1.aoss.amazonaws.com:443"
    index => "my-index"
    auth_type => {
      type => 'aws_iam'
      aws_access_key_id => 'your-access-key'
      aws_secret_access_key => 'your-secret-key'
      region => 'us-east-1'
      service_name => 'aoss'
    }
    default_server_major_version => 2
    legacy_template => false
  }
}
Then, run Logstash with the new configuration to test the plugin:
bin/logstash -f config/test-plugin.conf
Python
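The following sample code uses the opensearch-py client for Python to establish a secure connection to the specified OpenSearch Serverless collection, create a single index, and search that index. You must provide values for region and host.
The important difference compared to OpenSearch Service domains is the service name (aoss instead of es).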
from opensearchpy import OpenSearch, RequestsHttpConnection, AWSV4SignerAuth
import boto3

host = ''  # The collection endpoint without https://. For example, 07tjusf2h91cunochc.us-east-1.aoss.amazonaws.com
region = ''  # e.g. us-east-1
service = 'aoss'
credentials = boto3.Session().get_credentials()
auth = AWSV4SignerAuth(credentials, region, service)

client = OpenSearch(
    hosts=[{'host': host, 'port': 443}],
    http_auth=auth,
    use_ssl=True,
    verify_certs=True,
    connection_class=RequestsHttpConnection,
    pool_maxsize=20,
)

# Create an index with non-default settings
index_name = "python-test-index"
index_body = {
    'settings': {
        'index': {
            'number_of_shards': 4
        }
    }
}

response = client.indices.create(
    index_name,
    body=index_body
)

print('\nCreating index:')
print(response)

# Search the index
q = 'miller'
query = {
    'size': 5,
    'query': {
        'multi_match': {
            'query': q,
            'fields': ['title^2', 'director']
        }
    }
}

response = client.search(
    body=query,
    index=index_name
)

print('\nSearch results:')
print(response)
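The sample above creates and searches an index but does not write any documents. As a sketch of the ingestion step, the following code builds a request body in the newline-delimited format of the OpenSearch bulk API. The helper build_bulk_body and the two sample documents are hypothetical, chosen so that a search for "miller" would match. The block does not send anything; with the client from the sample above, the body could be passed to client.bulk(body=bulk_body).

```python
import json


def build_bulk_body(index_name, documents):
    """Return an NDJSON bulk body that indexes each document into index_name."""
    lines = []
    for doc in documents:
        # Each document is preceded by an action line naming the target index.
        lines.append(json.dumps({"index": {"_index": index_name}}))
        lines.append(json.dumps(doc))
    # The bulk API expects every line, including the last, to end with a newline.
    return "".join(line + "\n" for line in lines)


docs = [
    {"title": "Moneyball", "director": "Bennett Miller"},
    {"title": "Capote", "director": "Bennett Miller"},
]
bulk_body = build_bulk_body("python-test-index", docs)
print(bulk_body)
```

The action lines omit a document ID, so each document would receive a generated ID when the request is sent.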
Ruby
The opensearch-aws-sigv4 gem provides access to OpenSearch Serverless, along with OpenSearch Service, out of the box. It has all the features of the opensearch-ruby client.
To install the gem:
gem install opensearch-aws-sigv4
When instantiating the Sigv4 signer, specify aoss as the service name:
require 'opensearch-aws-sigv4'
require 'aws-sigv4'

signer = Aws::Sigv4::Signer.new(service: 'aoss',
                                region: 'us-west-2',
                                access_key_id: 'key_id',
                                secret_access_key: 'secret')

client = OpenSearch::Aws::Sigv4Client.new(
  { host: 'https://your.amz-opensearch-serverless.endpoint', log: true },
  signer)

index = 'prime'
client.indices.create(index: index)
client.index(index: index, id: '1', body: { name: 'Amazon Echo', msrp: '5999', year: 2011 })
client.search(body: { query: { match: { name: 'Echo' } } })
client.delete(index: index, id: '1')
client.indices.delete(index: index)
Signing HTTP requests with other clients
The following requirements apply to signing requests to OpenSearch Serverless collections when you construct HTTP requests with other clients.
- You must specify the service name as aoss.
- The x-amz-content-sha256 header is required for all AWS Signature Version 4 requests. It provides a hash of the request payload. If there's a request payload, set the value to its Secure Hash Algorithm (SHA) cryptographic hash (SHA256). If there's no request payload, set the value to e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855, which is the hash of an empty string.
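The header value described above can be computed with a standard SHA-256 implementation. The following Python sketch uses only the standard library. The helper name content_sha256 and the sample query body are hypothetical, and the sketch covers only the payload hash, not the rest of the Signature Version 4 signing process.

```python
import hashlib


def content_sha256(payload=b""):
    """Return the hex SHA-256 digest to send in the x-amz-content-sha256 header."""
    if isinstance(payload, str):
        payload = payload.encode("utf-8")
    return hashlib.sha256(payload).hexdigest()


# A request without a body uses the hash of an empty string.
EMPTY_PAYLOAD_HASH = content_sha256()

# A request with a body uses the hash of the exact bytes that are sent.
body = '{"query": {"match_all": {}}}'
headers = {"x-amz-content-sha256": content_sha256(body)}
print(EMPTY_PAYLOAD_HASH)
print(headers)
```

The hash must be computed over the exact bytes that go on the wire, so serialize the body once and reuse that same serialized value for both the hash and the request.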