# policy_wrapper.py - AWS Code Sample

# policy_wrapper.py

# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: Apache-2.0

"""
Purpose

Shows how to use AWS Identity and Access Management (IAM) policies.
"""

import json
import logging
import operator
import pprint
import time

import boto3
from botocore.exceptions import ClientError

logger = logging.getLogger(__name__)
iam = boto3.resource('iam')


def _single_statement_policy_doc(actions, resource_arn):
    """
    Builds a policy document that contains a single 'Allow' statement.

    :param actions: The actions allowed by the statement.
    :param resource_arn: The ARN of the resource the statement applies to.
    :return: The policy document as a dict, ready to be serialized to JSON.
    """
    return {
        'Version': '2012-10-17',
        'Statement': [
            {
                'Effect': 'Allow',
                'Action': actions,
                'Resource': resource_arn
            }
        ]
    }


def create_policy(name, description, actions, resource_arn):
    """
    Creates a policy that contains a single statement.

    :param name: The name of the policy to create.
    :param description: The description of the policy.
    :param actions: The actions allowed by the policy. These typically take the
                    form of service:action, such as s3:PutObject.
    :param resource_arn: The Amazon Resource Name (ARN) of the resource this policy
                         applies to. This ARN can contain wildcards, such as
                         'arn:aws:s3:::my-bucket/*' to allow actions on all objects
                         in the bucket named 'my-bucket'.
    :return: The newly created policy.
    :raises ClientError: When the policy cannot be created. The error is logged
                         and re-raised.
    """
    policy_doc = _single_statement_policy_doc(actions, resource_arn)
    try:
        policy = iam.create_policy(
            PolicyName=name, Description=description,
            PolicyDocument=json.dumps(policy_doc))
        logger.info("Created policy %s.", policy.arn)
    except ClientError:
        logger.exception("Couldn't create policy %s.", name)
        raise
    else:
        return policy


def delete_policy(policy_arn):
    """
    Deletes a policy.

    :param policy_arn: The ARN of the policy to delete.
    :raises ClientError: When the policy cannot be deleted. The error is logged
                         and re-raised.
    """
    try:
        iam.Policy(policy_arn).delete()
        logger.info("Deleted policy %s.", policy_arn)
    except ClientError:
        logger.exception("Couldn't delete policy %s.", policy_arn)
        raise


def create_policy_version(policy_arn, actions, resource_arn, set_as_default):
    """
    Creates a policy version. Policies can have up to five versions. The default
    version is the one that is used for all resources that reference the policy.

    :param policy_arn: The ARN of the policy.
    :param actions: The actions to allow in the policy version.
    :param resource_arn: The ARN of the resource this policy version applies to.
    :param set_as_default: When True, this policy version is set as the default
                           version for the policy. Otherwise, the default
                           is not changed.
    :return: The newly created policy version.
    :raises ClientError: When the version cannot be created. The error is logged
                         and re-raised.
    """
    policy_doc = _single_statement_policy_doc(actions, resource_arn)
    try:
        policy = iam.Policy(policy_arn)
        policy_version = policy.create_version(
            PolicyDocument=json.dumps(policy_doc), SetAsDefault=set_as_default)
        logger.info(
            "Created policy version %s for policy %s.",
            policy_version.version_id, policy_version.arn)
    except ClientError:
        logger.exception("Couldn't create a policy version for %s.", policy_arn)
        raise
    else:
        return policy_version


def list_policies(scope):
    """
    Lists the policies in the current account.

    :param scope: Limits the kinds of policies that are returned. For example,
                  'Local' specifies that only locally managed policies are returned.
    :return: The list of policies.
    :raises ClientError: When the policies cannot be listed. The error is logged
                         and re-raised.
    """
    try:
        policies = list(iam.policies.filter(Scope=scope))
        logger.info("Got %s policies in scope '%s'.", len(policies), scope)
    except ClientError:
        logger.exception("Couldn't get policies for scope '%s'.", scope)
        raise
    else:
        return policies


def get_default_policy_statement(policy_arn):
    """
    Gets the statement of the default version of the specified policy.

    :param policy_arn: The ARN of the policy to look up.
    :return: The statement of the default policy version, or None when the
             policy document has no 'Statement' key.
    :raises ClientError: When the policy cannot be read. The error is logged
                         and re-raised.
    """
    try:
        policy = iam.Policy(policy_arn)
        policy_doc = policy.default_version.document
        policy_statement = policy_doc.get('Statement', None)
        logger.info("Got default policy doc for %s.", policy.policy_name)
        logger.info(policy_doc)
    except ClientError:
        logger.exception("Couldn't get default policy statement for %s.", policy_arn)
        raise
    else:
        return policy_statement


def rollback_policy_version(policy_arn):
    """
    Rolls back to the previous default policy, if it exists.

    1. Gets the list of policy versions in order by date.
    2. Finds the default.
    3. Makes the previous policy the default.
    4. Deletes the old default version.

    :param policy_arn: The ARN of the policy to roll back.
    :return: The default version of the policy after the rollback, or None when
             no default version was found or no version older than the default
             exists (a warning is logged in both cases and nothing is changed).
    :raises ClientError: When versions cannot be listed, or the rollback itself
                         fails. The error is logged and re-raised.
    """
    try:
        policy_versions = sorted(
            iam.Policy(policy_arn).versions.all(),
            key=operator.attrgetter('create_date'))
        logger.info("Got %s versions for %s.", len(policy_versions), policy_arn)
    except ClientError:
        logger.exception("Couldn't get versions for %s.", policy_arn)
        raise

    try:
        # Walk from newest to oldest until the default version is found. Versions
        # newer than the default are discarded; whatever remains in the list
        # afterwards is older than the default.
        default_version = None
        while policy_versions and default_version is None:
            candidate = policy_versions.pop()
            if candidate.is_default_version:
                default_version = candidate

        if default_version is None:
            logger.warning("No default version found for %s.", policy_arn)
            return None
        if not policy_versions:
            logger.warning(
                "Default version %s found for %s, but no previous version exists, so "
                "nothing to roll back to.", default_version.version_id, policy_arn)
            return None

        # The newest remaining version is the one created just before the default.
        rollback_version = policy_versions.pop()
        # Promote the older version first, then delete the old default.
        rollback_version.set_as_default()
        logger.info("Set %s as the default version.", rollback_version.version_id)
        default_version.delete()
        logger.info("Deleted original default version %s.", default_version.version_id)
    except ClientError:
        logger.exception("Couldn't roll back version for %s.", policy_arn)
        raise
    else:
        return rollback_version


def attach_to_role(role_name, policy_arn):
    """
    Attaches a policy to a role.

    :param role_name: The name of the role. **Note** this is the name, not the ARN.
    :param policy_arn: The ARN of the policy.
    :raises ClientError: When the policy cannot be attached. The error is logged
                         and re-raised.
    """
    try:
        iam.Policy(policy_arn).attach_role(RoleName=role_name)
        logger.info("Attached policy %s to role %s.", policy_arn, role_name)
    except ClientError:
        logger.exception("Couldn't attach policy %s to role %s.", policy_arn, role_name)
        raise


def detach_from_role(role_name, policy_arn):
    """
    Detaches a policy from a role.

    :param role_name: The name of the role. **Note** this is the name, not the ARN.
    :param policy_arn: The ARN of the policy.
    :raises ClientError: When the policy cannot be detached. The error is logged
                         and re-raised.
    """
    try:
        iam.Policy(policy_arn).detach_role(RoleName=role_name)
        logger.info("Detached policy %s from role %s.", policy_arn, role_name)
    except ClientError:
        logger.exception(
            "Couldn't detach policy %s from role %s.", policy_arn, role_name)
        raise


def usage_demo():
    """
    Shows how to use the policy functions.

    Creates a demo policy, adds a second version as the default, rolls back to
    the first version, and then deletes the policy. Progress is printed to stdout.
    """
    logging.basicConfig(level=logging.INFO, format='%(levelname)s: %(message)s')
    print('-'*88)
    print("Welcome to the AWS Identity and Access Management policy demo.")
    print('-'*88)
    print("Policies let you define sets of permissions that can be attached to "
          "other IAM resources, like users and roles.")
    bucket_arn = 'arn:aws:s3:::made-up-bucket-name'
    policy = create_policy(
        'demo-iam-policy', 'Policy for IAM demonstration.',
        ['s3:ListObjects'], bucket_arn)
    print(f"Created policy {policy.policy_name}.")
    policies = list_policies('Local')
    print(f"Your account has {len(policies)} managed policies:")
    print(*[pol.policy_name for pol in policies], sep=', ')
    # NOTE(review): presumably this pause gives the next version a later
    # create_date than the first, since the rollback orders versions by that
    # field - confirm.
    time.sleep(1)
    policy_version = create_policy_version(
        policy.arn, ['s3:PutObject'], bucket_arn, True)
    print(f"Added policy version {policy_version.version_id} to policy "
          f"{policy.policy_name}.")
    default_statement = get_default_policy_statement(policy.arn)
    print(f"The default policy statement for {policy.policy_name} is:")
    pprint.pprint(default_statement)
    rollback_version = rollback_policy_version(policy.arn)
    # rollback_policy_version returns None when there was nothing to roll back to.
    if rollback_version is None:
        print(f"No earlier version of {policy.policy_name} to roll back to.")
    else:
        print(f"Rolled back to version {rollback_version.version_id} for "
              f"{policy.policy_name}.")
    default_statement = get_default_policy_statement(policy.arn)
    print(f"The default policy statement for {policy.policy_name} is now:")
    pprint.pprint(default_statement)
    delete_policy(policy.arn)
    print(f"Deleted policy {policy.policy_name}.")
    print("Thanks for watching!")


if __name__ == '__main__':
    usage_demo()