Chargement de données de streaming dans Amazon OpenSearch Service - Amazon OpenSearch Service

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Chargement de données de streaming dans Amazon OpenSearch Service

Vous pouvez utiliser OpenSearch Ingestion pour charger directement des données de streaming dans votre domaine Amazon OpenSearch Service, sans avoir besoin de recourir à des solutions tierces. Pour envoyer des données à OpenSearch Ingestion, vous configurez vos producteurs de données et le service fournit automatiquement les données au domaine ou à la collection que vous spécifiez. Pour commencer à utiliser OpenSearch Ingestion, voirTutoriel : Ingestion de données dans une collection à l'aide d'Amazon OpenSearch Ingestion.

Vous pouvez toujours utiliser d'autres sources pour charger des données de streaming, telles qu'Amazon Data Firehose et Amazon CloudWatch Logs, qui disposent d'un support intégré pour OpenSearch Service. D'autres, telles qu'Amazon S3, Amazon Kinesis Data Streams, et Amazon DynamoDB, utilisent des fonctions AWS Lambda comme gestionnaires d'événements. Les fonctions Lambda répondent aux nouvelles données en les traitant et en les diffusant dans votre domaine.

Note

Lambda prend en charge différents langages de programmation courants et est disponible dans la plupart des Régions AWS. Pour plus d'informations, consultez Getting started with Lambda dans le guide duAWS Lambda développeur et les points de terminaisonAWS de service dans le. Références générales AWS

Chargement de données de streaming depuis OpenSearch Ingestion

Vous pouvez utiliser Amazon OpenSearch Ingestion pour charger des données dans un domaine OpenSearch de service. Vous configurez vos producteurs de données pour qu'ils envoient des données à OpenSearch Ingestion, qui les fournit automatiquement à la collection que vous spécifiez. Vous pouvez également configurer OpenSearch Ingestion pour transformer vos données avant de les livrer. Pour de plus amples informations, veuillez consulter OpenSearch Ingestion d'Amazon.

Chargement de données de streaming à partir d'Amazon S3

Vous pouvez utiliser Lambda pour envoyer des données vers votre domaine de OpenSearch service depuis Amazon S3. Les nouvelles données qui arrivent dans un compartiment S3 déclenchent l'envoi d'une notification d'événement à Lambda, qui exécute alors votre code personnalisé pour effectuer l'indexation.

Cette méthode de diffusion de données est extrêmement flexible. Vous pouvez indexer les métadonnées d'objet, ou si l'objet est en texte brut, analyser et indexer certains éléments du corps de l'objet. Cette section inclut des exemples de code Python simple qui utilisent des expressions régulières pour analyser un fichier journal et indexer les correspondances.

Prérequis

Avant de poursuivre, vous devez disposer des ressources suivantes.

Prérequis Description
Compartiment Amazon S3 Pour en savoir plus, consultez Création de votre premier compartiment S3 dans le Guide de l'utilisateur Amazon Simple Storage Service. Le bucket doit résider dans la même région que votre domaine OpenSearch de service.
OpenSearch Domaine de service Destination des données après leur traitement par votre fonction Lambda. Pour de plus amples informations, veuillez consulter Création de domaines OpenSearch de service.

Créer le package de déploiement Lambda

Les packages de déploiement sont des fichiers ZIP ou JAR qui contiennent votre code et ses dépendances. Cette section inclut des exemples de code Python. Pour les autres langages de programmation, consultez Packages de déploiement Lambda dans le Guide du développeurAWS Lambda .

  1. Créez un répertoire. Dans cet exemple, nous utilisons le nom s3-to-opensearch.

  2. Dans le répertoire, créez un fichier nommé sample.py :

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    Modifiez les variables des champs region et host.

  3. Si vous ne l'avez pas encore fait, installez pip, puis installez les dépendances dans un nouveau répertoire package :

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    Boto3 est installé dans tous les environnements d'exécution Lambda et vous n'avez pas besoin de l'inclure dans votre package de déploiement.

  4. Empaquetez le code d'application et les dépendances :

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Créer la fonction Lambda

Après avoir créé le package de déploiement, vous pouvez créer la fonction Lambda. Lorsque vous créez une fonction, choisissez un nom, une exécution (par exemple, Python 3.8) et un rôle IAM. Le rôle IAM définit les autorisations pour votre fonction. Pour obtenir des instructions détaillées, consultez Création d'une fonction Lambda à l'aide de la console dans le Guide du développeurAWS Lambda .

Cet exemple suppose que vous utilisez la console. Choisissez Python 3.9 et un rôle doté des autorisations de lecture S3 et des autorisations d'écriture du OpenSearch service, comme illustré dans la capture d'écran suivante :


                    Exemple de configuration d'une fonction Lambda

Une fois que vous avez créé la fonction, vous devez ajouter un déclencheur. Pour cet exemple, nous voulons que le code s'exécute chaque fois qu'un fichier journal arrive dans un compartiment S3 :

  1. Choisissez Add trigger (Ajouter un déclencheur) et sélectionnez S3.

  2. Choisissez votre compartiment.

  3. Pour Event type (Type d'événement), choisissez PUT.

  4. Pour Préfixe, tapez logs/.

  5. Pour Suffixe, tapez .log.

  6. Acceptez l'avertissement d'invocation récursive et choisissez Add (Ajouter).

Enfin, vous pouvez charger votre package de déploiement :

  1. Choisissez Upload from (Charger à partir de) et .zip file (Fichier .zip), puis suivez les invites pour charger votre package de déploiement.

  2. Au terme du chargement, modifiez les Paramètres d'exécution et remplacez le Gestionnaire par sample.handler. Ce paramètre indique à Lambda le fichier (sample.py) et la méthode (handler) à exécuter après un déclencheur.

À ce stade, vous disposez d'un ensemble complet de ressources : un compartiment pour les fichiers journaux, une fonction qui s'exécute chaque fois qu'un fichier journal est ajouté au compartiment, du code qui effectue l'analyse et l'indexation, et un domaine de OpenSearch service pour la recherche et la visualisation.

Test de la fonction Lambda

Après avoir créé la fonction, vous pouvez la tester en chargeant un fichier dans le compartiment Amazon S3. Créez un fichier nommé sample.log en utilisant les exemples de lignes de journal suivants :

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

Chargez le fichier dans le dossier logs de votre compartiment S3. Pour obtenir des instructions, consultez Charger un objet dans votre compartiment dans le Guide de l'utilisateur Amazon Simple Storage Service.

Utilisez ensuite la console OpenSearch de service ou OpenSearch les tableaux de bord pour vérifier que l'lambda-s3-indexindex contient deux documents. Vous pouvez également effectuer une requête de recherche standard :

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Chargement de données de streaming à partir d'Amazon Kinesis Data Streams

Vous pouvez charger des données de streaming depuis Kinesis Data Streams OpenSearch vers Service. Les nouvelles données qui arrivent dans le flux de données déclenchent l'envoi d'une notification d'événement à Lambda, qui exécute alors votre code personnalisé pour effectuer l'indexation. Cette section inclut des exemples de code Python simple.

Prérequis

Avant de poursuivre, vous devez disposer des ressources suivantes.

Prérequis Description
Flux de données Amazon Kinesis Source d'événement de votre fonction Lambda. Pour en savoir plus, consultez Kinesis Data Streams.
OpenSearch Domaine de service Destination des données après leur traitement par votre fonction Lambda. Pour plus d'informations, consultez Création de domaines OpenSearch de service
Rôle IAM

Ce rôle doit disposer d'autorisations OpenSearch Service, Kinesis et Lambda de base, telles que les suivantes :

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

Le rôle doit avoir la relation d'approbation suivante :

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Pour en savoir plus, consultez Création de rôles IAM dans le Guide de l'utilisateur IAM.

Créer la fonction Lambda

Suivez les instructions fournies dans Créer le package de déploiement Lambda, mais créez un répertoire nommé kinesis-to-opensearch et utilisez le code suivant pour sample.py :

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

Modifiez les variables des champs region et host.

Si vous ne l'avez pas encore fait, installez pip, puis utilisez les commandes suivantes pour installer vos dépendances :

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Ensuite, suivez les instructions fournies dans Créer la fonction Lambda, mais spécifiez le rôle IAM issu de Prérequis et les paramètres suivants pour le déclencheur :

  • Flux Kinesis : votre flux Kinesis

  • Taille de lot : 100

  • Position de départ : horizon Trim

Pour en savoir plus, consultez Présentation d'Amazon Kinesis Data Streams dans leGuide du développeur Amazon Kinesis Data Streams.

À ce stade, vous disposez d'un ensemble complet de ressources : un flux de données Kinesis, une fonction qui s'exécute une fois que le flux reçoit de nouvelles données et indexe ces données, et un domaine de OpenSearch service pour la recherche et la visualisation.

Tester la fonction Lambda

Après avoir créé la fonction, vous pouvez la tester en ajoutant un nouvel enregistrement dans le flux de données à l'aide de l' AWS CLI :

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

Utilisez ensuite la console OpenSearch de service ou OpenSearch les tableaux de bord pour vérifier qu'il lambda-kine-index contient un document. Vous pouvez également utiliser la demande suivante :

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Chargement de données de streaming à partir d'Amazon DynamoDB

Vous pouvez l'utiliser AWS Lambda pour envoyer des données vers votre domaine de OpenSearch service depuis Amazon DynamoDB. Les nouvelles données qui arrivent dans la table de la base de données déclenchent l'envoi d'une notification d'événement à Lambda, qui exécute ensuite votre code personnalisé pour effectuer l'indexation.

Prérequis

Avant de poursuivre, vous devez disposer des ressources suivantes.

Prérequis Description
Tableau DynamoDB

Le tableau contient vos données sources. Pour plus d'informations, consultez Opérations de base sur les tables DynamoDBdans le Guide du développeur Amazon DynamoDB.

La table doit résider dans la même région que votre domaine de OpenSearch service et avoir un flux défini sur Nouvelle image. Pour en savoir plus, consultez Activation d'un flux.

OpenSearch Domaine de service Destination des données après leur traitement par votre fonction Lambda. Pour de plus amples informations, veuillez consulter Création de domaines OpenSearch de service.
Rôle IAM

Ce rôle doit disposer des autorisations d'exécution de base de OpenSearch service, DynamoDB et Lambda, telles que les suivantes :

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

Le rôle doit avoir la relation d'approbation suivante :

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Pour en savoir plus, consultez Création de rôles IAM dans le Guide de l'utilisateur IAM.

Créer la fonction Lambda

Suivez les instructions fournies dans Créer le package de déploiement Lambda, mais créez un répertoire nommé ddb-to-opensearch et utilisez le code suivant pour sample.py :

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

Modifiez les variables des champs region et host.

Si vous ne l'avez pas encore fait, installez pip, puis utilisez les commandes suivantes pour installer vos dépendances :

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Ensuite, suivez les instructions fournies dans Créer la fonction Lambda, mais spécifiez le rôle IAM issu de Prérequis et les paramètres suivants pour le déclencheur :

  • Table : votre table DynamoDB

  • Taille de lot : 100

  • Position de départ : horizon Trim

Pour en savoir plus, consultez Traitement des nouveaux éléments avec DynamoDB Streams et Lambda dans le Guide du développeur Amazon DynamoDB.

À ce stade, vous disposez d'un ensemble complet de ressources : une table DynamoDB pour vos données source, un flux DynamoDB contenant les modifications apportées à la table, une fonction qui s'exécute après les modifications de vos données source et indexe ces modifications, et un domaine de service pour la recherche et la visualisation. OpenSearch

Test de la fonction Lambda

Après avoir créé la fonction, vous pouvez la tester en ajoutant un nouvel élément dans la table DynamoDB à l'aide de l'interface AWS CLI :

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

Utilisez ensuite la console OpenSearch de service ou OpenSearch les tableaux de bord pour vérifier qu'il lambda-index contient un document. Vous pouvez également utiliser la demande suivante :

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Chargement de données de streaming depuis Amazon Data Firehose

Firehose prend en charge le OpenSearch service en tant que destination de livraison. Pour savoir comment charger des données de streaming dans OpenSearch Service, consultez les sections Creating a Kinesis Data Firehose Delivery Stream OpenSearch et Choose Service for Your Destination dans le manuel Amazon Data Firehose Developer Guide.

Avant de charger des données dans OpenSearch Service, vous devrez peut-être effectuer des transformations sur les données. Pour en savoir plus sur l'utilisation des fonctions Lambda pour effectuer cette tâche, consultez Transformation de données Amazon Kinesis Data Firehose dans le même guide.

Lorsque vous configurez un flux de diffusion, Firehose propose un rôle IAM « en un clic » qui lui donne l'accès aux ressources dont il a besoin pour envoyer des données au OpenSearch Service, sauvegarder des données sur Amazon S3 et transformer des données à l'aide de Lambda. En raison de la complexité du processus de création manuelle d'un tel rôle, nous vous recommandons d'utiliser le rôle fourni.

Chargement de données de streaming depuis Amazon CloudWatch

Vous pouvez charger des données de streaming depuis CloudWatch Logs vers votre domaine OpenSearch de service à l'aide d'un abonnement CloudWatch Logs. Pour plus d'informations sur les CloudWatch abonnements Amazon, consultez la section Traitement en temps réel des données de journal dans le cadre des abonnements. Pour obtenir des informations de configuration, consultez la section Streaming CloudWatch Logs to Amazon OpenSearch Service dans le manuel Amazon CloudWatch Developer Guide.

Chargement de données de streaming depuis AWS IoT

Vous pouvez envoyer des données à l' AWS IoT aide de règles. Pour en savoir plus, consultez l'OpenSearchaction décrite dans le guide duAWS IoT développeur.