Carregamento de dados de streaming no Amazon Elasticsearch Service - Amazon Elasticsearch Service

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Carregamento de dados de streaming no Amazon Elasticsearch Service

Você pode carregar dados em streaming no domínio do Amazon Elasticsearch Service de muitas fontes diferentes. Algumas origens, como o Amazon Kinesis Data Firehose e o Amazon CloudWatch Logs, têm suporte interno para o Amazon ES. Outras, como o Amazon S3, o Amazon Kinesis Data Streams e o Amazon DynamoDB, usam funções do AWS Lambda como manipuladores de eventos. As funções Lambda respondem a novos dados processando e transmitindo-os para seu domínio.

nota

O Lambda dá suporte a várias linguagens de programação populares e está disponível na maioria das regiões da AWS. Para obter mais informações, consulte Como criar funções do Lambda no AWS Lambda Developer Guide e Regiões do AWS Lambda no AWS General Reference.

Carregamento de dados em streaming no Amazon ES por meio do Amazon S3

Você pode usar o Lambda a fim de enviar dados para o domínio do Amazon ES pelo Amazon S3. Os novos dados recebidos em um bucket do S3 acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.

Esse método de streaming de dados é extremamente flexível. Você pode indexar metadados de objeto, ou se o objeto for texto simples, analisar e indexar alguns elementos do corpo do objeto. Esta seção inclui alguns códigos de exemplo Python simples que usam expressões regulares para analisar um arquivo de log e indexar as correspondências.

dica

Para obter um código mais robusto em Node.js, consulte amazon-elasticsearch-lambda-samples no GitHub. Alguns esquemas do Lambda também contêm exemplos de análise úteis.

Prerequisites

Para continuar, você deve ter os recursos a seguir.

Pré-requisito: Description (Descrição)
Bucket do Amazon S3 Para obter mais informações, consulte Como criar um bucket no Guia de conceitos básicos do Amazon Simple Storage Service. O bucket deve residir na mesma região do domínio do Amazon ES.
Domínio do Amazon ES O destino dos dados depois que a função do Lambda os processa. Para obter mais informações, consulte Criação de domínios do Amazon ES.

Criar o pacote de implantação do Lambda

Os pacotes de implantação são arquivos ZIP ou JAR que contêm o código e as dependências. Esta seção inclui código de exemplo Python. Para outras linguagens de programação, consulte Criar um pacote de implantação no AWS Lambda Developer Guide.

  1. Crie um diretório. Neste exemplo, usamos o nome s3-to-es.

  2. Crie um arquivo no diretório chamado sample.py:

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-s3-index' type = 'lambda-type' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    Edite as variáveis de region e host.

  3. Instale as dependências.

    cd s3-to-es pip install requests -t . pip install requests_aws4auth -t .

    Como todos os ambientes de execução do Lambda têm Boto3 instalado, você não precisa incluí-lo no pacote de implantação.

    dica

    Se você usa macOS, esses comandos talvez não funcionem corretamente. Como alternativa, adicione um arquivo chamado setup.cfg ao diretório s3-to-es:

    [install] prefix=
  4. Empacote o código do aplicativo e as dependências:

    zip -r lambda.zip *

Criar a função Lambda

Depois de criar o pacote de implantação, você poderá criar a função Lambda. Ao criar uma função, escolha um nome, o tempo de execução (por exemplo, Python 2.7) e a função do IAM. A função do IAM define as permissões para a função. Para obter instruções detalhadas, consulte Criar uma função Lambda simples no AWS Lambda Developer Guide.

Esse exemplo pressupõe que você esteja usando o console. Escolha Python 2.7 e uma função que tenha permissões de leitura do S3 e permissões de gravação do Amazon ES, conforme mostrado na captura de tela a seguir.


                    Configuração da amostra para um Lambda função

Depois de criar a função, você deverá adicionar um gatilho. Para este exemplo, queremos que o código seja executado sempre que um ficheiro de registo chegar num balde S3:

  1. Selecione o S3.

  2. Escolha o bucket.

  3. Em Event type (Tipo de evento), selecione PUT.

  4. Em Prefix (Prefixo), digite logs/.

  5. Em Filter pattern (Padrão de filtro), digite .log.

  6. Selecione Enable trigger (Habilitar gatilho).

  7. Escolha Adicionar.

Por fim, você pode fazer upload do pacote de implantação:

  1. Para Processador, tipo sample.handler. Esta definição diz Lambda ao ficheiro (sample.py) e método (handler) que deve ser executado após um acionador.

  2. Em Code entry type (Tipo de entrada de código), escolha Upload a .ZIP file (Fazer upload de um arquivo .ZIP) e siga os avisos para fazer upload do pacote de implantação.

  3. Selecione Save (Salvar).

Neste ponto, tem um conjunto completo de recursos: um balde para ficheiros de registo, uma função que funciona sempre que um ficheiro de registo é adicionado ao balde, código que realiza a análise e indexação e um Amazon ES domínio para pesquisa e visualização.

Testar a função do Lambda

Depois de criar a função, você poderá testá-la fazendo upload de um arquivo no bucket do Amazon S3. Crie um arquivo chamado sample.log usando as seguintes linhas de log de exemplo:

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

Faça upload do arquivo na pasta logs do bucket do S3. Para obter instruções, consulte Adicionar um objeto a um bucket no Guia de conceitos básicos do Amazon Simple Storage Service.

Em seguida, use o console do Amazon ES ou o Kibana para verificar se o índice lambda-s3-index contém dois documentos. Você também pode fazer uma solicitação de pesquisa padrão:

GET https://es-domain/lambda-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Carregamento de dados em streaming no Amazon ES por meio do Amazon Kinesis Data Streams

Você pode carregar dados em streaming do Kinesis Data Streams no Amazon ES. Os novos dados recebidos no streaming de dados acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação. Esta seção inclui um código de exemplo Python simples. Para obter um código mais robusto em Node.js, consulte amazon-elasticsearch-lambda-samples no GitHub.

Prerequisites

Para continuar, você deve ter os recursos a seguir.

Pré-requisito: Description (Descrição)
Streaming de dados do Amazon Kinesis Origem de evento para a função Lambda. Para saber mais, consulte Streamings de dados do Kinesis.
Domínio do Amazon ES O destino dos dados depois que a função do Lambda os processa. Para obter mais informações, consulte Criação de domínios do Amazon ES.
Função do IAM

Essa função deve ter as permissões básicas do Amazon ES, do Kinesis e do Lambda, como as seguintes:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

A função deve ter a seguinte relação de confiança:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Para saber mais, consulte Como criar funções do IAM no Guia do usuário do IAM.

Criar a função Lambda

Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado kinesis-to-es e use o seguinte código para sample.py:

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-kine-index' type = 'lambda-kine-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

Edite as variáveis de region e host.

Use os seguintes comandos para instalar as dependências:

cd kinesis-to-es pip install requests -t . pip install requests_aws4auth -t .

Depois siga as instruções em Criar a função Lambda, mas especifique a função do IAM por Prerequisites e as seguintes configurações do gatilho:

  • Kinesis Stream: o stream do Kinesis

  • Tamanho do lote -100

  • Posição inicial: Horizonte de corte

Para saber mais, consulte Como trabalhar com streamings de dados do Amazon Kinesis no Guia do desenvolvedor do Amazon Kinesis Data Streams.

Neste momento, tem um conjunto completo de recursos: a Kinesis fluxo de dados, uma função que funciona depois de o fluxo receber novos dados e índices esses dados, e um Amazon ES domínio para pesquisa e visualização.

Testar a função do Lambda

Depois de criar a função, você poderá testá-la adicionando um novo registro ao streaming de dados usando a AWS CLI:

aws kinesis put-record --stream-name es-test --data "My test data." --partition-key partitionKey1 --region us-west-1

Em seguida, use o console do Amazon ES ou o Kibana para verificar se lambda-kine-index contém um documento. Você também pode usar a seguinte solicitação:

GET https://es-domain/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "lambda-kine-type", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Carregamento de dados em streaming no Amazon ES por meio do Amazon DynamoDB

Você pode usar o AWS Lambda a fim de enviar dados para o domínio do Amazon ES pelo Amazon DynamoDB. Os novos dados recebidos na tabela do banco de dados acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.

Prerequisites

Para continuar, você deve ter os recursos a seguir.

Pré-requisito: Description (Descrição)
Tabela do DynamoDB

A tabela contém os dados de origem. Para obter mais informações, consulte Operações básicas para tabelas no Guia do desenvolvedor do Amazon DynamoDB.

A tabela deve residir na mesma região que o domínio do Amazon ES e ter um stream definido como New image (Nova imagem). Para saber mais, consulte Como habilitar um stream.

Domínio do Amazon ES O destino dos dados depois que a função do Lambda os processa. Para obter mais informações, consulte Criação de domínios do Amazon ES.
Função do IAM

Essa função deve ter as permissões básicas de execução do Amazon ES, do DynamoDB e do Lambda, como as seguintes:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

A função deve ter a seguinte relação de confiança:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

Para saber mais, consulte Como criar funções do IAM no Guia do usuário do IAM.

Criar a função Lambda

Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado ddb-to-es e use o seguinte código para sample.py:

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, with https:// index = 'lambda-index' type = 'lambda-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the Elasticsearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

Edite as variáveis de region e host.

Use os seguintes comandos para instalar as dependências:

cd ddb-to-es pip install requests -t . pip install requests_aws4auth -t .

Depois siga as instruções em Criar a função Lambda, mas especifique a função do IAM por Prerequisites e as seguintes configurações do gatilho:

  • Table (Tabela): a tabela do DynamoDB

  • Tamanho do lote -100

  • Posição inicial: Horizonte de corte

Para saber mais, consulte Processar novos itens em uma tabela do DynamoDB no Guia do desenvolvedor do Amazon DynamoDB.

Neste momento, tem um conjunto completo de recursos: a DynamoDB tabela para os seus dados de origem, um DynamoDB fluxo de alterações à tabela, uma função que funciona depois de os dados originais serem alterados e índices dessas alterações, e um Amazon ES domínio para pesquisa e visualização.

Testar a função do Lambda

Depois de criar a função, você poderá testá-la adicionando um novo item à tabela do DynamoDB usando a AWS CLI:

aws dynamodb put-item --table-name es-test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

Em seguida, use o console do Amazon ES ou o Kibana para verificar se lambda-index contém um documento. Você também pode usar a seguinte solicitação:

GET https://es-domain/lambda-index/lambda-type/00001 { "_index": "lambda-index", "_type": "lambda-type", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Carregamento de dados em streaming no Amazon ES por meio do Amazon Kinesis Data Firehose

O Kinesis Data Firehose é compatível com o Amazon ES como um destino de entrega. Para obter instruções sobre como carregar streaming de dados no Amazon ES, consulte Como criar um fluxo de entrega do Kinesis Data Firehose e Escolher o Amazon ES para o destino no Guia do desenvolvedor do Amazon Kinesis Data Firehose.

Para carregar dados no Amazon ES, talvez você precise realizar transformações nos dados. Para saber mais sobre como usar funções do Lambda para executar essa tarefa, consulte Transformação de dados no mesmo guia.

Ao configurar um fluxo de entrega, o Kinesis Data Firehose oferece a função do IAM de "um único clique" que lhe oferece acesso ao recurso do qual necessita para enviar dados para o Amazon ES, fazer backup de dados no Amazon S3 e transformar dados usando o Lambda. Em virtude da complexidade envolvida na criação manual de uma função como essa, é recomendável usar a função fornecida.

Carregamento de dados em streaming no Amazon ES por meio do Amazon CloudWatch

Você pode carregar dados de streaming para do CloudWatch Logs para o domínio do Amazon ES usando uma assinatura do CloudWatch Logs. Para obter mais informações sobre assinaturas do Amazon CloudWatch, consulte Processamento em tempo real dos dados de log com assinaturas. Para obter informações de configuração, consulte Streaming CloudWatch Logs Data to Amazon Elasticsearch Service no Amazon CloudWatch Developer Guide.

Carregamento de dados no Amazon ES do AWS IoT

Você pode enviar dados da AWS IoT usando regras. Para saber mais, consulte Ações do Amazon ES no Guia do desenvolvedor do AWS IoT.