7 #ifndef CPPWAMP_INTERNAL_CLIENT_HPP
8 #define CPPWAMP_INTERNAL_CLIENT_HPP
15 #include <system_error>
16 #include <type_traits>
18 #include <boost/asio/post.hpp>
19 #include "../anyhandler.hpp"
20 #include "../registration.hpp"
21 #include "../subscription.hpp"
22 #include "../version.hpp"
23 #include "callertimeout.hpp"
24 #include "clientinterface.hpp"
// Client: class template parameterized on a codec type and a transport type.
// It derives publicly from ClientInterface and from Peer<TCodec, TTransport>.
36 template <
typename TCodec,
typename TTransport>
37 class Client :
public ClientInterface,
public Peer<TCodec, TTransport>
// Shared and weak pointer aliases for this client type.
40 using Ptr = std::shared_ptr<Client>;
41 using WeakPtr = std::weak_ptr<Client>;
// Transport type and the shared pointer type used to hold it.
43 using Transport = TTransport;
44 using TransportPtr = std::shared_ptr<Transport>;
// NOTE(review): the declaration introduced by this template header is not
// visible in this view.
47 template <
typename TValue>
// Factory function: allocates a Client from the moved transport pointer and
// returns it wrapped in a Ptr (std::shared_ptr<Client>).
50 static Ptr create(TransportPtr&& transport)
52 return Ptr(
new Client(std::move(transport)));
// Destructor: calls terminate().
55 ~Client()
override {terminate();}
// Returns the state reported by the Base class.
57 State state()
const override {
return Base::state();}
// Returns the strand reported by the Base class.
59 IoStrand strand()
const override {
return Base::strand();}
// Joins a realm: sends realm.message({}) as a request and processes the reply
// in the operator() of a local functor.
61 void join(Realm&& realm, CompletionHandler<SessionInfo>&& handler)
override
// Completion handler stored in the reply functor.
66 CompletionHandler<SessionInfo> handler;
// Reply callback for the request issued below.
70 void operator()(std::error_code ec, Message reply)
// The error code and the handler are handed to checkError(); its bool result
// is used as the condition.
73 if (me.checkError(ec, handler))
// A WELCOME reply is forwarded to onWelcome(); onJoinAborted() is the other
// visible destination for the reply.
75 if (reply.type() == WampMsgType::welcome)
76 me.onWelcome(std::move(handler), std::move(reply),
79 me.onJoinAborted(std::move(handler), std::move(reply),
// Sets the "roles" option to the value returned by roles().
86 .withOption(
"roles", roles());
// The functor is built from a shared pointer to this client, the handler,
// realm.uri() and realm.abort({}).
88 this->request(realm.message({}),
89 Requested{shared_from_this(), std::move(handler),
90 realm.uri(), realm.abort({})});
// Sends the message produced by auth.message({}) via send().
93 void authenticate(Authentication&& auth)
override
95 this->send(auth.message({}));
// Runs authenticate() through boost::asio::dispatch on strand(), using a local
// functor that carries the Authentication object.
98 void safeAuthenticate(Authentication&& auth)
override
// The functor refers to the client through a weak pointer.
102 std::weak_ptr<Client>
self;
// Lock the weak pointer to obtain a pointer to the client, then forward the
// moved Authentication to authenticate().
107 auto me =
self.lock();
109 me->authenticate(std::move(auth));
// The functor is initialized from shared_from_this() and the moved auth.
113 boost::asio::dispatch(strand(),
114 Dispatched{shared_from_this(), std::move(auth)});
// Leaves the session: clears the timeout scheduler and calls Base::adjourn()
// with the reason and a local functor that processes the reply.
117 void leave(Reason&& reason, CompletionHandler<Reason>&& handler)
override
// Completion handler stored in the reply functor.
122 CompletionHandler<Reason> handler;
// Reply callback: the subscription and registration tables are cleared before
// the error code is checked.
124 void operator()(std::error_code ec, Message reply)
127 me.readership_.clear();
128 me.registry_.clear();
129 if (me.checkError(ec, handler))
// The reply is cast to a GoodbyeMessage and moved into the Reason object that
// is passed to the handler.
131 auto& goodBye = message_cast<GoodbyeMessage>(reply);
132 me.dispatchUserHandler(handler,
133 Reason({}, std::move(goodBye)));
139 timeoutScheduler_->clear();
// NOTE(review): `self` is not referenced by the visible statements that
// follow; the functor below calls shared_from_this() again.
140 auto self = this->shared_from_this();
141 Base::adjourn(reason,
142 Adjourned{shared_from_this(), std::move(handler)});
// Clears the pending invocation map and the timeout scheduler.
// NOTE(review): any further statements of this method are not visible in
// this view.
145 void disconnect()
override
147 pendingInvocations_.clear();
148 timeoutScheduler_->clear();
// Calls initialize() with five empty arguments, then clears the pending
// invocation map and the timeout scheduler.
152 void terminate()
override
154 initialize({}, {}, {}, {}, {});
155 pendingInvocations_.clear();
156 timeoutScheduler_->clear();
// Subscribes a slot to a topic. The topic URI is looked up in topics_: when it
// is absent a Requested functor is built for a reply callback; when it is
// present the slot is stored locally under the existing subscription ID.
160 void subscribe(Topic&& topic, EventSlot&& slot,
161 CompletionHandler<Subscription>&& handler)
override
// State carried by the reply functor: the record to store and the handler.
166 SubscriptionRecord rec;
167 CompletionHandler<Subscription> handler;
// Reply callback: the reply is checked against WampMsgType::subscribed.
169 void operator()(std::error_code ec, Message reply)
172 if (me.checkReply(WampMsgType::subscribed, ec, reply,
// The subscription ID comes from the reply; the slot ID is generated locally.
175 const auto& subMsg = message_cast<SubscribedMessage>(reply);
176 auto subId = subMsg.subscriptionId();
177 auto slotId = me.nextSlotId();
178 Subscription sub(
self, subId, slotId, {});
// Record the topic URI -> subscription ID mapping, store the record under
// (subId, slotId), then pass the Subscription to the handler.
179 me.topics_.emplace(rec.topicUri, subId);
180 me.readership_[subId][slotId] = std::move(rec);
181 me.dispatchUserHandler(handler, std::move(sub));
188 SubscriptionRecord rec = {topic.uri(), move(slot)};
// Look up the topic URI among the topics already recorded.
190 auto kv = topics_.find(rec.topicUri);
191 if (kv == topics_.end())
193 auto self = this->shared_from_this();
196 Requested{shared_from_this(), move(rec), move(handler)});
// Existing entry: reuse its subscription ID with a newly generated slot ID.
200 auto subId = kv->second;
201 auto slotId = nextSlotId();
202 Subscription sub{this->shared_from_this(), subId, slotId, {}};
203 readership_[subId][slotId] = move(rec);
204 dispatchUserHandler(handler, move(sub));
// Removes the local slot identified by the given Subscription from
// readership_, and calls sendUnsubscribe() with the subscription ID.
208 void unsubscribe(
const Subscription& sub)
override
// Find the slots stored under this subscription ID.
210 auto kv = readership_.find(sub.id());
211 if (kv != readership_.end())
213 auto& localSubs = kv->second;
214 if (!localSubs.empty())
// Locate the slot referred to by the Subscription object.
216 auto subKv = localSubs.find(sub.slotId({}));
217 if (subKv != localSubs.end())
// When this is the only remaining slot, its topic URI is erased from topics_.
219 if (localSubs.size() == 1u)
220 topics_.erase(subKv->second.topicUri);
222 localSubs.erase(subKv);
// NOTE(review): the erase and sendUnsubscribe() below presumably both belong
// to this empty-check; the brace lines are not visible in this view.
223 if (localSubs.empty())
225 readership_.erase(kv);
226 sendUnsubscribe(sub.id());
// Overload taking a completion handler. Removes the local slot identified by
// the given Subscription; the handler is either moved into sendUnsubscribe()
// or posted via the user executor with the `unsubscribed` flag.
233 void unsubscribe(
const Subscription& sub,
234 CompletionHandler<bool>&& handler)
override
// Flag that is later passed to the handler by postVia().
236 bool unsubscribed =
false;
// Find the slots stored under this subscription ID.
237 auto kv = readership_.find(sub.id());
238 if (kv != readership_.end())
240 auto& localSubs = kv->second;
241 if (!localSubs.empty())
// Locate the slot referred to by the Subscription object.
243 auto subKv = localSubs.find(sub.slotId({}));
244 if (subKv != localSubs.end())
// When this is the only remaining slot, its topic URI is erased from topics_.
247 if (localSubs.size() == 1u)
248 topics_.erase(subKv->second.topicUri);
250 localSubs.erase(subKv);
// NOTE(review): the erase and sendUnsubscribe() below presumably both belong
// to this empty-check; the brace lines are not visible in this view.
251 if (localSubs.empty())
253 readership_.erase(kv);
254 sendUnsubscribe(sub.id(), std::move(handler));
// Post the handler with the flag through the user executor.
261 postVia(userExecutor(), std::move(handler), unsubscribed);
// Runs unsubscribe(sub) through boost::asio::dispatch, using a lambda that
// captures a weak pointer to this client and a copy of the Subscription.
265 void safeUnsubscribe(
const Subscription& sub)
override
267 auto self = std::weak_ptr<Client>(shared_from_this());
268 boost::asio::dispatch(
270 [
self, sub]()
mutable
// Lock the weak pointer, then pass the captured Subscription to unsubscribe().
272 auto me =
self.lock();
274 me->unsubscribe(std::move(sub));
// Sends the message produced by pub.message({}) via send(); no reply handler
// is involved.
278 void publish(Pub&& pub)
override
280 this->send(pub.message({}));
// Overload taking a completion handler: sets the "acknowledge" option to true
// and sends the publication as a request whose reply is processed by a local
// functor.
283 void publish(Pub&& pub, CompletionHandler<PublicationId>&& handler)
override
// Completion handler stored in the reply functor.
288 CompletionHandler<PublicationId> handler;
// Reply callback: the reply is checked against WampMsgType::published and its
// publication ID is passed to the handler.
290 void operator()(std::error_code ec, Message reply)
293 if (me.checkReply(WampMsgType::published, ec, reply,
296 const auto& pubMsg = message_cast<PublishedMessage>(reply);
297 me.dispatchUserHandler(handler, pubMsg.publicationId());
302 pub.withOption(
"acknowledge",
true);
// NOTE(review): `self` is not referenced by the visible statements that
// follow; the functor below calls shared_from_this() again.
303 auto self = this->shared_from_this();
304 this->request(pub.message({}),
305 Requested{shared_from_this(), std::move(handler)});
// Registers a procedure: sends procedure.message({}) as a request; the reply
// functor stores the call/interrupt slots in registry_ under the registration
// ID taken from the reply.
308 void enroll(Procedure&& procedure, CallSlot&& callSlot,
309 InterruptSlot&& interruptSlot,
310 CompletionHandler<Registration>&& handler)
override
// State carried by the reply functor: the record to store and the handler.
315 RegistrationRecord rec;
316 CompletionHandler<Registration> handler;
// Reply callback: the reply is checked against WampMsgType::registered.
318 void operator()(std::error_code ec, Message reply)
321 if (me.checkReply(WampMsgType::registered, ec, reply,
324 const auto& regMsg = message_cast<RegisteredMessage>(reply);
325 auto regId = regMsg.registrationId();
326 Registration reg(
self, regId, {});
// Store the record under the registration ID, then pass the Registration to
// the handler.
327 me.registry_[regId] = std::move(rec);
328 me.dispatchUserHandler(handler, std::move(reg));
334 RegistrationRecord rec{ move(callSlot), move(interruptSlot) };
// NOTE(review): `self` is not referenced by the visible statements that
// follow; the functor below calls shared_from_this() again.
335 auto self = this->shared_from_this();
336 this->request(procedure.message({}),
337 Requested{shared_from_this(), move(rec), move(handler)});
// Unregisters without a completion handler: looks the registration up in
// registry_ and, when the session state is established, sends an
// UnregisterMessage as a request.
340 void unregister(
const Registration& reg)
override
// Reply callback: the reply is passed to warnReply() with
// WampMsgType::unregistered as the expected type.
346 void operator()(std::error_code ec, Message reply)
350 self->warnReply(WampMsgType::unregistered, ec, reply,
355 auto kv = registry_.find(reg.id());
356 if (kv != registry_.end())
359 if (state() == State::established)
// NOTE(review): `self` is not referenced by the visible statements that
// follow; the functor below calls shared_from_this() again.
361 auto self = this->shared_from_this();
362 UnregisterMessage msg(reg.id());
363 this->request(msg, Requested{shared_from_this()});
// Overload taking a completion handler. Looks the registration up in
// registry_; the handler is either moved into a Requested reply functor or
// posted via the user executor with false.
368 void unregister(
const Registration& reg,
369 CompletionHandler<bool>&& handler)
override
// Completion handler stored in the reply functor.
374 CompletionHandler<bool> handler;
// Reply callback: the reply is checked against WampMsgType::unregistered and
// the handler receives true.
376 void operator()(std::error_code ec, Message reply)
379 if (me.checkReply(WampMsgType::unregistered, ec, reply,
382 me.dispatchUserHandler(handler,
true);
// NOTE(review): this string is the tail of a statement whose opening line is
// not visible in this view.
388 "Session is not established");
389 auto kv = registry_.find(reg.id());
390 if (kv != registry_.end())
393 auto self = this->shared_from_this();
394 UnregisterMessage msg(reg.id());
396 Requested{shared_from_this(), std::move(handler)});
// Post the handler with false through the user executor.
400 postVia(userExecutor(), std::move(handler),
false);
// Runs unregister(reg) through boost::asio::dispatch, using a lambda that
// captures a weak pointer to this client and a copy of the Registration.
404 void safeUnregister(
const Registration& reg)
override
406 auto self = std::weak_ptr<Client>(shared_from_this());
407 boost::asio::dispatch(
409 [
self, reg]()
mutable
// Lock the weak pointer, then pass the captured Registration to unregister().
411 auto me =
self.lock();
413 me->unregister(std::move(reg));
// Issues an RPC via request() with a local reply functor, builds a CallChit
// for the resulting request ID, and schedules a caller timeout when the Rpc
// specifies a non-zero one.
417 CallChit oneShotCall(Rpc&& rpc,
418 CompletionHandler<Result>&& handler)
override
// Completion handler stored in the reply functor.
424 CompletionHandler<Result> handler;
// Reply callback: the reply is checked against WampMsgType::result; it is then
// cast to a ResultMessage and moved into the Result passed to the handler.
426 void operator()(std::error_code ec, Message reply)
429 if (me.checkReply(WampMsgType::result, ec, reply,
432 auto& resultMsg = message_cast<ResultMessage>(reply);
433 me.dispatchUserHandler(handler,
434 Result({}, std::move(resultMsg)));
439 auto self = this->shared_from_this();
// The cancellation slot is read from the handler before the handler is moved
// into the functor below.
441 boost::asio::get_associated_cancellation_slot(handler);
442 auto requestId = this->request(
444 Requested{shared_from_this(), rpc.error({}), std::move(handler)});
// The chit is built from this client, the request ID and the Rpc's cancel mode.
445 CallChit chit{shared_from_this(), requestId, rpc.cancelMode(), {}};
// When the slot is connected, a lambda holding a copy of the chit calls
// chit.cancel().
447 if (cancelSlot.is_connected())
450 [chit](boost::asio::cancellation_type_t) {chit.cancel();});
// A caller timeout count of zero adds nothing to the timeout scheduler.
453 if (rpc.callerTimeout().count() != 0)
454 timeoutScheduler_->add(rpc.callerTimeout(), requestId);
// Same visible structure as oneShotCall(), except that the request is issued
// via ongoingRequest() and the handler type is OngoingCallHandler.
459 CallChit ongoingCall(Rpc&& rpc, OngoingCallHandler&& handler)
override
// Handler stored in the reply functor.
467 OngoingCallHandler handler;
// Reply callback: the reply is checked against WampMsgType::result; it is then
// cast to a ResultMessage and moved into the Result passed to the handler.
469 void operator()(std::error_code ec, Message reply)
472 if (me.checkReply(WampMsgType::result, ec, reply,
475 auto& resultMsg = message_cast<ResultMessage>(reply);
476 me.dispatchUserHandler(handler,
477 Result({}, std::move(resultMsg)));
482 auto self = this->shared_from_this();
// The cancellation slot is read from the handler before the handler is moved
// into the functor below.
484 boost::asio::get_associated_cancellation_slot(handler);
485 auto requestId = this->ongoingRequest(
487 Requested{shared_from_this(), rpc.error({}), std::move(handler)});
// The chit is built from this client, the request ID and the Rpc's cancel mode.
488 CallChit chit{shared_from_this(), requestId, rpc.cancelMode(), {}};
// When the slot is connected, a lambda holding a copy of the chit calls
// chit.cancel().
490 if (cancelSlot.is_connected())
493 [chit](boost::asio::cancellation_type_t) {chit.cancel();});
// A caller timeout count of zero adds nothing to the timeout scheduler.
496 if (rpc.callerTimeout().count() != 0)
497 timeoutScheduler_->add(rpc.callerTimeout(), requestId);
// NOTE(review): the enclosing signature is not visible in this view; this is
// presumably the body of cancelCall(reqId, mode), which other methods call.
// Forwards a CallCancellation built from reqId and mode to Base::cancelCall().
504 Base::cancelCall(CallCancellation{reqId, mode});
// NOTE(review): the enclosing signature is not visible in this view; the name
// safeCancelCall is assumed from the pattern of the other safe* methods.
// Runs cancelCall(reqId, mode) through boost::asio::dispatch, using a lambda
// that captures a weak pointer to this client along with reqId and mode.
509 std::weak_ptr<Client>
self{shared_from_this()};
510 boost::asio::dispatch(
512 [
self, reqId, mode]()
// Lock the weak pointer, then forward the captured arguments to cancelCall().
514 auto me =
self.lock();
516 me->cancelCall(reqId, mode);
// Sends the yield message for a result. The request ID is erased from
// pendingInvocations_ only when the result is not progressive.
520 void yield(
RequestId reqId, Result&& result)
override
522 if (!result.isProgressive())
523 pendingInvocations_.erase(reqId);
524 this->send(result.yieldMessage({}, reqId));
// Error overload: erases the request ID from pendingInvocations_ and sends
// the error via sendError() with WampMsgType::invocation.
527 void yield(
RequestId reqId, Error&& error)
override
529 pendingInvocations_.erase(reqId);
530 this->sendError(WampMsgType::invocation, reqId, std::move(error));
// Runs yield(reqId, result) through boost::asio::dispatch, using a local
// functor that carries the request ID and the moved Result.
533 void safeYield(
RequestId reqId, Result&& result)
override
// The functor refers to the client through a weak pointer.
537 std::weak_ptr<Client>
self;
// Lock the weak pointer, then forward the stored arguments to yield().
543 auto me =
self.lock();
545 me->yield(reqId, std::move(result));
549 boost::asio::dispatch(
551 Dispatched{shared_from_this(), reqId, std::move(result)});
// Error overload: runs yield(reqId, error) through boost::asio::dispatch,
// using a local functor that carries the request ID and the moved Error.
554 void safeYield(
RequestId reqId, Error&& error)
override
// The functor refers to the client through a weak pointer.
558 std::weak_ptr<Client>
self;
// Lock the weak pointer, then forward the stored arguments to yield().
564 auto me =
self.lock();
566 me->yield(reqId, std::move(error));
570 boost::asio::dispatch(
572 Dispatched{shared_from_this(), reqId, std::move(error)});
// NOTE(review): the first line of this signature is not visible in this view;
// this is presumably initialize(), which terminate() calls with five
// arguments.
// Installs the user executor and the trace and state-change handlers on Base,
// and stores the warning and challenge handlers in this object.
577 LogHandler warningHandler,
578 LogHandler traceHandler,
579 StateChangeHandler stateChangeHandler,
580 ChallengeHandler challengeHandler)
override
582 Base::setUserExecutor(std::move(userExecutor));
583 warningHandler_ = std::move(warningHandler);
584 Base::setTraceHandler(std::move(traceHandler));
585 Base::setStateChangeHandler(std::move(stateChangeHandler));
586 challengeHandler_ = std::move(challengeHandler);
// Stores the given handler in warningHandler_.
589 void setWarningHandler(LogHandler handler)
override
591 warningHandler_ = std::move(handler);
// Forwards the given handler to Base::setTraceHandler().
594 void setTraceHandler(LogHandler handler)
override
596 Base::setTraceHandler(std::move(handler));
// Forwards the given handler to Base::setStateChangeHandler().
599 void setStateChangeHandler(StateChangeHandler handler)
override
601 Base::setStateChangeHandler(std::move(handler));
// Stores the given handler in challengeHandler_.
604 void setChallengeHandler( ChallengeHandler handler)
override
606 challengeHandler_ = std::move(handler);
// Record stored per local subscription slot. Its member declarations are not
// visible in this view; other code in this file reads its topicUri and slot
// members.
610 struct SubscriptionRecord
// Record stored per registration. Only interruptSlot is visible here; other
// code in this file also reads a callSlot member.
616 struct RegistrationRecord
619 InterruptSlot interruptSlot;
// Shorthand for the Peer base class and for the internal message types.
622 using Base = Peer<Codec, Transport>;
623 using WampMsgType = internal::WampMsgType;
624 using Message = internal::WampMessage;
// Identifier of a local slot within a subscription.
625 using SlotId = uint64_t;
// Slot ID -> subscription record.
626 using LocalSubs = std::map<SlotId, SubscriptionRecord>;
// Subscription ID -> local slots stored under that ID.
627 using Readership = std::map<SubscriptionId, LocalSubs>;
// String key -> subscription ID (keyed by topic URI in subscribe()).
628 using TopicMap = std::map<std::string, SubscriptionId>;
// Registration ID -> registration record.
629 using Registry = std::map<RegistrationId, RegistrationRecord>;
// Request ID -> registration ID.
630 using InvocationMap = std::map<RequestId, RegistrationId>;
631 using CallerTimeoutDuration =
typename Rpc::CallerTimeoutDuration;
// Makes Base::userExecutor usable without qualification in this class.
633 using Base::userExecutor;
// Constructor: moves the transport into Base and creates the caller timeout
// scheduler from this object's strand.
635 Client(TransportPtr transport)
636 : Base(std::move(transport)),
637 timeoutScheduler_(CallerTimeoutScheduler::create(this->strand()))
// Returns Base::shared_from_this() cast to a pointer to Client.
640 Ptr shared_from_this()
642 return std::static_pointer_cast<Client>( Base::shared_from_this() );
// NOTE(review): the enclosing signature is not visible in this view; this is
// presumably the body of sendUnsubscribe(subId), called by unsubscribe().
// Reply callback: the reply is passed to warnReply() with
// WampMsgType::unsubscribed as the expected type.
651 void operator()(std::error_code ec, Message reply)
655 self->warnReply(WampMsgType::unsubscribed, ec, reply,
656 SessionErrc::unsubscribeError);
// The UnsubscribeMessage request is guarded by the established-state check.
660 if (state() == State::established)
// NOTE(review): `self` is not referenced by the visible statements that
// follow; the functor below calls shared_from_this() again.
662 auto self = this->shared_from_this();
663 UnsubscribeMessage msg(subId);
664 this->request(msg, Requested{shared_from_this()});
// NOTE(review): the first line of this signature is not visible in this view;
// this is presumably sendUnsubscribe(subId, handler), called by the
// handler-taking unsubscribe() overload.
669 CompletionHandler<bool>&& handler)
// Completion handler stored in the reply functor.
674 CompletionHandler<bool> handler;
// Reply callback: the reply is checked against WampMsgType::unsubscribed and
// the handler receives true.
676 void operator()(std::error_code ec, Message reply)
679 if (me.checkReply(WampMsgType::unsubscribed, ec, reply,
680 SessionErrc::unsubscribeError, handler))
682 me.dispatchUserHandler(handler,
true);
// NOTE(review): this string is the tail of a statement whose opening line is
// not visible in this view.
688 "Session is not established");
// NOTE(review): `self` is not referenced by the visible statements that
// follow; the functor below calls shared_from_this() again.
689 auto self = this->shared_from_this();
690 UnsubscribeMessage msg(subId);
691 this->request(msg, Requested{shared_from_this(), std::move(handler)});
// Returns the isClientRx flag of the given message traits.
694 virtual bool isMsgSupported(
const MessageTraits& traits)
override
696 return traits.isClientRx;
// Routes an inbound message to the handler matching its type: challenge,
// event, invocation or interrupt.
699 virtual void onInbound(Message msg)
override
703 case WampMsgType::challenge:
704 onChallenge(std::move(msg));
707 case WampMsgType::event:
708 onEvent(std::move(msg));
711 case WampMsgType::invocation:
712 onInvocation(std::move(msg));
715 case WampMsgType::interrupt:
716 onInterrupt(std::move(msg));
// Handles a WELCOME reply: installs a listener on the timeout scheduler and
// passes a SessionInfo built from the reply to the handler.
724 void onWelcome(CompletionHandler<SessionInfo>&& handler, Message&& reply,
// The listener lambda captures only a weak pointer to this client.
727 WeakPtr
self = this->shared_from_this();
728 timeoutScheduler_->listen([
self](
RequestId reqId)
// The listener cancels the call for the given request ID with mode
// killNoWait.
730 auto ptr =
self.lock();
732 ptr->cancelCall(reqId, CallCancelMode::killNoWait);
// The reply is cast to a WelcomeMessage and moved, with the realm URI, into
// the SessionInfo given to the handler.
735 auto& welcomeMsg = message_cast<WelcomeMessage>(reply);
736 SessionInfo info{{}, std::move(realmUri), std::move(welcomeMsg)};
737 dispatchUserHandler(handler, std::move(info));
// Handles a join request whose reply was an abort. When abortPtr is non-null
// the Abort built from the reply is written through it; otherwise a warning
// text may be composed.
740 void onJoinAborted(CompletionHandler<SessionInfo>&& handler,
741 Message&& reply, Abort* abortPtr)
745 auto& abortMsg = message_cast<AbortMessage>(reply);
746 const auto& uri = abortMsg.reasonUri();
// Field 1 of the reply, read as an Object, is treated as the details.
749 const auto& details = reply.as<
Object>(1);
751 if (abortPtr !=
nullptr)
753 *abortPtr = Abort({}, move(abortMsg));
// NOTE(review): the definition of `found` is not visible in this view.
755 else if (warningHandler_ && (!found || !details.empty()))
// Compose the text: the reason URI, plus the details when they are non-empty.
757 std::ostringstream oss;
758 oss <<
"JOIN request aborted with error URI=" << uri;
759 if (!reply.as<
Object>(1).empty())
760 oss <<
", Details=" << reply.at(1);
// Handles a CHALLENGE message: builds a Challenge object and, when a challenge
// handler is set, passes it to that handler.
767 void onChallenge(Message&& msg)
769 auto self = this->shared_from_this();
770 auto& challengeMsg = message_cast<ChallengeMessage>(msg);
771 Challenge challenge({},
self, std::move(challengeMsg));
773 if (challengeHandler_)
775 dispatchUserHandler(challengeHandler_, std::move(challenge));
// Text describing the challenge for which no handler is registered; it
// includes the challenge method and options.
781 std::ostringstream oss;
782 oss <<
"Received a CHALLENGE with no registered handler "
783 "(with method=" << challenge.method() <<
" extra="
784 << challenge.options() <<
")";
// Calls authenticate() with an Authentication built from an empty string.
789 authenticate(Authentication(
""));
// Handles an EVENT message: passes the event to postEvent() for every local
// slot stored under its subscription ID, or composes a warning text when the
// ID is unknown and a warning handler is set.
793 void onEvent(Message&& msg)
795 auto& eventMsg = message_cast<EventMessage>(msg);
796 auto kv = readership_.find(eventMsg.subscriptionId());
797 if (kv != readership_.end())
799 const auto& localSubs = kv->second;
800 assert(!localSubs.empty());
// One Event object is built and passed by reference to postEvent() for each
// slot.
801 Event event({}, userExecutor(), std::move(eventMsg));
802 for (
const auto& subKv: localSubs)
803 postEvent(subKv.second, event);
805 else if (warningHandler_)
// The text includes the subscription ID and publication ID of the event.
807 std::ostringstream oss;
808 oss <<
"Received an EVENT that is not subscribed to "
809 "(with subId=" << eventMsg.subscriptionId()
810 <<
" pubId=" << eventMsg.publicationId() <<
")";
// Posts a functor that invokes the subscription's slot with the event. The
// executor is obtained with get_associated_executor() for the slot.
815 void postEvent(
const SubscriptionRecord& sub,
const Event& event)
// The IDs are read before the event is moved into the slot; the catch
// handlers below use them.
829 auto subId =
event.subId();
830 auto pubId =
event.pubId();
836 slot(std::move(event));
// An Error caught here is reported through warnEventError() when a warning
// handler is set.
838 catch (
const Error& e)
840 if (me.warningHandler_)
841 me.warnEventError(e, subId, pubId);
// An error::BadType is converted to an Error before being reported.
843 catch (
const error::BadType& e)
845 if (me.warningHandler_)
846 me.warnEventError(Error(e), subId, pubId);
851 auto exec = boost::asio::get_associated_executor(sub.slot,
// The functor is initialized with this client, the slot and the event.
853 boost::asio::post(exec, Posted{shared_from_this(), sub.slot,
event});
// NOTE(review): the enclosing signature is not visible in this view; this is
// presumably the body of warnEventError(), which postEvent() calls.
// Composes a text that includes the subscription ID and publication ID.
859 std::ostringstream oss;
860 oss <<
"EVENT handler reported an error: "
862 <<
" (with subId=" << subId
863 <<
" pubId=" << pubId <<
")";
// Handles an INVOCATION message: when its registration ID is found in
// registry_, the request is recorded as pending and posted to the call slot;
// sendError() with "wamp.error.no_such_procedure" is the other visible path.
867 void onInvocation(Message&& msg)
869 auto& invMsg = message_cast<InvocationMessage>(msg);
870 auto requestId = invMsg.requestId();
871 auto regId = invMsg.registrationId();
873 auto kv = registry_.find(regId);
874 if (kv != registry_.end())
876 auto self = this->shared_from_this();
877 const RegistrationRecord& rec = kv->second;
878 Invocation inv({},
self, userExecutor(), std::move(invMsg));
// The request ID -> registration ID entry is stored before the call slot is
// posted.
879 pendingInvocations_[requestId] = regId;
880 postRpcRequest(rec.callSlot, std::move(inv));
884 this->sendError(WampMsgType::invocation, requestId,
885 Error(
"wamp.error.no_such_procedure"));
// Handles an INTERRUPT message: when its request ID is pending, the pending
// entry is erased and, if the registration has a non-null interrupt slot, an
// Interruption is posted to that slot.
889 void onInterrupt(Message&& msg)
891 auto& interruptMsg = message_cast<InterruptMessage>(msg);
892 auto found = pendingInvocations_.find(interruptMsg.requestId());
893 if (found != pendingInvocations_.end())
// The registration ID is copied out before the pending entry is erased.
895 auto registrationId = found->second;
896 pendingInvocations_.erase(found);
897 auto kv = registry_.find(registrationId);
898 if ((kv != registry_.end()) &&
899 (kv->second.interruptSlot !=
nullptr))
901 auto self = this->shared_from_this();
902 const RegistrationRecord& rec = kv->second;
904 Interruption intr({},
self, userExecutor(), move(interruptMsg));
905 postRpcRequest(rec.interruptSlot, move(intr));
// Builds a functor from this client, the slot and the request. The functor
// invokes the slot and acts on the type of the Outcome it returns. The
// executor is obtained with get_associated_executor() for the slot, falling
// back to userExecutor().
910 template <
typename TSlot,
typename TInvocationOrInterruption>
911 void postRpcRequest(TSlot slot, TInvocationOrInterruption&& request)
// Request stored in the functor.
919 TInvocationOrInterruption request;
// The request ID is read before the request is moved into the slot.
926 auto requestId = request.requestId();
930 Outcome outcome(slot(move(request)));
931 switch (outcome.type())
933 case Outcome::Type::deferred:
// Result and error outcomes are sent back through safeYield().
937 case Outcome::Type::result:
938 me.safeYield(requestId, move(outcome).asResult());
941 case Outcome::Type::error:
942 me.safeYield(requestId, move(outcome).asError());
946 assert(
false &&
"unexpected Outcome::Type");
// NOTE(review): the catch clause that introduces `error` is not visible in
// this view.
951 me.yield(requestId, move(error));
// An error::BadType is converted to an Error and sent through yield().
953 catch (
const error::BadType& e)
956 me.yield(requestId, Error(e));
961 auto exec = boost::asio::get_associated_executor(slot, userExecutor());
964 Posted{shared_from_this(), move(slot), move(request)});
// Takes an error code and a handler and returns a bool that callers use as an
// if-condition. The body is not visible in this view.
967 template <
typename THandler>
968 bool checkError(std::error_code ec, THandler& handler)
// Checks a reply against the expected message type. The error code goes
// through checkError() first. For an ERROR reply, the Error is written through
// errorPtr when that is non-null; otherwise a warning text may be composed.
975 template <
typename THandler>
976 bool checkReply(WampMsgType type, std::error_code ec, Message& reply,
978 Error* errorPtr =
nullptr)
980 bool success = checkError(ec, handler);
983 if (reply.type() == WampMsgType::error)
986 auto& errMsg = message_cast<ErrorMessage>(reply);
987 const auto& uri = errMsg.reasonUri();
// True when the ERROR message carries positional or keyword arguments.
990 bool hasArgs = !errMsg.args().empty() ||
991 !errMsg.kwargs().empty();
992 if (errorPtr !=
nullptr)
994 *errorPtr = Error({}, std::move(errMsg));
// NOTE(review): the definition of `found` is not visible in this view.
996 else if (warningHandler_ && (!found || hasArgs))
// The text names the expected reply type and the error URI, and appends the
// arguments that are non-empty.
998 std::ostringstream oss;
999 oss <<
"Expected " << MessageTraits::lookup(type).name
1000 <<
" reply but got ERROR with URI=" << uri;
1001 if (!errMsg.args().empty())
1002 oss <<
", Args=" << errMsg.args();
1003 if (!errMsg.kwargs().empty())
1004 oss <<
", ArgsKv=" << errMsg.kwargs();
// Any reply reaching this point is asserted to have the expected type.
1011 assert((reply.type() == type) &&
"Unexpected WAMP message type");
// Warning-only counterpart of the reply check: an error code is reported via
// warn(), and an ERROR reply produces a text naming the expected type, the
// error URI and any non-empty arguments.
1016 void warnReply(WampMsgType type, std::error_code ec, Message& reply,
// The warning text is derived from the error code.
1021 warn(error::Failure::makeMessage(ec));
1023 else if (reply.type() == WampMsgType::error)
1025 auto& errMsg = message_cast<ErrorMessage>(reply);
1026 const auto& uri = errMsg.reasonUri();
1027 std::ostringstream oss;
1028 oss <<
"Expected " << MessageTraits::lookup(type).name
1029 <<
" reply but got ERROR with URI=" << uri;
1030 if (!errMsg.args().empty())
1031 oss <<
", Args=" << errMsg.args();
1032 if (!errMsg.kwargs().empty())
1033 oss <<
", ArgsKv=" << errMsg.kwargs();
// Any reply reaching this point is asserted to have the expected type.
1038 assert((reply.type() == type) &&
"Unexpected WAMP message type");
// Passes the log text to the warning handler when one is set.
1042 void warn(std::string log)
1044 if (warningHandler_)
1045 dispatchUserHandler(warningHandler_, std::move(log));
// Overload for AnyCompletionHandler, taken by non-const reference. The
// arguments are perfectly forwarded to a call whose opening line is not
// visible in this view.
1048 template <
typename S,
typename... Ts>
1049 void dispatchUserHandler(AnyCompletionHandler<S>& handler, Ts&&... args)
1052 std::forward<Ts>(args)...);
// Overload for AnyReusableHandler, taken by const reference: calls
// dispatchVia() with the user executor, the handler and the forwarded
// arguments.
1055 template <
typename S,
typename... Ts>
1056 void dispatchUserHandler(
const AnyReusableHandler<S>& handler, Ts&&... args)
1058 dispatchVia(userExecutor(), handler, std::forward<Ts>(args)...);
// Returns the current slot ID counter value and then increments the counter.
1061 SlotId nextSlotId() {
return nextSlotId_++;}
// Counter used by nextSlotId(); starts at 0.
1063 SlotId nextSlotId_ = 0;
// Subscription ID -> local slot records.
1065 Readership readership_;
// Request ID -> registration ID for invocations recorded in onInvocation().
1067 InvocationMap pendingInvocations_;
// Scheduler for caller timeouts, created in the constructor.
1068 CallerTimeoutScheduler::Ptr timeoutScheduler_;
// Handlers for warnings and for CHALLENGE messages.
1069 LogHandler warningHandler_;
1070 ChallengeHandler challengeHandler_;
1077 #endif // CPPWAMP_INTERNAL_CLIENT_HPP