Using the Bolt protocol to make openCypher queries to Neptune - Amazon Neptune

Bolt is a statement-oriented client/server protocol initially developed by Neo4j and licensed under the Creative Commons 3.0 Attribution-ShareAlike license. It is client-driven, meaning that the client always initiates message exchanges.

To connect to Neptune using Neo4j's Bolt drivers, replace the URL and port number in the connection string with your cluster endpoint and port, using the bolt URI scheme. If you have a single Neptune instance running, use the read_write endpoint. If multiple instances are running, two drivers are recommended: one for the writer and another for all the read replicas. If you have only the default two endpoints, a read_write driver and a read_only driver are sufficient, but if you also have custom endpoints, consider creating a driver instance for each one.
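As a rough Python sketch of the one-driver-per-endpoint arrangement described above: the helper below builds a bolt URI for each endpoint and creates one driver per role. The host names, the `bolt_uri` and `make_drivers` helpers, and the default port of 8182 are assumptions made for illustration, not part of any Neptune or Neo4j API. The `neo4j` import is deferred so that the URI helper can be used on its own.

```python
def bolt_uri(host: str, port: int = 8182) -> str:
    """Build a Bolt URI for an endpoint (8182 is assumed as the cluster port)."""
    return f"bolt://{host}:{port}"


def make_drivers(endpoints: dict) -> dict:
    """Create one driver per named endpoint, for example read_write and read_only."""
    from neo4j import GraphDatabase  # deferred import; needs: python -m pip install neo4j

    return {
        name: GraphDatabase.driver(
            bolt_uri(host, port), auth=("username", "password"), encrypted=True
        )
        for name, (host, port) in endpoints.items()
    }


# Hypothetical endpoint names; substitute your own cluster and reader endpoints.
ENDPOINTS = {
    "read_write": ("my-cluster.cluster-example.us-west-2.neptune.amazonaws.com", 8182),
    "read_only": ("my-cluster.cluster-ro-example.us-west-2.neptune.amazonaws.com", 8182),
}
```

Calling `make_drivers(ENDPOINTS)` would return a dictionary with one driver for writes and one for reads. A custom endpoint could be added as a further entry.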

Note

Although the Bolt spec states that Bolt can connect using either TCP or WebSockets, Neptune only supports TCP connections for Bolt.

Neptune allows up to 1000 concurrent Bolt connections.

Neptune supports the following Neo4j Bolt message specification version: 4.0.0.

For examples of openCypher queries in various languages that use the Bolt drivers, see the Neo4j Drivers & Language Guides documentation.

Using Bolt with Java to connect to Neptune

You can download a driver for whichever version you want to use from the Maven repository, or add this dependency to your project:

<dependency>
  <groupId>org.neo4j.driver</groupId>
  <artifactId>neo4j-java-driver</artifactId>
  <version>4.3.3</version>
</dependency>

Then, to connect to Neptune in Java using one of these Bolt drivers, create a driver instance for the primary/writer instance in your cluster using code like the following:

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Config;
import org.neo4j.driver.Config.TrustStrategy;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;

final Driver driver =
  GraphDatabase.driver("bolt://(your cluster endpoint URL):(your cluster port)",
    AuthTokens.none(),
    Config.builder().withEncryption()
                    .withTrustStrategy(TrustStrategy.trustSystemCertificates())
                    .build());

If you have one or more reader replicas, you can similarly create a driver instance for them using code like this:

final Driver read_only_driver =              // (without connection timeout)
  GraphDatabase.driver("bolt://(your cluster endpoint URL):(your cluster port)",
      Config.builder().withEncryption()
                      .withTrustStrategy(TrustStrategy.trustSystemCertificates())
                      .build());

Or, with a timeout:

import java.util.concurrent.TimeUnit;

final Driver read_only_timeout_driver =      // (with connection timeout)
  GraphDatabase.driver("bolt://(your cluster endpoint URL):(your cluster port)",
      Config.builder().withConnectionTimeout(30, TimeUnit.SECONDS)
                      .withEncryption()
                      .withTrustStrategy(TrustStrategy.trustSystemCertificates())
                      .build());

If you have custom endpoints, it may also be worthwhile to create a driver instance for each one.

A Python openCypher query example using Bolt

Here is how to make an openCypher query in Python using Bolt:

python -m pip install neo4j

from neo4j import GraphDatabase

uri = "bolt://(your cluster endpoint URL):(your cluster port)"
driver = GraphDatabase.driver(uri, auth=("username", "password"), encrypted=True)

Note that the auth parameters are ignored.
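Once a driver exists, a query can be run through a session. The following is a minimal sketch rather than a definitive implementation: `run_query` and `main` are hypothetical helper names, the Greeting queries are borrowed from the .NET example in this topic, and the `neo4j` import is deferred so that `run_query` works with any object that behaves like a driver.

```python
def run_query(driver, query: str, **params) -> list:
    """Run one openCypher query and return every record as a plain dict."""
    with driver.session() as session:
        result = session.run(query, **params)
        # Read all records inside the session so the result is fully consumed
        return [record.data() for record in result]


def main(uri: str) -> None:
    from neo4j import GraphDatabase  # deferred import; requires the neo4j package

    driver = GraphDatabase.driver(uri, auth=("username", "password"), encrypted=True)
    try:
        # Create a node, then read its message back
        run_query(
            driver,
            "CREATE (a:Greeting) SET a.message = $message",
            message="HelloWorldExample",
        )
        for row in run_query(driver, "MATCH (n:Greeting) RETURN n.message AS message"):
            print(row["message"])
    finally:
        driver.close()
```

To try it, call `main` with the bolt URI of your cluster endpoint.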

A .NET openCypher query example using Bolt

Here is how to make an openCypher query in .NET using Bolt:

Install-Package Neo4j.Driver -Version 4.3.0
using System;
using Neo4j.Driver;

namespace hello
{
  // This example creates a node and reads a node in a Neptune
  // Cluster where IAM Authentication is not enabled.
  public class HelloWorldExample : IDisposable
  {
    private bool _disposed = false;
    private readonly IDriver _driver;
    private static string url = "bolt://(your cluster endpoint URL):(your cluster port)";
    private static string createNodeQuery = "CREATE (a:Greeting) SET a.message = 'HelloWorldExample'";
    private static string readNodeQuery = "MATCH(n:Greeting) RETURN n.message";

    ~HelloWorldExample() => Dispose(false);

    public HelloWorldExample(string uri)
    {
      _driver = GraphDatabase.Driver(uri, AuthTokens.None,
                                     o => o.WithEncryptionLevel(EncryptionLevel.Encrypted));
    }

    public void createNode()
    {
      // Open a session
      using (var session = _driver.Session())
      {
        // Run the query in a write transaction
        var greeting = session.WriteTransaction(tx =>
        {
          var result = tx.Run(createNodeQuery);
          // Consume the result
          return result.Consume();
        });

        // The output will look like this:
        //   ResultSummary{Query=`CREATE (a:Greeting) SET a.message = 'HelloWorldExample".....
        Console.WriteLine(greeting);
      }
    }

    public void retrieveNode()
    {
      // Open a session
      using (var session = _driver.Session())
      {
        // Run the query in a read transaction
        var greeting = session.ReadTransaction(tx =>
        {
          var result = tx.Run(readNodeQuery);
          // Consume the result. Read the single node
          // created in a previous step.
          return result.Single()[0].As<string>();
        });

        // The output will look like this:
        //   HelloWorldExample
        Console.WriteLine(greeting);
      }
    }

    public void Dispose()
    {
      Dispose(true);
      GC.SuppressFinalize(this);
    }

    protected virtual void Dispose(bool disposing)
    {
      if (_disposed)
        return;

      if (disposing)
      {
        _driver?.Dispose();
      }
      _disposed = true;
    }

    public static void Main()
    {
      using (var apiCaller = new HelloWorldExample(url))
      {
        apiCaller.createNode();
        apiCaller.retrieveNode();
      }
    }
  }
}

A Java openCypher query example using Bolt with IAM authentication

The Java code below shows how to make openCypher queries in Java using Bolt with IAM authentication. The JavaDoc comment describes its usage. Once a driver instance is available, you can use it to make multiple authenticated requests.

package software.amazon.neptune.bolt;

import com.amazonaws.DefaultRequest;
import com.amazonaws.Request;
import com.amazonaws.auth.AWS4Signer;
import com.amazonaws.auth.AWSCredentialsProvider;
import com.amazonaws.http.HttpMethodName;
import com.google.gson.Gson;
import lombok.Builder;
import lombok.Getter;
import lombok.NonNull;
import org.neo4j.driver.Value;
import org.neo4j.driver.Values;
import org.neo4j.driver.internal.security.InternalAuthToken;
import org.neo4j.driver.internal.value.StringValue;

import java.net.URI;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

import static com.amazonaws.auth.internal.SignerConstants.AUTHORIZATION;
import static com.amazonaws.auth.internal.SignerConstants.HOST;
import static com.amazonaws.auth.internal.SignerConstants.X_AMZ_DATE;
import static com.amazonaws.auth.internal.SignerConstants.X_AMZ_SECURITY_TOKEN;

/**
 * Use this class instead of `AuthTokens.basic` when working with an IAM
 * auth-enabled server. It works the same as `AuthTokens.basic` when using
 * static credentials, and avoids making requests with an expired signature
 * when using temporary credentials. Internally, it generates a new signature
 * on every invocation (this may change in a future implementation).
 *
 * Note that authentication happens only the first time for a pooled connection.
 *
 * Typical usage:
 *
 * NeptuneAuthToken authToken = NeptuneAuthToken.builder()
 *     .credentialsProvider(credentialsProvider)
 *     .region("aws region")
 *     .url("cluster endpoint url")
 *     .build();
 *
 * Driver driver = GraphDatabase.driver(
 *     authToken.getUrl(),
 *     authToken,
 *     config
 * );
 */
public class NeptuneAuthToken extends InternalAuthToken {
  private static final String SCHEME = "basic";
  private static final String REALM = "realm";
  private static final String SERVICE_NAME = "neptune-db";
  private static final String HTTP_METHOD_HDR = "HttpMethod";
  private static final String DUMMY_USERNAME = "username";

  @NonNull
  private final String region;
  @NonNull
  @Getter
  private final String url;
  @NonNull
  private final AWSCredentialsProvider credentialsProvider;
  private final Gson gson = new Gson();

  @Builder
  private NeptuneAuthToken(
      @NonNull final String region,
      @NonNull final String url,
      @NonNull final AWSCredentialsProvider credentialsProvider
  ) {
    // The superclass caches the result of toMap(), which we don't want
    super(Collections.emptyMap());
    this.region = region;
    this.url = url;
    this.credentialsProvider = credentialsProvider;
  }

  @Override
  public Map<String, Value> toMap() {
    final Map<String, Value> map = new HashMap<>();
    map.put(SCHEME_KEY, Values.value(SCHEME));
    map.put(PRINCIPAL_KEY, Values.value(DUMMY_USERNAME));
    map.put(CREDENTIALS_KEY, new StringValue(getSignedHeader()));
    map.put(REALM_KEY, Values.value(REALM));

    return map;
  }

  private String getSignedHeader() {
    final Request<Void> request = new DefaultRequest<>(SERVICE_NAME);
    request.setHttpMethod(HttpMethodName.GET);
    request.setEndpoint(URI.create(url));
    // Comment out the following line if you're using an engine version older than 1.2.0.0
    request.setResourcePath("/opencypher");

    final AWS4Signer signer = new AWS4Signer();
    signer.setRegionName(region);
    signer.setServiceName(request.getServiceName());
    signer.sign(request, credentialsProvider.getCredentials());

    return getAuthInfoJson(request);
  }

  private String getAuthInfoJson(final Request<Void> request) {
    final Map<String, Object> obj = new HashMap<>();
    obj.put(AUTHORIZATION, request.getHeaders().get(AUTHORIZATION));
    obj.put(HTTP_METHOD_HDR, request.getHttpMethod());
    obj.put(X_AMZ_DATE, request.getHeaders().get(X_AMZ_DATE));
    obj.put(HOST, request.getHeaders().get(HOST));
    obj.put(X_AMZ_SECURITY_TOKEN, request.getHeaders().get(X_AMZ_SECURITY_TOKEN));

    return gson.toJson(obj);
  }
}

A Python openCypher query example using Bolt with IAM authentication

The Python class below lets you make openCypher queries in Python using Bolt with IAM authentication:

import json

from neo4j import Auth
from botocore.awsrequest import AWSRequest
from botocore.credentials import Credentials
from botocore.auth import (
  SigV4Auth,
  _host_from_url,
)

SCHEME = "basic"
REALM = "realm"
SERVICE_NAME = "neptune-db"
DUMMY_USERNAME = "username"
HTTP_METHOD_HDR = "HttpMethod"
HTTP_METHOD = "GET"
AUTHORIZATION = "Authorization"
X_AMZ_DATE = "X-Amz-Date"
X_AMZ_SECURITY_TOKEN = "X-Amz-Security-Token"
HOST = "Host"


class NeptuneAuthToken(Auth):
  def __init__(
    self,
    credentials: Credentials,
    region: str,
    url: str,
    **parameters
  ):
    # Do NOT add "/opencypher" in the line below if you're using an engine version older than 1.2.0.0
    request = AWSRequest(method=HTTP_METHOD, url=url + "/opencypher")
    request.headers.add_header("Host", _host_from_url(request.url))
    sigv4 = SigV4Auth(credentials, SERVICE_NAME, region)
    sigv4.add_auth(request)

    auth_obj = {
      hdr: request.headers[hdr]
      for hdr in [AUTHORIZATION, X_AMZ_DATE, X_AMZ_SECURITY_TOKEN, HOST]
    }
    auth_obj[HTTP_METHOD_HDR] = request.method
    creds: str = json.dumps(auth_obj)
    super().__init__(SCHEME, DUMMY_USERNAME, creds, REALM, **parameters)

You use this class to create a driver as follows:

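from neo4j import GraphDatabase

# creds is a botocore Credentials object; REGION and URL identify your cluster
authToken = NeptuneAuthToken(creds, REGION, URL)
driver = GraphDatabase.driver(URL, auth=authToken, encrypted=True)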

A Node.js example using IAM authentication and Bolt

The Node.js code below uses the AWS SDK for JavaScript version 3 and ES6 syntax to create a driver that authenticates requests:

import neo4j from "neo4j-driver";
import { HttpRequest } from "@aws-sdk/protocol-http";
import { defaultProvider } from "@aws-sdk/credential-provider-node";
import { SignatureV4 } from "@aws-sdk/signature-v4";
import crypto from "@aws-crypto/sha256-js";
const { Sha256 } = crypto;
import assert from "node:assert";

const region = "us-west-2";
const serviceName = "neptune-db";
const host = "(your cluster endpoint URL)";
const port = 8182;
const protocol = "bolt";
const hostPort = host + ":" + port;
const url = protocol + "://" + hostPort;
const createQuery = "CREATE (n:Greeting {message: 'Hello'}) RETURN ID(n)";
const readQuery = "MATCH(n:Greeting) WHERE ID(n) = $id RETURN n.message";

async function signedHeader() {
  const req = new HttpRequest({
    method: "GET",
    protocol: protocol,
    hostname: host,
    port: port,
    // Comment out the following line if you're using an engine version older than 1.2.0.0
    path: "/opencypher",
    headers: {
      host: hostPort
    }
  });

  const signer = new SignatureV4({
    credentials: defaultProvider(),
    region: region,
    service: serviceName,
    sha256: Sha256
  });

  return signer.sign(req, { unsignableHeaders: new Set(["x-amz-content-sha256"]) })
    .then((signedRequest) => {
      const authInfo = {
        "Authorization": signedRequest.headers["authorization"],
        "HttpMethod": signedRequest.method,
        "X-Amz-Date": signedRequest.headers["x-amz-date"],
        "Host": signedRequest.headers["host"],
        "X-Amz-Security-Token": signedRequest.headers["x-amz-security-token"]
      };
      return JSON.stringify(authInfo);
    });
}

async function createDriver() {
  let authToken = { scheme: "basic", realm: "realm", principal: "username", credentials: await signedHeader() };

  return neo4j.driver(url, authToken, {
      encrypted: "ENCRYPTION_ON",
      trust: "TRUST_SYSTEM_CA_SIGNED_CERTIFICATES",
      maxConnectionPoolSize: 1,
      // logging: neo4j.logging.console("debug")
    }
  );
}

function unmanagedTxn(driver) {
  const session = driver.session();
  const tx = session.beginTransaction();
  // Return the promise chain so the caller knows when the work has finished
  return tx.run(createQuery)
    .then((res) => {
      const id = res.records[0].get(0);
      return tx.run(readQuery, { id: id });
    })
    .then((res) => {
      // All good, commit the transaction
      const msg = res.records[0].get("n.message");
      assert.equal(msg, "Hello");
      return tx.commit();
    })
    .catch(err => {
      // The transaction will be rolled back, now handle the error.
      console.log(err);
    })
    .then(() => session.close());
}

createDriver()
  .then((driver) => {
    // Close the driver only after the transaction has completed
    return unmanagedTxn(driver).then(() => driver.close());
  })
  .catch((err) => {
    console.log(err);
  });

Bolt connection behavior in Neptune

Here are some things to keep in mind about Neptune Bolt connections:

  • Because Bolt connections are created at the TCP layer, you can't use an Application Load Balancer in front of them, as you can with an HTTP endpoint.

  • The port that Neptune uses for Bolt connections is your DB cluster's port.

  • Based on the Bolt preamble passed to it, the Neptune server selects the highest appropriate Bolt version (1, 2, 3, or 4.0).

  • The maximum number of connections to the Neptune server that a client can have open at any point in time is 1,000.

  • If the client doesn't close a connection after a query, that connection can be used to execute the next query.

  • However, if a connection is idle for 20 minutes, the server closes it automatically.

  • If IAM authentication is not enabled, you can use AuthTokens.none() rather than supplying a dummy user name and password. For example, in Java:

    GraphDatabase.driver("bolt://(your cluster endpoint URL):(your cluster port)",
        AuthTokens.none(),
        Config.builder().withEncryption()
                        .withTrustStrategy(TrustStrategy.trustSystemCertificates())
                        .build());

  • When IAM authentication is enabled, a Bolt connection is always disconnected a few minutes more than 10 days after it was established if it hasn't already closed for some other reason.

  • If the client sends a query for execution over a connection without having consumed the results of a previous query, the new query is discarded. To discard the previous results instead, the client must send a reset message over the connection.

  • Only one transaction at a time can be created on a given connection.

  • If an exception occurs during a transaction, the Neptune server rolls back the transaction and closes the connection. In this case, the driver creates a new connection for the next query.

  • Be aware that sessions are not thread-safe. Multiple parallel operations must use multiple separate sessions.
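
Several of the points above can be illustrated with a short Python sketch. It is offered under stated assumptions and is not a definitive implementation: `DRIVER_CONFIG`, `fetch`, and `fetch_all` are hypothetical names, and the numeric values are illustrative choices rather than recommendations from Neptune. `max_connection_lifetime` and `max_connection_pool_size` are settings of the Neo4j Python driver. Here they are set so that pooled connections are retired before the 20-minute idle cutoff and the pool stays far below the 1,000-connection limit. Each parallel task opens its own session and reads all of its records before returning.

```python
from concurrent.futures import ThreadPoolExecutor

# Illustrative driver settings (assumed values, tune for your workload)
DRIVER_CONFIG = {
    "encrypted": True,
    "max_connection_lifetime": 15 * 60,  # seconds; below the 20-minute idle cutoff
    "max_connection_pool_size": 50,      # far below the 1,000-connection limit
}


def fetch(driver, query: str) -> list:
    """Run one query in its own session and consume the result fully."""
    with driver.session() as session:
        return [record.data() for record in session.run(query)]


def fetch_all(driver, queries: list, workers: int = 4) -> list:
    """Run queries in parallel, one session per task, results in input order."""
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(lambda q: fetch(driver, q), queries))
```

The settings could be passed when creating the driver, for example `GraphDatabase.driver(uri, auth=("username", "password"), **DRIVER_CONFIG)`. The driver object is shared between threads, while sessions are never shared.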