Neptune Streams の使用 - Amazon Neptune

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Neptune Streams の使用

Neptune Streams 機能を使用すると、グラフデータに加えられたすべての変更を記録する、変更ログエントリの完全なシーケンスを生成できます。この機能の概要については、「Neptune ストリームを使用してグラフの変更をリアルタイムでキャプチャする」を参照してください。

Neptune Streams の有効化

neptune_streams DB クラスターパラメータを設定することで、いつでも Neptune Streams を有効または無効にできます。パラメータを 1 に設定すると Streams が有効になり、0 に設定すると Streams が無効になります。

注記

現在、Neptune Streams は、ラボモードで DB クラスター neptune_lab_mode パラメータを使用して有効または無効にする実験機能です (Neptune ラボモード を参照)。現在、ラボモードを使用した Streams の有効化は非推奨で、将来無効化される予定です。

次の設定を行うことができます。neptune_streams_expiry_daysDB クラスターパラメーターを使用して、ストリームレコードが削除されるまでにサーバー上に残る日数 (1 ~ 90 日) を制御します。デフォルトは 7 です。

Neptune Streams の無効化

Neptune Streams は、実行中であればいつでも無効にできます。

Streams をオフにするには、neptune_streams パラメータの値が 0 に設定されるように DB クラスターパラメータグループを更新します。

重要

Streams がオフになるとすぐに、変更ログデータにアクセスできなくなります。Streams をオフにするに、関心のある内容を必ずお読みください。

Neptune Streams REST API の呼び出し

Neptune Streams にアクセスするには、次のいずれかのローカルエンドポイントに HTTP GET リクエストを送信する REST API を使用します。

  • SPARQL グラフ DB の場合: https://Neptune-DNS:8182/sparql/stream

  • Gremlin または OpenCypher グラフ DB の場合:https://Neptune-DNS:8182/propertygraph/streamまたはhttps://Neptune-DNS:8182/pg/stream

注記

エンジンリリース 1.1.0.0 現在、Gremlin ストリームエンドポイント (https://Neptune-DNS:8182/gremlin/stream) は、関連する出力形式 (GREMLIN_JSON) とともに非推奨です。下位互換性のために引き続きサポートされていますが、将来のリリースで削除される可能性があります。

HTTP GET オペレーションのみが許可されます。

Neptune は、レスポンスの gzip 圧縮をサポートします。ただし、HTTP リクエストに、受け入れられた圧縮形式として gzip を指定する Accept-Encoding ヘッダーが含まれていることが条件です (つまり、"Accept-Encoding: gzip")。

パラメータ

  • limit - long、オプション。範囲: 1 ~ 100,000。デフォルト: 10.

    返すレコードの最大数を指定します。また、レスポンスのサイズ制限は 10 MB であり、これは変更できず、limit パラメータで指定されたレコード数よりも優先されます。10 MB の制限に達した場合、レスポンスにはしきい値超過レコードが含まれます。

  • iteratorType - 文字列、オプション。

    このパラメータには以下の値のいずれかがあります。

    • AT_SEQUENCE_NUMBER (デフォルト) - commitNum および opNum パラメータで一緒に指定されたイベントシーケンス番号から読み取りを開始することを示します。

    • AFTER_SEQUENCE_NUMBER - commitNum および opNum パラメータで一緒に指定されたイベントシーケンス番号の直後に読み取りが開始されることを示します。

    • TRIM_HORIZON - 読み取りは、システム内の最後のトリミングされていないレコードから開始することを示します。これは、変更ログストリームで最も古い (まだ削除されていない) レコードであることを示しています。このモードは、特定の開始イベントシーケンス番号がないアプリケーションの起動時に便利です。

    • LATEST - 読み取りは、システム内の最新のレコードから開始することを示します。これは、変更ログストリームで最近の (まだ削除されていない) レコードであることを示しています。これは、災害対策時やダウンタイムゼロのアップグレード時など、古いレコードを処理しないように、ストリームの現在の上位からレコードを読み取る必要がある場合に便利です。このモードでは、返されるレコードは最大 1 つだけであることに注意してください。

  • commitNum - long、iteratorType が AT_SEQUENCE_NUMBER または AFTER_SEQUENCE_NUMBER のときは必須。

    変更ログストリームから読み取る開始レコードのコミット番号。

    iteratorTypeTRIM_HORIZON または LATEST の場合、このパラメータは無視されます。

  • opNum— long、オプション (デフォルトは1).

    変更ログストリームデータからの読み取りを開始するための、指定されたコミット内のオペレーションシーケンス番号。

通常、SPARQL グラフデータを変更するオペレーションでは、オペレーションごとに 1 つの変更レコードしか生成されません。ただし、Gremlin グラフデータを変更するオペレーションでは、次の例のように、オペレーションごとに複数の変更レコードを生成できます。

  • INSERT - Gremlin 頂点は複数のラベルを持つことができ、Gremlin 要素は複数のプロパティを持つことができます。要素が挿入されると、ラベルとプロパティごとに個別の変更レコードが生成されます。

  • UPDATE - Gremlin 要素プロパティが変更されると、2 つの変更レコードが生成されます。1 つ目は前の値の削除で、2 つ目は新しい値の挿入です。

  • DELETE - 削除される要素プロパティごとに個別の変更レコードが生成されます。たとえば、プロパティを持つ Gremlin エッジが削除されると、プロパティごとに 1 つの変更レコードが生成されます。その後、エッジラベルの削除用に 1 つの変更レコードが生成されます。

    Gremlin 頂点が削除されると、すべての受信エッジプロパティと送信エッジプロパティが最初に削除され、次にエッジラベル、頂点プロパティ、最後に頂点ラベルが削除されます。これらの削除はそれぞれ、変更レコードを生成します。

Neptune Streams API レスポンスの形式

Neptune Streams REST API リクエストに対するレスポンスには、以下のフィールドがあります。

  • lastEventId - ストリームレスポンスの最後の変更のシーケンス識別子。イベント ID は、次の 2 つのフィールドで構成されます。あるcommitNumグラフを変更したトランザクションを識別し、opNumは、そのトランザクション内の特定の操作を識別します。これは次の例で示されます。

    "eventId": { "commitNum": 12, "opNum": 1 }
  • lastTrxTimestamp - トランザクションのコミットがリクエストされた時間 (Unix エポックからのミリ秒単位)。

  • format - 返される変更レコードのシリアル化形式。指定できる値は、Gremlin 変更レコードの場合は GREMLIN_JSON、SPARQL 変更レコードの場合は NQUADS です。

  • records - レスポンスに含まれるシリアル化された変更ログストリームレコードの配列。records 配列内の各レコードには、次のフィールドが含まれます。

    • commitTimestamp - トランザクションのコミットがリクエストされた時間 (Unix エポックからのミリ秒単位)。

    • eventId - ストリームレスポンスの最後の変更のシーケンス識別子。

    • data— シリアル化された Gremlin、SPARQL、または OpenCypher 記録を変えろ 各レコードのシリアル化形式については、次のセクション Neptune ストリームのシリアル化形式 で詳しく説明します。

    • op — 変更を作成した操作。

    • isLastOp— このオペレーションがトランザクションの最後のオペレーションである場合にのみ存在します。存在する場合、に設定されますtrue。トランザクション全体が確実に消費されるようにする場合に便利です。

  • totalRecords - レスポンスのレコードの総数。

たとえば、次のレスポンスは、複数のオペレーションを含むトランザクションの Gremlin 変更データを返します。

{ "lastEventId": { "commitNum": 12, "opNum": 1 }, "lastTrxTimestamp": 1560011610678, "format": "GREMLIN_JSON", "records": [ { "commitTimestamp": 1560011610678, "eventId": { "commitNum": 1, "opNum": 1 }, "data": { "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a", "type": "vl", "key": "label", "value": { "value": "vertex", "dataType": "String" } }, "op": "ADD" } ], "totalRecords": 1 }

次のレスポンスは、トランザクションの最後の操作 (以下で識別される操作) の SPARQL 変更データを返します。EventId(97, 1)トランザクション番号97)。

{ "lastEventId": { "commitNum": 97, "opNum": 1 }, "lastTrxTimestamp": 1561489355102, "format": "NQUADS", "records": [ { "commitTimestamp": 1561489355102, "eventId": { "commitNum": 97, "opNum": 1 }, "data": { "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n" }, "op": "ADD", "isLastOp", true } ], "totalRecords": 1 }

Neptune Streams API の例外

次の表では、Neptune Streams の例外について説明します。

エラーコード HTTP コード 再試行してもいいですか。 Message

InvalidParameterException

400

いいえ

無効または out-of-range value は、入力パラメータとして指定されました。

ExpiredStreamException

400

いいえ

リクエストされたすべてのレコードが許容される最大有効期間を超え、有効期限が切れています。

ThrottlingException

500

はい

リクエストの速度が、最大スループットを超えています。

StreamRecordsNotFoundException

404

いいえ

リクエストされたリソースが見つかりませんでした。ストリームが正しく指定されていない可能性があります。

MemoryLimitExceededException

500

はい

メモリ不足のため、リクエストの処理は成功しませんでしたが、サーバーがビジー状態でなくなったら再試行できます。