Exemplos de função do AWS Lambda para Amazon Neptune - Amazon Neptune

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exemplos de função do AWS Lambda para Amazon Neptune

Os exemplos de função do AWS Lambda a seguir, escritos em Java, JavaScript e Python, ilustram a aplicação de upsert a um único vértice com um ID gerado aleatoriamente com a expressão fold().coalesce().unfold().

Grande parte do código em cada função é código clichê responsável por gerenciar e repetir conexões e consultas em caso de erro. A lógica real da aplicação e a consulta do Gremlin são implementadas nos métodos doQuery() e query(), respectivamente. Se você usar esses exemplos como base para as próprias funções do Lambda, poderá se concentrar em modificar doQuery() e query().

As funções são configuradas para repetir consultas com falha cinco vezes, aguardando um segundo entre as tentativas.

As funções exigem que os valores estejam presentes nas seguintes variáveis de ambiente do Lambda:

  • NEPTUNE_ENDPOINT: o endpoint de cluster de banco de dados do Neptune. Para Python, deve ser neptuneEndpoint.

  • NEPTUNE_PORT: a porta do Neptune. Para Python, deve ser neptunePort.

  • USE_IAM : (true ou false) se o banco de dados tiver a autenticação de banco de dados do AWS Identity and Access Management (IAM) habilitada, defina a variável de ambiente USE_IAM como true. Isso faz com que a função do Lambda assine por Sigv4 solicitações de conexão com o Neptune. Para essas solicitações de autenticação de banco de dados do IAM, garanta que a função de execução da função do Lambda tenha uma política do IAM apropriada anexada que permita que a função se conecte ao cluster de banco de dados do Neptune (consulte Tipos de IAM políticas).

Exemplo de função do Lambda em Java para Amazon Neptune

Veja algumas considerações sobre funções do AWS Lambda em Java:

  • O driver Java mantém o próprio grupo de conexões, que não é necessário, então configure o objeto Cluster com minConnectionPoolSize(1) e maxConnectionPoolSize(1).

  • O objeto Cluster pode demorar porque cria um ou mais serializadores (Gyro por padrão, além de outro, se você o tiver configurado para formatos de saída adicionais, como binary). Eles podem demorar um pouco para ser instanciados.

  • O grupo de conexões é inicializado com a primeira solicitação. Nesse ponto, o driver configura a pilha Netty, aloca buffers de bytes e cria uma chave de assinatura caso você esteja usando a autenticação de banco de dados do IAM. Tudo isso pode aumentar a latência de inicialização a frio.

  • O grupo de conexões do driver Java monitora a disponibilidade dos hosts do servidor e tenta se reconectar automaticamente em caso de falha da conexão. Ele inicia uma tarefa em segundo plano para tentar restabelecer a conexão. Use reconnectInterval( ) para configurar o intervalo entre as tentativas de reconexão. Enquanto o driver está tentando se reconectar, a função do Lambda pode simplesmente repetir a consulta.

    Se o intervalo entre as tentativas for menor do que o intervalo entre as tentativas de reconexão, ocorrerá novamente uma falha nas novas tentativas em uma conexão com falha porque o host será considerado indisponível. Isso não se aplica às novas tentativas de uma ConcurrentModificationException.

  • Use o Java 8 em vez do Java 11. As otimizações do Netty não estão habilitadas por padrão no Java 11.

  • Este exemplo usa Retry4j para novas tentativas.

  • Para usar o driver de assinatura Sigv4 na função do Lambda em Java, consulte os requisitos de dependência em Estabelecer conexão com o Neptune usando Java e Gremlin com a assinatura do Signature versão 4.

Atenção

O CallExecutor do Retry4j pode não ser livre de threads. Pense em fazer com que cada thread use a própria instância CallExecutor.

package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }

Se você quiser incluir a lógica de reconexão na função, consulte Exemplo de reconexão Java.

Exemplo de função do Lambda em JavaScript para Amazon Neptune

Observações sobre este exemplo
  • O driver JavaScript não mantém um grupo de conexões. Ele sempre abre uma única conexão.

  • O exemplo de função usa os utilitários de assinatura Sigv4 do gremlin-aws-sigv4 para assinar solicitações em um banco de dados habilitado para autenticação do IAM.

  • Ele usa a função retry( ) do módulo utilitário assíncrono de código aberto para lidar com tentativas de recuo e repetição.

  • As etapas do terminal do Gremlin exibe uma promise em JavaScript (consulte a documentação do TinkerPop). Para next(), isso é uma tupla {value, done}.

  • Os erros de conexão são gerados no manipulador e tratados com alguma lógica de recuo e repetição, de acordo com as recomendações descritas aqui, com uma exceção. Há um tipo de problema de conexão que o driver não trata como uma exceção e que, portanto, não pode ser resolvido por essa lógica de recuo e repetição.

    O problema é que, se uma conexão for fechada depois que um driver enviar uma solicitação, mas antes que o driver receba uma resposta, parecerá que a consulta está concluída, mas gera um valor nulo. No que diz respeito ao cliente da função do Lambda, a função parece ter sido concluída com êxito, mas com uma resposta vazia.

    O impacto desse problema depende de como a aplicação trata uma resposta vazia. Algumas aplicações podem tratar uma resposta vazia de uma solicitação de leitura como um erro, mas outras podem tratá-la erroneamente como um resultado vazio.

    Solicitações de gravação que encontrem esse problema de conexão também exibirão uma resposta vazia. Uma invocação bem-sucedida com uma resposta vazia indica êxito ou falha? Se o cliente que estiver invocando uma função de gravação simplesmente tratar a invocação bem-sucedida da função como se a gravação no banco de dados tivesse sido confirmada em vez de verificar o corpo da resposta, poderá parecer que o sistema perdeu dados.

    Esse problema é causado pela forma como o driver trata os eventos emitidos pelo soquete subjacente. Quando o soquete de rede subjacente é fechado com um erro ECONNRESET, o WebSocket usado pelo driver é fechado e emite um evento 'ws close'. No entanto, não há nada no driver que lide com esse evento de uma forma que possa ser usada para gerar uma exceção. Como resultado, a consulta simplesmente desaparece.

    Para contornar esse problema, o exemplo da função do Lambda aqui adiciona um manipulador de eventos 'ws close' que lança uma exceção ao driver ao criar uma conexão remota. No entanto, essa exceção não é gerada ao longo do caminho de solicitação-resposta da consulta do Gremlin e, portanto, não pode ser usada para acionar nenhuma lógica de recuo e repetição dentro da própria função do Lambda. Em vez disso, a exceção lançada pelo manipulador de eventos 'ws close' gera uma exceção não tratada que faz com que a invocação do Lambda falhe. Isso permite ao cliente que invoca a função manipular o erro e repetir a invocação do Lambda, se apropriado.

    Recomendamos que você implemente a lógica de recuo e repetição na própria função do Lambda para proteger os clientes contra problemas de conexão intermitentes. No entanto, a solução alternativa para o problema acima exige que o cliente implemente a lógica de repetição também para lidar com falhas resultantes desse problema de conexão específico.

Código Javascript

const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };

Exemplo de função do Lambda em Python para Amazon Neptune

Veja algumas considerações sobre o seguinte exemplo de função AWS Lambda em Python:

  • Ele usa o módulo de recuo.

  • Ele define pool_size=1 para evitar a criação de um grupo de conexões desnecessário.

  • Ele define message_serializer=serializer.GraphSONSerializersV2d0().

import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return (database_url, request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)

Veja exemplos de resultados, mostrando períodos alternados de carga pesada e leve:

Diagrama mostrando exemplos de resultados do exemplo de função do Lambda em Python.