Exemple de fonctions AWS Lambda pour les règles AWS Config (Python) - AWS Config

Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.

Exemple de fonctions AWS Lambda pour les règles AWS Config (Python)

AWS Lambda exécute des fonctions en réponse à des événements qui sont publiés par des services d'AWS. La fonction d'une règle Lambda AWS Config personnalisée reçoit un événement qui est publié par AWS Config. La fonction utilise ensuite les données qu'elle reçoit de l'événement et qu'elle récupère dans l'API AWS Config pour évaluer la conformité de la règle. Les opérations dans une fonction pour une règle de configuration varient selon qu'il s'agisse d'une évaluation déclenchée par des changements de configuration ou de façon périodique.

Pour en savoir plus sur les modèles courants au sein des fonctions AWS Lambda, consultez Modèles de programmation dans le Guide du développeur AWS Lambda.

Exemple de fonction pour des évaluations déclenchées par des changements de configuration

AWS Config appelle une fonction comme dans l'exemple suivant lorsqu'il détecte un changement de configuration au niveau d'une ressource qui se trouve dans l'étendue d'une règle personnalisée.

Si vous utilisez la console AWS Config pour créer une règle qui est associée à une fonction, comme dans cet exemple, choisissez Configuration changes (Changements de configuration) en tant que type de déclencheur. Si vous utilisez l'API AWS Config ou l'AWS CLI pour créer la règle, définissez l'attribut MessageType sur ConfigurationItemChangeNotification et OversizedConfigurationItemChangeNotification. Ces paramètres permettent à votre règle d'être déclenchée dès qu'AWS Config génère un élément de configuration ou un élément de configuration surdimensionné à la suite d'un changement de ressource.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
Opérations de la fonction

La fonction effectue les opérations suivantes lors de l'exécution :

  1. La fonction s'exécute lorsqu'AWS Lambda transmet l'objet event à la fonction handler. Dans cet exemple, la fonction accepte le paramètre facultatif callback, qu'elle utilise pour renvoyer des informations à l'appelant. AWS Lambda transmet également un objet context contenant des informations et des méthodes susceptibles d'être utilisés par la fonction pendant son exécution. Sachez que le contexte n'est plus utilisé dans les nouvelles versions de Lambda.

  2. La fonction vérifie si le messageType pour l'événement est un élément de configuration ou un élément de configuration surdimensionné, puis renvoie l'élément de configuration.

  3. Le gestionnaire appelle la fonction isApplicable afin de déterminer si la ressource a été supprimée.

    Note

    Les règles signalant les ressources supprimées doivent renvoyer le résultat de l'évaluation NOT_APPLICABLE afin d'éviter des évaluations de règles inutiles.

  4. Le gestionnaire appelle la fonction evaluateChangeNotificationCompliance et passe les objets configurationItem et ruleParameters publiés par AWS Config dans l'événement.

    La fonction commence par évaluer si la ressource est une instance EC2. Si la ressource n'est pas une instance EC2, la fonction renvoie la valeur de conformité NOT_APPLICABLE.

    La fonction évalue ensuite si l'attribut instanceType de l'élément de configuration est égal à la valeur du paramètre desiredInstanceType. Si les valeurs sont égales, la fonction renvoie COMPLIANT. Si les valeurs ne sont pas égales, la fonction renvoie NON_COMPLIANT.

  5. Le gestionnaire prépare l'envoi des résultats de l'évaluation à AWS Config en initialisant l'objet putEvaluationsRequest. Cet objet comprend le paramètre Evaluations qui identifie le résultat de la conformité, le type et l'ID de la ressource qui a été évaluée. L'objet putEvaluationsRequest comprend également le jeton du résultat de l'événement, qui permet l'identification de la règle et de l'événement par AWS Config.

  6. Le gestionnaire envoie les résultats de l'évaluation à AWS Config en passant l'objet dans la méthode putEvaluations du client config.

Exemple de fonction pour des évaluations périodiques

AWS Config appelle une fonction comme dans l'exemple suivant pour des évaluations périodiques. Des évaluations périodiques se produisent à la fréquence que vous spécifiez lorsque vous définissez la règle dans AWS Config.

Si vous utilisez la console AWS Config pour créer une règle qui est associée à une fonction comme dans cet exemple, choisissez Periodic (Périodique) en tant que type de déclencheur. Si vous utilisez l'API AWS Config ou l'AWS CLI pour créer la règle, définissez l'attribut MessageType sur ScheduledNotification.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
Opérations de la fonction

La fonction effectue les opérations suivantes lors de l'exécution :

  1. La fonction s'exécute lorsqu'AWS Lambda transmet l'objet event à la fonction handler. Dans cet exemple, la fonction accepte le paramètre facultatif callback, qu'elle utilise pour renvoyer des informations à l'appelant. AWS Lambda transmet également un objet context contenant des informations et des méthodes susceptibles d'être utilisés par la fonction pendant son exécution. Sachez que le contexte n'est plus utilisé dans les nouvelles versions de Lambda.

  2. Pour compter les ressources du type spécifié, le gestionnaire appelle la fonction countResourceTypes et il transmet le paramètre applicableResourceType qu'il a reçu de l'événement. La fonction countResourceTypes appelle la méthode listDiscoveredResources du client config, qui retourne une liste d'identificateurs pour les ressources applicables. La fonction utilise la longueur de cette liste pour déterminer le nombre de ressources applicables, et elle retourne ce nombre au gestionnaire.

  3. Le gestionnaire prépare l'envoi des résultats de l'évaluation à AWS Config en initialisant l'objet putEvaluationsRequest. Cet objet comprend le paramètre Evaluations qui identifie le résultat de conformité et le Compte AWS qui a été publié dans l'événement. Vous pouvez utiliser le paramètre Evaluations pour appliquer le résultat à n'importe quel type de ressource pris en charge par AWS Config. L'objet putEvaluationsRequest comprend également le jeton du résultat de l'événement, qui permet l'identification de la règle et de l'événement par AWS Config.

  4. Au sein de l'objet putEvaluationsRequest, le gestionnaire appelle la fonction evaluateCompliance. Cette fonction teste si le nombre de ressources applicables dépasse le maximum attribué au paramètre maxCount fourni par l'événement. Si le nombre maximum de ressources est dépassé, la fonction renvoie NON_COMPLIANT. Si le nombre maximum de ressources n'est pas dépassé, la fonction renvoie COMPLIANT.

  5. Le gestionnaire envoie les résultats de l'évaluation à AWS Config en passant l'objet dans la méthode putEvaluations du client config.