- AWS Code Sample

# Copyright, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 """ Demonstration manager for the Amazon S3 file transfer example. The demonstration collects information from the user and runs various upload and download scenarios to show how to use Boto 3 managed transfers. TransferDemoManager Handle user interactions like collecting user input and reporting transfer results. """ import hashlib import os import platform import shutil import time import boto3 from boto3.s3.transfer import TransferConfig from botocore.exceptions import ClientError from botocore.exceptions import ParamValidationError from botocore.exceptions import NoCredentialsError import file_transfer MB = 1024 * 1024 # These configuration attributes affect both uploads and downloads. CONFIG_ATTRS = ('multipart_threshold', 'multipart_chunksize', 'max_concurrency', 'use_threads') # These configuration attributes affect only downloads. DOWNLOAD_CONFIG_ATTRS = ('max_io_queue', 'io_chunksize', 'num_download_attempts') class TransferDemoManager: """ Manages the demonstration. Collects user input from a command line, reports transfer results, maintains a list of artifacts created during the demonstration, and cleans them up after the demonstration is completed. """ def __init__(self): self._s3 = boto3.resource('s3') self._chore_list = [] self._create_file_cmd = None self._size_multiplier = 0 self.file_size_mb = 30 self.demo_folder = None self.demo_bucket = None self._setup_platform_specific() self._terminal_width = shutil.get_terminal_size(fallback=(80, 80))[0] def collect_user_info(self): """ Collect local folder and Amazon S3 bucket name from the user. These locations are used to store files during the demonstration. """ while not self.demo_folder: self.demo_folder = input( "Which file folder do you want to use to store " "demonstration files? ") if not os.path.isdir(self.demo_folder): print(f"{self.demo_folder} isn't a folder!") self.demo_folder = None while not self.demo_bucket: self.demo_bucket = input( "Which Amazon S3 bucket do you want to use to store " "demonstration files? ") try: self._s3.meta.client.head_bucket(Bucket=self.demo_bucket) except ParamValidationError as err: print(err) self.demo_bucket = None except ClientError as err: print(err) print( f"Either {self.demo_bucket} doesn't exist or you don't " f"have access to it.") self.demo_bucket = None def demo(self, question, upload_func, download_func, upload_args=None, download_args=None): """Run a demonstration. Ask the user if they want to run this specific demonstration. If they say yes, create a file on the local path, upload it using the specified upload function, then download it using the specified download function. """ if download_args is None: download_args = {} if upload_args is None: upload_args = {} question = question.format(self.file_size_mb) answer = input(f"{question} (y/n)") if answer.lower() == 'y': local_file_path, object_key, download_file_path = \ self._create_demo_file() file_transfer.TransferConfig = \ self._config_wrapper(TransferConfig, CONFIG_ATTRS) self._report_transfer_params('Uploading', local_file_path, object_key, **upload_args) start_time = time.perf_counter() thread_info = upload_func(local_file_path, self.demo_bucket, object_key, self.file_size_mb, **upload_args) end_time = time.perf_counter() self._report_transfer_result(thread_info, end_time - start_time) file_transfer.TransferConfig = \ self._config_wrapper(TransferConfig, CONFIG_ATTRS + DOWNLOAD_CONFIG_ATTRS) self._report_transfer_params('Downloading', object_key, download_file_path, **download_args) start_time = time.perf_counter() thread_info = download_func(self.demo_bucket, object_key, download_file_path, self.file_size_mb, **download_args) end_time = time.perf_counter() self._report_transfer_result(thread_info, end_time - start_time) def last_name_set(self): """Get the name set used for the last demo.""" return self._chore_list[-1] def cleanup(self): """ Remove files from the demo folder, and uploaded objects from the Amazon S3 bucket. """ print('-' * self._terminal_width) for local_file_path, s3_object_key, downloaded_file_path \ in self._chore_list: print(f"Removing {local_file_path}") try: os.remove(local_file_path) except FileNotFoundError as err: print(err) print(f"Removing {downloaded_file_path}") try: os.remove(downloaded_file_path) except FileNotFoundError as err: print(err) if self.demo_bucket: print(f"Removing {self.demo_bucket}:{s3_object_key}") try: self._s3.Bucket(self.demo_bucket).Object( s3_object_key).delete() except ClientError as err: print(err) def _setup_platform_specific(self): """Set up platform-specific command used to create a large file.""" if platform.system() == "Windows": self._create_file_cmd = "fsutil file createnew {} {}" self._size_multiplier = MB elif platform.system() == "Linux" or platform.system() == "Darwin": self._create_file_cmd = f"dd if=/dev/urandom of={{}} " \ f"bs={MB} count={{}}" self._size_multiplier = 1 else: raise EnvironmentError( f"Demo of platform {platform.system()} isn't supported.") def _create_demo_file(self): """ Create a file in the demo folder specified by the user. Store the local path, object name, and download path for later cleanup. Only the local file is created by this method. The Amazon S3 object and download file are created later during the demonstration. Returns: A tuple that contains the local file path, object name, and download file path. """ file_name_template = "TestFile{}-{}.demo" local_suffix = "local" object_suffix = "s3object" download_suffix = "downloaded" file_tag = len(self._chore_list) + 1 local_file_path = os.path.join( self.demo_folder, file_name_template.format(file_tag, local_suffix)) s3_object_key = file_name_template.format(file_tag, object_suffix) downloaded_file_path = os.path.join( self.demo_folder, file_name_template.format(file_tag, download_suffix)) filled_cmd = self._create_file_cmd.format( local_file_path, self.file_size_mb * self._size_multiplier) print(f"Creating file of size {self.file_size_mb} MB " f"in {self.demo_folder} by running:") print(f"{'':4}{filled_cmd}") os.system(filled_cmd) chore = (local_file_path, s3_object_key, downloaded_file_path) self._chore_list.append(chore) return chore def _report_transfer_params(self, verb, source_name, dest_name, **kwargs): """Report configuration and extra arguments used for a file transfer.""" print('-' * self._terminal_width) print(f'{verb} {source_name} ({self.file_size_mb} MB) to {dest_name}') if kwargs: print('With extra args:') for arg, value in kwargs.items(): print(f'{"":4}{arg:<20}: {value}') @staticmethod def ask_user(question): """ Ask the user a yes or no question. Returns: True when the user answers 'y' or 'Y'; otherwise, False. """ answer = input(f"{question} (y/n) ") return answer.lower() == 'y' @staticmethod def _config_wrapper(func, config_attrs): def wrapper(*args, **kwargs): config = func(*args, **kwargs) print('With configuration:') for attr in config_attrs: print(f'{"":4}{attr:<20}: {getattr(config, attr)}') return config return wrapper @staticmethod def _report_transfer_result(thread_info, elapsed): """Report the result of a transfer, including per-thread data.""" print(f"\nUsed {len(thread_info)} threads.") for ident, byte_count in thread_info.items(): print(f"{'':4}Thread {ident} copied {byte_count} bytes.") print(f"Your transfer took {elapsed:.2f} seconds.") def main(): """ Run the demonstration script for s3_file_transfer. """ demo_manager = TransferDemoManager() demo_manager.collect_user_info() # Upload and download with default configuration. Because the file is 30 MB # and the default multipart_threshold is 8 MB, both upload and download are # multipart transfers. demo_manager.demo( "Do you want to upload and download a {} MB file " "using the default configuration?", file_transfer.upload_with_default_configuration, file_transfer.download_with_default_configuration) # Upload and download with multipart_threshold set higher than the size of # the file. This causes the transfer manager to use standard transfers # instead of multipart transfers. demo_manager.demo( "Do you want to upload and download a {} MB file " "as a standard (not multipart) transfer?", file_transfer.upload_with_high_threshold, file_transfer.download_with_high_threshold) # Upload with specific chunk size and additional metadata. # Download with a single thread. demo_manager.demo( "Do you want to upload a {} MB file with a smaller chunk size and " "then download the same file using a single thread?", file_transfer.upload_with_chunksize_and_meta, file_transfer.download_with_single_thread, upload_args={ 'metadata': { 'upload_type': 'chunky', 'favorite_color': 'aqua', 'size': 'medium'}}) # Upload using server-side encryption with customer-provided # encryption keys. # Generate a 256-bit key from a passphrase. sse_key = hashlib.sha256('demo_passphrase'.encode('utf-8')).digest() demo_manager.demo( "Do you want to upload and download a {} MB file using " "server-side encryption?", file_transfer.upload_with_sse, file_transfer.download_with_sse, upload_args={'sse_key': sse_key}, download_args={'sse_key': sse_key}) # Download without specifying an encryption key to show that the # encryption key must be included to download an encrypted object. if demo_manager.ask_user("Do you want to try to download the encrypted " "object without sending the required key?"): try: _, object_key, download_file_path = \ demo_manager.last_name_set() file_transfer.download_with_default_configuration( demo_manager.demo_bucket, object_key, download_file_path, demo_manager.file_size_mb) except ClientError as err: print("Got expected error when trying to download an encrypted " "object without specifying encryption info:") print(f"{'':4}{err}") # Remove all created and downloaded files, remove all objects from # S3 storage. if demo_manager.ask_user( "Demonstration complete. Do you want to remove local files " "and S3 objects?"): demo_manager.cleanup() if __name__ == '__main__': try: main() except NoCredentialsError as error: print(error) print("To run this example, you must have valid credentials in " "a shared credential file or set in environment variables.")