Carga de datos de streaming en Amazon OpenSearch Service - OpenSearch Servicio Amazon

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

Carga de datos de streaming en Amazon OpenSearch Service

Puedes usar OpenSearch Ingestion para cargar directamente los datos de streaming en tu dominio de Amazon OpenSearch Service, sin necesidad de utilizar soluciones de terceros. Para enviar datos a OpenSearch Ingestion, debes configurar tus generadores de datos y el servicio entrega automáticamente los datos al dominio o la colección que especifiques. Para empezar a usar OpenSearch Ingestion, consulte. Tutorial: Ingerir datos en una colección mediante Amazon OpenSearch Ingestion

Puedes seguir utilizando otras fuentes para cargar datos de streaming, como Amazon Data Firehose y Amazon CloudWatch Logs, que tienen soporte integrado para OpenSearch Service. Otros, como Amazon S3, Amazon Kinesis Data Streams y Amazon DynamoDB, utilizan funciones AWS Lambda como controladores de eventos. Las funciones de Lambda responden a los nuevos datos procesándolos y transmitiéndolos al dominio.

nota

Lambda es compatible con varios lenguajes de programación populares y está disponible en la mayoría de las Regiones de AWS. Para más información, consulte Introducción a Lambda en la Guía para desarrolladores deAWS Lambda y Puntos de conexión de servicio deAWS en Referencia general de AWS.

Cargando datos de streaming desde Ingestion OpenSearch

Puede usar Amazon OpenSearch Ingestion para cargar datos en un dominio OpenSearch de servicio. Usted configura sus generadores de datos para que envíen datos a OpenSearch Ingestion y esta entrega automáticamente los datos a la colección que usted especifique. También puede configurar OpenSearch Ingestion para transformar los datos antes de entregarlos. Para obtener más información, consulte OpenSearch Ingestión de Amazon.

Carga de datos de streaming desde Amazon S3

Puede usar Lambda para enviar datos a su dominio de OpenSearch servicio desde Amazon S3. Cuando llegan datos nuevos a un bucket de S3, activan una notificación de eventos en Lambda que, a su vez, ejecuta el código personalizado para realizar la indexación.

Este método de streaming de datos es extremadamente flexible. Es posible indexar los metadatos del objeto o, si el objeto contiene texto sin formato, analizar e indexar algunos elementos del cuerpo del objeto. En esta sección, se incluye código de muestra sencillo de Python que utiliza expresiones regulares para analizar un archivo de registros e indexar los resultados obtenidos.

Requisitos previos

Antes de continuar, debe contar con los siguientes recursos.

Requisito previo Descripción
Bucket de Amazon S3 Para más información, consulte Crear su primer bucket de S3 en la Guía del usuario de Amazon Simple Storage Service. El bucket debe residir en la misma región que su dominio OpenSearch de servicio.
OpenSearch Dominio de servicio Es el destino de los datos después de que la función de Lambda los procesa. Para más información, consulte Creación de dominios OpenSearch de servicio.

Crear el paquete de implementación de Lambda

Los paquetes de implementación son archivos ZIP o JAR que contienen el código y sus dependencias. En esta sección, se incluye un código de muestra de Python. Para otros lenguajes de programación, consulte Paquetes de implementación de Lambda en la Guía para desarrolladores deAWS Lambda .

  1. Cree un directorio. En este ejemplo, utilizamos el nombre s3-to-opensearch.

  2. Cree un archivo en el directorio denominado sample.py:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    Edite las variables para region y host.

  3. Instale pip, si todavía no lo hizo, luego instale las dependencias en un directorio package nuevo:

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    Todos los entornos de ejecución de Lambda tienen instalado Boto3, por lo que no es necesario incluirlo en el paquete de implementación.

  4. Cree el paquete con el código de la aplicación y las dependencias:

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Crear la función de Lambda

Después de crear el paquete de implementación, puede crear la función de Lambda. Al crear una función, elija un nombre, un tiempo de ejecución (por ejemplo, Python 3.8) y un rol de IAM. El rol de IAM define los permisos para la función. Para obtener instrucciones detalladas, consulte Create a Lambda function with the consol en la Guía para desarrolladores deAWS Lambda .

En este ejemplo, se supone que utiliza la consola. Elija Python 3.9 y un rol que tenga permisos de lectura en S3 y permisos de escritura en el OpenSearch servicio, como se muestra en la siguiente captura de pantalla:


                    Ejemplo de configuración de una función de Lambda

Después de crear la función, debe agregar un desencadenador. En este ejemplo, queremos que el código se ejecute cada vez que llega un archivo de registros a un bucket de S3:

  1. Seleccione Agregar desencadenador y Seleccione S3.

  2. Seleccione el bucket.

  3. Para el Tipo de evento, elija PUT.

  4. Para el Prefijo, escriba logs/.

  5. Para el Sufijo, escriba .log.

  6. Reconozca la advertencia de invocación recursiva y elija Agregar.

Por último, puede cargar el paquete de implementación:

  1. Seleccione Cargar desde y archivo .zip, luego siga las instrucciones para cargar el paquete de implementación.

  2. Una vez finalizada la carga, edite la Configuración de tiempo de ejecución y cambie el Controlador a sample.handler. Esta configuración indica a Lambda el archivo (sample.py) y el método (handler) que debe ejecutar cuando se produzca un desencadenador.

En este punto, dispone de un conjunto completo de recursos: un depósito para los archivos de registro, una función que se ejecuta cada vez que se añade un archivo de registro al depósito, un código que realiza el análisis y la indexación y un dominio de OpenSearch servicio para la búsqueda y la visualización.

Prueba de la función de Lambda

Después de crear la función, puede probarla mediante la carga de un archivo en el bucket de Amazon S3. Cree un archivo denominado sample.log que contenga los siguientes ejemplos de líneas de registro:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

Cargue el archivo en la carpeta logs del bucket de S3. Para obtener instrucciones, consulte Carga de un objeto en el bucket en la Guía del usuario de Amazon Simple Storage Service.

A continuación, utilice la consola OpenSearch de servicio o los OpenSearch paneles de control para comprobar que el lambda-s3-index índice contiene dos documentos. También puede realizar una petición de búsqueda estándar:

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Cargar datos de streaming desde Amazon Kinesis Data Streams

Puede cargar datos de streaming desde Kinesis Data Streams OpenSearch a Service. Cuando llegan datos nuevos al flujo de datos, activan una notificación de eventos en Lambda que, a su vez, ejecuta el código personalizado para realizar la indexación. En esta sección, se incluye un código de muestra simple de Python.

Requisitos previos

Antes de continuar, debe contar con los siguientes recursos.

Requisito previo Descripción
Amazon Kinesis Data Streams Fuente de eventos de la función Lambda Para más información, consulte Kinesis Data Streams.
OpenSearch Dominio de servicio Es el destino de los datos después de que la función Lambda los procesa. Para más información, consulte Creación de dominios OpenSearch de servicio.
Rol de IAM

Esta función debe tener permisos básicos OpenSearch de Servicio, Kinesis y Lambda, como los siguientes:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

El rol debe tener la siguiente relación de confianza:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Para más información, consulte Creación de roles de IAM en la Guía del usuario de IAM.

Crear la función de Lambda

Siga las instrucciones de Crear el paquete de implementación de Lambda, pero cree un directorio denominado kinesis-to-opensearch y utilice el siguiente código para sample.py:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

Edite las variables para region y host.

Instale pip, si todavía no lo hizo, luego utilice los siguientes comandos para instalar las dependencias:

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

A continuación, siga las instrucciones de Crear la función de Lambda, pero especifique el rol de IAM de Requisitos previos y la configuración siguiente para el desencadenador:

  • Flujo de Kinesis: su flujo de Kinesis

  • Tamaño del lote: 100

  • Posición inicial: Trim horizon

Para más información, consulte ¿Qué es Amazon Kinesis Data Streams? en la Guía para desarrolladores de Amazon Kinesis Data Streams.

En este punto, dispone de un conjunto completo de recursos: una transmisión de datos de Kinesis, una función que se ejecuta después de que la transmisión reciba nuevos datos y los indexe, y un dominio de OpenSearch servicio para la búsqueda y la visualización.

Prueba de la función de Lambda

Después de crear la función, puede probarla al agregar un registro al flujo de datos mediante la AWS CLI:

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

A continuación, utilice la consola OpenSearch de servicio o los OpenSearch paneles de control para comprobar que lambda-kine-index contiene un documento. También puede utilizar la solicitud siguiente:

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Carga de datos de streaming desde Amazon DynamoDB

Puede utilizarlos AWS Lambda para enviar datos a su dominio de OpenSearch servicio desde Amazon DynamoDB. Cuando llegan datos nuevos a la tabla de base de datos, activan una notificación de eventos en Lambda que, a su vez, ejecuta el código personalizado para realizar la indexación.

Requisitos previos

Antes de continuar, debe contar con los siguientes recursos.

Requisito previo Descripción
Tabla de DynamoDB

La tabla contiene los datos de origen. Para más información, consulte Operaciones básicas en tablas de DynamoDB en la Guía para desarrolladores de Amazon DynamoDB.

La tabla debe residir en la misma región que su dominio de OpenSearch servicio y tener una transmisión configurada como Nueva imagen. Para más información, consulte Habilitación de un flujo.

OpenSearch Dominio de servicio Es el destino de los datos después de que la función de Lambda los procesa. Para más información, consulte Creación de dominios OpenSearch de servicio.
Rol de IAM

Esta función debe tener permisos básicos OpenSearch de ejecución de Servicios, DynamoDB y Lambda, como los siguientes:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

El rol debe tener la siguiente relación de confianza:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Para más información, consulte Creación de roles de IAM en la Guía del usuario de IAM.

Crear la función de Lambda

Siga las instrucciones de Crear el paquete de implementación de Lambda, pero cree un directorio denominado ddb-to-opensearch y utilice el siguiente código para sample.py:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

Edite las variables para region y host.

Instale pip, si todavía no lo hizo, luego utilice los siguientes comandos para instalar las dependencias:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

A continuación, siga las instrucciones de Crear la función de Lambda, pero especifique el rol de IAM de Requisitos previos y la configuración siguiente para el desencadenador:

  • Tabla: tabla de DynamoDB

  • Tamaño del lote: 100

  • Posición inicial: Trim horizon

Para más información, consulte Process New Items with DynamoDB Streams and Lambda en la Guía para desarrolladores de Amazon DynamoDB.

En este punto, dispone de un conjunto completo de recursos: una tabla de DynamoDB para los datos de origen, un flujo de cambios de DynamoDB en la tabla, una función que se ejecuta después de que los datos de origen cambien e indexa esos cambios, y un dominio de servicio para la búsqueda y la visualización. OpenSearch

Probar la función de Lambda

Después de crear la función, puede probarla al agregar un elemento a la tabla de DynamoDB mediante la AWS CLI:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

A continuación, utilice la consola de OpenSearch servicio o los OpenSearch paneles de control para comprobar que contiene un documento. lambda-index También puede utilizar la solicitud siguiente:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Carga de datos de streaming desde Amazon Data Firehose

Firehose admite el OpenSearch servicio como destino de entrega. Para obtener instrucciones sobre cómo cargar datos de streaming en el OpenSearch servicio, consulte Crear una transmisión de entrega de Kinesis Data Firehose OpenSearch y elegir un servicio para su destino en la Guía para desarrolladores de Amazon Data Firehose.

Antes de cargar datos en OpenSearch Service, es posible que deba realizar transformaciones en los datos. Para obtener más información acerca de cómo utilizar las funciones de Lambda para realizar esta tarea, consulte Amazon Kinesis Data Firehose Data Transformation en la misma guía.

Al configurar una transmisión de entrega, Firehose incluye una función de IAM con un solo clic que le proporciona el acceso a los recursos que necesita para enviar datos al OpenSearch Servicio, hacer copias de seguridad de los datos en Amazon S3 y transformar los datos con Lambda. Debido a la complejidad que supone la creación de ese rol manualmente, recomendamos utilizar el rol proporcionado.

Carga de datos de streaming desde Amazon CloudWatch

Puedes cargar datos de streaming desde CloudWatch Logs a tu dominio OpenSearch de servicio mediante una suscripción a CloudWatch Logs. Para obtener información sobre CloudWatch las suscripciones de Amazon, consulta Procesamiento en tiempo real de datos de registro con suscripciones. Para obtener información sobre la configuración, consulte Transmisión de datos de CloudWatch registros a Amazon OpenSearch Service en la Guía para CloudWatch desarrolladores de Amazon.

Carga de datos de streaming desde AWS IoT

Puede enviar datos mediante el AWS IoT uso de reglas. Para obtener más información, consulta la OpenSearchacción en la Guía paraAWS IoT desarrolladores.