Ejemplos de funciones de AWS Lambda para Amazon Neptune - Amazon Neptune

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Ejemplos de funciones de AWS Lambda para Amazon Neptune

Las siguientes funciones de AWS Lambda de ejemplo, escritas en Java, JavaScript y Python, muestran cómo llevar a cabo actualizaciones o inserciones en un único vértice con un identificador generado de forma aleatoria mediante la expresión fold().coalesce().unfold().

Gran parte del código de cada función es código reutilizable, responsable de administrar las conexiones y reintentar las conexiones y consultas si se produce un error. La lógica de la aplicación real y la consulta de Gremlin se implementan en los métodos doQuery() y query(), respectivamente. Si utiliza estos ejemplos como base para sus propias funciones de Lambda, puede centrarse en modificar doQuery() y query().

Las funciones se han configurado para reintentar las consultas erróneas cinco veces, esperando un segundo entre cada reintento.

Las funciones requieren que los valores estén presentes en las siguientes variables del entorno de Lambda:

  • NEPTUNE_ENDPOINT: el punto de conexión del clúster de base de datos de Neptune. Para Python, debería ser neptuneEndpoint.

  • NEPTUNE_PORT: el puerto de Neptune. Para Python, debería ser neptunePort.

  • USE_IAM : (true o false) si la base de datos tiene habilitada la autenticación de bases de datos AWS Identity and Access Management (IAM), establezca la variable del entorno USE_IAM en true. Esto hace que la función de Lambda firme las solicitudes de conexión a Neptune mediante Sigv4. Para dichas solicitudes de autenticación de base de datos de IAM, asegúrese de que el rol de ejecución de la función de Lambda tenga adjunta una política de IAM adecuada que permita a la función conectarse al clúster de base de datos de Neptune (consulte Tipos de políticas de IAM).

Ejemplo de función de Lambda de Java para Amazon Neptune

Estos son algunos aspectos que se deben tener en cuenta sobre las funciones de AWS Lambda de Java:

  • El controlador de Java mantiene su propio conjunto de conexiones, que no es necesario, por lo que debe configurar el objeto Cluster con minConnectionPoolSize(1) y maxConnectionPoolSize(1).

  • El objeto Cluster puede tardar en crearse porque crea uno o varios serializadores (Gyro, de forma predeterminada, y otro si lo ha configurado para formatos de salida adicionales, como, por ejemplo, binary). Estos pueden tardar un tiempo en crear una instancia.

  • El conjunto de conexiones se inicializa con la primera solicitud. En este momento, el controlador configura la pila Netty, asigna búferes de bytes y crea una clave de firma si se utiliza la autenticación de base de datos de IAM. Todos estos factores pueden aumentar la latencia de arranque en frío.

  • El conjunto de conexiones del controlador Java supervisa la disponibilidad de los hosts del servidor e intenta volver a conectarse automáticamente si se produce un error en la conexión. Inicia una tarea en segundo plano para intentar restablecer la conexión. Utilice reconnectInterval( ) para configurar el intervalo entre los intentos de reconexión. Mientras el controlador intenta volver a conectarse, la función de Lambda simplemente puede volver a intentar la consulta.

    Si el intervalo entre los reintentos es menor que el intervalo entre los intentos de reconexión, los reintentos de conexión vuelven a fallar porque se considera que el host no está disponible. Esto no se aplica a los reintentos de una ConcurrentModificationException.

  • Utilice Java 8 en lugar de Java 11. Las optimizaciones de Netty no están habilitadas de forma predeterminada en Java 11.

  • En este ejemplo se utiliza Retry4j para los reintentos.

  • Para usar el controlador de firma Sigv4 en la función de Lambda de Java, consulte los requisitos de dependencia de Conexión con Neptune mediante Java y Gremlin con firma de Signature Version 4.

aviso

Es posible que el CallExecutor de Retry4j no sea seguro para subprocesos. Plantéese la posibilidad de que cada subproceso use su propia instancia de CallExecutor.

package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }

Si desea incluir la lógica de reconexión en la función, consulte Ejemplo de reconexión de Java.

Ejemplo de función de Lambda de JavaScript para Amazon Neptune

Notas sobre este ejemplo
  • El controlador de JavaScript no mantiene un grupo de conexiones. Siempre abre una única conexión.

  • La función de ejemplo utiliza las utilidades de firma Sigv4 de gremlin-aws-sigv4 para firmar las solicitudes a una base de datos habilitada para la autenticación de IAM.

  • Utiliza la función retry() del módulo de utilidad asíncrona de código abierto para gestionar los intentos de retroceso y reintento.

  • Los pasos del terminal de Gremlin devuelven una promise de JavaScript (consulte la documentación de TinkerPop). En el caso de next(), se trata de una tupla de {value, done}.

  • Los errores de conexión aparecen en el controlador y se solucionan mediante una lógica de retroceso y reintento de acuerdo con las recomendaciones que aquí se describen, con una excepción. Hay un tipo de problema de conexión que el controlador no considera una excepción y que, por lo tanto, no puede solucionarse con esta lógica.

    El problema es que si una conexión se cierra después de que un controlador envíe una solicitud, pero antes de que el controlador reciba una respuesta, la consulta parece completarse, aunque devuelve un valor nulo. En lo que respecta al cliente de la función de lambda, la función parece completarse correctamente, pero con una respuesta vacía.

    El impacto de este problema depende del tratamiento que dé la aplicación a una respuesta vacía. Algunas aplicaciones pueden tratar una respuesta vacía de una solicitud de lectura como un error, pero otras pueden tratarla erróneamente como un resultado vacío.

    Las solicitudes de escritura que tengan este problema de conexión también devolverán una respuesta vacía. ¿Una invocación correcta con una respuesta vacía indica éxito o error? Si el cliente que invoca una función de escritura simplemente considera que la invocación correcta de la función significa que se ha realizado la escritura en la base de datos, en lugar de inspeccionar el cuerpo de la respuesta, puede parecer que el sistema ha perdido datos.

    Este problema se debe a la forma en que el controlador trata los eventos emitidos por el socket subyacente. Cuando el socket de red subyacente se cierra con un error ECONNRESET, el WebSocket utilizado por el controlador se cierra y emite un evento 'ws close'. Sin embargo, no hay nada en el controlador que permita gestionar ese evento de forma que pueda utilizarse para generar una excepción. Como resultado, la consulta simplemente desaparece.

    Para solucionar este problema, la función de lambda de ejemplo que se muestra aquí añade un controlador de eventos 'ws close' que lanza una excepción al controlador al crear una conexión remota. Sin embargo, esta excepción no se genera a lo largo de la ruta de solicitud-respuesta de la consulta de Gremlin y, por lo tanto, no se puede utilizar para desencadenar ninguna lógica de retroceso y reintento dentro de la propia función de Lambda. En cambio, la excepción lanzada por el controlador de eventos 'ws close' da como resultado una excepción no administrada que provoca un error en la invocación de Lambda. Esto permite al cliente que invoca la función gestionar el error y volver a intentar la invocación de Lambda si fuera necesario.

    Le recomendamos que implemente una lógica de retroceso y reintento en la propia función de Lambda para proteger a los clientes de problemas de conexión intermitentes. Sin embargo, la solución provisional al problema anterior requiere que el cliente también implemente una lógica de reintento para gestionar los errores que se producen como resultado de este problema de conexión en concreto.

Código Javascript

const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };

Ejemplo de función de Lambda de Python para Amazon Neptune

Estos son algunos aspectos que se deben tener en cuenta sobre la siguiente función de AWS Lambda de ejemplo de Python:

  • Utiliza el módulo de retroceso.

  • Establece pool_size=1 para evitar la creación de un grupo de conexiones innecesario.

  • Establece message_serializer=serializer.GraphSONSerializersV2d0().

import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return (database_url, request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)

Estos son algunos resultados de ejemplo, que muestran periodos alternos de carga elevada y ligera:

Diagrama que muestra los resultados de ejemplo de la función de Lambda de ejemplo de Python