Exemplos de AWS Lambda funções para AWS Config regras (Python) - AWS Config

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

Exemplos de AWS Lambda funções para AWS Config regras (Python)

AWS Lambda executa funções em resposta a eventos publicados pelos AWS serviços. A função de uma regra AWS Config personalizada do Lambda recebe um evento que é publicado por AWS Config, e a função então usa os dados que recebe do evento e recupera do AWS Config API para avaliar a conformidade da regra. As operações em uma função para uma regra Config diferem conforme a avaliação seja acionada por alterações de configuração ou acionada periodicamente.

Para obter informações sobre padrões comuns nas AWS Lambda funções, consulte Modelo de programação no Guia do AWS Lambda desenvolvedor.

Exemplo de função para avaliações acionadas por alterações de configuração

AWS Config invocará uma função como o exemplo a seguir ao detectar uma alteração na configuração de um recurso que esteja dentro do escopo de uma regra personalizada.

Se você usar o AWS Config console para criar uma regra associada a uma função como este exemplo, escolha Alterações de configuração como o tipo de acionador. Se você usar o AWS Config API ou AWS CLI para criar a regra, defina o MessageType atributo ConfigurationItemChangeNotification como OversizedConfigurationItemChangeNotification e. Essas configurações permitem que sua regra seja acionada sempre que AWS Config gerar um item de configuração ou um item de configuração superdimensionado como resultado de uma alteração de recurso.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
Operações de função

A função executa as seguintes operações em tempo de execução:

  1. A função é executada quando AWS Lambda passa o event objeto para a handler função. Neste exemplo, a função aceita o callback parâmetro opcional, que ela usa para retornar informações ao chamador. AWS Lambda também passa um context objeto, que contém informações e métodos que a função pode usar enquanto é executada. Observe que nas versões mais recentes do Lambda, o contexto não é mais usado.

  2. A função verifica se messageType para o evento é um item de configuração ou um item de configuração superdimensionado, e, em seguida, retorna o item de configuração.

  3. O handler chama a função isApplicable para determinar se o recurso foi excluído.


    Os relatórios de regras sobre recursos excluídos devem retornar o resultado da avaliação de NOT_APPLICABLE, a fim de evitar avaliações desnecessárias de regras.

  4. O manipulador chama a evaluateChangeNotificationCompliance função e passa os ruleParameters objetos configurationItem e AWS Config publicados no evento.

    A função avalia primeiro se o recurso é uma EC2 instância. Se o recurso não for uma EC2 instância, a função retornará um valor de conformidade deNOT_APPLICABLE.

    Em seguida, a função avalia se o atributo instanceType no item de configuração é igual ao valor de parâmetro desiredInstanceType. Se os valores são iguais, a função retornará COMPLIANT. Se os valores não são iguais, a função retornará NON_COMPLIANT.

  5. O manipulador se prepara para enviar os resultados da avaliação AWS Config inicializando o objeto. putEvaluationsRequest Este objeto inclui o parâmetro Evaluations, que identifica o resultado de compatibilidade, o tipo de recurso e o ID do recurso que foi avaliado. O putEvaluationsRequest objeto também inclui o token de resultado do evento, que identifica a regra e o evento para AWS Config.

  6. O manipulador envia os resultados da avaliação AWS Config passando o objeto para o putEvaluations método do config cliente.

Exemplo de função para avaliações periódicas

AWS Config invocará uma função como o exemplo a seguir para avaliações periódicas. As avaliações periódicas ocorrem com a frequência que você especifica ao definir a regra em AWS Config.

Se você usar o AWS Config console para criar uma regra associada a uma função como este exemplo, escolha Periódico como o tipo de acionador. Se você usar o AWS Config API ou AWS CLI para criar a regra, defina o MessageType atributo comoScheduledNotification.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
Operações de função

A função executa as seguintes operações em tempo de execução:

  1. A função é executada quando AWS Lambda passa o event objeto para a handler função. Neste exemplo, a função aceita o callback parâmetro opcional, que ela usa para retornar informações ao chamador. AWS Lambda também passa um context objeto, que contém informações e métodos que a função pode usar enquanto é executada. Observe que nas versões mais recentes do Lambda, o contexto não é mais usado.

  2. Para contar os recursos do tipo especificado, o handler chama a função countResourceTypes e passa o parâmetro applicableResourceType que recebeu do evento. A função countResourceTypes chama o método listDiscoveredResources do cliente config, que retorna uma lista de identificadores para os recursos aplicáveis. A função usa o comprimento da lista para determinar o número de recursos aplicáveis e retorna essa contagem para o handler.

  3. O manipulador se prepara para enviar os resultados da avaliação AWS Config inicializando o objeto. putEvaluationsRequest Esse objeto inclui o Evaluations parâmetro, que identifica o resultado da conformidade e o Conta da AWS que foi publicado no evento. Você pode usar o parâmetro Evaluations para aplicar o resultado a qualquer tipo de recurso que tenha suporte no AWS Config. O putEvaluationsRequest objeto também inclui o token de resultado do evento, que identifica a regra e o evento para AWS Config.

  4. Dentro do objeto putEvaluationsRequest, o handler chama a função evaluateCompliance. Esta função testa se o número de recursos aplicáveis excede o máximo atribuído ao parâmetro maxCount, que foi fornecido pelo evento. Se o número de recursos excede o máximo, a função retorna NON_COMPLIANT. Se o número de recursos não excede o máximo, a função retorna COMPLIANT.

  5. O manipulador envia os resultados da avaliação AWS Config passando o objeto para o putEvaluations método do config cliente.