Example: Make exclusive updates to a topic
The following examples use the Diffusion™ API to register as the update source of a topic and to update that topic with content. A client that updates a topic using this method locks the topic and prevents other clients from updating the topic.
JavaScript
// Connect as the 'control' principal and register an exclusive update source for 'exclusive/topic'.
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {
    // A session may establish an exclusive update source. Once active, only this session may update topics at or
    // under the registration branch.
    session.topics.registerUpdateSource('exclusive/topic', {
        onRegister : function(topic, deregister) {
            // The handler provides a deregistration function to remove this registration and allow other sessions to
            // update topics under the registered path.
        },
        onActive : function(topic, updater) {
            // Once active, a handler may use the provided updater to update any topics at or under the registered path
            updater.update('exclusive/topic/bar', 123).then(function() {
                // The update was successful.
            }, function(err) {
                // There was an error updating the topic
            });
        },
        onStandBy : function(topic) {
            // If there is another update source registered for the same topic path, any subsequent registrations will
            // be put into a standby state. The registration is still held by the server, and the 'onActive' function
            // will be called if the pre-existing registration is closed at a later point in time
        },
        onClose : function(topic, err) {
            // The 'onClose' function will be called once the registration is closed, either by the session being closed
            // or the 'deregister' function being called.
        }
    });
}, function(err) {
    // The connection attempt was rejected or failed, so no update source can be registered.
    // Report the failure instead of leaving the rejected promise unhandled.
    console.error('Unable to connect to the Diffusion server:', err);
});
.NET
/**
* Copyright © 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using PushTechnology.ClientInterface.Data.JSON;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
/// <summary>
/// Client implementation that adds and updates a JSON topic exclusively.
/// </summary>
public sealed class JSONTopicsExclusiveUpdate {
    /// <summary>
    /// Adds the JSON topic 'random/JSON', acquires a session lock named after the topic path,
    /// sets the topic under a constraint that requires that lock, then removes the topic,
    /// releases the lock and closes the session.
    /// </summary>
    /// <param name="serverUrl">URL of the server to open the session against.</param>
    /// <returns>A task that completes when the example has finished.</returns>
    public async Task JSONTopicsExclusiveUpdateExample(string serverUrl) {
        var session = Diffusion.Sessions.Principal("control").Password("password")
            .CertificateValidation((cert, chain, errors)
                => CertificateValidationResult.ACCEPT)
            .Open(serverUrl);

        // Everything after the session is opened runs inside try/finally so the session is
        // closed on every path, including a failure to acquire or release the session lock.
        try {
            var topicControl = session.TopicControl;
            var topicUpdate = session.TopicUpdate;

            // Create a JSON topic 'random/JSON'
            string topic = "random/JSON";

            try {
                await topicControl.AddTopicAsync(topic, TopicType.JSON);
                WriteLine($"Topic '{topic}' added successfully.");
            }
            catch (Exception ex) {
                WriteLine( $"Failed to add topic '{topic}' : {ex}." );
                // The outer finally closes the session.
                return;
            }

            WriteLine($"Updating topic '{topic}' with a new value:");

            var newValue = Diffusion.DataTypes.JSON.FromJSONString(
                "{\"date\":\"" + DateTime.Today.ToString("D") + "\"," +
                "\"time\":\"" + DateTime.Now.TimeOfDay.ToString("g") + "\"}");

            var sessionLock = await session.LockAsync(topic);

            try {
                try {
                    // The update is only accepted while this session holds the lock.
                    var constraint = Diffusion.UpdateConstraints.Locked(sessionLock);
                    await topicUpdate.SetAsync(topic, newValue, constraint);
                    await Task.Delay(TimeSpan.FromMilliseconds(300));
                }
                catch (Exception ex) {
                    WriteLine($"Topic {topic} could not be updated : {ex}.");
                }

                // Remove the JSON topic 'random/JSON'
                try {
                    await topicControl.RemoveTopicsAsync(topic);
                }
                catch (Exception ex) {
                    WriteLine( $"Failed to remove topic '{topic}' : {ex}." );
                }
            }
            finally {
                // Release the lock once it has been acquired, whatever happened above.
                await sessionLock.UnlockAsync();
            }
        }
        finally {
            // Close the session
            session.Close();
        }
    }
}
/// <summary>
/// Client implementation that subscribes to a JSON topic and consumes the data it receives.
/// </summary>
public sealed class JSONTopics
{
    /// <summary>
    /// Opens an anonymous session, registers a <see cref="JSONStream"/> for the topic
    /// 'random/JSON', subscribes to it, waits 300 milliseconds, then removes the stream and
    /// closes the session.
    /// </summary>
    /// <param name="serverUrl">URL of the server to open the session against.</param>
    /// <returns>A task that completes when the example has finished.</returns>
    public async Task JSONTopicsConsumerExample(string serverUrl)
    {
        // Connect anonymously
        var session = Diffusion.Sessions.Open(serverUrl);

        // Get the Topics feature to subscribe to topics
        var topics = session.Topics;
        string topic = "random/JSON";

        // Add a topic stream for 'random/JSON'
        var jsonStream = new JSONStream();
        topics.AddStream(topic, jsonStream);

        try
        {
            // Subscribe to 'random/JSON' topic
            await topics.SubscribeAsync(topic);

            // Leave the subscription open briefly so the stream callbacks have a chance to run.
            await Task.Delay(TimeSpan.FromMilliseconds(300));
        }
        catch (Exception ex)
        {
            WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
        }
        finally
        {
            // Note that closing the session, will automatically unsubscribe from all topics
            // the client is subscribed to.
            topics.RemoveStream(jsonStream);
            session.Close();
        }
    }

    /// <summary>
    /// Basic implementation of the IValueStream for JSON topics.
    /// Each callback writes a single line to the console.
    /// </summary>
    private sealed class JSONStream : IValueStream<IJSON>
    {
        /// <summary>
        /// Notification of stream being closed normally.
        /// </summary>
        public void OnClose()
            => WriteLine("The subscription stream is now closed.");

        /// <summary>
        /// Notification of a contextual error related to this callback.
        /// </summary>
        /// <param name="errorReason">Error reason.</param>
        public void OnError(ErrorReason errorReason)
            => WriteLine($"An error has occurred : {errorReason}.");

        /// <summary>
        /// Notification of a successful subscription.
        /// </summary>
        /// <param name="topicPath">Topic path.</param>
        /// <param name="specification">Topic specification.</param>
        public void OnSubscription(string topicPath, ITopicSpecification specification)
            => WriteLine($"Client subscribed to topic '{topicPath}'.");

        /// <summary>
        /// Notification of a successful unsubscription.
        /// </summary>
        /// <param name="topicPath">Topic path.</param>
        /// <param name="specification">Topic specification.</param>
        /// <param name="reason">Reason for the unsubscription.</param>
        public void OnUnsubscription(string topicPath, ITopicSpecification specification,
            TopicUnsubscribeReason reason)
            => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'.");

        /// <summary>
        /// Topic update received.
        /// </summary>
        /// <param name="topicPath">Topic path.</param>
        /// <param name="specification">Topic specification.</param>
        /// <param name="oldValue">Value prior to update.</param>
        /// <param name="newValue">Value after update.</param>
        public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue,
            IJSON newValue)
            => WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}.");
    }
}
}
C
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
#include <unistd.h>
#else
#define sleep(x) Sleep(1000 * x)
#endif
#include "diffusion.h"
// Session lock shared between the callbacks and main: assigned in on_lock_acquired,
// freed and reset to NULL in on_unlock, and read by main when building the update constraint.
DIFFUSION_SESSION_LOCK_T *g_session_lock = NULL;
// Handler registered as .on_topic_update in main; it takes no action beyond
// reporting success to the caller.
static int on_topic_update(void *context)
{
        (void) context; // unused

        // topic has been updated
        return HANDLER_SUCCESS;
}
// Handler registered as .on_topic_added_with_specification in main; the result code
// is not inspected and success is always reported.
static int on_topic_added(SESSION_T *session, TOPIC_ADD_RESULT_CODE result_code, void *context)
{
        (void) session;     // unused
        (void) result_code; // unused
        (void) context;     // unused

        // topic has been added
        return HANDLER_SUCCESS;
}
// Handler registered as .on_lock_acquired in main. Stores a duplicate of the
// acquired lock in g_session_lock so that main can use it afterwards.
// NOTE(review): presumably the duplicate is needed because `session_lock` does not
// outlive this callback — confirm against the C API documentation.
static int on_lock_acquired(const DIFFUSION_SESSION_LOCK_T *session_lock, void *context)
{
        (void) context; // unused

        // lock has been acquired
        DIFFUSION_SESSION_LOCK_T *lock_copy = diffusion_session_lock_dup(session_lock);
        g_session_lock = lock_copy;

        return HANDLER_SUCCESS;
}
static int on_unlock(
bool lock_owned,
void *context)
{
// lock has been released
diffusion_session_lock_free(g_session_lock);
g_session_lock = NULL;
return HANDLER_SUCCESS;
}
// Example entry point: creates a session, adds a JSON topic, acquires the session lock
// "lock_a", updates the topic under a constraint that requires that lock, releases the
// lock and closes the session.
//
// Each asynchronous request is followed by a fixed sleep rather than a wait on its callback.
//
// Returns EXIT_SUCCESS on completion, or EXIT_FAILURE when the session cannot be created
// or the lock callback has not delivered a lock by the time it is needed.
int main(
        int argc,
        char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";
        int exit_code = EXIT_SUCCESS;

        (void) argc; // unused
        (void) argv; // unused

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = {0};

        // Create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if (session == NULL)
        {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // Create a JSON topic
        TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON);
        ADD_TOPIC_CALLBACK_T add_topic_callback = {.on_topic_added_with_specification = on_topic_added};
        add_topic_from_specification(session, topic_path, specification, add_topic_callback);

        // Sleep for a while
        sleep(5);

        // acquire a session lock
        DIFFUSION_SESSION_LOCK_PARAMS_T lock_params = {.on_lock_acquired = on_lock_acquired};
        diffusion_session_lock(session, "lock_a", lock_params);

        // Sleep for a while
        sleep(5);

        DIFFUSION_TOPIC_UPDATE_CONSTRAINT_T *update_constraint = NULL;
        BUF_T *value_buf = NULL;

        if (g_session_lock == NULL)
        {
                // on_lock_acquired has not stored a lock, so there is nothing to build the
                // constraint from; skip the update instead of passing NULL to the API.
                printf("Session lock 'lock_a' was not acquired\n");
                exit_code = EXIT_FAILURE;
        }
        else
        {
                // create update constraint that requires session lock to update topic
                update_constraint = diffusion_topic_update_constraint_locked(g_session_lock);

                // Update the topic with a JSON value
                value_buf = buf_create();
                write_diffusion_json_value("{\"hello\": \"world\"}", value_buf);

                DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T update_params =
                        {.topic_path = topic_path, .datatype = DATATYPE_JSON, .update = value_buf, .on_topic_update = on_topic_update};
                diffusion_topic_update_set_with_constraint(session, update_constraint, update_params);

                // Sleep for a while
                sleep(5);

                // release the session lock
                DIFFUSION_SESSION_LOCK_UNLOCK_PARAMS_T unlock_params = {.on_unlock = on_unlock};
                diffusion_session_lock_unlock(session, g_session_lock, unlock_params);

                // Sleep for a while
                sleep(5);
        }

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        if (value_buf != NULL)
        {
                buf_free(value_buf);
        }
        if (update_constraint != NULL)
        {
                // The constraint is allocated by the API and was never released.
                diffusion_topic_update_constraint_free(update_constraint);
        }

        credentials_free(credentials);
        topic_specification_free(specification);

        return exit_code;
}
Apple
// Copyright (C) 2021 Push Technology Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import Foundation
import Diffusion
/**
 * An example of using a control client to create and update a topic in exclusive mode
 */
class TopicUpdateExclusive {

    /**
     * Opens a session as principal "control", creates a string topic, acquires a session
     * lock, updates the topic under a constraint that requires that lock, and then
     * releases the lock. The outcome of every step is printed.
     *
     * - Parameters:
     *   - url: URL of the server to open the session against.
     *   - topic_path: Path of the string topic to create and update.
     *   - value: String value to set on the topic.
     *   - lock_name: Name of the session lock to acquire.
     */
    func run_example(url: URL, topic_path: String, value: String, lock_name: String) {

        // setup credentials and configuration
        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control", credentials: credentials)

        // establish the session
        PTDiffusionSession.open(with: url,
                                configuration: configuration) { (session, error) in
            if let error = error {
                print("An error has occurred while establishing a session: \(error.localizedDescription)")
            }
            else {
                // create a new String Topic at `topic_path`
                session!.topicControl.addTopic(withPath: topic_path,
                                               type: PTDiffusionTopicType.string) { (result, error) in
                    if let error = error {
                        print("An error has occurred while creating the topic '\(topic_path)': \(error.localizedDescription)")
                    }
                    else {
                        print("Topic \(topic_path) has been successfully created")

                        // acquire the lock `lock_name`
                        session?.lock(withName:lock_name) { (lock, error) in
                            if let error = error {
                                print("An error has occurred while attempting to retrieve lock '\(lock_name)': \(error.localizedDescription)")
                            }
                            else {
                                print("Successfully acquired lock '\(lock_name)'")

                                // add an update constraint that requires the calling session to hold the
                                // lock `lock_name` to update topic `topic_path`
                                let update_constraint = PTDiffusionUpdateConstraint.locked(with: lock!)

                                // update `topic_path` with `value`; a thrown error is reported
                                // instead of crashing the process
                                do {
                                    try session!.topicUpdate.setWithPath(topic_path,
                                                                         toStringValue: value,
                                                                         constraint: update_constraint) { (error) in
                                        if let error = error {
                                            print("An error has occurred while setting the topic '\(topic_path)' to '\(value)': \(error.localizedDescription)")
                                        }
                                        else {
                                            print("Topic '\(topic_path)' has been successfully updated to \(value)")
                                        }
                                    }
                                }
                                catch {
                                    print("An error has occurred while requesting the update of topic '\(topic_path)': \(error.localizedDescription)")
                                }

                                // if there are no more updates to be done by this session, release the lock
                                lock!.unlock { (was_owner, error) in
                                    if let error = error {
                                        print("An error has occurred while releasing the lock: \(error.localizedDescription)")
                                    }
                                    else {
                                        print("Lock has been successfully released")
                                    }
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
Change the URL from that provided in the example to the URL of the Diffusion server.