CppWAMP
C++11 client library for the WAMP protocol
peer.hpp
1 /*------------------------------------------------------------------------------
2  Copyright Butterfly Energy Systems 2014-2015, 2022.
3  Distributed under the Boost Software License, Version 1.0.
4  http://www.boost.org/LICENSE_1_0.txt
5 ------------------------------------------------------------------------------*/
6 
7 #ifndef CPPWAMP_INTERNAL_DIALOGUE_HPP
8 #define CPPWAMP_INTERNAL_DIALOGUE_HPP
9 
10 #include <atomic>
11 #include <cassert>
12 #include <functional>
13 #include <map>
14 #include <memory>
15 #include <sstream>
16 #include <string>
17 #include <utility>
18 #include <boost/asio/strand.hpp>
19 #include "../anyhandler.hpp"
20 #include "../codec.hpp"
21 #include "../peerdata.hpp"
22 #include "../error.hpp"
23 #include "../variant.hpp"
24 #include "../wampdefs.hpp"
25 #include "wampmessage.hpp"
26 
27 namespace wamp
28 {
29 
30 namespace internal
31 {
32 
33 //------------------------------------------------------------------------------
34 // Base class providing session functionality common to both clients and
35 // router peers. This class is extended by Client to implement a client session.
36 //------------------------------------------------------------------------------
37 template <typename TCodec, typename TTransport>
38 class Peer : public std::enable_shared_from_this<Peer<TCodec, TTransport>>
39 {
40 public:
41  using Codec = TCodec;
42  using Transport = TTransport;
43  using TransportPtr = std::shared_ptr<Transport>;
44  using State = SessionState;
45 
46  State state() const {return state_.load();}
47 
48  IoStrand strand() const {return strand_;}
49 
50 protected:
51  using Message = WampMessage;
52  using OneShotHandler =
53  AnyCompletionHandler<void (std::error_code, Message)>;
54  using MultiShotHandler = std::function<void (std::error_code, Message)>;
55  using LogHandler = AnyReusableHandler<void (std::string)>;
56  using StateChangeHandler = AnyReusableHandler<void (State)>;
57 
58  explicit Peer(TransportPtr&& transport)
59  : strand_(transport->strand()),
60  transport_(std::move(transport)),
61  state_(State::closed)
62  {}
63 
64  virtual bool isMsgSupported(const MessageTraits& traits) = 0;
65 
66  virtual void onInbound(Message msg) = 0;
67 
68  const AnyIoExecutor& userExecutor() const {return userExecutor_;}
69 
70  void start()
71  {
72  assert(state() == State::closed || state() == State::disconnected);
73  setState(State::establishing);
74 
75  if (!transport_->isStarted())
76  {
77  std::weak_ptr<Peer> self(this->shared_from_this());
78 
79  transport_->start(
80  [self](MessageBuffer buffer)
81  {
82  auto me = self.lock();
83  if (me)
84  me->onTransportRx(std::move(buffer));
85  },
86  [self](std::error_code ec)
87  {
88  auto me = self.lock();
89  if (me)
90  me->checkError(ec);
91  }
92  );
93  }
94  }
95 
96  void adjourn(Reason& reason, OneShotHandler&& handler)
97  {
98  struct Requested
99  {
100  std::shared_ptr<Peer> self;
101  OneShotHandler handler;
102 
103  void operator()(std::error_code ec, Message reply)
104  {
105  auto& me = *self;
106  if (!ec)
107  {
108  me.setState(State::closed);
109  me.abortPending(make_error_code(SessionErrc::sessionEnded));
110  me.post(std::move(handler),
111  make_error_code(ProtocolErrc::success),
112  std::move(reply));
113  }
114  else
115  me.post(move(handler), ec, Message());
116  }
117  };
118 
119  using std::move;
120  assert(state() == State::established);
121  setState(State::shuttingDown);
122  auto self = this->shared_from_this();
123  request(reason.message({}),
124  Requested{this->shared_from_this(), std::move(handler)});
125  }
126 
127  void close(bool terminating)
128  {
129  setState(State::disconnected);
130  abortPending(make_error_code(SessionErrc::sessionEnded), terminating);
131  transport_->close();
132  }
133 
134  void send(Message& msg)
135  {
136  sendMessage(msg);
137  }
138 
139  void sendError(WampMsgType reqType, RequestId reqId, Error&& error)
140  {
141  send(error.errorMessage({}, reqType, reqId));
142  }
143 
144  RequestId request(Message& msg, OneShotHandler&& handler)
145  {
146  return sendRequest(msg, oneShotRequestMap_, std::move(handler));
147  }
148 
149  RequestId ongoingRequest(Message& msg, MultiShotHandler&& handler)
150  {
151  return sendRequest(msg, multiShotRequestMap_, std::move(handler));
152  }
153 
154  void cancelCall(CallCancellation&& cancellation)
155  {
156  // If the cancel mode is not 'kill', don't wait for the router's
157  // ERROR message and post the request handler immediately
158  // with a SessionErrc::cancelled error code.
159  RequestKey key{WampMsgType::call, cancellation.requestId()};
160  if (cancellation.mode() != CallCancelMode::kill)
161  {
162  auto kv = oneShotRequestMap_.find(key);
163  if (kv != oneShotRequestMap_.end())
164  {
165  auto handler = std::move(kv->second);
166  oneShotRequestMap_.erase(kv);
167  auto ec = make_error_code(SessionErrc::cancelled);
168  post(std::move(handler), ec, Message());
169  }
170  else
171  {
172  auto kv = multiShotRequestMap_.find(key);
173  if (kv != multiShotRequestMap_.end())
174  {
175  auto handler = std::move(kv->second);
176  multiShotRequestMap_.erase(kv);
177  auto ec = make_error_code(SessionErrc::cancelled);
178  post(std::move(handler), ec, Message());
179  }
180  }
181  }
182 
183  // Always send the CANCEL message in all modes.
184  sendMessage(cancellation.message({}));
185  }
186 
187  template <typename TErrorValue>
188  void fail(TErrorValue errc)
189  {
190  fail(make_error_code(errc));
191  }
192 
193  void setUserExecutor(AnyIoExecutor exec)
194  {
195  userExecutor_ = std::move(exec);
196  }
197 
198  void setTraceHandler(LogHandler handler)
199  {
200  traceHandler_ = std::move(handler);
201  }
202 
203  void setStateChangeHandler(StateChangeHandler handler)
204  {
205  stateChangeHandler_ = std::move(handler);
206  }
207 
208  template <typename TFunctor, typename... TArgs>
209  void post(TFunctor&& fn, TArgs&&... args)
210  {
211  boost::asio::post(strand_,
212  std::bind(std::forward<TFunctor>(fn),
213  std::forward<TArgs>(args)...));
214  }
215 
216 private:
217  static constexpr unsigned progressiveResponseFlag_ = 0x01;
218 
219  using RequestKey = typename Message::RequestKey;
220  using OneShotRequestMap = std::map<RequestKey, OneShotHandler>;
221  using MultiShotRequestMap = std::map<RequestKey, MultiShotHandler>;
222  using EncoderType = Encoder<Codec, MessageBuffer>;
223  using DecoderType = Decoder<Codec, MessageBuffer>;
224 
225  void setState(State s)
226  {
227  auto old = state_.exchange(s);
228  if (old != s && stateChangeHandler_)
229  postVia(userExecutor_, stateChangeHandler_, s);
230  }
231 
232  RequestId sendMessage(Message& msg)
233  {
234  assert(msg.type() != WampMsgType::none);
235 
236  auto requestId = setMessageRequestId(msg);
237 
238  MessageBuffer buffer;
239  encoder_.encode(msg.fields(), buffer);
240  if (buffer.size() > transport_->maxSendLength())
241  throw error::Failure(make_error_code(TransportErrc::badTxLength));
242 
243  trace(msg, true);
244  transport_->send(std::move(buffer));
245  return requestId;
246  }
247 
248  template <typename TRequestMap, typename THandler>
249  RequestId sendRequest(Message& msg, TRequestMap& requests,
250  THandler&& handler)
251  {
252  assert(msg.type() != WampMsgType::none);
253 
254  auto requestId = setMessageRequestId(msg);
255  MessageBuffer buffer;
256  encoder_.encode(msg.fields(), buffer);
257  if (buffer.size() > transport_->maxSendLength())
258  throw error::Failure(make_error_code(TransportErrc::badTxLength));
259 
260  auto key = msg.requestKey();
261  auto found = requests.find(key);
262  if (found != requests.end())
263  {
264  post(std::move(found->second),
265  make_error_code(SessionErrc::cancelled), Message());
266  requests.erase(found);
267  }
268 
269  requests.emplace(msg.requestKey(), std::move(handler));
270 
271  trace(msg, true);
272  transport_->send(std::move(buffer));
273  return requestId;
274  }
275 
276  RequestId setMessageRequestId(Message& msg)
277  {
278  RequestId requestId = nullRequestId();
279 
280  if (msg.hasRequestId())
281  {
282  requestId = nextRequestId();
283  msg.setRequestId(requestId);
284  }
285 
286  return requestId;
287  }
288 
289  RequestId nextRequestId()
290  {
291  if (nextRequestId_ >= maxRequestId_)
292  nextRequestId_ = nullRequestId();
293  return ++nextRequestId_;
294  }
295 
296  void onTransportRx(MessageBuffer buffer)
297  {
298  auto s = state();
299  if (s == State::establishing || s == State::authenticating ||
300  s == State::established || s == State::shuttingDown)
301  {
302  Variant v;
303  if (checkError(decode(buffer, v)) &&
304  check(v.is<Array>(), ProtocolErrc::badSchema))
305  {
306  std::error_code ec;
307  auto msg = Message::parse(std::move(v.as<Array>()), ec);
308  if (checkError(ec))
309  {
310  trace(msg, false);
311  if (checkValidMsg(msg.type()))
312  processMessage(std::move(msg));
313  }
314  }
315  }
316  }
317 
318  std::error_code decode(const MessageBuffer& buffer, Variant& variant)
319  {
320  return decoder_.decode(buffer, variant);
321  }
322 
323  void processMessage(Message&& msg)
324  {
325  if (msg.repliesTo() != WampMsgType::none)
326  {
327  processWampReply( RequestKey(msg.repliesTo(), msg.requestId()),
328  std::move(msg) );
329  }
330  else switch(msg.type())
331  {
332  case WampMsgType::hello:
333  processHello(std::move(msg));
334  break;
335 
336  case WampMsgType::challenge:
337  processChallenge(std::move(msg));
338  break;
339 
340  case WampMsgType::welcome:
341  processWelcome(std::move(msg));
342  break;
343 
344  case WampMsgType::abort:
345  processAbort(std::move(msg));
346  break;
347 
348  case WampMsgType::goodbye:
349  processGoodbye(std::move(msg));
350  break;
351 
352  case WampMsgType::error:
353  processWampReply(msg.requestKey(), std::move(msg));
354  break;
355 
356  default:
357  // Role-specific unsolicited messages. Ignore them if we're
358  // shutting down.
359  if (state() != State::shuttingDown)
360  {
361  auto self = this->shared_from_this();
362  post(&Peer::onInbound, self, std::move(msg));
363  }
364  break;
365  }
366  }
367 
368  void processWampReply(const RequestKey& key, Message&& msg)
369  {
370  auto kv = oneShotRequestMap_.find(key);
371  if (kv != oneShotRequestMap_.end())
372  {
373  auto handler = std::move(kv->second);
374  oneShotRequestMap_.erase(kv);
375  post(std::move(handler), make_error_code(ProtocolErrc::success),
376  std::move(msg));
377  }
378  else
379  {
380  auto kv = multiShotRequestMap_.find(key);
381  if (kv != multiShotRequestMap_.end())
382  {
383  if (msg.isProgressiveResponse())
384  {
385  post(kv->second,
386  make_error_code(ProtocolErrc::success), std::move(msg));
387  }
388  else
389  {
390  auto handler = std::move(kv->second);
391  multiShotRequestMap_.erase(kv);
392  post(std::move(handler),
393  make_error_code(ProtocolErrc::success), std::move(msg));
394  }
395  }
396  }
397  }
398 
399  void processHello(Message&& msg)
400  {
401  assert(state() == State::establishing);
402  setState(State::established);
403  auto self = this->shared_from_this();
404  post(&Peer::onInbound, self, std::move(msg));
405  }
406 
407  void processChallenge(Message&& msg)
408  {
409  assert(state() == State::establishing);
410  setState(State::authenticating);
411  auto self = this->shared_from_this();
412  post(&Peer::onInbound, self, std::move(msg));
413  }
414 
415  void processWelcome(Message&& msg)
416  {
417  assert((state() == State::establishing) ||
418  (state() == State::authenticating));
419  setState(State::established);
420  processWampReply(RequestKey(WampMsgType::hello, 0), std::move(msg));
421  }
422 
423  void processAbort(Message&& msg)
424  {
425  assert((state() == State::establishing) ||
426  (state() == State::authenticating));
427  setState(State::closed);
428  processWampReply(RequestKey(WampMsgType::hello, 0), std::move(msg));
429  }
430 
431  void processGoodbye(Message&& msg)
432  {
433  if (state() == State::shuttingDown)
434  {
435  setState(State::closed);
436  processWampReply(msg.requestKey(), std::move(msg));
437  }
438  else
439  {
440  const auto& reason = message_cast<GoodbyeMessage>(msg).reasonUri();
441  SessionErrc errc;
443  abortPending(make_error_code(errc));
444  GoodbyeMessage msg("wamp.error.goodbye_and_out");
445  send(msg);
446  setState(State::closed);
447  }
448  }
449 
450  bool checkError(std::error_code ec)
451  {
452  bool success = !ec;
453  if (!success)
454  fail(ec);
455  return success;
456  }
457 
458  bool check(bool condition, ProtocolErrc errc)
459  {
460  if (!condition)
461  fail(errc);
462  return condition;
463  }
464 
465  bool checkValidMsg(WampMsgType type)
466  {
467  auto traits = MessageTraits::lookup(type);
468  bool valid = isMsgSupported(traits);
469 
470  if (!valid)
472  else
473  {
474  switch (state())
475  {
476  case State::establishing:
477  valid = traits.forEstablishing;
478  break;
479 
480  case State::authenticating:
481  valid = traits.forChallenging;
482  break;
483 
484  case State::established:
485  case State::shuttingDown:
486  valid = traits.forEstablished;
487  break;
488 
489  default:
490  valid = false;
491  break;
492  }
493 
494  if (!valid)
496  }
497 
498  return valid;
499  }
500 
501  void fail(std::error_code ec)
502  {
503  setState(State::failed);
504  abortPending(ec);
505  transport_->close();
506  }
507 
508  void abortPending(std::error_code ec, bool terminating = false)
509  {
510  if (!terminating)
511  {
512  for (auto& kv: oneShotRequestMap_)
513  post(std::move(kv.second), ec, Message());
514  for (auto& kv: multiShotRequestMap_)
515  post(std::move(kv.second), ec, Message());
516  }
517  oneShotRequestMap_.clear();
518  multiShotRequestMap_.clear();
519  }
520 
521  void trace(const Message& msg, bool isTx)
522  {
523  if (traceHandler_)
524  {
525  std::ostringstream oss;
526  oss << (isTx ? "Tx" : "Rx") << " message: [";
527  if (!msg.fields().empty())
528  {
529  // Print message type field as {"NAME":<Field>} pair
530  oss << "{\"" << msg.nameOr("INVALID") << "\":"
531  << msg.fields().at(0) << "}";
532 
533  for (Array::size_type i=1; i<msg.fields().size(); ++i)
534  {
535  oss << "," << msg.fields().at(i);
536  }
537  }
538  oss << ']';
539  dispatchVia(userExecutor_, traceHandler_, oss.str());
540  }
541  }
542 
543  IoStrand strand_;
544  AnyIoExecutor userExecutor_;
545  TransportPtr transport_;
546  EncoderType encoder_;
547  DecoderType decoder_;
548  LogHandler traceHandler_;
549  StateChangeHandler stateChangeHandler_;
550  std::atomic<State> state_;
551  OneShotRequestMap oneShotRequestMap_;
552  MultiShotRequestMap multiShotRequestMap_;
553  RequestId nextRequestId_ = nullRequestId();
554 
555  static constexpr RequestId maxRequestId_ = 9007199254740992ull;
556 };
557 
558 } // namespace internal
559 
560 } // namespace wamp
561 
562 #endif // CPPWAMP_INTERNAL_DIALOGUE_HPP
wamp::dispatchVia
void dispatchVia(const E &fallbackExec, F &&handler, Ts &&... args)
Dispatches the given handler using the given fallback executor, passing the given arguments.
Definition: anyhandler.hpp:303
wamp::RequestId
int64_t RequestId
Ephemeral ID associated with a WAMP request.
Definition: wampdefs.hpp:23
wamp::ProtocolErrc::unexpectedMsg
@ unexpectedMsg
Received unexpected WAMP message.
wamp::lookupWampErrorUri
bool lookupWampErrorUri(const std::string &uri, SessionErrc fallback, SessionErrc &result)
Definition: error.ipp:238
wamp::AnyIoExecutor
boost::asio::any_io_executor AnyIoExecutor
Polymorphic executor for all I/O objects.
Definition: asiodefs.hpp:27
wamp::TransportErrc::badTxLength
@ badTxLength
Outgoing message exceeds maximum length.
wamp::SessionState::closed
@ closed
Transport connected, but WAMP session is closed.
wamp::ProtocolErrc::unsupportedMsg
@ unsupportedMsg
Received unsupported WAMP message.
wamp::ProtocolErrc
ProtocolErrc
Error code values used with the ProtocolCategory error category.
Definition: error.hpp:308
wamp::SessionErrc::cancelled
@ cancelled
A previously issued call was cancelled.
wamp::IoStrand
boost::asio::strand< AnyIoExecutor > IoStrand
Serializes I/O operations.
Definition: asiodefs.hpp:41
wamp::ProtocolErrc::badSchema
@ badSchema
Invalid WAMP message schema.
wamp
Definition: anyhandler.hpp:36
wamp::ProtocolErrc::success
@ success
Operation successful.
wamp::SessionErrc::success
@ success
Operation successful.
wamp::MessageBuffer
std::vector< uint8_t > MessageBuffer
Container type used for encoded WAMP messages that are sent/received over a transport.
Definition: messagebuffer.hpp:25
wamp::SessionErrc::sessionEnded
@ sessionEnded
Operation aborted; session ended by this peer.
wamp::Array
std::vector< Variant > Array
Variant bound type for arrays of variants.
Definition: variantdefs.hpp:51
wamp::SessionErrc::closeRealm
@ closeRealm
The other peer is leaving the realm.
wamp::decode
std::error_code decode(TInput &&input, Variant &variant)
Decodes from the given byte sequence or stream to the given variant.
Definition: codec.hpp:172
wamp::SessionState
SessionState
Enumerates the possible states that a client or router session can be in.
Definition: wampdefs.hpp:34
wamp::SessionErrc
SessionErrc
Error code values used with the SessionCategory error category.
Definition: error.hpp:142
wamp::CallCancelMode::kill
@ kill
INTERRUPT sent to callee; RESULT or ERROR returned, depending on callee.
wamp::postVia
void postVia(const E &fallbackExec, F &&handler, Ts &&... args)
Posts the given handler using the given fallback executor, passing the given arguments.
Definition: anyhandler.hpp:327
wamp::AnyCompletionHandler
boost::asio::any_completion_handler< TSignature > AnyCompletionHandler
Type-erases a one-shot (and possibly move-only) asynchronous completion handler.
Definition: anyhandler.hpp:55