Amazon Elasticsearch Service로 스트리밍 데이터 로드 - Amazon Elasticsearch Service

Amazon Elasticsearch Service로 스트리밍 데이터 로드

다양한 소스에서 스트리밍 데이터를 Amazon Elasticsearch Service 도메인으로 로드할 수 있습니다. Amazon Kinesis Data Firehose 및 Amazon CloudWatch Logs 같은 일부 소스는 Amazon ES를 기본으로 지원합니다. Amazon S3, Amazon Kinesis Data Streams, Amazon DynamoDB 같은 소스는 AWS Lambda 함수를 이벤트 핸들러로 사용합니다. Lambda 함수는 새 데이터를 처리한 다음 도메인으로 스트리밍하여 응답합니다.

참고

Lambda는 다양한 주요 프로그래밍 언어를 지원하며, 대부분의 AWS 리전에서 사용할 수 있습니다. 자세한 내용은 AWS Lambda Developer GuideLambda 함수 구축AWS General ReferenceAWS Lambda 리전을 참조하십시오.

Amazon S3에서 Amazon ES로 스트리밍 데이터 로드

Lambda를 이용해 Amazon S3에서 Amazon ES 도메인으로 데이터를 전송할 수 있습니다. S3 버킷에 도착한 새 데이터는 Lambda로 이벤트 알림을 트리거한 다음 사용자 지정 코드를 실행해 인덱싱합니다.

이러한 방식의 데이터 스트리밍은 대단히 유연합니다. 객체 메타데이터를 인덱싱할 수도 있고, 객체가 일반 텍스트라면 객체 본문의 일부 요소를 구문 분석하고 인덱싱할 수도 있습니다. 이 단원에는 정규식을 이용해 로그 파일을 구문 분석하고 매치를 인덱싱하는 단순한 Python 샘플 코드가 나와 있습니다.

작은 정보

Node.js에서 사용하는 더 강력한 코드를 알고 싶다면 GitHub의 amazon-elasticsearch-lambda-samples를 참조하십시오. 일부 Lambda 블루프린트에도 유용한 구문 분석 예제가 나와 있습니다.

사전 조건

계속하려면 먼저 다음 리소스를 확보해야 합니다.

사전 조건 설명​
Amazon S3 버킷 자세한 내용은 Amazon Simple Storage Service 시작 안내서버킷 생성을 참조하십시오. 버킷은 Amazon ES 도메인과 같은 리전에 있어야 합니다.
Amazon ES 도메인 Lambda 함수로 처리한 후의 데이터 대상 주소입니다. 자세한 내용은 Amazon ES 도메인 생성을 참조하십시오.

Lambda 배포 패키지 만들기

배포 패키지는 코드와 종속 프로그램이 포함된 ZIP 또는 JAR 파일로 구성됩니다. 이 단원에는 Python 샘플 코드가 나와 있습니다. 다른 프로그래밍 언어는 AWS Lambda Developer Guide배포 패키지 생성을 참조하십시오.

  1. 디렉터리를 생성합니다. 이 샘플에서는 s3-to-es 이름을 사용합니다.

  2. sample.py라는 디렉터리에서 파일을 생성합니다.

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-s3-index' type = 'lambda-type' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    regionhost의 변수를 편집합니다.

  3. 종속 항목을 설치합니다:

    cd s3-to-es pip install requests -t . pip install requests_aws4auth -t .

    모든 Lambda 실행 환경에는 Boto3가 설치돼 있으며, 따라서 배포 패키지에 이를 포함할 필요가 없습니다.

    작은 정보

    macOS를 사용한다면 이러한 명령어가 정상적으로 작동하지 않을 수도 있습니다. 차선책으로 setup.cfg라는 파일을 s3-to-es 디렉터리에 추가하십시오.

    [install] prefix=
  4. 애플리케이션 코드와 종속 항목을 패키지화합니다.

    zip -r lambda.zip *

Lambda 함수 생성

배포 패키지를 만든 뒤에는 Lambda 함수를 생성할 수 있습니다. 함수를 만들 때는 이름, 실행 시간(예: Python 2.7)과 IAM 역할을 선택해야 합니다. IAM 역할은 함수에 대한 권한을 정의합니다. 자세한 내용은 AWS Lambda Developer Guide단순 Lambda 함수 생성을 참조하십시오.

이 예에서는 콘솔을 사용하는 것으로 가정합니다. 다음 스크린샷처럼 Python 2.7과 S3 읽기 권한 및 Amazon ES 쓰기 권한이 있는 역할을 선택합니다.


                    Lambda 함수 구성 샘플

함수를 생성했으면 이제 트리거를 추가해야 합니다. 이 예에서는 로그 파일이 S3 버킷에 도착할 때마다 코드를 실행하려 합니다.

  1. S3를 선택합니다.

  2. 버킷을 선택합니다.

  3. 이벤트 유형에서 PUT을 선택합니다.

  4. 접두사에는 logs/를 입력합니다.

  5. 필터 패턴.log를 입력합니다.

  6. 트리거 활성화를 선택합니다.

  7. [추가]를 선택합니다.

마지막으로, 배포 패키지를 업로드합니다.

  1. 핸들러에서 sample.handler를 입력합니다. 이 설정은 트리거 후 실행해야 하는 파일(sample.py)과 메서드(handler)를 Lambda에게 알려 줍니다.

  2. 코드 항목 유형에서 ZIP 파일 업로드를 선택하고, 지시에 따라 배포 패키지를 업로드합니다.

  3. Save를 선택합니다.

이제 사용자는 완벽한 리소스 모음, 즉 로그 파일용 버킷, 로그 파일이 버킷에 추가될 때마다 실행되는 함수, 구문 분석과 인덱싱을 수행하는 코드, 검색과 시각화를 위한 Amazon ES 도메인을 모두 확보하게 됩니다.

Lambda 함수 테스트

함수를 만들었으면 이제 Amazon S3 버킷에 파일을 업로드해 함수를 테스트할 수 있습니다. 다음 샘플 로그 행을 이용해 sample.log라는 파일을 만드십시오.

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

파일을 S3 버킷의 logs 폴더에 업로드합니다. 지침은 Amazon Simple Storage Service 시작 안내서버킷에 객체 추가를 참조하십시오.

그런 다음 Amazon ES 콘솔이나 Kibana를 이용해 lambda-s3-index 인덱스에 문서가 두 개 있는지 확인하십시오. 표준 검색 요청을 할 수도 있습니다.

GET https://es-domain/lambda-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Amazon Kinesis Data Streams에서 Amazon ES로 스트리밍 데이터 로드

Kinesis Data Streams에서 Amazon ES로 스트리밍 데이터를 로드할 수 있습니다. 데이터 스트림에 도착한 새 데이터는 Lambda으로 이벤트 알림을 트리거한 다음 사용자 지정 코드를 실행해 인덱싱합니다. 이 단원에는 단순한 Python 샘플 코드가 있습니다. Node.js에서 사용하는 더 강력한 코드를 알고 싶다면 GitHub의 amazon-elasticsearch-lambda-samples를 참조하십시오.

사전 조건

계속하려면 먼저 다음 리소스를 확보해야 합니다.

사전 조건 설명
Amazon Kinesis 데이터 스트림 Lambda 함수의 이벤트 소스. 자세한 내용은 Kinesis 데이터 스트림을 참조하십시오.
Amazon ES 도메인 Lambda 함수로 처리한 후의 데이터 대상 주소입니다. 자세한 내용은 Amazon ES 도메인 생성을 참조하십시오.
IAM 역할

이 역할에는 다음과 같은 기본 Amazon ES, Kinesis 및 Lambda 권한이 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

역할은 다음과 같은 신뢰 관계를 맺고 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

자세한 내용은 IAM 사용 설명서IAM 역할 생성을 참조하십시오.

Lambda 함수 생성

Lambda 배포 패키지 만들기의 지침을 따르되, kinesis-to-es라는 디렉터리를 만들고 sample.py에는 다음과 같은 코드를 사용하십시오.

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-kine-index' type = 'lambda-kine-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

regionhost의 변수를 편집합니다.

다음과 같은 명령을 사용하면 종속 프로그램을 설치할 수 있습니다.

cd kinesis-to-es pip install requests -t . pip install requests_aws4auth -t .

이제 Lambda 함수 생성 지침을 따르되, 사전 조건에서 IAM 역할을 지정하고 트리거에는 다음 설정을 지정하십시오.

  • Kinesis 스트림: 사용자의 Kinesis 스트림

  • 배치 크기: 100

  • 시작 위치: 수평 트리밍

자세한 내용은 Amazon Kinesis Data Streams 개발자 안내서Amazon Kinesis Data Streams 작업을 참조하십시오.

이제 사용자는 완벽한 리소스 모음, 즉 Kinesis 데이터 스트림, 스트림에 새 데이터가 들어오면 실행되어 해당 데이터를 인덱싱하는 함수, 검색과 시각화를 위한 Amazon ES 도메인을 모두 확보하게 됩니다.

Lambda 함수 테스트

함수를 만든 뒤에는 AWS CLI에서 데이터 스트림에 새 레코드를 추가해 함수를 테스트할 수 있습니다.

aws kinesis put-record --stream-name es-test --data "My test data." --partition-key partitionKey1 --region us-west-1

그런 다음 Amazon ES 콘솔이나 Kibana를 이용해 lambda-kine-index에 문서가 있는지 확인하십시오. 다음 요청을 사용할 수도 있습니다.

GET https://es-domain/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "lambda-kine-type", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Amazon DynamoDB에서 Amazon ES로 스트리밍 데이터 로드

AWS Lambda를 이용해 Amazon DynamoDB에서 Amazon ES 도메인으로 데이터를 전송할 수 있습니다. 데이터베이스 테이블에 도착한 새 데이터는 Lambda로 이벤트 알림을 트리거한 다음 사용자 지정 코드를 실행해 인덱싱합니다.

사전 조건

계속하려면 먼저 다음 리소스를 확보해야 합니다.

사전 조건 설명
DynamoDB 테이블

이 테이블에는 소스 데이터가 있습니다. 자세한 내용은 Amazon DynamoDB 개발자 안내서테이블 기본 작업을 참조하십시오.

테이블은 Amazon ES 도메인과 같은 리전에 위치하고, 새 이미지로 설정된 스트림이 있어야 합니다. 자세한 내용은 스트림 활성화를 참조하십시오.

Amazon ES 도메인 Lambda 함수로 처리한 후의 데이터 대상 주소입니다. 자세한 내용은 Amazon ES 도메인 생성을 참조하십시오.
IAM 역할

이 역할에는 다음과 같은 기본 Amazon ES, DynamoDB 및 Lambda 실행 권한이 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

역할은 다음과 같은 신뢰 관계를 맺고 있어야 합니다.

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

자세한 내용은 IAM 사용 설명서IAM 역할 생성을 참조하십시오.

Lambda 함수 생성

Lambda 배포 패키지 만들기의 지침을 따르되, ddb-to-es라는 디렉터리를 만들고 sample.py에는 다음과 같은 코드를 사용하십시오.

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, with https:// index = 'lambda-index' type = 'lambda-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the Elasticsearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

regionhost의 변수를 편집합니다.

다음과 같은 명령을 사용하면 종속 프로그램을 설치할 수 있습니다.

cd ddb-to-es pip install requests -t . pip install requests_aws4auth -t .

이제 Lambda 함수 생성 지침을 따르되, 사전 조건에서 IAM 역할을 지정하고 트리거에는 다음 설정을 지정하십시오.

  • 테이블: 사용자의 DynamoDB 테이블

  • 배치 크기: 100

  • 시작 위치: 수평 트리밍

자세한 내용은 Amazon DynamoDB 개발자 안내서DynamoDB 테이블에서 새 항목 처리를 참조하십시오.

이제 사용자는 완벽한 리소스 모음, 즉 소스 데이터에 대한 DynamoDB 테이블, 테이블 변경 사항의 DynamoDB 스트림, 소스 데이터가 변경되면 실행되어 이러한 변경 사항을 인덱싱하는 함수, 검색과 시각화를 위한 Amazon ES 도메인을 모두 확보하게 됩니다.

Lambda 함수 테스트

함수를 만들었으면 이제 AWS CLI를 사용해 DynamoDB 테이블에 새 항목을 추가해 함수를 테스트할 수 있습니다.

aws dynamodb put-item --table-name es-test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

그런 다음 Amazon ES 콘솔이나 Kibana를 이용해 lambda-index에 문서가 있는지 확인하십시오. 다음 요청을 사용할 수도 있습니다.

GET https://es-domain/lambda-index/lambda-type/00001 { "_index": "lambda-index", "_type": "lambda-type", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Amazon Kinesis Data Firehose에서 Amazon ES로 스트리밍 데이터 로드

Kinesis Data Firehose에서 Amazon ES를 대상으로 전송할 수 있습니다. Amazon ES로 스트리밍 데이터를 로드하는 방법에 대한 자세한 내용은 Amazon Kinesis Data Firehose 개발자 안내서Kinesis Data Firehose 전송 스트림 생성Amazon ES를 대상으로 선택을 참조하십시오.

Amazon ES에 데이터를 로드하기 전에, 먼저 데이터 변환을 실행해야 할 수도 있습니다. Lambda 함수로 이 작업을 수행하는 방법에 대한 자세한 내용은 동일한 안내서의 데이터 변환을 참조하십시오.

전송 스트림을 구성할 때 Kinesis Data Firehose에서는 Amazon ES로 데이터를 보내고, Amazon S3에서 데이터를 백업하고, Lambda로 데이터를 변환할 때 필요한 리소스 액세스 권한을 가진 "원클릭" IAM 역할을 사용합니다. 이러한 역할을 수동으로 생성하려면 복잡하기 때문에, 제공된 역할을 사용하는 것이 좋습니다.

Amazon CloudWatch에서 Amazon ES로 스트리밍 데이터 로드

CloudWatch Logs 구독을 사용하여 CloudWatch Logs에서 Amazon ES 도메인으로 스트리밍 데이터를 로드할 수 있습니다. Amazon CloudWatch 구독에 대한 자세한 내용은 구독을 통한 로그 데이터 실시간 처리를 참조하십시오. 구성 정보는 Amazon CloudWatch 개발자 안내서CloudWatch Logs 데이터를 Amazon Elasticsearch Service로 스트리밍을 참조하십시오.

AWS IoT에서 Amazon ES로 데이터 로드

AWS IoT에서 규칙을 사용하여 데이터를 전송할 수 있습니다. 자세한 내용은 AWS IoT 개발자 안내서에서 Amazon ES 작업을 참조하십시오.