CppWAMP
C++11 client library for the WAMP protocol
corosession.hpp
Go to the documentation of this file.
1 /*------------------------------------------------------------------------------
2  Copyright Butterfly Energy Systems 2014-2015, 2018, 2022.
3  Distributed under the Boost Software License, Version 1.0.
4  http://www.boost.org/LICENSE_1_0.txt
5 ------------------------------------------------------------------------------*/
6 
7 #ifndef CPPWAMP_CORO_COROSESSION_HPP
8 #define CPPWAMP_CORO_COROSESSION_HPP
9 
10 //------------------------------------------------------------------------------
14 //------------------------------------------------------------------------------
15 
16 #include <boost/asio/post.hpp>
17 #include <boost/asio/spawn.hpp>
18 #include "../asyncresult.hpp"
19 #include "../config.hpp"
20 #include "../session.hpp"
21 
22 namespace wamp
23 {
24 
25 //------------------------------------------------------------------------------
56 //------------------------------------------------------------------------------
57 template <typename TIgnoredBase = Session>
58 class CPPWAMP_DEPRECATED CoroSession: public Session
59 {
60 public:
62  using Ptr = std::shared_ptr<CoroSession>;
63 
65  using Base = Session;
66 
69 
71  using EventSlot = std::function<void (Event)>;
72 
74  using CallSlot = std::function<Outcome (Invocation)>;
75 
77  using InterruptSlot = std::function<Outcome (Interruption)>;
78 
80  template <typename TSpawnHandler>
81  using YieldContext = boost::asio::basic_yield_context<TSpawnHandler>;
82 
84  static Ptr create(AnyIoExecutor exec, const Connector::Ptr& connector);
85 
87  static Ptr create(AnyIoExecutor exec, const ConnectorList& connectors);
88 
95  template <typename TExecutionContext>
96  static CPPWAMP_ENABLED_TYPE(Ptr, isExecutionContext<TExecutionContext>())
97  create(
98  TExecutionContext& context,
100  const Connector::Ptr& connector
102  )
103  {
104  return create(context.get_executor(), connector);
105  }
106 
113  template <typename TExecutionContext>
114  static CPPWAMP_ENABLED_TYPE(Ptr, isExecutionContext<TExecutionContext>())
115  create(
116  TExecutionContext& context,
118  const ConnectorList& connectors)
120  {
121  return create(context.get_executor(), connectors);
122  }
123 
126 
127  void connect(AsyncHandler<size_t> handler);
128 
130  template <typename H>
131  size_t connect(YieldContext<H> yield, std::error_code* ec = nullptr);
132 
134  void join(Realm realm, AsyncHandler<SessionInfo> handler);
135 
137  template <typename H>
138  SessionInfo join(Realm realm, YieldContext<H> yield,
139  std::error_code* ec = nullptr);
140 
142  void authenticate(Authentication auth);
143 
145  void leave(AsyncHandler<Reason> handler);
146 
148  template <typename H>
149  Reason leave(YieldContext<H> yield, std::error_code* ec = nullptr);
150 
152  void leave(Reason reason, AsyncHandler<Reason> handler);
153 
155  template <typename H>
156  Reason leave(Reason reason, YieldContext<H> yield,
157  std::error_code* ec = nullptr);
159 
162 
164  void subscribe(Topic topic, EventSlot slot,
166 
168  template <typename H>
169  Subscription subscribe(Topic topic, EventSlot slot,
170  YieldContext<H> yield, std::error_code* ec = nullptr);
171 
173  void unsubscribe(const Subscription& sub);
174 
177  void unsubscribe(const Subscription& sub, AsyncHandler<bool> handler);
178 
181  template <typename H>
182  bool unsubscribe(const Subscription& sub, YieldContext<H> yield,
183  std::error_code* ec = nullptr);
184 
186  void publish(Pub pub);
187 
190  void publish(Pub pub, AsyncHandler<PublicationId> handler);
191 
193  template <typename H>
194  PublicationId publish(Pub pub, YieldContext<H> yield,
195  std::error_code* ec = nullptr);
197 
200 
201  void enroll(Procedure procedure, CallSlot slot,
203 
205  template <typename H>
206  Registration enroll(Procedure procedure, CallSlot slot,
207  YieldContext<H> yield, std::error_code* ec = nullptr);
208 
211  void enroll(Procedure procedure, CallSlot callSlot,
212  InterruptSlot interruptSlot,
214 
216  template <typename H>
217  Registration enroll(Procedure procedure, CallSlot slot,
218  InterruptSlot interruptSlot, YieldContext<H> yield,
219  std::error_code* ec = nullptr);
220 
222  void unregister(const Registration& reg);
223 
226  void unregister(const Registration& reg, AsyncHandler<bool> handler);
227 
230  template <typename H>
231  bool unregister(const Registration& reg, YieldContext<H> yield,
232  std::error_code* ec = nullptr);
233 
235  RequestId call(Rpc procedure, AsyncHandler<Result> handler);
236 
238  template <typename H>
239  Result call(Rpc rpc, YieldContext<H> yield, std::error_code* ec = nullptr);
241 
244 
245  template <typename H>
246  void suspend(YieldContext<H> yield);
248 
251 
257  void reset() = delete;
259 
260 protected:
261  using Base::Base;
262 };
263 
264 
265 //******************************************************************************
266 // CoroSession implementation
267 //******************************************************************************
268 
269 //------------------------------------------------------------------------------
271 //------------------------------------------------------------------------------
272 template <typename B>
274  AnyIoExecutor exec,
276  const Connector::Ptr& connector
278  )
279 {
280  return Ptr(new CoroSession(exec, {connector}));
281 }
282 
283 //------------------------------------------------------------------------------
285 //------------------------------------------------------------------------------
286 template <typename B>
288  AnyIoExecutor exec,
290  const ConnectorList& connectors
292  )
293 {
294  return Ptr(new CoroSession(exec, connectors));
295 }
296 
297 //------------------------------------------------------------------------------
313 //------------------------------------------------------------------------------
314 template <typename B>
316  AsyncHandler<size_t> handler
318  )
319 {
320  CPPWAMP_LOGIC_CHECK(state() == State::disconnected,
321  "Session is not disconnected");
322  Base::connect(std::move(handler));
323 }
324 
325 //------------------------------------------------------------------------------
329 //------------------------------------------------------------------------------
330 template <typename B>
331 template <typename H>
333  YieldContext<H> yield,
334  std::error_code* ec
336  )
337 {
338  CPPWAMP_LOGIC_CHECK(state() == State::disconnected,
339  "Session is not disconnected");
340  auto index = Base::connect(yield);
341  if (!ec)
342  return index.value();
343  *ec = !index ? index.error() : std::error_code{};
344  return index.value_or(0);
345 }
346 
347 //------------------------------------------------------------------------------
360 //------------------------------------------------------------------------------
361 template <typename B>
363  Realm realm,
366  )
367 {
368  CPPWAMP_LOGIC_CHECK(this->state() == State::closed,
369  "Session is not closed");
370  Base::join(std::move(realm), std::move(handler));
371 }
372 
373 //------------------------------------------------------------------------------
377 //------------------------------------------------------------------------------
378 template <typename B>
379 template <typename H>
381  Realm realm,
382  YieldContext<H> yield,
383  std::error_code* ec
385  )
386 {
387  CPPWAMP_LOGIC_CHECK(this->state() == State::closed,
388  "Session is not closed");
389  auto info = Base::join(std::move(realm), yield);
390  if (!ec)
391  return info.value();
392  *ec = !info ? info.error() : std::error_code{};
393  return info.value_or(SessionInfo{});
394 }
395 
396 //------------------------------------------------------------------------------
399 //------------------------------------------------------------------------------
400 template <typename B>
402 {
403  CPPWAMP_LOGIC_CHECK(this->state() == State::authenticating,
404  "Session is not authenticating");
405  Base::authenticate(std::move(auth));
406 }
407 
408 //------------------------------------------------------------------------------
421 //------------------------------------------------------------------------------
422 template <typename B>
424  AsyncHandler<Reason> handler
426  )
427 {
428  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
429  "Session is not established");
430  Base::leave(std::move(handler));
431 }
432 
433 //------------------------------------------------------------------------------
437 //------------------------------------------------------------------------------
438 template <typename B>
439 template <typename H>
441  YieldContext<H> yield,
442  std::error_code* ec
444  )
445 {
446  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
447  "Session is not established");
448  auto reason = Base::leave(yield);
449  if (!ec)
450  return reason.value();
451  *ec = !reason ? reason.error() : std::error_code{};
452  return reason.value_or(Reason{});
453 }
454 
455 //------------------------------------------------------------------------------
466 //------------------------------------------------------------------------------
467 template <typename B>
469  Reason reason,
470  AsyncHandler<Reason> handler
472  )
473 {
474  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
475  "Session is not established");
476  Base::leave(std::move(reason), std::move(handler));
477 }
478 
479 //------------------------------------------------------------------------------
483 //------------------------------------------------------------------------------
484 template <typename B>
485 template <typename H>
487  Reason reason,
489  YieldContext<H> yield,
490  std::error_code* ec
492  )
493 {
494  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
495  "Session is not established");
496  auto result = Base::leave(std::move(reason), yield);
497  if (!ec)
498  return result.value();
499  *ec = !result ? result.error() : std::error_code{};
500  return result.value_or(Reason{});
501 }
502 
503 //------------------------------------------------------------------------------
514 //------------------------------------------------------------------------------
515 template <typename B>
517  Topic topic,
518  EventSlot slot,
522 )
523 {
524  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
525  "Session is not established");
526  using std::move;
527  return Base::subscribe(move(topic), move(slot), move(handler));
528 }
529 
530 //------------------------------------------------------------------------------
534 //------------------------------------------------------------------------------
535 template <typename B>
536 template <typename H>
538  Topic topic,
539  EventSlot slot,
541  YieldContext<H> yield,
542  std::error_code* ec
544  )
545 {
546  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
547  "Session is not established");
548  auto sub = Base::subscribe(std::move(topic), std::move(slot), yield);
549  if (!ec)
550  return sub.value();
551  *ec = !sub ? sub.error() : std::error_code{};
552  return sub.value_or(Subscription{});
553 }
554 
555 //------------------------------------------------------------------------------
565 //------------------------------------------------------------------------------
566 template <typename B>
568  const Subscription& sub
569  )
570 {
571  Base::unsubscribe(sub);
572 }
573 
574 //------------------------------------------------------------------------------
595 //------------------------------------------------------------------------------
596 template <typename B>
598  const Subscription& sub,
599  AsyncHandler<bool> handler
601  )
602 {
603  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
604  "Session is not established");
605  Base::unsubscribe(sub, std::move(handler));
606 }
607 
608 //------------------------------------------------------------------------------
612 //------------------------------------------------------------------------------
613 template <typename B>
614 template <typename H>
616  const Subscription& sub,
617  YieldContext<H> yield,
618  std::error_code* ec
620  )
621 {
622  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
623  "Session is not established");
624  auto unsubscribed = Base::unsubscribe(sub, yield);
625  if (!ec)
626  return unsubscribed.value();
627  *ec = !unsubscribed ? unsubscribed.error() : std::error_code{};
628  return unsubscribed.value_or(false);
629 }
630 
631 //------------------------------------------------------------------------------
634 //------------------------------------------------------------------------------
635 template <typename B>
637  Pub pub
638  )
639 {
640  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
641  "Session is not established");
642  Base::publish(pub);
643 }
644 
645 //------------------------------------------------------------------------------
655 //------------------------------------------------------------------------------
656 template <typename B>
658  Pub pub,
661  )
662 {
663  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
664  "Session is not established");
665  Base::publish(pub, std::move(handler));
666 }
667 
668 //------------------------------------------------------------------------------
672 //------------------------------------------------------------------------------
673 template <typename B>
674 template <typename H>
676  Pub pub,
677  YieldContext<H> yield,
678  std::error_code* ec
680  )
681 {
682  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
683  "Session is not established");
684  auto pubId = Base::publish(std::move(pub), yield);
685  if (!ec)
686  return pubId.value();
687  *ec = !pubId ? pubId.error() : std::error_code{};
688  return pubId.value_or(0);
689 }
690 
691 //------------------------------------------------------------------------------
705 //------------------------------------------------------------------------------
706 template <typename B>
708  Procedure procedure,
709  CallSlot slot,
712 )
713 {
714  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
715  "Session is not established");
716  Base::enroll(std::move(procedure), std::move(slot), std::move(handler));
717 }
718 
719 //------------------------------------------------------------------------------
723 //------------------------------------------------------------------------------
724 template <typename B>
725 template <typename H>
727  Procedure procedure,
728  CallSlot slot,
730  YieldContext<H> yield,
731  std::error_code* ec
734  )
735 {
736  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
737  "Session is not established");
738  auto reg = Base::enroll(std::move(procedure), std::move(slot), yield);
739  if (!ec)
740  return reg.value();
741  *ec = !reg ? reg.error() : std::error_code{};
742  return reg.value_or(Registration{});
743 }
744 
745 //------------------------------------------------------------------------------
759 //------------------------------------------------------------------------------
760 template <typename B>
762  Procedure procedure,
763  CallSlot callSlot,
764  InterruptSlot interruptSlot,
768 )
769 {
770  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
771  "Session is not established");
772  Base::enroll(std::move(procedure), std::move(callSlot),
773  std::move(interruptSlot), std::move(handler));
774 }
775 
776 //------------------------------------------------------------------------------
780 //------------------------------------------------------------------------------
781 template <typename B>
782 template <typename H>
784  Procedure procedure,
785  CallSlot callSlot,
787  InterruptSlot interruptSlot,
789  YieldContext<H> yield,
790  std::error_code* ec
793  )
794 {
795  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
796  "Session is not established");
797  auto reg = Base::enroll(std::move(procedure), std::move(callSlot),
798  std::move(interruptSlot), yield);
799  if (!ec)
800  return reg.value();
801  *ec = !reg ? reg.error() : std::error_code{};
802  return reg.value_or(Registration{});
803 }
804 
805 //------------------------------------------------------------------------------
815 //------------------------------------------------------------------------------
816 template <typename B>
818  const Registration& reg
819  )
820 {
821  Base::unregister(reg);
822 }
823 
824 //------------------------------------------------------------------------------
841 //------------------------------------------------------------------------------
842 template <typename B>
844  const Registration& reg,
845  AsyncHandler<bool> handler
847  )
848 {
849  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
850  "Session is not established");
851  Base::unregister(reg, std::move(handler));
852 }
853 
854 //------------------------------------------------------------------------------
858 //------------------------------------------------------------------------------
859 template <typename B>
860 template <typename H>
862  const Registration& reg,
863  YieldContext<H> yield,
864  std::error_code* ec
866  )
867 {
868  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
869  "Session is not established");
870  auto unregistered = Base::unregister(reg, yield);
871  if (!ec)
872  return unregistered.value();
873  *ec = !unregistered ? unregistered.error() : std::error_code{};
874  return unregistered.value_or(false);
875 }
876 
877 //------------------------------------------------------------------------------
890 //------------------------------------------------------------------------------
891 template <typename B>
893  Rpc rpc,
894  AsyncHandler<Result> handler
896  )
897 {
898  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
899  "Session is not established");
900  CallChit chit;
901  Base::call(std::move(rpc), chit, std::move(handler));
902  return chit.requestId();
903 }
904 
905 //------------------------------------------------------------------------------
909 //------------------------------------------------------------------------------
910 template <typename B>
911 template <typename H>
913  Rpc rpc,
914  YieldContext<H> yield,
915  std::error_code* ec
917  )
918 {
919  CPPWAMP_LOGIC_CHECK(this->state() == State::established,
920  "Session is not established");
921  auto result = Base::call(std::move(rpc), yield);
922  if (!ec)
923  return result.value();
924  *ec = !result ? result.error() : std::error_code{};
925  return result.value_or(Result{});
926 }
927 
928 //------------------------------------------------------------------------------
935 //------------------------------------------------------------------------------
936 template <typename B>
937 template <typename H>
939 {
940  boost::asio::post(this->userIosvc(), yield);
941 }
942 
943 } // namespace wamp
944 
945 #endif // CPPWAMP_CORO_COROSESSION_HPP
wamp::CoroSession::unregister
void unregister(const Registration &reg)
Unregisters a remote procedure call.
Definition: corosession.hpp:817
wamp::Reason
Provides the reason URI and other options contained within GOODBYE messages.
Definition: peerdata.hpp:181
wamp::RequestId
int64_t RequestId
Ephemeral ID associated with a WAMP request.
Definition: wampdefs.hpp:23
wamp::CoroSession::call
RequestId call(Rpc procedure, AsyncHandler< Result > handler)
Asynchronously calls a remote procedure.
Definition: corosession.hpp:892
wamp::CoroSession::authenticate
void authenticate(Authentication auth)
Sends an AUTHENTICATE in response to a CHALLENGE.
Definition: corosession.hpp:401
wamp::CoroSession::connect
void connect(AsyncHandler< size_t > handler)
Asynchronously attempts to connect to a router.
Definition: corosession.hpp:315
wamp::CoroSession
Coroutine API used by a client peer in WAMP applications.
Definition: corosession.hpp:58
wamp::AnyIoExecutor
boost::asio::any_io_executor AnyIoExecutor
Polymorphic executor for all I/O objects.
Definition: asiodefs.hpp:27
wamp::Realm
Realm URI and other options contained within WAMP HELLO messages.
Definition: peerdata.hpp:59
CPPWAMP_LOGIC_CHECK
#define CPPWAMP_LOGIC_CHECK(cond, msg)
Conditionally throws an error::Logic exception having the given message string.
Definition: error.hpp:36
wamp::Authentication
Provides the Signature and Extra dictionary contained within WAMP AUTHENTICATE messages.
Definition: peerdata.hpp:205
wamp::AnyReusableHandler< void(Event)>
wamp::CoroSession::YieldContext
boost::asio::basic_yield_context< TSpawnHandler > YieldContext
Yield context type used by the boost::asio::spawn handler.
Definition: corosession.hpp:81
wamp::CoroSession::suspend
void suspend(YieldContext< H > yield)
Cooperatively suspends this coroutine to allow others to run.
Definition: corosession.hpp:938
wamp::Interruption
Contains details within WAMP INTERRUPT messages.
Definition: peerdata.hpp:916
wamp::Connector::Ptr
std::shared_ptr< Connector > Ptr
Shared pointer to a Connector.
Definition: connector.hpp:43
wamp::Session::Ptr
std::shared_ptr< Session > Ptr
Shared pointer to a Session.
Definition: session.hpp:117
wamp::Topic
Provides the topic URI and other options contained within WAMP SUBSCRIBE messages.
Definition: peerdata.hpp:331
wamp::Procedure
Contains the procedure URI and other options contained within WAMP REGISTER messages.
Definition: peerdata.hpp:492
wamp::CoroSession::leave
void leave(AsyncHandler< Reason > handler)
Asynchronously leaves the WAMP session.
Definition: corosession.hpp:423
wamp
Definition: anyhandler.hpp:36
wamp::Connector
Abstract base class for establishing client transport endpoints.
Definition: connector.hpp:39
wamp::CoroSession::subscribe
void subscribe(Topic topic, EventSlot slot, AsyncHandler< Subscription > handler)
Asynchronously subscribes to WAMP pub/sub events having the given topic.
Definition: corosession.hpp:516
wamp::CoroSession::enroll
void enroll(Procedure procedure, CallSlot slot, AsyncHandler< Registration > handler)
Asynchronously registers a WAMP remote procedure call.
Definition: corosession.hpp:707
wamp::Subscription
Represents a pub/sub event subscription.
Definition: subscription.hpp:41
wamp::CallChit
Lightweight token representing a call request.
Definition: chits.hpp:29
wamp::CoroSession::unsubscribe
void unsubscribe(const Subscription &sub)
Unsubscribes a subscription to a topic.
Definition: corosession.hpp:567
wamp::CoroSession::Ptr
std::shared_ptr< CoroSession > Ptr
Shared pointer to a CoroSession.
Definition: corosession.hpp:62
wamp::Rpc
Contains the procedure URI, options, and payload contained within WAMP CALL messages.
Definition: peerdata.hpp:535
wamp::CoroSession::join
void join(Realm realm, AsyncHandler< SessionInfo > handler)
Asynchronously attempts to join the given WAMP realm.
Definition: corosession.hpp:362
wamp::Outcome
Contains the outcome of an RPC invocation.
Definition: peerdata.hpp:709
wamp::PublicationId
int64_t PublicationId
Ephemeral ID associated with an event publication.
Definition: wampdefs.hpp:25
wamp::Pub
Provides the topic URI, options, and payload contained within WAMP PUBLISH messages.
Definition: peerdata.hpp:363
wamp::AsyncHandler
std::function< void(ErrorOr< T >)> AsyncHandler
Type alias for a handler taking an ErrorOr<T> parameter.
Definition: erroror.hpp:495
wamp::Result
Contains the remote procedure result options/payload within WAMP RESULT and YIELD messages.
Definition: peerdata.hpp:646
wamp::Registration
Represents a remote procedure registration.
Definition: registration.hpp:42
wamp::ConnectorList
std::vector< Connector::Ptr > ConnectorList
List of Connector objects to use when attempting connection.
Definition: connector.hpp:74
wamp::Session
Session API used by a client peer in WAMP applications.
Definition: session.hpp:110
wamp::SessionInfo
Session information contained within WAMP WELCOME messages.
Definition: peerdata.hpp:97
wamp::CallChit::requestId
RequestId requestId() const
Obtains the request ID associated with the call.
Definition: chits.ipp:20
wamp::Invocation
Contains payload arguments and other options within WAMP INVOCATION messages.
Definition: peerdata.hpp:799
wamp::SessionState
SessionState
Enumerates the possible states that a client or router session can be in.
Definition: wampdefs.hpp:34
wamp::Event
Provides the subscription/publication ids, options, and payload contained within WAMP EVENT messages.
Definition: peerdata.hpp:427
wamp::CoroSession::publish
void publish(Pub pub)
Publishes an event.
Definition: corosession.hpp:636
wamp::CoroSession::create
static Ptr create(AnyIoExecutor exec, const Connector::Ptr &connector)
Creates a new CoroSession instance.
Definition: corosession.hpp:273