亚马逊 S3 实用工具 - 适用于 Go 的 AWS SDK v2

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

亚马逊 S3 实用工具

亚马逊 S3 转会管理器

Amazon S3 上传和下载管理器可以分解大型对象,因此可以将它们分成多个部分并行传输。这样可以轻松恢复中断的传输。

亚马逊 S3 上传管理器

Amazon S3 上传管理器决定是否可以将文件拆分为较小的部分并行上传。您可以自定义 parallel 上传的数量和上传段的大小。

以下示例使用 Amazon Uploader S3 上传文件。使用Uploaders3.PutObject()操作类似。

import "context" import "github.com/aws/aws-sdk-go-v2/config" import "github.com/aws/aws-sdk-go-v2/service/s3" import "github.com/aws/aws-sdk-go-v2/feature/s3/manager" // ... cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Printf("error: %v", err) return } client := s3.NewFromConfig(cfg) uploader := manager.NewUploader(client) result, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-object-key"), Body: uploadFile, })

配置选项

使用实例化Uploader实例时 NewUploader,您可以指定多个配置选项来自定义对象的上传方式。通过向提供一个或多个参数来覆盖选项。NewUploader这些选项包括:

  • PartSize— 指定要上传的每个分段的缓冲区大小(以字节为单位)。每个部件的最小大小为 5 MiB。

  • Concurrency— 指定要并行上传的分段数。

  • LeavePartsOnError— 表示是否将成功上传的段留在 Amazon S3 中。

Concurrency值限制了给定Upload调用可能发生的分段上传的并发数量。这不是全局客户端并发限制。调整PartSizeConcurrency配置值以找到最佳配置。例如,具有高带宽连接的系统可以并行发送更大的部分和更多的上传。

例如,您的应用程序UploaderConcurrency的配置设置为。5如果您的应用程序随后Upload从两个不同的 goroutine 调用,则结果是10并发分段上传(2 个 goroutine * 5)。Concurrency

警告

您的应用程序应限制并发调用次数,Upload以防止应用程序资源耗尽。

以下是在Uploader创建过程中设置默认零件尺寸的示例:

uploader := manager.NewUploader(client, func(u *Uploader) { u.PartSize = 10 * 1024 * 1024, // 10 MiB })

有关Uploader其配置的更多信息,请参阅 适用于 Go 的 AWS SDK API 参考中的 Uploader

PutObjectInput Body Field (io. ReadSeeker 与 io.Reader 对比)

s3.PutObjectInput结构的Body字段是一个io.Reader类型。但是,可以用同时满足io.ReadSeekerio.ReaderAt接口的类型填充此字段,以提高主机环境的应用程序资源利用率。以下示例创建了满足两个接口ReadSeekerAt的类型:

type ReadSeekerAt interface { io.ReadSeeker io.ReaderAt }

对于io.Reader类型,必须先将读取器的字节缓存在内存中,然后才能上传分段。增加PartSizeConcurrency值时,所需的内存 (RAM) Uploader 会显著增加。所需的内存约为 PartSize* Concurrency。例如,为指定 100 MBPartSize,为指定 10Concurrency,至少需要 1 GB。

由于io.Reader类型在读取其字节之前无法确定其大小,Uploader因此无法计算要上传多少段。因此,如果您设置得PartSize太低,则Uploader可以达到 Amazon S3 的大文件上传 10,000 个分段的限制。如果您尝试上传超过 10,000 个分段,则上传会停止并返回错误。

对于实现该ReadSeekerAt类型的body值,在将正文内容发送到 Amazon S3 之前,Uploader不会将其缓冲到内存中。 Uploader在将文件上传到 Amazon S3 之前计算预期的分段数。如果当前值PartSize需要超过 10,000 个分段才能上传文件,则Uploader会增加分段大小值以减少所需的分段。

处理失败的上传

如果上传到 Amazon S3 失败,默认情况下会Uploader使用 Amazon S3 AbortMultipartUpload 操作来移除已上传的分段。此功能可确保失败的上传不会占用 Amazon S3 存储空间。

您可以LeavePartsOnError将其设置为 true,这样就Uploader不会删除成功上传的分段。这对于恢复部分完成的上传非常有用。要对上传的分段进行操作,您必须获取上传失败的信息。UploadID以下示例演示如何使用manager.MultiUploadFailure错误接口类型来获取UploadID

result, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-object-key"), Body: uploadFile, }) output, err := u.upload(input) if err != nil { var mu manager.MultiUploadFailure if errors.As(err, &mu) { // Process error and its associated uploadID fmt.Println("Error:", mu) _ = mu.UploadID() // retrieve the associated UploadID } else { // Process error generically fmt.Println("Error:", err.Error()) } return }

覆盖每次上传的上传者选项

在调用时,您可以Upload通过向方法提供一个或多个参数来覆盖这些Uploader选项。这些替代是并发安全的修改,不会影响正在进行的上传或随后对管理器的调用。Upload例如,要覆盖特定上传请求的PartSize配置,请执行以下操作:

params := &s3.PutObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-key"), Body: myBody, } resp, err := uploader.Upload(context.TODO(), params, func(u *manager.Uploader) { u.PartSize = 10 * 1024 * 1024, // 10 MiB })

示例

将文件夹上传到亚马逊 S3

以下示例使用该path/filepath包以递归方式收集文件列表并将其上传到指定的 Amazon S3 存储桶。Amazon S3 对象的密钥以文件的相对路径为前缀。

package main import ( "context" "log" "os" "path/filepath" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" ) var ( localPath string bucket string prefix string ) func init() { if len(os.Args) != 4 { log.Fatalln("Usage:", os.Args[0], "<local path> <bucket> <prefix>") } localPath = os.Args[1] bucket = os.Args[2] prefix = os.Args[3] } func main() { walker := make(fileWalk) go func() { // Gather the files to upload by walking the path recursively if err := filepath.Walk(localPath, walker.Walk); err != nil { log.Fatalln("Walk failed:", err) } close(walker) }() cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Fatalln("error:", err) } // For each file found walking, upload it to Amazon S3 uploader := manager.NewUploader(s3.NewFromConfig(cfg)) for path := range walker { rel, err := filepath.Rel(localPath, path) if err != nil { log.Fatalln("Unable to get relative path:", path, err) } file, err := os.Open(path) if err != nil { log.Println("Failed opening file", path, err) continue } defer file.Close() result, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: &bucket, Key: aws.String(filepath.Join(prefix, rel)), Body: file, }) if err != nil { log.Fatalln("Failed to upload", path, err) } log.Println("Uploaded", path, result.Location) } } type fileWalk chan string func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { f <- path } return nil }

下载管理器

Amazon S3 下载器管理器决定是否可以将文件拆分成较小的部分并行下载。您可以自定义 parallel 下载次数和已下载部分的大小。

示例:下载文件

以下示例使用 Amazon Downloader S3 下载文件。使用Downloader与 s 3 类似。 GetObject操作。

import "context" import "github.com/aws/aws-sdk-go-v2/aws" import "github.com/aws/aws-sdk-go-v2/config" import "github.com/aws/aws-sdk-go-v2/service/s3" import "github.com/aws/aws-sdk-go-v2/feature/s3/manager" // ... cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Println("error:", err) return } client := s3.NewFromConfig(cfg) downloader := manager.NewDownloader(client) numBytes, err := downloader.Download(context.TODO(), downloadFile, &s3.GetObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-key"), })

downloadFile参数是一个io.WriterAt类型。该WriterAt接口使能够并行写入文件的多个部分。Downloader

配置选项

实例化实例时,您可以指定配置选项以自定义对象的下载方式:Downloader

  • PartSize— 指定要下载的每个部分的缓冲区大小(以字节为单位)。每个部分的最小大小为 5 MB。

  • Concurrency— 指定要并行下载的部分数量。

Concurrency值限制了给定Download呼叫可能发生的分段下载的并发数量。这不是全局客户端并发限制。调整PartSizeConcurrency配置值以找到最佳配置。例如,具有高带宽连接的系统可以并行接收更大的部分和更多的下载量。

例如,您的应用程序配置DownloaderConcurrency为。5然后,您的应用程序Download从两个不同的 goroutine 调用,结果将是10并行下载部件(2 个 goroutine * 5)。Concurrency

警告

您的应用程序应限制并发调用次数,Download以防止应用程序资源耗尽。

有关Downloader及其其他配置选项的更多信息,请参阅 API 参考中的 Manager.downloader。 适用于 Go 的 AWS SDK

覆盖每次下载的下载器选项

在调用时,您可以Download通过向方法提供一个或多个函数参数来覆盖这些Downloader选项。这些替代是并发安全的修改,不会影响正在进行的上传或对管理器的后续Download调用。例如,要覆盖特定上传请求的PartSize配置,请执行以下操作:

params := &s3.GetObjectInput{ Bucket: aws.String("amzn-s3-demo-bucket"), Key: aws.String("my-key"), } resp, err := downloader.Download(context.TODO(), targetWriter, params, func(u *manager.Downloader) { u.PartSize = 10 * 1024 * 1024, // 10 MiB })
示例
下载存储桶中的所有对象

以下示例使用分页从 Amazon S3 存储桶中收集对象列表。然后,它将每个对象下载到本地文件。

package main import ( "context" "fmt" "log" "os" "path/filepath" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" ) var ( Bucket = "amzn-s3-demo-bucket" // Download from this bucket Prefix = "logs/" // Using this key prefix LocalDirectory = "s3logs" // Into this directory ) func main() { cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Fatalln("error:", err) } client := s3.NewFromConfig(cfg) manager := manager.NewDownloader(client) paginator := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{ Bucket: &Bucket, Prefix: &Prefix, }) for paginator.HasMorePages() { page, err := paginator.NextPage(context.TODO()) if err != nil { log.Fatalln("error:", err) } for _, obj := range page.Contents { if err := downloadToFile(manager, LocalDirectory, Bucket, aws.ToString(obj.Key)); err != nil { log.Fatalln("error:", err) } } } } func downloadToFile(downloader *manager.Downloader, targetDirectory, bucket, key string) error { // Create the directories in the path file := filepath.Join(targetDirectory, key) if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil { return err } // Set up the local file fd, err := os.Create(file) if err != nil { return err } defer fd.Close() // Download the file using the AWS SDK for Go fmt.Printf("Downloading s3://%s/%s to %s...\n", bucket, key, file) _, err = downloader.Download(context.TODO(), fd, &s3.GetObjectInput{Bucket: &bucket, Key: &key}) return err }

GetBucketRegion

GetBucketRegion是一项用于确定 Amazon S3 存储桶的 AWS 区域位置的实用函数。 GetBucketRegion获取 Amazon S3 客户端,并使用它来确定请求的存储桶在与该客户配置的区域关联的 AWS 分区中的位置。

例如,要查找存储桶的区域,请执行amzn-s3-demo-bucket以下操作:

cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Println("error:", err) return } bucket := "amzn-s3-demo-bucket" region, err := manager.GetBucketRegion(ctx, s3.NewFromConfig(cfg), bucket) if err != nil { var bnf manager.BucketNotFound if errors.As(err, &bnf) { log.Printf("unable to find bucket %s's Region\n", bucket) } else { log.Println("error:", err) } return } fmt.Printf("Bucket %s is in %s region\n", bucket, region)

GetBucketRegion如果无法解析存储桶的位置,则该函数将返回BucketNotFound错误类型,如示例所示。

不可搜索的直播输入

对于PutObject和之类的 API 操作UploadPart,默认情况下,Amazon S3 客户端需要Body输入参数的值来实现 io.Seeker 接口。客户端使用该io.Seeker接口来确定要上传的值的长度,并计算请求签名的有效载荷哈希。如果Body输入参数值未实现io.Seeker,则您的应用程序将收到错误。

operation error S3: PutObject, failed to compute payload hash: failed to seek body to start, request stream is not seekable

您可以通过中间件使用功能选项修改操作方法来更改此行为。W ith APIOptions 助手返回零个或多个中间件突变器的功能选项。要禁用客户端计算有效载荷哈希值并使用未签名的有效载荷请求签名,请添加 v4。 SwapComputePayloadSHA256ForUnsignedPayloadMiddleware

resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ Bucket: &bucketName, Key: &objectName, Body: bytes.NewBuffer([]byte(`example object!`)), ContentLength: 15, // length of body }, s3.WithAPIOptions( v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware, ))
警告

Amazon S3 要求为上传到存储桶的所有对象提供内容长度。由于Body输入参数未实现io.Seeker接口,因此客户端将无法计算请求的ContentLength参数。参数必须由应用程序提供。如果未提供ContentLength参数,则请求将失败。

亚马逊 S3 上传管理器对于不可搜索且长度不明的上传,请使用 SDK。