Amazon Neptune-to-OpenSearch replication setup
Amazon Neptune supports full-text search in Gremlin and SPARQL queries using Amazon OpenSearch Service (OpenSearch Service). You can use an AWS CloudFormation stack to link an OpenSearch Service domain to Neptune.
Before you begin, you need an existing Neptune DB cluster with streams enabled on it to serve as the source, and an OpenSearch Service domain to serve as the replication target.
If you already have an existing target OpenSearch Service domain that can be accessed by Lambda in the VPC where your Neptune DB cluster is located, the template can use that one. Otherwise, you need to create a new one.
The OpenSearch cluster and Lambda function that you create must be located in the same VPC as your Neptune DB cluster, and the OpenSearch cluster must be configured in VPC mode (not Internet mode).
We recommend that you use a newly created Neptune instance with OpenSearch Service.
If you use an existing instance that already has data in it, perform an OpenSearch Service data sync before making queries; otherwise, there may be data inconsistencies. This GitHub project provides an example of how to perform the synchronization: Export Neptune to OpenSearch
When integrating with Amazon OpenSearch Service, Neptune requires Elasticsearch version 7.1 or higher, or any version of OpenSearch except 2.3.
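The version rule above has one subtlety: Elasticsearch versions must be compared numerically (7.10 is newer than 7.1), while OpenSearch is accepted everywhere except one release. The following is a minimal sketch of that check. It assumes engine versions arrive as strings like `Elasticsearch_7.10` or `OpenSearch_2.3`, the form OpenSearch Service uses for a domain's engine version; the function name `meets_neptune_fts_requirement` is hypothetical.

```python
import re


def meets_neptune_fts_requirement(engine_version: str) -> bool:
    """Check an engine version string against the stated requirement:
    Elasticsearch 7.1 or higher, or any OpenSearch version except 2.3.

    Assumes the "<Engine>_<major>.<minor>" format, e.g. "Elasticsearch_7.10".
    """
    match = re.fullmatch(r"(Elasticsearch|OpenSearch)_(\d+)\.(\d+)", engine_version)
    if match is None:
        raise ValueError(f"unrecognized engine version: {engine_version!r}")
    engine = match.group(1)
    version = (int(match.group(2)), int(match.group(3)))
    if engine == "Elasticsearch":
        # Compare integer tuples so that 7.10 correctly ranks above 7.1.
        return version >= (7, 1)
    # Every OpenSearch release is accepted except 2.3.
    return version != (2, 3)


for candidate in ["Elasticsearch_6.8", "Elasticsearch_7.10", "OpenSearch_2.3", "OpenSearch_2.11"]:
    print(candidate, meets_neptune_fts_requirement(candidate))
```

Comparing `(major, minor)` tuples rather than the raw strings avoids the classic mistake where `"7.10" < "7.9"` lexically.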
The AWS CloudFormation template below creates a streams-consumer application instance that provides Neptune-to-OpenSearch replication.
Set up Neptune-to-OpenSearch replication using an AWS CloudFormation template for your region
To launch the AWS CloudFormation stack on the AWS CloudFormation console, choose one of the Launch Stack buttons in the following table, depending on the AWS Region that you want to use.
Region | View | View in Designer | Launch |
---|---|---|---|
US East (N. Virginia) | View | View in Designer | Launch Stack |
US East (Ohio) | View | View in Designer | Launch Stack |
US West (N. California) | View | View in Designer | Launch Stack |
US West (Oregon) | View | View in Designer | Launch Stack |
Canada (Central) | View | View in Designer | Launch Stack |
South America (São Paulo) | View | View in Designer | Launch Stack |
Europe (Stockholm) | View | View in Designer | Launch Stack |
Europe (Ireland) | View | View in Designer | Launch Stack |
Europe (London) | View | View in Designer | Launch Stack |
Europe (Paris) | View | View in Designer | Launch Stack |
Europe (Frankfurt) | View | View in Designer | Launch Stack |
Middle East (Bahrain) | View | View in Designer | Launch Stack |
Africa (Cape Town) | View | View in Designer | Launch Stack |
Asia Pacific (Hong Kong) | View | View in Designer | Launch Stack |
Asia Pacific (Tokyo) | View | View in Designer | Launch Stack |
Asia Pacific (Seoul) | View | View in Designer | Launch Stack |
Asia Pacific (Singapore) | View | View in Designer | Launch Stack |
Asia Pacific (Mumbai) | View | View in Designer | Launch Stack |
China (Beijing) | View | View in Designer | Launch Stack |
China (Ningxia) | View | View in Designer | Launch Stack |
AWS GovCloud (US-West) | View | View in Designer | Launch Stack |
AWS GovCloud (US-East) | View | View in Designer | Launch Stack |
On the Create Stack page, choose Next.
Add details about the new OpenSearch stack you are creating
The Specify Stack Details page provides properties and parameters that you can use to control the setup of full-text search:
Stack Name – The name of the new AWS CloudFormation stack that you're creating. You can generally use the default value, NeptuneStreamPoller.
Under Parameters, provide the following:
Network Configuration for the VPC Where the Streams Consumer Runs
VPC – Provide the name of the VPC where the polling Lambda function will run.
List of Subnet IDs – The subnets to which a network interface is established. Add subnets corresponding to your Neptune cluster.
List of Security Group Ids – Provide the IDs of security groups that grant write inbound access to your source Neptune DB cluster.
List of Route Table Ids – This is needed to create an Amazon DynamoDB endpoint in your Neptune VPC, if you do not already have one. You must provide a comma-separated list of route table IDs associated with the subnets.
Require to create Dynamo DB VPC Endpoint – A Boolean value that defaults to true. You only need to change it to false if you have already created a DynamoDB endpoint in your VPC.
Require to create Monitoring VPC Endpoint – A Boolean value that defaults to true. You only need to change it to false if you have already created a monitoring endpoint in your VPC.
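CloudFormation receives every parameter value as a string: list parameters such as the subnet, security group, and route table IDs are passed as one comma-separated string, and Booleans as `true` or `false`. The sketch below shows that formatting for the network settings. The dictionary keys are the console labels used above, not the template's actual ParameterKey names (which this page does not list); the function name and all IDs are hypothetical. Rejecting an empty route table list when the DynamoDB endpoint is to be created is this sketch's own reading of the requirement above.

```python
def format_network_parameters(vpc, subnet_ids, security_group_ids, route_table_ids,
                              create_dynamodb_endpoint=True,
                              create_monitoring_endpoint=True):
    """Render the network settings as CloudFormation parameter strings.

    Keys are the console labels, NOT the template's ParameterKey names.
    """

    def comma_list(values):
        # CloudFormation list parameters take a single comma-separated string.
        return ",".join(v.strip() for v in values if v.strip())

    if create_dynamodb_endpoint and not comma_list(route_table_ids):
        # Route tables are needed when the stack creates the DynamoDB endpoint.
        raise ValueError("route table IDs are required to create the DynamoDB endpoint")

    return {
        "VPC": vpc,
        "List of Subnet IDs": comma_list(subnet_ids),
        "List of Security Group Ids": comma_list(security_group_ids),
        "List of Route Table Ids": comma_list(route_table_ids),
        # Booleans travel as the lowercase strings "true" / "false".
        "Require to create Dynamo DB VPC Endpoint": str(create_dynamodb_endpoint).lower(),
        "Require to create Monitoring VPC Endpoint": str(create_monitoring_endpoint).lower(),
    }


# Hypothetical IDs for illustration only.
settings = format_network_parameters(
    vpc="vpc-0example",
    subnet_ids=["subnet-0aaa", "subnet-0bbb"],
    security_group_ids=["sg-0ccc"],
    route_table_ids=["rtb-0ddd", "rtb-0eee"],
    create_monitoring_endpoint=False,
)
print(settings)
```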
Stream Poller
Application Name – You can generally leave this set to the default (NeptuneStream). If you use a different name, it must be unique.
Memory size for Lambda Poller – Sets the memory size available to the Lambda poller function. The default value is 2,048 megabytes.
Lambda Runtime – The language used in the Lambda function that retrieves items from the Neptune stream. You can set this either to python3.9 or to java8.
S3 Bucket having Lambda code artifacts – Leave this blank unless you are using a custom Lambda polling function that loads from a different S3 bucket.
S3 Key corresponding to Lambda Code artifacts – Leave this blank unless you are using a custom Lambda polling function.
Logging level for Lambda – In general, leave this set to the default value, INFO.
Managed Policies for Lambda Execution – In general, leave this blank unless you are using a custom Lambda polling function.
Stream Records Handler – In general, leave this blank unless you are using a custom handler for the records in Neptune streams.
Maximum records Fetched from Stream – You can use this parameter to tune performance. The default (100) is a good place to start. The maximum allowable value is 10,000. The higher the number, the fewer network calls are needed to read records from the stream, but the more memory is required to process the records.
Max wait time between two Polls (in Seconds) – Determines how frequently the Lambda poller is invoked to poll the Neptune streams. Set this value to 0 for continuous polling. The maximum value is 3,600 seconds (1 hour). The default value (60 seconds) is a good place to start, depending on how fast your graph data changes.
Maximum Continuous polling period (in Seconds) – Sets the timeout for the Lambda polling function. It must be between 5 and 900 seconds. The default value (600 seconds) is a good place to start.
Step Function Fallback Period – The number of Step Function Fallback Period Units to wait for the poller, after which the step function is invoked through Amazon CloudWatch Events to recover from a failure. The default (5 minutes) is a good place to start.
Step Function Fallback Period Unit – The time unit (minutes, hours, or days) used to measure the preceding Step Function Fallback Period. The default (minutes) is generally sufficient.
Data replication scope – Determines whether to replicate both nodes and edges, or only nodes, to OpenSearch (this applies to Gremlin engine data only). The default value (All) is generally a good place to start.
Ignore OpenSearch missing document error – Determines whether a missing-document error in OpenSearch can be ignored. Missing-document errors are rare, but they require manual intervention if they are not ignored. The default value (True) is generally a good place to start.
Enable Non-String Indexing – Enables or disables indexing of fields that do not have string content. If this flag is set to true, non-string fields are indexed in OpenSearch; if it is false, only string fields are indexed. The default is true.
Properties to exclude from being inserted into OpenSearch – A comma-delimited list of property or predicate keys to exclude from OpenSearch indexing. If this parameter is left blank, all property keys are indexed.
Datatypes to exclude from being inserted into OpenSearch – A comma-delimited list of property or predicate datatypes to exclude from OpenSearch indexing. If this parameter is left blank, all property values that can safely be converted to OpenSearch datatypes are indexed.
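The numeric limits above are easy to get wrong when filling in the form, and the batch-size trade-off is easier to see with numbers. The following sketch validates the documented ranges and estimates the minimum number of stream reads needed to drain a backlog. The class and function names are hypothetical, and the lower bound of 1 for the record count is an assumption (only the maximum is documented). The read estimate is a lower bound because it assumes every read returns a full batch.

```python
import math
from dataclasses import dataclass


@dataclass
class PollerSettings:
    """Stream Poller settings, with defaults mirroring the documented values."""
    lambda_runtime: str = "python3.9"
    max_records: int = 100
    max_wait_seconds: int = 60
    max_polling_period_seconds: int = 600

    def validate(self) -> None:
        """Raise ValueError if a setting falls outside the documented range."""
        if self.lambda_runtime not in ("python3.9", "java8"):
            raise ValueError("Lambda Runtime must be python3.9 or java8")
        # Assumption: 1 is the lower bound; only the maximum (10,000) is documented.
        if not 1 <= self.max_records <= 10_000:
            raise ValueError("Maximum records Fetched from Stream must be 1 to 10,000")
        if not 0 <= self.max_wait_seconds <= 3_600:
            raise ValueError("Max wait time between two Polls must be 0 to 3,600 seconds")
        if not 5 <= self.max_polling_period_seconds <= 900:
            raise ValueError("Maximum Continuous polling period must be 5 to 900 seconds")


def min_stream_reads(backlog_records: int, max_records: int) -> int:
    """Lower bound on stream reads needed to drain a backlog when each
    read returns at most max_records records."""
    if backlog_records <= 0:
        return 0
    return math.ceil(backlog_records / max_records)


PollerSettings().validate()  # the defaults pass
for batch in (100, 1_000, 10_000):
    print(batch, min_stream_reads(25_000, batch))
```

For a backlog of 25,000 records, the default batch of 100 needs at least 250 reads, while the 10,000 maximum needs only 3. The larger batches are what drive the higher memory use mentioned above.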
Neptune Stream
Endpoint of source Neptune Stream – (Required) This takes one of two forms:
https://<your DB cluster>:<port>/propertygraph/stream (or its alias, https://<your DB cluster>:<port>/pg/stream)
https://<your DB cluster>:<port>/sparql/stream
Neptune Query Engine – Choose Gremlin or SPARQL.
Is IAM Auth Enabled? – If your Neptune DB cluster is using IAM authentication, set this parameter to true.
Neptune Cluster Resource Id – If your Neptune DB cluster is using IAM authentication, set this parameter to the cluster resource ID. The resource ID is not the same as the cluster ID. Instead, it takes the form cluster- followed by 28 alphanumeric characters. It can be found under Cluster Details in the Neptune console.
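Two Neptune Stream values are common sources of mistakes: the stream endpoint path, which depends on the query engine, and the resource ID, which is easily confused with the cluster identifier. The sketch below builds the endpoint and checks the resource ID shape described above. The function names and the sample hostname are hypothetical. Port 8182 is Neptune's default port, and mapping Gremlin to the property-graph path reflects the endpoint forms listed above. If you work with the API rather than the console, we believe the resource ID is the `DbClusterResourceId` field that `DescribeDBClusters` returns.

```python
import re

# Paths from the endpoint forms above; "pg/stream" is an alias for "propertygraph/stream".
STREAM_PATHS = {"gremlin": "propertygraph/stream", "sparql": "sparql/stream"}
RESOURCE_ID = re.compile(r"cluster-[A-Za-z0-9]{28}")


def stream_endpoint(cluster_endpoint: str, port: int, query_engine: str) -> str:
    """Build a value for Endpoint of source Neptune Stream."""
    path = STREAM_PATHS.get(query_engine.lower())
    if path is None:
        raise ValueError("query_engine must be Gremlin or SPARQL")
    return f"https://{cluster_endpoint}:{port}/{path}"


def is_cluster_resource_id(value: str) -> bool:
    """True only for 'cluster-' followed by exactly 28 alphanumeric characters.
    A cluster identifier such as 'my-neptune-cluster' is rejected."""
    return RESOURCE_ID.fullmatch(value) is not None


# Hypothetical cluster endpoint; 8182 is Neptune's default port.
print(stream_endpoint("my-cluster.cluster-example.us-east-1.neptune.amazonaws.com", 8182, "Gremlin"))
print(is_cluster_resource_id("my-neptune-cluster"))
```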
Target OpenSearch cluster
Endpoint for OpenSearch service – (Required) Provide the endpoint for the OpenSearch service in your VPC.
Number of Shards for OpenSearch Index – The default value (5) is generally a good place to start.
Number of Replicas for OpenSearch Index – The default value (1) is generally a good place to start.
Geo Location Fields for Mapping – If you are using geolocation fields, list the property keys here.
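Shard and replica counts have a direct cost. Each primary shard gets one extra copy per replica, and OpenSearch never places a replica on the same node as its primary. The sketch below shows that arithmetic, together with the standard OpenSearch index settings these two parameters presumably correspond to. How the poller actually creates its index is not documented here, so treat the settings shape as an assumption. The function names are hypothetical.

```python
def index_settings(shards: int = 5, replicas: int = 1) -> dict:
    """Standard OpenSearch index settings for the given counts (assumed mapping
    from the two template parameters above)."""
    if shards < 1 or replicas < 0:
        raise ValueError("need at least 1 primary shard and a non-negative replica count")
    return {"settings": {"index": {"number_of_shards": shards, "number_of_replicas": replicas}}}


def total_shard_copies(shards: int, replicas: int) -> int:
    # Every primary shard gets `replicas` additional copies.
    return shards * (1 + replicas)


def min_data_nodes_for_replicas(replicas: int) -> int:
    # A replica is never allocated on the same node as its primary, so all
    # copies can only be assigned with at least replicas + 1 data nodes.
    return replicas + 1


print(index_settings())
print(total_shard_copies(5, 1))        # 10 shard copies with the defaults
print(min_data_nodes_for_replicas(1))  # 2
```

With the defaults, a single-node domain would leave every replica unassigned, so the index would report a yellow health status even though search still works.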
Alarm
Require to create Cloud watch Alarm – Set this to true if you want to create a CloudWatch alarm for the new stack.
SNS Topic ARN for Cloudwatch Alarm Notifications – The SNS topic ARN where CloudWatch alarm notifications should be sent (only needed if alarms are enabled).
Email for Alarm Notifications – The email address to which alarm notifications should be sent (only needed if alarms are enabled).
As the destination for alarm notifications, you can specify an SNS topic only, an email address only, or both.
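The three alarm parameters interact: the SNS and email values only matter when the alarm is enabled, and then either one or both can be used. The small sketch below encodes that rule. Treating "alarm enabled but no destination" as an error is this sketch's own assumption, not documented template behavior. The function name, ARN, and address are hypothetical.

```python
def alarm_destinations(create_alarm: bool, sns_topic_arn: str = "", email: str = "") -> list:
    """Return the (kind, target) pairs that alarm notifications would go to:
    SNS only, email only, or both."""
    if not create_alarm:
        return []  # SNS and email are only needed when alarms are enabled
    destinations = []
    if sns_topic_arn:
        destinations.append(("sns", sns_topic_arn))
    if email:
        destinations.append(("email", email))
    if not destinations:
        # Assumption: an enabled alarm with nowhere to send notifications is a mistake.
        raise ValueError("alarm requested but no SNS topic ARN or email address given")
    return destinations


# Hypothetical ARN and address for illustration.
print(alarm_destinations(True,
                         sns_topic_arn="arn:aws:sns:us-east-1:123456789012:neptune-alarms",
                         email="ops@example.com"))
```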
Run the AWS CloudFormation Template
Now you can complete the process of provisioning a Neptune streams consumer application instance as follows:
In AWS CloudFormation, on the Specify Stack Details page, choose Next.
On the Options page, choose Next.
On the Review page, select the first check box to acknowledge that AWS CloudFormation will create IAM resources. Select the second check box to acknowledge CAPABILITY_AUTO_EXPAND for the new stack.
Note: CAPABILITY_AUTO_EXPAND explicitly acknowledges that macros will be expanded when creating the stack, without prior review. Users often create a change set from a processed template so that the changes made by macros can be reviewed before actually creating the stack. For more information, see the AWS CloudFormation CreateStack API operation in the AWS CloudFormation API Reference.
Then choose Create.
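If you script stack creation instead of using the console, the two Review-page check boxes become the `Capabilities` list of the CloudFormation `CreateStack` call. The sketch below builds keyword arguments for boto3's `create_stack` and waits with the `stack_create_complete` waiter. Several things here are assumptions. The template URL is the one given in the update steps on this page (the Launch Stack buttons may point to region-specific copies). Both IAM capabilities are listed because the page does not say which one the template needs. `ExampleKey` is a placeholder, so take real parameter keys from the template itself (the View link in the table).

```python
TEMPLATE_URL = ("https://aws-neptune-customer-samples.s3.amazonaws.com/"
                "neptune-stream/neptune_to_elastic_search.json")

# Acknowledgements matching the two Review-page check boxes: IAM resources
# (CAPABILITY_IAM / CAPABILITY_NAMED_IAM) and macro expansion (CAPABILITY_AUTO_EXPAND).
CAPABILITIES = ["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM", "CAPABILITY_AUTO_EXPAND"]


def create_stack_request(stack_name: str, parameters: dict,
                         template_url: str = TEMPLATE_URL) -> dict:
    """Keyword arguments for boto3's CloudFormation create_stack."""
    return {
        "StackName": stack_name,
        "TemplateURL": template_url,
        "Parameters": [
            {"ParameterKey": key, "ParameterValue": value}
            for key, value in parameters.items()
        ],
        "Capabilities": CAPABILITIES,
    }


def launch(cloudformation_client, request: dict) -> str:
    """Create the stack and block until creation finishes.

    cloudformation_client is assumed to be boto3.client("cloudformation", region_name=...).
    """
    stack_id = cloudformation_client.create_stack(**request)["StackId"]
    cloudformation_client.get_waiter("stack_create_complete").wait(StackName=request["StackName"])
    return stack_id


# "ExampleKey" is a placeholder; real keys come from the template.
request = create_stack_request("NeptuneStreamPoller", {"ExampleKey": "example-value"})
print(request["Capabilities"])
```

To review macro output before anything is created, you could call `create_change_set` with `ChangeSetType="CREATE"` instead of `create_stack`.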
To update the stream poller with the latest Lambda artifacts
You can update the stream poller with the latest Lambda code artifacts as follows:
In the AWS Management Console, navigate to AWS CloudFormation and select the main parent AWS CloudFormation stack.
Select the Update option for the stack.
Select Replace current template.
-
For the template source, choose Amazon S3 URL and enter the following S3 URL:
https://aws-neptune-customer-samples.s3.amazonaws.com/neptune-stream/neptune_to_elastic_search.json
Select Next without changing any AWS CloudFormation parameters.
Choose Update Stack.
The stack now updates the Lambda artifacts to the most recent versions.
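The console steps above (replace the template and keep every parameter as it is) map to an `UpdateStack` call in which each existing parameter is passed with `UsePreviousValue`. The following sketch reads the current parameter keys with boto3's `describe_stacks`, builds the update request, and waits with the `stack_update_complete` waiter. The capability list is an assumption carried over from stack creation, and the function names are hypothetical.

```python
TEMPLATE_URL = ("https://aws-neptune-customer-samples.s3.amazonaws.com/"
                "neptune-stream/neptune_to_elastic_search.json")
CAPABILITIES = ["CAPABILITY_IAM", "CAPABILITY_NAMED_IAM", "CAPABILITY_AUTO_EXPAND"]


def update_stack_request(stack_name: str, existing_parameters: list) -> dict:
    """update_stack keyword arguments that replace the template but keep
    every current parameter value unchanged."""
    return {
        "StackName": stack_name,
        "TemplateURL": TEMPLATE_URL,
        "Parameters": [
            {"ParameterKey": p["ParameterKey"], "UsePreviousValue": True}
            for p in existing_parameters
        ],
        "Capabilities": CAPABILITIES,
    }


def update_poller(cloudformation_client, stack_name: str) -> None:
    """cloudformation_client is assumed to be boto3.client("cloudformation")."""
    stack = cloudformation_client.describe_stacks(StackName=stack_name)["Stacks"][0]
    request = update_stack_request(stack_name, stack.get("Parameters", []))
    cloudformation_client.update_stack(**request)
    cloudformation_client.get_waiter("stack_update_complete").wait(StackName=stack_name)
```

Two caveats apply. CloudFormation rejects the call with a "No updates are to be performed" validation error when nothing has changed. Also, if a newer template dropped a parameter, passing `UsePreviousValue` for that key would fail, so you may need to filter the list.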
Different approaches to enabling full text search on existing Neptune databases
The best way to enable full text search on an existing Neptune database is generally as follows, provided you can pause your write workloads. The downtime it requires is limited, because creating a clone is relatively fast.
Stop all write workloads on the database.
Create a clone of the database.
Enable streams on the original database.
Resume the write workloads.
Use this Neptune tool on GitHub to perform a one-time synchronization from the clone. Then use this AWS CloudFormation template to synchronize from your original database for continuous updating (no configuration change is needed in the template).
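"Enable streams" in the steps above means setting the `neptune_streams` parameter to 1 in the DB cluster parameter group. The sketch below builds a boto3 `neptune` client's `modify_db_cluster_parameter_group` request for that change. The group name is hypothetical, and default parameter groups cannot be modified, so the cluster needs a custom one. `pending-reboot` is used conservatively because it is accepted for both static and dynamic parameters. If the change only takes effect after a reboot, reboot before resuming writes so that no writes happen while streams are still off.

```python
def enable_streams_request(cluster_parameter_group: str) -> dict:
    """modify_db_cluster_parameter_group keyword arguments that turn on Neptune Streams."""
    return {
        "DBClusterParameterGroupName": cluster_parameter_group,
        "Parameters": [
            {
                "ParameterName": "neptune_streams",
                "ParameterValue": "1",
                # Accepted for static and dynamic parameters alike; the change
                # may not take effect until the instances are rebooted.
                "ApplyMethod": "pending-reboot",
            }
        ],
    }


def enable_streams(neptune_client, cluster_parameter_group: str) -> dict:
    """neptune_client is assumed to be boto3.client("neptune")."""
    return neptune_client.modify_db_cluster_parameter_group(
        **enable_streams_request(cluster_parameter_group)
    )


# Hypothetical custom DB cluster parameter group name.
print(enable_streams_request("my-neptune-cluster-params"))
```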
If you can't afford to suspend write workloads on your database, here is an approach that requires even less downtime than the recommended approach above, but it needs to be done carefully:
Enable streams on the database.
Create a clone of the database.
Make note of the latest eventID in the clone's stream.
Use this Neptune tool on GitHub to perform a one-time synchronization from the clone. Then use this AWS CloudFormation template to synchronize from your original database for continuous updating. In this case you do need to configure the AWS CloudFormation template: before starting the continuous synchronization, set the checkpoint in the DynamoDB table to the eventID you recorded.
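Recording the eventID is easier to get right if you take it directly from a stream response. As we understand the Neptune streams response format, each response carries a `lastEventId` object with `commitNum` and `opNum`, for example from a request to the clone's stream endpoint with `iteratorType=LATEST`. The sketch below extracts that pair and shapes a checkpoint record. The record's attribute names are entirely hypothetical. This page does not document the schema of the DynamoDB table the stack creates, so inspect that table before writing anything to it.

```python
import json
from typing import Tuple


def latest_event_id(stream_response_body: str) -> Tuple[int, int]:
    """Extract (commitNum, opNum) from the lastEventId of a Neptune streams response."""
    last = json.loads(stream_response_body)["lastEventId"]
    return int(last["commitNum"]), int(last["opNum"])


def checkpoint_item(application_name: str, commit_num: int, op_num: int) -> dict:
    """HYPOTHETICAL checkpoint record; every attribute name is a placeholder.
    Use the real schema of the DynamoDB table created by the stack."""
    return {
        "application_name": application_name,  # hypothetical key attribute
        "commit_num": commit_num,              # hypothetical
        "op_num": op_num,                      # hypothetical
    }


# Trimmed, made-up response body for illustration.
body = '{"lastEventId": {"commitNum": 4821, "opNum": 2}, "records": []}'
commit_num, op_num = latest_event_id(body)
print(checkpoint_item("NeptuneStream", commit_num, op_num))
```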
Disabling and re-enabling the stream poller process
Be careful when you disable the stream poller process! Data loss can occur if the process is paused for more than 7 days (the stream expiry window).
To disable (pause) the stream poller process
Sign in to the AWS Management Console and open the Amazon EventBridge console at https://console.aws.amazon.com/events/. In the navigation pane, select Rules.
Select the rule whose name contains the name you supplied as Application Name in the AWS CloudFormation template that you used to set up the stream poller.
Choose Disable.
Open the Step Functions console at https://console.aws.amazon.com/states/. Select the running execution of the step function that corresponds to the stream poller process. Again, the name of that step function contains the name you supplied as Application Name in the AWS CloudFormation template that you used to set up the stream poller. You can filter by execution status to see only Running executions.
Choose Stop.
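The pause procedure can also be scripted. The sketch below disables every EventBridge rule whose name contains the Application Name, then stops the running executions of matching Step Functions state machines. It uses the boto3 `events` calls `list_rules` and `disable_rule` and the `stepfunctions` calls `list_state_machines`, `list_executions`, and `stop_execution`. The helper names are hypothetical, and matching by substring follows the console instructions above. Be aware that the default name NeptuneStream could also match other poller stacks in the same account and Region.

```python
def paginate(call, items_key: str, token_key: str, **kwargs):
    """Yield items from a paginated AWS list call. EventBridge uses NextToken and
    Step Functions uses nextToken, in both the request and the response."""
    while True:
        page = call(**kwargs)
        yield from page.get(items_key, [])
        token = page.get(token_key)
        if not token:
            return
        kwargs[token_key] = token


def matching_names(names, application_name: str) -> list:
    """Names of resources that contain the Application Name."""
    return [name for name in names if application_name in name]


def pause_poller(events_client, sfn_client, application_name: str) -> list:
    """Disable the scheduling rule(s), then stop running executions.

    Clients are assumed to be boto3.client("events") and boto3.client("stepfunctions").
    Returns the ARNs of the executions that were stopped.
    """
    rule_names = [r["Name"] for r in paginate(events_client.list_rules, "Rules", "NextToken")]
    for rule in matching_names(rule_names, application_name):
        events_client.disable_rule(Name=rule)  # no new executions get scheduled

    stopped = []
    for machine in paginate(sfn_client.list_state_machines, "stateMachines", "nextToken"):
        if application_name not in machine["name"]:
            continue
        # Collect first so stopping executions cannot disturb the pagination.
        running = list(paginate(sfn_client.list_executions, "executions", "nextToken",
                                stateMachineArn=machine["stateMachineArn"],
                                statusFilter="RUNNING"))
        for execution in running:
            sfn_client.stop_execution(executionArn=execution["executionArn"])
            stopped.append(execution["executionArn"])
    return stopped
```

Disabling the rule first matters: stopping the execution while the rule is still enabled would simply let the schedule start a new one.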
To re-enable the stream poller process
Sign in to the AWS Management Console and open the Amazon EventBridge console at https://console.aws.amazon.com/events/. In the navigation pane, select Rules.
Select the rule whose name contains the name you supplied as Application Name in the AWS CloudFormation template that you used to set up the stream poller.
Choose Enable. Based on its scheduled interval, the event rule then triggers a new execution of the step function.
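Because data loss can occur once a pause outlasts the 7-day stream expiry window, a scripted re-enable should check how long the poller has been paused before turning the rule back on. The sketch below does that check and then calls the boto3 `events` `enable_rule` for each rule whose name contains the Application Name. The function names are hypothetical. `paused_at` must be a timezone-aware datetime, and you should adjust `STREAM_EXPIRY` if your cluster retains stream records for a different period.

```python
from datetime import datetime, timedelta, timezone

# Stream expiry window stated above; adjust if your cluster's retention differs.
STREAM_EXPIRY = timedelta(days=7)


def safe_to_resume(paused_at: datetime, now: datetime, expiry: timedelta = STREAM_EXPIRY) -> bool:
    """True while the pause is shorter than the expiry window, i.e. before
    unread stream records could have expired."""
    return now - paused_at < expiry


def resume_poller(events_client, application_name: str, paused_at: datetime) -> list:
    """Re-enable the rule(s) whose names contain application_name.

    events_client is assumed to be boto3.client("events"); paused_at must be timezone-aware.
    """
    if not safe_to_resume(paused_at, datetime.now(timezone.utc)):
        raise RuntimeError("paused longer than the stream expiry window; records may be "
                           "lost, so consider resynchronizing OpenSearch first")
    enabled, kwargs = [], {}
    while True:
        page = events_client.list_rules(**kwargs)
        for rule in page.get("Rules", []):
            if application_name in rule["Name"]:
                events_client.enable_rule(Name=rule["Name"])
                enabled.append(rule["Name"])
        if not page.get("NextToken"):
            return enabled
        kwargs["NextToken"] = page["NextToken"]
```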
Querying custom fields in OpenSearch using Neptune full-text search
You can extend the current stream poller with custom code for handling custom fields, as explained in detail in the blog post Capture graph changes using Neptune Streams.
When adding a custom field in OpenSearch, make sure to add the new field as an inner object of a predicate (see Neptune Full-text search data model).
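To make "an inner object of a predicate" concrete, the sketch below adds a custom field next to `value` inside each value object stored under a predicate. The document shape is a simplified assumption: `entity_id`, `document_type`, and `predicates` with lists of `{"value": ...}` objects. Check it against the Neptune full-text search data model before relying on it. `name_length` and the function name are hypothetical.

```python
import copy


def add_custom_field(document: dict, predicate: str, field: str, value) -> dict:
    """Return a copy of an OpenSearch document with `field` added as an inner
    field of each value object under predicates[predicate]."""
    doc = copy.deepcopy(document)  # leave the caller's document untouched
    entries = doc.get("predicates", {}).get(predicate)
    if entries is None:
        raise KeyError(f"predicate {predicate!r} not in document")
    # Handle both a single value object and a list of value objects.
    for entry in (entries if isinstance(entries, list) else [entries]):
        entry[field] = value
    return doc


# Simplified, assumed document shape; "name_length" is a hypothetical custom field.
vertex_doc = {
    "entity_id": "v1",
    "document_type": "vertex",
    "predicates": {"name": [{"value": "marko"}]},
}
enriched = add_custom_field(vertex_doc, "name", "name_length", 5)
print(enriched["predicates"]["name"])
```

Keeping the custom field inside the predicate's value object, rather than at the top level of the document, is what the guidance above asks for.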