Publish
In pub/sub, publishing means sending data to topics.
Publishing messages
All pub/sub messages in Diffusion® are sent as updates to specific topics.
Topics have unique names, also referred to as topic paths. Topics group and filter the data sent to each application: clients select which topics they receive updates from.
Each topic can have an arbitrary number of subscribers; publishers can update topics even if there are no subscribers, and clients can pre-emptively subscribe to topic paths that don’t yet exist.
Topics can be added or removed dynamically.
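The topic model described above can be illustrated with a small in-memory sketch. It is not the Diffusion API: `ToyBroker` and all of its methods are hypothetical names invented for this example, and it only models exact topic paths in a single process, with no selectors and no network.

```python
from collections import defaultdict
from typing import Any, Callable, Dict, List

Callback = Callable[[str, Any], None]


class ToyBroker:
    """Hypothetical in-memory stand-in for a pub/sub server (not the Diffusion API)."""

    def __init__(self) -> None:
        self.topics: Dict[str, Any] = {}  # topic path -> latest value
        self.subscribers: Dict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, path: str, callback: Callback) -> None:
        # Subscribing is allowed even if no topic exists at this path yet.
        self.subscribers[path].append(callback)

    def add_topic(self, path: str) -> None:
        # Topics can be added dynamically; an existing topic is left untouched.
        self.topics.setdefault(path, None)

    def remove_topic(self, path: str) -> None:
        # Topics can be removed dynamically; subscriptions to the path remain.
        self.topics.pop(path, None)

    def publish(self, path: str, value: Any) -> int:
        # Publishing works with zero subscribers; returns how many were notified.
        if path not in self.topics:
            raise KeyError(f"no topic at {path!r}")
        self.topics[path] = value
        callbacks = self.subscribers.get(path, [])
        for callback in callbacks:
            callback(path, value)
        return len(callbacks)


broker = ToyBroker()
received = []

# A client subscribes before the topic has been created.
broker.subscribe("my/topic/path", lambda p, v: received.append((p, v)))

broker.add_topic("my/topic/path")
broker.add_topic("other/path")

broker.publish("my/topic/path", {"hello": "world"})  # delivered to one subscriber
broker.publish("other/path", 1)                      # accepted, nobody listening
```

In this sketch only the client that selected `my/topic/path` receives the update; the value published to `other/path` is stored but delivered to no one.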
Adding topics
You can create a topic via a client SDK or the REST API.
To add a topic:
Java:
session.feature(TopicControl.class).addTopic("my/topic/path", TopicType.JSON);
JavaScript:
const result = await session.topics.add('my/topic/path', diffusion.topics.TopicType.JSON);
if (result.added) {
    console.log(`Topic "${result.topic}" has been successfully created`);
} else {
    console.log('Topic already exists.');
}
.NET:
await session.TopicControl.AddTopicAsync("my/topic/path", TopicType.JSON);
WriteLine("Topic has been added!");
C:
static int on_topic_added(
    SESSION_T *session,
    TOPIC_ADD_RESULT_CODE result_code,
    void *context)
{
    return HANDLER_SUCCESS;
}
...
ADD_TOPIC_CALLBACK_T callback = {
    .on_topic_added_with_specification = on_topic_added
};
TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON);
add_topic_from_specification(session, "my/topic/path", specification, callback);
Python:
await session.topics.add_topic("my/topic/path", diffusion.datatypes.JSON)
print("Topic has been added!")
Swift:
session.topicControl.addTopic(withPath: "my/topic/path", type: PTDiffusionTopicType.JSON) { (result, error) in
    print("Topic has been added!")
}
Adding a topic is idempotent: adding a topic at a path that already exists succeeds, as long as the topic properties are the same.
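The idempotency rule can be made concrete with a minimal sketch. `TopicSpec` and `TopicStore` are hypothetical names, not part of any Diffusion SDK, and raising `ValueError` on a mismatched specification is an assumption of this example about how a conflicting add might be reported.

```python
from dataclasses import dataclass
from typing import Dict, Tuple


@dataclass(frozen=True)
class TopicSpec:
    """Hypothetical topic specification: a topic type plus its properties."""
    topic_type: str
    properties: Tuple[Tuple[str, str], ...] = ()


class TopicStore:
    """Hypothetical in-memory registry of topic paths (not the Diffusion API)."""

    def __init__(self) -> None:
        self._specs: Dict[str, TopicSpec] = {}

    def add_topic(self, path: str, spec: TopicSpec) -> bool:
        """Return True if the topic was created, False if an identical one existed."""
        existing = self._specs.get(path)
        if existing is None:
            self._specs[path] = spec
            return True
        if existing == spec:
            # Same path and same specification: the call succeeds and changes nothing.
            return False
        # Assumption of this sketch: a conflicting specification is an error.
        raise ValueError(f"topic at {path!r} exists with a different specification")


store = TopicStore()
json_spec = TopicSpec("JSON")

first = store.add_topic("my/topic/path", json_spec)   # creates the topic
second = store.add_topic("my/topic/path", json_spec)  # succeeds, nothing changes
```

Repeating the call with the same specification is safe, so a publisher does not need to check whether the topic exists before adding it.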
Publishing to a topic
Once a topic is created, you can publish an update to give it a value.
Topics must exist before you publish to them.
To publish to a topic:
Java:
session.feature(TopicUpdate.class).set("my/topic/path", JSON.class,
    Diffusion.dataTypes().json().fromJsonString("{\"hello\": \"world\"}"));
JavaScript:
const jsonDataType = diffusion.datatypes.json();
const jsonValue = jsonDataType.from({
    hello: 'world'
});
await session.topicUpdate.set('my/topic/path', jsonDataType, jsonValue);
console.log('Topic has been updated');
.NET:
var jsonValue = Diffusion.DataTypes.JSON.FromJSONString("{\"hello\": \"world\"}");
await session.TopicUpdate.SetAsync("my/topic/path", jsonValue);
WriteLine("Topic has been updated!");
C:
static int on_topic_update(void *context)
{
    return HANDLER_SUCCESS;
}
...
BUF_T *value_buf = buf_create();
write_diffusion_json_value("{\"hello\": \"world\"}", value_buf);
DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T parameters = {
    .topic_path = "my/topic/path",
    .datatype = DATATYPE_JSON,
    .update = value_buf,
    .on_topic_update = on_topic_update
};
diffusion_topic_update_set(session, parameters);
Python:
json_value = diffusion.datatypes.JSON({"hello": "world"})
await session.topics.set_topic("my/topic/path", json_value, diffusion.datatypes.JSON)
print("Topic has been updated")
Swift:
let json_value = try! PTDiffusionJSON.init(jsonString: "{\"hello\": \"world\"}")
session.topicUpdate.setWithPath("my/topic/path", toJSONValue: json_value) { (error) in
    print("Topic has been updated!")
}
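The ordering requirement (a topic must exist before a value is set on it) can be sketched with a toy model. `ToyTopics`, `MissingTopicError`, and `add_and_set` are hypothetical names made up for this illustration; they do not correspond to any Diffusion SDK call.

```python
from typing import Any, Dict


class MissingTopicError(Exception):
    """Raised by this sketch when a value is set on a path with no topic."""


class ToyTopics:
    """Hypothetical in-memory topic tree (not the Diffusion API)."""

    def __init__(self) -> None:
        self._values: Dict[str, Any] = {}

    def add(self, path: str) -> bool:
        """Create the topic if absent. Returns True only when newly created."""
        if path in self._values:
            return False
        self._values[path] = None
        return True

    def set(self, path: str, value: Any) -> None:
        """Store a value; fails if the topic has not been added first."""
        if path not in self._values:
            raise MissingTopicError(path)
        self._values[path] = value

    def get(self, path: str) -> Any:
        return self._values[path]


def add_and_set(topics: ToyTopics, path: str, value: Any) -> None:
    """Add the topic (a no-op if it already exists), then publish the value."""
    topics.add(path)
    topics.set(path, value)


topics = ToyTopics()

try:
    topics.set("my/topic/path", {"hello": "world"})  # no topic yet
except MissingTopicError:
    publish_failed = True
else:
    publish_failed = False

add_and_set(topics, "my/topic/path", {"hello": "world"})  # add first, then set
```

Because adding an existing topic is harmless in this model, the add-then-set helper can be called repeatedly without tracking which topics have already been created.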