Contoh AWS Lambda Fungsi untuk AWS Config Aturan (Python) - AWS Config

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Contoh AWS Lambda Fungsi untuk AWS Config Aturan (Python)

AWS Lambdamenjalankan fungsi dalam menanggapi peristiwa yang diterbitkan oleh AWS layanan. Fungsi untuk aturan Lambda AWS Config Kustom menerima peristiwa yang diterbitkan olehAWS Config, dan fungsi tersebut kemudian menggunakan data yang diterimanya dari acara dan diambil dari AWS Config API untuk mengevaluasi kepatuhan aturan. Operasi dalam fungsi untuk aturan Config berbeda tergantung pada apakah ia melakukan evaluasi yang dipicu oleh perubahan konfigurasi atau dipicu secara berkala.

Untuk informasi tentang pola umum dalam AWS Lambda fungsi, lihat Model Pemrograman di Panduan AWS Lambda Pengembang.

Contoh Fungsi untuk Evaluasi yang Dipicu oleh Perubahan Konfigurasi

AWS Configakan memanggil fungsi seperti contoh berikut ketika mendeteksi perubahan konfigurasi untuk sumber daya yang berada dalam cakupan aturan khusus.

Jika Anda menggunakan AWS Config konsol untuk membuat aturan yang terkait dengan fungsi seperti contoh ini, pilih Perubahan konfigurasi sebagai jenis pemicu. Jika Anda menggunakan AWS Config API atau AWS CLI untuk membuat aturan, tetapkan MessageType atribut ke ConfigurationItemChangeNotification danOversizedConfigurationItemChangeNotification. Pengaturan ini memungkinkan aturan Anda dipicu setiap kali AWS Config menghasilkan item konfigurasi atau item konfigurasi yang terlalu besar sebagai akibat dari perubahan sumber daya.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) # Helper function used to validate input def check_defined(reference, reference_name): if not reference: raise Exception('Error: ', reference_name, 'is not defined') return reference # Check whether the message is OversizedConfigurationItemChangeNotification or not def is_oversized_changed_notification(message_type): check_defined(message_type, 'messageType') return message_type == 'OversizedConfigurationItemChangeNotification' # Get configurationItem using getResourceConfigHistory API # in case of OversizedConfigurationItemChangeNotification def get_configuration(resource_type, resource_id, configuration_capture_time): result = AWS_CONFIG_CLIENT.get_resource_config_history( resourceType=resource_type, resourceId=resource_id, laterTime=configuration_capture_time, limit=1) configurationItem = result['configurationItems'][0] return convert_api_configuration(configurationItem) # Convert from the API model to the original invocation model def convert_api_configuration(configurationItem): for k, v in configurationItem.items(): if isinstance(v, datetime.datetime): configurationItem[k] = str(v) configurationItem['awsAccountId'] = configurationItem['accountId'] configurationItem['ARN'] = configurationItem['arn'] configurationItem['configurationStateMd5Hash'] = configurationItem['configurationItemMD5Hash'] configurationItem['configurationItemVersion'] = configurationItem['version'] configurationItem['configuration'] = json.loads(configurationItem['configuration']) if 'relationships' in configurationItem: for i in range(len(configurationItem['relationships'])): configurationItem['relationships'][i]['name'] = configurationItem['relationships'][i]['relationshipName'] return configurationItem # Based on the type of message get the configuration item # either from configurationItem in the invoking event # or using the getResourceConfigHistory API in getConfiguration function. def get_configuration_item(invokingEvent): check_defined(invokingEvent, 'invokingEvent') if is_oversized_changed_notification(invokingEvent['messageType']): configurationItemSummary = check_defined(invokingEvent['configurationItemSummary'], 'configurationItemSummary') return get_configuration(configurationItemSummary['resourceType'], configurationItemSummary['resourceId'], configurationItemSummary['configurationItemCaptureTime']) return check_defined(invokingEvent['configurationItem'], 'configurationItem') # Check whether the resource has been deleted. If it has, then the evaluation is unnecessary. def is_applicable(configurationItem, event): try: check_defined(configurationItem, 'configurationItem') check_defined(event, 'event') except: return True status = configurationItem['configurationItemStatus'] eventLeftScope = event['eventLeftScope'] if status == 'ResourceDeleted': print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.") return (status == 'OK' or status == 'ResourceDiscovered') and not eventLeftScope def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex def evaluate_change_notification_compliance(configuration_item, rule_parameters): check_defined(configuration_item, 'configuration_item') check_defined(configuration_item['configuration'], 'configuration_item[\'configuration\']') if rule_parameters: check_defined(rule_parameters, 'rule_parameters') if (configuration_item['resourceType'] != 'AWS::EC2::Instance'): return 'NOT_APPLICABLE' elif rule_parameters.get('desiredInstanceType'): if (configuration_item['configuration']['instanceType'] in rule_parameters['desiredInstanceType']): return 'COMPLIANT' return 'NON_COMPLIANT' def lambda_handler(event, context): global AWS_CONFIG_CLIENT check_defined(event, 'event') invoking_event = json.loads(event['invokingEvent']) rule_parameters = {} if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) configuration_item = get_configuration_item(invoking_event) if is_applicable(configuration_item, event): compliance_value = evaluate_change_notification_compliance( configuration_item, rule_parameters) response = AWS_CONFIG_CLIENT.put_evaluations( Evaluations=[ { 'ComplianceResourceType': invoking_event['configurationItem']['resourceType'], 'ComplianceResourceId': invoking_event['configurationItem']['resourceId'], 'ComplianceType': compliance_value, 'OrderingTimestamp': invoking_event['configurationItem']['configurationItemCaptureTime'] }, ], ResultToken=event['resultToken'])
Operasi Fungsi

Fungsi melakukan operasi berikut saat runtime:

  1. Fungsi berjalan ketika AWS Lambda melewati event objek ke handler fungsi. Dalam contoh ini, fungsi menerima callback parameter opsional, yang digunakan untuk mengembalikan informasi ke pemanggil. AWS Lambdajuga melewati context objek, yang berisi informasi dan metode yang dapat digunakan fungsi saat berjalan. Perhatikan bahwa dalam versi Lambda yang lebih baru, konteks tidak lagi digunakan.

  2. Fungsi memeriksa apakah messageType untuk acara tersebut adalah item konfigurasi atau item konfigurasi besar, dan kemudian mengembalikan item konfigurasi.

  3. Handler memanggil isApplicable fungsi untuk menentukan apakah sumber daya telah dihapus.

    catatan

    Aturan pelaporan sumber daya yang dihapus harus mengembalikan hasil evaluasi untuk menghindari evaluasi aturan yang tidak perlu. NOT_APPLICABLE

  4. Handler memanggil evaluateChangeNotificationCompliance fungsi dan meneruskan configurationItem dan ruleParameters objek yang AWS Config diterbitkan dalam acara tersebut.

    Fungsi pertama mengevaluasi apakah sumber daya adalah instans EC2. Jika sumber daya bukan instans EC2, fungsi mengembalikan nilai kepatuhan. NOT_APPLICABLE

    Fungsi kemudian mengevaluasi apakah instanceType atribut dalam item konfigurasi sama dengan nilai desiredInstanceType parameter. Jika nilainya sama, fungsi kembaliCOMPLIANT. Jika nilainya tidak sama, fungsi kembaliNON_COMPLIANT.

  5. Handler bersiap untuk mengirim hasil evaluasi AWS Config dengan menginisialisasi objek. putEvaluationsRequest Objek ini mencakup Evaluations parameter, yang mengidentifikasi hasil kepatuhan, jenis sumber daya, dan ID sumber daya yang dievaluasi. putEvaluationsRequestObjek juga menyertakan token hasil dari acara, yang mengidentifikasi aturan dan acara untukAWS Config.

  6. Handler mengirimkan hasil evaluasi AWS Config dengan meneruskan objek ke putEvaluations metode config klien.

Contoh Fungsi untuk Evaluasi Berkala

AWS Configakan memanggil fungsi seperti contoh berikut untuk evaluasi berkala. Evaluasi periodik terjadi pada frekuensi yang Anda tentukan saat Anda menentukan aturan diAWS Config.

Jika Anda menggunakan AWS Config konsol untuk membuat aturan yang terkait dengan fungsi seperti contoh ini, pilih Periodik sebagai jenis pemicu. Jika Anda menggunakan AWS Config API atau AWS CLI untuk membuat aturan, tetapkan MessageType atribut keScheduledNotification.

import botocore import boto3 import json import datetime # Set to True to get the lambda to assume the Role attached on the Config Service (useful for cross-account). ASSUME_ROLE_MODE = False DEFAULT_RESOURCE_TYPE = 'AWS::::Account' # This gets the client after assuming the Config service role # either in the same AWS account or cross-account. def get_client(service, event): """Return the service boto client. It should be used instead of directly calling the client. Keyword arguments: service -- the service name used for calling the boto.client() event -- the event variable given in the lambda handler """ if not ASSUME_ROLE_MODE: return boto3.client(service) credentials = get_assume_role_credentials(event["executionRoleArn"]) return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'], aws_secret_access_key=credentials['SecretAccessKey'], aws_session_token=credentials['SessionToken'] ) def get_assume_role_credentials(role_arn): sts_client = boto3.client('sts') try: assume_role_response = sts_client.assume_role(RoleArn=role_arn, RoleSessionName="configLambdaExecution") return assume_role_response['Credentials'] except botocore.exceptions.ClientError as ex: # Scrub error message for any internal account info leaks if 'AccessDenied' in ex.response['Error']['Code']: ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role." else: ex.response['Error']['Message'] = "InternalError" ex.response['Error']['Code'] = "InternalError" raise ex # Check whether the message is a ScheduledNotification or not. def is_scheduled_notification(message_type): return message_type == 'ScheduledNotification' def count_resource_types(applicable_resource_type, next_token, count): resource_identifier = AWS_CONFIG_CLIENT.list_discovered_resources(resourceType=applicable_resource_type, nextToken=next_token) updated = count + len(resource_identifier['resourceIdentifiers']); return updated # Evaluates the configuration items in the snapshot and returns the compliance value to the handler. def evaluate_compliance(max_count, actual_count): return 'NON_COMPLIANT' if int(actual_count) > int(max_count) else 'COMPLIANT' def evaluate_parameters(rule_parameters): if 'applicableResourceType' not in rule_parameters: raise ValueError('The parameter with "applicableResourceType" as key must be defined.') if not rule_parameters['applicableResourceType']: raise ValueError('The parameter "applicableResourceType" must have a defined value.') return rule_parameters # This generate an evaluation for config def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None): """Form an evaluation as a dictionary. Usually suited to report on scheduled rules. Keyword arguments: resource_id -- the unique id of the resource to report compliance_type -- either COMPLIANT, NON_COMPLIANT or NOT_APPLICABLE event -- the event variable given in the lambda handler resource_type -- the CloudFormation resource type (or AWS::::Account) to report on the rule (default DEFAULT_RESOURCE_TYPE) annotation -- an annotation to be added to the evaluation (default None) """ eval_cc = {} if annotation: eval_cc['Annotation'] = annotation eval_cc['ComplianceResourceType'] = resource_type eval_cc['ComplianceResourceId'] = resource_id eval_cc['ComplianceType'] = compliance_type eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime']) return eval_cc def lambda_handler(event, context): global AWS_CONFIG_CLIENT evaluations = [] rule_parameters = {} resource_count = 0 max_count = 0 invoking_event = json.loads(event['invokingEvent']) if 'ruleParameters' in event: rule_parameters = json.loads(event['ruleParameters']) valid_rule_parameters = evaluate_parameters(rule_parameters) compliance_value = 'NOT_APPLICABLE' AWS_CONFIG_CLIENT = get_client('config', event) if is_scheduled_notification(invoking_event['messageType']): result_resource_count = count_resource_types(valid_rule_parameters['applicableResourceType'], '', resource_count) if valid_rule_parameters.get('maxCount'): max_count = valid_rule_parameters['maxCount'] compliance_value = evaluate_compliance(max_count, result_resource_count) evaluations.append(build_evaluation(event['accountId'], compliance_value, event, resource_type=DEFAULT_RESOURCE_TYPE)) response = AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluations, ResultToken=event['resultToken'])
Operasi Fungsi

Fungsi melakukan operasi berikut saat runtime:

  1. Fungsi berjalan ketika AWS Lambda melewati event objek ke handler fungsi. Dalam contoh ini, fungsi menerima callback parameter opsional, yang digunakan untuk mengembalikan informasi ke pemanggil. AWS Lambdajuga melewati context objek, yang berisi informasi dan metode yang dapat digunakan fungsi saat berjalan. Perhatikan bahwa dalam versi Lambda yang lebih baru, konteks tidak lagi digunakan.

  2. Untuk menghitung sumber daya dari tipe yang ditentukan, handler memanggil countResourceTypes fungsi, dan melewati applicableResourceType parameter yang diterimanya dari acara tersebut. countResourceTypesFungsi ini memanggil listDiscoveredResources metode config klien, yang mengembalikan daftar pengidentifikasi untuk sumber daya yang berlaku. Fungsi ini menggunakan panjang daftar ini untuk menentukan jumlah sumber daya yang berlaku, dan mengembalikan hitungan ini ke handler.

  3. Handler bersiap untuk mengirim hasil evaluasi AWS Config dengan menginisialisasi objek. putEvaluationsRequest Objek ini mencakup Evaluations parameter, yang mengidentifikasi hasil kepatuhan dan Akun AWS yang diterbitkan dalam acara tersebut. Anda dapat menggunakan Evaluations parameter untuk menerapkan hasilnya ke jenis sumber daya apa pun yang didukung olehAWS Config. putEvaluationsRequestObjek juga menyertakan token hasil dari acara, yang mengidentifikasi aturan dan acara untukAWS Config.

  4. Di dalam putEvaluationsRequest objek, handler memanggil evaluateCompliance fungsi. Fungsi ini menguji apakah jumlah sumber daya yang berlaku melebihi maksimum yang ditetapkan ke maxCount parameter, yang disediakan oleh acara. Jika jumlah sumber daya melebihi maksimum, fungsi kembaliNON_COMPLIANT. Jika jumlah sumber daya tidak melebihi maksimum, fungsi kembaliCOMPLIANT.

  5. Handler mengirimkan hasil evaluasi AWS Config dengan meneruskan objek ke putEvaluations metode config klien.