Using Neptune Streams - Amazon Neptune

With the Neptune Streams feature, you can generate a complete sequence of change-log entries that record every change made to your graph data as it happens. For an overview of this feature, see Capturing Graph Changes in Real Time Using Neptune Streams.

Enabling Neptune Streams

You can enable or disable Neptune Streams at any time by setting the neptune_streams DB Cluster parameter (see Parameters That You Can Use to Configure Amazon Neptune). Setting the parameter to 1 enables Streams, and setting it to 0 disables Streams.
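As a rough illustration, the parameter change could be scripted with the AWS SDK for Python. This is a minimal sketch, not the only way to change the setting: the parameter group name is hypothetical, the cluster is assumed to use a custom DB cluster parameter group, and the `pending-reboot` apply method is an assumption that you may need to adjust for your engine version.

```python
def streams_parameter(enabled: bool) -> dict:
    """Build the parameter entry that sets neptune_streams to 1 (on) or 0 (off)."""
    return {
        "ParameterName": "neptune_streams",
        "ParameterValue": "1" if enabled else "0",
        # Assumption: apply the change at the next reboot. Adjust this if your
        # engine version applies the parameter differently.
        "ApplyMethod": "pending-reboot",
    }


def set_streams(parameter_group_name: str, enabled: bool) -> None:
    """Write the setting to a custom DB cluster parameter group."""
    import boto3  # imported lazily so streams_parameter() works without the SDK

    neptune = boto3.client("neptune")
    neptune.modify_db_cluster_parameter_group(
        DBClusterParameterGroupName=parameter_group_name,
        Parameters=[streams_parameter(enabled)],
    )
```

For example, `set_streams("my-neptune-cluster-params", True)` would request that Streams be enabled, where `my-neptune-cluster-params` is a placeholder name.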

Note

Neptune Streams was previously an experimental feature that you enabled or disabled in Lab Mode using the DB Cluster neptune_lab_mode parameter (see Neptune Lab Mode). Using Lab Mode to enable Streams is now deprecated and will be disabled in the future.

After you turn Streams on, the change records in the change-log stream continue to be available for one week after they are created.

Disabling Neptune Streams

You can turn Neptune Streams off at any time while it is running.

To turn Streams off, update the DB Cluster parameter group so that the neptune_streams parameter is set to 0.

Important

As soon as Streams is turned off, you can no longer access the change-log data. Be sure to read any records that you need before turning Streams off.

Calling the Neptune Streams REST API

You access Neptune Streams through a REST API, by sending an HTTP GET request to one of the following local endpoints:

  • For a SPARQL graph DB: https://Neptune-DNS:8182/sparql/stream

  • For a Gremlin graph DB: https://Neptune-DNS:8182/gremlin/stream

Only an HTTP GET operation is allowed.

Neptune supports gzip compression of the response, provided that the HTTP request includes an Accept-Encoding header that specifies gzip as an accepted compression format (that is, "Accept-Encoding: gzip").
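The request described above can be sketched with the Python standard library. This is a minimal sketch under a few assumptions: the client can reach the endpoint, IAM authentication is not enabled on the cluster (otherwise the request would also need to be signed), and the server reports compression in the Content-Encoding response header. The function names are illustrative only.

```python
import gzip
import json
import urllib.request
from typing import Optional


def decode_body(raw: bytes, content_encoding: Optional[str]) -> dict:
    """Decompress the body if it was gzip-encoded, then parse it as JSON."""
    if content_encoding and "gzip" in content_encoding.lower():
        raw = gzip.decompress(raw)
    return json.loads(raw.decode("utf-8"))


def fetch_stream(url: str, timeout: float = 10.0) -> dict:
    """Send the GET request, advertising gzip support via Accept-Encoding."""
    request = urllib.request.Request(
        url, method="GET", headers={"Accept-Encoding": "gzip"}
    )
    with urllib.request.urlopen(request, timeout=timeout) as response:
        # urllib does not decompress automatically, so decode_body() handles it.
        return decode_body(
            response.read(), response.headers.get("Content-Encoding")
        )
```

Keeping the decoding step in its own function makes it possible to exercise it without a running cluster.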

Parameters

  • limit – long, optional. Range: 1–100,000. Default: 10.

    Specifies the maximum number of records to return. The response is also subject to a fixed 10 MB size limit, which can't be modified and which takes precedence over the number of records specified in the limit parameter. If the 10 MB limit is reached, the response still includes the record that crossed the threshold.

  • iteratorType – String, optional.

    This parameter can take one of the following values:

    • AT_SEQUENCE_NUMBER (default) – Indicates that reading should start from the event sequence number specified jointly by the commitNum and opNum parameters.

    • AFTER_SEQUENCE_NUMBER – Indicates that reading should start right after the event sequence number specified jointly by the commitNum and opNum parameters.

    • TRIM_HORIZON – Indicates that reading should start at the last untrimmed record in the system, which is the oldest unexpired (not yet deleted) record in the change-log stream. This mode is useful during application startup, when you don't have a specific starting event sequence number.

  • commitNum – long, required when iteratorType is AT_SEQUENCE_NUMBER or AFTER_SEQUENCE_NUMBER.

    The commit number of the starting record to read from the change-log stream.

    This parameter is ignored when iteratorType is TRIM_HORIZON.

  • opNum – long, optional (the default is 1).

    The operation sequence number within the specified commit to start reading from in the change-log stream data.
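The parameter rules above can be checked on the client before a request is sent. The following is a minimal sketch: the helper name and the choice to omit opNum together with commitNum for TRIM_HORIZON are assumptions of this example, not documented behavior.

```python
from typing import Optional
from urllib.parse import urlencode

ITERATOR_TYPES = {"AT_SEQUENCE_NUMBER", "AFTER_SEQUENCE_NUMBER", "TRIM_HORIZON"}


def build_stream_url(
    host: str,
    language: str = "gremlin",
    limit: int = 10,
    iterator_type: str = "AT_SEQUENCE_NUMBER",
    commit_num: Optional[int] = None,
    op_num: int = 1,
) -> str:
    """Validate the stream parameters and return the full request URL."""
    if language not in ("gremlin", "sparql"):
        raise ValueError("language must be 'gremlin' or 'sparql'")
    if not 1 <= limit <= 100_000:
        raise ValueError("limit must be between 1 and 100,000")
    if iterator_type not in ITERATOR_TYPES:
        raise ValueError(f"unknown iteratorType: {iterator_type}")

    params = {"limit": limit, "iteratorType": iterator_type}
    if iterator_type != "TRIM_HORIZON":
        # commitNum is required for the two sequence-number iterator types.
        if commit_num is None:
            raise ValueError(f"commitNum is required for {iterator_type}")
        params["commitNum"] = commit_num
        params["opNum"] = op_num
    return f"https://{host}:8182/{language}/stream?{urlencode(params)}"
```

For instance, `build_stream_url("Neptune-DNS", iterator_type="TRIM_HORIZON", limit=100)` builds a request that starts at the oldest available record, which suits application startup.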

Operations that change SPARQL graph data generally only generate a single change record per operation. However, operations that change Gremlin graph data can generate multiple change records per operation, as in the following examples:

  • INSERT – A Gremlin vertex can have multiple labels, and a Gremlin element can have multiple properties. A separate change record is generated for each label and property when an element is inserted.

  • UPDATE – When a Gremlin element property is changed, two change records are generated: the first for removing the previous value, and the second for inserting the new value.

  • DELETE – A separate change record is generated for each element property that is deleted. For example, when a Gremlin edge with properties is deleted, one change record is generated for each of the properties, and after that, one is generated for deletion of the edge label.

    When a Gremlin vertex is deleted, all the incoming and outgoing edge properties are deleted first, then the edge labels, then the vertex properties, and finally the vertex labels. Each of these deletions generates a change record.
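The counting rules above can be written out as simple arithmetic. This is an illustrative sketch only: the function names are hypothetical, and it assumes that each property holds a single value and that an edge has exactly one label.

```python
def insert_record_count(labels: int, properties: int) -> int:
    """One change record per label plus one per property."""
    return labels + properties


def update_record_count(changed_properties: int) -> int:
    """Each changed property yields a removal record and an insertion record."""
    return 2 * changed_properties


def edge_delete_record_count(properties: int) -> int:
    """One record per edge property, then one for the edge label."""
    return properties + 1


# A vertex inserted with 2 labels and 3 properties
print(insert_record_count(labels=2, properties=3))
# An edge with 2 properties being deleted
print(edge_delete_record_count(properties=2))
```

Estimates like these can help when choosing a value for the limit parameter, because a single Gremlin operation can account for many records in a response.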

Neptune Streams API Response Format

A response to a Neptune Streams REST API request has the following fields:

  • lastEventId – Sequence identifier of the last change in the stream response. An event ID is composed of two fields: a commitNum, which identifies a transaction that changed the graph, and an opNum, which identifies a specific operation within that transaction. This is shown in the following example.

    "eventId": { "commitNum": 12, "opNum": 1 }

  • lastTrxTimestamp – The time at which the commit for the transaction was requested, in milliseconds from the Unix epoch.

  • format – Serialization format for the change records being returned. The possible values are GREMLIN_JSON for Gremlin change records and NQUADS for SPARQL change records.

  • records – An array of serialized change-log stream records included in the response.

  • totalRecords – The total number of records in the response.

For example, the following response returns Gremlin change data.

{
  "lastEventId": {
    "commitNum": 12,
    "opNum": 1
  },
  "lastTrxTimestamp": 1558942160603,
  "format": "GREMLIN_JSON",
  "records": [
    {
      "eventId": {
        "commitNum": 12,
        "opNum": 1
      },
      "data": {
        "id": "v1",
        "type": "vl",
        "key": "label",
        "value": {
          "value": "person",
          "dataType": "String"
        }
      },
      "op": "ADD"
    }
  ],
  "totalRecords": 1
}

The following response returns SPARQL change data.

{
  "lastEventId": {
    "commitNum": 97,
    "opNum": 1
  },
  "lastTrxTimestamp": 1561489355102,
  "format": "NQUADS",
  "records": [
    {
      "eventId": {
        "commitNum": 97,
        "opNum": 1
      },
      "data": {
        "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n"
      },
      "op": "ADD"
    }
  ],
  "totalRecords": 1
}

The serialization formats for the data section of each record are described in more detail in the next section, Serialization Formats in Neptune Streams.
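A client that polls the stream typically uses the top-level fields of one response to decide where the next request should start. The following is a minimal sketch of that idea, assuming the response has already been parsed into a dictionary. The function names are hypothetical, and resuming with AFTER_SEQUENCE_NUMBER from lastEventId is one reasonable approach rather than a required one.

```python
from datetime import datetime, timezone


def next_request_params(response: dict) -> dict:
    """Return parameters that resume reading right after the last event seen."""
    last = response["lastEventId"]
    return {
        "iteratorType": "AFTER_SEQUENCE_NUMBER",
        "commitNum": last["commitNum"],
        "opNum": last["opNum"],
    }


def commit_time(response: dict) -> datetime:
    """Convert lastTrxTimestamp (milliseconds since the Unix epoch) to UTC."""
    seconds, millis = divmod(response["lastTrxTimestamp"], 1000)
    return datetime.fromtimestamp(seconds, tz=timezone.utc).replace(
        microsecond=millis * 1000
    )


def event_ids(response: dict) -> list:
    """List (commitNum, opNum, op) for every record in the response."""
    return [
        (r["eventId"]["commitNum"], r["eventId"]["opNum"], r["op"])
        for r in response["records"]
    ]
```

Applied to the Gremlin example response, `next_request_params` would yield commitNum 12 and opNum 1 with the AFTER_SEQUENCE_NUMBER iterator type, so the next request skips the record already processed.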

Neptune Streams API Exceptions

The following table describes Neptune Streams exceptions.

| Error Code | HTTP Code | OK to Retry? | Message |
|---|---|---|---|
| InvalidParameterException | 400 | No | An invalid or out-of-range value was supplied as an input parameter. |
| ExpiredStreamException | 400 | No | All of the requested records exceed the maximum age allowed and have expired. |
| ThrottlingException | 500 | Yes | Rate of requests exceeds the maximum throughput. |
| StreamRecordsNotFoundException | 404 | No | The requested resource could not be found. The stream may not be specified correctly. |
| MemoryLimitExceededException | 500 | Yes | The request processing did not succeed due to lack of memory, but can be retried when the server is less busy. |
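The "OK to Retry?" column above suggests a simple client-side policy: retry only the two retryable errors, and fail fast on the rest. The sketch below shows one way to do that with exponential backoff. The `StreamError` class is hypothetical, and how the error code is extracted from the HTTP error response is left to the caller, because it is not covered here.

```python
import time
from typing import Callable

# Error codes marked "Yes" in the "OK to Retry?" column.
RETRYABLE = {"ThrottlingException", "MemoryLimitExceededException"}


class StreamError(Exception):
    """Hypothetical exception carrying a Neptune Streams error code."""

    def __init__(self, code: str, message: str = ""):
        super().__init__(f"{code}: {message}")
        self.code = code


def call_with_retry(
    fetch: Callable[[], dict],
    max_attempts: int = 5,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> dict:
    """Call fetch(), retrying retryable errors with exponential backoff."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(1, max_attempts + 1):
        try:
            return fetch()
        except StreamError as err:
            # Give up on non-retryable errors or when attempts are exhausted.
            if err.code not in RETRYABLE or attempt == max_attempts:
                raise
            sleep(base_delay * 2 ** (attempt - 1))
```

The `sleep` argument is injectable so that the backoff schedule can be verified in tests without actually waiting.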