CppWAMP
C++11 client library for the WAMP protocol
client.hpp
1 /*------------------------------------------------------------------------------
2  Copyright Butterfly Energy Systems 2014-2015, 2022.
3  Distributed under the Boost Software License, Version 1.0.
4  http://www.boost.org/LICENSE_1_0.txt
5 ------------------------------------------------------------------------------*/
6 
7 #ifndef CPPWAMP_INTERNAL_CLIENT_HPP
8 #define CPPWAMP_INTERNAL_CLIENT_HPP
9 
10 #include <cassert>
11 #include <map>
12 #include <memory>
13 #include <sstream>
14 #include <string>
15 #include <system_error>
16 #include <type_traits>
17 #include <utility>
18 #include <boost/asio/post.hpp>
19 #include "../anyhandler.hpp"
20 #include "../registration.hpp"
21 #include "../subscription.hpp"
22 #include "../version.hpp"
23 #include "callertimeout.hpp"
24 #include "clientinterface.hpp"
25 #include "peer.hpp"
26 
27 namespace wamp
28 {
29 
30 namespace internal
31 {
32 
33 //------------------------------------------------------------------------------
34 // Provides the implementation of the wamp::Session class.
35 //------------------------------------------------------------------------------
36 template <typename TCodec, typename TTransport>
37 class Client : public ClientInterface, public Peer<TCodec, TTransport>
38 {
39 public:
40  using Ptr = std::shared_ptr<Client>;
41  using WeakPtr = std::weak_ptr<Client>;
42  using Codec = TCodec;
43  using Transport = TTransport;
44  using TransportPtr = std::shared_ptr<Transport>;
45  using State = SessionState;
46 
47  template <typename TValue>
48  using CompletionHandler = AnyCompletionHandler<void(ErrorOr<TValue>)>;
49 
50  static Ptr create(TransportPtr&& transport)
51  {
52  return Ptr(new Client(std::move(transport)));
53  }
54 
55  ~Client() override {terminate();}
56 
57  State state() const override {return Base::state();}
58 
59  IoStrand strand() const override {return Base::strand();}
60 
61  void join(Realm&& realm, CompletionHandler<SessionInfo>&& handler) override
62  {
63  struct Requested
64  {
65  Ptr self;
66  CompletionHandler<SessionInfo> handler;
67  String realmUri;
68  Abort* abortPtr;
69 
70  void operator()(std::error_code ec, Message reply)
71  {
72  auto& me = *self;
73  if (me.checkError(ec, handler))
74  {
75  if (reply.type() == WampMsgType::welcome)
76  me.onWelcome(std::move(handler), std::move(reply),
77  std::move(realmUri));
78  else
79  me.onJoinAborted(std::move(handler), std::move(reply),
80  abortPtr);
81  }
82  }
83  };
84 
85  realm.withOption("agent", Version::agentString())
86  .withOption("roles", roles());
87  this->start();
88  this->request(realm.message({}),
89  Requested{shared_from_this(), std::move(handler),
90  realm.uri(), realm.abort({})});
91  }
92 
93  void authenticate(Authentication&& auth) override
94  {
95  this->send(auth.message({}));
96  }
97 
98  void safeAuthenticate(Authentication&& auth) override
99  {
100  struct Dispatched
101  {
102  std::weak_ptr<Client> self;
103  Authentication auth;
104 
105  void operator()()
106  {
107  auto me = self.lock();
108  if (me)
109  me->authenticate(std::move(auth));
110  }
111  };
112 
113  boost::asio::dispatch(strand(),
114  Dispatched{shared_from_this(), std::move(auth)});
115  }
116 
117  void leave(Reason&& reason, CompletionHandler<Reason>&& handler) override
118  {
119  struct Adjourned
120  {
121  Ptr self;
122  CompletionHandler<Reason> handler;
123 
124  void operator()(std::error_code ec, Message reply)
125  {
126  auto& me = *self;
127  me.readership_.clear();
128  me.registry_.clear();
129  if (me.checkError(ec, handler))
130  {
131  auto& goodBye = message_cast<GoodbyeMessage>(reply);
132  me.dispatchUserHandler(handler,
133  Reason({}, std::move(goodBye)));
134  }
135  }
136  };
137 
138  using std::move;
139  timeoutScheduler_->clear();
140  auto self = this->shared_from_this();
141  Base::adjourn(reason,
142  Adjourned{shared_from_this(), std::move(handler)});
143  }
144 
145  void disconnect() override
146  {
147  pendingInvocations_.clear();
148  timeoutScheduler_->clear();
149  this->close(false);
150  }
151 
152  void terminate() override
153  {
154  initialize({}, {}, {}, {}, {});
155  pendingInvocations_.clear();
156  timeoutScheduler_->clear();
157  this->close(true);
158  }
159 
160  void subscribe(Topic&& topic, EventSlot&& slot,
161  CompletionHandler<Subscription>&& handler) override
162  {
163  struct Requested
164  {
165  Ptr self;
166  SubscriptionRecord rec;
167  CompletionHandler<Subscription> handler;
168 
169  void operator()(std::error_code ec, Message reply)
170  {
171  auto& me = *self;
172  if (me.checkReply(WampMsgType::subscribed, ec, reply,
173  SessionErrc::subscribeError, handler))
174  {
175  const auto& subMsg = message_cast<SubscribedMessage>(reply);
176  auto subId = subMsg.subscriptionId();
177  auto slotId = me.nextSlotId();
178  Subscription sub(self, subId, slotId, {});
179  me.topics_.emplace(rec.topicUri, subId);
180  me.readership_[subId][slotId] = std::move(rec);
181  me.dispatchUserHandler(handler, std::move(sub));
182  }
183 
184  }
185  };
186 
187  using std::move;
188  SubscriptionRecord rec = {topic.uri(), move(slot)};
189 
190  auto kv = topics_.find(rec.topicUri);
191  if (kv == topics_.end())
192  {
193  auto self = this->shared_from_this();
194  this->request(
195  topic.message({}),
196  Requested{shared_from_this(), move(rec), move(handler)});
197  }
198  else
199  {
200  auto subId = kv->second;
201  auto slotId = nextSlotId();
202  Subscription sub{this->shared_from_this(), subId, slotId, {}};
203  readership_[subId][slotId] = move(rec);
204  dispatchUserHandler(handler, move(sub));
205  }
206  }
207 
208  void unsubscribe(const Subscription& sub) override
209  {
210  auto kv = readership_.find(sub.id());
211  if (kv != readership_.end())
212  {
213  auto& localSubs = kv->second;
214  if (!localSubs.empty())
215  {
216  auto subKv = localSubs.find(sub.slotId({}));
217  if (subKv != localSubs.end())
218  {
219  if (localSubs.size() == 1u)
220  topics_.erase(subKv->second.topicUri);
221 
222  localSubs.erase(subKv);
223  if (localSubs.empty())
224  {
225  readership_.erase(kv);
226  sendUnsubscribe(sub.id());
227  }
228  }
229  }
230  }
231  }
232 
233  void unsubscribe(const Subscription& sub,
234  CompletionHandler<bool>&& handler) override
235  {
236  bool unsubscribed = false;
237  auto kv = readership_.find(sub.id());
238  if (kv != readership_.end())
239  {
240  auto& localSubs = kv->second;
241  if (!localSubs.empty())
242  {
243  auto subKv = localSubs.find(sub.slotId({}));
244  if (subKv != localSubs.end())
245  {
246  unsubscribed = true;
247  if (localSubs.size() == 1u)
248  topics_.erase(subKv->second.topicUri);
249 
250  localSubs.erase(subKv);
251  if (localSubs.empty())
252  {
253  readership_.erase(kv);
254  sendUnsubscribe(sub.id(), std::move(handler));
255  }
256  }
257  }
258  }
259  else
260  {
261  postVia(userExecutor(), std::move(handler), unsubscribed);
262  }
263  }
264 
265  void safeUnsubscribe(const Subscription& sub) override
266  {
267  auto self = std::weak_ptr<Client>(shared_from_this());
268  boost::asio::dispatch(
269  strand(),
270  [self, sub]() mutable
271  {
272  auto me = self.lock();
273  if (me)
274  me->unsubscribe(std::move(sub));
275  });
276  }
277 
278  void publish(Pub&& pub) override
279  {
280  this->send(pub.message({}));
281  }
282 
283  void publish(Pub&& pub, CompletionHandler<PublicationId>&& handler) override
284  {
285  struct Requested
286  {
287  Ptr self;
288  CompletionHandler<PublicationId> handler;
289 
290  void operator()(std::error_code ec, Message reply)
291  {
292  auto& me = *self;
293  if (me.checkReply(WampMsgType::published, ec, reply,
294  SessionErrc::publishError, handler))
295  {
296  const auto& pubMsg = message_cast<PublishedMessage>(reply);
297  me.dispatchUserHandler(handler, pubMsg.publicationId());
298  }
299  }
300  };
301 
302  pub.withOption("acknowledge", true);
303  auto self = this->shared_from_this();
304  this->request(pub.message({}),
305  Requested{shared_from_this(), std::move(handler)});
306  }
307 
308  void enroll(Procedure&& procedure, CallSlot&& callSlot,
309  InterruptSlot&& interruptSlot,
310  CompletionHandler<Registration>&& handler) override
311  {
312  struct Requested
313  {
314  Ptr self;
315  RegistrationRecord rec;
316  CompletionHandler<Registration> handler;
317 
318  void operator()(std::error_code ec, Message reply)
319  {
320  auto& me = *self;
321  if (me.checkReply(WampMsgType::registered, ec, reply,
322  SessionErrc::registerError, handler))
323  {
324  const auto& regMsg = message_cast<RegisteredMessage>(reply);
325  auto regId = regMsg.registrationId();
326  Registration reg(self, regId, {});
327  me.registry_[regId] = std::move(rec);
328  me.dispatchUserHandler(handler, std::move(reg));
329  }
330  }
331  };
332 
333  using std::move;
334  RegistrationRecord rec{ move(callSlot), move(interruptSlot) };
335  auto self = this->shared_from_this();
336  this->request(procedure.message({}),
337  Requested{shared_from_this(), move(rec), move(handler)});
338  }
339 
340  void unregister(const Registration& reg) override
341  {
342  struct Requested
343  {
344  Ptr self;
345 
346  void operator()(std::error_code ec, Message reply)
347  {
348  // Don't propagate WAMP errors, as we prefer this
349  // to be a no-fail cleanup operation.
350  self->warnReply(WampMsgType::unregistered, ec, reply,
352  }
353  };
354 
355  auto kv = registry_.find(reg.id());
356  if (kv != registry_.end())
357  {
358  registry_.erase(kv);
359  if (state() == State::established)
360  {
361  auto self = this->shared_from_this();
362  UnregisterMessage msg(reg.id());
363  this->request(msg, Requested{shared_from_this()});
364  }
365  }
366  }
367 
368  void unregister(const Registration& reg,
369  CompletionHandler<bool>&& handler) override
370  {
371  struct Requested
372  {
373  Ptr self;
374  CompletionHandler<bool> handler;
375 
376  void operator()(std::error_code ec, Message reply)
377  {
378  auto& me = *self;
379  if (me.checkReply(WampMsgType::unregistered, ec, reply,
381  {
382  me.dispatchUserHandler(handler, true);
383  }
384  }
385  };
386 
387  CPPWAMP_LOGIC_CHECK(state() == State::established,
388  "Session is not established");
389  auto kv = registry_.find(reg.id());
390  if (kv != registry_.end())
391  {
392  registry_.erase(kv);
393  auto self = this->shared_from_this();
394  UnregisterMessage msg(reg.id());
395  this->request(msg,
396  Requested{shared_from_this(), std::move(handler)});
397  }
398  else
399  {
400  postVia(userExecutor(), std::move(handler), false);
401  }
402  }
403 
404  void safeUnregister(const Registration& reg) override
405  {
406  auto self = std::weak_ptr<Client>(shared_from_this());
407  boost::asio::dispatch(
408  strand(),
409  [self, reg]() mutable
410  {
411  auto me = self.lock();
412  if (me)
413  me->unregister(std::move(reg));
414  });
415  }
416 
417  CallChit oneShotCall(Rpc&& rpc,
418  CompletionHandler<Result>&& handler) override
419  {
420  struct Requested
421  {
422  Ptr self;
423  Error* errorPtr;
424  CompletionHandler<Result> handler;
425 
426  void operator()(std::error_code ec, Message reply)
427  {
428  auto& me = *self;
429  if (me.checkReply(WampMsgType::result, ec, reply,
430  SessionErrc::callError, handler, errorPtr))
431  {
432  auto& resultMsg = message_cast<ResultMessage>(reply);
433  me.dispatchUserHandler(handler,
434  Result({}, std::move(resultMsg)));
435  }
436  }
437  };
438 
439  auto self = this->shared_from_this();
440  auto cancelSlot =
441  boost::asio::get_associated_cancellation_slot(handler);
442  auto requestId = this->request(
443  rpc.message({}),
444  Requested{shared_from_this(), rpc.error({}), std::move(handler)});
445  CallChit chit{shared_from_this(), requestId, rpc.cancelMode(), {}};
446 
447  if (cancelSlot.is_connected())
448  {
449  cancelSlot.assign(
450  [chit](boost::asio::cancellation_type_t) {chit.cancel();});
451  }
452 
453  if (rpc.callerTimeout().count() != 0)
454  timeoutScheduler_->add(rpc.callerTimeout(), requestId);
455 
456  return chit;
457  }
458 
459  CallChit ongoingCall(Rpc&& rpc, OngoingCallHandler&& handler) override
460  {
461  using std::move;
462 
463  struct Requested
464  {
465  Ptr self;
466  Error* errorPtr;
467  OngoingCallHandler handler;
468 
469  void operator()(std::error_code ec, Message reply)
470  {
471  auto& me = *self;
472  if (me.checkReply(WampMsgType::result, ec, reply,
473  SessionErrc::callError, handler, errorPtr))
474  {
475  auto& resultMsg = message_cast<ResultMessage>(reply);
476  me.dispatchUserHandler(handler,
477  Result({}, std::move(resultMsg)));
478  }
479  }
480  };
481 
482  auto self = this->shared_from_this();
483  auto cancelSlot =
484  boost::asio::get_associated_cancellation_slot(handler);
485  auto requestId = this->ongoingRequest(
486  rpc.message({}),
487  Requested{shared_from_this(), rpc.error({}), std::move(handler)});
488  CallChit chit{shared_from_this(), requestId, rpc.cancelMode(), {}};
489 
490  if (cancelSlot.is_connected())
491  {
492  cancelSlot.assign(
493  [chit](boost::asio::cancellation_type_t) {chit.cancel();});
494  }
495 
496  if (rpc.callerTimeout().count() != 0)
497  timeoutScheduler_->add(rpc.callerTimeout(), requestId);
498 
499  return chit;
500  }
501 
502  void cancelCall(RequestId reqId, CallCancelMode mode) override
503  {
504  Base::cancelCall(CallCancellation{reqId, mode});
505  }
506 
507  void safeCancelCall(RequestId reqId, CallCancelMode mode) override
508  {
509  std::weak_ptr<Client> self{shared_from_this()};
510  boost::asio::dispatch(
511  strand(),
512  [self, reqId, mode]()
513  {
514  auto me = self.lock();
515  if (me)
516  me->cancelCall(reqId, mode);
517  });
518  }
519 
520  void yield(RequestId reqId, Result&& result) override
521  {
522  if (!result.isProgressive())
523  pendingInvocations_.erase(reqId);
524  this->send(result.yieldMessage({}, reqId));
525  }
526 
527  void yield(RequestId reqId, Error&& error) override
528  {
529  pendingInvocations_.erase(reqId);
530  this->sendError(WampMsgType::invocation, reqId, std::move(error));
531  }
532 
533  void safeYield(RequestId reqId, Result&& result) override
534  {
535  struct Dispatched
536  {
537  std::weak_ptr<Client> self;
538  RequestId reqId;
539  Result result;
540 
541  void operator()()
542  {
543  auto me = self.lock();
544  if (me)
545  me->yield(reqId, std::move(result));
546  }
547  };
548 
549  boost::asio::dispatch(
550  strand(),
551  Dispatched{shared_from_this(), reqId, std::move(result)});
552  }
553 
554  void safeYield(RequestId reqId, Error&& error) override
555  {
556  struct Dispatched
557  {
558  std::weak_ptr<Client> self;
559  RequestId reqId;
560  Error error;
561 
562  void operator()()
563  {
564  auto me = self.lock();
565  if (me)
566  me->yield(reqId, std::move(error));
567  }
568  };
569 
570  boost::asio::dispatch(
571  strand(),
572  Dispatched{shared_from_this(), reqId, std::move(error)});
573  }
574 
575  void initialize(
576  AnyIoExecutor userExecutor,
577  LogHandler warningHandler,
578  LogHandler traceHandler,
579  StateChangeHandler stateChangeHandler,
580  ChallengeHandler challengeHandler) override
581  {
582  Base::setUserExecutor(std::move(userExecutor));
583  warningHandler_ = std::move(warningHandler);
584  Base::setTraceHandler(std::move(traceHandler));
585  Base::setStateChangeHandler(std::move(stateChangeHandler));
586  challengeHandler_ = std::move(challengeHandler);
587  }
588 
589  void setWarningHandler(LogHandler handler) override
590  {
591  warningHandler_ = std::move(handler);
592  }
593 
594  void setTraceHandler(LogHandler handler) override
595  {
596  Base::setTraceHandler(std::move(handler));
597  }
598 
599  void setStateChangeHandler(StateChangeHandler handler) override
600  {
601  Base::setStateChangeHandler(std::move(handler));
602  }
603 
604  void setChallengeHandler( ChallengeHandler handler) override
605  {
606  challengeHandler_ = std::move(handler);
607  }
608 
609 private:
610  struct SubscriptionRecord
611  {
612  String topicUri;
613  EventSlot slot;
614  };
615 
616  struct RegistrationRecord
617  {
618  CallSlot callSlot;
619  InterruptSlot interruptSlot;
620  };
621 
622  using Base = Peer<Codec, Transport>;
623  using WampMsgType = internal::WampMsgType;
624  using Message = internal::WampMessage;
625  using SlotId = uint64_t;
626  using LocalSubs = std::map<SlotId, SubscriptionRecord>;
627  using Readership = std::map<SubscriptionId, LocalSubs>;
628  using TopicMap = std::map<std::string, SubscriptionId>;
629  using Registry = std::map<RegistrationId, RegistrationRecord>;
630  using InvocationMap = std::map<RequestId, RegistrationId>;
631  using CallerTimeoutDuration = typename Rpc::CallerTimeoutDuration;
632 
633  using Base::userExecutor;
634 
635  Client(TransportPtr transport)
636  : Base(std::move(transport)),
637  timeoutScheduler_(CallerTimeoutScheduler::create(this->strand()))
638  {}
639 
640  Ptr shared_from_this()
641  {
642  return std::static_pointer_cast<Client>( Base::shared_from_this() );
643  }
644 
645  void sendUnsubscribe(SubscriptionId subId)
646  {
647  struct Requested
648  {
649  Ptr self;
650 
651  void operator()(std::error_code ec, Message reply)
652  {
653  // Don't propagate WAMP errors, as we prefer
654  // this to be a no-fail cleanup operation.
655  self->warnReply(WampMsgType::unsubscribed, ec, reply,
656  SessionErrc::unsubscribeError);
657  }
658  };
659 
660  if (state() == State::established)
661  {
662  auto self = this->shared_from_this();
663  UnsubscribeMessage msg(subId);
664  this->request(msg, Requested{shared_from_this()});
665  }
666  }
667 
668  void sendUnsubscribe(SubscriptionId subId,
669  CompletionHandler<bool>&& handler)
670  {
671  struct Requested
672  {
673  Ptr self;
674  CompletionHandler<bool> handler;
675 
676  void operator()(std::error_code ec, Message reply)
677  {
678  auto& me = *self;
679  if (me.checkReply(WampMsgType::unsubscribed, ec, reply,
680  SessionErrc::unsubscribeError, handler))
681  {
682  me.dispatchUserHandler(handler, true);
683  }
684  }
685  };
686 
687  CPPWAMP_LOGIC_CHECK((this->state() == State::established),
688  "Session is not established");
689  auto self = this->shared_from_this();
690  UnsubscribeMessage msg(subId);
691  this->request(msg, Requested{shared_from_this(), std::move(handler)});
692  }
693 
694  virtual bool isMsgSupported(const MessageTraits& traits) override
695  {
696  return traits.isClientRx;
697  }
698 
699  virtual void onInbound(Message msg) override
700  {
701  switch (msg.type())
702  {
703  case WampMsgType::challenge:
704  onChallenge(std::move(msg));
705  break;
706 
707  case WampMsgType::event:
708  onEvent(std::move(msg));
709  break;
710 
711  case WampMsgType::invocation:
712  onInvocation(std::move(msg));
713  break;
714 
715  case WampMsgType::interrupt:
716  onInterrupt(std::move(msg));
717  break;
718 
719  default:
720  assert(false);
721  }
722  }
723 
724  void onWelcome(CompletionHandler<SessionInfo>&& handler, Message&& reply,
725  String&& realmUri)
726  {
727  WeakPtr self = this->shared_from_this();
728  timeoutScheduler_->listen([self](RequestId reqId)
729  {
730  auto ptr = self.lock();
731  if (ptr)
732  ptr->cancelCall(reqId, CallCancelMode::killNoWait);
733  });
734 
735  auto& welcomeMsg = message_cast<WelcomeMessage>(reply);
736  SessionInfo info{{}, std::move(realmUri), std::move(welcomeMsg)};
737  dispatchUserHandler(handler, std::move(info));
738  }
739 
740  void onJoinAborted(CompletionHandler<SessionInfo>&& handler,
741  Message&& reply, Abort* abortPtr)
742  {
743  using std::move;
744 
745  auto& abortMsg = message_cast<AbortMessage>(reply);
746  const auto& uri = abortMsg.reasonUri();
747  SessionErrc errc;
748  bool found = lookupWampErrorUri(uri, SessionErrc::joinError, errc);
749  const auto& details = reply.as<Object>(1);
750 
751  if (abortPtr != nullptr)
752  {
753  *abortPtr = Abort({}, move(abortMsg));
754  }
755  else if (warningHandler_ && (!found || !details.empty()))
756  {
757  std::ostringstream oss;
758  oss << "JOIN request aborted with error URI=" << uri;
759  if (!reply.as<Object>(1).empty())
760  oss << ", Details=" << reply.at(1);
761  warn(oss.str());
762  }
763 
764  dispatchUserHandler(handler, makeUnexpectedError(errc));
765  }
766 
767  void onChallenge(Message&& msg)
768  {
769  auto self = this->shared_from_this();
770  auto& challengeMsg = message_cast<ChallengeMessage>(msg);
771  Challenge challenge({}, self, std::move(challengeMsg));
772 
773  if (challengeHandler_)
774  {
775  dispatchUserHandler(challengeHandler_, std::move(challenge));
776  }
777  else
778  {
779  if (warningHandler_)
780  {
781  std::ostringstream oss;
782  oss << "Received a CHALLENGE with no registered handler "
783  "(with method=" << challenge.method() << " extra="
784  << challenge.options() << ")";
785  warn(oss.str());
786  }
787 
788  // Send empty signature to avoid deadlock with other peer.
789  authenticate(Authentication(""));
790  }
791  }
792 
793  void onEvent(Message&& msg)
794  {
795  auto& eventMsg = message_cast<EventMessage>(msg);
796  auto kv = readership_.find(eventMsg.subscriptionId());
797  if (kv != readership_.end())
798  {
799  const auto& localSubs = kv->second;
800  assert(!localSubs.empty());
801  Event event({}, userExecutor(), std::move(eventMsg));
802  for (const auto& subKv: localSubs)
803  postEvent(subKv.second, event);
804  }
805  else if (warningHandler_)
806  {
807  std::ostringstream oss;
808  oss << "Received an EVENT that is not subscribed to "
809  "(with subId=" << eventMsg.subscriptionId()
810  << " pubId=" << eventMsg.publicationId() << ")";
811  warn(oss.str());
812  }
813  }
814 
815  void postEvent(const SubscriptionRecord& sub, const Event& event)
816  {
817  struct Posted
818  {
819  Ptr self;
820  EventSlot slot;
821  Event event;
822 
823  void operator()()
824  {
825  auto& me = *self;
826 
827  // Copy the subscription and publication IDs before the Event
828  // object gets moved away.
829  auto subId = event.subId();
830  auto pubId = event.pubId();
831 
832  // The catch clauses are to prevent the publisher crashing
833  // subscribers when it passes arguments having incorrect type.
834  try
835  {
836  slot(std::move(event));
837  }
838  catch (const Error& e)
839  {
840  if (me.warningHandler_)
841  me.warnEventError(e, subId, pubId);
842  }
843  catch (const error::BadType& e)
844  {
845  if (me.warningHandler_)
846  me.warnEventError(Error(e), subId, pubId);
847  }
848  }
849  };
850 
851  auto exec = boost::asio::get_associated_executor(sub.slot,
852  userExecutor());
853  boost::asio::post(exec, Posted{shared_from_this(), sub.slot, event});
854  }
855 
856  void warnEventError(const Error& e, SubscriptionId subId,
857  PublicationId pubId)
858  {
859  std::ostringstream oss;
860  oss << "EVENT handler reported an error: "
861  << e.args()
862  << " (with subId=" << subId
863  << " pubId=" << pubId << ")";
864  warn(oss.str());
865  }
866 
867  void onInvocation(Message&& msg)
868  {
869  auto& invMsg = message_cast<InvocationMessage>(msg);
870  auto requestId = invMsg.requestId();
871  auto regId = invMsg.registrationId();
872 
873  auto kv = registry_.find(regId);
874  if (kv != registry_.end())
875  {
876  auto self = this->shared_from_this();
877  const RegistrationRecord& rec = kv->second;
878  Invocation inv({}, self, userExecutor(), std::move(invMsg));
879  pendingInvocations_[requestId] = regId;
880  postRpcRequest(rec.callSlot, std::move(inv));
881  }
882  else
883  {
884  this->sendError(WampMsgType::invocation, requestId,
885  Error("wamp.error.no_such_procedure"));
886  }
887  }
888 
889  void onInterrupt(Message&& msg)
890  {
891  auto& interruptMsg = message_cast<InterruptMessage>(msg);
892  auto found = pendingInvocations_.find(interruptMsg.requestId());
893  if (found != pendingInvocations_.end())
894  {
895  auto registrationId = found->second;
896  pendingInvocations_.erase(found);
897  auto kv = registry_.find(registrationId);
898  if ((kv != registry_.end()) &&
899  (kv->second.interruptSlot != nullptr))
900  {
901  auto self = this->shared_from_this();
902  const RegistrationRecord& rec = kv->second;
903  using std::move;
904  Interruption intr({}, self, userExecutor(), move(interruptMsg));
905  postRpcRequest(rec.interruptSlot, move(intr));
906  }
907  }
908  }
909 
910  template <typename TSlot, typename TInvocationOrInterruption>
911  void postRpcRequest(TSlot slot, TInvocationOrInterruption&& request)
912  {
913  using std::move;
914 
915  struct Posted
916  {
917  Ptr self;
918  TSlot slot;
919  TInvocationOrInterruption request;
920 
921  void operator()()
922  {
923  auto& me = *self;
924 
925  // Copy the request ID before the request object gets moved away.
926  auto requestId = request.requestId();
927 
928  try
929  {
930  Outcome outcome(slot(move(request)));
931  switch (outcome.type())
932  {
933  case Outcome::Type::deferred:
934  // Do nothing
935  break;
936 
937  case Outcome::Type::result:
938  me.safeYield(requestId, move(outcome).asResult());
939  break;
940 
941  case Outcome::Type::error:
942  me.safeYield(requestId, move(outcome).asError());
943  break;
944 
945  default:
946  assert(false && "unexpected Outcome::Type");
947  }
948  }
949  catch (Error& error)
950  {
951  me.yield(requestId, move(error));
952  }
953  catch (const error::BadType& e)
954  {
955  // Forward Variant conversion exceptions as ERROR messages.
956  me.yield(requestId, Error(e));
957  }
958  }
959  };
960 
961  auto exec = boost::asio::get_associated_executor(slot, userExecutor());
962  boost::asio::post(
963  exec,
964  Posted{shared_from_this(), move(slot), move(request)});
965  }
966 
967  template <typename THandler>
968  bool checkError(std::error_code ec, THandler& handler)
969  {
970  if (ec)
971  dispatchUserHandler(handler, UnexpectedError(ec));
972  return !ec;
973  }
974 
975  template <typename THandler>
976  bool checkReply(WampMsgType type, std::error_code ec, Message& reply,
977  SessionErrc defaultErrc, THandler& handler,
978  Error* errorPtr = nullptr)
979  {
980  bool success = checkError(ec, handler);
981  if (success)
982  {
983  if (reply.type() == WampMsgType::error)
984  {
985  success = false;
986  auto& errMsg = message_cast<ErrorMessage>(reply);
987  const auto& uri = errMsg.reasonUri();
988  SessionErrc errc;
989  bool found = lookupWampErrorUri(uri, defaultErrc, errc);
990  bool hasArgs = !errMsg.args().empty() ||
991  !errMsg.kwargs().empty();
992  if (errorPtr != nullptr)
993  {
994  *errorPtr = Error({}, std::move(errMsg));
995  }
996  else if (warningHandler_ && (!found || hasArgs))
997  {
998  std::ostringstream oss;
999  oss << "Expected " << MessageTraits::lookup(type).name
1000  << " reply but got ERROR with URI=" << uri;
1001  if (!errMsg.args().empty())
1002  oss << ", Args=" << errMsg.args();
1003  if (!errMsg.kwargs().empty())
1004  oss << ", ArgsKv=" << errMsg.kwargs();
1005  warn(oss.str());
1006  }
1007 
1008  dispatchUserHandler(handler, makeUnexpectedError(errc));
1009  }
1010  else
1011  assert((reply.type() == type) && "Unexpected WAMP message type");
1012  }
1013  return success;
1014  }
1015 
1016  void warnReply(WampMsgType type, std::error_code ec, Message& reply,
1017  SessionErrc defaultErrc)
1018  {
1019  if (ec)
1020  {
1021  warn(error::Failure::makeMessage(ec));
1022  }
1023  else if (reply.type() == WampMsgType::error)
1024  {
1025  auto& errMsg = message_cast<ErrorMessage>(reply);
1026  const auto& uri = errMsg.reasonUri();
1027  std::ostringstream oss;
1028  oss << "Expected " << MessageTraits::lookup(type).name
1029  << " reply but got ERROR with URI=" << uri;
1030  if (!errMsg.args().empty())
1031  oss << ", Args=" << errMsg.args();
1032  if (!errMsg.kwargs().empty())
1033  oss << ", ArgsKv=" << errMsg.kwargs();
1034  warn(oss.str());
1035  }
1036  else
1037  {
1038  assert((reply.type() == type) && "Unexpected WAMP message type");
1039  }
1040  }
1041 
1042  void warn(std::string log)
1043  {
1044  if (warningHandler_)
1045  dispatchUserHandler(warningHandler_, std::move(log));
1046  }
1047 
1048  template <typename S, typename... Ts>
1049  void dispatchUserHandler(AnyCompletionHandler<S>& handler, Ts&&... args)
1050  {
1051  dispatchVia(userExecutor(), std::move(handler),
1052  std::forward<Ts>(args)...);
1053  }
1054 
1055  template <typename S, typename... Ts>
1056  void dispatchUserHandler(const AnyReusableHandler<S>& handler, Ts&&... args)
1057  {
1058  dispatchVia(userExecutor(), handler, std::forward<Ts>(args)...);
1059  }
1060 
1061  SlotId nextSlotId() {return nextSlotId_++;}
1062 
1063  SlotId nextSlotId_ = 0;
1064  TopicMap topics_;
1065  Readership readership_;
1066  Registry registry_;
1067  InvocationMap pendingInvocations_;
1068  CallerTimeoutScheduler::Ptr timeoutScheduler_;
1069  LogHandler warningHandler_;
1070  ChallengeHandler challengeHandler_;
1071 };
1072 
1073 } // namespace internal
1074 
1075 } // namespace wamp
1076 
1077 #endif // CPPWAMP_INTERNAL_CLIENT_HPP
wamp::dispatchVia
void dispatchVia(const E &fallbackExec, F &&handler, Ts &&... args)
Dispatches the given handler using the given fallback executor, passing the given arguments.
Definition: anyhandler.hpp:303
wamp::RequestId
int64_t RequestId
Ephemeral ID associated with a WAMP request.
Definition: wampdefs.hpp:23
wamp::AnyIoExecutor
boost::asio::any_io_executor AnyIoExecutor
Polymorphic executor for all I/O objects.
Definition: asiodefs.hpp:27
CPPWAMP_LOGIC_CHECK
#define CPPWAMP_LOGIC_CHECK(cond, msg)
Conditionally throws an error::Logic exception having the given message string.
Definition: error.hpp:36
wamp::SessionErrc::registerError
@ registerError
Register error reported by dealer.
wamp::Version::agentString
static std::string agentString()
Obtains the agent string sent in HELLO messages.
Definition: version.ipp:61
wamp::IoStrand
boost::asio::strand< AnyIoExecutor > IoStrand
Serializes I/O operations.
Definition: asiodefs.hpp:41
wamp::Object
std::map< String, Variant > Object
Variant bound type for maps of variants.
Definition: variantdefs.hpp:52
wamp::SessionErrc::callError
@ callError
Call error reported by callee or dealer.
wamp::UnexpectedError
Unexpected< std::error_code > UnexpectedError
Type alias for Unexpected<std::error_code>.
Definition: erroror.hpp:106
wamp::makeUnexpectedError
UnexpectedError makeUnexpectedError(TErrorEnum errc)
Convenience function that creates an UnexpectedError from an error code enum.
Definition: erroror.hpp:113
wamp::SubscriptionId
int64_t SubscriptionId
Ephemeral ID associated with a topic subscription.
Definition: wampdefs.hpp:24
wamp
Definition: anyhandler.hpp:36
wamp::SessionErrc::publishError
@ publishError
Publish error reported by broker.
wamp::SessionErrc::success
@ success
Operation successful.
wamp::PublicationId
int64_t PublicationId
Ephemeral ID associated with an event publication.
Definition: wampdefs.hpp:25
wamp::SessionCategory::lookupWampErrorUri
bool lookupWampErrorUri(const std::string &uri, SessionErrc fallback, SessionErrc &result)
Looks up the SessionErrc enumerator that corresponds to the given error URI.
Definition: error.ipp:238
wamp::SessionErrc::subscribeError
@ subscribeError
Subscribe error reported by broker.
wamp::SessionState
SessionState
Enumerates the possible states that a client or router session can be in.
Definition: wampdefs.hpp:34
wamp::SessionErrc
SessionErrc
Error code values used with the SessionCategory error category.
Definition: error.hpp:142
wamp::postVia
void postVia(const E &fallbackExec, F &&handler, Ts &&... args)
Posts the given handler using the given fallback executor, passing the given arguments.
Definition: anyhandler.hpp:327
wamp::String
std::string String
Variant bound type for text strings.
Definition: variantdefs.hpp:50
wamp::CallCancelMode
CallCancelMode
Enumerates the possible call cancelling modes.
Definition: wampdefs.hpp:49
wamp::AnyCompletionHandler
boost::asio::any_completion_handler< TSignature > AnyCompletionHandler
Type-erases a one-shot (and possibly move-only) asynchronous completion handler.
Definition: anyhandler.hpp:55
wamp::SessionErrc::unregisterError
@ unregisterError
Unregister error reported by dealer.