Funzioni AWS Lambda di esempio per le regole AWS Config (Python) - AWS Config

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Funzioni AWS Lambda di esempio per le regole AWS Config (Python)

AWS Lambda esegue funzioni in risposta a eventi pubblicati da servizi AWS. La funzione per una regola Lambda AWS Config personalizzata riceve un evento pubblicato da AWS Config. La funzione usa quindi i dati ricevuti dall'evento e che recupera dall'API di AWS Config per valutare la conformità della regola. Le operazioni in una funzione per una regola di configurazione variano a seconda che si esegua una valutazione attivata da modifiche di configurazione o attivata periodicamente.

Per informazioni sui modelli comuni all'interno delle funzioni AWS Lambda, consulta Modello di programmazione nella Guida per gli sviluppatori di AWS Lambda.

Funzione di esempio per le valutazioni attivate dalle modifiche di configurazione

AWS Config richiamerà una funzione, come nell'esempio seguente, quando rileva una modifica di configurazione per una risorsa nell'ambito di una regola personalizzata.

Se usi la console AWS Config per creare una regola che è associata a una funzione, come in questo esempio, scegli Configuration changes (Modifiche di configurazione) come tipo di trigger. Se usi l'API AWS Config o AWS CLI per creare la regola, imposta l'attributo MessageType su ConfigurationItemChangeNotification e OversizedConfigurationItemChangeNotification. Queste impostazioni permettono alla regola di essere attivate ogni volta che AWS Config genera un elemento di configurazione o un elemento di configurazione di dimensioni eccessive come risultato di una modifica delle risorse.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
Operazioni della funzione

La funzione esegue le operazioni seguenti in fase di runtime:

  1. La funzione viene eseguita quando AWS Lambda trasmette l'oggetto event alla funzione handler. In questo esempio, la funzione accetta il parametro callback opzionale, che utilizza per restituire le informazioni al chiamante. AWS Lambda trasmette anche un oggetto context, che contiene informazioni e metodi che la funzione può utilizzare durante l'esecuzione. Tieni presente che nelle versioni più recenti di Lambda il contesto non è più utilizzato.

  2. La funzione verifica che il messageType per l'evento sia un elemento di configurazione o un elemento di configurazione di dimensioni eccessive e quindi restituisce l'elemento di configurazione.

  3. Il gestore chiama la funzione isApplicable per determinare se la risorsa sia stato eliminata.

    Nota

    Le regole che segnalano le risorse eliminate devono restituire il risultato della valutazione di NOT_APPLICABLE per evitare valutazioni non necessarie delle regole.

  4. Il gestore chiama la funzione evaluateChangeNotificationCompliance e trasferisce gli oggetti configurationItem e ruleParameters AWS Config pubblicati nell'evento.

    La funzione prima valuta se la risorsa è un'istanza EC2. Se la risorsa non è un'istanza EC2, la funzione restituisce un valore di conformità di NOT_APPLICABLE.

    La funzione valuta quindi se l'attributo instanceType nell'elemento di configurazione sia pari al valore del parametro desiredInstanceType. Se i valori sono uguali, la funzione restituisce COMPLIANT. Se i valori non sono uguali, la funzione restituisce NON_COMPLIANT.

  5. Il gestore prepara l'invio dei risultati di valutazione a AWS Config inizializzando l'oggetto putEvaluationsRequest. Questo oggetto include il parametro Evaluations, che identifica il risultato di conformità, il tipo di risorsa e l'ID della risorsa che è stata valutata. L'oggetto putEvaluationsRequest include, inoltre, il risultato del token dall'evento, che identifica la regola e l'evento per AWS Config.

  6. Il gestore invia i risultati di valutazione a AWS Config trasferendo l'oggetto al metodo putEvaluations del client config.

Funzione di esempio per valutazioni periodiche

AWS Config richiamerà una funzione simile al seguente esempio per le valutazioni periodiche. Le valutazioni periodiche si verificano con la frequenza che specifichi quando definisci la regola in AWS Config.

Se usi la console AWS Config per creare una regola che è associata a una funzione, come in questo esempio, scegli Periodic (Periodico) come tipo di trigger. Se usi l'API AWS Config o AWS CLI per creare la regola, imposta l'attributo MessageType su ScheduledNotification.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
Operazioni della funzione

La funzione esegue le operazioni seguenti in fase di runtime:

  1. La funzione viene eseguita quando AWS Lambda trasmette l'oggetto event alla funzione handler. In questo esempio, la funzione accetta il parametro callback opzionale, che utilizza per restituire le informazioni al chiamante. AWS Lambda trasmette anche un oggetto context, che contiene informazioni e metodi che la funzione può utilizzare durante l'esecuzione. Tieni presente che nelle versioni più recenti di Lambda il contesto non è più utilizzato.

  2. Per contare le risorse del tipo specificato, il gestore chiama la funzione countResourceTypes e trasferisce il parametro applicableResourceType che ha ricevuto dall'evento. La funzione countResourceTypes chiama il metodo listDiscoveredResources del client config, che restituisce un elenco di identificatori per le risorse applicabili. La funzione utilizza la lunghezza di questo elenco per determinare il numero di risorse applicabili e restituisce questo conteggio al gestore.

  3. Il gestore prepara l'invio dei risultati di valutazione a AWS Config inizializzando l'oggetto putEvaluationsRequest. Questo oggetto include il parametro Evaluations, che identifica il risultato di conformità e l'Account AWS che è stato pubblicato nell'evento. Puoi utilizzare il parametro Evaluations per applicare il risultato a qualsiasi tipo di risorsa supportata da AWS Config. L'oggetto putEvaluationsRequest include, inoltre, il risultato del token dall'evento, che identifica la regola e l'evento per AWS Config.

  4. All'interno dell'oggetto putEvaluationsRequest, il gestore chiama la funzione evaluateCompliance, Questa funzione verifica se il numero di risorse applicabili supera il massimo assegnato al parametro maxCount, che è stato fornito dall'evento. Se il numero di risorse supera il limite massimo, la funzione restituisce NON_COMPLIANT. Se il numero di risorse non supera il limite massimo, la funzione restituisce COMPLIANT.

  5. Il gestore invia i risultati di valutazione a AWS Config trasferendo l'oggetto al metodo putEvaluations del client config.