Amazon OpenSearch Service へのストリーミングデータをロードする - Amazon OpenSearch Service (Amazon Elasticsearch Service の後継サービス)

Amazon OpenSearch Service へのストリーミングデータをロードする

さまざまなソースからストリーミングデータを Amazon OpenSearch Service ドメインにロードできます。Amazon Kinesis Data Firehose や Amazon CloudWatch Logs などの一部のリソースには、OpenSearch Service のサポートが組み込まれています。Amazon S3、Amazon Kinesis Data Streams、Amazon DynamoDB などの他のものでは、イベントハンドラとして AWS Lambda 関数を使用します。Lambda 関数は、データを処理してドメインにストリーミングすることで、新しいデータに応答します。

注記

Lambda では、いくつかの一般的なプログラミング言語がサポートされ、ほとんどの AWS リージョンで利用できます。詳細については、AWS Lambda デベロッパーガイドの「Lambda の開始方法」およびAWS全般のリファレンスの「AWS サービスエンドポイントを参照してください。

Amazon S3 からストリーミングデータをロードする

Lambda を使用して、Amazon S3 から OpenSearch Service ドメインにデータを送信できます。S3 バケットに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

このデータのストリーミング方法には非常に柔軟性があります。オブジェクトメタデータのインデックスを作成したり、オブジェクトがプレーンテキストの場合は、オブジェクト本文の要素を解析してインデックス作成したりすることができます。このセクションでは、正規表現を使用してログファイルを解析し、一致をインデックス作成するシンプルな Python サンプルコードがあります。

Prerequisites

続行する前に、以下のリソースが必要です。

前提条件 説明
Amazon S3 バケット 詳細については、Amazon Simple Storage Service ユーザーガイドの「最初の S3 バケットを作成する」を参照してください。バケットは、OpenSearch Service ドメインと同じリージョンに存在する必要があります。
OpenSearch Service ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「 OpenSearch Service ドメインの作成」を参照してください。

Lambda デプロイパッケージを作成する

デプロイパッケージは、コードとその依存関係を含む ZIP または JAR ファイルです。このセクションには、Python サンプルコードがあります。他のプログラミング言語については、AWS Lambda デベロッパーガイドの「Lambda デプロイパッケージ」を参照してください。

  1. ディレクトリを作成します。このサンプルでは、名前 s3-to-opensearch を使用します。

  2. sample.py という名前のディレクトリ内にファイルを作成します。

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, including https:// index = 'lambda-s3-index' type = '_doc' url = host + '/' + index + '/' + type headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    regionhost の変数を編集します。

  3. まだ持っていない場合、pip をインストールしてから、依存関係を新しい package ディレクトリにインストールします。

    cd s3-to-opensearch
    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    すべての Lambda 実行環境に Boto3 がインストールされているため、デプロイパッケージに含める必要はありません。

  4. アプリケーションコードや相互依存性をパッケージ化します。

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Lambda 関数を作成する

デプロイパッケージを作成すると、Lambda 関数を作成できます。関数を作成するとき、名前、ランタイム (たとえば、Python 3.8)、IAM ロールを選択します。IAM ロールにより、関数の許可が定義されます。詳細な手順については、AWS Lambda デベロッパーガイドの「コンソールで Lambda 関数を作成する」を参照してください。

この例では、コンソールを使用していることを前提としています。次のスクリーンショットに示すように、Python 3.9 と、S3 読み取り許可および OpenSearch Service 書き込み許可を持つロールを選択します。


                    Lambda 関数のサンプル設定

関数を作成した後、トリガーを追加する必要があります。この例では、S3 バケットにログファイルが到着するたびにコードを実行します。

  1. [トリガーの追加] を選択し、[S3] を選択します。

  2. バケットを選択します。

  3. [イベントタイプ] で、[PUT] を選択します。

  4. [プレフィックス] に logs/ と入力します。

  5. [サフィックス] では、.log と入力します。

  6. 再帰呼び出しの警告を確認し、[追加] を選択します。

最後に、デプロイパッケージをアップロードすることができます。

  1. [~からアップロード] および [.zip ファイルをアップロード] を選択してから、デプロイパッケージをアップロードするプロンプトに従います。

  2. アップロードが終了したら、[ランタイム設定] を編集し、[ハンドラ] を sample.handler に変更します。この設定により、トリガー後に実行するファイル (sample.py) およびメソッド (handler) が Lambda に通知されます。

この時点で、リソースがすべて揃いました (ログファイルのバケット、ログファイルがバケットに追加されるたびに実行される関数、解析とインデックス作成を実行するコード、検索および可視化のための OpenSearch Service ドメイン)。

Lambda 関数をテストする

関数を作成した後、Amazon S3 バケットにファイルをアップロードしてテストできます。次のサンプルログの行を使用して、sample.log という名前のファイルを作成します。

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

S3 バケットの logs フォルダにファイルをアップロードします。手順については、Amazon Simple Storage Service ユーザーガイドの「バケットにオブジェクトをアップロードする」を参照してください。

次に、OpenSearch Service コンソールまたは OpenSearch Dashboards を使用して、lambda-s3-index インデックスに 2 つのドキュメントが含まれていることを確認します。標準検索リクエストを行うこともできます。

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Amazon Kinesis Data Streams からストリーミングデータをロードする

ストリーミングデータは、Kinesis Data Streams から OpenSearch Service にロードできます。データストリームに到達する新しいデータによって、Lambda へのイベント通知がトリガーされ、インデックス作成を実行するカスタムコードが実行されます。このセクションには、いくつかの簡単な Python サンプルコードがあります。

Prerequisites

続行する前に、以下のリソースが必要です。

前提条件 説明
Amazon Kinesis Data Stream Lambda 関数のイベントソース。詳細については、「Kinesis Data Streams」を参照してください。
OpenSearch Service ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「 OpenSearch Service ドメインの作成」を参照してください。
IAM ロール

このロールには、以下のように OpenSearch Service、Kinesis、Lambda の基本的な許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、IAM ユーザーガイドの「IAM ロールの作成」を参照してください。

Lambda 関数を作成する

Lambda デプロイパッケージを作成する」の手順に従いますが、kinesis-to-opensearch という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, including https:// index = 'lambda-kine-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

regionhost の変数を編集します。

まだ持っていない場合、pip をインストールしてから、次のコマンドを使用して、依存関係をインストールします。

cd kinesis-to-opensearch
cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

次に、「Lambda 関数を作成する」の手順に従いますが、「Prerequisites」の IAM ロールと、トリガーの以下の設定を指定します。

  • [Kinesis ストリーム]: Kinesis ストリーム

  • [バッチサイズ]: 100

  • [開始位置]: 水平トリム

詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「Amazon Kinesis Data Streams とは」を参照してください。

この時点で、リソースがすべて揃いました (Kinesis データストリーム、ストリームが新しいデータを受信してそのデータのインデックスを作成した後に実行する関数、検索および可視化のための OpenSearch Service ドメイン)。

Lambda 関数をテストする

関数を作成した後、AWS CLI を使用してデータストリームに新しいレコードを追加することでテストできます。

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

次に、OpenSearch Service コンソールまたは OpenSearch Dashboards を使用して、lambda-kine-index にドキュメントが含まれていることを確認します。以下のリクエストを使用することもできます。

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Amazon DynamoDB テーブルからストリーミングデータをロードする

AWS Lambda を使用して、Amazon DynamoDB から OpenSearch Service ドメインにデータを送信できます。データテーブルに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

Prerequisites

続行する前に、以下のリソースが必要です。

前提条件 説明
DynamoDB テーブル

このテーブルにはソースデータが含まれています。詳細については、Amazon DynamoDB デベロッパーガイドの「DynamoDB テーブルの基本的なオペレーション」を参照してください。

このテーブルは、OpenSearch Service ドメインと同じリージョンに存在している必要があり、ストリームが [新しいイメージ] に設定されている必要があります。詳細については、「ストリームの有効化」を参照してください。

OpenSearch Service ドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「 OpenSearch Service ドメインの作成」を参照してください。
IAM ロール

このロールには、以下のように OpenSearch Service、DynamoDB、Lambda の基本的な実行許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、IAM ユーザーガイドの「IAM ロールの作成」を参照してください。

Lambda 関数を作成する

Lambda デプロイパッケージを作成する」の手順に従いますが、ddb-to-opensearch という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, with https:// index = 'lambda-index' type = '_doc' url = host + '/' + index + '/' + type + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

regionhost の変数を編集します。

まだ持っていない場合、pip をインストールしてから、次のコマンドを使用して、依存関係をインストールします。

cd ddb-to-opensearch
cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

次に、「Lambda 関数を作成する」の手順に従いますが、「Prerequisites」の IAM ロールと、トリガーの以下の設定を指定します。

  • [テーブル]: DynamoDB テーブル

  • [バッチサイズ]: 100

  • [開始位置]: 水平トリム

詳細については、Amazon DynamoDB デベロッパーガイドの「DynamoDB Streams と Lambda を用いた新しい項目の処理」を参照してください。

この時点で、すべてのリソースが揃いました (ソースデータの DynamoDB テーブル、テーブルに対する DynamoDB ストリームの変更、ソースデータが変更されてそれらの変更のインデックスを作成した後に実行される関数、検索および可視化のための OpenSearch Service ドメイン)。

Lambda 関数をテストする

関数を作成した後、AWS CLI を使用して DynamoDB テーブルに新しい項目を追加することでテストできます。

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

次に、OpenSearch Service コンソールまたは OpenSearch Dashboards を使用して、lambda-index にドキュメントが含まれていることを確認します。以下のリクエストを使用することもできます。

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Amazon Kinesis Data Firehose からストリーミングデータをロードする

Kinesis Data Firehose は配信先として OpenSearch Service をサポートしています。OpenSearch Service にストリーミングデータをロードする方法については、Amazon Kinesis Data Firehose デベロッパーガイドの「Kinesis Data Firehose 配信ストリームの作成」および「配信先として OpenSearch Service を選択する」を参照してください。

データを OpenSearch Service にロードする前に、データの変換が必要になる場合があります。Lambda 関数を使用してこのタスクを実行する方法については、このガイドの「Amazon Kinesis Data Firehose データ変換」を参照してください。

配信ストリームを設定する場合、Kinesis Data Firehose には「ワンクリック」の IAM ロールが用意されています。このロールにより、OpenSearch Service へのデータ送信、Amazon S3 でのデータバックアップ、および Lambda の使用によるデータ変換に必要なリソースアクセス権が付与されます。このようなロールを手動で作成する作業は複雑になるため、用意されているロールの使用をお勧めします。

Amazon CloudWatch からストリーミングデータをロードする

CloudWatch Logs サブスクリプションを使用して、ストリーミングデータを CloudWatch Logs から OpenSearch Service ドメインにロードできます。Amazon CloudWatch サブスクリプションの詳細については、「サブスクリプションを用いたログデータのリアルタイム処理」を参照してください。設定情報については、Amazon CloudWatch デベロッパーガイドの「CloudWatch Logs データを Amazon OpenSearch Service にストリーミング」を参照してください。

AWS IoT からストリーミングデータをロードする

ルールを使用して AWS IoT からデータを送信することができます。詳細については、 AWS IoT デベロッパーガイドの「OpenSearch アクション」を参照してください。