Caricamento dei dati in streaming in Amazon OpenSearch Service - Amazon OpenSearch Service

Caricamento dei dati in streaming in Amazon OpenSearch Service

È possibile caricare i dati in streaming nel dominio Amazon OpenSearch Service da diverse origini. Alcune origini, ad esempio Amazon Kinesis Data Firehose e Amazon CloudWatch Logs, includono un supporto integrato per OpenSearch Service, Altre, come Amazon S3, Amazon Kinesis Data Streams e Amazon DynamoDB, utilizzano le funzioni AWS Lambda come gestori di eventi. Le funzioni Lambda rispondono ai nuovi dati elaborandoli ed eseguendone lo streaming nel dominio.

Nota

Lambda supporta diversi linguaggi di programmazione tra i più diffusi ed è disponibile in gran parte delle Regioni AWS. Per ulteriori informazioni, consultare Nozioni di base su Lambda nella Guida per gli sviluppatori di AWS Lambda e Endpoint del servizio AWS in Riferimenti generali AWS.

Caricamento di dati in streaming da Amazon S3

È possibile utilizzare Lambda per inviare dati al dominio OpenSearch Service da Amazon S3. I nuovi dati che arrivano in un bucket S3 attivano una notifica eventi per Lambda, che quindi esegue il codice personalizzato per eseguire l'indicizzazione.

Questo metodo per lo streaming dei dati è estremamente flessibile. Puoi indicizzare i metadati degli oggetti oppure, se l'oggetto è un testo normale, analizzare e indicizzare alcuni elementi del corpo dell'oggetto. Questa sezione include alcuni semplici codici Python di esempio in cui sono utilizzate espressioni regolari per analizzare un file di log e indicizzare le corrispondenze.

Prerequisiti

Prima di procedere, devi disporre delle risorse indicate di seguito.

Prerequisito Descrizione
Bucket Amazon S3 Per ulteriori informazioni, consulta Creazione del primo bucket S3 nella Guida per l'utente di Amazon Simple Storage Service. Il bucket deve trovarsi nella stessa regione del dominio OpenSearch Service.
Dominio OpenSearch Service La destinazione dei dati dopo che la funzione Lambda li ha elaborati. Per ulteriori informazioni, consultare Creazione di domini OpenSearch Service.

Creazione il pacchetto di implementazione Lambda

I pacchetti di distribuzione sono file ZIP o JAR che includono codice ed eventuali dipendenze. In questa sezione è incluso codice di esempio Python. Per altri linguaggi di programmazione, consultare Pacchetti di implementazione Lambda nella Guida per gli sviluppatori di AWS Lambda.

  1. Crea una directory. In questo esempio utilizziamo il nome s3-to-opensearch.

  2. Creare un file nella directory denominata sample.py:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' type = '_doc' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    Modifica le variabili per region e host.

  3. Installare pip, se non è già stato fatto, quindi installare le dipendenze in una nuova directory package:

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    In tutti gli ambienti di esecuzione Lambda è installato Boto3, perciò non è necessario includerlo nel pacchetto di implementazione.

  4. Crea un pacchetto con il codice dell'applicazione e le dipendenze:

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Creazione della funzione Lambda

Dopo aver creato il pacchetto di implementazione, è possibile creare la funzione Lambda. Quando si crea una funzione, scegliere nome, runtime (ad esempio, Python 3.8) e ruolo IAM. Il ruolo IAM definisce le autorizzazioni per la tua funzione. Per istruzioni dettagliate, consultare Creazione di una funzione Lambda con la console nella Guida per gli sviluppatori di AWS Lambda.

Questo esempio presuppone l'utilizzo della console. Scegli Python 3.9 e un ruolo che disponga di autorizzazioni in lettura per S3 e in scrittura per OpenSearch Service, come illustrato nella schermata seguente.


                    Configurazione di esempio per una funzione Lambda

Una volta creata la funzione, devi aggiungere un trigger. In questo esempio, vogliamo che il codice venga eseguito ogni volta che un file di log arriva in un bucket S3:

  1. Scegliere Aggiungi trigger e selezionare S3.

  2. Scegli il bucket.

  3. Per Event type (Tipo di evento), seleziona PUT.

  4. In Prefix (Prefisso), digita logs/.

  5. Per Suffisso, digitare .log.

  6. Confermare l'avviso di chiamata ricorsiva e scegliere Aggiungi.

Puoi infine caricare il pacchetto di implementazione:

  1. Scegliere Carica da e File .zip, quindi seguire i prompt su schermo per caricare il pacchetto di implementazione.

  2. Al termine del caricamento, modificare il campo Impostazioni runtime e cambiare il gestore in sample.handler. Questa impostazione indica a Lambda il file (sample.py) e il metodo (handler) da eseguire dopo un trigger.

A questo punto, si dispone di un set completo di risorse: un bucket per i file di log, una funzione che viene eseguita ogni volta che un file di log viene aggiunto al bucket, il codice che esegue l'analisi e l'indicizzazione e un dominio OpenSearch Service per la ricerca e la visualizzazione.

Test della funzione Lambda

Una volta creata la funzione, è possibile eseguirne il test caricando un file nel bucket Amazon S3. Crea un file denominato sample.log utilizzando le righe di log di esempio indicate di seguito:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

Carica il file nella cartella logs del bucket S3. Per le istruzioni, consulta Caricamento di un oggetto nel bucket nella Guida per l'utente di Amazon Simple Storage Service.

Utilizzare quindi la console OpenSearch Service o OpenSearch Dashboards per verificare che l'indice lambda-s3-indexcontenga due documenti. Puoi anche effettuare una richiesta di ricerca standard:

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Caricamento dei dati in streaming in Amazon Kinesis Data Streams

È possibile caricare i dati in streaming da Kinesis Data Streams a OpenSearch Service. I nuovi dati che arrivano nel flusso di dati attivano una notifica eventi per Lambda, che quindi esegue il codice personalizzato per eseguire l'indicizzazione. In questa sezione è incluso un semplice codice di esempio Python.

Prerequisiti

Prima di procedere, devi disporre delle risorse indicate di seguito.

Prerequisito Descrizione
Amazon Kinesis Data Streams L'origine dell'evento per la funzione Lambda. Per ulteriori informazioni, consultare Kinesis Data Streams.
Dominio OpenSearch Service La destinazione dei dati dopo che la funzione Lambda li ha elaborati. Per ulteriori informazioni, consultare Creazione di domini OpenSearch Service
Ruolo IAM

Questo ruolo deve disporre di autorizzazioni di base per OpenSearch Service, Kinesis e Lambda, ad esempio le seguenti:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

Il ruolo deve avere la relazione di trust seguente:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Per ulteriori informazioni, consultare Creazione di ruoli IAM nella Guida per l'utente di IAM.

Creazione della funzione Lambda

Procedi come descritto in Creazione il pacchetto di implementazione Lambda, ma crea una directory denominata kinesis-to-opensearch e utilizza il codice seguente per sample.py:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

Modifica le variabili per region e host.

Installare pip, se non è già stato fatto, quindi utilizzare i seguenti comandi per installare le dipendenze:

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Procedi quindi come descritto in Creazione della funzione Lambda, ma specifica il ruolo IAM dai Prerequisiti e le impostazioni seguenti per il trigger:

  • Flusso Kinesis: il flusso di Kinesis.

  • Batch size (Dimensione batch): 100

  • Starting position (Posizione di inizio): orizzonte di taglio

Per ulteriori informazioni, consultare Cos'è Amazon Kinesis Data Streams? nella Guida per gli sviluppatori di Amazon Kinesis Data Streams.

A questo punto, si dispone di un set completo di risorse: un flusso di dati Kinesis, una funzione che viene eseguita dopo che il flusso riceve nuovi dati e li indicizza e un dominio OpenSearch Service per la ricerca e la visualizzazione.

Test della funzione Lambda

Una volta creata la funzione, puoi provarla aggiungendo un nuovo record al flusso di dati utilizzando l'AWS CLI:

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

Utilizzare quindi la console OpenSearch Service o OpenSearch Dashboards per verificare che l'indice lambda-kine-indexcontenga un documento. Puoi inoltre utilizzare la seguente richiesta:

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Caricamento di dati in streaming da una tabella Amazon DynamoDB

È possibile utilizzare AWS Lambda per inviare dati al dominio OpenSearch Service da Amazon DynamoDB. I nuovi dati che arrivano nella tabella di database attivano una notifica eventi per Lambda, che quindi esegue il codice personalizzato per eseguire l'indicizzazione.

Prerequisiti

Prima di procedere, devi disporre delle risorse indicate di seguito.

Prerequisito Descrizione
Tabella DynamoDB

La tabella contiene i dati di origine. Per ulteriori informazioni, consultare Operazioni di base sulle tabelle DynamoDB nella Guida per gli sviluppatori di Amazon DynamoDB.

La tabella deve trovarsi nella stessa regione del dominio OpenSearch Service e deve disporre di un flusso impostato su New image (Nuova immagine). Per ulteriori informazioni, consultare Abilitazione di un flusso.

Dominio OpenSearch Service La destinazione dei dati dopo che la funzione Lambda li ha elaborati. Per ulteriori informazioni, consultare Creazione di domini OpenSearch Service.
Ruolo IAM

Questo ruolo deve disporre di autorizzazioni di base per OpenSearch Service, DynamoDB e Lambda, ad esempio le seguenti:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

Il ruolo deve avere la relazione di trust seguente:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Per ulteriori informazioni, consultare Creazione di ruoli IAM nella Guida per l'utente di IAM.

Creazione della funzione Lambda

Procedi come descritto in Creazione il pacchetto di implementazione Lambda, ma crea una directory denominata ddb-to-opensearch e utilizza il codice seguente per sample.py:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

Modifica le variabili per region e host.

Installare pip, se non è già stato fatto, quindi utilizzare i seguenti comandi per installare le dipendenze:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Procedi quindi come descritto in Creazione della funzione Lambda, ma specifica il ruolo IAM dai Prerequisiti e le impostazioni seguenti per il trigger:

  • Tabella: la tabella DynamoDB

  • Batch size (Dimensione batch): 100

  • Starting position (Posizione di inizio): orizzonte di taglio

Per ulteriori informazioni, consultare Elaborazione di nuovi elementi con DynamoDB Streams e Lambdanella Guida per gli sviluppatori di Amazon DynamoDB.

A questo punto, si dispone di un set completo di risorse: una tabella DynamoDB per i dati di origine, un flusso DynamoDB delle modifiche alla tabella, una funzione che viene eseguita dopo le modifiche ai dati di origine e che le indicizza e un dominio OpenSearch Service per la ricerca e la visualizzazione.

Test della funzione Lambda

Una volta creata la funzione, è possibile eseguirne il test aggiungendo un nuovo elemento alla tabella DynamoDB utilizzando la AWS CLI:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

Utilizzare quindi la console OpenSearch Service o OpenSearch Dashboards per verificare che l'indice lambda-indexcontenga un documento. Puoi inoltre utilizzare la seguente richiesta:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Caricamento dei dati in streaming da Amazon Kinesis Data Firehose

Kinesis Data Firehose supporta OpenSearch Service come destinazione di consegna. Per istruzioni su come caricare i dati in streaming in OpenSearch Service, consultare Creazione di un flusso di consegna Amazon Kinesis Data Firehose e Scelta di OpenSearch Service come destinazione nella Guida per gli sviluppatori di Amazon Kinesis Data Firehose.

Prima di caricare i dati in OpenSearch Service, potrebbe essere necessario trasformarli. Per ulteriori informazioni su come usare le funzioni Lambda per completare questa attività, consultare Trasformazione dei dati di Amazon Kinesis Data Firehose nella stessa guida.

Nella configurazione di un flusso di consegna, Kinesis Data Firehose utilizza un ruolo IAM "pronto all'uso" che fornisce l'accesso alle risorse necessario per inviare i dati a OpenSearch Service, eseguire il backup dei dati su Amazon S3 e trasformare i dati tramite Lambda. Poiché creare un ruolo simile manualmente sarebbe molto complesso, è consigliabile utilizzare il ruolo fornito.

Caricamento di dati in streaming da Amazon CloudWatch

È possibile caricare i dati in streaming da CloudWatch Logs al dominio OpenSearch Service tramite una sottoscrizione a CloudWatch Logs. Per informazioni sulle sottoscrizioni ad Amazon CloudWatch, consultare Elaborazione in tempo reale dei dati di log con le sottoscrizioni. Per informazioni sulla configurazione, consultare Streaming dei dati CloudWatch Logs su Amazon OpenSearch Service nella Guida per gli sviluppatori di Amazon CloudWatch.

Caricamento di dati in streaming da AWS IoT

È possibile inviare dati da AWS IoT utilizzando le regole. Per ulteriori informazioni, consultare OpenSearch nella Guida per gli sviluppatori di AWS IoT.