Amazon S3 Control examples using SDK for Python (Boto3) - AWS SDK Code Examples

There are more AWS SDK examples available in the AWS Doc SDK Examples GitHub repo.

Amazon S3 Control examples using SDK for Python (Boto3)

The following code examples show you how to perform actions and implement common scenarios by using the AWS SDK for Python (Boto3) with Amazon S3 Control.

Basics are code examples that show you how to perform the essential operations within a service.

Actions are code excerpts from larger programs and must be run in context. While actions show you how to call individual service functions, you can see actions in context in their related scenarios.

Each example includes a link to the complete source code, where you can find instructions on how to set up and run the code in context.

Get started

The following code examples show how to get started using Amazon S3 Control.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

def list_jobs(self, account_id: str) -> None: """ List all batch jobs for the account. Args: account_id (str): AWS account ID """ try: response = self.s3control_client.list_jobs( AccountId=account_id, JobStatuses=['Active', 'Complete', 'Cancelled', 'Failed', 'New', 'Paused', 'Pausing', 'Preparing', 'Ready', 'Suspended'] ) jobs = response.get('Jobs', []) for job in jobs: print(f"The job id is {job['JobId']}") print(f"The job priority is {job['Priority']}") except ClientError as e: print(f"Error listing jobs: {e}") raise
  • For API details, see ListJobs in AWS SDK for Python (Boto3) API Reference.

Basics

The following code example shows how to learn core operations for Amazon S3 Control.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

Learn S3 Batch Basics Scenario.

class S3BatchWrapper: """Wrapper class for managing S3 Batch Operations.""" def __init__(self, s3_client: Any, s3control_client: Any, sts_client: Any) -> None: """ Initializes the S3BatchWrapper with AWS service clients. :param s3_client: A Boto3 Amazon S3 client. This client provides low-level access to AWS S3 services. :param s3control_client: A Boto3 Amazon S3 Control client. This client provides low-level access to AWS S3 Control services. :param sts_client: A Boto3 AWS STS client. This client provides low-level access to AWS STS services. """ self.s3_client = s3_client self.s3control_client = s3control_client self.sts_client = sts_client # Get region from the client for bucket creation logic self.region_name = self.s3_client.meta.region_name def get_account_id(self) -> str: """ Get AWS account ID. Returns: str: AWS account ID """ return self.sts_client.get_caller_identity()["Account"] def create_bucket(self, bucket_name: str) -> None: """ Create an S3 bucket. Args: bucket_name (str): Name of the bucket to create Raises: ClientError: If bucket creation fails """ try: if self.region_name and self.region_name != 'us-east-1': self.s3_client.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ 'LocationConstraint': self.region_name } ) else: self.s3_client.create_bucket(Bucket=bucket_name) print(f"Created bucket: {bucket_name}") except ClientError as e: print(f"Error creating bucket: {e}") raise def upload_files_to_bucket(self, bucket_name: str, file_names: List[str]) -> str: """ Upload files to S3 bucket including manifest file. Args: bucket_name (str): Target bucket name file_names (list): List of file names to upload Returns: str: ETag of the manifest file Raises: ClientError: If file upload fails """ try: for file_name in file_names: if file_name != "job-manifest.csv": content = f"Content for {file_name}" self.s3_client.put_object( Bucket=bucket_name, Key=file_name, Body=content.encode('utf-8') ) print(f"Uploaded {file_name} to {bucket_name}") manifest_content = "" for file_name in file_names: if file_name != "job-manifest.csv": manifest_content += f"{bucket_name},{file_name}\n" manifest_response = self.s3_client.put_object( Bucket=bucket_name, Key="job-manifest.csv", Body=manifest_content.encode('utf-8') ) print(f"Uploaded manifest file to {bucket_name}") print(f"Manifest content:\n{manifest_content}") return manifest_response['ETag'].strip('"') except ClientError as e: print(f"Error uploading files: {e}") raise def create_s3_batch_job(self, account_id: str, role_arn: str, manifest_location: str, report_bucket_name: str) -> str: """ Create an S3 batch operation job. Args: account_id (str): AWS account ID role_arn (str): IAM role ARN for batch operations manifest_location (str): Location of the manifest file report_bucket_name (str): Bucket for job reports Returns: str: Job ID Raises: ClientError: If job creation fails """ try: bucket_name = manifest_location.split(':::')[1].split('/')[0] manifest_key = 'job-manifest.csv' manifest_obj = self.s3_client.head_object( Bucket=bucket_name, Key=manifest_key ) etag = manifest_obj['ETag'].strip('"') response = self.s3control_client.create_job( AccountId=account_id, Operation={ 'S3PutObjectTagging': { 'TagSet': [ { 'Key': 'BatchTag', 'Value': 'BatchValue' }, ] } }, Report={ 'Bucket': report_bucket_name, 'Format': 'Report_CSV_20180820', 'Enabled': True, 'Prefix': 'batch-op-reports', 'ReportScope': 'AllTasks' }, Manifest={ 'Spec': { 'Format': 'S3BatchOperations_CSV_20180820', 'Fields': ['Bucket', 'Key'] }, 'Location': { 'ObjectArn': manifest_location, 'ETag': etag } }, Priority=10, RoleArn=role_arn, Description='Batch job for tagging objects', ConfirmationRequired=True ) job_id = response['JobId'] print(f"The Job id is {job_id}") return job_id except ClientError as e: print(f"Error creating batch job: {e}") if 'Message' in str(e): print(f"Detailed error message: {e.response['Message']}") raise def check_job_failure_reasons(self, job_id: str, account_id: str) -> List[Dict[str, Any]]: """ Check for any failure reasons of a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID Returns: list: List of failure reasons Raises: ClientError: If checking job failure reasons fails """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) if 'FailureReasons' in response['Job']: for reason in response['Job']['FailureReasons']: print(f"- {reason}") return response['Job'].get('FailureReasons', []) except ClientError as e: print(f"Error checking job failure reasons: {e}") raise def wait_for_job_ready(self, job_id: str, account_id: str, desired_status: str = 'Ready') -> bool: """ Wait for a job to reach the desired status. Args: job_id (str): ID of the batch job account_id (str): AWS account ID desired_status (str): Target status to wait for Returns: bool: True if desired status is reached, False otherwise Raises: ClientError: If checking job status fails """ print(f"Waiting for job to become {desired_status}...") max_attempts = 60 attempt = 0 while attempt < max_attempts: try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status == desired_status: return True if current_status == 'Suspended': print("Job is in Suspended state, can proceed with activation") return True if current_status in ['Active', 'Failed', 'Cancelled', 'Complete']: print(f"Job is in {current_status} state, cannot reach {desired_status} status") if 'FailureReasons' in response['Job']: print("Failure reasons:") for reason in response['Job']['FailureReasons']: print(f"- {reason}") return False time.sleep(20) attempt += 1 except ClientError as e: print(f"Error checking job status: {e}") raise print(f"Timeout waiting for job to become {desired_status}") return False def update_job_priority(self, job_id: str, account_id: str) -> None: """ Update the priority of a batch job and start it. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended']: self.s3control_client.update_job_priority( AccountId=account_id, JobId=job_id, Priority=60 ) print("The job priority was updated") try: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Ready' ) print("Job activated successfully") except ClientError as activation_error: print(f"Note: Could not activate job automatically: {activation_error}") print("Job priority was updated successfully. Job may need manual activation in the console.") elif current_status in ['Active', 'Completing', 'Complete']: print(f"Job is in '{current_status}' state - priority cannot be updated") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print("Job is currently running.") else: print(f"Job is in '{current_status}' state - priority update not allowed") except ClientError as e: print(f"Error updating job priority: {e}") print("Continuing with the scenario...") return def cancel_job(self, job_id: str, account_id: str) -> None: """ Cancel an S3 batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended', 'Active']: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Cancelled' ) print(f"Job {job_id} was successfully canceled.") elif current_status in ['Completing', 'Complete']: print(f"Job is in '{current_status}' state - cannot be cancelled") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print(f"Job is in '{current_status}' state - cancel not allowed") except ClientError as e: print(f"Error canceling job: {e}") raise def describe_job_details(self, job_id: str, account_id: str) -> None: """ Describe detailed information about a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) job = response['Job'] print(f"Job ID: {job['JobId']}") print(f"Description: {job.get('Description', 'N/A')}") print(f"Status: {job['Status']}") print(f"Role ARN: {job['RoleArn']}") print(f"Priority: {job['Priority']}") if 'ProgressSummary' in job: progress = job['ProgressSummary'] print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, " f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, " f"Failed={progress.get('NumberOfTasksFailed', 0)}") except ClientError as e: print(f"Error describing job: {e}") raise def get_job_tags(self, job_id: str, account_id: str) -> None: """ Get tags associated with a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.get_job_tagging( AccountId=account_id, JobId=job_id ) tags = response.get('Tags', []) if tags: print(f"Tags for job {job_id}:") for tag in tags: print(f" {tag['Key']}: {tag['Value']}") else: print(f"No tags found for job ID: {job_id}") except ClientError as e: print(f"Error getting job tags: {e}") raise def put_job_tags(self, job_id: str, account_id: str) -> None: """ Add tags to a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.put_job_tagging( AccountId=account_id, JobId=job_id, Tags=[ {'Key': 'Environment', 'Value': 'Development'}, {'Key': 'Team', 'Value': 'DataProcessing'} ] ) print(f"Additional tags were added to job {job_id}") except ClientError as e: print(f"Error adding job tags: {e}") raise def list_jobs(self, account_id: str) -> None: """ List all batch jobs for the account. Args: account_id (str): AWS account ID """ try: response = self.s3control_client.list_jobs( AccountId=account_id, JobStatuses=['Active', 'Complete', 'Cancelled', 'Failed', 'New', 'Paused', 'Pausing', 'Preparing', 'Ready', 'Suspended'] ) jobs = response.get('Jobs', []) for job in jobs: print(f"The job id is {job['JobId']}") print(f"The job priority is {job['Priority']}") except ClientError as e: print(f"Error listing jobs: {e}") raise def delete_job_tags(self, job_id: str, account_id: str) -> None: """ Delete all tags from a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.delete_job_tagging( AccountId=account_id, JobId=job_id ) print(f"You have successfully deleted {job_id} tagging.") except ClientError as e: print(f"Error deleting job tags: {e}") raise def cleanup_resources(self, bucket_name: str, file_names: List[str]) -> None: """ Clean up all resources created during the scenario. Args: bucket_name (str): Name of the bucket to clean up file_names (list): List of files to delete Raises: ClientError: If cleanup fails """ try: for file_name in file_names: self.s3_client.delete_object(Bucket=bucket_name, Key=file_name) print(f"Deleted {file_name}") response = self.s3_client.list_objects_v2( Bucket=bucket_name, Prefix='batch-op-reports/' ) if 'Contents' in response: for obj in response['Contents']: self.s3_client.delete_object( Bucket=bucket_name, Key=obj['Key'] ) print(f"Deleted {obj['Key']}") self.s3_client.delete_bucket(Bucket=bucket_name) print(f"Deleted bucket {bucket_name}") except ClientError as e: print(f"Error in cleanup: {e}") raise

Actions

The following code example shows how to use CreateJob.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

def create_s3_batch_job(self, account_id: str, role_arn: str, manifest_location: str, report_bucket_name: str) -> str: """ Create an S3 batch operation job. Args: account_id (str): AWS account ID role_arn (str): IAM role ARN for batch operations manifest_location (str): Location of the manifest file report_bucket_name (str): Bucket for job reports Returns: str: Job ID Raises: ClientError: If job creation fails """ try: bucket_name = manifest_location.split(':::')[1].split('/')[0] manifest_key = 'job-manifest.csv' manifest_obj = self.s3_client.head_object( Bucket=bucket_name, Key=manifest_key ) etag = manifest_obj['ETag'].strip('"') response = self.s3control_client.create_job( AccountId=account_id, Operation={ 'S3PutObjectTagging': { 'TagSet': [ { 'Key': 'BatchTag', 'Value': 'BatchValue' }, ] } }, Report={ 'Bucket': report_bucket_name, 'Format': 'Report_CSV_20180820', 'Enabled': True, 'Prefix': 'batch-op-reports', 'ReportScope': 'AllTasks' }, Manifest={ 'Spec': { 'Format': 'S3BatchOperations_CSV_20180820', 'Fields': ['Bucket', 'Key'] }, 'Location': { 'ObjectArn': manifest_location, 'ETag': etag } }, Priority=10, RoleArn=role_arn, Description='Batch job for tagging objects', ConfirmationRequired=True ) job_id = response['JobId'] print(f"The Job id is {job_id}") return job_id except ClientError as e: print(f"Error creating batch job: {e}") if 'Message' in str(e): print(f"Detailed error message: {e.response['Message']}") raise
  • For API details, see CreateJob in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DeleteJobTagging.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

def delete_job_tags(self, job_id: str, account_id: str) -> None: """ Delete all tags from a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.delete_job_tagging( AccountId=account_id, JobId=job_id ) print(f"You have successfully deleted {job_id} tagging.") except ClientError as e: print(f"Error deleting job tags: {e}") raise
  • For API details, see DeleteJobTagging in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use DescribeJob.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

def describe_job_details(self, job_id: str, account_id: str) -> None: """ Describe detailed information about a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) job = response['Job'] print(f"Job ID: {job['JobId']}") print(f"Description: {job.get('Description', 'N/A')}") print(f"Status: {job['Status']}") print(f"Role ARN: {job['RoleArn']}") print(f"Priority: {job['Priority']}") if 'ProgressSummary' in job: progress = job['ProgressSummary'] print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, " f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, " f"Failed={progress.get('NumberOfTasksFailed', 0)}") except ClientError as e: print(f"Error describing job: {e}") raise
  • For API details, see DescribeJob in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use GetJobTagging.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

def get_job_tags(self, job_id: str, account_id: str) -> None: """ Get tags associated with a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.get_job_tagging( AccountId=account_id, JobId=job_id ) tags = response.get('Tags', []) if tags: print(f"Tags for job {job_id}:") for tag in tags: print(f" {tag['Key']}: {tag['Value']}") else: print(f"No tags found for job ID: {job_id}") except ClientError as e: print(f"Error getting job tags: {e}") raise
  • For API details, see GetJobTagging in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use PutJobTagging.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

def put_job_tags(self, job_id: str, account_id: str) -> None: """ Add tags to a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.put_job_tagging( AccountId=account_id, JobId=job_id, Tags=[ {'Key': 'Environment', 'Value': 'Development'}, {'Key': 'Team', 'Value': 'DataProcessing'} ] ) print(f"Additional tags were added to job {job_id}") except ClientError as e: print(f"Error adding job tags: {e}") raise
  • For API details, see PutJobTagging in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use UpdateJobPriority.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

def update_job_priority(self, job_id: str, account_id: str) -> None: """ Update the priority of a batch job and start it. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended']: self.s3control_client.update_job_priority( AccountId=account_id, JobId=job_id, Priority=60 ) print("The job priority was updated") try: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Ready' ) print("Job activated successfully") except ClientError as activation_error: print(f"Note: Could not activate job automatically: {activation_error}") print("Job priority was updated successfully. Job may need manual activation in the console.") elif current_status in ['Active', 'Completing', 'Complete']: print(f"Job is in '{current_status}' state - priority cannot be updated") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print("Job is currently running.") else: print(f"Job is in '{current_status}' state - priority update not allowed") except ClientError as e: print(f"Error updating job priority: {e}") print("Continuing with the scenario...") return
  • For API details, see UpdateJobPriority in AWS SDK for Python (Boto3) API Reference.

The following code example shows how to use UpdateJobStatus.

SDK for Python (Boto3)
Note

There's more on GitHub. Find the complete example and learn how to set up and run in the AWS Code Examples Repository.

def cancel_job(self, job_id: str, account_id: str) -> None: """ Cancel an S3 batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended', 'Active']: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Cancelled' ) print(f"Job {job_id} was successfully canceled.") elif current_status in ['Completing', 'Complete']: print(f"Job is in '{current_status}' state - cannot be cancelled") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print(f"Job is in '{current_status}' state - cancel not allowed") except ClientError as e: print(f"Error canceling job: {e}") raise
  • For API details, see UpdateJobStatus in AWS SDK for Python (Boto3) API Reference.