7 #ifndef CPPWAMP_INTERNAL_DIALOGUE_HPP
8 #define CPPWAMP_INTERNAL_DIALOGUE_HPP
18 #include <boost/asio/strand.hpp>
19 #include "../anyhandler.hpp"
20 #include "../codec.hpp"
21 #include "../peerdata.hpp"
22 #include "../error.hpp"
23 #include "../variant.hpp"
24 #include "../wampdefs.hpp"
25 #include "wampmessage.hpp"
37 template <
typename TCodec,
typename TTransport>
38 class Peer :
public std::enable_shared_from_this<Peer<TCodec, TTransport>>
42 using Transport = TTransport;
43 using TransportPtr = std::shared_ptr<Transport>;
// Returns the current session state by loading the atomic state_ member.
46 State state()
const {
return state_.load();}
// Returns, by value, the strand_ member (initialized in the constructor
// from transport->strand()).
48 IoStrand strand()
const {
return strand_;}
51 using Message = WampMessage;
52 using OneShotHandler =
54 using MultiShotHandler = std::function<void (std::error_code, Message)>;
55 using LogHandler = AnyReusableHandler<void (std::string)>;
56 using StateChangeHandler = AnyReusableHandler<void (State)>;
// Constructs a peer around the given transport, taking over the caller's
// shared pointer to it.
// The remaining initializers and the constructor body are not visible here.
58 explicit Peer(TransportPtr&& transport)
// strand_ is obtained from the transport before the pointer is moved from in
// the next initializer.
// NOTE(review): members are initialized in declaration order, not in
// initializer-list order; confirm strand_ is declared before transport_.
59 : strand_(transport->strand()),
60 transport_(std::move(transport)),
// Pure virtual hook implemented by derived classes. checkValidMsg() uses the
// returned value as its initial verdict on whether an inbound message type,
// described by `traits`, is acceptable.
64 virtual bool isMsgSupported(
const MessageTraits& traits) = 0;
// Pure virtual hook implemented by derived classes. This class posts calls to
// it (via post()) from processHello(), processChallenge() and processMessage(),
// handing over the received message.
66 virtual void onInbound(Message msg) = 0;
// Returns a const reference to the userExecutor_ member, the executor that
// setState() and trace() pass to postVia()/dispatchVia() when invoking the
// state-change and trace handlers.
68 const AnyIoExecutor& userExecutor()
const {
return userExecutor_;}
72 assert(state() == State::closed || state() == State::disconnected);
73 setState(State::establishing);
75 if (!transport_->isStarted())
77 std::weak_ptr<Peer>
self(this->shared_from_this());
82 auto me =
self.lock();
84 me->onTransportRx(std::move(buffer));
86 [
self](std::error_code ec)
88 auto me =
self.lock();
// Starts closing an established session: sends the message built from
// `reason` as a one-shot request and arranges for `handler` to be completed
// through the local Requested functor once that request finishes.
// Several lines of this function (braces, branch conditions, the declaration
// of `me`) are not visible here.
96 void adjourn(Reason& reason, OneShotHandler&& handler)
// Members of the local completion functor: a shared pointer to this peer and
// the caller's handler.
100 std::shared_ptr<Peer>
self;
101 OneShotHandler handler;
// Completion callback for the request issued below.
103 void operator()(std::error_code ec, Message reply)
// One visible path moves the peer to the closed state and posts the handler;
// the other posts the handler with `ec` and an empty Message. The condition
// separating them is not visible here (presumably a test on `ec`, TODO confirm).
108 me.setState(State::closed);
110 me.post(std::move(handler),
// NOTE(review): `move` is unqualified here while the rest of the file uses
// std::move; confirm this is intentional.
115 me.post(move(handler), ec, Message());
// Precondition: the session must be established.
120 assert(state() == State::established);
121 setState(State::shuttingDown);
// NOTE(review): this `self` is not used by the visible statements, because
// the functor below is given a fresh shared_from_this(); confirm.
122 auto self = this->shared_from_this();
123 request(reason.message({}),
124 Requested{this->shared_from_this(), std::move(handler)});
// Moves the peer to the disconnected state.
// The rest of the body, including how `terminating` is used, is not visible
// here (presumably forwarded to abortPending(), TODO confirm).
127 void close(
bool terminating)
129 setState(State::disconnected);
134 void send(Message& msg)
// Builds an error message from `error` for the request identified by
// `reqType` and `reqId`, and passes it to send().
139 void sendError(WampMsgType reqType,
RequestId reqId, Error&& error)
141 send(error.errorMessage({}, reqType, reqId));
// Sends `msg` as a request whose handler is stored in oneShotRequestMap_,
// and returns the id produced by sendRequest().
144 RequestId request(Message& msg, OneShotHandler&& handler)
146 return sendRequest(msg, oneShotRequestMap_, std::move(handler));
// Sends `msg` as a request whose handler is stored in multiShotRequestMap_,
// and returns the id produced by sendRequest(). processWampReply() treats
// entries of that map specially when a reply is a progressive response.
149 RequestId ongoingRequest(Message& msg, MultiShotHandler&& handler)
151 return sendRequest(msg, multiShotRequestMap_, std::move(handler));
// Cancels a pending call: looks up the call's handler in both request maps,
// removes and completes it where found, then sends the cancellation message.
// Some lines (braces, the definition of `ec`, any conditions around the
// lookups) are not visible here.
154 void cancelCall(CallCancellation&& cancellation)
// Pending requests are keyed by message type and request id.
159 RequestKey key{WampMsgType::call, cancellation.requestId()};
162 auto kv = oneShotRequestMap_.find(key);
163 if (kv != oneShotRequestMap_.end())
// The handler is moved out before the entry is erased, then posted with `ec`
// and an empty Message.
165 auto handler = std::move(kv->second);
166 oneShotRequestMap_.erase(kv);
168 post(std::move(handler), ec, Message());
// Same treatment for a handler stored in the multi-shot map.
172 auto kv = multiShotRequestMap_.find(key);
173 if (kv != multiShotRequestMap_.end())
175 auto handler = std::move(kv->second);
176 multiShotRequestMap_.erase(kv);
178 post(std::move(handler), ec, Message());
184 sendMessage(cancellation.message({}));
// Convenience overload: converts `errc` to a std::error_code with
// make_error_code() and forwards to fail(std::error_code).
187 template <
typename TErrorValue>
188 void fail(TErrorValue errc)
190 fail(make_error_code(errc));
195 userExecutor_ = std::move(exec);
// Stores `handler` as the trace handler. trace() dispatches the formatted
// message text to it via userExecutor_.
198 void setTraceHandler(LogHandler handler)
200 traceHandler_ = std::move(handler);
// Stores `handler` as the state-change handler. setState() posts it with the
// new state, via userExecutor_, whenever the state value actually changes.
203 void setStateChangeHandler(StateChangeHandler handler)
205 stateChangeHandler_ = std::move(handler);
// Posts a call of `fn` with `args` to strand_ using boost::asio::post.
// The callable and its arguments are packaged with std::bind, which keeps its
// own copies (moved or copied according to how they were forwarded), so they
// remain valid until the posted call runs.
208 template <
typename TFunctor,
typename... TArgs>
209 void post(TFunctor&& fn, TArgs&&... args)
211 boost::asio::post(strand_,
212 std::bind(std::forward<TFunctor>(fn),
213 std::forward<TArgs>(args)...));
217 static constexpr
unsigned progressiveResponseFlag_ = 0x01;
219 using RequestKey =
typename Message::RequestKey;
220 using OneShotRequestMap = std::map<RequestKey, OneShotHandler>;
221 using MultiShotRequestMap = std::map<RequestKey, MultiShotHandler>;
222 using EncoderType = Encoder<Codec, MessageBuffer>;
223 using DecoderType = Decoder<Codec, MessageBuffer>;
// Replaces the session state with `s` and notifies the state-change handler
// if one is set and the value actually changed.
225 void setState(State s)
// exchange() stores the new state and returns the previous one in one step.
227 auto old = state_.exchange(s);
228 if (old != s && stateChangeHandler_)
// The handler is posted through the user's executor with the new state.
229 postVia(userExecutor_, stateChangeHandler_, s);
234 assert(msg.type() != WampMsgType::none);
236 auto requestId = setMessageRequestId(msg);
239 encoder_.encode(msg.fields(), buffer);
240 if (buffer.size() > transport_->maxSendLength())
244 transport_->send(std::move(buffer));
// Shared implementation of request() and ongoingRequest(): assigns a request
// id to `msg`, encodes it, registers the handler in `requests` under the
// message's request key, and sends the encoded buffer over the transport.
// The end of the signature, the declaration of `buffer`, the handling of an
// oversized buffer and the return statement are not visible here.
248 template <
typename TRequestMap,
typename THandler>
249 RequestId sendRequest(Message& msg, TRequestMap& requests,
252 assert(msg.type() != WampMsgType::none);
254 auto requestId = setMessageRequestId(msg);
256 encoder_.encode(msg.fields(), buffer);
// Size check against the transport's limit; the action taken when the limit
// is exceeded is not visible here.
257 if (buffer.size() > transport_->maxSendLength())
260 auto key = msg.requestKey();
261 auto found = requests.find(key);
// If a handler is already registered under this key, it is posted (the
// arguments are not visible here) and its entry is removed before the new
// handler is inserted.
262 if (found != requests.end())
264 post(std::move(found->second),
266 requests.erase(found);
269 requests.emplace(msg.requestKey(), std::move(handler));
272 transport_->send(std::move(buffer));
// For messages that carry a request id, obtains the next id from
// nextRequestId() and stores it in `msg`.
// The declaration of `requestId` and the return statement are not visible
// here; callers use the returned value as the request's id.
276 RequestId setMessageRequestId(Message& msg)
280 if (msg.hasRequestId())
282 requestId = nextRequestId();
283 msg.setRequestId(requestId);
291 if (nextRequestId_ >= maxRequestId_)
292 nextRequestId_ = nullRequestId();
293 return ++nextRequestId_;
299 if (s == State::establishing || s == State::authenticating ||
300 s == State::established || s == State::shuttingDown)
303 if (checkError(
decode(buffer, v)) &&
307 auto msg = Message::parse(std::move(v.as<
Array>()), ec);
311 if (checkValidMsg(msg.type()))
312 processMessage(std::move(msg));
320 return decoder_.decode(buffer, variant);
// Routes a received message: replies go to processWampReply(), session-level
// messages go to their dedicated process*() functions, and remaining messages
// are handed to onInbound().
// Brace, break and default lines are not visible here.
323 void processMessage(Message&& msg)
// A message that replies to some request type is matched against the pending
// requests, keyed by the request type it answers and its request id.
325 if (msg.repliesTo() != WampMsgType::none)
327 processWampReply( RequestKey(msg.repliesTo(), msg.requestId()),
330 else switch(msg.type())
332 case WampMsgType::hello:
333 processHello(std::move(msg));
336 case WampMsgType::challenge:
337 processChallenge(std::move(msg));
340 case WampMsgType::welcome:
341 processWelcome(std::move(msg));
344 case WampMsgType::abort:
345 processAbort(std::move(msg));
348 case WampMsgType::goodbye:
349 processGoodbye(std::move(msg));
// Error messages are matched against pending requests using the key the
// message itself reports.
352 case WampMsgType::error:
353 processWampReply(msg.requestKey(), std::move(msg));
// Remaining messages (presumably the default case, TODO confirm) are passed
// to onInbound() unless the session is shutting down.
359 if (state() != State::shuttingDown)
// The shared pointer bound into the posted call keeps this object alive
// until the call has run.
361 auto self = this->shared_from_this();
362 post(&Peer::onInbound,
self, std::move(msg));
// Delivers a reply to the handler registered under `key`, looking first in
// the one-shot map and then in the multi-shot map.
// The statements that post the one-shot handler, the body of the progressive
// branch, and the arguments of the final post are not visible here.
368 void processWampReply(
const RequestKey& key, Message&& msg)
370 auto kv = oneShotRequestMap_.find(key);
371 if (kv != oneShotRequestMap_.end())
// The handler is moved out of the map before its entry is erased.
373 auto handler = std::move(kv->second);
374 oneShotRequestMap_.erase(kv);
380 auto kv = multiShotRequestMap_.find(key);
381 if (kv != multiShotRequestMap_.end())
// Progressive responses are handled separately from the path below, which
// removes the handler from the map and posts it.
383 if (msg.isProgressiveResponse())
390 auto handler = std::move(kv->second);
391 multiShotRequestMap_.erase(kv);
392 post(std::move(handler),
// Handles a received HELLO: the session becomes established and the message
// is handed to onInbound() through post().
399 void processHello(Message&& msg)
// Expected only while the session is being established.
401 assert(state() == State::establishing);
402 setState(State::established);
// The shared pointer bound into the posted call keeps this object alive
// until the call has run.
403 auto self = this->shared_from_this();
404 post(&Peer::onInbound,
self, std::move(msg));
// Handles a received CHALLENGE: the session moves to the authenticating state
// and the message is handed to onInbound() through post().
407 void processChallenge(Message&& msg)
// Expected only while the session is being established.
409 assert(state() == State::establishing);
410 setState(State::authenticating);
// The shared pointer bound into the posted call keeps this object alive
// until the call has run.
411 auto self = this->shared_from_this();
412 post(&Peer::onInbound,
self, std::move(msg));
// Handles a received WELCOME: the session becomes established and the message
// is delivered as the reply to the pending hello request.
415 void processWelcome(Message&& msg)
// Expected while establishing or authenticating.
417 assert((state() == State::establishing) ||
418 (state() == State::authenticating));
419 setState(State::established);
// The pending hello request is looked up under request id 0
// (presumably because HELLO carries no request id, TODO confirm).
420 processWampReply(RequestKey(WampMsgType::hello, 0), std::move(msg));
// Handles a received ABORT: the session becomes closed and the message is
// delivered as the reply to the pending hello request.
423 void processAbort(Message&& msg)
// Expected while establishing or authenticating.
425 assert((state() == State::establishing) ||
426 (state() == State::authenticating));
427 setState(State::closed);
// The pending hello request is looked up under request id 0
// (presumably because HELLO carries no request id, TODO confirm).
428 processWampReply(RequestKey(WampMsgType::hello, 0), std::move(msg));
// Handles a received GOODBYE. In both visible paths the session ends up in
// the closed state.
// The else branch, the derivation of `errc` from the reason, and the sending
// of the outgoing GOODBYE are not visible here.
431 void processGoodbye(Message&& msg)
// While shutting down, the GOODBYE is treated as the reply to a pending
// request, looked up under the message's own request key.
433 if (state() == State::shuttingDown)
435 setState(State::closed);
436 processWampReply(msg.requestKey(), std::move(msg));
// Otherwise (presumably a GOODBYE initiated by the other side, TODO confirm)
// the reason URI is read, pending requests are aborted with an error code,
// and a "goodbye_and_out" message is constructed.
440 const auto& reason = message_cast<GoodbyeMessage>(msg).reasonUri();
443 abortPending(make_error_code(errc));
444 GoodbyeMessage msg(
"wamp.error.goodbye_and_out");
446 setState(State::closed);
450 bool checkError(std::error_code ec)
// Decides whether a received message of the given type is acceptable, from
// the derived class's verdict and the message traits for the current state.
// The switch header, any guard around it, the handling of other states and of
// an invalid result, and the return statement are not visible here.
465 bool checkValidMsg(WampMsgType type)
467 auto traits = MessageTraits::lookup(type);
// Initial verdict comes from the derived class.
468 bool valid = isMsgSupported(traits);
// Each session state consults its own flag in the traits.
476 case State::establishing:
477 valid = traits.forEstablishing;
480 case State::authenticating:
481 valid = traits.forChallenging;
// Established and shutting-down sessions share the same flag.
484 case State::established:
485 case State::shuttingDown:
486 valid = traits.forEstablished;
// Puts the session into the failed state.
// The rest of the body, including how `ec` is used, is not visible here.
501 void fail(std::error_code ec)
503 setState(State::failed);
// Completes every pending request handler with `ec` and an empty Message,
// then empties both request maps.
// Lines between the signature and the loops are not visible here, so whether
// the loops are conditional on `terminating` cannot be confirmed (presumably
// they are skipped when terminating, TODO confirm).
508 void abortPending(std::error_code ec,
bool terminating =
false)
// Handlers are moved out of the map entries; the entries themselves are
// removed by the clear() calls below.
512 for (
auto& kv: oneShotRequestMap_)
513 post(std::move(kv.second), ec, Message());
514 for (
auto& kv: multiShotRequestMap_)
515 post(std::move(kv.second), ec, Message());
517 oneShotRequestMap_.clear();
518 multiShotRequestMap_.clear();
// Formats `msg` as text and hands the string to the trace handler through
// the user's executor.
// Any early-out before the formatting (for example when no trace handler is
// set) and the closing of the bracketed list are not visible here.
521 void trace(
const Message& msg,
bool isTx)
525 std::ostringstream oss;
// Prefix shows the direction: "Tx" when isTx is true, "Rx" otherwise.
526 oss << (isTx ?
"Tx" :
"Rx") <<
" message: [";
527 if (!msg.fields().empty())
// The first field is written as {"<name>":<field 0>}, with "INVALID" passed
// to nameOr() as the fallback name.
530 oss <<
"{\"" << msg.nameOr(
"INVALID") <<
"\":"
531 << msg.fields().at(0) <<
"}";
// Remaining fields follow, each preceded by a comma.
533 for (Array::size_type i=1; i<msg.fields().size(); ++i)
535 oss <<
"," << msg.fields().at(i);
539 dispatchVia(userExecutor_, traceHandler_, oss.str());
545 TransportPtr transport_;
546 EncoderType encoder_;
547 DecoderType decoder_;
548 LogHandler traceHandler_;
549 StateChangeHandler stateChangeHandler_;
550 std::atomic<State> state_;
551 OneShotRequestMap oneShotRequestMap_;
552 MultiShotRequestMap multiShotRequestMap_;
553 RequestId nextRequestId_ = nullRequestId();
555 static constexpr
RequestId maxRequestId_ = 9007199254740992ull;
562 #endif // CPPWAMP_INTERNAL_DIALOGUE_HPP