使用适用于 C++ 的 AWS SDK 进行异步编程 - 适用于 C++ 的 AWS SDK

使用适用于 C++ 的 AWS SDK 进行异步编程

异步 SDK 方法

对于许多方法,适用于 C++ 的 SDK 同时提供同步和异步版本。如果方法的名称中包含 Async 后缀,则该方法是异步的。例如,Amazon S3 方法 PutObject 是同步的,而 PutObjectAsync 是异步的。

与所有异步操作一样,异步 SDK 方法会在主任务完成之前返回。例如,PutObjectAsync 方法会在将文件上传到 Amazon S3 存储桶之前返回。文件上传操作持续进行期间,应用程序可执行其他操作,包括调用其他异步方法。调用关联的回调函数时,应用程序会收到异步操作已完成的通知。

以下各节描述了一个演示调用 PutObjectAsync 异步方法的代码示例。每节会重点介绍该示例整个源文件中的各个部分。

调用 SDK 异步方法

通常,SDK 方法的异步版本接受以下参数。

  • 对与同步版本对应的同一 Request 类型对象的引用。

  • 对响应处理程序回调函数的引用。当异步操作完成时,将调用此回调函数。其中一个参数包含操作的结果。

  • AsyncCallerContext 对象的可选 shared_ptr。该对象被传递给响应处理程序回调。它包含一个 UUID 属性,可用于将文本信息传递给回调。

下面显示的 uploadFileAsync 方法设置并调用 SDK 的 Amazon S3 PutObjectAsync 方法,以异步方式将文件上传到 Amazon S3 存储桶。

该函数接收对 S3Client 对象和 PutObjectRequest 对象的引用。这些对象由主函数传入,因为我们需要确保它们在异步调用的整个执行期间始终存在。

分配一个指向 AsyncCallerContext 对象的 shared_ptr。其 UUID 属性设置为 Amazon S3 对象名称。出于演示目的,响应处理程序回调会访问该属性并输出其值。

PutObjectAsync 的调用包括响应处理程序回调函数 uploadFileAsyncFinished 的引用参数。下一节将更详细地讨论这一回调函数。

bool AwsDoc::S3::uploadFileAsync(const Aws::S3::S3Client &s3Client, Aws::S3::Model::PutObjectRequest &request, const Aws::String &bucketName, const Aws::String &fileName) { request.SetBucket(bucketName); request.SetKey(fileName); const std::shared_ptr<Aws::IOStream> input_data = Aws::MakeShared<Aws::FStream>("SampleAllocationTag", fileName.c_str(), std::ios_base::in | std::ios_base::binary); if (!*input_data) { std::cerr << "Error: unable to open file " << fileName << std::endl; return false; } request.SetBody(input_data); // Create and configure the context for the asynchronous put object request. std::shared_ptr<Aws::Client::AsyncCallerContext> context = Aws::MakeShared<Aws::Client::AsyncCallerContext>("PutObjectAllocationTag"); context->SetUUID(fileName); // Make the asynchronous put object call. Queue the request into a // thread executor and call the uploadFileAsyncFinished function when the // operation has finished. s3Client.PutObjectAsync(request, uploadFileAsyncFinished, context); return true; }

在操作完成之前,用于异步操作的资源必须存在。例如,在应用程序收到操作完成的通知之前,客户端和请求对象必须存在。在异步操作完成之前,应用程序本身无法终止。

因此,uploadFileAsync 方法接受对 S3ClientPutObjectRequest 对象的引用,而不是在 uploadFileAsync 方法中创建它们并将其存储在局部变量中。

在该示例中,PutObjectAsync 方法启动异步操作后会立即向调用方返回,使得调用链能够在上传操作执行期间执行其他任务。

如果客户端存储在 uploadFileAsync 方法的局部变量中,那么该对象会在方法返回时超出范围。但是,在异步操作完成之前,客户端对象必须继续存在。

异步操作完成通知

异步操作完成后,将调用应用程序响应处理程序回调函数。此通知包括操作结果。操作结果包含在该方法同步版本返回的同一 Outcome 类型类中。在代码示例中,结果位于 PutObjectOutcome 对象中。

下面显示了该示例的响应处理程序回调函数 uploadFileAsyncFinished。它会检查异步操作是成功还是失败。它使用 std::condition_variable 通知应用程序线程异步操作已完成。

// A mutex is a synchronization primitive that can be used to protect shared // data from being simultaneously accessed by multiple threads. std::mutex AwsDoc::S3::upload_mutex; // A condition_variable is a synchronization primitive that can be used to // block a thread, or to block multiple threads at the same time. // The thread is blocked until another thread both modifies a shared // variable (the condition) and notifies the condition_variable. std::condition_variable AwsDoc::S3::upload_variable;
void uploadFileAsyncFinished(const Aws::S3::S3Client *s3Client, const Aws::S3::Model::PutObjectRequest &request, const Aws::S3::Model::PutObjectOutcome &outcome, const std::shared_ptr<const Aws::Client::AsyncCallerContext> &context) { if (outcome.IsSuccess()) { std::cout << "Success: uploadFileAsyncFinished: Finished uploading '" << context->GetUUID() << "'." << std::endl; } else { std::cerr << "Error: uploadFileAsyncFinished: " << outcome.GetError().GetMessage() << std::endl; } // Unblock the thread that is waiting for this function to complete. AwsDoc::S3::upload_variable.notify_one(); }

异步操作完成后,可以释放与其关联的资源。如果愿意,应用程序也可以终止这些资源。

以下代码演示了应用程序如何使用 uploadFileAsyncuploadFileAsyncFinished 方法。

应用程序分配 S3ClientPutObjectRequest 对象,以便它们在异步操作完成之前持续存在。调用 uploadFileAsync 后,应用程序可以执行它想要执行的任何操作。为了简单起见,该示例使用 std::mutexstd::condition_variable 进行等待,直至响应处理程序回调通知上传操作已完成。

int main(int argc, char* argv[]) { if (argc != 3) { std::cout << R"( Usage: run_put_object_async <file_name> <bucket_name> Where: file_name - The name of the file to upload. bucket_name - The name of the bucket to upload the object to. )" << std::endl; return 1; } const Aws::SDKOptions options; Aws::InitAPI(options); { const Aws::String fileName = argv[1]; const Aws::String bucketName = argv[2]; // A unique_lock is a general-purpose mutex ownership wrapper allowing // deferred locking, time-constrained attempts at locking, recursive // locking, transfer of lock ownership, and use with // condition variables. std::unique_lock<std::mutex> lock(AwsDoc::S3::upload_mutex); // Create and configure the Amazon S3 client. // This client must be declared here, as this client must exist // until the put object operation finishes. const Aws::S3::S3ClientConfiguration config; // Optional: Set to the AWS Region in which the bucket was created (overrides config file). // config.region = "us-east-1"; const Aws::S3::S3Client s3Client(config); // Create the request object. // This request object must be declared here, because the object must exist // until the put object operation finishes. Aws::S3::Model::PutObjectRequest request; AwsDoc::S3::uploadFileAsync(s3Client, request, bucketName, fileName); std::cout << "main: Waiting for file upload attempt..." << std::endl << std::endl; // While the put object operation attempt is in progress, // you can perform other tasks. // This example simply blocks until the put object operation // attempt finishes. AwsDoc::S3::upload_variable.wait(lock); std::cout << std::endl << "main: File upload attempt completed." << std::endl; } Aws::ShutdownAPI(options); return 0; }

请参阅 Github 上的完整示例。