AWS Config ルールの AWS Lambda 関数の例 (Python) - AWS Config

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS Config ルールの AWS Lambda 関数の例 (Python)

AWS Lambda は、 AWS サービスによって発行されたイベントに応答して関数を実行します。 AWS Config カスタム Lambda ルールの 関数は、 によって発行されるイベントを受け取り AWS Config、その後、イベントから受け取ったデータと AWS Config API から取得したデータを使用してルールのコンプライアンスを評価します。Config ルールの関数のオペレーションは、設定変更でトリガーされる評価を実行するか、定期的にトリガーされる評価を実行するかで異なります。

AWS Lambda 関数内の一般的なパターンについては、「 AWS Lambda デベロッパーガイド」の「プログラミングモデル」を参照してください。

設定変更でトリガーされる評価の関数の例

AWS Config は、カスタムルールの範囲内にあるリソースの設定変更を検出すると、次の例のような関数を呼び出します。

AWS Config コンソールを使用して、この例のような関数に関連付けられたルールを作成する場合は、トリガータイプとして設定の変更を選択します。 AWS Config API または を使用してルール AWS CLI を作成する場合は、 MessageType 属性を ConfigurationItemChangeNotificationおよび に設定しますOversizedConfigurationItemChangeNotification。これらの設定により、リソースの変更の結果として が設定項目またはサイズが大きすぎる設定項目 AWS Config を生成するたびに、ルールをトリガーできます。

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
関数のオペレーション

関数はランタイムに以下のオペレーションを実行します。

  1. 関数は、 が event オブジェクトをhandler関数に AWS Lambda 渡すときに実行されます。この例では、関数は呼び出し元に情報を返すために使用するオプションの callbackパラメータを受け入れます。 AWS Lambda また、 は、関数の実行中に使用できる情報とメソッドを含む context オブジェクトを渡します。Lambda の新しいバージョンでは、コンテキストは使用されなくなりました。

  2. 関数は、イベントの messageType が設定項目であるかサイズが大きすぎる設定項目であるかを確認し、その設定項目を返します。

  3. ハンドラーは、isApplicable 関数を呼び出してリソースが削除されたかどうかを確認します。

    注記

    削除されたリソースをレポートするルールは、不必要なルール評価を避けるために、NOT_APPLICABLE の評価結果を返す必要があります。

  4. ハンドラーは evaluateChangeNotificationCompliance関数を呼び出し、 がイベントで AWS Config 発行した configurationItemおよび ruleParameters オブジェクトを渡します。

    関数は最初にリソースが EC2 インスタンスであるかどうかを評価します。リソースが EC2 インスタンスではない場合、関数はコンプライアンス値として NOT_APPLICABLE を返します。

    次に、関数は設定項目の instanceType 属性が desiredInstanceType パラメータ値と等しいかどうかを評価します。値が等しい場合、関数は COMPLIANT を返します。値が等しくない場合、関数は NON_COMPLIANT を返します。

  5. ハンドラーは、 putEvaluationsRequest オブジェクトを初期化 AWS Config して評価結果を に送信する準備をします。このオブジェクトに含まれている Evaluations パラメータは、評価対象のリソースのコンプライアンス結果、リソースタイプ、および ID を識別します。putEvaluationsRequest オブジェクトには、ルールとイベントを識別するイベントの結果トークンも含まれます AWS Config。

  6. ハンドラーは、 オブジェクトをconfigクライアントの putEvaluationsメソッドに渡 AWS Config して、評価結果を に送信します。

定期的な評価の関数の例

AWS Config は、定期的な評価のために次の例のような関数を呼び出します。定期的な評価は、 AWS Configでのルールの定義時に指定した間隔で発生します。

AWS Config コンソールを使用して、この例のような関数に関連付けられたルールを作成する場合は、トリガータイプとして Periodic を選択します。 AWS Config API または を使用してルール AWS CLI を作成する場合は、 MessageType 属性を に設定しますScheduledNotification

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
関数のオペレーション

関数はランタイムに以下のオペレーションを実行します。

  1. 関数は、 が event オブジェクトをhandler関数に AWS Lambda 渡すときに実行されます。この例では、関数は呼び出し元に情報を返すために使用するオプションの callbackパラメータを受け入れます。 AWS Lambda また、 は、関数の実行中に使用できる情報とメソッドを含む context オブジェクトを渡します。Lambda の新しいバージョンでは、コンテキストは使用されなくなりました。

  2. 指定したタイプのリソースをカウントするために、ハンドラーは countResourceTypes 関数を呼び出し、イベントから受け取った applicableResourceType パラメータを渡します。countResourceTypes 関数は、listDiscoveredResources クライアントの config メソッドを呼び出します。クライアントは、該当するリソースの ID のリストを返します。関数は、このリストの長さに基づいて該当するリソースの数を判断し、この数をハンドラーに返します。

  3. ハンドラーは、 putEvaluationsRequest オブジェクトを初期化 AWS Config して評価結果を に送信する準備をします。このオブジェクトには、コンプライアンス結果とイベントで発行された を識別する AWS アカウント Evaluationsパラメータが含まれています。Evaluations パラメータでは、 AWS Configでサポートされている任意のリソースタイプに結果を適用できます。putEvaluationsRequest オブジェクトには、ルールとイベントを識別するイベントの結果トークンも含まれます AWS Config。

  4. putEvaluationsRequest オブジェクト内で、ハンドラーは evaluateCompliance 関数を呼び出します。この関数は、該当するリソースの数が、イベントから提供された maxCount パラメータに割り当てた最大値を超えているかどうかをテストします。リソース数が最大値を超えている場合、関数は NON_COMPLIANT を返します。リソース数が最大値を超えていない場合、関数は COMPLIANT を返します。

  5. ハンドラーは、 オブジェクトをconfigクライアントの putEvaluationsメソッドに渡 AWS Config して、評価結果を に送信します。