AWS SDK for C++

AWS SDK for C++ Version 1.11.822

Loading...
Searching...
No Matches
AWSClientBidirectionalStreaming.h
1
5#pragma once
6
7#include <aws/core/Core_EXPORTS.h>
8#include <aws/core/NoResult.h>
9#include <aws/core/client/AsyncCallerContext.h>
10#include <aws/core/client/CoreErrors.h>
11#include <aws/core/utils/DNS.h>
12#include <aws/core/utils/RAIICounter.h>
13#include <aws/core/utils/event/EventDecoderStream.h>
14#include <aws/core/utils/event/EventEncoderStream.h>
15#include <aws/core/utils/logging/LogMacros.h>
16#include <aws/core/utils/stream/HttpWriteDataStreamBuf.h>
17#include <aws/core/utils/threading/Executor.h>
18
19namespace Aws {
20namespace Client {
21
/**
 * Starts a bidirectional event-stream request and arranges for the completion
 * handler to be invoked once the stream has finished.
 *
 * Sequence performed by this function:
 *  1. Validates the URI host of `httpRequest` with Aws::Utils::IsValidHost; on failure
 *     `handler` is invoked immediately and the function returns.
 *  2. Re-applies the event stream handler on `requestCopy`, then installs a
 *     headers-received callback (on `httpRequest`) and a response stream factory
 *     (on both `requestCopy` and `httpRequest`). Both lambdas hold only a
 *     std::weak_ptr to `requestCopy`.
 *  3. Calls `writeDataStreamBuf->Initialize(httpRequest)`; if it yields an error,
 *     `handler` is invoked immediately with that error and the function returns.
 *  4. Submits a task to `executor` that blocks in `WaitForStreamComplete()`, flushes the
 *     response body, and invokes `handler` with either the response's client error or
 *     an OutcomeT built from Aws::NoResult.
 *  5. Invokes `streamReadyHandler(*eventEncoderStream)` directly, after the task has
 *     been submitted.
 *
 * On the early-return paths (steps 1 and 3) `streamReadyHandler` is never called.
 *
 * @tparam ClientT  Client type; must provide static GetAllocationTag() and the members
 *                  m_isInitialized, m_operationsProcessed and m_shutdownSignal used below.
 * @tparam OutcomeT Outcome type; constructed here from an error value or from Aws::NoResult.
 * @tparam RequestT Request type exposing the event stream handler/decoder accessors used below.
 * @tparam EncoderStreamT      Stream type passed to `streamReadyHandler`; Close() is called on it.
 * @tparam HandlerT            Callable invoked as handler(client, request, outcome, handlerContext).
 * @tparam StreamReadyHandlerT Callable invoked as streamReadyHandler(*eventEncoderStream).
 *
 * @param client             Client issuing the request; captured as a raw pointer by the task.
 * @param request            Caller's request object; only used on the Initialize-failure path.
 * @param requestCopy        Shared copy of the request; passed to `handler` on all other paths.
 * @param eventEncoderStream Stream handed to `streamReadyHandler`; closed on client error.
 * @param writeDataStreamBuf Stream buffer that is initialized with `httpRequest` and later
 *                           waited on for completion.
 * @param httpRequest        HTTP request that receives the headers callback and stream factory.
 * @param executor           Executor the completion task is submitted to.
 * @param streamReadyHandler Called once, at the end of this function, on the success path.
 * @param handler            Completion callback (see HandlerT).
 * @param handlerContext     Opaque context forwarded unchanged to `handler`.
 *
 * NOTE(review): this listing has gaps. The embedded listing numbers skip 45 (return type
 * and function name), 52 (the `executor` parameter declaration), 62, 79 and 125. The
 * missing text is not reproduced here; restore it from the original header before
 * compiling.
 */
38template <
39 typename ClientT,
40 typename OutcomeT,
41 typename RequestT,
42 typename EncoderStreamT,
43 typename HandlerT,
44 typename StreamReadyHandlerT>
// NOTE(review): listing line 45 (return type, function name and opening parenthesis) is absent here.
46 const ClientT* client,
47 RequestT& request,
48 std::shared_ptr<RequestT> requestCopy,
49 std::shared_ptr<EncoderStreamT> eventEncoderStream,
50 std::shared_ptr<Aws::Utils::Stream::HttpWriteDataStreamBuf> writeDataStreamBuf,
51 const std::shared_ptr<Aws::Http::HttpRequest>& httpRequest,
// NOTE(review): listing line 52 is absent; `executor` is used in the body, so its declaration presumably belongs here.
53 const StreamReadyHandlerT& streamReadyHandler,
54 const HandlerT& handler,
55 const std::shared_ptr<const Aws::Client::AsyncCallerContext>& handlerContext)
56{
57 const char* allocationTag = ClientT::GetAllocationTag();
58
59 // Validate host name
// On failure the handler is invoked synchronously, before anything is submitted to the executor.
60 if (!Aws::Utils::IsValidHost(httpRequest->GetUri().GetHost())) {
61 handler(client, *requestCopy,
// NOTE(review): listing line 62 (start of the OutcomeT/error construction) is absent here.
63 "Invalid DNS Label found in URI host", false)),
64 handlerContext);
65 return;
66 }
67
68 // Fix decoder pointer after copy construction
// The handler is re-set to its own current value; presumably the setter re-binds the decoder
// to the copy's handler instance -- confirm against RequestT::SetEventStreamHandler.
69 std::weak_ptr<RequestT> wReq = requestCopy;
70 requestCopy->SetEventStreamHandler(requestCopy->GetEventStreamHandler());
71
72 // Wire initial response handler on httpRequest (CRT reads it from there)
73 httpRequest->SetHeadersReceivedEventHandler(
74 [wReq](const Aws::Http::HttpRequest*, Aws::Http::HttpResponse* response) {
// Silently does nothing if the request copy has expired or no response object was supplied.
75 auto req = wReq.lock();
76 if (!req || !response) return;
77 auto& cb = req->GetEventStreamHandler().GetInitialResponseCallbackEx();
78 if (cb) {
// NOTE(review): listing line 79 (the body of this branch, presumably the call to `cb`) is absent here.
80 }
81 });
82
83 // Wire response stream factory (weak_ptr breaks reference cycle)
// The factory returns nullptr once the request copy has expired; otherwise it resets the
// request's decoder and allocates a new EventDecoderStream over it with Aws::New.
// Ownership of the returned raw pointer is not visible here -- presumably taken by the caller.
84 auto responseStreamFactory = [wReq, allocationTag]() -> Aws::IOStream* {
85 auto req = wReq.lock();
86 if (!req) return nullptr;
87 req->GetEventStreamDecoder().Reset();
88 return Aws::New<Aws::Utils::Event::EventDecoderStream>(allocationTag, req->GetEventStreamDecoder());
89 };
// The same factory is registered on both the request copy and the HTTP request.
90 requestCopy->SetResponseStreamFactory(responseStreamFactory);
91 httpRequest->SetResponseStreamFactory(responseStreamFactory);
92
93 // Initialize the HTTP/2 connection
// NOTE(review): this path passes the caller's `request` to the handler, whereas every other
// handler invocation in this function passes `*requestCopy`.
94 auto initError = writeDataStreamBuf->Initialize(httpRequest);
95 if (initError.has_value()) {
96 handler(client, request, OutcomeT(initError.value()), handlerContext);
97 return;
98 }
99
100 // Submit executor task — waits for stream completion, then invokes handler
// The task captures the shared_ptr arguments by value, so the request copy, encoder stream and
// stream buffer stay alive for as long as the task object exists. `client` is a raw pointer.
// NOTE(review): Submit returns bool (see Executor.h); the result is not checked here.
101 executor->Submit(
102 [client, requestCopy, eventEncoderStream, handler, handlerContext, writeDataStreamBuf, allocationTag]() {
// Bail out with NOT_INITIALIZED if the client's initialized flag is false when the task runs.
103 if (!client->m_isInitialized) {
104 AWS_LOGSTREAM_ERROR(allocationTag, "Client is not initialized or already terminated");
105 handler(client, *requestCopy,
106 OutcomeT(Aws::Client::AWSError<CoreErrors>(
107 CoreErrors::NOT_INITIALIZED, "NOT_INITIALIZED",
108 "Client is not initialized or already terminated", false)),
109 handlerContext);
110 return;
111 }
// Scoped guard over the client's operation counter and shutdown signal; presumably tracks
// in-flight operations for client shutdown -- confirm in RAIICounter.h.
112 Aws::Utils::RAIICounter raiiGuard(client->m_operationsProcessed, &client->m_shutdownSignal);
113
// Waits here until the stream buffer reports completion, then fetches the response (may be null).
114 writeDataStreamBuf->WaitForStreamComplete();
115 auto response = writeDataStreamBuf->GetResponse();
116
117 // Flush any remaining buffered response data through the EventDecoderStream
118 if (response) {
119 response->GetResponseBody().flush();
120 }
121
// A client-side error closes the encoder stream and is reported through the handler. A null
// response, or a response without a client error, is reported as an OutcomeT built from NoResult.
122 if (response && response->HasClientError()) {
123 eventEncoderStream->Close();
124 handler(client, *requestCopy,
// NOTE(review): listing line 125 (start of the OutcomeT/error construction) is absent here.
126 response->GetClientErrorType(), "", response->GetClientErrorMessage(), false)),
127 handlerContext);
128 } else {
129 handler(client, *requestCopy, OutcomeT(Aws::NoResult()), handlerContext);
130 }
131 });
132
// Invoked directly by this function after the completion task has been submitted.
133 streamReadyHandler(*eventEncoderStream);
134}
135
136} // namespace Client
137} // namespace Aws
virtual HeaderValueCollection GetHeaders() const =0
virtual Aws::IOStream & GetResponseBody() const =0
Aws::Client::CoreErrors GetClientErrorType()
const Aws::String & GetClientErrorMessage() const
bool HasClientError() const
bool Submit(Fn &&fn, Args &&... args)
Definition Executor.h:33
void SubmitBidirectionalStreamingRequest(const ClientT *client, RequestT &request, std::shared_ptr< RequestT > requestCopy, std::shared_ptr< EncoderStreamT > eventEncoderStream, std::shared_ptr< Aws::Utils::Stream::HttpWriteDataStreamBuf > writeDataStreamBuf, const std::shared_ptr< Aws::Http::HttpRequest > &httpRequest, Aws::Utils::Threading::Executor *executor, const StreamReadyHandlerT &streamReadyHandler, const HandlerT &handler, const std::shared_ptr< const Aws::Client::AsyncCallerContext > &handlerContext)
AWS_CORE_API bool IsValidHost(const Aws::String &host)
std::basic_iostream< char, std::char_traits< char > > IOStream