7#include <aws/core/Core_EXPORTS.h>
8#include <aws/core/NoResult.h>
9#include <aws/core/client/AsyncCallerContext.h>
10#include <aws/core/client/CoreErrors.h>
11#include <aws/core/utils/DNS.h>
12#include <aws/core/utils/RAIICounter.h>
13#include <aws/core/utils/event/EventDecoderStream.h>
14#include <aws/core/utils/event/EventEncoderStream.h>
15#include <aws/core/utils/logging/LogMacros.h>
16#include <aws/core/utils/stream/HttpWriteDataStreamBuf.h>
17#include <aws/core/utils/threading/Executor.h>
42 typename EncoderStreamT,
44 typename StreamReadyHandlerT>
46 const ClientT* client,
48 std::shared_ptr<RequestT> requestCopy,
49 std::shared_ptr<EncoderStreamT> eventEncoderStream,
50 std::shared_ptr<Aws::Utils::Stream::HttpWriteDataStreamBuf> writeDataStreamBuf,
51 const std::shared_ptr<Aws::Http::HttpRequest>& httpRequest,
53 const StreamReadyHandlerT& streamReadyHandler,
54 const HandlerT& handler,
55 const std::shared_ptr<const Aws::Client::AsyncCallerContext>& handlerContext)
57 const char* allocationTag = ClientT::GetAllocationTag();
61 handler(client, *requestCopy,
63 "Invalid DNS Label found in URI host",
false)),
69 std::weak_ptr<RequestT> wReq = requestCopy;
70 requestCopy->SetEventStreamHandler(requestCopy->GetEventStreamHandler());
73 httpRequest->SetHeadersReceivedEventHandler(
75 auto req = wReq.lock();
76 if (!req || !response)
return;
77 auto& cb = req->GetEventStreamHandler().GetInitialResponseCallbackEx();
84 auto responseStreamFactory = [wReq, allocationTag]() ->
Aws::IOStream* {
85 auto req = wReq.lock();
86 if (!req)
return nullptr;
87 req->GetEventStreamDecoder().Reset();
88 return Aws::New<Aws::Utils::Event::EventDecoderStream>(allocationTag, req->GetEventStreamDecoder());
90 requestCopy->SetResponseStreamFactory(responseStreamFactory);
91 httpRequest->SetResponseStreamFactory(responseStreamFactory);
94 auto initError = writeDataStreamBuf->Initialize(httpRequest);
95 if (initError.has_value()) {
96 handler(client, request, OutcomeT(initError.value()), handlerContext);
102 [client, requestCopy, eventEncoderStream, handler, handlerContext, writeDataStreamBuf, allocationTag]() {
103 if (!client->m_isInitialized) {
104 AWS_LOGSTREAM_ERROR(allocationTag,
"Client is not initialized or already terminated");
105 handler(client, *requestCopy,
106 OutcomeT(Aws::Client::AWSError<CoreErrors>(
107 CoreErrors::NOT_INITIALIZED,
"NOT_INITIALIZED",
108 "Client is not initialized or already terminated", false)),
114 writeDataStreamBuf->WaitForStreamComplete();
115 auto response = writeDataStreamBuf->GetResponse();
123 eventEncoderStream->Close();
124 handler(client, *requestCopy,
129 handler(client, *requestCopy, OutcomeT(
Aws::NoResult()), handlerContext);
133 streamReadyHandler(*eventEncoderStream);
virtual HeaderValueCollection GetHeaders() const =0
virtual Aws::IOStream & GetResponseBody() const =0
Aws::Client::CoreErrors GetClientErrorType()
const Aws::String & GetClientErrorMessage() const
bool HasClientError() const
bool Submit(Fn &&fn, Args &&... args)
void SubmitBidirectionalStreamingRequest(const ClientT *client, RequestT &request, std::shared_ptr< RequestT > requestCopy, std::shared_ptr< EncoderStreamT > eventEncoderStream, std::shared_ptr< Aws::Utils::Stream::HttpWriteDataStreamBuf > writeDataStreamBuf, const std::shared_ptr< Aws::Http::HttpRequest > &httpRequest, Aws::Utils::Threading::Executor *executor, const StreamReadyHandlerT &streamReadyHandler, const HandlerT &handler, const std::shared_ptr< const Aws::Client::AsyncCallerContext > &handlerContext)
AWS_CORE_API bool IsValidHost(const Aws::String &host)
std::basic_iostream< char, std::char_traits< char > > IOStream