7 #ifndef CPPWAMP_ASIOTRANSPORT_HPP
8 #define CPPWAMP_ASIOTRANSPORT_HPP
19 #include <boost/asio/buffer.hpp>
20 #include <boost/asio/executor.hpp>
21 #include <boost/asio/post.hpp>
22 #include <boost/asio/read.hpp>
23 #include <boost/asio/strand.hpp>
24 #include <boost/asio/write.hpp>
25 #include <boost/system/error_code.hpp>
26 #include "../asiodefs.hpp"
27 #include "../codec.hpp"
28 #include "../error.hpp"
29 #include "../messagebuffer.hpp"
30 #include "../rawsockoptions.hpp"
31 #include "rawsockheader.hpp"
45 using Ptr = std::shared_ptr<AsioFrame>;
46 using Header = uint32_t;
47 using GatherBufs = std::array<boost::asio::const_buffer, 2>;
49 AsioFrame() =
default;
52 : header_(computeHeader(type, payload)),
53 payload_(std::move(payload))
56 void clear() {header_ = 0; payload_.clear();}
58 void resize(
size_t length) {payload_.resize(length);}
62 header_ = computeHeader(type, payload);
63 payload_ = std::move(payload);
66 RawsockHeader header()
const {
return RawsockHeader::fromBigEndian(header_);}
72 GatherBufs gatherBuffers()
74 return GatherBufs{{ {&header_,
sizeof(header_)},
75 {payload_.data(), payload_.size()} }};
78 boost::asio::mutable_buffers_1 headerBuffer()
80 return boost::asio::buffer(&header_,
sizeof(header_));
83 boost::asio::mutable_buffers_1 payloadBuffer()
85 return boost::asio::buffer(&payload_.front(), payload_.size());
89 static Header computeHeader(RawsockMsgType type,
92 return RawsockHeader().setMsgType(type)
93 .setLength(payload.size())
102 template <
typename TSocket>
103 class AsioTransport :
104 public std::enable_shared_from_this<AsioTransport<TSocket>>
107 using Ptr = std::shared_ptr<AsioTransport>;
109 using FailHandler = std::function<void (std::error_code ec)>;
110 using PingHandler = std::function<void (
float)>;
112 using Socket = TSocket;
113 using SocketPtr = std::unique_ptr<Socket>;
115 static Ptr create(SocketPtr&& socket,
size_t maxTxLength,
118 return Ptr(
new AsioTransport(std::move(socket), maxTxLength,
123 AsioTransport(
const AsioTransport&) =
delete;
124 AsioTransport& operator=(
const AsioTransport&) =
delete;
126 virtual ~AsioTransport() {}
128 size_t maxSendLength()
const {
return maxTxLength_;}
130 size_t maxReceiveLength()
const {
return maxRxLength_;}
132 bool isOpen()
const {
return socket_ && socket_->is_open();}
134 bool isStarted()
const {
return started_;}
136 void start(RxHandler rxHandler, FailHandler failHandler)
139 rxHandler_ = rxHandler;
140 failHandler_ = failHandler;
148 auto buf = newFrame(RawsockMsgType::wamp, std::move(message));
149 sendFrame(std::move(buf));
155 rxHandler_ =
nullptr;
160 IoStrand strand()
const {
return strand_;}
165 pingHandler_ = std::move(handler);
166 pingFrame_ = newFrame(RawsockMsgType::ping, std::move(message));
167 sendFrame(pingFrame_);
168 pingStart_ = std::chrono::high_resolution_clock::now();
172 using TransmitQueue = std::deque<AsioFrame::Ptr>;
173 using TimePoint = std::chrono::high_resolution_clock::time_point;
175 AsioTransport(SocketPtr&& socket,
size_t maxTxLength,
size_t maxRxLength)
176 : socket_(std::move(socket)),
177 strand_(boost::asio::make_strand(socket_->get_executor())),
178 maxTxLength_(maxTxLength),
179 maxRxLength_(maxRxLength)
182 AsioFrame::Ptr newFrame(RawsockMsgType type,
MessageBuffer&& payload)
185 return std::make_shared<AsioFrame>(type, std::move(payload));
188 virtual void sendFrame(AsioFrame::Ptr frame)
190 assert(socket_ &&
"Attempting to send on bad transport");
191 assert((frame->payload().size() <= maxTxLength_) &&
192 "Outgoing message is longer than allowed by peer");
193 txQueue_.push_back(std::move(frame));
198 template <
typename TFunctor>
199 void post(TFunctor&& fn)
201 boost::asio::post(strand_, std::forward<TFunctor>(fn));
206 if (isReadyToTransmit())
208 txFrame_ = txQueue_.front();
209 txQueue_.pop_front();
211 auto self = this->shared_from_this();
212 boost::asio::async_write(*socket_, txFrame_->gatherBuffers(),
229 bool isReadyToTransmit()
const
241 auto self = this->shared_from_this();
242 boost::asio::async_read(*socket_, rxFrame_.headerBuffer(),
251 virtual void processHeader()
253 auto hdr = rxFrame_.header();
254 auto length = hdr.length();
258 receivePayload(hdr.msgType(), length);
262 void receivePayload(RawsockMsgType msgType,
size_t length)
264 rxFrame_.resize(length);
265 auto self = this->shared_from_this();
266 boost::asio::async_read(*socket_, rxFrame_.payloadBuffer(),
274 case RawsockMsgType::wamp:
277 post(std::bind(rxHandler_,
278 std::move(rxFrame_).payload()));
283 case RawsockMsgType::ping:
287 case RawsockMsgType::pong:
299 auto frame = newFrame(RawsockMsgType::pong,
300 std::move(rxFrame_).payload());
301 sendFrame(std::move(frame));
307 if (canProcessPong())
309 namespace chrn = std::chrono;
310 pingStop_ = chrn::high_resolution_clock::now();
311 using Fms = chrn::duration<float, chrn::milliseconds::period>;
312 float elapsed = Fms(pingStop_ - pingStart_).count();
313 post(std::bind(pingHandler_, elapsed));
314 pingHandler_ =
nullptr;
320 bool canProcessPong()
const
322 return pingHandler_ && pingFrame_ &&
323 (rxFrame_.payload() == pingFrame_->payload());
332 auto ec = make_error_code(
333 static_cast<std::errc
>(asioEc.value()));
334 post(std::bind(failHandler_, ec));
341 template <
typename TErrc>
342 bool check(
bool condition, TErrc errc)
347 post(std::bind(failHandler_, make_error_code(errc)));
355 rxHandler_ =
nullptr;
356 failHandler_ =
nullptr;
357 pingHandler_ =
nullptr;
361 pingFrame_ =
nullptr;
365 std::unique_ptr<TSocket> socket_;
369 bool started_ =
false;
370 RxHandler rxHandler_;
371 FailHandler failHandler_;
372 PingHandler pingHandler_;
374 TransmitQueue txQueue_;
375 AsioFrame::Ptr txFrame_;
376 AsioFrame::Ptr pingFrame_;
377 TimePoint pingStart_;
385 #endif // CPPWAMP_ASIOTRANSPORT_HPP