Carregamento de dados de streaming no Amazon OpenSearch Service - Amazon OpenSearch Service

Carregamento de dados de streaming no Amazon OpenSearch Service

Você pode carregar dados de streaming no domínio do Amazon OpenSearch Service de muitas fontes diferentes. Algumas fontes, como o Amazon Kinesis Data Firehose e o Amazon CloudWatch Logs, oferecem suporte integrado ao OpenSearch Service. Outras, como Amazon S3, Amazon Kinesis Data Streams e Amazon DynamoDB, usam funções do AWS Lambdacomo manipuladores de eventos. As funções do Lambda respondem a novos dados processando e transmitindo-os para seu domínio.

nota

O Lambda oferece suporte a várias linguagens de programação populares e está disponível na maioria das Regiões da AWS. Para obter mais informações, consulte Conceitos básicos do Lambda no Guia do desenvolvedor do AWS Lambda e Endpoints de serviço daAWS na Referência geral da AWS.

Carregamento de dados de transmissão do Amazon S3

Você pode usar o Lambda para enviar dados para o domínio do OpenSearch Service do Amazon S3. Os novos dados recebidos em um bucket do S3 acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.

Esse método de streaming de dados é extremamente flexível. Você pode indexar metadados de objeto, ou se o objeto for texto simples, analisar e indexar alguns elementos do corpo do objeto. Esta seção inclui alguns códigos de exemplo Python simples que usam expressões regulares para analisar um arquivo de log e indexar as correspondências.

Pré-requisitos

Para continuar, você deve ter os recursos a seguir.

Pré-requisito Descrição
Bucket do Amazon S3. Para obter mais informações, consulte Criar seu primeiro bucket do S3 no Manual do usuário do Amazon Simple Storage Service. O bucket deve residir na mesma região do domínio do OpenSearch Service.
Domínio do OpenSearch Service O destino dos dados depois que a função do Lambda os processa. Para mais informações, consulte Criação de domínios do OpenSearch Service.

Criar o pacote de implantação do Lambda

Os pacotes de implantação são arquivos ZIP ou JAR que contêm o código e as dependências. Esta seção inclui código de exemplo Python. Para outras linguagens de programação, consulte Pacotes de implantação do Lambda no Guia do desenvolvedor do AWS Lambda.

  1. Crie um diretório. Neste exemplo, usamos o nome s3-to-opensearch.

  2. Crie um arquivo no diretório chamado sample.py:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' type = '_doc' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    Edite as variáveis de region e host.

  3. Se ainda não o fez, instale o pip. Em seguida, instale as dependências em um novo diretório package:

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    Como todos os ambientes de execução do Lambda têm o Boto3 instalado, você não precisa incluí-lo no pacote de implantação.

  4. Empacote o código do aplicativo e as dependências:

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Criar a função do Lambda

Depois de criar o pacote de implantação, você poderá criar a função do Lambda. Ao criar uma função, escolha um nome, o tempo de execução (por exemplo, Python 3.8) e a função do IAM. A função do IAM define as permissões para a função. Para obter instruções detalhadas, consulte Criar uma função Lambda com o console no Guia do desenvolvedor do AWS Lambda.

Esse exemplo pressupõe que você está usando o console. Escolha o Python 3.9 e uma função que tenha permissões de leitura do S3 e permissões de gravação do OpenSearch Service, conforme mostrado na captura de tela a seguir:


                    Configuração de exemplo de uma função do Lambda

Depois de criar a função, você deverá adicionar um gatilho. Neste exemplo, queremos que o código seja executado sempre que um arquivo de log chegue em um bucket do S3:

  1. Escolha Add trigger (Adicionar acionador) e selecione S3.

  2. Escolha o bucket.

  3. Em Event type (Tipo de evento), selecione PUT.

  4. Em Prefix (Prefixo), digite logs/.

  5. Em Suffix (Sufixo), digite .log.

  6. Confirme o aviso de invocação recursiva e escolha Add (Adicionar).

Por fim, você pode fazer upload do pacote de implantação:

  1. Escolha Upload from (Fazer upload de) e .zip file (Arquivo .zip) e siga os avisos para fazer upload do pacote de implantação.

  2. Depois que o upload terminar, edite as Runtime settings (Configurações do tempo de execução) e altere o Handler (Manipulador) para sample.handler. Essa configuração informa ao Lambda o arquivo (sample.py) e o método (handler) que deverão ser executados depois de um acionador.

A esta altura, você tem um conjunto completo de recursos: um bucket para arquivos de log, uma função executada sempre que um arquivo de log é adicionado ao bucket, o código que realiza a análise e a indexação e um domínio do OpenSearch Service para pesquisa e visualização.

Teste da função do Lambda

Após criar a função, você poderá testá-la fazendo upload de um arquivo no bucket do Amazon S3. Crie um arquivo chamado sample.log usando as seguintes linhas de log de exemplo:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

Faça upload do arquivo na pasta logs do bucket do S3. Para obter instruções, consulte Fazer upload de um objeto para o seu bucket no Manual do usuário do Amazon Simple Storage Service.

Em seguida, use o console do OpenSearch Service ou o OpenSearch Dashboards para verificar se o índice lambda-s3-index contém dois documentos. Você também pode fazer uma solicitação de pesquisa padrão:

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Carregamento dados de transmissão do Amazon Kinesis Data Streams

Você pode carregar dados de transmissão do Kinesis Data Streams para o OpenSearch Service. Os novos dados recebidos no fluxo de dados acionam uma notificação de evento para o Lambda, o qual executa seu código personalizado para realizar a indexação. Esta seção inclui um código de exemplo Python simples.

Pré-requisitos

Para continuar, você deve ter os recursos a seguir.

Pré-requisito Descrição
Amazon Kinesis Data Streams A fonte do evento para a função do Lambda. Para saber mais, consulte Kinesis Data Streams.
Domínio do OpenSearch Service O destino dos dados depois que a função do Lambda os processa. Para obter mais informações, consulte Criação de domínios do OpenSearch Service.
Função do IAM

Essa função deve ter permissões básicas do OpenSearch Service, Kinesis e Lambda, como as seguintes:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

A função deve ter a seguinte relação de confiança:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Para saber mais, consulte Criação de funções do IAM no Manual do usuário do IAM.

Criar a função do Lambda

Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado kinesis-to-opensearch e use o seguinte código para sample.py:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

Edite as variáveis de region e host.

Caso ainda não tenha feito, instale o pip. Em seguida, use os seguintes comandos para instalar as dependências:

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Depois siga as instruções em Criar a função do Lambda, mas especifique a função do IAM por Pré-requisitos e as seguintes configurações do gatilho:

  • Kinesis stream (Fluxo do Kinesis): o fluxo do Kinesis

  • Batch size (Tamanho do lote): 100

  • Starting position (Posição inicial): redução horizontal

Para saber mais, consulte O que é o Amazon Kinesis Data Streams? no Guia do desenvolvedor do Amazon Kinesis Data Streams.

A esta altura, você tem um conjunto completo de recursos: um fluxo de dados do Kinesis, uma função executada depois que o fluxo recebe novos dados e indexa esses dados e um domínio do OpenSearch Service para pesquisa e visualização.

Testar a função do Lambda

Depois de criar a função, você poderá testá-la adicionando um novo registro ao streaming de dados usando a AWS CLI:

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

Em seguida, use o console do OpenSearch Service ou o OpenSearch Dashboards para verificar se o lambda-kine-index contém um documento. Você também pode usar a seguinte solicitação:

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Carregamento de dados de transmissão do Amazon DynamoDB

Você pode usar o AWS Lambda para enviar dados do Amazon DynamoDB para o domínio do OpenSearch Service. Os novos dados recebidos na tabela do banco de dados acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.

Pré-requisitos

Para continuar, você deve ter os recursos a seguir.

Pré-requisito Descrição
Tabela do DynamoDB

A tabela contém os dados de origem. Para obter mais informações, consulte Operações básicas nas tabelas do DynamoDB no Guia do desenvolvedor do Amazon DynamoDB.

A tabela deve residir na mesma região que o domínio do OpenSearch Service e ter um stream definido como New image (Nova imagem). Para saber mais, consulte Como habilitar um stream.

Domínio do OpenSearch Service O destino dos dados depois que a função do Lambda os processa. Para mais informações, consulte Criação de domínios do OpenSearch Service.
Função do IAM

Essa função deve ter permissões básicas de execução do OpenSearch Service, DynamoDB e Lambda, como as seguintes:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

A função deve ter a seguinte relação de confiança:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Para saber mais, consulte Criação de funções do IAM no Manual do usuário do IAM.

Criar a função do Lambda

Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado ddb-to-opensearch e use o seguinte código para sample.py:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

Edite as variáveis de region e host.

Caso ainda não tenha feito, instale o pip. Em seguida, use os seguintes comandos para instalar as dependências:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Depois siga as instruções em Criar a função do Lambda, mas especifique a função do IAM por Pré-requisitos e as seguintes configurações do gatilho:

  • Table (Tabela): a tabela do DynamoDB

  • Batch size (Tamanho do lote): 100

  • Starting position (Posição inicial): redução horizontal

Para saber mais, consulte Processar novos itens com o DynamoDB Streams e o Lambda no Guia do desenvolvedor do Amazon DynamoDB.

A esta altura, você tem um conjunto completo de recursos: uma tabela do DynamoDB para os dados de origem, um fluxo do DynamoDB de alterações feitas na tabela, uma função que será executada depois que os dados de origem forem alterados e indexará essas alterações e um domínio do OpenSearch Service para pesquisa e visualização.

Testar a função do Lambda

Depois de criar a função, você poderá testá-la adicionando um novo item à tabela do DynamoDB usando a AWS CLI:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

Em seguida, use o console do OpenSearch Service ou o OpenSearch Dashboards para verificar se o lambda-index contém um documento. Você também pode usar a seguinte solicitação:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Carregamento dados de transmissão do Amazon Kinesis Data Firehose

O Kinesis Data Firehose oferece suporte ao OpenSearch Service como um destino de entrega. Para obter instruções de como carregar dados de transmissão no OpenSearch Service, consulte Como criar um fluxo de entrega do Kinesis Data Firehose e Escolher o Amazon ES como destino no Guia do desenvolvedor do Amazon Kinesis Data Firehose.

Antes de carregar dados no OpenSearch Service, talvez você precise realizar transformações nos dados. Para saber mais sobre como usar funções do Lambda para executar essa tarefa, consulte Transformação de dados do Amazon Kinesis Data Firehose no mesmo guia.

Ao configurar um fluxo de entrega, o Kinesis Data Firehose oferece uma função do IAM de "um único clique" que concede a ele o acesso ao recurso necessário para enviar dados para o OpenSearch Service, fazer backup de dados no Amazon S3 e transformar dados usando o Lambda. Em virtude da complexidade envolvida na criação manual de uma função como essa, é recomendável usar a função fornecida.

Carregamento de dados de transmissão do Amazon CloudWatch

Você pode carregar dados de transmissão do CloudWatch Logs para o domínio do OpenSearch Service usando uma assinatura do CloudWatch Logs. Para obter mais informações sobre assinaturas do Amazon CloudWatch, consulte Processamento em tempo real dos dados de log com assinaturas. Para obter informações de configuração, consulte Fazer uma transmissão de dados do CloudWatch Logs para o Amazon OpenSearch Service no Guia do desenvolvedor do Amazon CloudWatch.

Carregamento de dados de transmissão do AWS IoT

Você pode enviar dados da AWS IoT usando regras. Para saber mais, consulte a ação OpenSearch no Guia do desenvolvedor do AWS IoT.