Caricamento di dati di streaming in Amazon OpenSearch Service - OpenSearch Servizio Amazon

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Caricamento di dati di streaming in Amazon OpenSearch Service

Puoi utilizzare OpenSearch Ingestion per caricare direttamente i dati di streaming nel tuo dominio Amazon OpenSearch Service, senza dover utilizzare soluzioni di terze parti. Per inviare dati a OpenSearch Ingestion, configuri i produttori di dati e il servizio consegna automaticamente i dati al dominio o alla raccolta specificati. Per iniziare a usare OpenSearch Ingestion, consulta. Tutorial: Inserimento di dati in una raccolta con Amazon Ingestion OpenSearch

Puoi comunque utilizzare altre fonti per caricare dati in streaming, come Amazon Data Firehose e Amazon CloudWatch Logs, che dispongono del supporto integrato per Service. OpenSearch Altre, come Amazon S3, Amazon Kinesis Data Streams e Amazon DynamoDB, utilizzano le funzioni AWS Lambda come gestori di eventi. Le funzioni Lambda rispondono ai nuovi dati elaborandoli ed eseguendone lo streaming nel dominio.

Nota

Lambda supporta diversi linguaggi di programmazione tra i più diffusi ed è disponibile in gran parte delle Regioni AWS. Per ulteriori informazioni, consulta Getting started with Lambda nella AWS Lambda Developer Guide e AWS Service Endpoints nel. Riferimenti generali di AWS

Caricamento di dati di streaming da Ingestion OpenSearch

Puoi usare Amazon OpenSearch Ingestion per caricare dati in un dominio di OpenSearch servizio. Puoi configurare i produttori di dati per inviare dati a OpenSearch Ingestion, che li invia automaticamente alla raccolta specificata. Puoi anche configurare OpenSearch Ingestion per trasformare i dati prima di consegnarli. Per ulteriori informazioni, consulta OpenSearch Ingestione di Amazon.

Caricamento di dati in streaming da Amazon S3

Puoi usare Lambda per inviare dati al tuo dominio di OpenSearch servizio da Amazon S3. I nuovi dati che arrivano in un bucket S3 attivano una notifica eventi per Lambda, che quindi esegue il codice personalizzato per eseguire l'indicizzazione.

Questo metodo per lo streaming dei dati è estremamente flessibile. Puoi indicizzare i metadati degli oggetti oppure, se l'oggetto è un testo normale, analizzare e indicizzare alcuni elementi del corpo dell'oggetto. Questa sezione include alcuni semplici codici Python di esempio in cui sono utilizzate espressioni regolari per analizzare un file di log e indicizzare le corrispondenze.

Prerequisiti

Prima di procedere, devi disporre delle risorse indicate di seguito.

Prerequisito Descrizione
Bucket Amazon S3 Per ulteriori informazioni, consulta Creazione del primo bucket S3 nella Guida per l'utente di Amazon Simple Storage Service. Il bucket deve risiedere nella stessa regione del dominio di servizio. OpenSearch
OpenSearch Dominio di servizio La destinazione dei dati dopo che la funzione Lambda li ha elaborati. Per ulteriori informazioni, consultare Creazione di domini OpenSearch di servizio.

Creazione il pacchetto di implementazione Lambda

I pacchetti di distribuzione sono file ZIP o JAR che includono codice ed eventuali dipendenze. In questa sezione è incluso codice di esempio Python. Per altri linguaggi di programmazione, consultare Pacchetti di implementazione Lambda nella Guida per gli sviluppatori diAWS Lambda .

  1. Crea una directory. In questo esempio utilizziamo il nome s3-to-opensearch.

  2. Creare un file nella directory denominata sample.py:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    Modifica le variabili per region e host.

  3. Installare pip, se non è già stato fatto, quindi installare le dipendenze in una nuova directory package:

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    In tutti gli ambienti di esecuzione Lambda è installato Boto3, perciò non è necessario includerlo nel pacchetto di implementazione.

  4. Crea un pacchetto con il codice dell'applicazione e le dipendenze:

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Creazione della funzione Lambda

Dopo aver creato il pacchetto di implementazione, è possibile creare la funzione Lambda. Quando si crea una funzione, scegliere nome, runtime (ad esempio, Python 3.8) e ruolo IAM. Il ruolo IAM definisce le autorizzazioni per la tua funzione. Per istruzioni dettagliate, consultare Creazione di una funzione Lambda con la console nella Guida per gli sviluppatori diAWS Lambda .

Questo esempio presuppone l'utilizzo della console. Scegli Python 3.9 e un ruolo con autorizzazioni di lettura S3 e autorizzazioni di scrittura del OpenSearch servizio, come mostrato nella schermata seguente:


                    Configurazione di esempio per una funzione Lambda

Una volta creata la funzione, devi aggiungere un trigger. In questo esempio, vogliamo che il codice venga eseguito ogni volta che un file di log arriva in un bucket S3:

  1. Scegliere Aggiungi trigger e selezionare S3.

  2. Scegli il bucket.

  3. Per Event type (Tipo di evento), seleziona PUT.

  4. In Prefix (Prefisso), digita logs/.

  5. Per Suffisso, digitare .log.

  6. Confermare l'avviso di chiamata ricorsiva e scegliere Aggiungi.

Puoi infine caricare il pacchetto di implementazione:

  1. Scegliere Carica da e File .zip, quindi seguire i prompt su schermo per caricare il pacchetto di implementazione.

  2. Al termine del caricamento, modificare il campo Impostazioni runtime e cambiare il gestore in sample.handler. Questa impostazione indica a Lambda il file (sample.py) e il metodo (handler) da eseguire dopo un trigger.

A questo punto, hai un set completo di risorse: un bucket per i file di registro, una funzione che viene eseguita ogni volta che viene aggiunto un file di registro al bucket, codice che esegue l'analisi e l'indicizzazione e un dominio di servizio per la ricerca e la visualizzazione. OpenSearch

Test della funzione Lambda

Una volta creata la funzione, è possibile eseguirne il test caricando un file nel bucket Amazon S3. Crea un file denominato sample.log utilizzando le righe di log di esempio indicate di seguito:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

Carica il file nella cartella logs del bucket S3. Per le istruzioni, consulta Caricamento di un oggetto nel bucket nella Guida per l'utente di Amazon Simple Storage Service.

Utilizza quindi la console di OpenSearch servizio o le OpenSearch dashboard per verificare che l'indice contenga due documenti. lambda-s3-index Puoi anche effettuare una richiesta di ricerca standard:

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Caricamento dei dati in streaming in Amazon Kinesis Data Streams

È possibile caricare dati di streaming da Kinesis Data OpenSearch Streams to Service. I nuovi dati che arrivano nel flusso di dati attivano una notifica eventi per Lambda, che quindi esegue il codice personalizzato per eseguire l'indicizzazione. In questa sezione è incluso un semplice codice di esempio Python.

Prerequisiti

Prima di procedere, devi disporre delle risorse indicate di seguito.

Prerequisito Descrizione
Amazon Kinesis Data Streams L'origine dell'evento per la funzione Lambda. Per ulteriori informazioni, consultare Kinesis Data Streams.
OpenSearch Dominio di servizio La destinazione dei dati dopo che la funzione Lambda li ha elaborati. Per ulteriori informazioni, consultare Creazione di domini OpenSearch di servizio
Ruolo IAM

Questo ruolo deve avere le autorizzazioni di base OpenSearch Service, Kinesis e Lambda, come le seguenti:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

Il ruolo deve avere la relazione di trust seguente:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Per ulteriori informazioni, consultare Creazione di ruoli IAM nella Guida per l'utente di IAM.

Creazione della funzione Lambda

Procedi come descritto in Creazione il pacchetto di implementazione Lambda, ma crea una directory denominata kinesis-to-opensearch e utilizza il codice seguente per sample.py:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

Modifica le variabili per region e host.

Installare pip, se non è già stato fatto, quindi utilizzare i seguenti comandi per installare le dipendenze:

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Procedi quindi come descritto in Creazione della funzione Lambda, ma specifica il ruolo IAM dai Prerequisiti e le impostazioni seguenti per il trigger:

  • Flusso Kinesis: il flusso di Kinesis.

  • Batch size (Dimensione batch): 100

  • Starting position (Posizione di inizio): orizzonte di taglio

Per ulteriori informazioni, consultare Cos'è Amazon Kinesis Data Streams? nella Guida per gli sviluppatori di Amazon Kinesis Data Streams.

A questo punto, hai a disposizione un set completo di risorse: un flusso di dati Kinesis, una funzione che viene eseguita dopo che il flusso riceve nuovi dati e li indicizza e un dominio di OpenSearch servizio per la ricerca e la visualizzazione.

Test della funzione Lambda

Una volta creata la funzione, puoi provarla aggiungendo un nuovo record al flusso di dati utilizzando l' AWS CLI:

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

Quindi utilizza la console di OpenSearch servizio o le OpenSearch dashboard per verificare che contenga un documento. lambda-kine-index Puoi inoltre utilizzare la seguente richiesta:

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Caricamento di dati in streaming da una tabella Amazon DynamoDB

Puoi utilizzarlo AWS Lambda per inviare dati al tuo dominio di OpenSearch servizio da Amazon DynamoDB. I nuovi dati che arrivano nella tabella di database attivano una notifica eventi per Lambda, che quindi esegue il codice personalizzato per eseguire l'indicizzazione.

Prerequisiti

Prima di procedere, devi disporre delle risorse indicate di seguito.

Prerequisito Descrizione
DynamoDB tabella

La tabella contiene i dati di origine. Per ulteriori informazioni, consultare Operazioni di base sulle tabelle DynamoDB nella Guida per gli sviluppatori di Amazon DynamoDB.

La tabella deve risiedere nella stessa regione del dominio di OpenSearch servizio e avere uno stream impostato su Nuova immagine. Per ulteriori informazioni, consultare Abilitazione di un flusso.

OpenSearch Dominio di servizio La destinazione dei dati dopo che la funzione Lambda li ha elaborati. Per ulteriori informazioni, consultare Creazione di domini OpenSearch di servizio.
Ruolo IAM

Questo ruolo deve disporre delle autorizzazioni di esecuzione di base di OpenSearch Service, DynamoDB e Lambda, come le seguenti:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

Il ruolo deve avere la relazione di trust seguente:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Per ulteriori informazioni, consultare Creazione di ruoli IAM nella Guida per l'utente di IAM.

Creazione della funzione Lambda

Procedi come descritto in Creazione il pacchetto di implementazione Lambda, ma crea una directory denominata ddb-to-opensearch e utilizza il codice seguente per sample.py:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

Modifica le variabili per region e host.

Installare pip, se non è già stato fatto, quindi utilizzare i seguenti comandi per installare le dipendenze:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Procedi quindi come descritto in Creazione della funzione Lambda, ma specifica il ruolo IAM dai Prerequisiti e le impostazioni seguenti per il trigger:

  • Tabella: la tabella DynamoDB

  • Batch size (Dimensione batch): 100

  • Starting position (Posizione di inizio): orizzonte di taglio

Per ulteriori informazioni, consultare Elaborazione di nuovi elementi con DynamoDB Streams e Lambdanella Guida per gli sviluppatori di Amazon DynamoDB.

A questo punto, hai a disposizione un set completo di risorse: una tabella DynamoDB per i dati di origine, un flusso DynamoDB di modifiche alla tabella, una funzione che viene eseguita dopo le modifiche dei dati di origine e indicizza tali modifiche e un dominio di servizio per la ricerca e la visualizzazione. OpenSearch

Test della funzione Lambda

Una volta creata la funzione, è possibile eseguirne il test aggiungendo un nuovo elemento alla tabella DynamoDB utilizzando la AWS CLI:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

Utilizzate quindi la console di OpenSearch servizio o le OpenSearch dashboard per verificare che contenga un documento. lambda-index Puoi inoltre utilizzare la seguente richiesta:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Caricamento di dati di streaming da Amazon Data Firehose

Firehose supporta il OpenSearch Servizio come destinazione di consegna. Per istruzioni su come caricare i dati di streaming in OpenSearch Service, consulta Creating a Kinesis Data Firehose Delivery Stream OpenSearch e Choose Service for Your Destination nella Amazon Data Firehose Developer Guide.

Prima di caricare i dati in OpenSearch Service, potrebbe essere necessario eseguire delle trasformazioni sui dati. Per ulteriori informazioni su come usare le funzioni Lambda per completare questa attività, consultare Trasformazione dei dati di Amazon Kinesis Data Firehose nella stessa guida.

Durante la configurazione di un flusso di distribuzione, Firehose offre un ruolo IAM «one-click» che gli fornisce l'accesso alle risorse di cui ha bisogno per inviare dati a OpenSearch Service, eseguire il backup dei dati su Amazon S3 e trasformarli utilizzando Lambda. Poiché creare un ruolo simile manualmente sarebbe molto complesso, è consigliabile utilizzare il ruolo fornito.

Caricamento di dati di streaming da Amazon CloudWatch

Puoi caricare dati di streaming da CloudWatch Logs al tuo dominio OpenSearch di servizio utilizzando un abbonamento CloudWatch Logs. Per informazioni sugli CloudWatch abbonamenti Amazon, consulta Elaborazione in tempo reale dei dati di registro con gli abbonamenti. Per informazioni sulla configurazione, consulta Streaming CloudWatch Logs data to Amazon OpenSearch Service nell'Amazon CloudWatch Developer Guide.

Caricamento di dati in streaming da AWS IoT

Puoi inviare dati AWS IoT utilizzando regole. Per ulteriori informazioni, consulta l'OpenSearchazione nella Guida per gliAWS IoT sviluppatori.