AWS Lambda ルールの AWS Config 関数の例 (Python)
AWS Lambda は、AWS のサービスから発行されたイベントに応じて関数を実行します。AWS Config カスタム Lambda ルールの関数は AWS Config から発行されたイベントを受け取り、イベントから受け取ったデータと AWS Config API から取得したデータを使用してルールのコンプライアンスを評価します。Config ルールの関数のオペレーションは、設定変更でトリガーされる評価を実行するか、定期的にトリガーされる評価を実行するかで異なります。
AWS Lambda 関数内の一般的なパターンについては、[デベロッパーガイド] の [AWS Lambdaプログラミングモデル] を参照してください。
設定変更でトリガーされる評価の関数の例
AWS Config は、カスタムルールのスコープ内にあるリソースの設定変更を検出すると、次の例のような関数を呼び出します。
この例のような関数に関連付けるルールを AWS Config コンソールで作成する場合は、トリガータイプとして [Configuration changes] (設定変更) を選択します。AWS Config API または AWS CLI でルールを作成する場合は、MessageType
属性を ConfigurationItemChangeNotification
および OversizedConfigurationItemChangeNotification
に設定します。これらの設定により、リソースの変更に伴って AWS Config で設定項目またはサイズが大きすぎる設定項目が生成されるたびに、ルールがトリガーされます。
import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
関数のオペレーション
関数はランタイムに以下のオペレーションを実行します。
-
関数は、AWS Lambda から
event
オブジェクトがhandler
関数に渡されると、実行されます。この例では、関数がオプションのcallback
パラメータを承諾し、それを使用して発信者に情報を返します。AWS Lambda はcontext
オブジェクトも渡します。このオブジェクトには関数の実行中に使用できる情報とメソッドが含まれています。Lambda の新しいバージョンでは、コンテキストは使用されなくなりました。 -
関数は、イベントの
messageType
が設定項目であるかサイズが大きすぎる設定項目であるかを確認し、その設定項目を返します。 -
ハンドラーは、
isApplicable
関数を呼び出してリソースが削除されたかどうかを確認します。 -
ハンドラは
evaluateChangeNotificationCompliance
関数を呼び出し、イベントで AWS Config から発行されたconfigurationItem
オブジェクトとruleParameters
オブジェクトを渡します。関数は最初にリソースが EC2 インスタンスであるかどうかを評価します。リソースが EC2 インスタンスではない場合、関数はコンプライアンス値として
NOT_APPLICABLE
を返します。次に、関数は設定項目の
instanceType
属性がdesiredInstanceType
パラメータ値と等しいかどうかを評価します。値が等しい場合、関数はCOMPLIANT
を返します。値が等しくない場合、関数はNON_COMPLIANT
を返します。 -
ハンドラは、
putEvaluationsRequest
オブジェクトを初期化し、AWS Config に評価結果を送信する準備を整えます。このオブジェクトに含まれているEvaluations
パラメータは、評価対象のリソースのコンプライアンス結果、リソースタイプ、および ID を識別します。putEvaluationsRequest
オブジェクトには、AWS Config のルールとイベントを識別する、イベントの結果トークンも含まれています。 -
ハンドラは、
config
クライアントのputEvaluations
メソッドにオブジェクトを渡すことで、AWS Config に評価結果を送信します。
定期的な評価の関数の例
AWS Config は定期的な評価に応じて次の例のような関数を呼び出します。定期的な評価は、AWS Config でのルールの定義時に指定した間隔で発生します。
この例のような関数に関連付けるルールを AWS Config コンソールで作成する場合は、トリガータイプとして [Periodic] (定期的) を選択します。AWS Config API または AWS CLI でルールを作成する場合は、MessageType
属性を ScheduledNotification
に設定します。
import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
関数のオペレーション
関数はランタイムに以下のオペレーションを実行します。
-
関数は、AWS Lambda から
event
オブジェクトがhandler
関数に渡されると、実行されます。この例では、関数がオプションのcallback
パラメータを承諾し、それを使用して発信者に情報を返します。AWS Lambda はcontext
オブジェクトも渡します。このオブジェクトには関数の実行中に使用できる情報とメソッドが含まれています。Lambda の新しいバージョンでは、コンテキストは使用されなくなりました。 -
指定したタイプのリソースをカウントするために、ハンドラーは
countResourceTypes
関数を呼び出し、イベントから受け取ったapplicableResourceType
パラメータを渡します。countResourceTypes
関数は、listDiscoveredResources
クライアントのconfig
メソッドを呼び出します。クライアントは、該当するリソースの ID のリストを返します。関数は、このリストの長さに基づいて該当するリソースの数を判断し、この数をハンドラーに返します。 -
ハンドラは、
putEvaluationsRequest
オブジェクトを初期化し、AWS Config に評価結果を送信する準備を整えます。このオブジェクトには、Evaluations
パラメータが含まれています。このパラメータは、コンプライアンス結果とイベントで発行された AWS アカウント アカウントを識別します。Evaluations
パラメータでは、AWS Config でサポートされている任意のリソースタイプに結果を適用できます。putEvaluationsRequest
オブジェクトには、AWS Config のルールとイベントを識別する、イベントの結果トークンも含まれています。 -
putEvaluationsRequest
オブジェクト内で、ハンドラーはevaluateCompliance
関数を呼び出します。この関数は、該当するリソースの数が、イベントから提供されたmaxCount
パラメータに割り当てた最大値を超えているかどうかをテストします。リソース数が最大値を超えている場合、関数はNON_COMPLIANT
を返します。リソース数が最大値を超えていない場合、関数はCOMPLIANT
を返します。 -
ハンドラは、
config
クライアントのputEvaluations
メソッドにオブジェクトを渡すことで、AWS Config に評価結果を送信します。