Calling RPCs
The call operation is used to call remote procedures. It takes a wamp::Rpc object as a parameter, which contains the following information related to the call:
- the procedure URI to call,
- positional wamp::Variant arguments, if applicable,
- a wamp::Object containing keyword arguments, if applicable, and,
- a wamp::Options dictionary for advanced WAMP features.
call returns a wamp::Result object, which contains the result payload and other details returned by the callee.
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
    session->connect(yield).value();
    session->join(Realm("somerealm"), yield).value();

    // Call a remote procedure that takes no arguments
    Result result = session->call(Rpc("getElapsed"), yield).value();
    int elapsed = 0;
    result.convertTo(elapsed);
    std::cout << "The elapsed number of seconds is " << elapsed << "\n";

    // Call a remote procedure that takes two positional arguments
    auto sum = session->call(Rpc("add").withArgs(12, 34), yield).value();
    std::cout << "12 + 34 is " << sum[0] << "\n";

    // Call a remote procedure that takes a single keyword argument and
    // does not return any result. Also enable the 'disclose_me'
    // advanced feature.
    session->call(
        Rpc("setProperties").withKwargs({{"name", "John"}})
                            .withDiscloseMe(),
        yield).value();
});
Call Slots
CppWamp allows you to register callback functions that will be executed whenever a procedure with a certain name is invoked remotely by a dealer. Such callback functions are named call slots. Call slots can be any callable entity that can be stored into a std::function:
- free functions,
- member functions (via std::bind),
- function objects,
- lambdas, etc.
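To illustrate why each of these callables qualifies, here is a minimal sketch that stores one of each kind in a std::function. It does not use CppWamp: the Slot alias, Calculator, Multiplier and makeSlots are hypothetical names, and plain ints stand in for wamp::Invocation and wamp::Outcome.

```cpp
#include <cassert>
#include <functional>
#include <map>
#include <string>

// Hypothetical stand-in for a call slot type; real CppWamp slots take a
// wamp::Invocation and return a wamp::Outcome.
using Slot = std::function<int (int, int)>;

// Free function
int add(int x, int y) { return x + y; }

// Class with a member function, to be adapted via std::bind
struct Calculator
{
    int offset;
    int addWithOffset(int x, int y) const { return x + y + offset; }
};

// Function object
struct Multiplier
{
    int operator()(int x, int y) const { return x * y; }
};

// Builds a table holding one slot of each kind.
std::map<std::string, Slot> makeSlots(const Calculator& calc)
{
    using namespace std::placeholders;
    std::map<std::string, Slot> slots;
    slots["free"] = &add;
    // std::bind stores a copy of calc inside the resulting callable
    slots["member"] = std::bind(&Calculator::addWithOffset, calc, _1, _2);
    slots["functor"] = Multiplier{};
    slots["lambda"] = [](int x, int y) { return x - y; };
    return slots;
}
```

Because every entry has the same std::function type, the code that invokes a slot does not need to know which kind of callable is behind it.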
Registering Procedures
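enroll() is used to register a remote procedure call slot:
Registration reg = session->enroll(Procedure("name"), slot, yield).value();
where:
- Registration is a lightweight object that can be later used to unregister the procedure,
- Procedure is an object containing the procedure URI, plus other optional information related to advanced WAMP features, and,
- slot is the handler function to be invoked for the registered RPC.
enroll expects a call slot with the following signature:
wamp::Outcome function(wamp::Invocation)
where:
- wamp::Outcome contains a wamp::Result or wamp::Error object to be sent back to the caller, or a wamp::deferment, and,
- wamp::Invocation is an object containing the information and payload of the RPC invocation.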
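Example:
Outcome addToCart(Invocation inv)
{
    std::string item;
    int cost = 0;
    inv.convertTo(item, cost);
    int total = ...
    return {total};
}

// The "addToCart" URI is assumed here to match the function name.
auto reg = session->enroll(
    Procedure("addToCart"),
    &addToCart,
    yield).value();

// Unregister the procedure once it is no longer needed
reg.unregister();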
wamp::deferment may be returned in situations where the result cannot be computed immediately within the asynchronous context of the RPC handler. A use case for this would be an RPC handler that needs to invoke the RPC of another service to complete the request. See Registrations/Deferred Outcomes for an example.
For a more in-depth discussion of registrations in general, consult the Registrations page.
Registering Statically-Typed Procedures
Certain remote procedures may expect a fixed number of statically-typed arguments. For these situations, the wamp::simpleRpc template function can be used:
Registration reg = session->enroll(
    Procedure("name"),
    simpleRpc<ReturnType, StaticTypeList...>(slot),
    yield).value();
where:
- ReturnType is the type returned by the RPC (may be void),
- StaticTypeList... is a parameter pack of static types, and,
- slot has the signature: ReturnType function(StaticTypeList...)
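Example:
float sum(float x, float y)
{
    return x + y;
}

// The "sum" URI is assumed here to match the function name.
auto reg = session->enroll(
    Procedure("sum"),
    simpleRpc<float, float, float>(&sum),
    yield).value();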
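To give an intuition for what a wrapper of this kind has to do, the following sketch converts a dynamically-typed argument list into a statically-typed function call. It is not CppWamp's implementation: Value, Args, DynamicSlot, makeTypedSlot and sumOf are hypothetical names, and std::variant stands in for wamp::Variant.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>
#include <variant>
#include <vector>

// Hypothetical stand-in for wamp::Variant
using Value = std::variant<int, double, std::string>;
using Args = std::vector<Value>;

// Slot type that receives the raw, dynamically-typed argument list
using DynamicSlot = std::function<Value (const Args&)>;

// Extracts the I-th argument as the I-th static type and calls fn.
template <typename R, typename... Ts, std::size_t... Is>
R invokeUnpacked(const std::function<R (Ts...)>& fn, const Args& args,
                 std::index_sequence<Is...>)
{
    // std::get throws std::bad_variant_access on a type mismatch
    return fn(std::get<Ts>(args.at(Is))...);
}

// Wraps a statically-typed function so that it accepts a dynamic
// argument list. Only non-void return types are handled in this sketch.
template <typename R, typename... Ts>
DynamicSlot makeTypedSlot(std::function<R (Ts...)> fn)
{
    return [fn](const Args& args) -> Value
    {
        if (args.size() != sizeof...(Ts))
            throw std::invalid_argument("wrong number of arguments");
        return invokeUnpacked(fn, args, std::index_sequence_for<Ts...>{});
    };
}

double sumOf(double x, double y) { return x + y; }
```

A slot built with makeTypedSlot(std::function<double (double, double)>(&sumOf)) can then be called with Args{Value{1.5}, Value{2.0}}. Argument count and type mismatches surface as exceptions, which a real dispatcher would presumably translate into an error reply.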
If the wamp::Invocation object is required within a statically-typed RPC, or if the RPC must return a wamp::Outcome, then the wamp::unpackedRpc template function can be used instead:
Registration reg = session->enroll(
    Procedure("name"),
    unpackedRpc<StaticTypeList...>(slot),
    yield).value();
where:
- StaticTypeList... is a parameter pack of static types, and,
- slot has the signature: Outcome function(Invocation, StaticTypeList...)
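Example:
Outcome sum(Invocation inv, float x, float y)
{
    return {x + y};
}

// The "sum" URI is assumed here to match the function name.
auto reg = session->enroll(
    Procedure("sum"),
    unpackedRpc<float, float>(&sum),
    yield).value();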
Registering Coroutine Procedures
If the remote procedure must perform its work in an asynchronous fashion within the context of a coroutine, then wamp::simpleCoroRpc or wamp::unpackedCoroRpc may be used:
std::string getName(std::string id, boost::asio::yield_context yield)
{
    // value() throws if the nested call failed
    auto result = session->call(Rpc("db.lookup").withArgs(id), yield).value();
    return result.kwargs().at("name").to<std::string>();
}

// The "getName" URI is assumed here to match the function name.
auto reg = session->enroll(
    Procedure("getName"),
    simpleCoroRpc<std::string, std::string>(&getName),
    yield).value();
Note: This will spawn a new coroutine every time the RPC is invoked.
Returning an ERROR Response to the Caller
A wamp::Error object can be returned from a call slot, which results in an ERROR message being sent back to the caller. This would be used, for example, when invalid arguments are passed to a remote procedure:
Outcome addToCart(Invocation inv)
{
    std::string item;
    int cost = 0;
    try
    {
        inv.convertTo(item, cost);
    }
    catch (const error::Conversion&)
    {
        // The payload did not match the expected argument types
        return Error("wamp.error.invalid_argument")
            .withArgs("expected [String, Int] arguments");
    }
    int total = computeTotal(currentTotal, item, cost);
    return {total};
}
Alternatively, you can throw an Error object from a call slot:
Outcome addToCart(Invocation inv)
{
    std::string item;
    int cost = 0;
    try
    {
        inv.convertTo(item, cost);
    }
    catch (const error::Conversion&)
    {
        // The payload did not match the expected argument types
        throw Error("wamp.error.invalid_argument")
            .withArgs("expected [String, Int] arguments");
    }
    int total = computeTotal(currentTotal, item, cost);
    return {total};
}
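The two styles can be treated as equivalent by whatever code invokes the slot. The following sketch shows one way a dispatcher might convert a thrown error into a returned one. It is not CppWamp's implementation: RpcError, RpcOutcome, RpcSlot and dispatch are hypothetical stand-ins for wamp::Error, wamp::Outcome, the call slot type and the library's internal invocation logic.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <variant>

// Hypothetical stand-ins for wamp::Error and wamp::Outcome
struct RpcError { std::string uri; };
using RpcOutcome = std::variant<int, RpcError>;
using RpcSlot = std::function<RpcOutcome (int)>;

// Runs a slot, converting a thrown RpcError into a returned one so that
// both styles produce the same reply.
RpcOutcome dispatch(const RpcSlot& slot, int arg)
{
    try
    {
        return slot(arg);
    }
    catch (const RpcError& e)
    {
        return e;
    }
}

// Reports invalid input by returning an error
RpcOutcome returningSlot(int cost)
{
    if (cost < 0)
        return RpcError{"wamp.error.invalid_argument"};
    return cost * 2;
}

// Reports invalid input by throwing an error
RpcOutcome throwingSlot(int cost)
{
    if (cost < 0)
        throw RpcError{"wamp.error.invalid_argument"};
    return cost * 2;
}
```

Calling dispatch(&returningSlot, -1) and dispatch(&throwingSlot, -1) yields the same RpcError, which is the property that lets a handler pick whichever style reads better.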
Capturing an ERROR Response from the Callee
When calling an RPC, ERROR messages returned by the callee can be captured via the wamp::Rpc::captureError option:
Error error;
try
{
    session->call(
        Rpc("foo").withArgs(42).captureError(error),
        yield).value();
}
catch (const error::Failure& e)
{
    // error is only populated if the callee replied with an ERROR message
    if (error)
    {
        std::cout << "Got error URI: " << error.reason() << "\n"
                  << "with args: " << error.args() << "\n";
    }
}
Note: Even if Rpc::captureError is not used, a call operation will still fail if the callee returns an ERROR message. It's just that the details within the ERROR message will be discarded and remain unknown to the caller.
Scoped Registrations
A wamp::ScopedRegistration object can be used to limit an RPC registration's lifetime to a particular scope. When a ScopedRegistration object is destroyed, it automatically unregisters the RPC. This helps in automating the lifetime management of RPC registrations using RAII techniques.
For a more practical example of using scoped registrations, see Registrations/Scoped Registrations.
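The RAII idea behind scoped registrations can be sketched without the library. In the following minimal example, FakeRegistry, ScopedEntry and registeredOnlyWithinScope are hypothetical names: the registry stands in for the router's table of procedures, and ScopedEntry only mimics the behavior described above, not the actual ScopedRegistration interface.

```cpp
#include <cassert>
#include <set>
#include <string>
#include <utility>

// Hypothetical stand-in for the router's table of registered procedures
struct FakeRegistry
{
    std::set<std::string> procedures;
};

// Move-only RAII handle: registers on construction, unregisters on
// destruction.
class ScopedEntry
{
public:
    ScopedEntry(FakeRegistry& registry, std::string uri)
        : registry_(&registry), uri_(std::move(uri))
    {
        registry_->procedures.insert(uri_);
    }

    // Moving transfers responsibility for unregistering
    ScopedEntry(ScopedEntry&& other) noexcept
        : registry_(std::exchange(other.registry_, nullptr)),
          uri_(std::move(other.uri_))
    {}

    ScopedEntry(const ScopedEntry&) = delete;
    ScopedEntry& operator=(const ScopedEntry&) = delete;
    ScopedEntry& operator=(ScopedEntry&&) = delete;

    ~ScopedEntry()
    {
        if (registry_)
            registry_->procedures.erase(uri_);
    }

private:
    FakeRegistry* registry_;
    std::string uri_;
};

// Returns true if the URI was present inside the scope and gone afterwards.
bool registeredOnlyWithinScope(FakeRegistry& registry, const std::string& uri)
{
    bool presentInside = false;
    {
        ScopedEntry entry(registry, uri);
        presentInside = registry.procedures.count(uri) == 1;
    } // entry is destroyed here, which removes the URI
    return presentInside && registry.procedures.count(uri) == 0;
}
```

Making the handle move-only means exactly one object is ever responsible for a given registration, so the procedure cannot be unregistered twice.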
Cancelling RPCs
If supported by the router, a caller may cancel an RPC in progress via any of the following methods:
- Use the Session::call overloads taking a wamp::CallChit out parameter, then call CallChit::cancel.
- Use wamp::Session::cancel, passing a wamp::CallChit.
- Bind a Boost.Asio cancellation slot to the completion token passed to Session::call.
A callee wanting to make an RPC interruptible may pass an additional interruptSlot argument to the enroll method.
Example interruptible callee:
std::map<RequestId, std::shared_ptr<boost::asio::steady_timer>> timers;

// Takes the Invocation so that the timer can be tracked by request ID.
// This assumes unpackedCoroRpc slots receive the yield context last.
Outcome delayedEcho(Invocation inv, std::string message, int seconds,
                    boost::asio::yield_context yield)
{
    auto timer = std::make_shared<boost::asio::steady_timer>(ioctx);
    timers[inv.requestId()] = timer;
    timer->expires_from_now(std::chrono::seconds(seconds));
    boost::system::error_code ec;
    timer->async_wait(yield[ec]);
    timers.erase(inv.requestId());
    if (ec == boost::asio::error::operation_aborted)
        throw Error{"myapp.remind_me.aborted"};
    return {message};
}

// Interrupt slot; the signature and deferred outcome are assumptions.
Outcome cancelEcho(Interruption intr)
{
    // Cancelling the timer makes delayedEcho throw its error
    auto found = timers.find(intr.requestId());
    if (found != timers.end())
        found->second->cancel();
    return deferment;
}

boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
    session->connect(yield).value();
    session->join(Realm("somerealm"), yield).value();
    auto reg = session->enroll(
        Procedure("delayedEcho"), // URI assumed to match the function name
        unpackedCoroRpc<std::string, int>(&delayedEcho),
        &cancelEcho,
        yield).value();
});
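Example caller issuing a call cancellation via wamp::CallChit:
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
    session->connect(yield).value();
    session->join(Realm("somerealm"), yield).value();

    // The chit identifies the call in progress so it can be cancelled later
    CallChit chit;

    // Calls the procedure registered by the callee example,
    // assuming it was registered as "delayedEcho"
    session->call(
        Rpc("delayedEcho").withArgs("hello", 10),
        chit,
        [](ErrorOr<Result> result)
        {
            if (result)
                std::cout << "Result: " << result.value() << "\n";
            else
                std::cout << "Error: " << result.errorInfo() << "\n";
        });

    // Cancel the call if it has not completed within five seconds
    boost::asio::steady_timer timer(ioctx, std::chrono::seconds(5));
    timer.async_wait(yield);
    chit.cancel(CancelMode::kill);
});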
Example caller issuing a call cancellation via a boost::asio::cancellation_signal:
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
    session->connect(yield).value();
    session->join(Realm("somerealm"), yield).value();

    // Emit the cancellation signal after five seconds
    boost::asio::cancellation_signal cancelSignal;
    boost::asio::steady_timer timer(ioctx, std::chrono::seconds(5));
    timer.async_wait(
        [&cancelSignal](boost::system::error_code)
        {
            cancelSignal.emit(boost::asio::cancellation_type::total);
        });

    // Calls the procedure registered by the callee example,
    // assuming it was registered as "delayedEcho"
    auto result = session->call(
        Rpc("delayedEcho").withArgs("hello", 10)
                          .withCancelMode(CancelMode::kill),
        boost::asio::bind_cancellation_slot(cancelSignal.slot(), yield));

    if (result)
        std::cout << "Result: " << result.value() << "\n";
    else
        std::cout << "Error: " << result.errorInfo() << "\n";
});
Call Timeouts
A remote procedure call can be set up to automatically cancel itself if a result is not received within a certain time period. This is done using either:
- wamp::Rpc::withDealerTimeout, or,
- wamp::Rpc::withCallerTimeout.
The withDealerTimeout option requires that the router support call timeouts. The withCallerTimeout option only requires that the router support call cancellation.
With both methods, if a timeout occurs, the callee will receive an INTERRUPT message and the call handler will be passed the wamp::SessionErrc::cancelled error code.
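A caller-side timeout needs nothing from the router beyond cancellation, presumably because the caller can track deadlines itself and issue an ordinary cancellation when one passes. The following sketch shows that bookkeeping in isolation. CallTimeouts and its members are hypothetical names that are not part of CppWamp, and time is passed in explicitly rather than read from a clock so that the logic is deterministic.

```cpp
#include <cassert>
#include <chrono>
#include <map>
#include <vector>

using Millis = std::chrono::milliseconds;

// Hypothetical tracker of outstanding calls and their caller-side deadlines
class CallTimeouts
{
public:
    // Records a new call that must complete within 'timeout' of 'now'
    void start(int requestId, Millis now, Millis timeout)
    {
        deadlines_[requestId] = now + timeout;
    }

    // Called when a result arrives before the deadline
    void complete(int requestId) { deadlines_.erase(requestId); }

    // Returns the IDs of calls whose deadline has passed and stops tracking
    // them; the caller would then send a cancellation for each one.
    std::vector<int> expire(Millis now)
    {
        std::vector<int> expired;
        for (auto it = deadlines_.begin(); it != deadlines_.end(); )
        {
            if (it->second <= now)
            {
                expired.push_back(it->first);
                it = deadlines_.erase(it);
            }
            else
            {
                ++it;
            }
        }
        return expired;
    }

private:
    std::map<int, Millis> deadlines_;
};
```

A dealer-side timeout moves this bookkeeping into the router, which is why that option depends on the router supporting call timeouts.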
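Progressive Call Results
Progressive call results is a WAMP feature that allows a sequence of partial results to be returned from the same remote procedure call.
A callee wanting to provide progressive call results must:
- return wamp::deferment from the call slot,
- send each partial result via wamp::Invocation::yield, marking it with wamp::Result::withProgress, and,
- send the final result via wamp::Invocation::yield without the progress flag.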
It may also be desirable to pass an interruption handler to the session enroll method, so that the callee may stop yielding partial results in the event that the call is cancelled (see Progressive Call Result Cancellation in the WAMP specification).
The following example shows a callee registering an RPC that yields progressive results at fixed time intervals:
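std::set<RequestId> interruptions;

Outcome countdown(Invocation inv)
{
    unsigned count = 0;
    inv.convertTo(count);

    // count is captured by value because the local variable goes out of
    // scope as soon as this function returns.
    boost::asio::spawn(
        inv.executor(),
        [inv, count](boost::asio::yield_context yield) mutable
        {
            boost::asio::steady_timer timer(inv.executor());
            auto deadline = std::chrono::steady_clock::now();
            while (count > 0)
            {
                // Stop yielding results if the call was interrupted
                auto found = interruptions.find(inv.requestId());
                if (found != interruptions.end())
                {
                    interruptions.erase(found);
                    return;
                }
                deadline += std::chrono::seconds(1);
                timer.expires_at(deadline);
                timer.async_wait(yield);
                inv.yield(Result({count}).withProgress());
                --count;
            }
            // Final, non-progressive result (count is zero here)
            inv.yield(Result({count}));
        });

    // The results are sent later via inv.yield
    return deferment;
}

// Interrupt slot; the signature is assumed to be Outcome (Interruption).
Outcome interrupt(Interruption intr)
{
    // Tell the countdown coroutine to stop
    interruptions.insert(intr.requestId());
    return Error("wamp.error.canceled");
}

int main()
{
    boost::asio::io_context ioctx;
    auto tcp = connector<Json>(ioctx, TcpHost("localhost", 12345));
    // Session creation is assumed to follow the earlier tutorial pages
    auto session = Session::create(ioctx, tcp);
    boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
    {
        session->connect(yield).value();
        session->join(Realm("somerealm"), yield).value();
        session->enroll(
            Procedure("countdown"), &countdown, &interrupt,
            yield).value();
    });
    ioctx.run();
    return 0;
}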
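A caller wanting to call an RPC with progressive call results must:
- enable progressive results via wamp::Rpc::withProgressiveResults, and,
- pass Session::call a handler, which will be invoked once for each partial result and once more for the final result.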
The following example shows a caller that calls the RPC from the previous example:
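int main()
{
    boost::asio::io_context ioctx;
    auto tcp = connector<Json>(ioctx, TcpHost("localhost", 12345));
    // Session creation is assumed to follow the earlier tutorial pages
    auto session = Session::create(ioctx, tcp);
    boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
    {
        session->connect(yield).value();
        session->join(Realm("somerealm"), yield).value();

        // The starting count of 10 is an arbitrary example value
        session->call(
            Rpc("countdown").withArgs(10).withProgressiveResults(),
            [](ErrorOr<Result> result)
            {
                // Invoked once per partial result, then for the final one
                unsigned n = 0;
                result.value().convertTo(n);
                std::cout << "Tick: " << n << "\n";
            });
    });
    ioctx.run();
    return 0;
}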
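The exchange between a progressive callee and its caller can be simulated without a router, which makes the ordering of partial and final results easier to see. The following sketch is only a model of that exchange. Chunk, ChunkHandler, countdownSim and collect are hypothetical names, the timer delays are omitted, and the simulation assumes the callee finishes with a non-progressive result of zero.

```cpp
#include <cassert>
#include <functional>
#include <set>
#include <vector>

// Hypothetical stand-in for a result message; 'progress' mirrors the
// progressive flag that marks a partial result.
struct Chunk { unsigned value; bool progress; };

using ChunkHandler = std::function<void (const Chunk&)>;

// Simulated callee: emits count, count-1, ..., 1 as partial results followed
// by a final result of 0, stopping early if the request was interrupted.
void countdownSim(unsigned requestId, unsigned count,
                  const std::set<unsigned>& interruptions,
                  const ChunkHandler& onChunk)
{
    while (count > 0)
    {
        if (interruptions.count(requestId) != 0)
            return; // cancelled: no further results are sent
        onChunk(Chunk{count, true});
        --count;
    }
    onChunk(Chunk{0, false}); // the final result completes the call
}

// Simulated caller: issues request 1 and collects the value of every chunk
// in arrival order.
std::vector<unsigned> collect(unsigned count,
                              const std::set<unsigned>& interruptions)
{
    std::vector<unsigned> ticks;
    countdownSim(1, count, interruptions,
                 [&ticks](const Chunk& c) { ticks.push_back(c.value); });
    return ticks;
}
```

With an empty interruption set the handler sees every value down to zero, whereas a request whose ID is already in the set produces no results at all.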
Next: Publish-Subscribe