Exemples de fonctions AWS Lambda pour Amazon Neptune - Amazon Neptune

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemples de fonctions AWS Lambda pour Amazon Neptune

Les exemples de fonctions AWS Lambda suivants, écrits en Java, JavaScript et Python, illustrent l'upsert d'un seul sommet avec un ID généré de manière aléatoire à l'aide de l'idiome fold().coalesce().unfold().

Une grande partie du code de chaque fonction est du code standard, chargé de gérer les connexions et de retenter les connexions et les requêtes en cas d'erreur. La logique d'application réelle et la requête Gremlin sont implémentées dans les méthodes doQuery() et query() respectivement. Si vous utilisez ces exemples comme base pour vos propres fonctions Lambda, vous pouvez vous concentrer sur la modification de doQuery() et query().

Les fonctions sont configurées pour retenter les requêtes qui ont échoué cinq fois, en attendant une seconde entre les nouvelles tentatives.

Les fonctions nécessitent la présence de valeurs dans les variables d'environnement Lambda suivantes :

  • NEPTUNE_ENDPOINT : votre point de terminaison de cluster de bases de données Neptune. Pour Python, il devrait s'agir de neptuneEndpoint.

  • NEPTUNE_PORT : port Neptune. Pour Python, il devrait s'agir de neptunePort.

  • USE_IAM  : (true ou false) si l'authentification de base de données AWS Identity and Access Management (IAM) est activée dans la base de données, définissez la variable d'environnement USE_IAM sur true. De cette manière, la fonction Lambda signera les demandes de connexion à Neptune avec Sigv4. Pour de telles demandes d'authentification de base de données IAM, assurez-vous que le rôle d'exécution de la fonction Lambda est associé à une politique IAM appropriée qui permet à la fonction de se connecter à votre cluster de bases de données Neptune (voir Types de politique IAM).

Exemple de fonction Lambda Java pour Amazon Neptune

Voici quelques éléments à garder à l'esprit sur les fonctions AWS Lambda Java

  • Le pilote Java gère son propre pool de connexions, dont vous n'avez pas besoin. Configurez donc l'objet Cluster avec minConnectionPoolSize(1) et maxConnectionPoolSize(1).

  • La construction de l'objet Cluster peut être lente, car elle entraîne la création d'un ou de plusieurs sérialiseurs (Gyro par défaut, plus un autre si vous l'avez configuré pour des formats de sortie supplémentaires tels que binary). Leur instanciation peut prendre un certain temps.

  • Le pool de connexions est initialisé lors de la première demande. À ce stade, le pilote configure la pile Netty, alloue des tampons d'octets et crée une clé de signature si vous utilisez l'authentification de base de données IAM. Tout cela peut augmenter la latence du démarrage à froid.

  • Le pool de connexions du pilote Java surveille la disponibilité des hôtes du serveur et tente automatiquement de se reconnecter en cas d'échec de connexion. Il lance une tâche en arrière-plan pour tenter de rétablir la connexion. Utilisez reconnectInterval( ) pour configurer l'intervalle entre les tentatives de reconnexion. Pendant que le pilote tente de se reconnecter, la fonction Lambda peut simplement retenter la requête.

    Si l'intervalle entre les nouvelles tentatives est inférieur à l'intervalle entre les tentatives de reconnexion, les nouvelles tentatives en cas d'échec de connexion échoueront à nouveau, car l'hôte sera considéré comme indisponible. Cela ne s'applique pas aux nouvelles tentatives pour une exception ConcurrentModificationException.

  • Utilisez Java 8 plutôt que Java 11. Les optimisations Netty ne sont pas activées par défaut dans Java 11.

  • Cet exemple utilise Retry4j pour les nouvelles tentatives.

  • Pour utiliser le pilote de signature Sigv4 dans la fonction Lambda Java, consultez les exigences de dépendance dans Connexion à Neptune à l'aide de Java et Gremlin avec la signature Signature Version 4.

Avertissement

Le CallExecutor de Retry4j n'est peut-être pas thread-safe. Pensez à faire en sorte que chaque thread utilise sa propre instance CallExecutor.

package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }

Si vous souhaitez inclure une logique de reconnexion dans votre fonction, consultez Exemple de reconnexion Java.

Exemple de fonction Lambda JavaScript pour Amazon Neptune

Remarques à propos de cet exemple
  • Le pilote JavaScript ne gère aucun pool de connexions. Il ouvre toujours une seule connexion.

  • Cet exemple de fonction a recours aux utilitaires de signature Sigv4 de gremlin-aws-sigv4 pour signer les demandes adressées à une base de données activée pour l'authentification IAM.

  • Il utilise la fonction retry( ) du module utilitaire asynchrone open source pour gérer les tentatives de backoff et de nouvelle tentative.

  • Les étapes terminales Gremlin renvoient un élément promise JavaScript (voir la documentation TinkerPop). Pour next(), il s'agit d'un tuple {value, done}.

  • Les erreurs de connexion sont signalées dans le gestionnaire et traitées à l'aide d'une logique de backoff et de nouvelle tentative conformément aux recommandations décrites ici, à une exception près. Il existe un type de problème de connexion que le pilote ne considère pas comme une exception, et qui ne peut donc pas être résolu par cette logique de backoff et de nouvelle tentative.

    Le problème est que si une connexion est fermée après qu'un pilote a envoyé une demande, mais avant que le pilote ne reçoive une réponse, la requête semble s'être terminée, mais renvoie une valeur nulle. En ce qui concerne le client de la fonction Lambda, celle-ci semble s'être terminée correctement, mais avec une réponse vide.

    L'impact de ce problème dépend de la manière dont votre application traite une réponse vide. Certaines applications peuvent traiter une réponse vide provenant d'une demande de lecture comme une erreur, tandis que d'autres peuvent la traiter à tort comme un résultat vide.

    Les demandes d'écriture qui rencontrent ce problème de connexion renvoient également une réponse vide. Une invocation réussie avec une réponse vide signifie-t-elle un succès ou un échec ? Si le client invoquant une fonction d'écriture considère simplement l'invocation réussie de la fonction comme signifiant que l'écriture dans la base de données a été validée, au lieu d'inspecter le corps de la réponse, le système peut sembler perdre des données.

    Ce problème est dû à la façon dont le pilote traite les événements émis par le socket sous-jacent. Lorsque le socket réseau sous-jacent est fermé avec une erreur ECONNRESET, le WebSocket utilisé par le pilote est fermé et émet un événement 'ws close'. Cependant, rien dans le pilote ne permet de gérer cet événement d'une manière qui pourrait être utilisée pour déclencher une exception. Par conséquent, la requête disparaît tout simplement.

    Pour contourner ce problème, l'exemple de fonction lambda ci-dessous ajoute un gestionnaire d'événements 'ws close' qui envoie une exception au pilote lors de la création d'une connexion à distance. Cette exception n'est toutefois pas déclenchée avec le chemin de requête-réponse de la requête Gremlin et ne peut donc pas être utilisée pour déclencher une logique de backoff et de nouvelle tentative au sein de la fonction lambda elle-même. Au lieu de cela, l'exception émise par le gestionnaire d'événements 'ws close' entraîne une exception non gérée qui provoque l'échec de l'invocation Lambda. Cela permet au client qui invoque la fonction de gérer l'erreur et de retenter l'invocation Lambda le cas échéant.

    Nous vous recommandons d'implémenter une logique de backoff et de nouvelle tentative dans la fonction Lambda elle-même afin de protéger vos clients des problèmes de connexion intermittents. Cependant, pour contourner le problème ci-dessus, le client doit également implémenter une logique de nouvelle tentative afin de gérer les échecs résultant de ce problème de connexion particulier.

Code Javascript

const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };

Exemple de fonction Lambda Python pour Amazon Neptune

Voici quelques points à noter à propos de l'exemple de fonction AWS Lambda Python suivant :

  • Il utilise le module de backoff.

  • Il configure pool_size=1 pour éviter de créer un pool de connexions inutile.

  • Il définit message_serializer=serializer.GraphSONSerializersV2d0().

import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return (database_url, request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)

Voici des exemples de résultats, illustrant des périodes alternées de charge lourde et légère :

Schéma présentant des exemples de résultats issus de l'exemple de fonction Lambda Python.