Amazon Elasticsearch Service にストリーミングデータをロードする - Amazon Elasticsearch Service

英語の翻訳が提供されている場合で、内容が矛盾する場合には、英語版がオリジナルとして取り扱われます。翻訳は機械翻訳により提供されています。

Amazon Elasticsearch Service にストリーミングデータをロードする

さまざまなソースからストリーミングデータを Amazon Elasticsearch Service ドメインにロードできます。Amazon Kinesis Data Firehose や Amazon CloudWatch Logs などの一部のリソースには、Amazon ES のサポートが組み込まれています。Amazon S3、Amazon Kinesis Data Streams、Amazon DynamoDB などには、イベントハンドラとして AWS Lambda 関数を使用します。Lambda 関数は、データを処理してドメインにストリーミングすることで、新しいデータに応答します。

注記

Lambda では、いくつかの一般的なプログラミング言語がサポートされ、ほとんどの AWS リージョンで利用できます。詳細については、https://docs.aws.amazon.com/lambda/latest/dg/lambda-app.htmlの「AWS Lambda Developer GuideLambda 関数のビルド」と https://docs.aws.amazon.com/general/latest/gr/rande.html#lambda_regionの「AWS General ReferenceAWS Lambda のリージョン」を参照してください。

Amazon S3 から Amazon ES にストリーミングデータをロードする

Lambda を使用して、Amazon S3 から Amazon ES ドメインにデータを送信できます。S3 バケットに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

このデータのストリーミング方法には非常に柔軟性があります。オブジェクトメタデータのインデックスを作成したり、オブジェクトがプレーンテキストの場合は、オブジェクト本文の要素を解析してインデックス作成したりすることができます。このセクションでは、正規表現を使用してログファイルを解析し、一致をインデックス作成するシンプルな Python サンプルコードがあります。

ヒント

Node.js の堅牢な他のコードについては、GitHub で amazon-elasticsearch-lambda-samples を参照してください。一部の Lambda 設計図にも役に立つ解析の例が含まれています。

Prerequisites

続行する前に、以下のリソースが必要です。

前提条件: 説明:
Amazon S3 バケット 詳細については、https://docs.aws.amazon.com/AmazonS3/latest/gsg/CreatingABucket.htmlの「Amazon Simple Storage Service 入門ガイドバケットの作成」を参照してください。バケットは、Amazon ES ドメインと同じリージョンに存在する必要があります。
Amazon ES ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「Amazon ES ドメインの作成」を参照してください。

Lambda デプロイパッケージの作成

デプロイパッケージは、コードとその依存関係を含む ZIP または JAR ファイルです。このセクションには、Python サンプルコードがあります。他のプログラミング言語については、https://docs.aws.amazon.com/lambda/latest/dg/deployment-package-v2.htmlの「AWS Lambda Developer Guideデプロイパッケージの作成」を参照してください。

  1. ディレクトリを作成します。このサンプルでは、名前 s3-to-es を使用します。

  2. sample.py という名前のディレクトリにファイルを作成します。

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-s3-index' type = 'lambda-type' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    regionhost の変数を編集します。

  3. 依存関係をインストールします。

    cd s3-to-es pip install requests -t . pip install requests_aws4auth -t .

    すべての Lambda 実行環境に Boto3 がインストールされているため、デプロイパッケージに含める必要はありません。

    ヒント

    macOS を使用している場合、これらのコマンドが正常に動作しない可能性があります。回避策として、setup.cfg という名前のファイルを s3-to-es ディレクトリに追加します。

    [install] prefix=
  4. アプリケーションコードや相互依存性をパッケージ化します。

    zip -r lambda.zip *

Lambda 関数の作成

デプロイパッケージを作成すると、Lambda 関数を作成できます。関数を作成するとき、名前、ランタイム (たとえば、Python 2.7)、IAM ロールを選択します。IAM ロールにより、関数のアクセス権限が定義されます。詳細な手順については、https://docs.aws.amazon.com/lambda/latest/dg/get-started-create-function.htmlの「AWS Lambda Developer Guideシンプルな Lambda 関数の作成」を参照してください。

この例では、コンソールを使用していることを前提としています。次のスクリーンショットに示すように、Python 2.7 と、S3 読み取り権限および Amazon ES 書き込み権限を持つロールを選択します。


                    のサンプル構成 Lambda 機能

関数を作成した後、トリガーを追加する必要があります。この例では、ログ・ファイルがS3バケットに到着するたびにコードを実行します。

  1. S3 を選択します。

  2. バケットを選択します。

  3. [Event type (イベントタイプ)] で、[PUT] を選択します。

  4. [Prefix (プレフィックス)] に logs/ と入力します。

  5. [Filter pattern (フィルタパターン)] には .log と入力します。

  6. [Enable trigger (トリガーの有効化)] を選択します。

  7. [Add] を選択します。

最後に、デプロイパッケージをアップロードすることができます。

  1. 対象: ハンドラー、タイプ sample.handler。 この設定はLambdaにファイル(sample.py)及び方法(handler)、トリガ後に実行する必要があります。

  2. [Code entry type (コードエントリタイプ)] で、[Upload a .ZIP file (.ZIP ファイルをアップロード)] を選択し、デプロイパッケージをアップロードするプロンプトに従います。

  3. [Save] を選択します。

この時点で、完全なリソース セットがあります。ログ ファイルのバケット、ログ ファイルがバケットに追加されるたびに実行される関数、解析とインデックス作成を実行するコード、および Amazon ES 検索と視覚化のためのドメイン。

Lambda 関数のテスト

関数を作成した後、Amazon S3 バケットにファイルをアップロードしてテストできます。次のサンプルログの行を使用して、sample.log という名前のファイルを作成します。

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

S3 バケットの logs フォルダにファイルをアップロードします。手順については、https://docs.aws.amazon.com/AmazonS3/latest/gsg/PuttingAnObjectInABucket.htmlの「Amazon Simple Storage Service 入門ガイドバケットにオブジェクトを追加」を参照してください。

次に、Amazon ES コンソールまたは Kibana を使用して、lambda-s3-index インデックスに 2 つのドキュメントが含まれていることを確認します。標準検索リクエストを行うこともできます。

GET https://es-domain/lambda-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "lambda-type", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Amazon Kinesis Data Streams から Amazon ES にストリーミングデータをロードする

ストリーミングデータは、Kinesis Data Streams から Amazon ES にロードできます。データストリームに到達する新しいデータによって、Lambda へのイベント通知がトリガーされ、インデックス作成を実行するカスタムコードが実行されます。このセクションには、いくつかの簡単な Python サンプルコードがあります。Node.js の堅牢な他のコードについては、GitHub で amazon-elasticsearch-lambda-samples を参照してください。

Prerequisites

続行する前に、以下のリソースが必要です。

前提条件: 説明:
Amazon Kinesis データストリーム Lambda 関数のイベントソース。詳細については、「Kinesis データストリーム」を参照してください。
Amazon ES ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「Amazon ES ドメインの作成」を参照してください。
IAM ロール

このロールには、以下のように Amazon ES、Kinesis、Lambda の基本的な権限が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.htmlの「IAM ユーザーガイドIAM ロールの作成」を参照してください。

Lambda 関数の作成

Lambda デプロイパッケージの作成」の手順に従いますが、kinesis-to-es という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-kine-index' type = 'lambda-kine-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

regionhost の変数を編集します。

次のコマンドを使用して、依存関係をインストールします。

cd kinesis-to-es pip install requests -t . pip install requests_aws4auth -t .

次に、「Lambda 関数の作成」の手順に従いますが、「Prerequisites」の IAM ロールと、トリガーの以下の設定を指定します。

  • [Kinesis stream (Kinesis ストリーム)]: Kinesis ストリーム

  • バッチサイズ -100

  • 開始位置: トリム・ホライゾン

詳細については、https://docs.aws.amazon.com/streams/latest/dev/working-with-kinesis.htmlで Amazon Kinesis Data Streams 開発者ガイドAmazon Kinesis Data Streams の操作について参照してください。

この時点で、リソース一式があります。 Kinesis データ ストリーム、ストリームが新しいデータを受信してそのデータをインデックス化する後に実行される関数、および Amazon ES 検索と視覚化のためのドメイン。

Lambda 関数のテスト

関数を作成した後、AWS CLI を使用してデータストリームに新しいレコードを追加することでテストできます。

aws kinesis put-record --stream-name es-test --data "My test data." --partition-key partitionKey1 --region us-west-1

次に、Amazon ES コンソールまたは Kibana を使用して、lambda-kine-index にドキュメントが含まれていることを確認します。以下のリクエストを使用することもできます。

GET https://es-domain/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "lambda-kine-type", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Amazon DynamoDB から Amazon ES にストリーミングデータをロードする

AWS Lambda を使用して、Amazon DynamoDB から Amazon ES ドメインにデータを送信できます。データテーブルに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

Prerequisites

続行する前に、以下のリソースが必要です。

前提条件: 説明:
DynamoDB テーブル

このテーブルにはソースデータが含まれています。詳細については、https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/WorkingWithTables.Basics.htmlの「Amazon DynamoDB 開発者ガイドテーブルの基本運用」を参照してください。

このテーブルは、Amazon ES ドメインと同じリージョンに存在している必要があり、ストリームが [New image (新しいイメージ)] に設定されている必要があります。詳細については、「ストリームの有効化」を参照してください。

Amazon ES ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「Amazon ES ドメインの作成」を参照してください。
IAM ロール

このロールには、以下のように Amazon ES、DynamoDB、Lambda の基本的な実行権限が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、https://docs.aws.amazon.com/IAM/latest/UserGuide/id_roles_create.htmlの「IAM ユーザーガイドIAM ロールの作成」を参照してください。

Lambda 関数を作成する

Lambda デプロイパッケージの作成」の手順に従いますが、ddb-to-es という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, with https:// index = 'lambda-index' type = 'lambda-type' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the Elasticsearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

regionhost の変数を編集します。

次のコマンドを使用して、依存関係をインストールします。

cd ddb-to-es pip install requests -t . pip install requests_aws4auth -t .

次に、「Lambda 関数の作成」の手順に従いますが、「Prerequisites」の IAM ロールと、トリガーの以下の設定を指定します。

  • [Table (テーブル)]: DynamoDB テーブル

  • バッチサイズ -100

  • 開始位置: トリム・ホライゾン

詳細については、https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/Streams.Lambda.Tutorial.htmlで Amazon DynamoDB 開発者ガイドDynamoDB テーブルでの新しい項目の処理について参照してください。

この時点で、リソース一式があります。 DynamoDB ソース データのテーブル、 DynamoDB テーブルの変更のストリーム、ソース データの変更後に実行される関数、およびこれらの変更のインデックス付け、 Amazon ES 検索と視覚化のためのドメイン。

Lambda 関数のテスト

関数を作成した後、AWS CLI を使用して DynamoDB テーブルに新しい項目を追加することでテストできます。

aws dynamodb put-item --table-name es-test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

次に、Amazon ES コンソールまたは Kibana を使用して、lambda-index にドキュメントが含まれていることを確認します。以下のリクエストを使用することもできます。

GET https://es-domain/lambda-index/lambda-type/00001 { "_index": "lambda-index", "_type": "lambda-type", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Amazon Kinesis Data Firehose から Amazon ES にストリーミングデータをロードする

Kinesis Data Firehose では、配信先として Amazon ES がサポートされています。Amazon ES にストリーミングデータをロードする方法については、『https://docs.aws.amazon.com/firehose/latest/dev/basic-create.html』の「Kinesis Data Firehose 配信ストリームの作成」および「Amazon Kinesis Data Firehose 開発者ガイド配信先として Amazon ES を選択する」を参照してください。

データを Amazon ES にロードする前に、データの変換が必要になる場合があります。Lambda 関数を使用してこのタスクを実行する方法については、このガイドの「データ変換」を参照してください。

配信ストリームを設定する場合、Kinesis Data Firehose には 「ワンクリック」の IAM ロールが用意されています。このロールにより、Amazon ES へのデータ送信、Amazon S3 でのデータバックアップ、および Lambda の使用によるデータ変換に必要なリソースアクセス権が付与されます。このようなロールを手動で作成する作業は複雑になるため、用意されているロールの使用をお勧めします。

Amazon CloudWatch から Amazon ES にストリーミングデータをロードする

CloudWatch Logs サブスクリプションを使用することにより、ストリーミングデータを CloudWatch Logs から Amazon ES ドメインにロードできます。Amazon CloudWatch サブスクリプションの詳細については、「サブスクリプションを使用したログデータのリアルタイム処理」を参照してください。設定情報については、http://docs.aws.amazon.com/AmazonCloudWatch/latest/DeveloperGuide/CWL_ES_Stream.html 開発者ガイドの「Amazon CloudWatchCloudWatch Logs データを Amazon Elasticsearch Service にストリーミング」を参照してください。

AWS IoT から Amazon ES へのデータのロード

ルールを使用して AWS IoT からデータを送信することができます。詳細については、『https://docs.aws.amazon.com/iot/latest/developerguide/elasticsearch-rule.html』の「AWS IoT 開発者ガイドAmazon ES Action」(Amazon ES のアクション) を参照してください。