As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Carregando dados de streaming no Amazon OpenSearch Service
Você pode usar o OpenSearch Inestion para carregar diretamente dados de streaming
Você ainda pode usar outras fontes para carregar dados de streaming, como Amazon Data Firehose e Amazon CloudWatch Logs, que têm suporte integrado para OpenSearch o Service. Outras, como Amazon S3, Amazon Kinesis Data Streams e Amazon DynamoDB, usam funções do AWS Lambda como manipuladores de eventos. As funções do Lambda respondem a novos dados processando e transmitindo-os para seu domínio.
nota
O Lambda oferece suporte a várias linguagens de programação populares e está disponível na maioria das Regiões da AWS. Para obter mais informações, consulte Conceitos básicos do Lambda na AWS Lambda Guia do desenvolvedor e AWS Endpoints de serviço na Referência geral da AWS.
Tópicos
- Carregando dados de streaming do OpenSearch Ingestion
- Carregamento de dados de transmissão do Amazon S3
- Carregamento dados de transmissão do Amazon Kinesis Data Streams
- Carregamento de dados de transmissão do Amazon DynamoDB
- Carregamento de dados de streaming do Amazon Data Firehose
- Carregando dados de streaming da Amazon CloudWatch
- Carregamento de dados de transmissão do AWS IoT
Carregando dados de streaming do OpenSearch Ingestion
Você pode usar o Amazon OpenSearch Ingestion para carregar dados em um domínio OpenSearch de serviço. Você configura seus produtores de dados para enviar dados para OpenSearch Ingestão, e ele entrega automaticamente os dados para a coleção que você especificar. Você também pode configurar a OpenSearch ingestão para transformar seus dados antes de entregá-los. Para ter mais informações, consulte OpenSearch Ingestão da Amazon.
Carregamento de dados de transmissão do Amazon S3
Você pode usar o Lambda para enviar dados para seu domínio de OpenSearch serviço do Amazon S3. Os novos dados recebidos em um bucket do S3 acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.
Esse método de streaming de dados é extremamente flexível. Você pode indexar metadados de objeto
Pré-requisitos
Para continuar, você deve ter os recursos a seguir.
Pré-requisito | Descrição |
---|---|
Bucket do Amazon S3. | Para obter mais informações, consulte Criar seu primeiro bucket do S3 no Manual do usuário do Amazon Simple Storage Service. O bucket deve residir na mesma região do seu domínio OpenSearch de serviço. |
OpenSearch Domínio do serviço | O destino dos dados depois que a função do Lambda os processa. Para ter mais informações, consulte Criação OpenSearch de domínios de serviço. |
Criar o pacote de implantação do Lambda
Os pacotes de implantação são arquivos ZIP ou JAR que contêm o código e as dependências. Esta seção inclui código de exemplo Python. Para outras linguagens de programação, consulte Pacotes de implantação do Lambda no Guia do desenvolvedor doAWS Lambda .
-
Crie um diretório. Neste exemplo, usamos o nome
s3-to-opensearch
. -
Crie um arquivo no diretório chamado
sample.py
:import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)
Edite as variáveis de
region
ehost
. -
Se ainda não o fez, instale o pip
. Em seguida, instale as dependências em um novo diretório package
:cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
Como todos os ambientes de execução do Lambda têm o Boto3
instalado, você não precisa incluí-lo no pacote de implantação. -
Empacote o código do aplicativo e as dependências:
cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py
Criar a função do Lambda
Depois de criar o pacote de implantação, você poderá criar a função do Lambda. Ao criar uma função, escolha um nome, o tempo de execução (por exemplo, Python 3.8) e a função do IAM. A função do IAM define as permissões para a função. Para obter instruções detalhadas, consulte Criar uma função Lambda com o console no Guia do desenvolvedor doAWS Lambda .
Esse exemplo pressupõe que você está usando o console. Escolha Python 3.9 e uma função que tenha permissões de leitura do S3 e permissões de gravação do OpenSearch Serviço, conforme mostrado na captura de tela a seguir:
Depois de criar a função, você deverá adicionar um gatilho. Neste exemplo, queremos que o código seja executado sempre que um arquivo de log chegue em um bucket do S3:
-
Escolha Adicionar acionador e selecione S3.
-
Escolha o bucket.
-
Em Tipo de evento, selecione PUT.
-
Em Prefixo, digite
logs/
. -
Em Sufixo, digite
.log
. -
Confirme o aviso de invocação recursiva e escolha Adicionar.
Por fim, você pode carregar o pacote de implantação:
-
Escolha Carregar de e arquivo .zip e siga os avisos para carregar do pacote de implantação.
-
Depois que o carregamento terminar, edite as Configurações do tempo de execução e altere o Manipulador para
sample.handler
. Essa configuração informa ao Lambda o arquivo (sample.py
) e o método (handler
) que deverão ser executados depois de um acionador.
Nesse ponto, você tem um conjunto completo de recursos: um bucket para arquivos de log, uma função que é executada sempre que um arquivo de log é adicionado ao bucket, código que executa a análise e a indexação e um domínio de OpenSearch serviço para pesquisa e visualização.
Teste da função do Lambda
Após criar a função, você poderá testá-la carregando de um arquivo no bucket do Amazon S3. Crie um arquivo chamado sample.log
usando as seguintes linhas de log de exemplo:
12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"
Carregue o arquivo na pasta logs
do bucket do S3. Para obter instruções, consulte Carregar um objeto para o seu bucket no Manual do usuário do Amazon Simple Storage Service.
Em seguida, use o console de OpenSearch serviço ou os OpenSearch painéis para verificar se o lambda-s3-index
índice contém dois documentos. Você também pode fazer uma solicitação de pesquisa padrão:
GET https://domain-name
/lambda-s3-index/_search?pretty
{
"hits" : {
"total" : 2,
"max_score" : 1.0,
"hits" : [
{
"_index" : "lambda-s3-index",
"_type" : "_doc",
"_id" : "vTYXaWIBJWV_TTkEuSDg",
"_score" : 1.0,
"_source" : {
"ip" : "12.345.678.91",
"message" : "GET /some-file.jpg",
"timestamp" : "10/Oct/2000:14:56:14 -0700"
}
},
{
"_index" : "lambda-s3-index",
"_type" : "_doc",
"_id" : "vjYmaWIBJWV_TTkEuCAB",
"_score" : 1.0,
"_source" : {
"ip" : "12.345.678.90",
"message" : "PUT /some-file.jpg",
"timestamp" : "10/Oct/2000:13:55:36 -0700"
}
}
]
}
}
Carregamento dados de transmissão do Amazon Kinesis Data Streams
Você pode carregar dados de streaming do Kinesis Data OpenSearch Streams para o Service. Os novos dados recebidos no fluxo de dados acionam uma notificação de evento para o Lambda, o qual executa seu código personalizado para realizar a indexação. Esta seção inclui um código de exemplo Python simples.
Pré-requisitos
Para continuar, você deve ter os recursos a seguir.
Pré-requisito | Descrição |
---|---|
Amazon Kinesis Data Streams | A fonte do evento para a função do Lambda. Para saber mais, consulte Kinesis Data Streams. |
OpenSearch Domínio do serviço | O destino dos dados depois que a função do Lambda os processa. Para obter mais informações, consulte Criação OpenSearch de domínios de serviço. |
Perfil do IAM |
Essa função deve ter permissões básicas OpenSearch de Service, Kinesis e Lambda, como as seguintes:
A função deve ter a seguinte relação de confiança:
Para saber mais, consulte Criação de funções do IAM no Manual do usuário do IAM. |
Criar a função do Lambda
Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado kinesis-to-opensearch
e use o seguinte código para sample.py
:
import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'
Edite as variáveis de region
e host
.
Caso ainda não tenha feito, instale o pip
cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
Depois siga as instruções em Criar a função do Lambda, mas especifique a função do IAM por Pré-requisitos e as seguintes configurações do gatilho:
-
Fluxo do Kinesis: o fluxo do Kinesis
-
Tamanho do lote: 100
-
Posição inicial: redução horizontal
Para saber mais, consulte O que é o Amazon Kinesis Data Streams? no Guia do desenvolvedor do Amazon Kinesis Data Streams.
Nesse ponto, você tem um conjunto completo de recursos: um stream de dados do Kinesis, uma função que é executada depois que o stream recebe novos dados e indexa esses dados, e um domínio de OpenSearch serviço para pesquisa e visualização.
Testar a função do Lambda
Depois de criar a função, você poderá testá-la adicionando um novo registro ao streaming de dados usando a AWS CLI:
aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region
us-west-1
Em seguida, use o console de OpenSearch serviço ou os OpenSearch painéis para verificar se lambda-kine-index
contém um documento. Você também pode usar a seguinte solicitação:
GET https://domain-name
/lambda-kine-index/_search
{
"hits" : [
{
"_index": "lambda-kine-index",
"_type": "_doc",
"_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042",
"_score": 1,
"_source": {
"timestamp": 1523648740.051,
"message": "My test data.",
"id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042"
}
}
]
}
Carregamento de dados de transmissão do Amazon DynamoDB
Você pode usar AWS Lambda para enviar dados para seu domínio de OpenSearch serviço do Amazon DynamoDB. Os novos dados recebidos na tabela do banco de dados acionam uma notificação de evento para o Lambda, que executa seu código personalizado para realizar a indexação.
Pré-requisitos
Para continuar, você deve ter os recursos a seguir.
Pré-requisito | Descrição |
---|---|
Tabela do DynamoDB | A tabela contém os dados de origem. Para obter mais informações, consulte Operações básicas nas tabelas do DynamoDB no Guia do desenvolvedor do Amazon DynamoDB. A tabela deve residir na mesma região do seu domínio OpenSearch de serviço e ter um stream definido como Nova imagem. Para saber mais, consulte Como habilitar um stream. |
OpenSearch Domínio do serviço | O destino dos dados depois que a função do Lambda os processa. Para ter mais informações, consulte Criação OpenSearch de domínios de serviço. |
IAM role (Perfil do IAM) | Essa função deve ter permissões básicas OpenSearch de execução de Service, DynamoDB e Lambda, como as seguintes:
A função deve ter a seguinte relação de confiança:
Para saber mais, consulte Criação de funções do IAM no Manual do usuário do IAM. |
Criar a função do Lambda
Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado ddb-to-opensearch
e use o seguinte código para sample.py
:
import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'
Edite as variáveis de region
e host
.
Caso ainda não tenha feito, instale o pip
cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth
Depois siga as instruções em Criar a função do Lambda, mas especifique a função do IAM por Pré-requisitos e as seguintes configurações do gatilho:
-
Tabela: a tabela do DynamoDB
-
Tamanho do lote: 100
-
Posição inicial: redução horizontal
Para saber mais, consulte Processar novos itens com o DynamoDB Streams e o Lambda no Guia do desenvolvedor do Amazon DynamoDB.
Neste momento, você tem um conjunto completo de recursos: uma tabela do DynamoDB para seus dados de origem, um stream de alterações na tabela do DynamoDB, uma função que é executada após a alteração dos dados de origem e indexa essas alterações e um domínio de serviço para pesquisa e visualização. OpenSearch
Testar a função do Lambda
Depois de criar a função, você poderá testá-la adicionando um novo item à tabela do DynamoDB usando a AWS CLI:
aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region
us-west-1
Em seguida, use o console de OpenSearch serviço ou os OpenSearch painéis para verificar se lambda-index
contém um documento. Você também pode usar a seguinte solicitação:
GET https://domain-name
/lambda-index/_doc/00001
{
"_index": "lambda-index",
"_type": "_doc",
"_id": "00001",
"_version": 1,
"found": true,
"_source": {
"director": {
"S": "Kevin Costner"
},
"id": {
"S": "00001"
},
"title": {
"S": "The Postman"
}
}
}
Carregamento de dados de streaming do Amazon Data Firehose
O Firehose oferece suporte OpenSearch ao serviço como destino de entrega. Para obter instruções sobre como carregar dados de streaming no OpenSearch Serviço, consulte Criação de um stream de entrega do Kinesis Data Firehose OpenSearch e Escolha o serviço para seu destino no Guia do desenvolvedor do Amazon Data Firehose.
Antes de carregar dados no OpenSearch Serviço, talvez seja necessário realizar transformações nos dados. Para saber mais sobre como usar funções do Lambda para executar essa tarefa, consulte Transformação de dados do Amazon Kinesis Data Firehose no mesmo guia.
Ao configurar um stream de entrega, o Firehose apresenta uma função IAM de “um clique” que fornece o acesso aos recursos necessários para enviar dados ao OpenSearch Serviço, fazer backup de dados no Amazon S3 e transformar dados usando o Lambda. Em virtude da complexidade envolvida na criação manual de uma função como essa, é recomendável usar a função fornecida.
Carregando dados de streaming da Amazon CloudWatch
Você pode carregar dados de streaming do CloudWatch Logs para seu domínio do OpenSearch Serviço usando uma assinatura do CloudWatch Logs. Para obter informações sobre as CloudWatch assinaturas da Amazon, consulte Processamento em tempo real de dados de log com assinaturas. Para obter informações de configuração, consulte Streaming de dados de CloudWatch registros para o Amazon OpenSearch Service no Amazon CloudWatch Developer Guide.
Carregamento de dados de transmissão do AWS IoT
Você pode enviar dados AWS IoT usando regras. Para saber mais, consulte a OpenSearchação no Guia doAWS IoT desenvolvedor.