本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。
亚马逊 S3 实用工具
亚马逊 S3 转会管理器
Amazon S3 上传和下载管理器可以分解大型对象,因此可以将它们分成多个部分并行传输。这样可以轻松恢复中断的传输。
亚马逊 S3 上传管理器
Amazon S3 上传管理器决定是否可以将文件拆分为较小的部分并行上传。您可以自定义 parallel 上传的数量和上传段的大小。
以下示例使用 Amazon Uploader
S3 上传文件。使用Uploader
与s3.PutObject()
操作类似。
import "context" import "github.com/aws/aws-sdk-go-v2/config" import "github.com/aws/aws-sdk-go-v2/service/s3" import "github.com/aws/aws-sdk-go-v2/feature/s3/manager" // ... cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Printf("error: %v", err) return } client := s3.NewFromConfig(cfg) uploader := manager.NewUploader(client) result, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String("
amzn-s3-demo-bucket
"), Key: aws.String("my-object-key
"), Body: uploadFile, })
配置选项
使用实例化Uploader
实例时 NewUploaderNewUploader
这些选项包括:
-
PartSize
— 指定要上传的每个分段的缓冲区大小(以字节为单位)。每个部件的最小大小为 5 MiB。 -
Concurrency
— 指定要并行上传的分段数。 -
LeavePartsOnError
— 表示是否将成功上传的段留在 Amazon S3 中。
该Concurrency
值限制了给定Upload
调用可能发生的分段上传的并发数量。这不是全局客户端并发限制。调整PartSize
和Concurrency
配置值以找到最佳配置。例如,具有高带宽连接的系统可以并行发送更大的部分和更多的上传。
例如,您的应用程序Uploader
Concurrency
的配置设置为。5
如果您的应用程序随后Upload
从两个不同的 goroutine 调用,则结果是10
并发分段上传(2 个 goroutine * 5)。Concurrency
警告
您的应用程序应限制并发调用次数,Upload
以防止应用程序资源耗尽。
以下是在Uploader
创建过程中设置默认零件尺寸的示例:
uploader := manager.NewUploader(client, func(u *Uploader) { u.PartSize = 10 * 1024 * 1024, // 10 MiB })
有关Uploader
其配置的更多信息,请参阅 适用于 Go 的 AWS SDK API 参考中的 Uploader
PutObjectInput Body Field (io. ReadSeeker 与 io.Reader 对比)
s3.PutObjectInput
结构的Body
字段是一个io.Reader
类型。但是,可以用同时满足io.ReadSeeker
和io.ReaderAt
接口的类型填充此字段,以提高主机环境的应用程序资源利用率。以下示例创建了满足两个接口ReadSeekerAt
的类型:
type ReadSeekerAt interface { io.ReadSeeker io.ReaderAt }
对于io.Reader
类型,必须先将读取器的字节缓存在内存中,然后才能上传分段。增加PartSize
或Concurrency
值时,所需的内存 (RAM) Uploader
会显著增加。所需的内存约为 PartSize
* Concurrency
。例如,为指定 100 MBPartSize
,为指定 10Concurrency
,至少需要 1 GB。
由于io.Reader
类型在读取其字节之前无法确定其大小,Uploader
因此无法计算要上传多少段。因此,如果您设置得PartSize
太低,则Uploader
可以达到 Amazon S3 的大文件上传 10,000 个分段的限制。如果您尝试上传超过 10,000 个分段,则上传会停止并返回错误。
对于实现该ReadSeekerAt
类型的body
值,在将正文内容发送到 Amazon S3 之前,Uploader
不会将其缓冲到内存中。 Uploader
在将文件上传到 Amazon S3 之前计算预期的分段数。如果当前值PartSize
需要超过 10,000 个分段才能上传文件,则Uploader
会增加分段大小值以减少所需的分段。
处理失败的上传
如果上传到 Amazon S3 失败,默认情况下会Uploader
使用 Amazon S3 AbortMultipartUpload
操作来移除已上传的分段。此功能可确保失败的上传不会占用 Amazon S3 存储空间。
您可以LeavePartsOnError
将其设置为 true,这样就Uploader
不会删除成功上传的分段。这对于恢复部分完成的上传非常有用。要对上传的分段进行操作,您必须获取上传失败的信息。UploadID
以下示例演示如何使用manager.MultiUploadFailure
错误接口类型来获取UploadID
。
result, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: aws.String("
amzn-s3-demo-bucket
"), Key: aws.String("my-object-key
"), Body: uploadFile, }) output, err := u.upload(input) if err != nil { var mu manager.MultiUploadFailure if errors.As(err, &mu) { // Process error and its associated uploadID fmt.Println("Error:", mu) _ = mu.UploadID() // retrieve the associated UploadID } else { // Process error generically fmt.Println("Error:", err.Error()) } return }
覆盖每次上传的上传者选项
在调用时,您可以Upload
通过向方法提供一个或多个参数来覆盖这些Uploader
选项。这些替代是并发安全的修改,不会影响正在进行的上传或随后对管理器的调用。Upload
例如,要覆盖特定上传请求的PartSize
配置,请执行以下操作:
params := &s3.PutObjectInput{ Bucket: aws.String("
amzn-s3-demo-bucket
"), Key: aws.String("my-key
"), Body: myBody, } resp, err := uploader.Upload(context.TODO(), params, func(u *manager.Uploader) { u.PartSize = 10 * 1024 * 1024, // 10 MiB })
示例
将文件夹上传到亚马逊 S3
以下示例使用该path/filepath
包以递归方式收集文件列表并将其上传到指定的 Amazon S3 存储桶。Amazon S3 对象的密钥以文件的相对路径为前缀。
package main import ( "context" "log" "os" "path/filepath" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" ) var ( localPath string bucket string prefix string ) func init() { if len(os.Args) != 4 { log.Fatalln("Usage:", os.Args[0], "<local path> <bucket> <prefix>") } localPath = os.Args[1] bucket = os.Args[2] prefix = os.Args[3] } func main() { walker := make(fileWalk) go func() { // Gather the files to upload by walking the path recursively if err := filepath.Walk(localPath, walker.Walk); err != nil { log.Fatalln("Walk failed:", err) } close(walker) }() cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Fatalln("error:", err) } // For each file found walking, upload it to Amazon S3 uploader := manager.NewUploader(s3.NewFromConfig(cfg)) for path := range walker { rel, err := filepath.Rel(localPath, path) if err != nil { log.Fatalln("Unable to get relative path:", path, err) } file, err := os.Open(path) if err != nil { log.Println("Failed opening file", path, err) continue } defer file.Close() result, err := uploader.Upload(context.TODO(), &s3.PutObjectInput{ Bucket: &bucket, Key: aws.String(filepath.Join(prefix, rel)), Body: file, }) if err != nil { log.Fatalln("Failed to upload", path, err) } log.Println("Uploaded", path, result.Location) } } type fileWalk chan string func (f fileWalk) Walk(path string, info os.FileInfo, err error) error { if err != nil { return err } if !info.IsDir() { f <- path } return nil }
下载管理器
Amazon S3 下载器管理器
示例:下载文件
以下示例使用 Amazon Downloader
S3 下载文件。使用Downloader
与 s 3 类似。 GetObject
import "context" import "github.com/aws/aws-sdk-go-v2/aws" import "github.com/aws/aws-sdk-go-v2/config" import "github.com/aws/aws-sdk-go-v2/service/s3" import "github.com/aws/aws-sdk-go-v2/feature/s3/manager" // ... cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Println("error:", err) return } client := s3.NewFromConfig(cfg) downloader := manager.NewDownloader(client) numBytes, err := downloader.Download(context.TODO(), downloadFile, &s3.GetObjectInput{ Bucket: aws.String("
amzn-s3-demo-bucket
"), Key: aws.String("my-key
"), })
downloadFile
参数是一个io.WriterAt
类型。该WriterAt
接口使能够并行写入文件的多个部分。Downloader
配置选项
实例化实例时,您可以指定配置选项以自定义对象的下载方式:Downloader
-
PartSize
— 指定要下载的每个部分的缓冲区大小(以字节为单位)。每个部分的最小大小为 5 MB。 -
Concurrency
— 指定要并行下载的部分数量。
该Concurrency
值限制了给定Download
呼叫可能发生的分段下载的并发数量。这不是全局客户端并发限制。调整PartSize
和Concurrency
配置值以找到最佳配置。例如,具有高带宽连接的系统可以并行接收更大的部分和更多的下载量。
例如,您的应用程序配置Downloader
Concurrency
为。5
然后,您的应用程序Download
从两个不同的 goroutine 调用,结果将是10
并行下载部件(2 个 goroutine * 5)。Concurrency
警告
您的应用程序应限制并发调用次数,Download
以防止应用程序资源耗尽。
有关Downloader
及其其他配置选项的更多信息,请参阅 API 参考中的 Manager.downloader
覆盖每次下载的下载器选项
在调用时,您可以Download
通过向方法提供一个或多个函数参数来覆盖这些Downloader
选项。这些替代是并发安全的修改,不会影响正在进行的上传或对管理器的后续Download
调用。例如,要覆盖特定上传请求的PartSize
配置,请执行以下操作:
params := &s3.GetObjectInput{ Bucket: aws.String("
amzn-s3-demo-bucket
"), Key: aws.String("my-key
"), } resp, err := downloader.Download(context.TODO(), targetWriter, params, func(u *manager.Downloader) { u.PartSize = 10 * 1024 * 1024, // 10 MiB })
示例
下载存储桶中的所有对象
以下示例使用分页从 Amazon S3 存储桶中收集对象列表。然后,它将每个对象下载到本地文件。
package main import ( "context" "fmt" "log" "os" "path/filepath" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" "github.com/aws/aws-sdk-go-v2/service/s3" ) var ( Bucket = "
amzn-s3-demo-bucket
" // Download from this bucket Prefix = "logs/" // Using this key prefix LocalDirectory = "s3logs" // Into this directory ) func main() { cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Fatalln("error:", err) } client := s3.NewFromConfig(cfg) manager := manager.NewDownloader(client) paginator := s3.NewListObjectsV2Paginator(client, &s3.ListObjectsV2Input{ Bucket: &Bucket, Prefix: &Prefix, }) for paginator.HasMorePages() { page, err := paginator.NextPage(context.TODO()) if err != nil { log.Fatalln("error:", err) } for _, obj := range page.Contents { if err := downloadToFile(manager, LocalDirectory, Bucket, aws.ToString(obj.Key)); err != nil { log.Fatalln("error:", err) } } } } func downloadToFile(downloader *manager.Downloader, targetDirectory, bucket, key string) error { // Create the directories in the path file := filepath.Join(targetDirectory, key) if err := os.MkdirAll(filepath.Dir(file), 0775); err != nil { return err } // Set up the local file fd, err := os.Create(file) if err != nil { return err } defer fd.Close() // Download the file using the AWS SDK for Go fmt.Printf("Downloading s3://%s/%s to %s...\n", bucket, key, file) _, err = downloader.Download(context.TODO(), fd, &s3.GetObjectInput{Bucket: &bucket, Key: &key}) return err }
GetBucketRegion
GetBucketRegionGetBucketRegion
获取 Amazon S3 客户端,并使用它来确定请求的存储桶在与该客户配置的区域关联的 AWS 分区中的位置。
例如,要查找存储桶的区域,请执行
以下操作:amzn-s3-demo-bucket
cfg, err := config.LoadDefaultConfig(context.TODO()) if err != nil { log.Println("error:", err) return } bucket := "
amzn-s3-demo-bucket
" region, err := manager.GetBucketRegion(ctx, s3.NewFromConfig(cfg), bucket) if err != nil { var bnf manager.BucketNotFound if errors.As(err, &bnf) { log.Printf("unable to find bucket %s's Region\n", bucket) } else { log.Println("error:", err) } return } fmt.Printf("Bucket %s is in %s region\n", bucket, region)
GetBucketRegion
如果无法解析存储桶的位置,则该函数将返回BucketNotFound
不可搜索的直播输入
对于PutObject
和之类的 API 操作UploadPart
,默认情况下,Amazon S3 客户端需要Body
输入参数的值来实现 io.Seekerio.Seeker
接口来确定要上传的值的长度,并计算请求签名的有效载荷哈希。如果Body
输入参数值未实现io.Seeker
,则您的应用程序将收到错误。
operation error S3: PutObject, failed to compute payload hash: failed to seek body to start, request stream is not seekable
您可以通过中间件使用功能选项修改操作方法来更改此行为。W ith APIOptions
resp, err := client.PutObject(context.TODO(), &s3.PutObjectInput{ Bucket: &bucketName, Key: &objectName, Body: bytes.NewBuffer([]byte(`example object!`)), ContentLength: 15, // length of body }, s3.WithAPIOptions( v4.SwapComputePayloadSHA256ForUnsignedPayloadMiddleware, ))
警告
Amazon S3 要求为上传到存储桶的所有对象提供内容长度。由于Body
输入参数未实现io.Seeker
接口,因此客户端将无法计算请求的ContentLength
参数。参数必须由应用程序提供。如果未提供ContentLength
参数,则请求将失败。
亚马逊 S3 上传管理器对于不可搜索且长度不明的上传,请使用 SDK。