CppWAMP
C++11 client library for the WAMP protocol
Publish-Subscribe

Publishing Events

The session publish operation is used to publish events related to a topic. It takes a wamp::Pub object as a parameter, which contains the following information related to the publication:

  • the topic URI to publish under,
  • positional wamp::Variant arguments, if applicable,
  • a wamp::Object containing keyword arguments, if applicable, and,
  • a wamp::Options dictionary for advanced WAMP features
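
To illustrate the shape of such an object, here is a minimal sketch of a fluent builder carrying the same four pieces of information. PubSketch, its members, and makeExamplePub are hypothetical stand-ins written for this illustration so that it compiles without CppWAMP; the real wamp::Pub stores wamp::Variant, wamp::Object, and wamp::Options values rather than plain strings.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical stand-in for wamp::Pub; strings replace Variant/Object/Options.
class PubSketch
{
public:
    explicit PubSketch(std::string topic) : topic_(std::move(topic)) {}

    // Each setter returns *this so that calls can be chained.
    PubSketch& withArgs(std::vector<std::string> args)
    {
        args_ = std::move(args);
        return *this;
    }

    PubSketch& withKwargs(std::map<std::string, std::string> kwargs)
    {
        kwargs_ = std::move(kwargs);
        return *this;
    }

    // Sets the WAMP 'disclose_me' publish option.
    PubSketch& withDiscloseMe()
    {
        options_["disclose_me"] = "true";
        return *this;
    }

    const std::string& topic() const {return topic_;}
    const std::vector<std::string>& args() const {return args_;}
    const std::map<std::string, std::string>& kwargs() const {return kwargs_;}
    const std::map<std::string, std::string>& options() const {return options_;}

private:
    std::string topic_;
    std::vector<std::string> args_;
    std::map<std::string, std::string> kwargs_;
    std::map<std::string, std::string> options_;
};

// Builds a publication with one keyword argument and 'disclose_me' enabled.
PubSketch makeExamplePub()
{
    PubSketch pub = PubSketch("topic").withKwargs({{"name", "John"}})
                                      .withDiscloseMe();
    assert(pub.args().empty()); // no positional arguments were given
    return pub;
}
```

Returning a reference from each setter is what allows the chained style used with Pub throughout this page.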

The following example uses a non-blocking publish, which enqueues the publication for transmission.

using namespace wamp;
boost::asio::spawn(ioctx, [&](boost::asio::yield_context yield)
{
    auto session = Session::create(ioctx, tcp);
    session->connect(yield).value();
    session->join(Realm("somerealm"), yield).value();

    // Publish an event with no arguments
    session->publish(Pub("topic"));

    // Publish an event with two positional arguments
    session->publish(Pub("topic").withArgs(12, "foo"));

    // Publish an event with a single keyword argument.
    // Also enable the 'disclose_me' advanced feature.
    session->publish( Pub("topic").withKwargs({{"name", "John"}})
                                  .withDiscloseMe() );
});

If you need to obtain the publication ID of a published event, then you need to use the overload that takes a completion token:

using namespace wamp;
PublicationId pid = session->publish(Pub("topic").withArgs(12, "foo"),
                                     yield).value();

Event Slots

CppWAMP allows you to register callback functions that will be executed whenever an event with a certain topic is received from a broker. Such callback functions are named event slots. Event slots can be any callable entity that can be stored into a std::function:

  • free functions,
  • member functions (via std::bind),
  • function objects,
  • lambdas, etc.
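
As an illustration of the callable kinds listed above, the following sketch stores one of each in a std::function and invokes them. The Event struct here is a hypothetical stand-in, not the real wamp::Event, so that the snippet compiles without CppWAMP; EventSlot, Logger, Counter, and invokeAllSlotKinds are likewise names invented for this example.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-in for wamp::Event; the real class carries more data.
struct Event
{
    std::string topic;
};

// Assumed slot type for this sketch: anything callable with an Event.
using EventSlot = std::function<void (Event)>;

// Records which slots have run, in order.
static std::vector<std::string> seen;

// 1. Free function
void onEvent(Event event) {seen.push_back("free:" + event.topic);}

// 2. Member function, bound to an instance via std::bind
struct Logger
{
    void onEvent(Event event) {seen.push_back("member:" + event.topic);}
};

// 3. Function object
struct Counter
{
    void operator()(Event event) const
    {
        seen.push_back("functor:" + event.topic);
    }
};

// Builds one slot of each kind and invokes them all with the same event.
std::vector<std::string> invokeAllSlotKinds()
{
    using namespace std::placeholders;
    seen.clear();
    Logger logger;
    std::vector<EventSlot> slots = {
        &onEvent,
        std::bind(&Logger::onEvent, &logger, _1),
        Counter{},
        [](Event event) {seen.push_back("lambda:" + event.topic);} // 4. Lambda
    };
    for (auto& slot: slots)
        slot(Event{"demo"});
    assert(seen.size() == slots.size()); // every slot ran exactly once
    return seen;
}
```

Note that the bound member function holds a pointer to logger, so the object must outlive the slot.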

Subscribing to Events

subscribe() is used to register an event slot for a given topic:

using namespace wamp;
Subscription sub = session->subscribe(Topic("name"), slot, yield).value();

where:

  • wamp::Subscription is a lightweight object that can be later used to unsubscribe from the event,
  • wamp::Topic is an object containing the topic URI, plus other optional information related to advanced WAMP features, and,
  • slot is the handler function to be invoked when an event with the given topic is received.

subscribe() expects an event slot with the following signature:

void function(wamp::Event)

where:

  • wamp::Event is an object containing information and payload arguments related to the event publication.

Example:

void onSensorSampled(Event event)
{
    float value = 0;
    event.convertTo(value); // throws if conversion fails
    std::cout << "Sensor sample value: " << value << "\n";
}

// Within coroutine
using namespace wamp;
auto sub = session->subscribe(
        Topic("sensor.").usingPrefixMatch(),
        &onSensorSampled,
        yield).value();
// :::
// Unsubscribing
sub.unsubscribe();

For a more in-depth discussion on subscriptions, consult the Subscriptions page.
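
The example above subscribes with usingPrefixMatch(). As a rough illustration of what prefix matching means for topic URIs, and not the broker's actual implementation, the hypothetical helper below treats a subscription as matching any published topic that begins with the subscribed URI. The names matchesPrefix and prefixMatchExamples are invented for this sketch.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper, not part of CppWAMP: returns true if the published
// topic URI starts with the subscribed prefix.
bool matchesPrefix(const std::string& prefix, const std::string& topic)
{
    return topic.compare(0, prefix.size(), prefix) == 0;
}

// A few cases for a subscription made with the prefix "sensor."
void prefixMatchExamples()
{
    assert(matchesPrefix("sensor.", "sensor.temperature"));
    assert(matchesPrefix("sensor.", "sensor.pressure.inlet"));
    assert(!matchesPrefix("sensor.", "sensors.temperature"));
    assert(!matchesPrefix("sensor.", "actuator.valve"));
}
```

Under this reading, a single subscription to "sensor." would receive events published to any topic below that prefix.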

Subscribing to Statically-Typed Events

Certain event handlers may expect a fixed number of statically-typed arguments. For these situations, the wamp::simpleEvent template function can be used:

// :::
using namespace wamp;
Subscription sub = session->subscribe(
        Topic("name"),
        simpleEvent<StaticTypeList...>(slot),
        yield).value();

where:

  • StaticTypeList... is a parameter pack of static types
  • slot has the signature:
    void function(StaticTypeList...)

Example:

void onSensorSampled(float value)
{
    std::cout << "Sensor sample value: " << value << "\n";
}

// Within coroutine
using namespace wamp;
session->subscribe(Topic("sensorSampled"),
                   simpleEvent<float>(&onSensorSampled), yield);
//                             ^^^^^
// Note that this type matches the one in onSensorSampled
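
To give a feel for what a wrapper of this kind does, here is a conceptual sketch of adapting a statically-typed slot into a generic event slot. It is not CppWAMP's implementation: Event is a hypothetical stand-in whose positional arguments are all stored as double (the real library uses wamp::Variant), and TypedSlotAdapter, simpleEventSketch, and demoSimpleEvent are names invented for this example.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <stdexcept>
#include <utility>
#include <vector>

// Hypothetical stand-in for wamp::Event: positional arguments only.
struct Event
{
    std::vector<double> args;
};

// Wraps a slot taking (Ts...) so that it can be called with an Event.
template <typename... Ts>
class TypedSlotAdapter
{
public:
    explicit TypedSlotAdapter(std::function<void (Ts...)> slot)
        : slot_(std::move(slot)) {}

    void operator()(const Event& event) const
    {
        if (event.args.size() < sizeof...(Ts))
            throw std::invalid_argument("too few positional arguments");
        invoke(event, std::index_sequence_for<Ts...>{});
    }

private:
    // Expands to slot_(Ts0(args[0]), Ts1(args[1]), ...)
    template <std::size_t... Is>
    void invoke(const Event& event, std::index_sequence<Is...>) const
    {
        slot_(static_cast<Ts>(event.args[Is])...);
    }

    std::function<void (Ts...)> slot_;
};

// The static types are given explicitly; the slot type is deduced.
template <typename... Ts, typename Slot>
std::function<void (const Event&)> simpleEventSketch(Slot slot)
{
    return TypedSlotAdapter<Ts...>(std::function<void (Ts...)>(std::move(slot)));
}

static float lastSample = 0;
void onSensorSampled(float value) {lastSample = value;}

// Delivers one event to the wrapped slot and returns what the slot received.
float demoSimpleEvent()
{
    auto slot = simpleEventSketch<float>(&onSensorSampled);
    Event event;
    event.args = {21.5};
    slot(event);
    assert(lastSample == 21.5f);
    return lastSample;
}
```

The explicit template arguments play the same role as in simpleEvent<float> above: they tell the wrapper which type each positional argument should be converted to before the slot is called.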

When a statically-typed event handler needs the wamp::Event object, the unpackedEvent template function can be used instead:

// :::
using namespace wamp;
Subscription sub = session->subscribe(
        Topic("name"),
        unpackedEvent<StaticTypeList...>(slot),
        yield).value();

where:

  • StaticTypeList... is a parameter pack of static types
  • slot has the signature:
    void function(wamp::Event, StaticTypeList...)

Example:

void onSensorSampled(Event event, float value)
{
    std::cout << "Sensor sample value: " << value << "\n";
    std::cout << "Publication ID: " << event.pubId() << "\n";
}

// Within coroutine
using namespace wamp;
session->subscribe(Topic("sensorSampled"),
                   unpackedEvent<float>(&onSensorSampled), yield);
//                               ^^^^^
// Note that this type matches the one in onSensorSampled
// (following the Event parameter)

Registering Coroutine Event Handlers

If the event handler must perform its work in an asynchronous fashion within the context of a coroutine, then wamp::simpleCoroEvent or wamp::unpackedCoroEvent may be used:

void sensorSampled(float value, boost::asio::yield_context yield)
{
    // Perform asynchronous operation within event handler
    session->call(Rpc("log.append").withArgs(value), yield).value();
}

// Within coroutine
using namespace wamp;
session->subscribe(Topic("sensor_sampled"),
                   simpleCoroEvent<float>(&sensorSampled),
                   yield).value();

Note
This will spawn a new coroutine every time the event handler is invoked.

Scoped Subscriptions

A wamp::ScopedSubscription object can be used to limit a subscription's lifetime to a particular scope. When a ScopedSubscription object is destroyed, it automatically unsubscribes the subscription. This helps in automating the lifetime management of topic subscriptions using RAII techniques.

using namespace wamp;
{
    ScopedSubscription sub = session->subscribe(Topic("foo"), &onFoo,
                                                yield).value();
}
// The "foo" topic is automatically unsubscribed when the 'sub' scoped
// subscription goes out of scope here.
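
The RAII idea behind this can be sketched without CppWAMP. ScopedHandle below is a hypothetical stand-in, not the real wamp::ScopedSubscription: it owns an unsubscribe callback and runs it on destruction unless ownership has been moved elsewhere. The function countActiveAfterScope is likewise invented for this illustration.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Hypothetical stand-in illustrating the RAII pattern only.
class ScopedHandle
{
public:
    ScopedHandle() = default;

    explicit ScopedHandle(std::function<void ()> unsubscribe)
        : unsubscribe_(std::move(unsubscribe)) {}

    // Move-only, so exactly one handle is responsible for unsubscribing.
    ScopedHandle(ScopedHandle&& other)
        : unsubscribe_(std::move(other.unsubscribe_))
    {
        other.unsubscribe_ = nullptr;
    }

    ScopedHandle& operator=(ScopedHandle&& other)
    {
        if (this != &other)
        {
            reset(); // drop whatever this handle owned before
            unsubscribe_ = std::move(other.unsubscribe_);
            other.unsubscribe_ = nullptr;
        }
        return *this;
    }

    ScopedHandle(const ScopedHandle&) = delete;
    ScopedHandle& operator=(const ScopedHandle&) = delete;

    ~ScopedHandle() {reset();}

    // Unsubscribes now, if this handle still owns a subscription.
    void reset()
    {
        if (unsubscribe_)
        {
            auto fn = std::move(unsubscribe_);
            unsubscribe_ = nullptr;
            fn();
        }
    }

private:
    std::function<void ()> unsubscribe_;
};

// Returns the number of active "subscriptions" after the scope has ended.
int countActiveAfterScope()
{
    int active = 0;
    {
        ++active; // stands in for subscribing
        ScopedHandle sub([&active] {--active;});
        assert(active == 1);
    } // 'sub' is destroyed here, which runs the unsubscribe callback
    return active;
}
```

Moving a handle transfers the responsibility for unsubscribing, so the callback still runs only once.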

For a more practical example of using scoped subscriptions, see Subscriptions/Scoped Subscriptions.


Next: Asynchronous Callbacks
