Esempi di funzioni AWS Lambda per Amazon Neptune - Amazon Neptune

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Esempi di funzioni AWS Lambda per Amazon Neptune

Le seguenti funzioni AWS Lambda di esempio, scritte in Java, JavaScript e Python, illustrano l'upsert di un singolo vertice con un ID generato casualmente utilizzando l'idioma fold().coalesce().unfold().

Gran parte del codice contenuto in ogni funzione è codice boilerplate, responsabile della gestione delle connessioni e dei tentativi di connessione e query in caso di errore. La vera logica dell'applicazione e la query Gremlin sono implementate rispettivamente nei metodi doQuery() e query(). Se usi questi esempi come base per le funzioni Lambda, puoi concentrarti sulla modifica di doQuery() e query().

Le funzioni sono configurate per ripetere le query non riuscite 5 volte, con l'attesa di 1 secondo tra un tentativo e l'altro.

Le funzioni richiedono che i valori siano presenti nelle seguenti variabili di ambiente Lambda:

  • NEPTUNE_ENDPOINT: endpoint del cluster database Neptune. Per Python, sarà neptuneEndpoint

  • NEPTUNE_PORT: porta di Neptune. Per Python, sarà neptunePort

  • USE_IAM (true o false): se nel database è abilitata l'autenticazione AWS Identity and Access Management (IAM), imposta la variabile di ambiente USE_IAM su true. In questo modo, la funzione Lambda firmerà le richieste di connessione a Neptune tramite SigV4. Per tali richieste di autenticazione IAM del database, assicurati che al ruolo di esecuzione della funzione Lambda sia collegata una policy IAM appropriata che consenta alla funzione di connettersi al cluster database Neptune (consulta Tipi di policy IAM).

Esempio di funzione Lambda Java per Amazon Neptune

Di seguito sono illustrati alcuni aspetti da considerare sulle funzioni AWS Lambda Java:

  • Il driver Java mantiene il proprio pool di connessioni, che non è necessario, quindi configura l'oggetto Cluster con minConnectionPoolSize(1) e maxConnectionPoolSize(1).

  • La creazione dell'oggetto Cluster può essere lenta in quanto vengono creati uno o più serializzatori (per impostazione predefinita Gyro, più un altro se è stato configurato per formati di output aggiuntivi come binary). La creazione dell'istanza di questi può richiedere tempi prolungati.

  • Il pool di connessioni viene inizializzato con la prima richiesta. A questo punto, il driver configura lo stack Netty, alloca i buffer di byte e crea una chiave di firma se utilizzi l'autenticazione del database IAM. Tutto ciò può aumentare la latenza di avvio a freddo.

  • Il pool di connessioni del driver Java monitora la disponibilità degli host del server e tenta automaticamente di riconnettersi in caso di errore della connessione. Avvia un'attività in background per tentare di ristabilire la connessione. Usa reconnectInterval( ) per configurare l'intervallo tra i tentativi di riconnessione. Mentre il driver tenta di riconnettersi, la funzione Lambda può semplicemente provare a ripetere la query.

    Se l'intervallo tra i tentativi è inferiore all'intervallo tra i tentativi di riconnessione, i nuovi tentativi su una connessione non riuscita avranno nuovamente esito negativo perché l'host è considerato non disponibile. Non si applica ai nuovi tentativi per un'eccezione ConcurrentModificationException.

  • Usa Java 8 anziché Java 11. Le ottimizzazioni Netty in Java 11 non sono abilitate per impostazione predefinita.

  • Questo esempio utilizza Retry4j per i tentativi.

  • Per utilizzare il driver di firma Sigv4 nella funzione Lambda Java, consulta i requisiti per le dipendenze in Connessione a Neptune tramite Java e Gremlin con firma di Signature Version 4.

avvertimento

L'executor CallExecutor di Retry4j potrebbe non essere thread-safe. Prevedi che ogni thread utilizzi un'istanza CallExecutor dedicata.

package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }

Se desideri includere la logica di riconnessione nella funzione, consulta Esempio di riconnessione Java.

Esempio di funzione Lambda JavaScript per Amazon Neptune

Note su questo esempio
  • Il driver JavaScript non mantiene un pool di connessioni. Apre sempre una singola connessione.

  • La funzione di esempio utilizza le utilità di firma SigV4 di gremlin-aws-sigv4 per firmare le richieste a un database abilitato per l'autenticazione IAM.

  • Utilizza la funzione retry( ) del modulo di utilità async open source per gestire i tentativi di backoff e nuovo tentativo.

  • I passaggi del terminale Gremlin restituiscono un elemento promise JavaScript (consulta la documentazione di TinkerPop). Per next(), questa è una tupla {value, done}.

  • Gli errori di connessione vengono generati all'interno del gestore e risolti utilizzando una logica di backoff e nuovo tentativo in linea con le raccomandazioni descritte qui, con un'eccezione. Esiste un tipo di problema di connessione che il driver non considera un'eccezione e che quindi non può essere risolto con questa logica di backoff e nuovo tentativo.

    Il problema sta nel fatto che se una connessione viene chiusa dopo che un driver ha inviato una richiesta ma prima che il driver riceva una risposta, la query sembra completata ma restituisce un valore Null. Per quanto riguarda il client della funzione Lambda, la funzione sembra essere stata completata correttamente, ma con una risposta vuota.

    L'impatto di questo problema dipende dal modo in cui l'applicazione tratta una risposta vuota. Alcune applicazioni considerano una risposta vuota da una richiesta di lettura come un errore, ma altre potrebbero considerarla erroneamente come un risultato vuoto.

    Anche le richieste di scrittura che riscontrano questo problema di connessione restituiranno una risposta vuota. Un'invocazione riuscita con una risposta vuota segnala un'operazione riuscita o un errore? Se il client che richiama una funzione di scrittura considera riuscita un'invocazione della funzione semplicemente se è stato eseguito il commit della scrittura sul database, anziché esaminare il corpo della risposta, è possibile che il sistema perda dati.

    Questo problema deriva dal modo in cui il driver tratta gli eventi generati dal socket sottostante. Quando il socket di rete sottostante viene chiuso con un errore ECONNRESET, il WebSocket utilizzato dal driver viene chiuso e genera un evento 'ws close'. Tuttavia, non c'è nulla nel driver che possa gestire quell'evento in un modo che possa essere usato per generare un'eccezione. Di conseguenza, la query semplicemente scompare.

    Per risolvere questo problema, la funzione Lambda di esempio qui riportata aggiunge un gestore di eventi 'ws close' che genera un'eccezione al driver durante la creazione di una connessione remota. Questa eccezione, tuttavia, non viene generata lungo il percorso di richiesta-risposta della query Gremlin e non può quindi essere utilizzata per attivare alcuna logica di backoff e nuovo tentativo all'interno della funzione Lambda stessa. Invece, l'eccezione generata dal gestore di eventi 'ws close' genera un'eccezione non gestita che causa l'esito negativo dell'invocazione Lambda. Questo consente al client che richiama la funzione di gestire l'errore e di ripetere l'invocazione Lambda, se appropriato.

    Ti consigliamo di implementare la logica di backoff e nuovo tentativo nella funzione Lambda stessa per proteggere i client da problemi di connessione intermittenti. Tuttavia, la soluzione alternativa per il problema precedente richiede che il client implementi anche la logica di ripetizione, per gestire gli errori derivanti da questo problema di connessione specifico.

Codice Javascript

const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };

Esempio di funzione Lambda Python per Amazon Neptune

Ecco alcuni aspetti da notare sulla funzione di esempio AWS Lambda Python seguente:

  • Utilizza il modulo backoff.

  • Imposta pool_size=1 per evitare di creare un pool di connessioni non necessario.

  • Imposta message_serializer=serializer.GraphSONSerializersV2d0().

import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return (database_url, request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)

Ecco alcuni risultati di esempio, che mostrano periodi alternati di carichi pesanti e leggeri:

Diagramma che mostra i risultati di esempio della funzione Lambda Python di esempio.