Amazon OpenSearch Service へのストリーミングデータのロード - Amazon OpenSearch サービス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon OpenSearch Service へのストリーミングデータのロード

OpenSearch Ingestion を使用すると、サードパーティーのソリューションを使用しなくても、ストリーミングデータを Amazon OpenSearch Service ドメインに直接ロードできます。 OpenSearch Ingestion にデータを送信するには、データプロデューサーを設定し、指定したドメインまたはコレクションにデータを自動的に配信します。取り込みを開始するには、 OpenSearch 「」を参照してくださいチュートリアル: Amazon Ingestion OpenSearch を使用してコレクションにデータを取り込む

OpenSearch サービスのサポートが組み込まれている Amazon Data Firehose や Amazon CloudWatch Logs など、他のソースを使用してストリーミングデータをロードすることもできます。Amazon S3、Amazon Kinesis Data Streams、Amazon DynamoDB などの他のものでは、イベントハンドラとして AWS Lambda 関数を使用します。Lambda 関数は、データを処理してドメインにストリーミングすることで、新しいデータに応答します。

注記

Lambda では、いくつかの一般的なプログラミング言語がサポートされ、ほとんどの AWS リージョンで利用できます。詳細については、「AWS Lambda デベロッパーガイド」の「Lambda の開始方法」、および「AWS 全般のリファレンス」の「AWS サービスエンドポイント」を参照してください。

取り込みからストリーミングデータをロード OpenSearch する

Amazon OpenSearch Ingestion を使用して、 OpenSearch サービスドメインにデータをロードできます。 OpenSearch Ingestion にデータを送信するようにデータプロデューサーを設定すると、指定したコレクションにデータが自動的に配信されます。データを配信する前にデータを変換するように OpenSearch Ingestion を設定することもできます。詳細については、「Amazon OpenSearch ン・インジェスト」を参照してください。

Amazon S3 からストリーミングデータをロードする

Lambda を使用して、Amazon S3 から OpenSearch サービスドメインにデータを送信できます。S3 バケットに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

このデータのストリーミング方法には非常に柔軟性があります。オブジェクトメタデータのインデックスを作成したり、オブジェクトがプレーンテキストの場合は、オブジェクト本文の要素を解析してインデックス作成したりすることができます。このセクションでは、正規表現を使用してログファイルを解析し、一致をインデックス作成するシンプルな Python サンプルコードがあります。

前提条件

続行する前に、以下のリソースが必要です。

前提条件 説明
Amazon S3 バケット 詳細については、Amazon Simple Storage Service ユーザーガイドの「最初の S3 バケットを作成する」を参照してください。バケットは、 OpenSearch サービスドメインと同じリージョンに存在する必要があります。
OpenSearch サービスドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「 OpenSearch サービスドメインの作成」を参照してください。

Lambda デプロイパッケージを作成する

デプロイパッケージは、コードとその依存関係を含む ZIP または JAR ファイルです。このセクションには、Python サンプルコードがあります。他のプログラミング言語については、AWS Lambda デベロッパーガイドの「Lambda デプロイパッケージ」を参照してください。

  1. ディレクトリを作成します。このサンプルでは、名前 s3-to-opensearch を使用します。

  2. sample.py という名前のディレクトリ内にファイルを作成します。

    import boto3 import re import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-s3-index' datatype = '_doc' url = host + '/' + index + '/' + datatype headers = { "Content-Type": "application/json" } s3 = boto3.client('s3') # Regular expressions used to parse some simple log lines ip_pattern = re.compile('(\d+\.\d+\.\d+\.\d+)') time_pattern = re.compile('\[(\d+\/\w\w\w\/\d\d\d\d:\d\d:\d\d:\d\d\s-\d\d\d\d)\]') message_pattern = re.compile('\"(.+)\"') # Lambda execution starts here def handler(event, context): for record in event['Records']: # Get the bucket name and key for the new file bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] # Get, read, and split the file into lines obj = s3.get_object(Bucket=bucket, Key=key) body = obj['Body'].read() lines = body.splitlines() # Match the regular expressions to each line and index the JSON for line in lines: line = line.decode("utf-8") ip = ip_pattern.search(line).group(1) timestamp = time_pattern.search(line).group(1) message = message_pattern.search(line).group(1) document = { "ip": ip, "timestamp": timestamp, "message": message } r = requests.post(url, auth=awsauth, json=document, headers=headers)

    regionhost の変数を編集します。

  3. まだ持っていない場合、pip をインストールしてから、依存関係を新しい package ディレクトリにインストールします。

    cd s3-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

    すべての Lambda 実行環境に Boto3 がインストールされているため、デプロイパッケージに含める必要はありません。

  4. アプリケーションコードや相互依存性をパッケージ化します。

    cd package zip -r ../lambda.zip . cd .. zip -g lambda.zip sample.py

Lambda 関数を作成する

デプロイパッケージを作成すると、Lambda 関数を作成できます。関数を作成するとき、名前、ランタイム (たとえば、Python 3.8)、IAM ロールを選択します。IAM ロールにより、関数の許可が定義されます。詳細な手順については、AWS Lambda デベロッパーガイドの「コンソールで Lambda 関数を作成する」を参照してください。

この例では、コンソールを使用していることを前提としています。次のスクリーンショットに示すように、Python 3.9 と、S3 読み取りアクセス許可と OpenSearch サービス書き込みアクセス許可を持つロールを選択します。


                    Lambda 関数のサンプル設定

関数を作成した後、トリガーを追加する必要があります。この例では、S3 バケットにログファイルが到着するたびにコードを実行します。

  1. [トリガーの追加] を選択し、[S3] を選択します。

  2. バケットを選択します。

  3. [イベントタイプ] で、[PUT] を選択します。

  4. [プレフィックス] に logs/ と入力します。

  5. [サフィックス] では、.log と入力します。

  6. 再帰呼び出しの警告を確認し、[追加] を選択します。

最後に、デプロイパッケージをアップロードすることができます。

  1. [~からアップロード] および [.zip ファイルをアップロード] を選択してから、デプロイパッケージをアップロードするプロンプトに従います。

  2. アップロードが終了したら、[ランタイム設定] を編集し、[ハンドラ] を sample.handler に変更します。この設定により、トリガー後に実行するファイル (sample.py) およびメソッド (handler) が Lambda に通知されます。

この時点で、ログファイル用のバケット、ログファイルがバケットに追加されるたびに実行される関数、解析とインデックス作成を実行するコード、検索と可視化のための OpenSearch サービスドメインなどのリソースの完全なセットがあります。

Lambda 関数をテストする

関数を作成した後、Amazon S3 バケットにファイルをアップロードしてテストできます。次のサンプルログの行を使用して、sample.log という名前のファイルを作成します。

12.345.678.90 - [10/Oct/2000:13:55:36 -0700] "PUT /some-file.jpg" 12.345.678.91 - [10/Oct/2000:14:56:14 -0700] "GET /some-file.jpg"

S3 バケットの logs フォルダにファイルをアップロードします。手順については、Amazon Simple Storage Service ユーザーガイドの「バケットにオブジェクトをアップロードする」を参照してください。

次に、 OpenSearch サービスコンソールまたは OpenSearch Dashboards を使用して、lambda-s3-indexインデックスに 2 つのドキュメントが含まれていることを確認します。標準検索リクエストを行うこともできます。

GET https://domain-name/lambda-s3-index/_search?pretty { "hits" : { "total" : 2, "max_score" : 1.0, "hits" : [ { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vTYXaWIBJWV_TTkEuSDg", "_score" : 1.0, "_source" : { "ip" : "12.345.678.91", "message" : "GET /some-file.jpg", "timestamp" : "10/Oct/2000:14:56:14 -0700" } }, { "_index" : "lambda-s3-index", "_type" : "_doc", "_id" : "vjYmaWIBJWV_TTkEuCAB", "_score" : 1.0, "_source" : { "ip" : "12.345.678.90", "message" : "PUT /some-file.jpg", "timestamp" : "10/Oct/2000:13:55:36 -0700" } } ] } }

Amazon Kinesis Data Streams からストリーミングデータをロードする

Kinesis Data Streams から OpenSearch サービスにストリーミングデータをロードできます。データストリームに到達する新しいデータによって、Lambda へのイベント通知がトリガーされ、インデックス作成を実行するカスタムコードが実行されます。このセクションには、いくつかの簡単な Python サンプルコードがあります。

前提条件

続行する前に、以下のリソースが必要です。

前提条件 説明
Amazon Kinesis Data Stream Lambda 関数のイベントソース。詳細については、「Kinesis Data Streams」を参照してください。
OpenSearch サービスドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「 OpenSearch サービスドメインの作成」を参照してください。
IAM ロール

このロールには、次のような基本的な OpenSearch サービス、Kinesis、および Lambda アクセス許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents", "kinesis:GetShardIterator", "kinesis:GetRecords", "kinesis:DescribeStream", "kinesis:ListStreams" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、IAM ユーザーガイドの「IAM ロールの作成」を参照してください。

Lambda 関数を作成する

Lambda デプロイパッケージを作成する」の手順に従いますが、kinesis-to-opensearch という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import base64 import boto3 import json import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-kine-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: id = record['eventID'] timestamp = record['kinesis']['approximateArrivalTimestamp'] # Kinesis data is base64-encoded, so decode here message = base64.b64decode(record['kinesis']['data']) # Create the JSON document document = { "id": id, "timestamp": timestamp, "message": message } # Index the document r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return 'Processed ' + str(count) + ' items.'

regionhost の変数を編集します。

まだ持っていない場合、pip をインストールしてから、次のコマンドを使用して、依存関係をインストールします。

cd kinesis-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

次に、「Lambda 関数を作成する」の手順に従いますが、「前提条件」の IAM ロールと、トリガーの以下の設定を指定します。

  • [Kinesis ストリーム]: Kinesis ストリーム

  • [バッチサイズ]: 100

  • [開始位置]: 水平トリム

詳細については、Amazon Kinesis Data Streams デベロッパーガイドの「Amazon Kinesis Data Streams とは」を参照してください。

この時点で、Kinesis データストリーム、ストリームが新しいデータを受信してそのデータのインデックスを作成した後に実行される関数、検索と視覚化のための OpenSearch サービスドメインなどのリソースの完全なセットがあります。

Lambda 関数をテストする

関数を作成した後、 AWS CLIを使用してデータストリームに新しいレコードを追加することでテストできます。

aws kinesis put-record --stream-name test --data "My test data." --partition-key partitionKey1 --region us-west-1

次に、 OpenSearch サービスコンソールまたは OpenSearch Dashboards を使用して、 にドキュメントlambda-kine-indexが含まれていることを確認します。以下のリクエストを使用することもできます。

GET https://domain-name/lambda-kine-index/_search { "hits" : [ { "_index": "lambda-kine-index", "_type": "_doc", "_id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042", "_score": 1, "_source": { "timestamp": 1523648740.051, "message": "My test data.", "id": "shardId-000000000000:49583511615762699495012960821421456686529436680496087042" } } ] }

Amazon DynamoDB テーブルからストリーミングデータをロードする

を使用して AWS Lambda 、Amazon DynamoDB から OpenSearch サービスドメインにデータを送信できます。データテーブルに到着する新しいデータにより、Lambda へのイベント通知がトリガーされた後、インデックス作成を実行するカスタムコードが実行されます。

前提条件

続行する前に、以下のリソースが必要です。

前提条件 説明
DynamoDB テーブル

このテーブルにはソースデータが含まれています。詳細については、Amazon DynamoDB デベロッパーガイドの「DynamoDB テーブルの基本的なオペレーション」を参照してください。

テーブルは OpenSearch サービスドメインと同じリージョンに存在し、ストリームが新しいイメージ に設定されている必要があります。詳細については、「ストリームの有効化」を参照してください。

OpenSearch サービスドメイン Lambda 関数により処理された後のデータのターゲット。詳細については、「 OpenSearch サービスドメインの作成」を参照してください。
IAM ロール

このロールには、次のような基本的な OpenSearch サービス、DynamoDB、および Lambda 実行アクセス許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpPut", "dynamodb:DescribeStream", "dynamodb:GetRecords", "dynamodb:GetShardIterator", "dynamodb:ListStreams", "logs:CreateLogGroup", "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": "*" } ] }

ロールには、次の信頼関係が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "lambda.amazonaws.com" }, "Action": "sts:AssumeRole" } ] }

詳細については、IAM ユーザーガイドの「IAM ロールの作成」を参照してください。

Lambda 関数を作成する

Lambda デプロイパッケージを作成する」の手順に従いますが、ddb-to-opensearch という名前のディレクトリを作成し、sample.py に次のコードを使用します。

import boto3 import requests from requests_aws4auth import AWS4Auth region = '' # e.g. us-east-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) host = '' # the OpenSearch Service domain, e.g. https://search-mydomain.us-west-1.es.amazonaws.com index = 'lambda-index' datatype = '_doc' url = host + '/' + index + '/' + datatype + '/' headers = { "Content-Type": "application/json" } def handler(event, context): count = 0 for record in event['Records']: # Get the primary key for use as the OpenSearch ID id = record['dynamodb']['Keys']['id']['S'] if record['eventName'] == 'REMOVE': r = requests.delete(url + id, auth=awsauth) else: document = record['dynamodb']['NewImage'] r = requests.put(url + id, auth=awsauth, json=document, headers=headers) count += 1 return str(count) + ' records processed.'

regionhost の変数を編集します。

まだ持っていない場合、pip をインストールしてから、次のコマンドを使用して、依存関係をインストールします。

cd ddb-to-opensearch pip install --target ./package requests pip install --target ./package requests_aws4auth

次に、「Lambda 関数を作成する」の手順に従いますが、「前提条件」の IAM ロールと、トリガーの以下の設定を指定します。

  • [テーブル]: DynamoDB テーブル

  • [バッチサイズ]: 100

  • [開始位置]: 水平トリム

詳細については、Amazon DynamoDB デベロッパーガイドの「DynamoDB Streams と Lambda を用いた新しい項目の処理」を参照してください。

この時点で、ソースデータの DynamoDB テーブル、テーブルに対する変更の DynamoDB ストリーム、ソースデータの変更後に実行される関数、それらの変更のインデックス作成、検索と可視化のための OpenSearch サービスドメインなどのリソースの完全なセットがあります。

Lambda 関数をテストする

関数を作成した後、 AWS CLIを使用して DynamoDB テーブルに新しい項目を追加することでテストできます。

aws dynamodb put-item --table-name test --item '{"director": {"S": "Kevin Costner"},"id": {"S": "00001"},"title": {"S": "The Postman"}}' --region us-west-1

次に、 OpenSearch サービスコンソールまたは OpenSearch Dashboards を使用して、 にドキュメントlambda-indexが含まれていることを確認します。以下のリクエストを使用することもできます。

GET https://domain-name/lambda-index/_doc/00001 { "_index": "lambda-index", "_type": "_doc", "_id": "00001", "_version": 1, "found": true, "_source": { "director": { "S": "Kevin Costner" }, "id": { "S": "00001" }, "title": { "S": "The Postman" } } }

Amazon Data Firehose からストリーミングデータをロードする

Firehose は、配信先として OpenSearch サービスをサポートします。ストリーミングデータを OpenSearch サービスにロードする方法については、「Amazon Data Firehose デベロッパーガイド」の「Kinesis Data Firehose 配信ストリームの作成」および「送信先に OpenSearch サービスを選択する」を参照してください。

Service にデータをロードする前に OpenSearch 、データの変換を実行する必要がある場合があります。Lambda 関数を使用してこのタスクを実行する方法については、このガイドの「Amazon Kinesis Data Firehose データ変換」を参照してください。

配信ストリームを設定すると、Firehose には「ワンクリック」の IAM ロールがあり、Lambda を使用して OpenSearch サービスへのデータの送信、Amazon S3 上のデータのバックアップ、およびデータの変換に必要なリソースアクセスが許可されます。このようなロールを手動で作成する作業は複雑になるため、用意されているロールの使用をお勧めします。

Amazon からストリーミングデータをロードする CloudWatch

CloudWatch Logs サブスクリプションを使用して、 CloudWatch ログから OpenSearch サービスドメインにストリーミングデータをロードできます。Amazon CloudWatch サブスクリプションの詳細については、「サブスクリプションによるログデータのリアルタイム処理」を参照してください。設定情報については、「Amazon デベロッパーガイド」の「Amazon OpenSearch Service への CloudWatch ログデータのストリーミング」を参照してください。 CloudWatch

AWS IoTからストリーミングデータをロードする

ルール AWS IoT を使用して、 からデータを送信できます。 https://docs.aws.amazon.com/iot/latest/developerguide/iot-rules.html詳細については、「 AWS IoT デベロッパーガイド」の「 OpenSearchアクション」を参照してください。