Utilisation de Neptune Streams - Amazon Neptune

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Utilisation de Neptune Streams

La fonctionnalité Neptune Streams vous permet de générer une séquence complète d'entrées de journal des modifications qui enregistrent chaque modification apportée aux données de votre graphe au fur et à mesure qu'elles sont appliquées. Pour obtenir une présentation de cette fonction, veuillez consulter Capture des modifications de graphe en temps réel à l'aide des flux Neptune.

Activation de Neptune Streams

Vous pouvez activer ou désactiver Neptune Streams à tout moment en définissant le paramètre de cluster de bases de données neptune_streams. Si le paramètre est défini sur 1, Streams est activé, et s'il est défini sur 0, Streams est désactivé.

Note

Après avoir modifié le paramètre neptune_streams du cluster de bases de données, vous devez redémarrer toutes les instances de base de données du cluster pour que la modification soit effective.

Vous pouvez définir le paramètre de cluster de bases de données neptune_streams_expiry_days pour contrôler le nombre de jours (compris entre 1 et 90) pendant lesquels les enregistrements de flux resteront sur le serveur avant d'être supprimés. La valeur par défaut est 7.

Initialement, Neptune Streams a été ajouté en tant que fonctionnalité expérimentale que vous activiez ou désactiviez en mode laboratoire à l'aide du paramètre neptune_lab_mode du cluster de bases de données (voir Mode expérimental Neptune). L'utilisation du mode Lab pour activer Streams est désormais obsolète et sera désactivée à l'avenir.

Désactivation de Neptune Streams

Vous pouvez désactiver Neptune Streams à tout moment.

Pour désactiver Streams, mettez à jour le groupe de paramètres de cluster de bases de données afin que la valeur du paramètre neptune_streams soit définie sur 0.

Important

Dès que Streams est désactivé, vous ne pouvez plus accéder aux données du journal des modifications. Veillez à lire les informations qui vous intéressent avant de désactiver Streams.

Appel de l'API REST Neptune Streams

Vous accédez à Neptune Streams à l'aide d'une API REST qui envoie une demande HTTP GET à l'un des points de terminaison locaux suivants :

  • Pour une base de données de graphe SPARQL :   https://Neptune-DNS:8182/sparql/stream.

  • Pour une base de données orientée graphe Gremlin ou openCypher : https://Neptune-DNS:8182/propertygraph/stream ou https://Neptune-DNS:8182/pg/stream.

Note

Depuis la version 1.1.0.0 du moteur, le point de terminaison du flux Gremlin (https://Neptune-DNS:8182/gremlin/stream) est obsolète, ainsi que son format de sortie associé (GREMLIN_JSON). Il reste pris en charge pour des raisons de rétrocompatibilité, mais il pourrait être supprimé dans les futures versions.

Seule une opération HTTP GET est autorisée.

Neptune prend en charge la compression gzip de la réponse, à condition que la demande HTTP inclue un en-tête Accept-Encoding spécifiant gzip comme format de compression accepté (c'est-à-dire, "Accept-Encoding: gzip").

Paramètres
  • limit : long, facultatif. Plage : de 1 à 100 000. Par défaut : 10

    Spécifie le nombre maximal d'enregistrements à renvoyer. Il existe également une limite de taille de 10 Mo pour la réponse qui ne peut pas être modifiée et qui est prioritaire sur le nombre d'enregistrements spécifié dans le paramètre limit. La réponse inclut un enregistrement de dépassement de seuil si la limite de 10 Mo a été atteinte.

  • iteratorType : chaîne, facultatif.

    Ce paramètre peut avoir l'une des valeurs suivantes :

    • AT_SEQUENCE_NUMBER (valeur par défaut) : indique que la lecture doit commencer à partir du numéro de séquence d'événement spécifié conjointement par les paramètres commitNum et opNum.

    • AFTER_SEQUENCE_NUMBER : indique que la lecture doit commencer juste après le numéro de séquence d'événement spécifié conjointement par les paramètres commitNum et opNum.

    • TRIM_HORIZON : indique que la lecture doit commencer au niveau du dernier enregistrement non tronqué du système, qui est le plus ancien enregistrement n'ayant pas expiré (pas encore supprimé) dans le flux de journaux des modifications. Ce mode est utile lors du démarrage de l'application, lorsque vous n'avez pas de numéro de séquence d'événement de démarrage spécifique.

    • LATEST : indique que la lecture doit commencer au niveau de l'enregistrement le plus récent dans le système, qui est le dernier enregistrement n'ayant pas expiré (pas encore supprimé) dans le flux de journaux des modifications. Cela est utile lorsqu'il est nécessaire de lire les enregistrements à partir du haut actuel des flux afin de ne pas traiter les anciens enregistrements, par exemple lors d'une reprise après sinistre ou d'une mise à niveau sans interruption de service. Notez que dans ce mode, un seul enregistrement est renvoyé au maximum.

  • commitNum : long, requis lorsque iteratorType est AT_SEQUENCE_NUMBER ou AFTER_SEQUENCE_NUMBER.

    Numéro de validation de l'enregistrement de départ à lire à partir du flux du journal des modifications.

    Ce paramètre est ignoré quand iteratorType a la valeur TRIM_HORIZON ou LATEST.

  • opNum : long, facultatif (la valeur par défaut est 1).

    Numéro de séquence d'opération au sein de la validation spécifiée à partir duquel commencer la lecture dans les données du flux du journal des modifications.

Les opérations qui modifient les données de graphe SPARQL génèrent généralement un seul enregistrement de modification par opération. Cependant, les opérations qui modifient les données de graphe Gremlin peuvent générer plusieurs enregistrements de modification par opération, comme dans les exemples suivants :

  • INSERT : un sommet Gremlin peut avoir plusieurs étiquettes, et un élément Gremlin peut avoir plusieurs propriétés. Un enregistrement de modification distinct est généré pour chaque étiquette et propriété lorsqu'un élément est inséré.

  • UPDATE : lorsqu'une propriété d'élément Gremlin est modifiée, deux enregistrements de modification sont générés : le premier pour supprimer la valeur précédente et le second pour insérer la nouvelle valeur.

  • DELETE : un enregistrement de modification distinct est généré pour chaque propriété d'élément supprimée. Par exemple, lorsqu'un arc Gremlin avec des propriétés est supprimé, un enregistrement de modification est généré pour chacune des propriétés, puis un autre enregistrement est généré pour la suppression de l'étiquette d'arc.

    Lorsqu'un sommet Gremlin est supprimé, toutes les propriétés d'arc entrant et sortant sont supprimées en premier, puis viennent les étiquettes d'arc, les propriétés de sommet et enfin les étiquettes de sommet. Chacune de ces suppressions génère un enregistrement de modification.

Format de réponse de l'API Neptune Streams

Une réponse à une demande d'API REST Neptune Streams comporte les champs suivants :

  • lastEventId : identifiant de séquence de la dernière modification dans la réponse du flux. Un ID d'événement se compose de deux champs : un commitNum qui identifie une transaction ayant modifié le graphe et un opNum qui identifie une opération spécifique au sein de cette transaction. Voici un exemple :

    "eventId": { "commitNum": 12, "opNum": 1 }
  • lastTrxTimestamp : heure à laquelle la validation de la transaction a été demandée, en millisecondes, à partir de l'Unix Epoch.

  • format : format de sérialisation pour les enregistrements de modification renvoyés. Les valeurs possibles sont PG_JSON pour les enregistrements de modification Gremlin ou openCypher et NQUADS pour les enregistrements de modification SPARQL.

  • records : tableau des enregistrements sérialisés du flux de journaux de modifications inclus dans la réponse. Chaque enregistrement du tableau records contient les champs suivants :

    • commitTimestamp : heure à laquelle la validation de la transaction a été demandée, en millisecondes, à partir de l'Unix Epoch.

    • eventId : identifiant de séquence de l'enregistrement de dernière modification du flux.

    • data— L'enregistrement sérialisé de Gremlin, SPARQL ou OpenCypher de modification. Les formats de sérialisation de chaque enregistrement sont décrits plus en détail dans la section suivante, Formats de sérialisation dans Neptune Streams.

    • op : opération à l'origine de la modification.

    • isLastOp : présent uniquement si cette opération est la dernière dans sa transaction. Lorsqu'il est présent, il est défini sur true. Utile pour s'assurer qu'une transaction est consommée dans son intégralité.

  • totalRecords : nombre total d'enregistrements dans la réponse.

Par exemple, la réponse suivante renvoie les données de modification Gremlin pour une transaction contenant plusieurs opérations :

{ "lastEventId": { "commitNum": 12, "opNum": 1 }, "lastTrxTimestamp": 1560011610678, "format": "PG_JSON", "records": [ { "commitTimestamp": 1560011610678, "eventId": { "commitNum": 1, "opNum": 1 }, "data": { "id": "d2b59bf8-0d0f-218b-f68b-2aa7b0b1904a", "type": "vl", "key": "label", "value": { "value": "vertex", "dataType": "String" } }, "op": "ADD" } ], "totalRecords": 1 }

La réponse suivante renvoie les données de modification SPARQL pour la dernière opération d'une transaction (opération identifiée par EventId(97, 1) dans le numéro de transaction 97).

{ "lastEventId": { "commitNum": 97, "opNum": 1 }, "lastTrxTimestamp": 1561489355102, "format": "NQUADS", "records": [ { "commitTimestamp": 1561489355102, "eventId": { "commitNum": 97, "opNum": 1 }, "data": { "stmt": "<https://test.com/s> <https://test.com/p> <https://test.com/o> .\n" }, "op": "ADD", "isLastOp": true } ], "totalRecords": 1 }

Exceptions de l'API Neptune Streams

Le tableau suivant décrit les exceptions Neptune Streams.

Code d’erreur Code HTTP OK pour réessayer ? Message

InvalidParameterException

400

Non

Une valeur non valide ou une out-of-range valeur a été fournie en tant que paramètre d'entrée.

ExpiredStreamException

400

Non

Tous les enregistrements demandés dépassent l'âge maximum autorisé et ont expiré.

ThrottlingException

500

Oui

La fréquence des demandes dépasse le débit maximum.

StreamRecordsNotFoundException

404

Non

La ressource demandée est introuvable. Le flux n'est peut-être pas spécifié correctement.

MemoryLimitExceededException

500

Oui

Le traitement de la demande a échoué en raison d'un manque de mémoire, mais pourra être réessayé lorsque le serveur sera moins occupé.