Example: Make non-exclusive updates to a topic
The following examples use the Diffusion™ API to update a topic with content. Updating a topic this way does not prevent other clients from updating the topic.
JavaScript
// 1. A session may update any existing topic. Update values must be of the same type as the topic being updated.
// Add a topic first with a string type
await session.topics.add('foo', diffusion.topics.TopicType.STRING);
// Update the topic
await session.topicUpdate.set('foo', diffusion.datatypes.string(), 'hello');
// Update the topic again
await session.topicUpdate.set('foo', diffusion.datatypes.string(), 'world');
.NET
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using PushTechnology.ClientInterface.Data.JSON;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
/// <summary>
/// Client implementation that adds and updates a JSON topic.
/// </summary>
public sealed class JSONTopicsManager {
public async Task JSONTopicsManagerExample(string serverUrl) {
var session = Diffusion.Sessions.Principal("control").Password("password")
.CertificateValidation((cert, chain, errors)
=> CertificateValidationResult.ACCEPT)
.Open(serverUrl);
var topicControl = session.TopicControl;
var topicUpdate = session.TopicUpdate;
// Create a JSON topic 'random/JSON'
string topic = "random/JSON";
try {
await topicControl.AddTopicAsync(topic, TopicType.JSON);
WriteLine($"Topic '{topic}' added successfully.");
}
catch (Exception ex)
{
WriteLine( $"Failed to add topic '{topic}' : {ex}." );
session.Close();
return;
}
WriteLine($"Updating topic '{topic}' with a new value:");
var newValue = Diffusion.DataTypes.JSON.FromJSONString(
"{\"date\":\"" + DateTime.Today.Date.ToString("D") + "\"," +
"\"time\":\"" + DateTime.Now.TimeOfDay.ToString("g") + "\"}");
try
{
await topicUpdate.SetAsync(topic, newValue);
await Task.Delay(TimeSpan.FromMilliseconds(300));
}
catch (Exception ex)
{
WriteLine($"Topic {topic} could not be updated : {ex}.");
}
// Remove the JSON topic 'random/JSON'
try {
await topicControl.RemoveTopicsAsync(topic);
}
catch (Exception ex)
{
WriteLine( $"Failed to remove topic '{topic}' : {ex}." );
}
// Close the session
session.Close();
}
}
/// <summary>
/// Client implementation that subscribes to a JSON topic and consumes the data it receives.
/// </summary>
public sealed class JSONTopicsConsumer
{
public async Task JSONTopicsConsumerExample(string serverUrl)
{
// Connect anonymously
var session = Diffusion.Sessions.Open(serverUrl);
// Get the Topics feature to subscribe to topics
var topics = session.Topics;
string topic = "random/JSON";
// Add a topic stream for 'random/JSON'
var jsonStream = new JSONStream();
topics.AddStream(topic, jsonStream);
try
{
// Subscribe to 'random/JSON' topic
await topics.SubscribeAsync(topic);
await Task.Delay(TimeSpan.FromMilliseconds(300));
}
catch (Exception ex)
{
WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
}
finally
{
// Note that closing the session, will automatically unsubscribe from all topics
// the client is subscribed to.
topics.RemoveStream(jsonStream);
session.Close();
}
}
/// <summary>
/// Basic implementation of the IValueStream for JSON topics.
/// </summary>
private sealed class JSONStream : IValueStream<IJSON>
{
/// <summary>
/// Notification of stream being closed normally.
/// </summary>
public void OnClose()
=> WriteLine("The subscrption stream is now closed.");
/// <summary>
/// Notification of a contextual error related to this callback.
/// </summary>
/// <param name="errorReason">Error reason.</param>
public void OnError(ErrorReason errorReason)
=> WriteLine($"An error has occured : {errorReason}.");
/// <summary>
/// Notification of a successful subscription.
/// </summary>
/// <param name="topicPath">Topic path.</param>
/// <param name="specification">Topic specification.</param>
public void OnSubscription(string topicPath, ITopicSpecification specification)
=> WriteLine($"Client subscribed to topic '{topicPath}'.");
/// <summary>
/// Notification of a successful unsubscription.
/// </summary>
/// <param name="topicPath">Topic path.</param>
/// <param name="specification">Topic specification.</param>
/// <param name="reason">Error reason.</param>
public void OnUnsubscription(string topicPath, ITopicSpecification specification,
TopicUnsubscribeReason reason)
=> WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'.");
/// <summary>
/// Topic update received.
/// </summary>
/// <param name="topicPath">Topic path.</param>
/// <param name="specification">Topic specification.</param>
/// <param name="oldValue">Value prior to update.</param>
/// <param name="newValue">Value after update.</param>
public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue,
IJSON newValue)
=> WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}.");
}
}
}
Java and Android
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
/**
* An example of using a control client to create and update a topic in non
* exclusive mode (as opposed to acting as an exclusive update source). In this
* mode other clients could update the same topic (on a last update wins basis).
* <P>
* This uses the 'TopicControl' feature to create a topic and the
* 'TopicUpdateControl' feature to send updates to it.
* <P>
* To send updates to a topic, the client session requires the 'update_topic'
* permission for that branch of the topic tree.
*
* @author DiffusionData Limited
* @since 5.3
*/
public final class ControlClientUpdatingSingleValueTopic {
private static final String TOPIC = "MyTopic";
private final Session session;
private final TopicControl topicControl;
private final TopicUpdateControl updateControl;
/**
* Constructor.
*/
public ControlClientUpdatingSingleValueTopic() {
session =
Diffusion.sessions().principal("control").password("password")
.open("ws://diffusion.example.com:80");
topicControl = session.feature(TopicControl.class);
updateControl = session.feature(TopicUpdateControl.class);
// Create the topic and request that it is removed when the session
// closes
topicControl.addTopic(
TOPIC,
TopicType.SINGLE_VALUE,
new AddCallback.Default() {
@Override
public void onTopicAdded(String topicPath) {
topicControl.removeTopicsWithSession(
TOPIC,
new TopicTreeHandler.Default());
}
});
}
/**
* Update the topic with a string value.
*
* @param value the update value
* @param callback the update callback
*/
public void update(String value, UpdateCallback callback) {
updateControl.updater().update(TOPIC, value, callback);
}
/**
* Close the session.
*/
public void close() {
session.close();
}
}
C
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
#include <unistd.h>
#else
#define sleep(x) Sleep(1000 * x)
#endif
#include "diffusion.h"
static int on_topic_update(
void *context)
{
// topic has been updated
return HANDLER_SUCCESS;
}
static int on_topic_added(
SESSION_T *session,
TOPIC_ADD_RESULT_CODE result_code,
void *context)
{
// topic has been added
return HANDLER_SUCCESS;
}
int main(
int argc,
char **argv)
{
const char *url = "ws://localhost:8080";
const char *principal = "control";
const char *password = "password";
const char *topic_path = "my/topic/path";
CREDENTIALS_T *credentials = credentials_create_password(password);
SESSION_T *session;
DIFFUSION_ERROR_T error = {0};
// Create a session, synchronously
session = session_create(url, principal, credentials, NULL, NULL, &error);
if (session == NULL)
{
printf("Failed to create session: %s\n", error.message);
free(error.message);
credentials_free(credentials);
return EXIT_FAILURE;
}
// Create a JSON topic
TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON);
ADD_TOPIC_CALLBACK_T add_topic_callback = {.on_topic_added_with_specification = on_topic_added};
add_topic_from_specification(session, topic_path, specification, add_topic_callback);
// Sleep for a while
sleep(5);
// Update the topic with a JSON value
BUF_T *value_buf = buf_create();
write_diffusion_json_value("{\"hello\": \"world\"}", value_buf);
DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T update_params =
{.topic_path = topic_path, .datatype = DATATYPE_JSON, .update = value_buf, .on_topic_update = on_topic_update};
diffusion_topic_update_set(session, update_params);
// Sleep for a while
sleep(5);
// Close the session, and release resources and memory
session_close(session, NULL);
session_free(session);
buf_free(value_buf);
credentials_free(credentials);
topic_specification_free(specification);
return EXIT_SUCCESS;
}
Apple
// Copyright (C) 2021 Push Technology Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import Foundation
import Diffusion
/**
* An example of using a control client to create and update a topic in non
* exclusive mode (as opposed to acting as an exclusive update source).
*
* In this mode other clients could update the same topic (on a last update wins basis).
*
* This uses the 'TopicControl' feature to create a topic and the
* 'TopicUpdate' feature to send updates to it.
*
* To send updates to a topic, the client session requires the 'update_topic'
* permission for that branch of the topic tree.
*/
class TopicUpdateNonExclusive {
func run_example(url: URL, topic_path: String, value: String) {
// setup credentials and configuration
let credentials = PTDiffusionCredentials(password: "password")
let configuration = PTDiffusionSessionConfiguration(principal: "control", credentials: credentials)
// establish a session
PTDiffusionSession.open(with: url,
configuration: configuration) { (session, error) in
if (error != nil) {
print("An error has occurred while establishing a session: %@",
error!.localizedDescription)
}
else {
// add a String topic at `topic_path`
session!.topicControl.addTopic(withPath: topic_path,
type: PTDiffusionTopicType.string) { (result, error) in
if (error != nil) {
print("An error has occurred while creating the topic '%@': %@",
topic_path,
error!.localizedDescription)
}
else {
print("Topic '%@' has been successfully created", topic_path)
// Set the topic value at `topic_path` to `value`
try! session!.topicUpdate.setWithPath(topic_path,
toStringValue: value) { (error) in
if (error != nil) {
print("An error has occurred while set the topic %@ to %@: %@",
topic_path,
value,
error!.localizedDescription)
}
else {
print("Topic '%@' has been successfully updated to %@", topic_path, value)
}
}
}
}
}
}
}
}
Change the URL from that provided in the example to the URL of the Diffusion server .