AWS Encryption SDK for Python example code

The following examples show you how to use the AWS Encryption SDK for Python to encrypt and decrypt data.

The examples in this section show how to use version 2.0.x and later of the AWS Encryption SDK for Python. For examples that use earlier versions, find your release in the Releases list of the aws-encryption-sdk-python repository on GitHub.

Encrypting and decrypting strings

The following example shows you how to use the AWS Encryption SDK to encrypt and decrypt strings. This example uses an AWS KMS key in AWS Key Management Service (AWS KMS) as the master key.

When encrypting, the StrictAwsKmsMasterKeyProvider constructor takes a key ID, key ARN, alias name, or alias ARN. When decrypting, it requires a key ARN. In this example, the same key_arn parameter is used for both encrypting and decrypting, so its value must be a key ARN. For information about IDs for AWS KMS keys, see Key identifiers in the AWS Key Management Service Developer Guide.
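Because decryption requires a key ARN, it can help to check the shape of an identifier before passing it to the provider. The following is a minimal sketch and not part of the AWS Encryption SDK: the helper name is_kms_key_arn and the regular expression are assumptions for illustration, and the pattern only checks the general form arn:partition:kms:region:account-id:key/key-id. The sample identifiers are placeholders.

```python
import re

# Hypothetical helper, not part of the AWS Encryption SDK.
# Matches the general shape arn:<partition>:kms:<region>:<account-id>:key/<key-id>.
_KEY_ARN_PATTERN = re.compile(r"^arn:[^:]+:kms:[^:]+:\d{12}:key/[^/\s]+$")


def is_kms_key_arn(identifier):
    """Return True if the identifier has the shape of an AWS KMS key ARN."""
    return bool(_KEY_ARN_PATTERN.match(identifier))


# Placeholder values, not real keys.
key_arn_result = is_kms_key_arn(
    "arn:aws:kms:us-west-2:111122223333:key/1234abcd-12ab-34cd-56ef-1234567890ab"
)
alias_result = is_kms_key_arn("alias/ExampleAlias")
print(key_arn_result, alias_result)
```

A check like this only catches obvious mix-ups, such as passing an alias name where a key ARN is needed. It does not confirm that the key exists or that the caller has permission to use it.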

# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Example showing basic encryption and decryption of a value already in memory."""
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy


def cycle_string(key_arn, source_plaintext, botocore_session=None):
    """Encrypts and then decrypts a string under an AWS KMS key.

    :param str key_arn: Amazon Resource Name (ARN) of the AWS KMS key
    :param bytes source_plaintext: Data to encrypt
    :param botocore_session: existing botocore session instance
    :type botocore_session: botocore.session.Session
    """
    # Set up an encryption client with an explicit commitment policy. If you do not explicitly choose a
    # commitment policy, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default.
    client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT)

    # Create an AWS KMS master key provider
    kms_kwargs = dict(key_ids=[key_arn])
    if botocore_session is not None:
        kms_kwargs["botocore_session"] = botocore_session
    master_key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs)

    # Encrypt the plaintext source data
    ciphertext, encryptor_header = client.encrypt(source=source_plaintext, key_provider=master_key_provider)

    # Decrypt the ciphertext
    cycled_plaintext, decrypted_header = client.decrypt(source=ciphertext, key_provider=master_key_provider)

    # Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source plaintext
    assert cycled_plaintext == source_plaintext

    # Verify that the encryption context used in the decrypt operation includes all key pairs from
    # the encrypt operation. (The SDK can add pairs, so don't require an exact match.)
    #
    # In production, always use a meaningful encryption context. In this sample, we omit the
    # encryption context (no key pairs).
    assert all(
        pair in decrypted_header.encryption_context.items() for pair in encryptor_header.encryption_context.items()
    )
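The final assertion in the example above checks that the encryption context from the encrypt operation is contained in the one returned on decryption, not that the two are equal. The following minimal sketch shows the same check on plain dictionaries. The helper name context_includes and the sample values are assumptions for illustration. With a signing algorithm suite, the SDK adds a pair under the aws-crypto-public-key key, so an exact comparison could fail even when nothing is wrong.

```python
def context_includes(actual, expected):
    """Return True if every key-value pair in expected also appears in actual."""
    return all(key in actual and actual[key] == value for key, value in expected.items())


# The context the application supplied when encrypting (sample value).
expected_context = {"purpose": "test"}

# Hypothetical context returned on decryption, with one extra pair added during encryption.
returned_context = {"purpose": "test", "aws-crypto-public-key": "placeholder-value"}

matches = context_includes(returned_context, expected_context)
mismatch = context_includes({"purpose": "other"}, expected_context)
print(matches, mismatch)
```

An application would typically run a check like this on the decrypted header's encryption context before returning plaintext to its caller.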

Encrypting and decrypting byte streams

The following example shows you how to use the AWS Encryption SDK to encrypt and decrypt byte streams. This example doesn't use AWS. It uses a static, ephemeral master key provider.

When encrypting, this example uses an alternate algorithm suite without digital signatures (AES_256_GCM_HKDF_SHA512_COMMIT_KEY). This algorithm suite is appropriate when the users who encrypt and decrypt the data are equally trusted. When decrypting, the example uses the decrypt-unsigned streaming mode, which fails if it encounters signed ciphertext. The decrypt-unsigned streaming mode was introduced in AWS Encryption SDK versions 1.9.x and 2.2.x.

# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Example showing creation and use of a RawMasterKeyProvider."""
import filecmp
import os

import aws_encryption_sdk
from aws_encryption_sdk.identifiers import Algorithm, CommitmentPolicy, EncryptionKeyType, WrappingAlgorithm
from aws_encryption_sdk.internal.crypto.wrapping_keys import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider


class StaticRandomMasterKeyProvider(RawMasterKeyProvider):
    """Randomly generates 256-bit keys for each unique key ID."""

    provider_id = "static-random"

    def __init__(self, **kwargs):  # pylint: disable=unused-argument
        """Initialize empty map of keys."""
        self._static_keys = {}

    def _get_raw_key(self, key_id):
        """Returns a static, randomly-generated symmetric key for the specified key ID.

        :param str key_id: Key ID
        :returns: Wrapping key that contains the specified static key
        :rtype: :class:`aws_encryption_sdk.internal.crypto.WrappingKey`
        """
        try:
            static_key = self._static_keys[key_id]
        except KeyError:
            static_key = os.urandom(32)
            self._static_keys[key_id] = static_key
        return WrappingKey(
            wrapping_algorithm=WrappingAlgorithm.AES_256_GCM_IV12_TAG16_NO_PADDING,
            wrapping_key=static_key,
            wrapping_key_type=EncryptionKeyType.SYMMETRIC,
        )


def cycle_file(source_plaintext_filename):
    """Encrypts and then decrypts a file under a custom static master key provider.

    :param str source_plaintext_filename: Filename of file to encrypt
    """
    # Set up an encryption client with an explicit commitment policy. Note that if you do not explicitly choose a
    # commitment policy, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default.
    client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT)

    # Create a static random master key provider
    key_id = os.urandom(8)
    master_key_provider = StaticRandomMasterKeyProvider()
    master_key_provider.add_master_key(key_id)

    ciphertext_filename = source_plaintext_filename + ".encrypted"
    cycled_plaintext_filename = source_plaintext_filename + ".decrypted"

    # Encrypt the plaintext source data
    # We can use an unsigned algorithm suite here under the assumption that the contexts that encrypt
    # and decrypt are equally trusted.
    with open(source_plaintext_filename, "rb") as plaintext, open(ciphertext_filename, "wb") as ciphertext:
        with client.stream(
            algorithm=Algorithm.AES_256_GCM_HKDF_SHA512_COMMIT_KEY,
            mode="e",
            source=plaintext,
            key_provider=master_key_provider,
        ) as encryptor:
            for chunk in encryptor:
                ciphertext.write(chunk)

    # Decrypt the ciphertext
    # We can use the recommended "decrypt-unsigned" streaming mode since we encrypted with an unsigned algorithm suite.
    with open(ciphertext_filename, "rb") as ciphertext, open(cycled_plaintext_filename, "wb") as plaintext:
        with client.stream(mode="decrypt-unsigned", source=ciphertext, key_provider=master_key_provider) as decryptor:
            for chunk in decryptor:
                plaintext.write(chunk)

    # Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source
    # plaintext
    assert filecmp.cmp(source_plaintext_filename, cycled_plaintext_filename)

    # Verify that the encryption context used in the decrypt operation includes all key pairs from
    # the encrypt operation
    #
    # In production, always use a meaningful encryption context. In this sample, we omit the
    # encryption context (no key pairs).
    assert all(
        pair in decryptor.header.encryption_context.items() for pair in encryptor.header.encryption_context.items()
    )
    return ciphertext_filename, cycled_plaintext_filename

Encrypting and decrypting byte streams with multiple master key providers

The following example shows you how to use the AWS Encryption SDK with more than one master key provider. Using more than one master key provider creates redundancy if one master key provider is unavailable for decryption. This example uses an AWS KMS key and an RSA key pair as the master keys.

This example encrypts with the default algorithm suite, which includes a digital signature. When streaming, the AWS Encryption SDK releases plaintext after integrity checks, but before it has verified the digital signature. To avoid using the plaintext until the signature is verified, this example buffers the plaintext, and writes it to disk only when decryption and verification are complete.

# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Example showing creation of a RawMasterKeyProvider, how to use multiple
master key providers to encrypt, and demonstrating that each master key
provider can then be used independently to decrypt the same encrypted message.
"""
import filecmp
import os

from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from cryptography.hazmat.primitives.asymmetric import rsa

import aws_encryption_sdk
from aws_encryption_sdk.identifiers import CommitmentPolicy, EncryptionKeyType, WrappingAlgorithm
from aws_encryption_sdk.internal.crypto.wrapping_keys import WrappingKey
from aws_encryption_sdk.key_providers.raw import RawMasterKeyProvider


class StaticRandomMasterKeyProvider(RawMasterKeyProvider):
    """Randomly generates and provides 4096-bit RSA keys consistently per unique key id."""

    provider_id = "static-random"

    def __init__(self, **kwargs):  # pylint: disable=unused-argument
        """Initialize empty map of keys."""
        self._static_keys = {}

    def _get_raw_key(self, key_id):
        """Retrieves a static, randomly generated, RSA key for the specified key id.

        :param str key_id: User-defined ID for the static key
        :returns: Wrapping key that contains the specified static key
        :rtype: :class:`aws_encryption_sdk.internal.crypto.WrappingKey`
        """
        try:
            static_key = self._static_keys[key_id]
        except KeyError:
            private_key = rsa.generate_private_key(public_exponent=65537, key_size=4096, backend=default_backend())
            static_key = private_key.private_bytes(
                encoding=serialization.Encoding.PEM,
                format=serialization.PrivateFormat.PKCS8,
                encryption_algorithm=serialization.NoEncryption(),
            )
            self._static_keys[key_id] = static_key
        return WrappingKey(
            wrapping_algorithm=WrappingAlgorithm.RSA_OAEP_SHA1_MGF1,
            wrapping_key=static_key,
            wrapping_key_type=EncryptionKeyType.PRIVATE,
        )


def cycle_file(key_arn, source_plaintext_filename, botocore_session=None):
    """Encrypts and then decrypts a file using an AWS KMS master key provider and a custom static master
    key provider. Both master key providers are used to encrypt the plaintext file, so either one alone
    can decrypt it.

    :param str key_arn: Amazon Resource Name (ARN) of the AWS KMS key
        (http://docs.aws.amazon.com/kms/latest/developerguide/viewing-keys.html)
    :param str source_plaintext_filename: Filename of file to encrypt
    :param botocore_session: existing botocore session instance
    :type botocore_session: botocore.session.Session
    """
    # "Cycled" means encrypted and then decrypted
    ciphertext_filename = source_plaintext_filename + ".encrypted"
    cycled_kms_plaintext_filename = source_plaintext_filename + ".kms.decrypted"
    cycled_static_plaintext_filename = source_plaintext_filename + ".static.decrypted"

    # Set up an encryption client with an explicit commitment policy. Note that if you do not explicitly choose a
    # commitment policy, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default.
    client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT)

    # Create an AWS KMS master key provider
    kms_kwargs = dict(key_ids=[key_arn])
    if botocore_session is not None:
        kms_kwargs["botocore_session"] = botocore_session
    kms_master_key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs)

    # Create a static master key provider and add a master key to it
    static_key_id = os.urandom(8)
    static_master_key_provider = StaticRandomMasterKeyProvider()
    static_master_key_provider.add_master_key(static_key_id)

    # Add the static master key provider to the AWS KMS master key provider
    # The resulting master key provider uses AWS KMS master keys to generate (and encrypt)
    # data keys and static master keys to create an additional encrypted copy of each data key.
    kms_master_key_provider.add_master_key_provider(static_master_key_provider)

    # Encrypt plaintext with both AWS KMS and static master keys
    with open(source_plaintext_filename, "rb") as plaintext, open(ciphertext_filename, "wb") as ciphertext:
        with client.stream(source=plaintext, mode="e", key_provider=kms_master_key_provider) as encryptor:
            for chunk in encryptor:
                ciphertext.write(chunk)

    # Decrypt the ciphertext with only the AWS KMS master key
    # Buffer the data in memory before writing to disk. This ensures verification of the digital signature
    # before returning plaintext.
    with open(ciphertext_filename, "rb") as ciphertext, open(cycled_kms_plaintext_filename, "wb") as plaintext:
        with client.stream(
            source=ciphertext, mode="d", key_provider=aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(**kms_kwargs)
        ) as kms_decryptor:
            plaintext.write(kms_decryptor.read())

    # Decrypt the ciphertext with only the static master key
    # Buffer the data in memory before writing to disk to ensure verification of the signature
    # before returning plaintext.
    with open(ciphertext_filename, "rb") as ciphertext, open(cycled_static_plaintext_filename, "wb") as plaintext:
        with client.stream(source=ciphertext, mode="d", key_provider=static_master_key_provider) as static_decryptor:
            plaintext.write(static_decryptor.read())

    # Verify that the "cycled" (encrypted, then decrypted) plaintext is identical to the source plaintext
    assert filecmp.cmp(source_plaintext_filename, cycled_kms_plaintext_filename)
    assert filecmp.cmp(source_plaintext_filename, cycled_static_plaintext_filename)

    # Verify that the encryption context in the decrypt operation includes all key pairs from the
    # encrypt operation.
    #
    # In production, always use a meaningful encryption context. In this sample, we omit the
    # encryption context (no key pairs).
    assert all(
        pair in kms_decryptor.header.encryption_context.items() for pair in encryptor.header.encryption_context.items()
    )
    assert all(
        pair in static_decryptor.header.encryption_context.items()
        for pair in encryptor.header.encryption_context.items()
    )
    return (ciphertext_filename, cycled_kms_plaintext_filename, cycled_static_plaintext_filename)
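The buffering pattern used for decryption in this example can be shown in isolation from the SDK. The sketch below uses only the standard library and rests on stated assumptions: write_after_complete_read and FailingReader are hypothetical names, and reader stands for any object whose read() either returns all remaining bytes or raises an exception, as a decrypting stream would when verification fails. Unlike the example above, this helper opens the destination file only after read() returns, so a failed read leaves no file behind.

```python
import io
import os
import tempfile


def write_after_complete_read(reader, destination_path):
    """Read everything from reader into memory, then write it to destination_path.

    If reader.read() raises, destination_path is never created.
    """
    buffered = reader.read()  # an exception here means nothing is written
    with open(destination_path, "wb") as destination:
        destination.write(buffered)
    return len(buffered)


class FailingReader:
    """Hypothetical stand-in for a stream whose final verification fails."""

    def read(self):
        raise ValueError("verification failed")


with tempfile.TemporaryDirectory() as workdir:
    good_path = os.path.join(workdir, "good.out")
    bad_path = os.path.join(workdir, "bad.out")

    bytes_written = write_after_complete_read(io.BytesIO(b"example plaintext"), good_path)
    try:
        write_after_complete_read(FailingReader(), bad_path)
    except ValueError:
        pass

    good_exists = os.path.exists(good_path)
    bad_exists = os.path.exists(bad_path)

print(bytes_written, good_exists, bad_exists)
```

The trade-off is memory: the whole plaintext is held in memory before any of it is written, so this approach suits messages that fit comfortably in RAM.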

Using data key caching to encrypt messages

The following example shows how to use data key caching in the AWS Encryption SDK for Python. It configures an instance of the local cache (LocalCryptoMaterialsCache) with the required capacity value and an instance of the caching cryptographic materials manager (caching CMM) with cache security thresholds.

This very basic example creates a function that encrypts a fixed string. It lets you specify an AWS KMS key, the required cache size (capacity), and a maximum age value. For a more complex, real-world example of data key caching, see Data key caching example code.

Although it is optional, this example also uses an encryption context as additional authenticated data. When you decrypt data that was encrypted with an encryption context, be sure that your application verifies that the encryption context is the one that you expect before returning the plaintext data to your caller. An encryption context is a best practice element of any encryption or decryption operation, but it plays a special role in data key caching. For details, see Encryption Context: How to Select Cache Entries.

# Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
# http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
"""Example of encryption with data key caching."""
import aws_encryption_sdk
from aws_encryption_sdk import CommitmentPolicy


def encrypt_with_caching(kms_key_arn, max_age_in_cache, cache_capacity):
    """Encrypts a string using an AWS KMS key and data key caching.

    :param str kms_key_arn: Amazon Resource Name (ARN) of the AWS KMS key
    :param float max_age_in_cache: Maximum time in seconds that a cached entry can be used
    :param int cache_capacity: Maximum number of entries to retain in cache at once
    """
    # Data to be encrypted
    my_data = "My plaintext data"

    # Security thresholds
    #   The maximum number of messages (or bytes) per data key is optional
    MAX_ENTRY_MESSAGES = 100

    # Create an encryption context
    encryption_context = {"purpose": "test"}

    # Set up an encryption client with an explicit commitment policy. Note that if you do not explicitly choose a
    # commitment policy, REQUIRE_ENCRYPT_REQUIRE_DECRYPT is used by default.
    client = aws_encryption_sdk.EncryptionSDKClient(commitment_policy=CommitmentPolicy.REQUIRE_ENCRYPT_REQUIRE_DECRYPT)

    # Create a master key provider for the AWS KMS key
    key_provider = aws_encryption_sdk.StrictAwsKmsMasterKeyProvider(key_ids=[kms_key_arn])

    # Create a local cache
    cache = aws_encryption_sdk.LocalCryptoMaterialsCache(cache_capacity)

    # Create a caching CMM
    caching_cmm = aws_encryption_sdk.CachingCryptoMaterialsManager(
        master_key_provider=key_provider,
        cache=cache,
        max_age=max_age_in_cache,
        max_messages_encrypted=MAX_ENTRY_MESSAGES,
    )

    # When the call to encrypt data specifies a caching CMM,
    # the encryption operation uses the data key cache specified
    # in the caching CMM
    encrypted_message, _header = client.encrypt(
        source=my_data, materials_manager=caching_cmm, encryption_context=encryption_context
    )

    return encrypted_message
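To make the roles of the capacity, maximum age, and maximum messages values more concrete, here is a toy model written with the standard library only. It is not the SDK's implementation: ToyDataKeyCache, its method names, the cache identifiers, and the injected clock are all assumptions for illustration. The model reuses a cached entry until the entry is older than max_age or has served max_messages requests, and it evicts the least recently used entry when the number of entries exceeds the capacity.

```python
import time
from collections import OrderedDict


class ToyDataKeyCache:
    """Toy cache for illustration only. NOT the SDK's LocalCryptoMaterialsCache."""

    def __init__(self, capacity, max_age, max_messages, clock=time.monotonic):
        self._capacity = capacity
        self._max_age = max_age
        self._max_messages = max_messages
        self._clock = clock
        self._entries = OrderedDict()  # cache_id -> [value, created_at, uses]

    def get_or_create(self, cache_id, factory):
        """Return a cached value, or call factory() when no usable entry exists."""
        entry = self._entries.get(cache_id)
        if entry is not None:
            value, created_at, uses = entry
            expired = self._clock() - created_at > self._max_age
            exhausted = uses >= self._max_messages
            if not expired and not exhausted:
                entry[2] = uses + 1
                self._entries.move_to_end(cache_id)  # mark as most recently used
                return value
            del self._entries[cache_id]  # retire the entry that crossed a threshold

        value = factory()
        self._entries[cache_id] = [value, self._clock(), 1]
        if len(self._entries) > self._capacity:
            self._entries.popitem(last=False)  # evict the least recently used entry
        return value


# A fake clock and a counter make the behavior deterministic for this demonstration.
now = [0.0]
counter = [0]


def new_key():
    counter[0] += 1
    return "key-{}".format(counter[0])


cache = ToyDataKeyCache(capacity=2, max_age=60.0, max_messages=2, clock=lambda: now[0])
first = cache.get_or_create("ctx-a", new_key)   # no entry yet, so a new value is created
second = cache.get_or_create("ctx-a", new_key)  # reused: within both thresholds
third = cache.get_or_create("ctx-a", new_key)   # message limit reached, so a new value is created
now[0] = 61.0
fourth = cache.get_or_create("ctx-a", new_key)  # older than max_age, so a new value is created
print(first, second, third, fourth)
```

In this model, lower thresholds mean each cached value is reused less, at the cost of calling the factory more often. The SDK example above expresses the same kind of trade-off through max_age and max_messages_encrypted.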