AWS Lambdaexemples de fonction pour Amazon Neptune - Amazon Neptune

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

AWS Lambdaexemples de fonction pour Amazon Neptune

L'exemple suivantAWS Lambda, écrites en Java, JavaScript et Python, illustrent l'upserting d'un seul sommet avec un ID généré aléatoirement à l'aide de la commandefold().coalesce().unfold()idiome.

Une grande partie du code de chaque fonction est du code standard, chargé de gérer les connexions et de réessayer les connexions et les requêtes en cas d'erreur. La logique d'application réelle et la requête Gremlin sont implémentées dansdoQuery()etquery(), respectivement. Si vous utilisez ces exemples comme base pour vos propres fonctions Lambda, vous pouvez vous concentrer sur la modificationdoQuery()etquery().

Les fonctions sont configurées pour réessayer 5 fois les requêtes ayant échoué, en attendant 1 seconde entre les nouvelles tentatives.

Les fonctions nécessitent la présence de valeurs dans les variables d'environnement Lambda suivantes :

  • NEPTUNE_ENDPOINT— Point de terminaison de votre cluster de bases de données Neptune

  • NEPTUNE_PORT — Le port de Neptune.

  • USE_IAM — (trueoufalse) Si votre base de données contientAWS Identity and Access ManagementAuthentification de base de données (IAM), définissezUSE_IAMvariable d'environnementtrue. Cela amène la fonction Lambda à signer les demandes de connexion Sigv4-Neptune. Pour de telles demandes d'authentification de base de données IAM, assurez-vous que le rôle d'exécution de la fonction Lambda est associé à une stratégie IAM appropriée qui permet à la fonction de se connecter à votre cluster de bases de données Neptune (consultezTypes de stratégies IAM).

Exemple de fonction Java Lambda pour Amazon Neptune

Voici quelques informations à prendre en compte pour JavaAWS LambdaFonctions

  • Le pilote Java gère son propre pool de connexions, dont vous n'avez pas besoin. Configurez donc votreClusterl'objetminConnectionPoolSize(1)etmaxConnectionPoolSize(1).

  • LeClusterpeut être lent à construire car il crée un ou plusieurs sérialiseurs (Gyro par défaut, plus un autre si vous l'avez configuré pour des formats de sortie supplémentaires tels quebinary). L'instanciation de ces actions peut prendre un certain temps.

  • Le pool de connexions est initialisé avec la première demande. À ce stade, le conducteur configure leNettypile, alloue des tampons d'octets et crée une clé de signature si vous utilisez l'authentification IAM DB. Tout cela peut augmenter la latence de démarrage à froid.

  • Le pool de connexions du pilote Java surveille la disponibilité des hôtes du serveur et tente automatiquement de se reconnecter en cas d'échec de la connexion. Il lance une tâche en arrière-plan pour essayer de rétablir la connexion. UtiliserreconnectInterval( )pour configurer l'intervalle entre les tentatives de reconnexion. Pendant que le pilote tente de se reconnecter, votre fonction Lambda peut simplement réessayer la requête.

    Si l'intervalle entre les nouvelles tentatives est inférieur à l'intervalle entre les tentatives de reconnexion, les nouvelles tentatives sur une connexion échouée échouent à nouveau car l'hôte est considéré comme indisponible. Cela ne s'applique pas aux nouvelles tentatives pour unConcurrentModificationException.

  • Utilisez Java 8 plutôt que Java 11.Nettyles optimisations ne sont pas activées par défaut dans Java 11.

  • Cet exemple utiliseRéessayez 4Jpour les nouvelles tentatives.

  • Pour utiliser le pluginSigv4pilote de signature dans votre fonction Java Lambda, consultez les exigences de dépendance dansConnexion à Neptune à l'aide de Java et de Gremlin avec Signature Version 4 Signing.

Avertissement

LeCallExecutorde Retry4j peut ne pas être adapté aux threads. Pensez à ce que chaque thread utilise le sienCallExecutorinstance.

package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }

Si vous souhaitez inclure une logique de reconnexion dans votre fonction, consultezExemple de reconnexion Java.

JavaScript Exemple de fonction Lambda pour Amazon Neptune

Remarques à propos de cet exemple

  • Le JavaScript le pilote ne gère pas de groupe de connexions. Il ouvre toujours une seule connexion.

  • La fonction d'exemple utilise les utilitaires de signature Sigv4 degremlin-aws-sigv4pour signer les demandes à une base de données compatible avec l'authentification IAM.

  • Il utilise leretry ()fonctionModule asynchroneà gérer backoff-and-retry Tentatives.

  • Les étapes du terminal Gremlin renvoient un JavaScript promise(voirTinkerPop documentation). Pournext(), il s'agit d'un{value, done}tuple.

  • Les erreurs de connexion sont signalées dans le gestionnaire et traitées à l'aide de certains backoff-and-retry logique conforme aux recommandations présentées ici, à une exception près. Il existe un type de problème de connexion que le conducteur ne considère pas comme une exception, et qui ne peut donc pas être réglé par cela. backoff-and-retry Logique.

    Le problème est que si une connexion est fermée après qu'un pilote a envoyé une demande mais avant que le pilote ne reçoive une réponse, la requête semble terminée mais renvoie une valeur nulle. En ce qui concerne le client de la fonction lambda, la fonction semble se terminer correctement, mais avec une réponse vide.

    L'impact de ce problème dépend de la manière dont votre application traite une réponse vide. Certaines applications peuvent traiter une réponse vide d'une demande de lecture comme une erreur, tandis que d'autres peuvent la traiter à tort comme un résultat vide.

    Les demandes d'écriture qui rencontrent ce problème de connexion renvoient également une réponse vide. Un appel réussi avec une réponse vide signifie-t-il un succès ou un échec ? Si le client qui appelle une fonction d'écriture considère simplement l'appel réussi de la fonction comme signifiant que l'écriture dans la base de données a été validée, au lieu d'inspecter le corps de la réponse, le système peut sembler perdre des données.

    Ce problème provient de la façon dont le pilote traite les événements émis par le socket sous-jacent. Lorsque le socket réseau sous-jacent est fermé avec unECONNRESETerror, le WebSocket utilisé par le pilote est fermé et émet un message'ws close'event. Cependant, rien dans le pilote ne permet de gérer cet événement d'une manière qui pourrait être utilisée pour déclencher une exception. Par conséquent, la requête disparaît tout simplement.

    Pour contourner ce problème, l'exemple de fonction lambda ajoute un'ws close'gestionnaire d'événements qui renvoie une exception au pilote lors de la création d'une connexion distante. Cette exception n'est cependant pas déclenchée le long du chemin requête-réponse de la requête Gremlin et ne peut donc pas être utilisée pour déclencher un backoff-and-retry logique au sein de la fonction lambda elle-même. Au lieu de cela, l'exception lancée par le'ws close'le gestionnaire d'événements génère une exception non gérée qui entraîne l'échec de l'appel lambda. Cela permet au client qui appelle la fonction de gérer l'erreur et de réessayer l'appel lambda le cas échéant.

    Nous vous recommandons de mettre en œuvre backoff-and-retry logique dans la fonction lambda elle-même pour protéger vos clients des problèmes de connexion intermittents. Toutefois, la solution de contournement pour le problème ci-dessus nécessite que le client implémente également une logique de nouvelle tentative, afin de gérer les échecs résultant de ce problème de connexion particulier.

Code JavaScript

const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };

Exemple de fonction Lambda Python pour Amazon Neptune

Voici quelques informations à prendre en compte pour le PythonAWS LambdaExemple de fonction :

  • Il utilise leModule.

  • Ilpool_size=1pour éviter de créer un pool de connexions inutile.

  • Ilmessage_serializer=serializer.GraphSONSerializersV2d0().

import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from tornado.websocket import WebSocketClosedError from tornado import httpclient from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [WebSocketClosedError, OSError] retriable_errors = [GremlinServerError] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return httpclient.HTTPRequest(database_url, headers=request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) print('error: [{}] {}'.format(type(e), err_msg)) print('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) print('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): return doQuery(event) def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): print('Creating remote connection') return DriverRemoteConnection( connection_string(), 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0()) def connection_string(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return database_url conn = create_remote_connection() g = create_graph_traversal_source(conn)

Voici des exemples de résultats :

Diagramme montrant des exemples de résultats de l'exemple de fonction Lambda Python.