CppWAMP
C++11 client library for the WAMP protocol
asiotransport.hpp
1 /*------------------------------------------------------------------------------
2  Copyright Butterfly Energy Systems 2014-2015, 2022.
3  Distributed under the Boost Software License, Version 1.0.
4  http://www.boost.org/LICENSE_1_0.txt
5 ------------------------------------------------------------------------------*/
6 
7 #ifndef CPPWAMP_ASIOTRANSPORT_HPP
8 #define CPPWAMP_ASIOTRANSPORT_HPP
9 
10 #include <array>
11 #include <chrono>
12 #include <cstdint>
13 #include <deque>
14 #include <functional>
15 #include <memory>
16 #include <stdexcept>
17 #include <utility>
18 #include <vector>
19 #include <boost/asio/buffer.hpp>
20 #include <boost/asio/executor.hpp>
21 #include <boost/asio/post.hpp>
22 #include <boost/asio/read.hpp>
23 #include <boost/asio/strand.hpp>
24 #include <boost/asio/write.hpp>
25 #include <boost/system/error_code.hpp>
26 #include "../asiodefs.hpp"
27 #include "../codec.hpp"
28 #include "../error.hpp"
29 #include "../messagebuffer.hpp"
30 #include "../rawsockoptions.hpp"
31 #include "rawsockheader.hpp"
32 
33 namespace wamp
34 {
35 
36 namespace internal
37 {
38 
39 //------------------------------------------------------------------------------
40 // Combines a raw socket transport header with an encoded message payload.
41 //------------------------------------------------------------------------------
42 class AsioFrame
43 {
44 public:
45  using Ptr = std::shared_ptr<AsioFrame>;
46  using Header = uint32_t;
47  using GatherBufs = std::array<boost::asio::const_buffer, 2>;
48 
49  AsioFrame() = default;
50 
51  AsioFrame(RawsockMsgType type, MessageBuffer&& payload)
52  : header_(computeHeader(type, payload)),
53  payload_(std::move(payload))
54  {}
55 
56  void clear() {header_ = 0; payload_.clear();}
57 
58  void resize(size_t length) {payload_.resize(length);}
59 
60  void prepare(RawsockMsgType type, MessageBuffer&& payload)
61  {
62  header_ = computeHeader(type, payload);
63  payload_ = std::move(payload);
64  }
65 
66  RawsockHeader header() const {return RawsockHeader::fromBigEndian(header_);}
67 
68  const MessageBuffer& payload() const & {return payload_;}
69 
70  MessageBuffer&& payload() && {return std::move(payload_);}
71 
72  GatherBufs gatherBuffers()
73  {
74  return GatherBufs{{ {&header_, sizeof(header_)},
75  {payload_.data(), payload_.size()} }};
76  }
77 
78  boost::asio::mutable_buffers_1 headerBuffer()
79  {
80  return boost::asio::buffer(&header_, sizeof(header_));
81  }
82 
83  boost::asio::mutable_buffers_1 payloadBuffer()
84  {
85  return boost::asio::buffer(&payload_.front(), payload_.size());
86  }
87 
88 private:
89  static Header computeHeader(RawsockMsgType type,
90  const MessageBuffer& payload)
91  {
92  return RawsockHeader().setMsgType(type)
93  .setLength(payload.size())
94  .toBigEndian();
95  }
96 
97  Header header_;
98  MessageBuffer payload_;
99 };
100 
101 //------------------------------------------------------------------------------
102 template <typename TSocket>
103 class AsioTransport :
104  public std::enable_shared_from_this<AsioTransport<TSocket>>
105 {
106 public:
107  using Ptr = std::shared_ptr<AsioTransport>;
108  using RxHandler = std::function<void (MessageBuffer)>;
109  using FailHandler = std::function<void (std::error_code ec)>;
110  using PingHandler = std::function<void (float)>;
111 
112  using Socket = TSocket;
113  using SocketPtr = std::unique_ptr<Socket>;
114 
115  static Ptr create(SocketPtr&& socket, size_t maxTxLength,
116  size_t maxRxLength)
117  {
118  return Ptr(new AsioTransport(std::move(socket), maxTxLength,
119  maxRxLength));
120  }
121 
122  // Noncopyable
123  AsioTransport(const AsioTransport&) = delete;
124  AsioTransport& operator=(const AsioTransport&) = delete;
125 
126  virtual ~AsioTransport() {}
127 
128  size_t maxSendLength() const {return maxTxLength_;}
129 
130  size_t maxReceiveLength() const {return maxRxLength_;}
131 
132  bool isOpen() const {return socket_ && socket_->is_open();}
133 
134  bool isStarted() const {return started_;}
135 
136  void start(RxHandler rxHandler, FailHandler failHandler)
137  {
138  assert(!started_);
139  rxHandler_ = rxHandler;
140  failHandler_ = failHandler;
141  receive();
142  started_ = true;
143  }
144 
145  void send(MessageBuffer message)
146  {
147  assert(started_);
148  auto buf = newFrame(RawsockMsgType::wamp, std::move(message));
149  sendFrame(std::move(buf));
150  }
151 
152  void close()
153  {
154  txQueue_.clear();
155  rxHandler_ = nullptr;
156  if (socket_)
157  socket_->close();
158  }
159 
160  IoStrand strand() const {return strand_;}
161 
162  void ping(MessageBuffer message, PingHandler handler)
163  {
164  assert(started_);
165  pingHandler_ = std::move(handler);
166  pingFrame_ = newFrame(RawsockMsgType::ping, std::move(message));
167  sendFrame(pingFrame_);
168  pingStart_ = std::chrono::high_resolution_clock::now();
169  }
170 
171 protected:
172  using TransmitQueue = std::deque<AsioFrame::Ptr>;
173  using TimePoint = std::chrono::high_resolution_clock::time_point;
174 
175  AsioTransport(SocketPtr&& socket, size_t maxTxLength, size_t maxRxLength)
176  : socket_(std::move(socket)),
177  strand_(boost::asio::make_strand(socket_->get_executor())),
178  maxTxLength_(maxTxLength),
179  maxRxLength_(maxRxLength)
180  {}
181 
182  AsioFrame::Ptr newFrame(RawsockMsgType type, MessageBuffer&& payload)
183  {
184  // TODO: Reuse frames somehow
185  return std::make_shared<AsioFrame>(type, std::move(payload));
186  }
187 
188  virtual void sendFrame(AsioFrame::Ptr frame)
189  {
190  assert(socket_ && "Attempting to send on bad transport");
191  assert((frame->payload().size() <= maxTxLength_) &&
192  "Outgoing message is longer than allowed by peer");
193  txQueue_.push_back(std::move(frame));
194  transmit();
195  }
196 
197 private:
198  template <typename TFunctor>
199  void post(TFunctor&& fn)
200  {
201  boost::asio::post(strand_, std::forward<TFunctor>(fn));
202  }
203 
204  void transmit()
205  {
206  if (isReadyToTransmit())
207  {
208  txFrame_ = txQueue_.front();
209  txQueue_.pop_front();
210 
211  auto self = this->shared_from_this();
212  boost::asio::async_write(*socket_, txFrame_->gatherBuffers(),
213  [this, self](AsioErrorCode ec, size_t size)
214  {
215  txFrame_.reset();
216  if (ec)
217  {
218  txQueue_.clear();
219  socket_.reset();
220  }
221  else
222  {
223  transmit();
224  }
225  });
226  }
227  }
228 
229  bool isReadyToTransmit() const
230  {
231  return socket_ && // Socket is still open
232  !txFrame_ && // No async_write is in progress
233  !txQueue_.empty(); // One or more messages are enqueued
234  }
235 
236  void receive()
237  {
238  if (socket_)
239  {
240  rxFrame_.clear();
241  auto self = this->shared_from_this();
242  boost::asio::async_read(*socket_, rxFrame_.headerBuffer(),
243  [this, self](AsioErrorCode ec, size_t)
244  {
245  if (check(ec))
246  processHeader();
247  });
248  }
249  }
250 
251  virtual void processHeader()
252  {
253  auto hdr = rxFrame_.header();
254  auto length = hdr.length();
255  if ( check(length <= maxRxLength_, TransportErrc::badRxLength) &&
256  check(hdr.msgTypeIsValid(), RawsockErrc::badMessageType) )
257  {
258  receivePayload(hdr.msgType(), length);
259  }
260  }
261 
262  void receivePayload(RawsockMsgType msgType, size_t length)
263  {
264  rxFrame_.resize(length);
265  auto self = this->shared_from_this();
266  boost::asio::async_read(*socket_, rxFrame_.payloadBuffer(),
267  [this, self, msgType](AsioErrorCode ec, size_t)
268  {
269  if (ec)
270  rxFrame_.clear();
271  if (check(ec))
272  switch (msgType)
273  {
274  case RawsockMsgType::wamp:
275  if (rxHandler_)
276  {
277  post(std::bind(rxHandler_,
278  std::move(rxFrame_).payload()));
279  }
280  receive();
281  break;
282 
283  case RawsockMsgType::ping:
284  sendPong();
285  break;
286 
287  case RawsockMsgType::pong:
288  receivePong();
289  break;
290 
291  default:
292  assert(false);
293  }
294  });
295  }
296 
297  void sendPong()
298  {
299  auto frame = newFrame(RawsockMsgType::pong,
300  std::move(rxFrame_).payload());
301  sendFrame(std::move(frame));
302  receive();
303  }
304 
305  void receivePong()
306  {
307  if (canProcessPong())
308  {
309  namespace chrn = std::chrono;
310  pingStop_ = chrn::high_resolution_clock::now();
311  using Fms = chrn::duration<float, chrn::milliseconds::period>;
312  float elapsed = Fms(pingStop_ - pingStart_).count();
313  post(std::bind(pingHandler_, elapsed));
314  pingHandler_ = nullptr;
315  }
316  pingFrame_.reset();
317  receive();
318  }
319 
320  bool canProcessPong() const
321  {
322  return pingHandler_ && pingFrame_ &&
323  (rxFrame_.payload() == pingFrame_->payload());
324  }
325 
326  bool check(AsioErrorCode asioEc)
327  {
328  if (asioEc)
329  {
330  if (failHandler_)
331  {
332  auto ec = make_error_code(
333  static_cast<std::errc>(asioEc.value()));
334  post(std::bind(failHandler_, ec));
335  }
336  cleanup();
337  }
338  return !asioEc;
339  }
340 
341  template <typename TErrc>
342  bool check(bool condition, TErrc errc)
343  {
344  if (!condition)
345  {
346  if (failHandler_)
347  post(std::bind(failHandler_, make_error_code(errc)));
348  cleanup();
349  }
350  return condition;
351  }
352 
353  void cleanup()
354  {
355  rxHandler_ = nullptr;
356  failHandler_ = nullptr;
357  pingHandler_ = nullptr;
358  rxFrame_.clear();
359  txQueue_.clear();
360  txFrame_ = nullptr;
361  pingFrame_ = nullptr;
362  socket_.reset();
363  }
364 
365  std::unique_ptr<TSocket> socket_;
366  IoStrand strand_;
367  size_t maxTxLength_;
368  size_t maxRxLength_;
369  bool started_ = false;
370  RxHandler rxHandler_;
371  FailHandler failHandler_;
372  PingHandler pingHandler_;
373  AsioFrame rxFrame_;
374  TransmitQueue txQueue_;
375  AsioFrame::Ptr txFrame_;
376  AsioFrame::Ptr pingFrame_;
377  TimePoint pingStart_;
378  TimePoint pingStop_;
379 };
380 
381 } // namespace internal
382 
383 } // namespace wamp
384 
385 #endif // CPPWAMP_ASIOTRANSPORT_HPP
wamp::TransportErrc::badRxLength
@ badRxLength
Incoming message exceeds maximum length.
wamp::IoStrand
boost::asio::strand< AnyIoExecutor > IoStrand
Serializes I/O operations.
Definition: asiodefs.hpp:41
wamp
Definition: anyhandler.hpp:36
wamp::AsioErrorCode
boost::system::error_code AsioErrorCode
Type used by Boost.Asio for reporting errors.
Definition: asiodefs.hpp:44
wamp::MessageBuffer
std::vector< uint8_t > MessageBuffer
Container type used for encoded WAMP messages that are sent/received over a transport.
Definition: messagebuffer.hpp:25
wamp::RawsockErrc::badMessageType
@ badMessageType
Invalid message type.