CppWAMP
C++11 client library for the WAMP protocol
callertimeout.hpp
1 /*------------------------------------------------------------------------------
2  Copyright Butterfly Energy Systems 2014-2015, 2022.
3  Distributed under the Boost Software License, Version 1.0.
4  http://www.boost.org/LICENSE_1_0.txt
5 ------------------------------------------------------------------------------*/
6 
7 #ifndef CPPWAMP_INTERNAL_CALLER_TIMEOUT_HPP
8 #define CPPWAMP_INTERNAL_CALLER_TIMEOUT_HPP
9 
#include <chrono>
#include <functional>
#include <memory>
#include <set>
#include <utility>
#include <boost/asio/steady_timer.hpp>
#include <boost/asio/strand.hpp>
#include "../asiodefs.hpp"
#include "../peerdata.hpp"
18 
19 namespace wamp
20 {
21 
22 namespace internal
23 {
24 
25 //------------------------------------------------------------------------------
26 struct CallerTimeoutRecord
27 {
28  using Clock = std::chrono::steady_clock;
29  using Duration = Clock::duration;
30  using Timepoint = Clock::time_point;
31 
32  CallerTimeoutRecord() = default;
33 
34  CallerTimeoutRecord(Duration timeout, RequestId rid)
35  : deadline(Clock::now() + timeout),
36  requestId(rid)
37  {}
38 
39  bool operator<(const CallerTimeoutRecord& rhs) const
40  {
41  return deadline < rhs.deadline;
42  }
43 
44  Timepoint deadline;
45  RequestId requestId = 0;
46 };
47 
48 //------------------------------------------------------------------------------
49 class CallerTimeoutScheduler :
50  public std::enable_shared_from_this<CallerTimeoutScheduler>
51 {
52 public:
53  using Duration = std::chrono::steady_clock::duration;
54  using TimeoutHandler = std::function<void (RequestId)>;
55 
56  using Ptr = std::shared_ptr<CallerTimeoutScheduler>;
57 
58  static Ptr create(IoStrand strand)
59  {
60  return Ptr(new CallerTimeoutScheduler(std::move(strand)));
61  }
62 
63  void listen(TimeoutHandler handler)
64  {
65  timeoutHandler_ = std::move(handler);
66  }
67 
68  void add(Duration timeout, RequestId rid)
69  {
70  // The first record represents a deadline being waited on
71  // by the timer.
72 
73  CallerTimeoutRecord rec{timeout, rid};
74  bool wasIdle = deadlines_.empty();
75  bool preemptsCurrentDeadline =
76  !wasIdle && (rec < *deadlines_.begin());
77 
78  deadlines_.insert(rec);
79  if (wasIdle)
80  processNextDeadline();
81  else if (preemptsCurrentDeadline)
82  timer_.cancel();
83  }
84 
85  void remove(RequestId rid)
86  {
87  if (deadlines_.empty())
88  return;
89 
90  auto rec = deadlines_.begin();
91  if (rec->requestId == rid)
92  {
93  deadlines_.erase(rec);
94  timer_.cancel();
95  return;
96  }
97 
98  // The set should be small, so just do a linear search.
99  auto end = deadlines_.end();
100  for (; rec != end; ++rec)
101  {
102  if (rec->requestId == rid)
103  {
104  deadlines_.erase(rec);
105  return;
106  }
107  }
108  }
109 
110  void clear()
111  {
112  timeoutHandler_ = nullptr;
113  deadlines_.clear();
114  timer_.cancel();
115  }
116 
117 private:
118  using WeakPtr = std::weak_ptr<CallerTimeoutScheduler>;
119 
120  explicit CallerTimeoutScheduler(IoStrand strand)
121  : timer_(std::move(strand))
122  {}
123 
124  void processNextDeadline()
125  {
126  auto deadline = deadlines_.begin()->deadline;
127  auto requestId = deadlines_.begin()->requestId;
128  timer_.expires_at(deadline);
129  WeakPtr self(shared_from_this());
130  timer_.async_wait([self, requestId](boost::system::error_code ec)
131  {
132  auto ptr = self.lock();
133  if (ptr)
134  ptr->onTimer(ec, requestId);
135  });
136  }
137 
138  void onTimer(boost::system::error_code ec, RequestId requestId)
139  {
140  if (!deadlines_.empty())
141  {
142  auto top = deadlines_.begin();
143  bool preempted = top->requestId != requestId;
144  if (!preempted)
145  {
146  if (!ec && timeoutHandler_)
147  timeoutHandler_(top->requestId);
148  deadlines_.erase(top);
149  }
150  if (!deadlines_.empty())
151  processNextDeadline();
152  }
153  }
154 
155  std::set<CallerTimeoutRecord> deadlines_;
156  boost::asio::steady_timer timer_;
157  TimeoutHandler timeoutHandler_;
158 };
159 
160 } // namespace internal
161 
162 } // namespace wamp
163 
164 #endif // CPPWAMP_INTERNAL_CALLER_TIMEOUT_HPP
wamp::RequestId
int64_t RequestId
Ephemeral ID associated with a WAMP request.
Definition: wampdefs.hpp:23
wamp::IoStrand
boost::asio::strand< AnyIoExecutor > IoStrand
Serializes I/O operations.
Definition: asiodefs.hpp:41
wamp
Definition: anyhandler.hpp:36