스트리밍 데이터를 Amazon Elasticsearch Service 로드 - Amazon Elasticsearch Service

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

스트리밍 데이터를 Amazon Elasticsearch Service 로드

로드 할 수 있습니다.데이터 스트리밍를 다양한 소스에서 Amazon Elasticsearch Service (Amazon ES) 도메인으로 전송할 수 있습니다. Amazon Kinesis Data Firehose 및 Amazon CloudWatch Logs 와 같은 일부 소스는 Amazon ES를 기본으로 지원합니다. Amazon S3, Amazon Kinesis Data Streams 및 Amazon DynamoDB 와 같은 다른 사용자는 AWS Lambda 함수를 이벤트 처리기로 사용합니다. Lambda 함수는 새 데이터를 처리한 다음 도메인으로 스트리밍하여 응답합니다.

참고

Lambda 는 다양한 주요 프로그래밍 언어를 지원하며, 대부분의 AWS 리전에서 사용할 수 있습니다. 자세한 내용은 단원을 참조하십시오.Lambda 함수 구축AWS Lambda 개발자 안내서AWS Lambda 지역AWS 일반 참조를 선택합니다.

Amazon S3 에서 스트리밍 데이터 로드

Amazon S3에서 Amazon S3 ES 도메인으로 데이터를 전송하는 데 Lambda 사용할 수 있습니다. S3 버킷에 도착한 새 데이터는 Lambda 로 이벤트 알림을 트리거한 다음 사용자 지정 코드를 실행해 인덱싱합니다.

이러한 방식의 데이터 스트리밍은 대단히 유연합니다. 객체 메타데이터를 인덱싱할 수도 있고, 객체가 일반 텍스트라면 객체 본문의 일부 요소를 구문 분석하고 인덱싱할 수도 있습니다. 이 단원에는 정규식을 이용해 로그 파일을 구문 분석하고 매치를 인덱싱하는 단순한 Python 샘플 코드가 나와 있습니다.

작은 정보

Node.js에서 사용하는 더 강력한 코드를 알고 싶다면 GitHub의 amazon-elasticsearch-lambda-samples를 참조하십시오. 일부 Lambda Blueprint에도 유용한 구문 분석 예제가 나와 있습니다.

Prerequisites

계속하려면 먼저 다음 리소스를 확보해야 합니다.

사전 조건 설명
Amazon S3 버킷 자세한 내용은 단원을 참조하십시오.버킷 생성Amazon Simple Storage Service 시작 안내서를 선택합니다. 버킷은 Amazon ES 도메인과 동일한 리전에 있어야 합니다.
Amazon ES 도메인 Lambda 함수로 처리한 후의 데이터 대상 주소입니다. 자세한 내용은 단원을 참조하십시오.Amazon ES 도메인 생성를 선택합니다.

Lambda 배포 패키지 생성

배포 패키지는 코드와 종속 프로그램이 포함된 ZIP 또는 JAR 파일로 구성됩니다. 이 단원에는 Python 샘플 코드가 나와 있습니다. 다른 프로그래밍 언어는 단원을 참조하십시오배포 패키지 만들기AWS Lambda 개발자 안내서를 선택합니다.

  1. 디렉터리를 생성합니다. 이 샘플에서는 s3-to-es 이름을 사용합니다.

  2. sample.py라는 디렉터리에서 파일을 생성합니다.

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-s3-index' type = 'lambda-type' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    regionhost의 변수를 편집합니다.

  3. 종속 항목을 설치합니다:

    cd s3-to-es pip install requests -t . pip install requests_aws4auth -t .

    모든 Lambda 실행 환경에는Boto 3설치돼 있으며, 따라서 배포 패키지에 이를 포함할 필요가 없습니다.

    작은 정보

    macOS를 사용한다면 이러한 명령어가 정상적으로 작동하지 않을 수도 있습니다. 차선책으로 setup.cfg라는 파일을 s3-to-es 디렉터리에 추가하십시오.

    [install] prefix=
  4. 애플리케이션 코드와 종속 항목을 패키지화합니다.

    zip -r lambda.zip *

Lambda 함수 생성

배포 패키지를 만든 뒤에는 Lambda 함수를 생성할 수 있습니다. 함수를 만들 때는 이름, 실행 시간(예: Python 2.7)과 IAM 역할을 선택해야 합니다. IAM 역할은 함수에 대한 권한을 정의합니다. 자세한 지침은 단원을 참조하십시오.간단한 Lambda 함수 만들기AWS Lambda 개발자 안내서를 선택합니다.

이 예에서는 콘솔을 사용하는 것으로 가정합니다. 다음 스크린샷처럼 Python 2.7과 S3 읽기 권한 및 Amazon ES 쓰기 권한이 있는 역할을 선택합니다.


                    Lambda 함수의 샘플 구성

함수를 생성했으면 이제 트리거를 추가해야 합니다. 이 예에서는 로그 파일이 S3 버킷에 도착할 때마다 코드를 실행하려 합니다.

  1. S3를 선택합니다.

  2. 버킷을 선택합니다.

  3. 이벤트 유형에서 PUT을 선택합니다.

  4. 접두사에는 logs/를 입력합니다.

  5. 필터 패턴.log를 입력합니다.

  6. 트리거 활성화를 선택합니다.

  7. Add(추가)를 선택합니다.

마지막으로, 배포 패키지를 업로드합니다.

  1. 핸들러에서 sample.handler를 입력합니다. 이 설정은 Lambda 에게 파일 (sample.py) 및 메서드 (handler) 가 트리거 후에 실행되어야합니다.

  2. 코드 항목 유형에서 ZIP 파일 업로드를 선택하고, 지시에 따라 배포 패키지를 업로드합니다.

  3. 저장을 선택합니다.

이제 사용자는 완벽한 리소스 모음, 즉 로그 파일용 버킷, 로그 파일이 버킷에 추가될 때마다 실행되는 함수, 구문 분석과 인덱싱을 수행하는 코드, 검색과 시각화를 위한 Amazon ES 도메인을 모두 확보하게 됩니다.

Lambda 함수 테스트

함수를 만들었으면 이제 Amazon S3 버킷에 파일을 업로드해 함수를 테스트할 수 있습니다. 다음 샘플 로그 행을 이용해 sample.log라는 파일을 만드십시오.

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

파일을 S3 버킷의 logs 폴더에 업로드합니다. 지침은 다음을 참조하십시오.버킷에 객체 추가Amazon Simple Storage Service 시작 안내서를 선택합니다.

그런 다음 Amazon ES 콘솔 또는 Kibana를 이용해lambda-s3-index인덱스에는 두 개의 문서가 있습니다. 표준 검색 요청을 할 수도 있습니다.

GET https://es-domain/lambda-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Amazon Kinesis Data Streams 스트리밍 데이터 로드

Amazon ES 스트리밍 데이터를 Kinesis Data Streams 에서 Amazon ES로 로드할 수 있습니다. 데이터 스트림에 도착한 새 데이터는 Lambda 로 이벤트 알림을 트리거한 다음 사용자 지정 코드를 실행해 인덱싱합니다. 이 단원에는 단순한 Python 샘플 코드가 있습니다. Node.js에서 사용하는 더 강력한 코드를 알고 싶다면 GitHub의 amazon-elasticsearch-lambda-samples를 참조하십시오.

Prerequisites

계속하려면 먼저 다음 리소스를 확보해야 합니다.

사전 조건 설명
Amazon Kinesis Data Streams Lambda 함수의 이벤트 소스. 자세한 내용은 다음을 참조하세요.Kinesis Data Streams를 선택합니다.
Amazon ES 도메인 Lambda 함수로 처리한 후의 데이터 대상 주소입니다. 자세한 내용은 단원을 참조하십시오.Amazon ES 도메인 생성를 선택합니다.
IAM 역할

이 역할에는 다음과 같은 기본 Amazon ES, Kinesis 및 Lambda 권한이 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

역할은 다음과 같은 신뢰 관계를 맺고 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

자세한 내용은 다음을 참조하세요.IAM 역할 생성IAM 사용 설명서를 선택합니다.

Lambda 함수 생성

Lambda 배포 패키지 생성의 지침을 따르되, kinesis-to-es라는 디렉터리를 만들고 sample.py에는 다음과 같은 코드를 사용하십시오.

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-kine-index' type = 'lambda-kine-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

regionhost의 변수를 편집합니다.

다음과 같은 명령을 사용하면 종속 프로그램을 설치할 수 있습니다.

cd kinesis-to-es pip install requests -t . pip install requests_aws4auth -t .

이제 Lambda 함수 생성 지침을 따르되, Prerequisites에서 IAM 역할을 지정하고 트리거에는 다음 설정을 지정하십시오.

  • Kinesis 스트림: Kinesis 스트림

  • 배치 크기: 100

  • 시작 위치: 지평선 자르기

자세한 내용은 다음을 참조하세요.Amazon Kinesis Data Streams 작업Amazon Kinesis Data Streams 개발자 안내서를 선택합니다.

이제 사용자는 완벽한 리소스 모음, 즉 Kinesis 데이터 스트림, 스트림에 새 데이터가 들어오면 실행되어 해당 데이터를 인덱싱하는 함수, 검색과 시각화를 위한 Amazon ES 도메인을 모두 확보하게 됩니다.

Lambda 함수 테스트

함수를 만들었으면 이제 AWS CLI를 사용하여 데이터 스트림에 새 레코드를 추가해 함수를 테스트할 수 있습니다.

aws kinesis put-record --stream-name es-test --data "My test data." --partition-key partitionKey1 --region us-west-1

그런 다음 Amazon ES 콘솔 또는 Kibana를 이용해lambda-kine-index에 문서가 포함되어 있습니다. 다음 요청을 사용할 수도 있습니다.

GET https://es-domain/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "lambda-kine-type", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Amazon DynamoDB 에서 스트리밍 데이터 로드

AWS Lambda 를 이용해 Amazon ES 도메인으로 데이터를 전송할 수 있습니다. 데이터베이스 테이블에 도착한 새 데이터는 Lambda 로 이벤트 알림을 트리거한 다음 사용자 지정 코드를 실행해 인덱싱합니다.

Prerequisites

계속하려면 먼저 다음 리소스를 확보해야 합니다.

사전 조건 설명
DynamoDB 테이블

이 테이블에는 소스 데이터가 있습니다. 자세한 내용은 단원을 참조하십시오.테이블 기본 작업Amazon DynamoDB Developer Guide를 선택합니다.

테이블은 Amazon ES 도메인과 같은 리전에 위치하고,새로운 이미지를 선택합니다. 자세한 내용은 스트림 활성화를 참조하십시오.

Amazon ES 도메인 Lambda 함수로 처리한 후의 데이터 대상 주소입니다. 자세한 내용은 단원을 참조하십시오.Amazon ES 도메인 생성를 선택합니다.
IAM 역할

이 역할에는 다음과 같은 기본 Amazon ES, DynamoDB 및 Lambda 실행 권한이 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

역할은 다음과 같은 신뢰 관계를 맺고 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

자세한 내용은 다음을 참조하세요.IAM 역할 생성IAM 사용 설명서를 선택합니다.

Lambda 함수 생성

Lambda 배포 패키지 생성의 지침을 따르되, ddb-to-es라는 디렉터리를 만들고 sample.py에는 다음과 같은 코드를 사용하십시오.

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, with https:// index = 'lambda-index' type = 'lambda-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the Elasticsearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

regionhost의 변수를 편집합니다.

다음과 같은 명령을 사용하면 종속 프로그램을 설치할 수 있습니다.

cd ddb-to-es pip install requests -t . pip install requests_aws4auth -t .

이제 Lambda 함수 생성 지침을 따르되, Prerequisites에서 IAM 역할을 지정하고 트리거에는 다음 설정을 지정하십시오.

  • : DynamoDB 테이블

  • 배치 크기: 100

  • 시작 위치: 지평선 자르기

자세한 내용은 다음을 참조하세요.DynamoDB 테이블에서 새 항목 처리Amazon DynamoDB Developer Guide를 선택합니다.

이제 사용자는 완벽한 리소스 모음, 즉 소스 데이터에 대한 DynamoDB 테이블, 테이블 변경 사항의 DynamoDB 스트림, 소스 데이터가 변경되면 실행되어 이러한 변경 사항을 인덱싱하는 함수, 검색과 시각화를 위한 Amazon ES 도메인을 모두 확보하게 됩니다.

Lambda 함수 테스트

함수를 만들었으면 이제 AWS CLI를 사용하여 DynamoDB 테이블에 새 항목을 추가해 함수를 테스트할 수 있습니다.

aws dynamodb put-item --table-name es-test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

그런 다음 Amazon ES 콘솔 또는 Kibana를 이용해lambda-index에 문서가 포함되어 있습니다. 다음 요청을 사용할 수도 있습니다.

GET https://es-domain/lambda-index/lambda-type/00001 { "_index": "lambda-index", "_type": "lambda-type", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Amazon Kinesis Data Firehose 스트리밍 데이터 로드

Kinesis Data Firehose 는 아마존 ES를 배송 대상으로 지원합니다. Amazon ES로 스트리밍 데이터를 로드하는 방법에 대한 자세한 내용은 단원을 참조하십시오.Kinesis Data Firehose 전송 스트림 생성Amazon ES 선택Amazon Kinesis Data Firehose 개발자 안내서를 선택합니다.

Amazon ES로 데이터를 로드하기 전에 먼저 데이터 변환을 실행해야 할 수도 있습니다. Lambda 함수를 사용하여 이 작업을 수행하는 방법에 대한 자세한 내용은데이터 변환같은 가이드에.

전송 스트림을 구성할 때 Kinesis Data Firehose 에는 Amazon ES로 데이터를 보내고, Amazon S3 데이터를 백업하고, Lambda 를 사용하여 데이터를 변환할 때 필요한 리소스 액세스 권한을 가진 “원클릭” IAM 역할이 있습니다. 이러한 역할을 수동으로 생성하려면 복잡하기 때문에, 제공된 역할을 사용하는 것이 좋습니다.

Amazon CloudWatch 에서 스트리밍 데이터 로드

CloudWatch 로그 구독을 사용하여 CloudWatch Logs 에서 Amazon ES 도메인으로 스트리밍 데이터를 로드할 수 있습니다. Amazon CloudWatch 구독에 대한 자세한 내용은구독을 통한 로그 데이터 실시간 처리를 선택합니다. 구성 정보는 단원을 참조하십시오.CloudWatch Logs 데이터를 Amazon Elasticsearch Service 스트리밍Amazon CloudWatch 개발자 안내서를 선택합니다.

AWS IoT 에서 스트리밍 데이터 로드

AWS IoT 에서 데이터를 전송할 수 있습니다.규칙를 선택합니다. 자세한 내용은 다음을 참조하세요.Amazon ES 작업AWS IoT 개발자 안내서를 선택합니다.