Amazon Elasticsearch Service
開発者ガイド (API バージョン 2015-01-01)

Curator を使用した Amazon Elasticsearch Service でのデータの更新

この章では、インデックスとスナップショットを管理するために AWS Lambda と Curator を使用するためのサンプルコードを示します。Curator は、60 日以上前に作成されたインデックスや、完了できなかったスナップショットなど、特定の条件を満たす ID インデックスやスナップショットの識別に役立つ数多くのフィルタを提供しています。

多くの場合、Curator はコマンドラインインターフェイス (CLI) として使用されますが、Python API も備えています。つまり、Lambda 関数内で使用することができます。

Lambda 関数の設定とデプロイパッケージの作成の詳細については、「Amazon S3 から Amazon ES にストリーミングデータをロードする」を参照してください。さらに詳しくは、「AWS Lambda Developer Guide」を参照してください。この章では、サンプルコード、基本設定、トリガー、およびアクセス許可のみを扱います。

サンプルコード

次のサンプルコードでは、Curator および公式な Python Elasticsearch クライアントを使用して、データが 30 日以上経過していることを示すタイムスタンプが名前に含まれているインデックスをすべて削除します。たとえば、インデックス名が my-logs-2014.03.02 である場合、このインデックスは削除されます。このフィルタはインデックスの名前を使用してその古さを判断するため、本日インデックスを作成した場合でも、削除は発生します。

このコードには、作成日に基づいて古さを決定するものを含めて、他の一般的なフィルタのコメントアウトされた例も含まれています。AWS SDK for Python (Boto3) および requests-aws 4 auth ライブラリは、Amazon ES へのリクエストに署名します。

警告

このセクションの両方のコード例では、データが削除されます (多数のデータが削除される可能性があります)。その動作が適切であることを確認するまで、重要でないドメインで各サンプルを変更およびテストしてください。

インデックスの削除

import boto3 from requests_aws4auth import AWS4Auth from elasticsearch import Elasticsearch, RequestsHttpConnection import curator host = '' # For example, search-my-domain.region.es.amazonaws.com region = '' # For example, us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) # Lambda execution starts here. def lambda_handler(event, context): # Build the Elasticsearch client. es = Elasticsearch( hosts = [{'host': host, 'port': 443}], http_auth = awsauth, use_ssl = True, verify_certs = True, connection_class = RequestsHttpConnection ) # A test document. document = { "title": "Moneyball", "director": "Bennett Miller", "year": "2011" } # Index the test document so that we have an index that matches the timestring pattern. # You can delete this line and the test document if you already created some test indices. es.index(index="movies-2017.01.31", doc_type="movie", id="1", body=document) index_list = curator.IndexList(es) # Filters by age, anything with a time stamp older than 30 days in the index name. index_list.filter_by_age(source='name', direction='older', timestring='%Y.%m.%d', unit='days', unit_count=30) # Filters by naming prefix. # index_list.filter_by_regex(kind='prefix', value='my-logs-2017') # Filters by age, anything created more than one month ago. # index_list.filter_by_age(source='creation_date', direction='older', unit='months', unit_count=1) print("Found %s indices to delete" % len(index_list.indices)) # If our filtered list contains any indices, delete them. if index_list.indices: curator.DeleteIndices(index_list).do_action()

host および region の値を更新する必要があります。

次のコードサンプルは、2 週間以上古いスナップショットを削除します。また、新しいスナップショットを作成します。

スナップショットの削除

import boto3 from datetime import datetime from requests_aws4auth import AWS4Auth from elasticsearch import Elasticsearch, RequestsHttpConnection import logging import curator # Adding a logger isn't strictly required, but helps with understanding Curator's requests and debugging. logger = logging.getLogger('curator') logger.addHandler(logging.StreamHandler()) logger.setLevel(logging.INFO) host = '' # For example, search-my-domain.region.es.amazonaws.com region = '' # For example, us-west-1 service = 'es' credentials = boto3.Session().get_credentials() awsauth = AWS4Auth(credentials.access_key, credentials.secret_key, region, service, session_token=credentials.token) now = datetime.now() # Clunky, but this approach keeps colons out of the URL. date_string = '-'.join((str(now.year), str(now.month), str(now.day), str(now.hour), str(now.second))) snapshot_name = 'my-snapshot-prefix-' + date_string repository_name = 'my-repo' # Lambda execution starts here. def lambda_handler(event, context): # Build the Elasticsearch client. es = Elasticsearch( hosts = [{'host': host, 'port': 443}], http_auth = awsauth, use_ssl = True, verify_certs = True, connection_class = RequestsHttpConnection, timeout = 120 # Deleting snapshots can take a while, so keep the connection open for long enough to get a response. ) try: # Get all snapshots in the repository. snapshot_list = curator.SnapshotList(es, repository=repository_name) # Filter by age, any snapshot older than two weeks. # snapshot_list.filter_by_age(source='creation_date', direction='older', unit='weeks', unit_count=2) # Delete the old snapshots. curator.DeleteSnapshots(snapshot_list, retry_interval=30, retry_count=3).do_action() except (curator.exceptions.SnapshotInProgress, curator.exceptions.NoSnapshots, curator.exceptions.FailedExecution) as e: print(e) # Split into two try blocks. We still want to try and take a snapshot if deletion failed. try: # Get the list of indices. # You can filter this list if you didn't want to snapshot all indices. index_list = curator.IndexList(es) # Take a new snapshot. This operation can take a while, so we don't want to wait for it to complete. curator.Snapshot(index_list, repository=repository_name, name=snapshot_name, wait_for_completion=False).do_action() except (curator.exceptions.SnapshotInProgress, curator.exceptions.FailedExecution) as e: print(e)

hostregionsnapshot_name、および repository_name の値を更新する必要があります。出力が詳細すぎる場合は、logging.INFOlogging.WARN に変更できます。

スナップショットの作成と削除にはしばらく時間がかかる場合があるため、このコードは接続タイムアウトおよび Lambda タイムアウトの影響をより強く受けます (したがって追加のログ記録コードがあります)。Elasticsearch クライアントでは、タイムアウトを 120 秒に設定します。DeleteSnapshots 関数で、Amazon ES ドメインからのレスポンスの取得に時間がかかる場合は、この値を増やす必要が生じる場合があります。また、Lambda 関数のタイムアウトも、デフォルト値の 3 秒から増やす必要があります。推奨される値については、「Basic Settings」を参照してください。

Basic Settings

この章のコードサンプルには、次の設定をお勧めします。

サンプルコード メモリ タイムアウト
インデックスの削除 128 MB 10 秒
スナップショットの削除 128 MB 3 分

トリガ

これらの関数は、一部のイベント (Amazon S3 へのファイルのアップロードなど) に応答するのではなく、スケジュールすることが前提となっています。これらの関数を実行する頻度は調整してください。

サンプルコード サービス ルールタイプ 式の例
インデックスの削除 CloudWatch イベント スケジュール式 rate(1 days)
スナップショットの削除 CloudWatch イベント スケジュール式 rate(4 hours)

アクセス許可

この章の両方の Lambda 関数には、すべての Lambda 関数が必要とする基本的なログ記録アクセス許可に加えて、Amazon ES ドメイン用の HTTP メソッドのアクセス許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "arn:aws:logs:us-west-1:123456789012:*" }, { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": [ "arn:aws:logs:us-west-1:123456789012:log-group:/aws/lambda/your-lambda-function:*" ] }, { "Effect": "Allow", "Action": [ "es:ESHttpPost", "es:ESHttpGet", "es:ESHttpPut", "es:ESHttpDelete" ], "Resource": "arn:aws:es:us-west-1:123456789012:domain/my-domain/*" } ] }