Amazon Elasticsearch Service
開発者ガイド (API バージョン 2015-01-01)

Amazon Elasticsearch Service にストリーミングデータをロードする

さまざまなソースからストリーミングデータを Amazon Elasticsearch Service ドメインにロードできます。Amazon Kinesis Data Firehose や Amazon CloudWatch Logs などの一部のリソースには、Amazon ES のサポートが組み込まれています。Amazon S3、Amazon Kinesis Data Streams、Amazon DynamoDB などの他のリソースには、イベントハンドラとして AWS Lambda 関数を使用します。Lambda 関数は、データを処理してドメインにストリーミングすることで、新しいデータに応答します。

注記

Lambda では、いくつかの一般的なプログラミング言語がサポートされ、ほとんどの AWS リージョンで利用できます。詳細については、AWS Lambda Developer Guideの「Lambda 関数のビルド」と AWS General Referenceの「AWS Lambda のリージョン」を参照してください。

Amazon S3 から Amazon ES にストリーミングデータをロードする

Lambda を使用して、Amazon S3 から Amazon ES ドメインにデータを送信できます。S3 バケットに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

このデータのストリーミング方法には非常に柔軟性があります。オブジェクトメタデータのインデックスを作成したり、オブジェクトがプレーンテキストの場合は、オブジェクト本文の要素を解析してインデックス作成したりすることができます。このセクションでは、正規表現を使用してログファイルを解析し、一致をインデックス作成するシンプルな Python サンプルコードがあります。

ヒント

Node.js の堅牢な他のコードについては、GitHub で amazon-elasticsearch-lambda-samples を参照してください。一部の Lambda 設計図にも役に立つ解析の例が含まれています。

前提条件

続行する前に、以下のリソースが必要です。

前提条件 説明
Amazon S3 バケット 詳細については、Amazon Simple Storage Service 入門ガイドの「バケットの作成」を参照してください。バケットは、Amazon ES ドメインと同じリージョンに存在する必要があります。
Amazon ES ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「Amazon ES ドメインの作成」を参照してください。

Lambda デプロイパッケージの作成

デプロイパッケージは、コードとその依存関係を含む ZIP または JAR ファイルです。このセクションには、Python サンプルコードがあります。他のプログラミング言語については、AWS Lambda Developer Guideの「デプロイパッケージの作成」を参照してください。

  1. ディレクトリを作成します。このサンプルでは、名前 s3-to-es を使用します。

  2. sample.py という名前のディレクトリにファイルを作成します。

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-s3-index' type = 'lambda-type' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    regionhost の変数を編集します。

  3. 依存関係をインストールします。

    cd s3-to-es pip install requests -t . pip install requests_aws4auth -t .

    すべての Lambda 実行環境に Boto3 がインストールされているため、デプロイパッケージに含める必要はありません。

    ヒント

    macOS を使用している場合、これらのコマンドが正常に動作しない可能性があります。回避策として、setup.cfg という名前のファイルを eslambda ディレクトリに追加します。

    [install] prefix=
  4. アプリケーションコードや相互依存性をパッケージ化します。

    zip -r lambda.zip *

Lambda 関数の作成

デプロイパッケージを作成すると、Lambda 関数を作成できます。関数を作成するとき、名前、ランタイム (たとえば、Python 2.7)、IAM ロールを選択します。IAM ロールにより、関数のアクセス権限が定義されます。詳細な手順については、AWS Lambda Developer Guideの「シンプルな Lambda 関数の作成」を参照してください。

この例では、コンソールを使用していることを前提としています。次のスクリーンショットに示すように、Python 2.7 と、S3 読み取り権限および Amazon ES 書き込み権限を持つロールを選択します。

 Lambda 関数のサンプル設定

関数を作成した後、トリガーを追加する必要があります。この例では、S3 バケットにログファイルが到着するたびにコードを実行します。

  1. S3 を選択します。

  2. バケットを選択します。

  3. [イベントタイプ] で、[PUT] を選択します。

  4. [プレフィックス] に logs/ と入力します。

  5. [フィルタパターン] には .log と入力します。

  6. [トリガーの有効化] を選択します。

  7. [追加] を選択します。

最後に、デプロイパッケージをアップロードすることができます。

  1. [Handler] に「sample.handler」と入力します。この設定により、トリガー後に実行するファイル (sample.py) およびメソッド (handler) が Lambda に通知されます。

  2. [コードエントリタイプ] で、[.ZIP ファイルをアップロード] を選択し、デプロイパッケージをアップロードするプロンプトに従います。

  3. [Save] を選択します。

この時点で、リソースがすべて揃いました (ログファイルのバケット、ログファイルがバケットに追加されるたびに実行される関数、解析とインデックス作成を実行するコード、検索および可視化のための Amazon ES ドメイン)。

Lambda 関数のテスト

関数を作成した後、Amazon S3 バケットにファイルをアップロードしてテストできます。次のサンプルログの行を使用して、sample.log という名前のファイルを作成します。

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

S3 バケットの logs フォルダにファイルをアップロードします。手順については、Amazon Simple Storage Service 入門ガイドの「バケットにオブジェクトを追加」を参照してください。

次に、Amazon ES コンソールまたは Kibana を使用して、lambda-s3-index インデックスに 2 つのドキュメントが含まれていることを確認します。標準検索リクエストを行うこともできます。

GET https://es-domain/lambda-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Amazon Kinesis Data Streams から Amazon ES にストリーミングデータをロードする

ストリーミングデータは、Kinesis Data Streams から Amazon ES にロードできます。データストリームに到達する新しいデータによって、Lambda へのイベント通知がトリガーされ、インデックス作成を実行するカスタムコードが実行されます。このセクションには、いくつかの簡単な Python サンプルコードがあります。Node.js の堅牢な他のコードについては、GitHub で amazon-elasticsearch-lambda-samples を参照してください。

前提条件

続行する前に、以下のリソースが必要です。

前提条件 説明
Amazon Kinesis データストリーム Lambda 関数のイベントソース。 詳細については、「Kinesis データストリーム」を参照してください。
Amazon ES ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「Amazon ES ドメインの作成」を参照してください。
IAM ロール

このロールには、以下のように Amazon ES、Kinesis、Lambda の基本的な権限が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、IAM ユーザーガイドの「IAM ロールの作成」を参照してください。

Lambda 関数の作成

Lambda デプロイパッケージの作成」の手順に従いますが、kinesis-to-es という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-kine-index' type = 'lambda-kine-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

regionhost の変数を編集します。

次のコマンドを使用して、依存関係をインストールします。

cd kinesis-to-es pip install requests -t . pip install requests_aws4auth -t .

次に、「Lambda 関数の作成」の手順に従いますが、「前提条件」の IAM ロールと、トリガーの以下の設定を指定します。

  • Kinesis ストリーム: Kinesis ストリーム

  • バッチサイズ: 100

  • 開始位置: 水平トリム

詳細については、Amazon Kinesis Data Streams 開発者ガイドAmazon Kinesis Data Streams の操作について参照してください。

この時点で、リソースがすべて揃いました (Kinesis データストリーム、ストリームが新しいデータを受信してそのデータのインデックスを作成した後に実行される関数、検索および可視化のための Amazon ES ドメイン)。

Lambda 関数のテスト

関数を作成した後、AWS CLI を使用してデータストリームに新しいレコードを追加することでテストできます。

aws kinesis put-record --stream-name es-test --data "My test data." --partition-key partitionKey1 --region us-west-1

次に、Amazon ES コンソールまたは Kibana を使用して、lambda-kine-index にドキュメントが含まれていることを確認します。以下のリクエストを使用することもできます。

GET https://es-domain/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "lambda-kine-type", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Amazon DynamoDB から Amazon ES にストリーミングデータをロードする

AWS Lambda を使用して、Amazon DynamoDB から Amazon ES ドメインにデータを送信できます。データテーブルに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

前提条件

続行する前に、以下のリソースが必要です。

前提条件 説明
DynamoDB Table

このテーブルにはソースデータが含まれています。詳細については、Amazon DynamoDB 開発者ガイドの「テーブルの基本運用」を参照してください。

このテーブルは、Amazon ES ドメインと同じリージョンに存在している必要があり、ストリームが [新しいイメージ] に設定されている必要があります。詳細については、「ストリームの有効化」を参照してください。

Amazon ES ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「Amazon ES ドメインの作成」を参照してください。
IAM ロール

このロールには、以下のように Amazon ES、DynamoDB、Lambda の基本的な実行権限が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、IAM ユーザーガイドの「IAM ロールの作成」を参照してください。

Lambda 関数を作成する

Lambda デプロイパッケージの作成」の手順に従いますが、ddb-to-es という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, with https:// index = 'lambda-index' type = 'lambda-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the Elasticsearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

regionhost の変数を編集します。

次のコマンドを使用して、依存関係をインストールします。

cd ddb-to-es pip install requests -t . pip install requests_aws4auth -t .

次に、「Lambda 関数の作成」の手順に従いますが、「前提条件」の IAM ロールと、トリガーの以下の設定を指定します。

  • テーブル: DynamoDB テーブル

  • バッチサイズ: 100

  • 開始位置: 水平トリム

詳細については、Amazon DynamoDB 開発者ガイドDynamoDB テーブルでの新しい項目の処理について参照してください。

この時点で、すべてのリソースが揃いました (ソースデータの DynamoDB テーブル、テーブルに対する DynamoDB ストリームの変更、ソースデータが変更されてそれらの変更のインデックスを作成した後に実行される関数、検索および可視化のための Amazon ES ドメイン)。

Lambda 関数のテスト

関数を作成した後、AWS CLI を使用して DynamoDB テーブルに新しい項目を追加することでテストできます。

aws dynamodb put-item --table-name es-test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

次に、Amazon ES コンソールまたは Kibana を使用して、lambda-index にドキュメントが含まれていることを確認します。以下のリクエストを使用することもできます。

GET https://es-domain/lambda-index/lambda-type/00001 { "_index": "lambda-index", "_type": "lambda-type", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Amazon Kinesis Data Firehose から Amazon ES にストリーミングデータをロードする

Kinesis Data Firehose では、配信先として Amazon ES がサポートされています。Amazon ES にストリーミングデータをロードする方法については、『Amazon Kinesis Data Firehose 開発者ガイド』の「Kinesis Data Firehose 配信ストリームの作成」および「配信先として Amazon ES を選択する」を参照してください。

データを Amazon ES にロードする前に、データの変換が必要になる場合があります。Lambda 関数を使用してこのタスクを実行する方法については、このガイドの「データ変換」を参照してください。

配信ストリームを設定する場合、Kinesis Data Firehose には「ワンクリック」の IAM ロールが用意されています。このロールには、Amazon ES へのデータ送信、Amazon S3 でのデータバックアップ、および Lambda を使用したデータ変換に必要なリソースアクセス権が付与されます。このようなロールを手動で作成する作業は複雑になるため、用意されているロールの使用をお勧めします。

Amazon CloudWatch から Amazon ES にストリーミングデータをロードする

CloudWatch Logs サブスクリプションを使用することにより、ストリーミングデータを CloudWatch Logs から Amazon ES ドメインにロードできます。Amazon CloudWatch サブスクリプションの詳細については、「サブスクリプションを使用したログデータのリアルタイム処理」を参照してください。設定情報については、Amazon CloudWatch 開発者ガイドの「CloudWatch Logs データを Amazon Elasticsearch Service にストリーミング」を参照してください。

AWS IoT から Amazon ES にデータをロードする

ルールを使用して AWS IoT からデータを送信することができます。詳細については、『AWS IoT 開発者ガイド』の「Amazon ES Action」(Amazon ES のアクション) を参照してください。