Contoh Kontrol Amazon S3 menggunakan SDK for Python (Boto3) - AWS Contoh Kode SDK

Ada lebih banyak contoh AWS SDK yang tersedia di repo Contoh SDK AWS Doc. GitHub

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Contoh Kontrol Amazon S3 menggunakan SDK for Python (Boto3)

Contoh kode berikut menunjukkan kepada Anda cara melakukan tindakan dan menerapkan skenario umum dengan menggunakan Kontrol Amazon S3 AWS SDK untuk Python (Boto3) dengan.

Dasar-dasar adalah contoh kode yang menunjukkan kepada Anda bagaimana melakukan operasi penting dalam suatu layanan.

Tindakan merupakan kutipan kode dari program yang lebih besar dan harus dijalankan dalam konteks. Sementara tindakan menunjukkan cara memanggil fungsi layanan individual, Anda dapat melihat tindakan dalam konteks dalam skenario terkait.

Setiap contoh menyertakan tautan ke kode sumber lengkap, di mana Anda dapat menemukan instruksi tentang cara mengatur dan menjalankan kode dalam konteks.

Memulai

Contoh kode berikut menunjukkan cara memulai menggunakan Amazon S3 Control.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di Repositori Contoh Kode AWS.

def list_jobs(self, account_id: str) -> None: """ List all batch jobs for the account. Args: account_id (str): AWS account ID """ try: response = self.s3control_client.list_jobs( AccountId=account_id, JobStatuses=['Active', 'Complete', 'Cancelled', 'Failed', 'New', 'Paused', 'Pausing', 'Preparing', 'Ready', 'Suspended'] ) jobs = response.get('Jobs', []) for job in jobs: print(f"The job id is {job['JobId']}") print(f"The job priority is {job['Priority']}") except ClientError as e: print(f"Error listing jobs: {e}") raise
  • Untuk detail API, lihat ListJobsdi AWS SDK for Python (Boto3) Referensi API.

Hal-hal mendasar

Contoh kode berikut menunjukkan cara mempelajari operasi inti untuk Kontrol Amazon S3.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di Repositori Contoh Kode AWS.

Pelajari Skenario Dasar Batch S3.

class S3BatchWrapper: """Wrapper class for managing S3 Batch Operations.""" def __init__(self, s3_client: Any, s3control_client: Any, sts_client: Any) -> None: """ Initializes the S3BatchWrapper with AWS service clients. :param s3_client: A Boto3 Amazon S3 client. This client provides low-level access to AWS S3 services. :param s3control_client: A Boto3 Amazon S3 Control client. This client provides low-level access to AWS S3 Control services. :param sts_client: A Boto3 AWS STS client. This client provides low-level access to AWS STS services. """ self.s3_client = s3_client self.s3control_client = s3control_client self.sts_client = sts_client # Get region from the client for bucket creation logic self.region_name = self.s3_client.meta.region_name def get_account_id(self) -> str: """ Get AWS account ID. Returns: str: AWS account ID """ return self.sts_client.get_caller_identity()["Account"] def create_bucket(self, bucket_name: str) -> None: """ Create an S3 bucket. Args: bucket_name (str): Name of the bucket to create Raises: ClientError: If bucket creation fails """ try: if self.region_name and self.region_name != 'us-east-1': self.s3_client.create_bucket( Bucket=bucket_name, CreateBucketConfiguration={ 'LocationConstraint': self.region_name } ) else: self.s3_client.create_bucket(Bucket=bucket_name) print(f"Created bucket: {bucket_name}") except ClientError as e: print(f"Error creating bucket: {e}") raise def upload_files_to_bucket(self, bucket_name: str, file_names: List[str]) -> str: """ Upload files to S3 bucket including manifest file. Args: bucket_name (str): Target bucket name file_names (list): List of file names to upload Returns: str: ETag of the manifest file Raises: ClientError: If file upload fails """ try: for file_name in file_names: if file_name != "job-manifest.csv": content = f"Content for {file_name}" self.s3_client.put_object( Bucket=bucket_name, Key=file_name, Body=content.encode('utf-8') ) print(f"Uploaded {file_name} to {bucket_name}") manifest_content = "" for file_name in file_names: if file_name != "job-manifest.csv": manifest_content += f"{bucket_name},{file_name}\n" manifest_response = self.s3_client.put_object( Bucket=bucket_name, Key="job-manifest.csv", Body=manifest_content.encode('utf-8') ) print(f"Uploaded manifest file to {bucket_name}") print(f"Manifest content:\n{manifest_content}") return manifest_response['ETag'].strip('"') except ClientError as e: print(f"Error uploading files: {e}") raise def create_s3_batch_job(self, account_id: str, role_arn: str, manifest_location: str, report_bucket_name: str) -> str: """ Create an S3 batch operation job. Args: account_id (str): AWS account ID role_arn (str): IAM role ARN for batch operations manifest_location (str): Location of the manifest file report_bucket_name (str): Bucket for job reports Returns: str: Job ID Raises: ClientError: If job creation fails """ try: bucket_name = manifest_location.split(':::')[1].split('/')[0] manifest_key = 'job-manifest.csv' manifest_obj = self.s3_client.head_object( Bucket=bucket_name, Key=manifest_key ) etag = manifest_obj['ETag'].strip('"') response = self.s3control_client.create_job( AccountId=account_id, Operation={ 'S3PutObjectTagging': { 'TagSet': [ { 'Key': 'BatchTag', 'Value': 'BatchValue' }, ] } }, Report={ 'Bucket': report_bucket_name, 'Format': 'Report_CSV_20180820', 'Enabled': True, 'Prefix': 'batch-op-reports', 'ReportScope': 'AllTasks' }, Manifest={ 'Spec': { 'Format': 'S3BatchOperations_CSV_20180820', 'Fields': ['Bucket', 'Key'] }, 'Location': { 'ObjectArn': manifest_location, 'ETag': etag } }, Priority=10, RoleArn=role_arn, Description='Batch job for tagging objects', ConfirmationRequired=True ) job_id = response['JobId'] print(f"The Job id is {job_id}") return job_id except ClientError as e: print(f"Error creating batch job: {e}") if 'Message' in str(e): print(f"Detailed error message: {e.response['Message']}") raise def check_job_failure_reasons(self, job_id: str, account_id: str) -> List[Dict[str, Any]]: """ Check for any failure reasons of a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID Returns: list: List of failure reasons Raises: ClientError: If checking job failure reasons fails """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) if 'FailureReasons' in response['Job']: for reason in response['Job']['FailureReasons']: print(f"- {reason}") return response['Job'].get('FailureReasons', []) except ClientError as e: print(f"Error checking job failure reasons: {e}") raise def wait_for_job_ready(self, job_id: str, account_id: str, desired_status: str = 'Ready') -> bool: """ Wait for a job to reach the desired status. Args: job_id (str): ID of the batch job account_id (str): AWS account ID desired_status (str): Target status to wait for Returns: bool: True if desired status is reached, False otherwise Raises: ClientError: If checking job status fails """ print(f"Waiting for job to become {desired_status}...") max_attempts = 60 attempt = 0 while attempt < max_attempts: try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status == desired_status: return True if current_status == 'Suspended': print("Job is in Suspended state, can proceed with activation") return True if current_status in ['Active', 'Failed', 'Cancelled', 'Complete']: print(f"Job is in {current_status} state, cannot reach {desired_status} status") if 'FailureReasons' in response['Job']: print("Failure reasons:") for reason in response['Job']['FailureReasons']: print(f"- {reason}") return False time.sleep(20) attempt += 1 except ClientError as e: print(f"Error checking job status: {e}") raise print(f"Timeout waiting for job to become {desired_status}") return False def update_job_priority(self, job_id: str, account_id: str) -> None: """ Update the priority of a batch job and start it. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended']: self.s3control_client.update_job_priority( AccountId=account_id, JobId=job_id, Priority=60 ) print("The job priority was updated") try: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Ready' ) print("Job activated successfully") except ClientError as activation_error: print(f"Note: Could not activate job automatically: {activation_error}") print("Job priority was updated successfully. Job may need manual activation in the console.") elif current_status in ['Active', 'Completing', 'Complete']: print(f"Job is in '{current_status}' state - priority cannot be updated") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print("Job is currently running.") else: print(f"Job is in '{current_status}' state - priority update not allowed") except ClientError as e: print(f"Error updating job priority: {e}") print("Continuing with the scenario...") return def cancel_job(self, job_id: str, account_id: str) -> None: """ Cancel an S3 batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended', 'Active']: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Cancelled' ) print(f"Job {job_id} was successfully canceled.") elif current_status in ['Completing', 'Complete']: print(f"Job is in '{current_status}' state - cannot be cancelled") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print(f"Job is in '{current_status}' state - cancel not allowed") except ClientError as e: print(f"Error canceling job: {e}") raise def describe_job_details(self, job_id: str, account_id: str) -> None: """ Describe detailed information about a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) job = response['Job'] print(f"Job ID: {job['JobId']}") print(f"Description: {job.get('Description', 'N/A')}") print(f"Status: {job['Status']}") print(f"Role ARN: {job['RoleArn']}") print(f"Priority: {job['Priority']}") if 'ProgressSummary' in job: progress = job['ProgressSummary'] print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, " f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, " f"Failed={progress.get('NumberOfTasksFailed', 0)}") except ClientError as e: print(f"Error describing job: {e}") raise def get_job_tags(self, job_id: str, account_id: str) -> None: """ Get tags associated with a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.get_job_tagging( AccountId=account_id, JobId=job_id ) tags = response.get('Tags', []) if tags: print(f"Tags for job {job_id}:") for tag in tags: print(f" {tag['Key']}: {tag['Value']}") else: print(f"No tags found for job ID: {job_id}") except ClientError as e: print(f"Error getting job tags: {e}") raise def put_job_tags(self, job_id: str, account_id: str) -> None: """ Add tags to a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.put_job_tagging( AccountId=account_id, JobId=job_id, Tags=[ {'Key': 'Environment', 'Value': 'Development'}, {'Key': 'Team', 'Value': 'DataProcessing'} ] ) print(f"Additional tags were added to job {job_id}") except ClientError as e: print(f"Error adding job tags: {e}") raise def list_jobs(self, account_id: str) -> None: """ List all batch jobs for the account. Args: account_id (str): AWS account ID """ try: response = self.s3control_client.list_jobs( AccountId=account_id, JobStatuses=['Active', 'Complete', 'Cancelled', 'Failed', 'New', 'Paused', 'Pausing', 'Preparing', 'Ready', 'Suspended'] ) jobs = response.get('Jobs', []) for job in jobs: print(f"The job id is {job['JobId']}") print(f"The job priority is {job['Priority']}") except ClientError as e: print(f"Error listing jobs: {e}") raise def delete_job_tags(self, job_id: str, account_id: str) -> None: """ Delete all tags from a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.delete_job_tagging( AccountId=account_id, JobId=job_id ) print(f"You have successfully deleted {job_id} tagging.") except ClientError as e: print(f"Error deleting job tags: {e}") raise def cleanup_resources(self, bucket_name: str, file_names: List[str]) -> None: """ Clean up all resources created during the scenario. Args: bucket_name (str): Name of the bucket to clean up file_names (list): List of files to delete Raises: ClientError: If cleanup fails """ try: for file_name in file_names: self.s3_client.delete_object(Bucket=bucket_name, Key=file_name) print(f"Deleted {file_name}") response = self.s3_client.list_objects_v2( Bucket=bucket_name, Prefix='batch-op-reports/' ) if 'Contents' in response: for obj in response['Contents']: self.s3_client.delete_object( Bucket=bucket_name, Key=obj['Key'] ) print(f"Deleted {obj['Key']}") self.s3_client.delete_bucket(Bucket=bucket_name) print(f"Deleted bucket {bucket_name}") except ClientError as e: print(f"Error in cleanup: {e}") raise

Tindakan

Contoh kode berikut menunjukkan cara menggunakanCreateJob.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di Repositori Contoh Kode AWS.

def create_s3_batch_job(self, account_id: str, role_arn: str, manifest_location: str, report_bucket_name: str) -> str: """ Create an S3 batch operation job. Args: account_id (str): AWS account ID role_arn (str): IAM role ARN for batch operations manifest_location (str): Location of the manifest file report_bucket_name (str): Bucket for job reports Returns: str: Job ID Raises: ClientError: If job creation fails """ try: bucket_name = manifest_location.split(':::')[1].split('/')[0] manifest_key = 'job-manifest.csv' manifest_obj = self.s3_client.head_object( Bucket=bucket_name, Key=manifest_key ) etag = manifest_obj['ETag'].strip('"') response = self.s3control_client.create_job( AccountId=account_id, Operation={ 'S3PutObjectTagging': { 'TagSet': [ { 'Key': 'BatchTag', 'Value': 'BatchValue' }, ] } }, Report={ 'Bucket': report_bucket_name, 'Format': 'Report_CSV_20180820', 'Enabled': True, 'Prefix': 'batch-op-reports', 'ReportScope': 'AllTasks' }, Manifest={ 'Spec': { 'Format': 'S3BatchOperations_CSV_20180820', 'Fields': ['Bucket', 'Key'] }, 'Location': { 'ObjectArn': manifest_location, 'ETag': etag } }, Priority=10, RoleArn=role_arn, Description='Batch job for tagging objects', ConfirmationRequired=True ) job_id = response['JobId'] print(f"The Job id is {job_id}") return job_id except ClientError as e: print(f"Error creating batch job: {e}") if 'Message' in str(e): print(f"Detailed error message: {e.response['Message']}") raise
  • Untuk detail API, lihat CreateJobdi AWS SDK for Python (Boto3) Referensi API.

Contoh kode berikut menunjukkan cara menggunakanDeleteJobTagging.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di Repositori Contoh Kode AWS.

def delete_job_tags(self, job_id: str, account_id: str) -> None: """ Delete all tags from a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.delete_job_tagging( AccountId=account_id, JobId=job_id ) print(f"You have successfully deleted {job_id} tagging.") except ClientError as e: print(f"Error deleting job tags: {e}") raise
  • Untuk detail API, lihat DeleteJobTaggingdi AWS SDK for Python (Boto3) Referensi API.

Contoh kode berikut menunjukkan cara menggunakanDescribeJob.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di Repositori Contoh Kode AWS.

def describe_job_details(self, job_id: str, account_id: str) -> None: """ Describe detailed information about a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) job = response['Job'] print(f"Job ID: {job['JobId']}") print(f"Description: {job.get('Description', 'N/A')}") print(f"Status: {job['Status']}") print(f"Role ARN: {job['RoleArn']}") print(f"Priority: {job['Priority']}") if 'ProgressSummary' in job: progress = job['ProgressSummary'] print(f"Progress Summary: Total={progress.get('TotalNumberOfTasks', 0)}, " f"Succeeded={progress.get('NumberOfTasksSucceeded', 0)}, " f"Failed={progress.get('NumberOfTasksFailed', 0)}") except ClientError as e: print(f"Error describing job: {e}") raise
  • Untuk detail API, lihat DescribeJobdi AWS SDK for Python (Boto3) Referensi API.

Contoh kode berikut menunjukkan cara menggunakanGetJobTagging.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di Repositori Contoh Kode AWS.

def get_job_tags(self, job_id: str, account_id: str) -> None: """ Get tags associated with a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.get_job_tagging( AccountId=account_id, JobId=job_id ) tags = response.get('Tags', []) if tags: print(f"Tags for job {job_id}:") for tag in tags: print(f" {tag['Key']}: {tag['Value']}") else: print(f"No tags found for job ID: {job_id}") except ClientError as e: print(f"Error getting job tags: {e}") raise
  • Untuk detail API, lihat GetJobTaggingdi AWS SDK for Python (Boto3) Referensi API.

Contoh kode berikut menunjukkan cara menggunakanPutJobTagging.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di Repositori Contoh Kode AWS.

def put_job_tags(self, job_id: str, account_id: str) -> None: """ Add tags to a batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: self.s3control_client.put_job_tagging( AccountId=account_id, JobId=job_id, Tags=[ {'Key': 'Environment', 'Value': 'Development'}, {'Key': 'Team', 'Value': 'DataProcessing'} ] ) print(f"Additional tags were added to job {job_id}") except ClientError as e: print(f"Error adding job tags: {e}") raise
  • Untuk detail API, lihat PutJobTaggingdi AWS SDK for Python (Boto3) Referensi API.

Contoh kode berikut menunjukkan cara menggunakanUpdateJobPriority.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di Repositori Contoh Kode AWS.

def update_job_priority(self, job_id: str, account_id: str) -> None: """ Update the priority of a batch job and start it. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended']: self.s3control_client.update_job_priority( AccountId=account_id, JobId=job_id, Priority=60 ) print("The job priority was updated") try: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Ready' ) print("Job activated successfully") except ClientError as activation_error: print(f"Note: Could not activate job automatically: {activation_error}") print("Job priority was updated successfully. Job may need manual activation in the console.") elif current_status in ['Active', 'Completing', 'Complete']: print(f"Job is in '{current_status}' state - priority cannot be updated") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print("Job is currently running.") else: print(f"Job is in '{current_status}' state - priority update not allowed") except ClientError as e: print(f"Error updating job priority: {e}") print("Continuing with the scenario...") return
  • Untuk detail API, lihat UpdateJobPrioritydi AWS SDK for Python (Boto3) Referensi API.

Contoh kode berikut menunjukkan cara menggunakanUpdateJobStatus.

SDK untuk Python (Boto3)
catatan

Ada lebih banyak tentang GitHub. Temukan contoh lengkapnya dan pelajari cara mengatur dan menjalankannya di Repositori Contoh Kode AWS.

def cancel_job(self, job_id: str, account_id: str) -> None: """ Cancel an S3 batch job. Args: job_id (str): ID of the batch job account_id (str): AWS account ID """ try: response = self.s3control_client.describe_job( AccountId=account_id, JobId=job_id ) current_status = response['Job']['Status'] print(f"Current job status: {current_status}") if current_status in ['Ready', 'Suspended', 'Active']: self.s3control_client.update_job_status( AccountId=account_id, JobId=job_id, RequestedJobStatus='Cancelled' ) print(f"Job {job_id} was successfully canceled.") elif current_status in ['Completing', 'Complete']: print(f"Job is in '{current_status}' state - cannot be cancelled") if current_status == 'Completing': print("Job is finishing up and will complete soon.") elif current_status == 'Complete': print("Job has already completed successfully.") else: print(f"Job is in '{current_status}' state - cancel not allowed") except ClientError as e: print(f"Error canceling job: {e}") raise
  • Untuk detail API, lihat UpdateJobStatusdi AWS SDK for Python (Boto3) Referensi API.