CppWAMP
C++11 client library for the WAMP protocol
Remote Procedure Calls

Calling RPCs

The call operation is used to call remote procedures. It takes a wamp::Rpc object as a parameter, which contains the following information related to the call:

  • the procedure URI to call,
  • positional wamp::Variant arguments, if applicable,
  • a wamp::Object containing keyword arguments, if applicable, and,
  • an wamp::Options dictionary for advanced WAMP features

call returns a wamp::Result object, which contains the result payload and other details returned by the callee.

using namespace wamp;
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
auto session = Session::create(ioctx, {tcpJson, tcpPack});
session->connect(yield).value();
session->join(Realm("somerealm"), yield).value();
// Call a remote procedure that takes no arguments
Result result = session->call(Rpc("getElapsed"), yield).value();
int elapsed = 0;
result.convertTo(elapsed); // Will throw error::Conversion
// if conversion fails
std::cout << "The elapsed number of seconds is " << elapsed << "\n";
// Call a remote procedure that takes two positional arguments
auto sum = session->call(Rpc("add").withArgs(12, 34), yield).value();
std::cout << "12 + 34 is " << sum[0] << "\n";
// Call a remote procedure that takes a single keyword argument and
// does not return any result. Also enable the 'disclose_me'
// advanced feature.
session->call( Rpc("setProperties").withKwargs({{"name", "John"}})
.withDiscloseMe(),
yield ).value();
});

Call Slots

CppWamp allows you to register callback functions that will be executed whenever a procedure with a certain name is invoked remotely by a dealer. Such callback functions are named call slots. Call slots can be any callable entity that can be stored into a std::function:

  • free functions,
  • member functions (via std::bind),
  • function objects,
  • lambdas, etc.

Registering Procedures

enroll() is used to register a remote procedure call slot:

Registration reg = session->enroll(Procedure("name"), slot, yield);

where:

  • Registration is a lightweight object that can be later used to unregister the procedure,
  • Procedure is an object containing the procedure URI, plus other optional information related to advanced WAMP features, and,
  • slot is the handler function to be invoked for the registered RPC.

enroll expects a call slot with the following signature:

where:

Example:

using namespace wamp;
Outcome addToCart(Invocation inv)
{
std::string item;
int cost;
inv.convertTo(item, cost); // throws if conversions fail
// Process invocation...
int total = ...
return {total}; // Send a single positional argument back to the caller
}
// Within coroutine
auto reg = session->enroll(
Procedure("addToCart").withDiscloseCaller(),
&addToCart,
yield).value();
// :::
// Unregistering
reg.unregister();

wamp::deferment may be returned in situations where the result cannot be computed immediately within the asynchronous context of the RPC handler. A use case for this would be an RPC handler that needs to invoke the RPC of another service to complete the request. See Registrations/Deferred Outcomes for an example.

For a more in-depth discussion of registrations in general, consult the Registrations page.

Registering Statically-Typed Procedures

Certain remote procedures may expect a fixed number of statically-typed arguments. For these situations, the wamp::simpleRpc template function can be used:

// :::
using namespace wamp;
session->enroll(Procedure("name"),
simpleRpc<ReturnType, StaticTypeList...>(slot),
yield).value();

where:

  • ReturnType is the type returned by the RPC (may be void)
  • StaticTypeList... is a parameter pack of static types
  • slot has the signature: ``` ReturnType function(StaticTypeList...) ```

Example:

float sum(float x, float y)
{
return x + y;
}
// Within coroutine
using namespace wamp;
session->enroll(Procedure("sum"),
simpleRpc<float, float, float>(&sum), yield).value();
// ^^^^^^^^^^^^^^^^^^^
// Note that these types match the ones in the 'sum' signature
// (return type + parameters).

If the wamp::Invocation object is required within a statically-typed RPC, or if the RPC must return a wamp::Outcome, then the wamp::unpackedRpc template function can be used instead:

// :::
using namespace wamp;
Registration reg = session->enroll(Procedure("name"),
unpackedRpc<StaticTypeList...>(slot),
yield).value();

where:

Example:

Outcome sum(Invocation inv, float x, float y)
{
return {x + y};
}
// Within coroutine
using namespace wamp;
session->enroll(Procedure("sum"),
unpackedRpc<float, float>(&sum), yield).value();
// ^^^^^^^^^^^^
// Note that these types match the ones in the 'sum' signature
// (following the Invocation parameter).

Registering Coroutine Procedures

If the remote procedure must perform it's work in an asynchronous fashion within the context of a coroutine, then wamp::simpleCoroRpc or wamp::unpackedCoroRpc may be used:

using namespace wamp;
std::string getName(std::string id, boost::asio::yield_context yield)
{
// Perform asynchronous operation within RPC handler
auto result = session->call(Rpc("db.lookup").withArgs(id), yield);
return result.kwargs["name"];
}
// Within coroutine
session->enroll(Procedure("getName"),
simpleCoroRpc<std::string, std::string>(&getName),
yield).value();
Note
This will spawn a new coroutine every time the RPC is invoked.

Returning an ERROR Response to the Callee

A wamp::Error object can be returned from a call slot, which results in an ERROR message being sent back to the callee. This would be used, for example, when invalid arguments are passed to a remote procedure:

using namespace wamp;
Outcome addToCart(Invocation inv)
{
std::string item;
int cost = 0;
try
{
inv.convertTo(item, cost);
}
catch (const error::Conversion& e)
{
// Send an ERROR back to caller
return Error("wamp.error.invalid_argument")
.withArgs("expected [String, Int] arguments") );
}
// Process invocation...
int total = computeTotal(currentTotal, item, cost);
return {total}; // Send result back to caller
}

Alternatively, you can throw an Error object from a call slot:

using namespace wamp;
Outcome addToCart(Invocation inv)
{
std::string item;
int cost = 0;
try
{
inv.convertTo(item, cost);
}
catch (const error::Conversion& e)
{
// Send an ERROR back to caller
throw Error("wamp.error.invalid_argument")
.withArgs("expected [String, Int] arguments") );
}
// Process invocation...
int total = computeTotal(currentTotal, item, cost);
return {total}; // Send result back to caller
}

Capturing an ERROR Response from the Caller

When calling an RPC, ERROR messages returned by the caller can be captured via the wamp::Rpc::captureError option:

using namespace wamp;
Error error;
try
{
session->call(Rpc("foo").withArgs(42)
.captureError(error),
yield).value();
}
catch (error::Failure& e)
{
if (!error)
{
std::cout << "Got error URI: " << error.reason() << "\n"
<< "with args: " << error.args() << "\n";
}
}
Note
Even if Rpc::captureError is not used, a call operation will still fail if the callee returns an ERROR message. It's just that the details within the ERROR message will be discarded and remain unknown to the caller.

Scoped Registrations

A wamp::ScopedRegistration object can be used to limit an RPC registration's lifetime to a particular scope. When a ScopedRegistration object is destroyed, it automatically unregisters the RPC. This helps in automating the lifetime management of RPC registrations using RAII techniques.

using namespace wamp;
{
ScopedRegistration reg = session->enroll(Procedure("rpc"), &rpc, yield);
}
// "rpc" is automatically unregistered when the 'reg' scoped registration
// goes out of scope here.

For a more practical example of using scoped registrations, see Registrations/Scoped Registrations.

Cancelling RPCs

If supported by the router, a caller may cancel an RPC in progress via any of the following methods:

  1. Use the Session::call overloads taking a wamp::CallChit out parameter, then call CallChit::cancel.
  2. Use wamp::Session::cancel, passing a wamp::CallChit.
  3. Bind a Boost.Asio cancellation slot to the completion token passed to Session::call.

A callee wanting to make an RPC interruptible may pass an additional interruptSlot argument to the enroll method.

Example interruptible callee:

using namespace wamp;
// This is not meant to be efficient or scalable! A more sophisticated example
// would use a single timer object to implement a deadline queue.
std::map<RequestId, std::shared_ptr<boost::asio::steady_timer>> timers;
std::string delayedEcho(std::string message, boost::asio::yield_context yield)
{
int seconds;
inv.convertTo(message, seconds);
auto timer = std::make_shared<boost::asio::steady_timer>(ioctx);
timers.emplace(inv.requestId(), timer);
timer->expires_from_now(std::chrono::seconds(seconds);
boost::system::error_code ec;
timer->async_wait(yield[ec]);
timers.erase(inv.requestId());
if (ec = boost::asio::error::operation_aborted)
throw Error{"myapp.remind_me.aborted"};
return message;
}
void cancelEcho(Interruption intr)
{
auto found = timers.find(intr.requestId());
if (found != timers.end())
found->second->cancel();
}
void run(AsioContext& ioctx)
{
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
auto session = Session::create(ioctx, tcp);
session->connect(yield).value();
session->join(Realm("somerealm"), yield).value();
// Note the additional InterruptSlot argument for enroll.
auto reg = session->enroll(
Procedure("remind_me"),
simpleCoroRpc<std::string, std::string>(&delayedEcho),
&cancelEcho,
yield).value();
// Etc.
});
}

Example caller issuing a call cancellation via wamp::CallChit:

using namespace wamp;
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
auto session = Session::create(ioctx, tcp);
session->connect(yield).value();
session->join(Realm("somerealm"), yield).value();
// Call a remote procedure that takes a while to respond.
// Note that the overload taking a CallChit out parameter is used.
CallChit chit;
session->call(
Rpc("getElapsed").withArgs("hello", 10),
chit,
[](ErrorOr<Result> result)
{
if (result)
std::cout << "Result: " << result.value() << "\n";
else
std::cout << "Error: " << result.errorInfo() << "\n";
});
// Cancel the RPC before it completes.
boost::asio::steady_timer timer(ioctx, std::chrono::seconds(5));
timer.async_wait(yield);
chit.cancel(CancelMode::kill);
});

Example caller issuing a call cancellation via a boost::asio::cancellation_signal

using namespace wamp;
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
auto session = Session::create(ioctx, tcp);
session->connect(yield).value();
session->join(Realm("somerealm"), yield).value();
// Prepare a cancellation signal that will be used to generate a
// cancellation slot as well as triggerring the cancellation.
boost::asio::cancellation_signal cancelSignal;
// Start a timer that will cancel the call in 5 seconds.
boost::asio::steady_timer timer(ioctx, std::chrono::seconds(5));
timer.async_wait(
[&cancelSignal](boost::system::error_code)
{
// The cancellation type is ignored by CppWAMP
cancelSignal.emit(boost::asio::cancellation_type::total);
});
// Call a remote procedure using a cancellation slot bound to a stackful
// coroutine completion token. Note the use of Rpc::withCancelMode which
// specifies the default cancellation mode to use.
auto result = session->call(
Rpc("getElapsed").withArgs("hello", 10)
.withCancelMode(CancelMode::kill),
boost::asio::bind_cancellation_slot(cancelSignal.slot(), yield);
if (result)
std::cout << "Result: " << result.value() << "\n";
else
std::cout << "Error: " << result.errorInfo() << "\n";
});

Call Timeouts

A remote procedure call can be set up to automatically cancel itself if a result is not received within a certain time period. This is done using either:

The withDealerTimeout option requires that the router support call timeouts. The withCallerTimeout option only requires that the router support call cancellation.

With both methods, if timeout occurs, the callee will receive an INTERRUPT message and the call handler will be passed the wamp::SessionErrc::cancelled error code.

Progressive Calls Results

Progressive calls results is a WAMP feature that allows a sequence of partial results to be returned from the same remote procedure call.

A callee wanting to provide progressive call results must:

It may also be desirable to pass an interruption handler to the session enroll method, so that the callee may stop yielding partial results in the event that the call is cancelled (see Progressive Call Result Cancellation in the WAMP specification).

The following example shows a callee registering an RPC that yields progressive results at fixed time intervals:

using namespace wamp;
std::set<RequestId> interruptions;
Outcome countdown(Invocation inv)
{
// The first positional argument contains the count.
unsigned count = 0;
inv.convertTo(count);
boost::asio::spawn(
inv.executor(),
[inv, &count](boost::asio::yield_context yield)
{
boost::asio::steady_timer timer(inv.executor());
auto deadline = std::chrono::steady_clock::now();
while (count > 0)
{
// Break out of the loop if this request was interrupted.
auto found = interruptions.find(inv.requestId());
if (found != interruptions.end())
{
interruptions.erase(found);
return;
}
// Delay for one second.
deadline += std::chrono::seconds(1);
timer.expires_at(deadline);
timer.async_wait(yield);
// Yield intermediate results with the "progess" option.
inv.yield(Result({count}).withProgress());
--count;
}
// Yield the final result without the "progess" option.
inv.yield(Result({count}));
});
return deferment;
}
Outcome interrupt(Interruption intr)
{
// Make the coroutine in the countdown function break out of its loop.
interruptions.insert(intr.requestId());
return Error("wamp.error.canceled");
}
int main()
{
AsioContext ioctx;
auto tcp = connector<Json>(ioctx, TcpHost("localhost", 12345));
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
auto session = Session::create(ioctx, tcp);
session->connect(yield).value();
session->join(Realm("somerealm"), yield).value();
session->enroll(Procedure("countdown"), &countdown, &interrupt,
yield).value();
});
ioctx.run();
return 0;
}

A caller wanting to call an RPC with progressive call results must:

The following example shows a caller that calls the RPC from the previous example:

int main()
{
using namespace wamp;
AsioContext ioctx;
auto tcp = connector<Json>(ioctx, TcpHost("localhost", 12345));
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
auto session = Session::create(ioctx, tcp);
session->connect(yield).value();
session->join(Realm("somerealm"), yield).value();
// The handler will remain in the Session's internal records
// until either a non-progressive RESULT or an ERROR is received.
session->call(Rpc("countdown").withProgressiveResults(),
{
unsigned n = 0;
r.value().convertTo(n);
std::cout << "Tick: " << n << "\n";
});
});
ioctx.run();
return 0;
}

Next: Publish-Subscribe

wamp::Session::create
static Ptr create(AnyIoExecutor exec, const Connector::Ptr &connector)
Creates a new Session instance.
Definition: session.ipp:22
wamp::Realm
Realm URI and other options contained within WAMP HELLO messages.
Definition: peerdata.hpp:59
wamp::Interruption::requestId
RequestId requestId() const
Returns the request ID associated with this interruption.
Definition: peerdata.ipp:1069
wamp::Invocation::yield
void yield(Result result=Result()) const
Manually sends a YIELD result back to the callee.
Definition: peerdata.ipp:939
wamp::Interruption
Contains details within WAMP INTERRUPT messages.
Definition: peerdata.hpp:916
wamp::error::Failure
General purpose runtime exception that wraps a std::error_code.
Definition: error.hpp:52
wamp::Error
Provides the reason URI, options, and payload arguments contained within WAMP ERROR messages.
Definition: peerdata.hpp:292
wamp::Procedure
Contains the procedure URI and other options contained within WAMP REGISTER messages.
Definition: peerdata.hpp:492
wamp::TcpHost
Contains TCP host address information, as well as other socket options.
Definition: tcphost.hpp:103
wamp::deferment
constexpr Deferment deferment
Convenient value of the wamp::Deferment tag type that can be passed to the wamp::Outcome constructor.
Definition: peerdata.hpp:703
wamp::ErrorOr::value
value_type & value() &
Checked access of the stored value.
Definition: erroror.hpp:257
wamp
Definition: anyhandler.hpp:36
unpacker.hpp
Contains utilities for unpacking positional arguments passed to event slots and call slots.
wamp::ScopedRegistration
Limits a Registration's lifetime to a particular scope.
Definition: registration.hpp:89
wamp::CallChit
Lightweight token representing a call request.
Definition: chits.hpp:29
wamp::Invocation::requestId
RequestId requestId() const
Returns the request ID associated with this RPC invocation.
Definition: peerdata.ipp:925
wamp::Rpc
Contains the procedure URI, options, and payload contained within WAMP CALL messages.
Definition: peerdata.hpp:535
wamp::Outcome
Contains the outcome of an RPC invocation.
Definition: peerdata.hpp:709
wamp::CallChit::cancel
void cancel() const
Requests cancellation of the call using the cancel mode that was specified in the Rpc.
Definition: chits.ipp:26
wamp::Result
Contains the remote procedure result options/payload within WAMP RESULT and YIELD messages.
Definition: peerdata.hpp:646
wamp::Registration
Represents a remote procedure registration.
Definition: registration.hpp:42
wamp::ErrorOr
Minimalistic implementation of std::expected<T, std::error_code>
Definition: erroror.hpp:128
wamp::error::Conversion
Exception type thrown when converting a Variant to an invalid type.
Definition: error.hpp:106
wamp::Invocation
Contains payload arguments and other options within WAMP INVOCATION messages.
Definition: peerdata.hpp:799
wamp::Invocation::executor
AnyIoExecutor executor() const
Obtains the executor used to execute user-provided handlers.
Definition: peerdata.ipp:932
wamp::AsioContext
boost::asio::io_context AsioContext
Queues and runs I/O completion handlers.
Definition: asiodefs.hpp:34