AWS Lambda ルールの AWS Config 関数の例 (Python) - AWS Config

AWS Lambda ルールの AWS Config 関数の例 (Python)

AWS Lambda は、AWS のサービスから発行されたイベントに応じて関数を実行します。AWS Config カスタム Lambda ルールの関数は AWS Config から発行されたイベントを受け取り、イベントから受け取ったデータと AWS Config API から取得したデータを使用してルールのコンプライアンスを評価します。Config ルールの関数のオペレーションは、設定変更でトリガーされる評価を実行するか、定期的にトリガーされる評価を実行するかで異なります。

AWS Lambda 関数内の一般的なパターンについては、[デベロッパーガイド] の [AWS Lambdaプログラミングモデル] を参照してください。

設定変更でトリガーされる評価の関数の例

AWS Config は、カスタムルールのスコープ内にあるリソースの設定変更を検出すると、次の例のような関数を呼び出します。

この例のような関数に関連付けるルールを AWS Config コンソールで作成する場合は、トリガータイプとして [Configuration changes] (設定変更) を選択します。AWS Config API または AWS CLI でルールを作成する場合は、MessageType 属性を ConfigurationItemChangeNotification および OversizedConfigurationItemChangeNotification に設定します。これらの設定により、リソースの変更に伴って AWS Config で設定項目またはサイズが大きすぎる設定項目が生成されるたびに、ルールがトリガーされます。

import boto3 import json # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])

関数のオペレーション

関数はランタイムに以下のオペレーションを実行します。

  1. 関数は、AWS Lambda から event オブジェクトが handler 関数に渡されると、実行されます。この例では、関数がオプションの callback パラメータを承諾し、それを使用して発信者に情報を返します。AWS Lambda は context オブジェクトも渡します。このオブジェクトには関数の実行中に使用できる情報とメソッドが含まれています。Lambda の新しいバージョンでは、コンテキストは使用されなくなりました。

  2. 関数は、イベントの messageType が設定項目であるかサイズが大きすぎる設定項目であるかを確認し、その設定項目を返します。

  3. ハンドラーは、isApplicable 関数を呼び出してリソースが削除されたかどうかを確認します。

  4. ハンドラは evaluateChangeNotificationCompliance 関数を呼び出し、イベントで AWS Config から発行された configurationItem オブジェクトと ruleParameters オブジェクトを渡します。

    関数は最初にリソースが EC2 インスタンスであるかどうかを評価します。リソースが EC2 インスタンスではない場合、関数はコンプライアンス値として NOT_APPLICABLE を返します。

    次に、関数は設定項目の instanceType 属性が desiredInstanceType パラメータ値と等しいかどうかを評価します。値が等しい場合、関数は COMPLIANT を返します。値が等しくない場合、関数は NON_COMPLIANT を返します。

  5. ハンドラは、putEvaluationsRequest オブジェクトを初期化し、AWS Config に評価結果を送信する準備を整えます。このオブジェクトに含まれている Evaluations パラメータは、評価対象のリソースのコンプライアンス結果、リソースタイプ、および ID を識別します。putEvaluationsRequest オブジェクトには、AWS Config のルールとイベントを識別する、イベントの結果トークンも含まれています。

  6. ハンドラは、config クライアントの putEvaluations メソッドにオブジェクトを渡すことで、AWS Config に評価結果を送信します。

定期的な評価の関数の例

AWS Config は定期的な評価に応じて次の例のような関数を呼び出します。定期的な評価は、AWS Config でのルールの定義時に指定した間隔で発生します。

この例のような関数に関連付けるルールを AWS Config コンソールで作成する場合は、トリガータイプとして [Periodic] (定期的) を選択します。AWS Config API または AWS CLI でルールを作成する場合は、MessageType 属性を ScheduledNotification に設定します。

import boto3 import json # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])

関数のオペレーション

関数はランタイムに以下のオペレーションを実行します。

  1. 関数は、AWS Lambda から event オブジェクトが handler 関数に渡されると、実行されます。この例では、関数がオプションの callback パラメータを承諾し、それを使用して発信者に情報を返します。AWS Lambda は context オブジェクトも渡します。このオブジェクトには関数の実行中に使用できる情報とメソッドが含まれています。Lambda の新しいバージョンでは、コンテキストは使用されなくなりました。

  2. 指定したタイプのリソースをカウントするために、ハンドラーは countResourceTypes 関数を呼び出し、イベントから受け取った applicableResourceType パラメータを渡します。countResourceTypes 関数は、listDiscoveredResources クライアントの config メソッドを呼び出します。クライアントは、該当するリソースの ID のリストを返します。関数は、このリストの長さに基づいて該当するリソースの数を判断し、この数をハンドラーに返します。

  3. ハンドラは、putEvaluationsRequest オブジェクトを初期化し、AWS Config に評価結果を送信する準備を整えます。このオブジェクトには、Evaluations パラメータが含まれています。このパラメータは、コンプライアンス結果とイベントで発行された AWS アカウント アカウントを識別します。Evaluations パラメータでは、AWS Config でサポートされている任意のリソースタイプに結果を適用できます。putEvaluationsRequest オブジェクトには、AWS Config のルールとイベントを識別する、イベントの結果トークンも含まれています。

  4. putEvaluationsRequest オブジェクト内で、ハンドラーは evaluateCompliance 関数を呼び出します。この関数は、該当するリソースの数が、イベントから提供された maxCount パラメータに割り当てた最大値を超えているかどうかをテストします。リソース数が最大値を超えている場合、関数は NON_COMPLIANT を返します。リソース数が最大値を超えていない場合、関数は COMPLIANT を返します。

  5. ハンドラは、config クライアントの putEvaluations メソッドにオブジェクトを渡すことで、AWS Config に評価結果を送信します。