AWS SDKを使用し、Amazon S3 Glacier アーカイブコンテンツを取得します - AWS SDK コードサンプル

Doc AWS SDK Examples リポジトリには、他にも SDK の例があります。 AWS GitHub

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS SDKを使用し、Amazon S3 Glacier アーカイブコンテンツを取得します

次のコードサンプルは、以下の操作方法を示しています。

  • Amazon S3 Glacier ボールトのジョブをリストし、ジョブのステータスを取得します。

  • 完了したアーカイブの取得ジョブの出力を取得します。

  • アーカイブを削除します。

  • ボールトを削除します。

Python
SDK for Python (Boto3)
注記

には他にもがあります GitHub。用例一覧を検索し、AWS コードサンプルリポジトリでの設定と実行の方法を確認してください。

S3 Glacier オペレーションをラップするクラスを作成します。

import argparse import logging import os import boto3 from botocore.exceptions import ClientError logger = logging.getLogger(__name__) class GlacierWrapper: """Encapsulates Amazon S3 Glacier API operations.""" def __init__(self, glacier_resource): """ :param glacier_resource: A Boto3 Amazon S3 Glacier resource. """ self.glacier_resource = glacier_resource @staticmethod def list_jobs(vault, job_type): """ Lists jobs by type for the specified vault. :param vault: The vault to query. :param job_type: The type of job to list. :return: The list of jobs of the requested type. """ job_list = [] try: if job_type == "all": jobs = vault.jobs.all() elif job_type == "in_progress": jobs = vault.jobs_in_progress.all() elif job_type == "completed": jobs = vault.completed_jobs.all() elif job_type == "succeeded": jobs = vault.succeeded_jobs.all() elif job_type == "failed": jobs = vault.failed_jobs.all() else: jobs = [] logger.warning("%s isn't a type of job I can get.", job_type) for job in jobs: job_list.append(job) logger.info("Got %s %s job %s.", job_type, job.action, job.id) except ClientError: logger.exception("Couldn't get %s jobs from %s.", job_type, vault.name) raise else: return job_list @staticmethod def get_job_output(job): """ Gets the output of a job, such as a vault inventory or the contents of an archive. :param job: The job to get output from. :return: The job output, in bytes. """ try: response = job.get_output() out_bytes = response["body"].read() logger.info("Read %s bytes from job %s.", len(out_bytes), job.id) if "archiveDescription" in response: logger.info( "These bytes are described as '%s'", response["archiveDescription"] ) except ClientError: logger.exception("Couldn't get output for job %s.", job.id) raise else: return out_bytes @staticmethod def delete_archive(archive): """ Deletes an archive from a vault. :param archive: The archive to delete. """ try: archive.delete() logger.info( "Deleted archive %s from vault %s.", archive.id, archive.vault_name ) except ClientError: logger.exception("Couldn't delete archive %s.", archive.id) raise @staticmethod def delete_vault(vault): """ Deletes a vault. :param vault: The vault to delete. """ try: vault.delete() logger.info("Deleted vault %s.", vault.name) except ClientError: logger.exception("Couldn't delete vault %s.", vault.name) raise

Wrapper クラスの関数を呼び出して、完了したジョブからアーカイブコンテンツを取得し、アーカイブを削除します。

def retrieve_demo(glacier, vault_name): """ Shows how to: * List jobs for a vault and get job status. * Get the output of a completed archive retrieval job. * Delete an archive. * Delete a vault. :param glacier: A Boto3 Amazon S3 Glacier resource. :param vault_name: The name of the vault to query for jobs. """ vault = glacier.glacier_resource.Vault("-", vault_name) try: vault.load() except ClientError as err: if err.response["Error"]["Code"] == "ResourceNotFoundException": print( f"\nVault {vault_name} doesn't exist. You must first run this script " f"with the --upload flag to create the vault." ) return else: raise print(f"\nGetting completed jobs for {vault.name}.") jobs = glacier.list_jobs(vault, "completed") if not jobs: print("\nNo completed jobs found. Give it some time and try again later.") return retrieval_job = None for job in jobs: if job.action == "ArchiveRetrieval" and job.status_code == "Succeeded": retrieval_job = job break if retrieval_job is None: print( "\nNo ArchiveRetrieval jobs found. Give it some time and try again " "later." ) return print(f"\nGetting output from job {retrieval_job.id}.") archive_bytes = glacier.get_job_output(retrieval_job) archive_str = archive_bytes.decode("utf-8") print("\nGot archive data. Printing the first 10 lines.") print(os.linesep.join(archive_str.split(os.linesep)[:10])) print(f"\nDeleting the archive from {vault.name}.") archive = glacier.glacier_resource.Archive( "-", vault.name, retrieval_job.archive_id ) glacier.delete_archive(archive) print(f"\nDeleting {vault.name}.") glacier.delete_vault(vault)