使用 Neptune 串流 - Amazon Neptune

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用 Neptune 串流

使用 Neptune 串流功能,您可以產生一系列完整的變更日誌項目,記錄對圖形資料所做的每項變更。如需此功能的概觀,請參閱 使用 Neptune 串流即時擷取圖形變更

啟用 Neptune 串流

您可以隨時設定 neptune_streams 資料庫叢集參數,以啟用或停用 Neptune 串流。將參數設為 1 可啟用串流,設為 0 則可停用串流。

注意

在變更 neptune_streams 資料庫叢集參數之後,您必須重新啟動叢集中的所有資料庫執行個體,變更才會生效。

您可以設定 neptune_streams_expiry_days 資料庫叢集參數,以控制串流記錄在刪除之前保留在伺服器上的天數 (從 1 到 90)。預設值為 7。

Neptune Streams 最初是以實驗性功能形式引入,您可以使用資料庫叢集 neptune_lab_mode 參數在實驗室模式下啟用或停用該功能 (請參閱 Neptune 實驗室模式)。使用實驗室模式啟用串流現在已遭取代,且未來將停用。

停用 Neptune 串流

您可以隨時關閉執行中的 Neptune 串流。

若要關閉串流,請更新資料庫叢集參數群組,將 neptune_streams 參數的值設為 0。

重要

一旦串流關閉,您再也無法存取變更日誌資料。在關閉串流之前,請務必閱讀您感興趣的內容。

呼叫 Neptune 串流 REST API

您可以使用將HTTPGET要求傳送至下列其中一個本機端點的方式來存取 Neptune 串流:RESTAPI

  • 對於SPARQL圖形 DB:https://Neptune-DNS:8182/sparql/stream

  • 對於一個小精靈或 openCypher 圖 DB:https://Neptune-DNS:8182/propertygraph/stream或。https://Neptune-DNS:8182/pg/stream

注意

引擎 1.1.0.0 版開始,Gramlin 串流端點 (https://Neptune-DNS:8182/gremlin/stream) 正被棄用,以及其相關聯的輸出格式 (GREMLIN_JSON) 也一樣。基於回溯相容性仍支援它,但可能會在未來的版本中將其移除。

只允許一項HTTPGET作業。

Neptune 支援回應gzip壓縮,前提是HTTP要求包含指定gzip為可接受的壓縮格式 (也就是"Accept-Encoding: gzip") 的Accept-Encoding標頭。

參數
  • limit – 長整數 (選用)。範圍:1–100,000。預設值:10

    指定要傳回的記錄數上限。回應也有無法修改的 10 MB 大小限制,而且會優先於 limit 參數中指定的記錄數量。如果達到 10 MB 限制,回應會包含違反閾值的記錄。

  • iteratorType – 字串 (選用)。

    此參數可採用下列其中一個值:

    • AT_SEQUENCE_NUMBER (預設值) – 指出讀取應該從 commitNumopNum 參數共同指定的事件序號開始。

    • AFTER_SEQUENCE_NUMBER – 指出讀取應該在 commitNumopNum 參數共同指定的事件序號之後立即開始。

    • TRIM_HORIZON – 指出讀取應該從系統中的最後一個未修整記錄開始,這是變更日誌串流中最舊的未過期 (尚未刪除) 記錄。當您沒有特定的開始事件序號時,此模式在應用程式啟動期間很有用。

    • LATEST – 指出讀取應該從系統中的最新記錄開始,這是變更日誌串流中最新的未過期 (尚未刪除) 記錄。當需要從串流的目前頂端讀取記錄,以免處理較舊的記錄 (例如在災難復原或零停機時間升級期間) 時,這會很有用。請注意,在此模式下,最多只會傳回一筆記錄。

  • commitNum— 長, iteratorType 為AT_SEQUENCE_NUMBER或時需要AFTER_SEQUENCE_NUMBER

    要從 change-log 串流讀取之開始記錄的遞交編號。

    iteratorTypeTRIM_HORIZONLATEST 時會忽略此參數。

  • opNum – 長整數,選用 (預設為 1)。

    所指定遞交內的操作序號,即在變更日誌串流資料中開始讀取之處。

變更SPARQL圖形資料的作業通常只會在每個作業產生一個變更記錄。不過,變更 Gremlin 圖形資料的操作可能會為每個操作產生多個變更記錄,如以下範例所示:

  • INSERT – Gremlin 頂點可以有多個標籤,而且 Gremlin 元素可以有多個屬性。插入元素時,會為每個標籤和屬性產生個別的變更記錄。

  • UPDATE – 變更 Gremlin 元素屬性時,會產生兩個變更記錄:第一個用於移除先前的值,第二個用於插入新值。

  • DELETE – 系統會針對刪除的每個元素屬性產生個別的變更記錄。例如,刪除具有屬性的 Gremlin 邊緣時,每個屬性都會產生一個變更記錄,之後會產生一個用於刪除邊緣標籤的變更記錄。

    刪除 Gremlin 頂點時,會先刪除所有傳入和傳出邊緣屬性,接著依序刪除邊緣標籤、頂點屬性,最後再刪除頂點標籤。其中每個刪除都會產生一筆變更記錄。

Neptune 流API響應格式

對 Neptune 串流RESTAPI要求的回應具有下列欄位:

  • lastEventId – 串流回應中上次變更的序列識別符。事件 ID 由兩個欄位組成:commitNum 識別已變更圖形的交易,以及 opNum 識別該交易內的特定操作。如以下範例所示。

    "eventId": { "commitNum": 12, "opNum": 1 }
  • lastTrxTimestamp – 已請求遞交交易的時間,以毫秒為單位,從 Unix epoch 開始。

  • format – 要傳回之變更記錄的序列化格式。可能的值是PG_JSON針對 Gemlin 或 openCypher 變更記錄,以及變SPARQL更NQUADS記錄。

  • records – 回應中包含之序列化變更日誌串流記錄的陣列。records 陣列中的每筆記錄都包含下列欄位:

    • commitTimestamp – 已請求遞交交易的時間,以毫秒為單位,從 Unix epoch 開始。

    • eventId – 串流變更記錄的序列識別符。

    • data— 序列化的小鬼,, 或 OpenCypher 更改記SPARQL錄. 下節 (Neptune 串流中的序列化格式) 會詳細描述每筆記錄的序列化格式。

    • op – 已建立變更的操作。

    • isLastOp – 僅當此操作是其交易中的最後一個操作時才存在。存在時,其會設定為 true。有助於確保取用整個交易。

  • totalRecords – 回應中的記錄總數。

例如,下列回應會針對包含多項操作的交易傳回 Gremlin 變更資料:

{ "lastEventId": { "commitNum": 12, "opNum": 1 }, "lastTrxTimestamp": 1560011610678, "format": "PG_JSON", "records": [ { "commitTimestamp": 1560011610678, "eventId": { "commitNum": 1, "opNum": 1 }, "data": { "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a", "type": "vl", "key": "label", "value": { "value": "vertex", "dataType": "String" } }, "op": "ADD" } ], "totalRecords": 1 }

下列回應會傳回交易中最後一個作業的SPARQL變更資料 (在交易編號 97 EventId(97, 1) 中所識別的作業)。

{ "lastEventId": { "commitNum": 97, "opNum": 1 }, "lastTrxTimestamp": 1561489355102, "format": "NQUADS", "records": [ { "commitTimestamp": 1561489355102, "eventId": { "commitNum": 97, "opNum": 1 }, "data": { "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n" }, "op": "ADD", "isLastOp": true } ], "totalRecords": 1 }

Neptune 流API異常

下表描述 Neptune 串流例外狀況。

錯誤程式碼 HTTP代碼 OK to Retry? (確定要重試嗎?) 訊息

InvalidParameterException

400

提供了無效的或 out-of-range 值作為輸入參數。

ExpiredStreamException

400

所有請求的記錄都超過允許的最大存留期且已過期。

ThrottlingException

500

請求速度超出傳輸量上限。

StreamRecordsNotFoundException

404

找不到請求的資源。可能無法正確指定串流。

MemoryLimitExceededException

500

由於缺乏記憶體,請求處理未成功,但可以在伺服器較不忙碌時重試。