As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
Siga as instruções no Criar o pacote de implantação do Lambda, mas crie um diretório chamado kinesis-to-opensearch
e use o seguinte código para sample.py
:
import base64
import boto3
import json
import requests
from requests_aws4auth import AWS4Auth
region = '' # e.g. us-west-1
service = 'es'
credentials = boto3.Session().get_credentials()
awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token)
host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com
index = 'lambda-kine-index'
datatype = '_doc'
url = host + '/' + index + '/' + datatype + '/'
headers = { "Content-Type": "application/json" }
def handler(event, context):
count = 0
for record in event['Records']:
id = record['eventID']
timestamp = record['kinesis']['approximateArrivalTimestamp']
# Kinesis data is base64-encoded, so decode here
message = base64.b64decode(record['kinesis']['data'])
# Create the JSON document
document = { "id": id, "timestamp": timestamp, "message": message }
# Index the document
r = requests.put(url + id, auth=awsauth, json=document, headers=headers)
count += 1
return 'Processed ' + str(count) + ' items.'
Edite as variáveis de region
e host
.
Caso ainda não tenha feito, instale o pip
cd kinesis-to-opensearch
pip install --target ./package requests
pip install --target ./package requests_aws4auth
Depois siga as instruções em Criar a função do Lambda, mas especifique a função do IAM por Pré-requisitos e as seguintes configurações do gatilho:
-
Fluxo do Kinesis: o fluxo do Kinesis
-
Tamanho do lote: 100
-
Posição inicial: redução horizontal
Para saber mais, consulte O que é o Amazon Kinesis Data Streams? no Guia do desenvolvedor do Amazon Kinesis Data Streams.
Nesse ponto, você tem um conjunto completo de recursos: um stream de dados do Kinesis, uma função que é executada depois que o stream recebe novos dados e indexa esses dados, e um domínio de OpenSearch serviço para pesquisa e visualização.