Carregamento de dados de streaming no Amazon OpenSearch Service
Você pode carregar dados de streaming
O Lambda oferece suporte a várias linguagens de programação populares e está disponível na maioria das Regiões da AWS. Para obter mais informações, consulte Conceitos básicos do Lambda no Guia do desenvolvedor do AWS Lambda e Endpoints de serviço daAWS na Referência geral da AWS.
Tópicos
- Carregamento de dados de transmissão do Amazon S3
- Carregamento dados de transmissão do Amazon Kinesis Data Streams
- Carregamento de dados de transmissão do Amazon DynamoDB
- Carregamento dados de transmissão do Amazon Kinesis Data Firehose
- Carregamento de dados de transmissão do Amazon CloudWatch
- Carregamento de dados de transmissão do AWS IoT
Carregamento de dados de transmissão do Amazon S3
Você pode usar o Lambda para enviar dados para o domínio do OpenSearch Service do Amazon S3. Os novos dados recebidos em um bucket do S3 acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.
Esse método de streaming de dados é extremamente flexível. Você pode indexar metadados de objeto
Pré-requisitos
Para continuar, você deve ter os recursos a seguir.
Pré-requisito | Descrição |
---|---|
Bucket do Amazon S3. | Para obter mais informações, consulte Criar seu primeiro bucket do S3 no Manual do usuário do Amazon Simple Storage Service. O bucket deve residir na mesma região do domínio do OpenSearch Service. |
Domínio do OpenSearch Service | O destino dos dados depois que a função do Lambda os processa. Para mais informações, consulte Criação de domínios do OpenSearch Service. |
Criar o pacote de implantação do Lambda
Os pacotes de implantação são arquivos ZIP ou JAR que contêm o código e as dependências. Esta seção inclui código de exemplo Python. Para outras linguagens de programação, consulte Pacotes de implantação do Lambda no Guia do desenvolvedor do AWS Lambda.
-
Crie um diretório. Neste exemplo, usamos o nome
s3-to-opensearch
. -
Crie um arquivo no diretório chamado
sample.py
:import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' type = '_doc' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)
Edite as variáveis de
region
ehost
. -
Se ainda não o fez, instale o pip
. Em seguida, instale as dependências em um novo diretório package
:cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
Como todos os ambientes de execução do Lambda têm o Boto3
instalado, você não precisa incluí-lo no pacote de implantação. -
Empacote o código do aplicativo e as dependências:
cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py
Criar a função do Lambda
Depois de criar o pacote de implantação, você poderá criar a função do Lambda. Ao criar uma função, escolha um nome, o tempo de execução (por exemplo, Python 3.8) e a função do IAM. A função do IAM define as permissões para a função. Para obter instruções detalhadas, consulte Criar uma função Lambda com o console no Guia do desenvolvedor do AWS Lambda.
Esse exemplo pressupõe que você está usando o console. Escolha o Python 3.9 e uma função que tenha permissões de leitura do S3 e permissões de gravação do OpenSearch Service, conforme mostrado na captura de tela a seguir:

Depois de criar a função, você deverá adicionar um gatilho. Neste exemplo, queremos que o código seja executado sempre que um arquivo de log chegue em um bucket do S3:
-
Escolha Add trigger (Adicionar acionador) e selecione S3.
-
Escolha o bucket.
-
Em Event type (Tipo de evento), selecione PUT.
-
Em Prefix (Prefixo), digite
logs/
. -
Em Suffix (Sufixo), digite
.log
. -
Confirme o aviso de invocação recursiva e escolha Add (Adicionar).
Por fim, você pode fazer upload do pacote de implantação:
-
Escolha Upload from (Fazer upload de) e .zip file (Arquivo .zip) e siga os avisos para fazer upload do pacote de implantação.
-
Depois que o upload terminar, edite as Runtime settings (Configurações do tempo de execução) e altere o Handler (Manipulador) para
sample.handler
. Essa configuração informa ao Lambda o arquivo (sample.py
) e o método (handler
) que deverão ser executados depois de um acionador.
A esta altura, você tem um conjunto completo de recursos: um bucket para arquivos de log, uma função executada sempre que um arquivo de log é adicionado ao bucket, o código que realiza a análise e a indexação e um domínio do OpenSearch Service para pesquisa e visualização.
Teste da função do Lambda
Após criar a função, você poderá testá-la fazendo upload de um arquivo no bucket do Amazon S3. Crie um arquivo chamado sample.log
usando as seguintes linhas de log de exemplo:
12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"
Faça upload do arquivo na pasta logs
do bucket do S3. Para obter instruções, consulte Fazer upload de um objeto para o seu bucket no Manual do usuário do Amazon Simple Storage Service.
Em seguida, use o console do OpenSearch Service ou o OpenSearch Dashboards para verificar se o índice lambda-s3-index
contém dois documentos. Você também pode fazer uma solicitação de pesquisa padrão:
GET https://domain-name
/lambda-s3-index/_search?pretty
{
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "lambda-s3-index",
"_type" : "_doc",
"_id" : "vTYXaWIBJWV_TTkEuSDg",
"_score" : 1.0,
"_source" : {
"ip" : "12.345.678.91",
"message" : "GET /some-file.jpg",
"timestamp" : "10/Oct/2000:14:56:14 -0700"
}
},
{
"_index" : "lambda-s3-index",
"_type" : "_doc",
"_id" : "vjYmaWIBJWV_TTkEuCAB",
"_score" : 1.0,
"_source" : {
"ip" : "12.345.678.90",
"message" : "PUT /some-file.jpg",
"timestamp" : "10/Oct/2000:13:55:36 -0700"
}
}
]
}
}
Carregamento dados de transmissão do Amazon Kinesis Data Streams
Você pode carregar dados de transmissão do Kinesis Data Streams para o OpenSearch Service. Os novos dados recebidos no fluxo de dados acionam uma notificação de evento para o Lambda, o qual executa seu código personalizado para realizar a indexação. Esta seção inclui um código de exemplo Python simples.
Pré-requisitos
Para continuar, você deve ter os recursos a seguir.
Pré-requisito | Descrição |
---|---|
Amazon Kinesis Data Streams | A fonte do evento para a função do Lambda. Para saber mais, consulte Kinesis Data Streams. |
Domínio do OpenSearch Service | O destino dos dados depois que a função do Lambda os processa. Para obter mais informações, consulte Criação de domínios do OpenSearch Service. |
Função do IAM |
Essa função deve ter permissões básicas do OpenSearch Service, Kinesis e Lambda, como as seguintes:
A função deve ter a seguinte relação de confiança:
Para saber mais, consulte Criação de funções do IAM no Manual do usuário do IAM. |
Criar a função do Lambda
Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado kinesis-to-opensearch
e use o seguinte código para sample.py
:
import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'
Edite as variáveis de region
e host
.
Caso ainda não tenha feito, instale o pip
cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
Depois siga as instruções em Criar a função do Lambda, mas especifique a função do IAM por Pré-requisitos e as seguintes configurações do gatilho:
-
Kinesis stream (Fluxo do Kinesis): o fluxo do Kinesis
-
Batch size (Tamanho do lote): 100
-
Starting position (Posição inicial): redução horizontal
Para saber mais, consulte O que é o Amazon Kinesis Data Streams? no Guia do desenvolvedor do Amazon Kinesis Data Streams.
A esta altura, você tem um conjunto completo de recursos: um fluxo de dados do Kinesis, uma função executada depois que o fluxo recebe novos dados e indexa esses dados e um domínio do OpenSearch Service para pesquisa e visualização.
Testar a função do Lambda
Depois de criar a função, você poderá testá-la adicionando um novo registro ao streaming de dados usando a AWS CLI:
aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region
us-west-1
Em seguida, use o console do OpenSearch Service ou o OpenSearch Dashboards para verificar se o lambda-kine-index
contém um documento. Você também pode usar a seguinte solicitação:
GET https://domain-name
/lambda-kine-index/_search
{
"hits" : [
{
"_index": "lambda-kine-index",
"_type": "_doc",
"_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042",
"_score": 1,
"_source": {
"timestamp": 1523648740.051,
"message": "My test data.",
"id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042"
}
}
]
}
Carregamento de dados de transmissão do Amazon DynamoDB
Você pode usar o AWS Lambda para enviar dados do Amazon DynamoDB para o domínio do OpenSearch Service. Os novos dados recebidos na tabela do banco de dados acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.
Pré-requisitos
Para continuar, você deve ter os recursos a seguir.
Pré-requisito | Descrição |
---|---|
Tabela do DynamoDB | A tabela contém os dados de origem. Para obter mais informações, consulte Operações básicas nas tabelas do DynamoDB no Guia do desenvolvedor do Amazon DynamoDB. A tabela deve residir na mesma região que o domínio do OpenSearch Service e ter um stream definido como New image (Nova imagem). Para saber mais, consulte Como habilitar um stream. |
Domínio do OpenSearch Service | O destino dos dados depois que a função do Lambda os processa. Para mais informações, consulte Criação de domínios do OpenSearch Service. |
Função do IAM | Essa função deve ter permissões básicas de execução do OpenSearch Service, DynamoDB e Lambda, como as seguintes:
A função deve ter a seguinte relação de confiança:
Para saber mais, consulte Criação de funções do IAM no Manual do usuário do IAM. |
Criar a função do Lambda
Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado ddb-to-opensearch
e use o seguinte código para sample.py
:
import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'
Edite as variáveis de region
e host
.
Caso ainda não tenha feito, instale o pip
cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
Depois siga as instruções em Criar a função do Lambda, mas especifique a função do IAM por Pré-requisitos e as seguintes configurações do gatilho:
-
Table (Tabela): a tabela do DynamoDB
-
Batch size (Tamanho do lote): 100
-
Starting position (Posição inicial): redução horizontal
Para saber mais, consulte Processar novos itens com o DynamoDB Streams e o Lambda no Guia do desenvolvedor do Amazon DynamoDB.
A esta altura, você tem um conjunto completo de recursos: uma tabela do DynamoDB para os dados de origem, um fluxo do DynamoDB de alterações feitas na tabela, uma função que será executada depois que os dados de origem forem alterados e indexará essas alterações e um domínio do OpenSearch Service para pesquisa e visualização.
Testar a função do Lambda
Depois de criar a função, você poderá testá-la adicionando um novo item à tabela do DynamoDB usando a AWS CLI:
aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region
us-west-1
Em seguida, use o console do OpenSearch Service ou o OpenSearch Dashboards para verificar se o lambda-index
contém um documento. Você também pode usar a seguinte solicitação:
GET https://domain-name
/lambda-index/_doc/00001
{
"_index": "lambda-index",
"_type": "_doc",
"_id": "00001",
"_version": 1,
"found": true,
"_source": {
"director": {
"S": "Kevin Costner"
},
"id": {
"S": "00001"
},
"title": {
"S": "The Postman"
}
}
}
Carregamento dados de transmissão do Amazon Kinesis Data Firehose
O Kinesis Data Firehose oferece suporte ao OpenSearch Service como um destino de entrega. Para obter instruções de como carregar dados de transmissão no OpenSearch Service, consulte Como criar um fluxo de entrega do Kinesis Data Firehose e Escolher o Amazon ES como destino no Guia do desenvolvedor do Amazon Kinesis Data Firehose.
Antes de carregar dados no OpenSearch Service, talvez você precise realizar transformações nos dados. Para saber mais sobre como usar funções do Lambda para executar essa tarefa, consulte Transformação de dados do Amazon Kinesis Data Firehose no mesmo guia.
Ao configurar um fluxo de entrega, o Kinesis Data Firehose oferece uma função do IAM de "um único clique" que concede a ele o acesso ao recurso necessário para enviar dados para o OpenSearch Service, fazer backup de dados no Amazon S3 e transformar dados usando o Lambda. Em virtude da complexidade envolvida na criação manual de uma função como essa, é recomendável usar a função fornecida.
Carregamento de dados de transmissão do Amazon CloudWatch
Você pode carregar dados de transmissão do CloudWatch Logs para o domínio do OpenSearch Service usando uma assinatura do CloudWatch Logs. Para obter mais informações sobre assinaturas do Amazon CloudWatch, consulte Processamento em tempo real dos dados de log com assinaturas. Para obter informações de configuração, consulte Fazer uma transmissão de dados do CloudWatch Logs para o Amazon OpenSearch Service no Guia do desenvolvedor do Amazon CloudWatch.
Carregamento de dados de transmissão do AWS IoT
Você pode enviar dados da AWS IoT usando regras. Para saber mais, consulte a ação OpenSearch no Guia do desenvolvedor do AWS IoT.