Amazon Neptune에 사용되는 AWS Lambda 함수 예제 - Amazon Neptune

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon Neptune에 사용되는 AWS Lambda 함수 예제

Java, JavaScript, Python으로 작성된 다음 예제 AWS Lambda 함수는 fold().coalesce().unfold() 표현 양식을 사용하여 무작위로 생성된 ID로 단일 버텍스를 업서트하는 방법을 보여줍니다.

각 함수의 코드 대부분은 연결을 관리하고 오류가 발생할 경우 연결 및 쿼리를 다시 시도하는 보일러플레이트 코드입니다. 실제 애플리케이션 로직과 Gremlin 쿼리는 각각 doQuery()query() 메서드에서 구현됩니다. 이 예제를 자체 Lambda 함수의 기반으로 사용하면 doQuery()query() 수정에 집중할 수 있습니다.

함수는 실패한 쿼리를 5번 재시도하고 재시도 사이에 1초 동안 대기하도록 구성됩니다.

함수를 사용하려면 다음 Lambda 환경 변수에 값이 있어야 합니다.

  • NEPTUNE_ENDPOINT   –   Neptune DB 클러스터 엔드포인트입니다. Python의 경우 neptuneEndpoint여야 합니다.

  • NEPTUNE_PORT   –   Neptune 포트입니다. Python의 경우 neptunePort여야 합니다.

  • USE_IAM   –   (true 또는 false) 데이터베이스에 AWS Identity and Access Management(IAM) 데이터베이스 인증이 활성화되어 있는 경우 USE_IAM 환경 변수를 true로 설정합니다. 그러면 Lambda 함수가 Neptune에 대한 연결 요청에 Sigv4 서명을 하게 됩니다. 이러한 IAM DB 인증 요청의 경우 Lambda 함수의 실행 역할에 함수를 Neptune DB 클러스터에 연결할 수 있도록 허용하는 적절한 IAM 정책이 연결되어 있는지 확인하세요(정책 유형 IAM 참조).

Amazon Neptune용 Java Lambda 함수 예제

Java AWS Lambda 함수에 대해 고려해야 할 몇 가지 사항이 있습니다.

  • Java 드라이버는 사용자에게 필요 없는 자체 연결 풀을 유지 관리하므로, minConnectionPoolSize(1)maxConnectionPoolSize(1)를 사용하여 Cluster 객체를 구성하세요.

  • Cluster 객체는 하나 이상의 직렬 변환기를 생성하기 때문에, 구축 속도가 느릴 수 있습니다(기본적으로 Gyro이고, binary과 같은 추가 출력 형식에 대해 구성한 경우 또 하나). 이를 인스턴스화하는 데 시간이 걸릴 수 있습니다.

  • 연결 풀은 첫 번째 요청으로 초기화됩니다. 이때 드라이버는 Netty 스택을 설정하고, 바이트 버퍼를 할당하고, IAM DB 인증을 사용하는 경우 서명 키를 생성합니다. 이 모든 것이 콜드 스타트 지연 시간을 가중시킬 수 있습니다.

  • Java 드라이버의 연결 풀은 서버 호스트의 가용성을 모니터링하고 연결에 실패할 경우 자동으로 재연결을 시도합니다. 연결을 다시 설정하기 위한 백그라운드 작업이 시작됩니다. reconnectInterval( )을 사용하여 재연결 시도 간격을 구성합니다. 드라이버가 재연결을 시도하는 동안 Lambda 함수는 간단히 쿼리를 재시도할 수 있습니다.

    재시도 간격이 재연결 시도 간격보다 짧으면 호스트를 사용할 수 없는 것으로 간주하여 실패한 연결에 대한 재시도가 다시 실패합니다. ConcurrentModificationException에 대한 재시도에는 적용되지 않습니다.

  • Java 11 대신 Java 8을 사용하세요. Netty 최적화는 Java 11에서 기본적으로 활성화되어 있지 않습니다.

  • 이 예제에서는 재시도에 Retry4j를 사용합니다.

  • Java Lambda 함수에서 Sigv4 서명 드라이버를 사용하려면 Signature Version 4 서명으로 Java 및 Gremlin을 사용하여 Neptune에 연결에서 종속성 요구 사항을 참조하세요.

주의

Retry4j에서 CallExecutor는 스레드 세이프가 아닐 수 있습니다. 각 스레드가 자체 CallExecutor 인스턴스를 사용하는 것을 고려해 보세요.

package com.amazonaws.examples.social; import com.amazonaws.services.lambda.runtime.Context; import com.amazonaws.services.lambda.runtime.RequestStreamHandler; import com.evanlennick.retry4j.CallExecutor; import com.evanlennick.retry4j.CallExecutorBuilder; import com.evanlennick.retry4j.Status; import com.evanlennick.retry4j.config.RetryConfig; import com.evanlennick.retry4j.config.RetryConfigBuilder; import org.apache.tinkerpop.gremlin.driver.Cluster; import com.amazonaws.auth.DefaultAWSCredentialsProviderChain; import com.amazonaws.neptune.auth.NeptuneNettyHttpSigV4Signer; import org.apache.tinkerpop.gremlin.driver.remote.DriverRemoteConnection; import org.apache.tinkerpop.gremlin.driver.ser.Serializers; import org.apache.tinkerpop.gremlin.process.traversal.AnonymousTraversalSource; import org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.GraphTraversalSource; import org.apache.tinkerpop.gremlin.structure.T; import java.io.*; import java.time.temporal.ChronoUnit; import java.util.HashMap; import java.util.Map; import java.util.Random; import java.util.concurrent.Callable; import java.util.function.Function; import static java.nio.charset.StandardCharsets.UTF_8; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.addV; import static org.apache.tinkerpop.gremlin.process.traversal.dsl.graph.__.unfold; public class MyHandler implements RequestStreamHandler { private final GraphTraversalSource g; private final CallExecutor<Object> executor; private final Random idGenerator = new Random(); public MyHandler() { this.g = AnonymousTraversalSource .traversal() .withRemote(DriverRemoteConnection.using(createCluster())); this.executor = new CallExecutorBuilder<Object>() .config(createRetryConfig()) .build(); } @Override public void handleRequest(InputStream input, OutputStream output, Context context) throws IOException { doQuery(input, output); } private void doQuery(InputStream input, OutputStream output) throws IOException { try { Map<String, Object> args = new HashMap<>(); args.put("id", idGenerator.nextInt()); String result = query(args); try (Writer writer = new BufferedWriter(new OutputStreamWriter(output, UTF_8))) { writer.write(result); } } finally { input.close(); output.close(); } } private String query(Map<String, Object> args) { int id = (int) args.get("id"); @SuppressWarnings("unchecked") Callable<Object> query = () -> g.V(id) .fold() .coalesce( unfold(), addV("Person").property(T.id, id)) .id().next(); Status<Object> status = executor.execute(query); return status.getResult().toString(); } private Cluster createCluster() { Cluster.Builder builder = Cluster.build() .addContactPoint(System.getenv("NEPTUNE_ENDPOINT")) .port(Integer.parseInt(System.getenv("NEPTUNE_PORT"))) .enableSsl(true) .minConnectionPoolSize(1) .maxConnectionPoolSize(1) .serializer(Serializers.GRAPHBINARY_V1D0) .reconnectInterval(2000); if (Boolean.parseBoolean(getOptionalEnv("USE_IAM", "true"))) { // For versions of TinkerPop 3.4.11 or higher: builder.handshakeInterceptor( r -> { NeptuneNettyHttpSigV4Signer sigV4Signer = new NeptuneNettyHttpSigV4Signer(region, new DefaultAWSCredentialsProviderChain()); sigV4Signer.signRequest(r); return r; } ) // Versions of TinkerPop prior to 3.4.11 should use the following approach. // Be sure to adjust the imports to include: // import org.apache.tinkerpop.gremlin.driver.SigV4WebSocketChannelizer; // builder = builder.channelizer(SigV4WebSocketChannelizer.class); return builder.create(); } private RetryConfig createRetryConfig() { return new RetryConfigBuilder().retryOnCustomExceptionLogic(retryLogic()) .withDelayBetweenTries(1000, ChronoUnit.MILLIS) .withMaxNumberOfTries(5) .withFixedBackoff() .build(); } private Function<Exception, Boolean> retryLogic() { return e -> { StringWriter stringWriter = new StringWriter(); e.printStackTrace(new PrintWriter(stringWriter)); String message = stringWriter.toString(); // Check for connection issues if ( message.contains("Timed out while waiting for an available host") || message.contains("Timed-out waiting for connection on Host") || message.contains("Connection to server is no longer active") || message.contains("Connection reset by peer") || message.contains("SSLEngine closed already") || message.contains("Pool is shutdown") || message.contains("ExtendedClosedChannelException") || message.contains("Broken pipe")) { return true; } // Concurrent writes can sometimes trigger a ConcurrentModificationException. // In these circumstances you may want to backoff and retry. if (message.contains("ConcurrentModificationException")) { return true; } // If the primary fails over to a new instance, existing connections to the old primary will // throw a ReadOnlyViolationException. You may want to back and retry. if (message.contains("ReadOnlyViolationException")) { return true; } return false; }; } private String getOptionalEnv(String name, String defaultValue) { String value = System.getenv(name); if (value != null && value.length() > 0) { return value; } else { return defaultValue; } } }

함수에 재연결 로직을 포함하려면 Java 재연결 샘플을 참조하세요.

Amazon Neptune용 JavaScript Lambda 함수 예제

예제 참고 사항
  • JavaScript 드라이버는 연결 풀을 유지하지 않습니다. 항상 단일 연결을 엽니다.

  • 예제 함수는 gremlin-aws-sigv4의 Sigv4 서명 유틸리티를 사용하여 IAM 인증 지원 데이터베이스에 대한 요청에 서명합니다.

  • 오픈 소스 비동기 유틸리티 모듈retry( ) 함수를 사용하여 backoff-and-retry 시도를 처리합니다.

  • Gremlin 터미널 단계는 JavaScript promise를 반환합니다(TinkerPop 설명서 참조). next()의 경우 {value, done} 튜플입니다.

  • 연결 오류는 핸들러 내에서 발생하며, 여기에 설명된 권장 사항에 따라 일부 backoff-and-retry 로직을 사용하여 처리합니다. 단, 한 가지 예외가 있습니다. 드라이버가 예외로 취급하지 않는 연결 문제가 하나 있는데, 따라서 이 backoff-and-retry 로직으로는 해결할 수 없습니다.

    문제는 드라이버가 요청을 보낸 후 드라이버가 응답을 받기 전에 연결이 끊어지면 쿼리가 완료된 것처럼 보이지만, null 값을 반환한다는 것입니다. Lambda 함수 클라이언트의 경우 함수가 성공적으로 완료된 것으로 보이지만, 응답이 비어 있습니다.

    이 문제의 영향은 애플리케이션이 빈 응답을 처리하는 방식에 따라 달라집니다. 읽기 요청의 빈 응답을 오류로 처리하는 애플리케이션도 있지만, 빈 결과로 잘못 처리하는 애플리케이션도 있습니다.

    이 연결 문제가 발생한 쓰기 요청도 빈 응답을 반환합니다. 간접 호출에 성공하고 응답이 비어 있으면 성공 또는 실패 신호로 여길 수 있을까요? 쓰기 함수를 간접적으로 호출하는 클라이언트가 응답 본문을 검사하지 않고 함수 간접 호출 성공을 단순히 데이터베이스에 대한 쓰기가 커밋되었음을 의미한다고 간주하는 경우, 시스템에 데이터가 손실된 것처럼 보일 수 있습니다.

    이 문제는 드라이버가 기본 소켓에서 내보낸 이벤트를 처리하는 방식 때문에 발생합니다. 기본 네트워크 소켓이 ECONNRESET 오류로 닫히면 드라이버에서 사용하는 WebSocket이 닫히고 'ws close' 이벤트가 발생합니다. 하지만 드라이버에는 예외를 유발하는 데 사용할 수 있는 방식으로 해당 이벤트를 처리할 수 있는 기능이 없습니다. 따라서 쿼리는 그냥 사라집니다.

    이 문제를 해결하기 위해 여기에 있는 예제 Lambda 함수는 원격 연결을 생성할 때 드라이버에 예외를 발생시키는 'ws close' 이벤트 핸들러를 추가합니다. 그러나 이 예외는 Gremlin 쿼리의 요청-응답 경로를 따라 발생하지 않으므로, Lambda 함수 자체 내에서 backoff-and-retry 로직을 트리거하는 데 사용할 수 없습니다. 대신 'ws close' 이벤트 핸들러에서 발생한 예외로 인해 처리되지 않은 예외가 발생하여 Lambda 간접 호출이 실패합니다. 이렇게 되면 함수를 간접적으로 호출하는 클라이언트가 오류를 처리하고 필요한 경우 Lambda 간접 호출을 재시도할 수 있습니다.

    클라이언트를 간헐적인 연결 문제로부터 보호하려면 Lambda 함수 자체에 backoff-and-retry 로직을 구현하는 것이 좋습니다. 하지만 위 문제를 해결하려면 클라이언트가 이 특정 연결 문제로 인한 실패를 처리하기 위한 재시도 로직도 구현해야 합니다.

Javascript 코드

const gremlin = require('gremlin'); const async = require('async'); const {getUrlAndHeaders} = require('gremlin-aws-sigv4/lib/utils'); const traversal = gremlin.process.AnonymousTraversalSource.traversal; const DriverRemoteConnection = gremlin.driver.DriverRemoteConnection; const t = gremlin.process.t; const __ = gremlin.process.statics; let conn = null; let g = null; async function query(context) { const id = context.id; return g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(t.id, id) ) .id().next(); } async function doQuery() { const id = Math.floor(Math.random() * 10000).toString(); let result = await query({id: id}); return result['value']; } exports.handler = async (event, context) => { const getConnectionDetails = () => { if (process.env['USE_IAM'] == 'true'){ return getUrlAndHeaders( process.env['NEPTUNE_ENDPOINT'], process.env['NEPTUNE_PORT'], {}, '/gremlin', 'wss'); } else { const database_url = 'wss://' + process.env['NEPTUNE_ENDPOINT'] + ':' + process.env['NEPTUNE_PORT'] + '/gremlin'; return { url: database_url, headers: {}}; } }; const createRemoteConnection = () => { const { url, headers } = getConnectionDetails(); const c = new DriverRemoteConnection( url, { mimeType: 'application/vnd.gremlin-v2.0+json', headers: headers }); c._client._connection.on('close', (code, message) => { console.info(`close - ${code} ${message}`); if (code == 1006){ console.error('Connection closed prematurely'); throw new Error('Connection closed prematurely'); } }); return c; }; const createGraphTraversalSource = (conn) => { return traversal().withRemote(conn); }; if (conn == null){ console.info("Initializing connection") conn = createRemoteConnection(); g = createGraphTraversalSource(conn); } return async.retry( { times: 5, interval: 1000, errorFilter: function (err) { // Add filters here to determine whether error can be retried console.warn('Determining whether retriable error: ' + err.message); // Check for connection issues if (err.message.startsWith('WebSocket is not open')){ console.warn('Reopening connection'); conn.close(); conn = createRemoteConnection(); g = createGraphTraversalSource(conn); return true; } // Check for ConcurrentModificationException if (err.message.includes('ConcurrentModificationException')){ console.warn('Retrying query because of ConcurrentModificationException'); return true; } // Check for ReadOnlyViolationException if (err.message.includes('ReadOnlyViolationException')){ console.warn('Retrying query because of ReadOnlyViolationException'); return true; } return false; } }, doQuery); };

Amazon Neptune용 Python Lambda 함수 예제

다음 Python AWS Lambda 예제 함수에 대해 알아두어야 할 몇 가지 사항은 아래와 같습니다.

  • 백오프 모듈을 사용합니다.

  • 불필요한 연결 풀을 만들지 않도록 pool_size=1을 설정합니다.

  • message_serializer=serializer.GraphSONSerializersV2d0()을 설정합니다.

import os, sys, backoff, math from random import randint from gremlin_python import statics from gremlin_python.driver.driver_remote_connection import DriverRemoteConnection from gremlin_python.driver.protocol import GremlinServerError from gremlin_python.driver import serializer from gremlin_python.process.anonymous_traversal import traversal from gremlin_python.process.graph_traversal import __ from gremlin_python.process.strategies import * from gremlin_python.process.traversal import T from aiohttp.client_exceptions import ClientConnectorError from botocore.auth import SigV4Auth from botocore.awsrequest import AWSRequest from botocore.credentials import ReadOnlyCredentials from types import SimpleNamespace import logging logger = logging.getLogger() logger.setLevel(logging.INFO) reconnectable_err_msgs = [ 'ReadOnlyViolationException', 'Server disconnected', 'Connection refused', 'Connection was already closed', 'Connection was closed by server', 'Failed to connect to server: HTTP Error code 403 - Forbidden' ] retriable_err_msgs = ['ConcurrentModificationException'] + reconnectable_err_msgs network_errors = [OSError, ClientConnectorError] retriable_errors = [GremlinServerError, RuntimeError, Exception] + network_errors def prepare_iamdb_request(database_url): service = 'neptune-db' method = 'GET' access_key = os.environ['AWS_ACCESS_KEY_ID'] secret_key = os.environ['AWS_SECRET_ACCESS_KEY'] region = os.environ['AWS_REGION'] session_token = os.environ['AWS_SESSION_TOKEN'] creds = SimpleNamespace( access_key=access_key, secret_key=secret_key, token=session_token, region=region, ) request = AWSRequest(method=method, url=database_url, data=None) SigV4Auth(creds, service, region).add_auth(request) return (database_url, request.headers.items()) def is_retriable_error(e): is_retriable = False err_msg = str(e) if isinstance(e, tuple(network_errors)): is_retriable = True else: is_retriable = any(retriable_err_msg in err_msg for retriable_err_msg in retriable_err_msgs) logger.error('error: [{}] {}'.format(type(e), err_msg)) logger.info('is_retriable: {}'.format(is_retriable)) return is_retriable def is_non_retriable_error(e): return not is_retriable_error(e) def reset_connection_if_connection_issue(params): is_reconnectable = False e = sys.exc_info()[1] err_msg = str(e) if isinstance(e, tuple(network_errors)): is_reconnectable = True else: is_reconnectable = any(reconnectable_err_msg in err_msg for reconnectable_err_msg in reconnectable_err_msgs) logger.info('is_reconnectable: {}'.format(is_reconnectable)) if is_reconnectable: global conn global g conn.close() conn = create_remote_connection() g = create_graph_traversal_source(conn) @backoff.on_exception(backoff.constant, tuple(retriable_errors), max_tries=5, jitter=None, giveup=is_non_retriable_error, on_backoff=reset_connection_if_connection_issue, interval=1) def query(**kwargs): id = kwargs['id'] return (g.V(id) .fold() .coalesce( __.unfold(), __.addV('User').property(T.id, id) ) .id().next()) def doQuery(event): return query(id=str(randint(0, 10000))) def lambda_handler(event, context): result = doQuery(event) logger.info('result – {}'.format(result)) return result def create_graph_traversal_source(conn): return traversal().withRemote(conn) def create_remote_connection(): logger.info('Creating remote connection') (database_url, headers) = connection_info() return DriverRemoteConnection( database_url, 'g', pool_size=1, message_serializer=serializer.GraphSONSerializersV2d0(), headers=headers) def connection_info(): database_url = 'wss://{}:{}/gremlin'.format(os.environ['neptuneEndpoint'], os.environ['neptunePort']) if 'USE_IAM' in os.environ and os.environ['USE_IAM'] == 'true': return prepare_iamdb_request(database_url) else: return (database_url, {}) conn = create_remote_connection() g = create_graph_traversal_source(conn)

다음은 무거운 부하와 가벼운 부하가 번갈아 나타나는 기간을 보여주는 샘플 결과입니다.

예제 Python Lambda 함수의 샘플 결과를 보여주는 다이어그램.