Amazon Elasticsearch Service へのストリーミングデータのロード - Amazon Elasticsearch Service

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon Elasticsearch Service へのストリーミングデータのロード

あなたがロードすることができますストリーミングデータさまざまなソースから Amazon Elasticsearch Service (Amazon ES) ドメインにコピーします。Amazon Kinesis Data Firehose や Amazon CloudWatch Logs などの一部のリソースには、Amazon ES のサポートが組み込まれています。Amazon S3、Amazon Kinesis Data Streams、Amazon DynamoDB など、AWS Lambda 関数をイベントハンドラとして使用します。Lambda 関数は、データを処理してドメインにストリーミングすることで、新しいデータに応答します。

注記

Lambda では、いくつかの一般的なプログラミング言語がサポートされ、ほとんどの AWS リージョンで利用できます。詳細については、「」を参照してください。Lambda 関数の構築()AWS Lambda デベロッパーガイドおよびAWS Lambda リージョン()AWS 全般のリファレンス

Amazon S3 からのストリーミングデータのロード

Lambda を使用して、Amazon S3 から Amazon ES ドメインにデータを送信できます。S3 バケットに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

このデータのストリーミング方法には非常に柔軟性があります。オブジェクトメタデータのインデックスを作成したり、オブジェクトがプレーンテキストの場合は、オブジェクト本文の要素を解析してインデックス作成したりすることができます。このセクションでは、正規表現を使用してログファイルを解析し、一致をインデックス作成するシンプルな Python サンプルコードがあります。

ヒント

Node.js の堅牢な他のコードについては、GitHub で amazon-elasticsearch-lambda-samples を参照してください。Lambda 設計図にも役に立つ解析の例が含まれています。

Prerequisites

続行する前に、以下のリソースが必要です。

前提条件 説明
Amazon S3 バケットのリリース 詳細については、「」を参照してください。バケットの作成()Amazon Simple Storage Service 入門ガイド。バケットは、Amazon ES ドメインと同じリージョンに存在する必要があります。
Amazon ES ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「」を参照してください。Amazon ES ドメインの作成

Lambda デプロイパッケージの作成

デプロイパッケージは、コードとその依存関係を含む ZIP または JAR ファイルです。このセクションには、Python サンプルコードがあります。他のプログラミング言語については、デプロイパッケージの作成()AWS Lambda デベロッパーガイド

  1. ディレクトリを作成します。このサンプルでは、名前 s3-to-es を使用します。

  2. sample.py という名前のディレクトリにファイルを作成します。

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-s3-index' type = '_doc' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    regionhost の変数を編集します。

  3. 依存関係をインストールします。

    cd s3-to-es pip install requests -t . pip install requests_aws4auth -t .

    すべての Lambda 実行環境にはBoto3インストールされているため、デプロイパッケージに含める必要はありません。

    ヒント

    macOS を使用している場合、これらのコマンドが正常に動作しない可能性があります。回避策として、setup.cfg という名前のファイルを s3-to-es ディレクトリに追加します。

    [install] prefix=
  4. アプリケーションコードや相互依存性をパッケージ化します。

    zip -r lambda.zip *

Lambda 関数の作成

デプロイパッケージを作成すると、Lambda 関数を作成できます。関数を作成するとき、名前、ランタイム (たとえば、Python 2.7)、IAM ロールを選択します。IAM ロールにより、関数のアクセス権限が定義されます。詳細な手順については、「」を参照してください。シンプルな Lambda 関数の作成()AWS Lambda デベロッパーガイド

この例では、コンソールを使用していることを前提としています。次のスクリーンショットに示すように、Python 2.7 と、S3 読み取り権限および Amazon ES 書き込み権限を持つロールを選択します。


                    Lambda 関数のサンプル設定

関数を作成した後、トリガーを追加する必要があります。この例では、S3 バケットにログファイルが到着するたびにコードを実行します。

  1. S3 を選択します。

  2. バケットを選択します。

  3. [Event type (イベントタイプ)] で、[PUT] を選択します。

  4. [Prefix (プレフィックス)] に logs/ と入力します。

  5. [Filter pattern (フィルタパターン)] には .log と入力します。

  6. [Enable trigger (トリガーの有効化)] を選択します。

  7. [Add] を選択します。

最後に、デプロイパッケージをアップロードすることができます。

  1. [Handler (ハンドラ)] に「sample.handler」と入力します。この設定は、Lambda にファイル (sample.py) とメソッド (handler)は、トリガーの後に実行する必要があります。

  2. [Code entry type (コードエントリタイプ)] で、[Upload a .ZIP file (.ZIP ファイルをアップロード)] を選択し、デプロイパッケージをアップロードするプロンプトに従います。

  3. [Save] (保存) をクリックします。

この時点で、リソースがすべて揃いました (ログファイルのバケット、ログファイルがバケットに追加されるたびに実行される関数、解析とインデックス作成を実行するコード、検索および可視化のための Amazon ES ドメイン)。

Lambda 関数のテスト

関数を作成した後、Amazon S3 バケットにファイルをアップロードしてテストできます。次のサンプルログの行を使用して、sample.log という名前のファイルを作成します。

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

S3 バケットの logs フォルダにファイルをアップロードします。手順については、以下を参照してください。バケットにオブジェクトを追加()Amazon Simple Storage Service 入門ガイド

次に、Amazon ES コンソールまたは Kibana を使用して、lambda-s3-indexインデックスには、2 つのドキュメントが含まれています。標準検索リクエストを行うこともできます。

GET https://es-domain/lambda-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Amazon Kinesis Data Streams からのストリーミングデータのロード

ストリーミングデータは、Kinesis Data Streams から Amazon ES にロードできます。データストリームに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。このセクションには、いくつかの簡単な Python サンプルコードがあります。Node.js の堅牢な他のコードについては、GitHub で amazon-elasticsearch-lambda-samples を参照してください。

Prerequisites

続行する前に、以下のリソースが必要です。

前提条件 説明
Amazon Kinesis データストリーム Lambda 関数のイベントソース。詳細については、次を参照してください。Kinesis Data Streams
Amazon ES ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「」を参照してください。Amazon ES ドメインの作成
IAM ロール

このロールには、以下のように Amazon ES、Kinesis、Lambda の基本的な権限が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、次を参照してください。IAM ロールの作成()IAM ユーザーガイド

Lambda 関数の作成

Lambda デプロイパッケージの作成」の手順に従いますが、kinesis-to-es という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, including https:// index = 'lambda-kine-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

regionhost の変数を編集します。

次のコマンドを使用して、依存関係をインストールします。

cd kinesis-to-es pip install requests -t . pip install requests_aws4auth -t .

次に、「Lambda 関数の作成」の手順に従いますが、「Prerequisites」の IAM ロールと、トリガーの以下の設定を指定します。

  • Kinesis ストリーム: Kinesis ストリーム

  • バッチサイズ: 100

  • 開始位置: [Trim

詳細については、次を参照してください。Amazon Kinesis Data Streams 操作()Amazon Kinesis Data Streams 開発者ガイド

この時点で、リソースがすべて揃いました (Kinesis データストリーム、ストリームが新しいデータを受信してそのデータのインデックスを作成した後に実行される関数、検索および可視化のための Amazon ES ドメイン)。

Lambda 関数のテスト

関数を作成した後、AWS CLI を使用してデータストリームに新しいレコードを追加することでテストできます。

aws kinesis put-record --stream-name es-test --data "My test data." --partition-key partitionKey1 --region us-west-1

次に、Amazon ES コンソールまたは Kibana を使用して、lambda-kine-indexには、ドキュメントが含まれています。以下のリクエストを使用することもできます。

GET https://es-domain/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Amazon DynamoDB からのストリーミングデータのロード

AWS Lambda を使用して、Amazon DynamoDB から Amazon ES ドメインにデータを送信できます。データテーブルに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

Prerequisites

続行する前に、以下のリソースが必要です。

前提条件 説明
DynamoDB テーブル

このテーブルにはソースデータが含まれています。詳細については、「」を参照してください。テーブルの基本運用()Amazon DynamoDB 開発者ガイド

このテーブルは、Amazon ES ドメインと同じリージョンに存在している必要があり、ストリームが新しいイメージ。詳細については、「ストリームの有効化」を参照してください。

Amazon ES ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「」を参照してください。Amazon ES ドメインの作成
IAM ロール

このロールには、以下のように Amazon ES、DynamoDB、および Lambda の基本的な実行権限が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、次を参照してください。IAM ロールの作成()IAM ユーザーガイド

Lambda 関数の作成

Lambda デプロイパッケージの作成」の手順に従いますが、ddb-to-es という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the Amazon ES domain, with https:// index = 'lambda-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the Elasticsearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

regionhost の変数を編集します。

次のコマンドを使用して、依存関係をインストールします。

cd ddb-to-es pip install requests -t . pip install requests_aws4auth -t .

次に、「Lambda 関数の作成」の手順に従いますが、「Prerequisites」の IAM ロールと、トリガーの以下の設定を指定します。

  • : DynamoDB テーブル

  • バッチサイズ: 100

  • 開始位置: [Trim

詳細については、次を参照してください。DynamoDB テーブルでの新しい項目の処理()Amazon DynamoDB 開発者ガイド

この時点で、すべてのリソースが揃いました (ソースデータの DynamoDB テーブル、テーブルに対する DynamoDB ストリームの変更、ソースデータが変更されてそれらの変更のインデックスを作成した後に実行される関数、検索および可視化のための Amazon ES ドメイン)。

Lambda 関数をテストする

関数を作成した後、AWS CLI を使用して DynamoDB テーブルに新しい項目を追加することでテストできます。

aws dynamodb put-item --table-name es-test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

次に、Amazon ES コンソールまたは Kibana を使用して、lambda-indexには、ドキュメントが含まれています。以下のリクエストを使用することもできます。

GET https://es-domain/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Amazon Kinesis Data Firehose からのストリーミングデータのロード

Kinesis Data Firehose では、配信先として Amazon ES がサポートされています。ストリーミングデータを Amazon ES にロードする方法については、「」を参照してください。Kinesis Data Firehose 配信ストリームの作成および送信先に Amazon ES を選択する()Amazon Kinesis Data Firehose デベロッパーガイド

データを Amazon ES にロードする前に、データの変換が必要になる場合があります。Lambda 関数を使用してこのタスクを実行する方法については、データ変換同じガイドに記載されています。

配信ストリームを設定する場合、Kinesis Data Firehose には「ワンクリック」の IAM ロールが用意されています。このロールには、Amazon ES へのデータ送信、Amazon S3 でのデータバックアップ、および Lambda を使用したデータ変換に必要なリソースアクセス権が付与されます。このようなロールを手動で作成する作業は複雑になるため、用意されているロールの使用をお勧めします。

Amazon CloudWatch からのストリーミングデータのロード

CloudWatch Logs サブスクリプションを使用することにより、ストリーミングデータを CloudWatch Logs から Amazon ES ドメインにロードできます。Amazon CloudWatch サブスクリプションの詳細については、サブスクリプションを使用したログデータのリアルタイム処理。設定情報については、」ストリーミングは、Amazon Elasticsearch Service にデータをCloudWatch Logs する()Amazon CloudWatch デベロッパーガイド

AWS IoT からのストリーミングデータの読み込み

AWS IoT からデータを送信するには、ルール。詳細については、次を参照してください。Amazon ES アクション()AWS IoT デベロッパーガイド