Carregando dados de streaming no Amazon OpenSearch Service - OpenSearch Serviço Amazon

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Carregando dados de streaming no Amazon OpenSearch Service

Você pode usar o OpenSearch Inestion para carregar diretamente dados de streaming em seu domínio do Amazon OpenSearch Service, sem precisar usar soluções de terceiros. Para enviar dados para o OpenSearch Ingestion, você configura seus produtores de dados e o serviço entrega automaticamente os dados ao domínio ou à coleção que você especificar. Para começar a usar o OpenSearch Ingestion, consulteTutorial: Ingestão de dados em uma coleção usando o Amazon OpenSearch Ingestion.

Você ainda pode usar outras fontes para carregar dados de streaming, como Amazon Data Firehose e Amazon CloudWatch Logs, que têm suporte integrado para OpenSearch o Service. Outras, como Amazon S3, Amazon Kinesis Data Streams e Amazon DynamoDB, usam funções do AWS Lambda como manipuladores de eventos. As funções do Lambda respondem a novos dados processando e transmitindo-os para seu domínio.

nota

O Lambda oferece suporte a várias linguagens de programação populares e está disponível na maioria das Regiões da AWS. Para obter mais informações, consulte Conceitos básicos do Lambda na AWS Lambda Guia do desenvolvedor e AWS Endpoints de serviço na Referência geral da AWS.

Carregando dados de streaming do OpenSearch Ingestion

Você pode usar o Amazon OpenSearch Ingestion para carregar dados em um domínio OpenSearch de serviço. Você configura seus produtores de dados para enviar dados para OpenSearch Ingestão, e ele entrega automaticamente os dados para a coleção que você especificar. Você também pode configurar a OpenSearch ingestão para transformar seus dados antes de entregá-los. Para ter mais informações, consulte OpenSearch Ingestão da Amazon.

Carregamento de dados de transmissão do Amazon S3

Você pode usar o Lambda para enviar dados para seu domínio de OpenSearch serviço do Amazon S3. Os novos dados recebidos em um bucket do S3 acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.

Esse método de streaming de dados é extremamente flexível. Você pode indexar metadados de objeto, ou se o objeto for texto simples, analisar e indexar alguns elementos do corpo do objeto. Esta seção inclui alguns códigos de exemplo Python simples que usam expressões regulares para analisar um arquivo de log e indexar as correspondências.

Pré-requisitos

Para continuar, você deve ter os recursos a seguir.

Pré-requisito Descrição
Bucket do Amazon S3. Para obter mais informações, consulte Criar seu primeiro bucket do S3 no Manual do usuário do Amazon Simple Storage Service. O bucket deve residir na mesma região do seu domínio OpenSearch de serviço.
OpenSearch Domínio do serviço O destino dos dados depois que a função do Lambda os processa. Para ter mais informações, consulte Criação OpenSearch de domínios de serviço.

Criar o pacote de implantação do Lambda

Os pacotes de implantação são arquivos ZIP ou JAR que contêm o código e as dependências. Esta seção inclui código de exemplo Python. Para outras linguagens de programação, consulte Pacotes de implantação do Lambda no Guia do desenvolvedor doAWS Lambda .

  1. Crie um diretório. Neste exemplo, usamos o nome s3-to-opensearch.

  2. Crie um arquivo no diretório chamado sample.py:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    Edite as variáveis de region e host.

  3. Se ainda não o fez, instale o pip. Em seguida, instale as dependências em um novo diretório package:

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    Como todos os ambientes de execução do Lambda têm o Boto3 instalado, você não precisa incluí-lo no pacote de implantação.

  4. Empacote o código do aplicativo e as dependências:

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Criar a função do Lambda

Depois de criar o pacote de implantação, você poderá criar a função do Lambda. Ao criar uma função, escolha um nome, o tempo de execução (por exemplo, Python 3.8) e a função do IAM. A função do IAM define as permissões para a função. Para obter instruções detalhadas, consulte Criar uma função Lambda com o console no Guia do desenvolvedor doAWS Lambda .

Esse exemplo pressupõe que você está usando o console. Escolha Python 3.9 e uma função que tenha permissões de leitura do S3 e permissões de gravação do OpenSearch Serviço, conforme mostrado na captura de tela a seguir:


                    Configuração de exemplo de uma função do Lambda

Depois de criar a função, você deverá adicionar um gatilho. Neste exemplo, queremos que o código seja executado sempre que um arquivo de log chegue em um bucket do S3:

  1. Escolha Adicionar acionador e selecione S3.

  2. Escolha o bucket.

  3. Em Tipo de evento, selecione PUT.

  4. Em Prefixo, digite logs/.

  5. Em Sufixo, digite .log.

  6. Confirme o aviso de invocação recursiva e escolha Adicionar.

Por fim, você pode carregar o pacote de implantação:

  1. Escolha Carregar de e arquivo .zip e siga os avisos para carregar do pacote de implantação.

  2. Depois que o carregamento terminar, edite as Configurações do tempo de execução e altere o Manipulador para sample.handler. Essa configuração informa ao Lambda o arquivo (sample.py) e o método (handler) que deverão ser executados depois de um acionador.

Nesse ponto, você tem um conjunto completo de recursos: um bucket para arquivos de log, uma função que é executada sempre que um arquivo de log é adicionado ao bucket, código que executa a análise e a indexação e um domínio de OpenSearch serviço para pesquisa e visualização.

Teste da função do Lambda

Após criar a função, você poderá testá-la carregando de um arquivo no bucket do Amazon S3. Crie um arquivo chamado sample.log usando as seguintes linhas de log de exemplo:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

Carregue o arquivo na pasta logs do bucket do S3. Para obter instruções, consulte Carregar um objeto para o seu bucket no Manual do usuário do Amazon Simple Storage Service.

Em seguida, use o console de OpenSearch serviço ou os OpenSearch painéis para verificar se o lambda-s3-index índice contém dois documentos. Você também pode fazer uma solicitação de pesquisa padrão:

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Carregamento dados de transmissão do Amazon Kinesis Data Streams

Você pode carregar dados de streaming do Kinesis Data OpenSearch Streams para o Service. Os novos dados recebidos no fluxo de dados acionam uma notificação de evento para o Lambda, o qual executa seu código personalizado para realizar a indexação. Esta seção inclui um código de exemplo Python simples.

Pré-requisitos

Para continuar, você deve ter os recursos a seguir.

Pré-requisito Descrição
Amazon Kinesis Data Streams A fonte do evento para a função do Lambda. Para saber mais, consulte Kinesis Data Streams.
OpenSearch Domínio do serviço O destino dos dados depois que a função do Lambda os processa. Para obter mais informações, consulte Criação OpenSearch de domínios de serviço.
Perfil do IAM

Essa função deve ter permissões básicas OpenSearch de Service, Kinesis e Lambda, como as seguintes:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

A função deve ter a seguinte relação de confiança:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Para saber mais, consulte Criação de funções do IAM no Manual do usuário do IAM.

Criar a função do Lambda

Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado kinesis-to-opensearch e use o seguinte código para sample.py:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

Edite as variáveis de region e host.

Caso ainda não tenha feito, instale o pip. Em seguida, use os seguintes comandos para instalar as dependências:

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Depois siga as instruções em Criar a função do Lambda, mas especifique a função do IAM por Pré-requisitos e as seguintes configurações do gatilho:

  • Fluxo do Kinesis: o fluxo do Kinesis

  • Tamanho do lote: 100

  • Posição inicial: redução horizontal

Para saber mais, consulte O que é o Amazon Kinesis Data Streams? no Guia do desenvolvedor do Amazon Kinesis Data Streams.

Nesse ponto, você tem um conjunto completo de recursos: um stream de dados do Kinesis, uma função que é executada depois que o stream recebe novos dados e indexa esses dados, e um domínio de OpenSearch serviço para pesquisa e visualização.

Testar a função do Lambda

Depois de criar a função, você poderá testá-la adicionando um novo registro ao streaming de dados usando a AWS CLI:

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

Em seguida, use o console de OpenSearch serviço ou os OpenSearch painéis para verificar se lambda-kine-index contém um documento. Você também pode usar a seguinte solicitação:

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Carregamento de dados de transmissão do Amazon DynamoDB

Você pode usar AWS Lambda para enviar dados para seu domínio de OpenSearch serviço do Amazon DynamoDB. Os novos dados recebidos na tabela do banco de dados acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.

Pré-requisitos

Para continuar, você deve ter os recursos a seguir.

Pré-requisito Descrição
Tabela do DynamoDB

A tabela contém os dados de origem. Para obter mais informações, consulte Operações básicas nas tabelas do DynamoDB no Guia do desenvolvedor do Amazon DynamoDB.

A tabela deve residir na mesma região do seu domínio OpenSearch de serviço e ter um stream definido como Nova imagem. Para saber mais, consulte Como habilitar um stream.

OpenSearch Domínio do serviço O destino dos dados depois que a função do Lambda os processa. Para ter mais informações, consulte Criação OpenSearch de domínios de serviço.
IAM role (Perfil do IAM)

Essa função deve ter permissões básicas OpenSearch de execução de Service, DynamoDB e Lambda, como as seguintes:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

A função deve ter a seguinte relação de confiança:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Para saber mais, consulte Criação de funções do IAM no Manual do usuário do IAM.

Criar a função do Lambda

Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado ddb-to-opensearch e use o seguinte código para sample.py:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

Edite as variáveis de region e host.

Caso ainda não tenha feito, instale o pip. Em seguida, use os seguintes comandos para instalar as dependências:

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

Depois siga as instruções em Criar a função do Lambda, mas especifique a função do IAM por Pré-requisitos e as seguintes configurações do gatilho:

  • Tabela: a tabela do DynamoDB

  • Tamanho do lote: 100

  • Posição inicial: redução horizontal

Para saber mais, consulte Processar novos itens com o DynamoDB Streams e o Lambda no Guia do desenvolvedor do Amazon DynamoDB.

Neste momento, você tem um conjunto completo de recursos: uma tabela do DynamoDB para seus dados de origem, um stream de alterações na tabela do DynamoDB, uma função que é executada após a alteração dos dados de origem e indexa essas alterações e um domínio de serviço para pesquisa e visualização. OpenSearch

Testar a função do Lambda

Depois de criar a função, você poderá testá-la adicionando um novo item à tabela do DynamoDB usando a AWS CLI:

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

Em seguida, use o console de OpenSearch serviço ou os OpenSearch painéis para verificar se lambda-index contém um documento. Você também pode usar a seguinte solicitação:

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Carregamento de dados de streaming do Amazon Data Firehose

O Firehose oferece suporte OpenSearch ao serviço como destino de entrega. Para obter instruções sobre como carregar dados de streaming no OpenSearch Serviço, consulte Criação de um stream de entrega do Kinesis Data Firehose OpenSearch e Escolha o serviço para seu destino no Guia do desenvolvedor do Amazon Data Firehose.

Antes de carregar dados no OpenSearch Serviço, talvez seja necessário realizar transformações nos dados. Para saber mais sobre como usar funções do Lambda para executar essa tarefa, consulte Transformação de dados do Amazon Kinesis Data Firehose no mesmo guia.

Ao configurar um stream de entrega, o Firehose apresenta uma função IAM de “um clique” que fornece o acesso aos recursos necessários para enviar dados ao OpenSearch Serviço, fazer backup de dados no Amazon S3 e transformar dados usando o Lambda. Em virtude da complexidade envolvida na criação manual de uma função como essa, é recomendável usar a função fornecida.

Carregando dados de streaming da Amazon CloudWatch

Você pode carregar dados de streaming do CloudWatch Logs para seu domínio do OpenSearch Serviço usando uma assinatura do CloudWatch Logs. Para obter informações sobre as CloudWatch assinaturas da Amazon, consulte Processamento em tempo real de dados de log com assinaturas. Para obter informações de configuração, consulte Streaming de dados de CloudWatch registros para o Amazon OpenSearch Service no Amazon CloudWatch Developer Guide.

Carregamento de dados de transmissão do AWS IoT

Você pode enviar dados AWS IoT usando regras. Para saber mais, consulte a OpenSearchação no Guia doAWS IoT desenvolvedor.