Subscribe
The term Subscribe in Pub/Sub refers to subscribing to topics in order to receive updates from them.
The words subscribe and receive are sometimes used synonymously within Diffusion.
A session can subscribe to data published to topics. Subscription is a way for a session to tell Diffusion® Cloud which topics it is interested in. You can subscribe to a topic but not receive any data, for example if you don’t have permissions on that topic, or if the topic exists but your topic selector does not match it.
When a session subscribes to an existing topic, Diffusion® Cloud:
- sends that topic’s current value to the session immediately,
- sends any subsequent updates as they arrive.
This allows Diffusion® Cloud to act as a cache, where subscribers are always guaranteed to get the latest value.
Neither publishers nor subscribers need to know about the existence of each other, so when you write code, you only need to think about the data, not who is using it.
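The behaviour described above (current value on subscription, then each later update) can be illustrated with a toy in-memory model. This is a sketch only and not the Diffusion® Cloud API: the `ToyBroker` class and its `publish` and `subscribe` methods are hypothetical names invented for this example.

```python
from collections import defaultdict
from typing import Callable, Dict, List

Callback = Callable[[str, str], None]


class ToyBroker:
    """Hypothetical in-memory stand-in for a Pub/Sub server (not the Diffusion API)."""

    def __init__(self) -> None:
        self._values: Dict[str, str] = {}  # latest value per topic
        self._subscribers: Dict[str, List[Callback]] = defaultdict(list)

    def publish(self, topic: str, value: str) -> None:
        # Cache the latest value, then fan it out to the current subscribers.
        self._values[topic] = value
        for callback in self._subscribers[topic]:
            callback(topic, value)

    def subscribe(self, topic: str, callback: Callback) -> None:
        self._subscribers[topic].append(callback)
        # An existing topic delivers its current value immediately.
        if topic in self._values:
            callback(topic, self._values[topic])


broker = ToyBroker()
broker.publish("my/topic/path", "v1")  # published before anyone subscribes

received = []
broker.subscribe("my/topic/path", lambda topic, value: received.append(value))
broker.publish("my/topic/path", "v2")  # a subsequent update

print(received)
```

The publisher never refers to the subscriber, and the late subscriber still sees the cached value first, followed by the new update.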
Receiving topic updates
Receiving updates from topics requires two actions:
1) Adding a value stream: A value stream is a way for a session to listen to subscription events and updates from topics.
2) Making a subscription request: This tells Diffusion® Cloud that the session wants to receive updates for one or more topics.
Each of these is detailed below:
1. Adding a value stream
Value streams make it easy for separate application components to listen to the same set of remote updates, without having to coordinate with each other. You can register as many value streams as you need. Register them before making any subscriptions, so that your application is ready to receive data.
Value streams are based on 'types'. You must specify a topic type (such as JSON) for a stream, and it will only match topics of the correct type. This ensures that delta updating can work correctly.
Java:
session.feature(Topics.class)
    .addStream("my/topic/path", JSON.class, new Topics.ValueStream.Default<JSON>() {
        @Override
        public void onValue(
            String topicPath,
            TopicSpecification specification,
            JSON oldValue,
            JSON newValue) {
            // oldValue is null for the first value received for a topic
            LOG.info("on value: {} old value: {} new value: {}",
                topicPath,
                oldValue == null ? "NULL" : oldValue.toJsonString(),
                newValue.toJsonString());
        }
        @Override
        public void onSubscription(
            String topicPath,
            TopicSpecification specification) {
            LOG.info("on subscription: {}", topicPath);
        }
        @Override
        public void onUnsubscription(
            String topicPath,
            TopicSpecification specification,
            UnsubscribeReason reason) {
            LOG.info("on unsubscription: {} reason: {}",
                topicPath, reason.toString());
        }
    });
JavaScript:
const valueHandler = {
    open: () => {
        console.log('Value stream is open');
    },
    value: (topic, spec, value) => {
        console.log(`Update for topic ${topic}: ${JSON.stringify(value.get())}`);
    },
    subscribe: (details, topic) => {
        console.log(`Subscribed to topic: ${topic}`);
    },
    unsubscribe: (reason, topic) => {
        console.log(`Unsubscribed from topic: ${topic}`);
    },
    close: (reason) => {
        console.log('Value stream is closed');
    },
    error: (error) => {
        console.log(`An error has occurred: ${error}`);
    }
}
...
const subscription = session.addStream('my/topic/path', diffusion.datatypes.json());
subscription.on(valueHandler);
C#:
private sealed class JSONValueStreamHandler : IValueStream<IJSON>
{
    public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue)
        => WriteLine($"Topic '{topicPath}' has been updated: {(oldValue == null ? "NULL" : oldValue.ToJSONString())} -> {newValue.ToJSONString()}.");
    public void OnSubscription(string topicPath, ITopicSpecification specification)
        => WriteLine($"You have been subscribed to '{topicPath}'.");
    public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
        => WriteLine($"You have been unsubscribed from '{topicPath}'.");
    public void OnError(ErrorReason errorReason)
        => WriteLine($"Value stream failed with error: {errorReason.Description}.");
    public void OnClose()
        => WriteLine("Value stream is now closed.");
}
...
// Create a handler to receive the topic value updates.
var jsonStream = new JSONValueStreamHandler();
// Link the handler to the topic path.
session.Topics.AddStream("my/topic/path", jsonStream);
C:
static int on_subscription(
    const char* topic_path,
    const TOPIC_SPECIFICATION_T *specification,
    void *context)
{
    return HANDLER_SUCCESS;
}
static int on_unsubscription(
    const char* topic_path,
    const TOPIC_SPECIFICATION_T *specification,
    NOTIFY_UNSUBSCRIPTION_REASON_T reason,
    void *context)
{
    return HANDLER_SUCCESS;
}
static int on_value(
    const char* topic_path,
    const TOPIC_SPECIFICATION_T *const specification,
    const DIFFUSION_DATATYPE datatype,
    const DIFFUSION_VALUE_T *const old_value,
    const DIFFUSION_VALUE_T *const new_value,
    void *context)
{
    return HANDLER_SUCCESS;
}
static void on_close()
{
    // value stream is now closed
}
...
VALUE_STREAM_T value_stream = {
    .datatype = DATATYPE_JSON,
    .on_subscription = on_subscription,
    .on_unsubscription = on_unsubscription,
    .on_value = on_value,
    .on_close = on_close
};
add_stream(session, "my/topic/path", &value_stream);
Python:
def on_update(*, old_value, topic_path, topic_value, **kwargs):
    print(f"Topic: {topic_path} has been updated: {old_value} -> {topic_value}")
def on_subscribe(*, topic_path, **kwargs):
    print(f"You have been subscribed to {topic_path}")
def on_unsubscribe(*, reason, topic_path, **kwargs):
    print(f"You have been unsubscribed from {topic_path}")
...
from diffusion.features.topics.streams import ValueStreamHandler
value_stream = ValueStreamHandler(
    data_type=diffusion.datatypes.JSON,
    update=on_update,
    subscribe=on_subscribe,
    unsubscribe=on_unsubscribe,
)
session.topics.add_value_stream(
    topic_selector="my/topic/path", stream=value_stream
)
Swift:
class JSONValueStreamHandler: PTDiffusionJSONValueStreamDelegate {
    func diffusionStream(_ stream: PTDiffusionValueStream,
                         didUpdateTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         oldJSON oldJson: PTDiffusionJSON?,
                         newJSON newJson: PTDiffusionJSON) {
        // print() does not expand %@ placeholders, so use string interpolation
        print("Topic '\(topicPath)' has been updated: \(oldJson.map { String(describing: $0) } ?? "NULL") -> \(newJson)")
    }
    func diffusionStream(_ stream: PTDiffusionStream,
                         didSubscribeToTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification) {
        print("You have been subscribed to '\(topicPath)'")
    }
    func diffusionStream(_ stream: PTDiffusionStream,
                         didUnsubscribeFromTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         reason: PTDiffusionTopicUnsubscriptionReason) {
        print("You have been unsubscribed from '\(topicPath)'")
    }
    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        print("Value stream failed with error: \(error.localizedDescription)")
    }
    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Value stream is now closed")
    }
}
...
// create a handler to receive the topic value updates
let handler = JSONValueStreamHandler()
let value_stream = PTDiffusionJSON.valueStream(with: handler)
// link the handler to the `topic_path`
try! session.topics.add(value_stream, withSelectorExpression: "my/topic/path", error: ())
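To make the type matching concrete, the following toy model shows several streams registered for the same path, where only the streams whose type matches the value are notified. It is a sketch under stated assumptions rather than the Diffusion client API: `ToyStreams`, `add_stream` and `dispatch` are hypothetical names, and Python's `dict` and `str` stand in for JSON and string topic types.

```python
from typing import Any, Callable, List, Tuple

# Hypothetical toy model: each registered stream is (path, expected type, callback).
Stream = Tuple[str, type, Callable[[str, Any], None]]


class ToyStreams:
    """Keeps registered streams and delivers values only to matching ones."""

    def __init__(self) -> None:
        self._streams: List[Stream] = []

    def add_stream(self, path: str, value_type: type,
                   callback: Callable[[str, Any], None]) -> None:
        self._streams.append((path, value_type, callback))

    def dispatch(self, path: str, value: Any) -> None:
        # Deliver only to streams whose path and value type both match.
        for stream_path, value_type, callback in self._streams:
            if stream_path == path and isinstance(value, value_type):
                callback(path, value)


streams = ToyStreams()
ui_log, audit_log, text_log = [], [], []

# Two independent components listen to the same JSON-like topic.
streams.add_stream("my/topic/path", dict, lambda path, value: ui_log.append(value))
streams.add_stream("my/topic/path", dict, lambda path, value: audit_log.append(value))
# A stream of a different type on the same path is never notified.
streams.add_stream("my/topic/path", str, lambda path, value: text_log.append(value))

streams.dispatch("my/topic/path", {"price": 42})
print(ui_log, audit_log, text_log)
```

Both `dict` streams receive the update without knowing about each other, while the `str` stream stays empty.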
2. Subscribing to topics
To actually start receiving topic updates, you must subscribe to one or more topics. If the subscription is successful, any value streams you have registered will begin to receive updates.
Subscriptions are stored in Diffusion® Cloud, so a session remains subscribed to the selected topics until it either unsubscribes or is closed.
Subscriptions are subject to authorization rules. If the session you are using doesn’t have permission to subscribe to the requested topic path, the subscription request is rejected with an error message. For more information on permissions, please refer to Permissions.
Java:
session.feature(Topics.class).subscribe("?my/topic//");
JavaScript:
const topic_selector_expression = '?my/topic//'
await session.select(topic_selector_expression);
C#:
var topicSelectorExpression = "?my/topic//";
await session.Topics.SubscribeAsync(topicSelectorExpression);
WriteLine($"Subscribe request succeeded.");
C:
SUBSCRIPTION_PARAMS_T parameters = {
    .topic_selector = "?my/topic//"
};
subscribe(session, parameters);
Python:
await session.topics.subscribe("?my/topic//")
print("Subscribe request succeeded")
Swift:
let topic_selector_expression = "?my/topic//"
session.topics.subscribe(withTopicSelectorExpression: topic_selector_expression) { (error) in
    // the completion handler is also called on failure, so check the error first
    if let error = error {
        print("Subscribe request failed: \(error.localizedDescription)")
    } else {
        print("Subscribe request succeeded")
    }
}
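The selector `?my/topic//` used in these examples is a pattern selector: it selects `my/topic` and all of its descendants. The sketch below is a simplified model of that matching, written to show why some topics are selected and others are not. The `matches` function is a hypothetical helper, not part of any Diffusion client, and it covers only the `?` prefix and the trailing `/` and `//` qualifiers. It treats anything else as an exact path, which is a simplification.

```python
import re


def matches(selector: str, topic_path: str) -> bool:
    """Simplified, hypothetical model of selectors such as '?my/topic//'."""
    if not selector.startswith("?"):
        # Simplification: any other selector is treated as an exact path.
        return selector == topic_path

    body = selector[1:]
    if body.endswith("//"):
        # '//' selects the matching topics and all of their descendants.
        pattern, include_self, include_descendants = body[:-2], True, True
    elif body.endswith("/"):
        # '/' selects only the descendants of the matching topics.
        pattern, include_self, include_descendants = body[:-1], False, True
    else:
        pattern, include_self, include_descendants = body, True, False

    patterns = pattern.split("/")
    segments = topic_path.split("/")
    if len(segments) < len(patterns):
        return False
    # Each leading path segment must fully match its regular expression.
    if not all(re.fullmatch(p, s) for p, s in zip(patterns, segments)):
        return False
    if len(segments) == len(patterns):
        return include_self
    return include_descendants


for path in ["my/topic", "my/topic/a/b", "my/other", "my"]:
    print(path, matches("?my/topic//", path))
```

A subscription whose selector matches nothing, such as `my/other` here, succeeds as a request but delivers no data.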
Unsubscribing from topics
You can unsubscribe from a topic to stop receiving updates from it. You can subscribe to and unsubscribe from as many topics, as many times, as you like.
Unsubscribing does not affect any active value streams. This means you can use the same value stream for receiving updates, even if you subscribe and unsubscribe multiple times.
Java:
session.feature(Topics.class).unsubscribe("?my/topic//");
JavaScript:
const topic_selector_expression = '?my/topic//'
await session.unsubscribe(topic_selector_expression)
C#:
var topicSelectorExpression = "?my/topic//";
await session.Topics.UnsubscribeAsync(topicSelectorExpression);
WriteLine($"Unsubscribe request succeeded.");
C:
UNSUBSCRIPTION_PARAMS_T parameters = {
    .topic_selector = "?my/topic//"
};
unsubscribe(session, parameters);
Python:
await session.topics.unsubscribe("?my/topic//")
print("Unsubscribe request succeeded")
Swift:
let topic_selector_expression = "?my/topic//"
session.topics.unsubscribe(fromTopicSelectorExpression: topic_selector_expression) { (error) in
    // the completion handler is also called on failure, so check the error first
    if let error = error {
        print("Unsubscribe request failed: \(error.localizedDescription)")
    } else {
        print("Unsubscribe request succeeded")
    }
}
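The independence of value streams and subscriptions can be pictured with a small model in which the two are tracked separately. This is an illustrative sketch only: `ToySession` and its methods are hypothetical names and do not correspond to the Diffusion client API.

```python
from typing import Callable, List, Set


class ToySession:
    """Hypothetical model: subscriptions and value streams are tracked separately."""

    def __init__(self) -> None:
        self.subscribed: Set[str] = set()
        self.streams: List[Callable[[str, str], None]] = []

    def add_stream(self, callback: Callable[[str, str], None]) -> None:
        self.streams.append(callback)

    def subscribe(self, topic: str) -> None:
        self.subscribed.add(topic)

    def unsubscribe(self, topic: str) -> None:
        # Only the subscription is removed; registered streams are untouched.
        self.subscribed.discard(topic)

    def on_update(self, topic: str, value: str) -> None:
        # Updates reach the streams only while the topic is subscribed.
        if topic in self.subscribed:
            for callback in self.streams:
                callback(topic, value)


session = ToySession()
seen = []
session.add_stream(lambda topic, value: seen.append(value))  # registered once

session.subscribe("my/topic/path")
session.on_update("my/topic/path", "a")   # delivered
session.unsubscribe("my/topic/path")
session.on_update("my/topic/path", "b")   # dropped: not subscribed
session.subscribe("my/topic/path")
session.on_update("my/topic/path", "c")   # delivered to the same stream

print(seen, len(session.streams))
```

The stream is added once and keeps working across the unsubscribe and resubscribe cycle, which is the property the paragraph above describes.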