Utilisation de Neptune Streams - Amazon Neptune

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de Neptune Streams

La fonction Neptune Streams vous permet de générer une séquence complète d'entrées de journal des modifications qui enregistrent chaque modification apportée à vos données de graphe au fur et à mesure de leur application. Pour obtenir une présentation de cette fonction, veuillez consulter Capture des changements de graphiques en temps réel à l'aide de flux Neptune.

Activation de Neptune Streams

Vous pouvez activer ou désactiver Neptune Streams à tout moment en définissant laneptune_streamsParamètre DB Cluster. Si le paramètre est défini sur 1, Streams est activé, et s'il est défini sur 0, Streams est désactivé.

Note

Actuellement, Neptune Streams est une fonction expérimentale que vous activez ou désactivez en mode Lab à l'aide du cluster de base de données.neptune_lab_modeparamètre (voirMode Neptune). L'utilisation du mode Lab pour activer Streams est désormais obsolète et sera désactivée à l'avenir.

Vous pouvez définir leneptune_streams_expiry_daysParamètre de cluster de base de données pour contrôler le nombre de jours, de 1 à 90, pendant lesquels les enregistrements de flux restent sur le serveur avant d'être supprimés. La valeur par défaut est 7.

Désactivation de Neptune Streams

Vous pouvez désactiver Neptune Streams à tout moment.

Pour désactiver Streams, mettez à jour le groupe de paramètres de cluster de base de données afin que la valeur du paramètre neptune_streams soit définie sur 0.

Important

Dès que Streams est désactivé, vous ne pouvez plus accéder aux données du journal des modifications. Veillez à lire les informations qui vous intéressent avant de désactiver Streams.

Appel de l'API REST Neptune Streams

Vous accédez à Neptune Streams à l'aide d'une API REST qui envoie une demande HTTP GET à l'un des points de terminaison locaux suivants :

  • Pour une base de données de graphe SPARQL :   https://Neptune-DNS:8182/sparql/stream.

  • Pour une base de données de graphe Gremlin ou OpenCypher :https://Neptune-DNS:8182/propertygraph/streamouhttps://Neptune-DNS:8182/pg/stream.

Note

À compter duversion de moteur 1.1.0.0, le point final du cours d'eau Gremlin (https://Neptune-DNS:8182/gremlin/stream) est obsolète, ainsi que son format de sortie associé (GREMLIN_JSON). Il est toujours pris en charge pour des raisons de compatibilité descendante mais pourrait être supprimé dans les future versions.

Seule une opération HTTP GET est autorisée.

Neptune prend en chargegzipcompression de la réponse, à condition que la demande HTTP inclue unAccept-Encodingen-tête qui spécifiegzipen tant que format de compression accepté (c'est-à-dire"Accept-Encoding: gzip").

Paramètres

  • limit— long, facultatif. Plage : 1 à 100 000. Par défaut: 10.

    Spécifie le nombre maximal d'enregistrements à renvoyer. Il existe également une limite de taille de 10 Mo pour la réponse qui ne peut pas être modifiée et qui est prioritaire sur le nombre d'enregistrements spécifié dans le paramètre limit. La réponse inclut un enregistrement de dépassement de seuil si la limite de 10 Mo a été atteinte.

  • iteratorType— Chaîne, facultatif.

    Ce paramètre peut avoir l'une des valeurs suivantes :

    • AT_SEQUENCE_NUMBER(valeur par défaut) — Indique que la lecture doit commencer à partir du numéro de séquence d'événement spécifié conjointement par lecommitNumetopNumparamètres.

    • AFTER_SEQUENCE_NUMBER— Indique que la lecture doit commencer juste après le numéro de séquence d'événement spécifié conjointement par lecommitNumetopNumparamètres.

    • TRIM_HORIZON— Indique que la lecture doit commencer au niveau du dernier enregistrement non tronqué du système, qui est le plus ancien enregistrement non expiré (pas encore supprimé) du flux de journaux des modifications. Ce mode est utile lors du démarrage de l'application, lorsque vous n'avez pas de numéro de séquence d'événement de démarrage spécifique.

    • LATEST— Indique que la lecture doit commencer à partir de l'enregistrement le plus récent du système, qui est le dernier enregistrement non expiré (pas encore supprimé) du flux de journaux des modifications. Cela est utile lorsqu'il est nécessaire de lire des enregistrements à partir du haut des flux actuels afin de ne pas traiter les enregistrements plus anciens, par exemple lors d'une reprise après sinistre ou d'une mise à niveau sans temps d'arrêt. Notez que dans ce mode, il n'y a qu'un seul enregistrement renvoyé.

  • commitNum— long, requis lorsque iteratorType estAT_SEQUENCE_NUMBERouAFTER_SEQUENCE_NUMBER.

    Numéro de validation de l'enregistrement de départ à lire à partir du flux du journal des modifications.

    Ce paramètre est ignoré lorsqueiteratorTypeestTRIM_HORIZONouLATEST.

  • opNumlong, facultatif (la valeur par défaut est1).

    Numéro de séquence d'opération au sein de la validation spécifiée à partir duquel commencer la lecture dans les données du flux du journal des modifications.

Les opérations qui modifient les données de graphe SPARQL génèrent généralement un seul enregistrement de modification par opération. Cependant, les opérations qui modifient les données de graphe Gremlin peuvent générer plusieurs enregistrements de modification par opération, comme dans les exemples suivants :

  • INSERT— Un sommet Gremlin peut avoir plusieurs étiquettes et un élément Gremlin peut avoir plusieurs propriétés. Un enregistrement de modification distinct est généré pour chaque étiquette et propriété lorsqu'un élément est inséré.

  • UPDATE— Lorsqu'une propriété d'élément Gremlin est modifiée, deux enregistrements de modification sont générés : le premier pour supprimer la valeur précédente et le second pour insérer la nouvelle valeur.

  • DELETE— Un enregistrement de modification distinct est généré pour chaque propriété d'élément supprimée. Par exemple, lorsqu'un arc Gremlin avec des propriétés est supprimé, un enregistrement de modification est généré pour chacune des propriétés, puis un autre enregistrement est généré pour la suppression de l'étiquette d'arc.

    Lorsqu'un sommet Gremlin est supprimé, toutes les propriétés d'arc entrant et sortant sont supprimées en premier, puis viennent les étiquettes d'arc, les propriétés de sommet et enfin les étiquettes de sommet. Chacune de ces suppressions génère un enregistrement de modification.

Format de réponse de l'API Neptune Streams

Une réponse à une demande d'API REST Neptune Streams comporte les champs suivants :

  • lastEventId— Identifiant de séquence de la dernière modification dans la réponse du flux. Un identifiant d'événement est composé de deux champs : UNcommitNumidentifie une transaction qui a modifié le graphique, et unopNumidentifie une opération spécifique au sein de cette transaction. Voici un exemple :

    "eventId": { "commitNum": 12, "opNum": 1 }
  • lastTrxTimestamp— Heure à laquelle la validation de la transaction a été demandée, en millisecondes, à partir de l'Unix Epoch.

  • format— Format de sérialisation pour les enregistrements de modification renvoyés. Les valeurs possibles sontGREMLIN_JSONpour les enregistrements de modifications Gremlin, etNQUADSpour les enregistrements de modification SPARQL.

  • records— Tableau des enregistrements du flux de journaux des modifications sérialisés inclus dans la réponse. Chaque enregistrement de larecordsarray contient les champs suivants :

    • commitTimestamp— Heure à laquelle la validation de la transaction a été demandée, en millisecondes, à partir de l'Unix Epoch.

    • eventId— L'identifiant de séquence de l'enregistrement de modification de flux.

    • data— Le Gremlin, SPARQL ou OpenCypher enregistrement de changement. Les formats de sérialisation de chaque enregistrement sont décrits plus en détail dans la section suivante,Formats de sérialisation dans Neptune Streams.

    • op— L'opération qui a créé le changement.

    • isLastOp— Présent uniquement si cette opération est la dernière de sa transaction. Lorsqu'il est présent, il est défini surtrue. Utile pour s'assurer que l'intégralité d'une transaction est consommée.

  • totalRecords— Nombre total d'enregistrements dans la réponse.

Par exemple, la réponse suivante renvoie des données de modification Gremlin, pour une transaction contenant plusieurs opérations :

{ "lastEventId": { "commitNum": 12, "opNum": 1 }, "lastTrxTimestamp": 1560011610678, "format": "GREMLIN_JSON", "records": [ { "commitTimestamp": 1560011610678, "eventId": { "commitNum": 1, "opNum": 1 }, "data": { "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a", "type": "vl", "key": "label", "value": { "value": "vertex", "dataType": "String" } }, "op": "ADD" } ], "totalRecords": 1 }

La réponse suivante renvoie les données de modification SPARQL pour la dernière opération d'une transaction (l'opération identifiée parEventId(97, 1)dans la transaction numéro 97).

{ "lastEventId": { "commitNum": 97, "opNum": 1 }, "lastTrxTimestamp": 1561489355102, "format": "NQUADS", "records": [ { "commitTimestamp": 1561489355102, "eventId": { "commitNum": 97, "opNum": 1 }, "data": { "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n" }, "op": "ADD", "isLastOp", true } ], "totalRecords": 1 }

Exceptions de l'API Neptune Streams

Le tableau suivant décrit les exceptions Neptune Streams.

Code d'erreur Code HTTP OK pour réessayer ? Message

InvalidParameterException

400

Non

Un ou out-of-range valeur a été fournie en tant que paramètre d'entrée.

ExpiredStreamException

400

Non

Tous les enregistrements demandés dépassent l'âge maximum autorisé et ont expiré.

ThrottlingException

500

Oui

La fréquence des demandes dépasse le débit maximum.

StreamRecordsNotFoundException

404

Non

La ressource demandée est introuvable. Le flux n'est peut-être pas spécifié correctement.

MemoryLimitExceededException

500

Oui

Le traitement de la demande a échoué en raison d'un manque de mémoire, mais pourra être réessayé lorsque le serveur sera moins occupé.