Example: Make non-exclusive updates to a topic
The following examples use the Diffusion™ API to update a topic with content. Updating a topic this way does not prevent other clients from updating the topic.
JavaScript
// 1. Any session may update an existing topic, as long as each update value
//    has the same type as the topic being updated.
const topicPath = 'foo';

// Create the topic with a string type before updating it
await session.topics.add(topicPath, diffusion.topics.TopicType.STRING);

// Apply two successive updates to the topic, one after the other
for (const value of ['hello', 'world']) {
    await session.topicUpdate.set(topicPath, diffusion.datatypes.string(), value);
}
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using PushTechnology.ClientInterface.Data.JSON;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that adds and updates a JSON topic.
    /// </summary>
    public sealed class JSONTopicsManager {
        /// <summary>
        /// Adds the JSON topic 'random/JSON', sets a value containing the current date and time on it,
        /// removes the topic again and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL passed to the session factory when opening the session.</param>
        /// <returns>A task that completes once the session has been closed.</returns>
        public async Task JSONTopicsManagerExample(string serverUrl) {
            var session = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            // The finally block guarantees the session is closed on every path, including
            // the early return below and any exception raised outside the inner try blocks.
            try {
                var topicControl = session.TopicControl;
                var topicUpdate = session.TopicUpdate;

                // Create a JSON topic 'random/JSON'
                string topic = "random/JSON";

                try {
                    await topicControl.AddTopicAsync(topic, TopicType.JSON);

                    WriteLine($"Topic '{topic}' added successfully.");
                } catch (Exception ex) {
                    WriteLine($"Failed to add topic '{topic}' : {ex}.");

                    // Nothing to update or remove if the topic could not be added.
                    return;
                }

                WriteLine($"Updating topic '{topic}' with a new value:");

                var newValue = Diffusion.DataTypes.JSON.FromJSONString(
                    "{\"date\":\"" + DateTime.Today.Date.ToString("D") + "\"," +
                    "\"time\":\"" + DateTime.Now.TimeOfDay.ToString("g") + "\"}");

                try {
                    await topicUpdate.SetAsync(topic, newValue);

                    // NOTE(review): short pause before the topic is removed, presumably so that
                    // subscribers have time to receive the value - confirm.
                    await Task.Delay(TimeSpan.FromMilliseconds(300));
                } catch (Exception ex) {
                    WriteLine($"Topic {topic} could not be updated : {ex}.");
                }

                // Remove the JSON topic 'random/JSON'
                try {
                    await topicControl.RemoveTopicsAsync(topic);
                } catch (Exception ex) {
                    WriteLine($"Failed to remove topic '{topic}' : {ex}.");
                }
            } finally {
                // Close the session
                session.Close();
            }
        }
    }

    /// <summary>
    /// Client implementation that subscribes to a JSON topic and consumes the data it receives.
    /// </summary>
    public sealed class JSONTopicsConsumer {
        /// <summary>
        /// Opens an anonymous session, registers a value stream for 'random/JSON', subscribes to
        /// the topic, then removes the stream and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL passed to the session factory when opening the session.</param>
        /// <returns>A task that completes once the session has been closed.</returns>
        public async Task JSONTopicsConsumerExample(string serverUrl) {
            // Connect anonymously
            var session = Diffusion.Sessions.Open(serverUrl);

            // The outer finally closes the session even if registering the stream fails.
            try {
                // Get the Topics feature to subscribe to topics
                var topics = session.Topics;
                string topic = "random/JSON";

                // Add a topic stream for 'random/JSON'
                var jsonStream = new JSONStream();
                topics.AddStream(topic, jsonStream);

                try {
                    // Subscribe to 'random/JSON' topic
                    await topics.SubscribeAsync(topic);

                    // NOTE(review): short pause before tearing down, presumably to allow
                    // values to arrive on the stream - confirm.
                    await Task.Delay(TimeSpan.FromMilliseconds(300));
                } catch (Exception ex) {
                    WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
                } finally {
                    // Note that closing the session will automatically unsubscribe from all topics
                    // the client is subscribed to.
                    topics.RemoveStream(jsonStream);
                }
            } finally {
                session.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the IValueStream for JSON topics. Every notification is
        /// written to the console.
        /// </summary>
        private sealed class JSONStream : IValueStream<IJSON> {
            /// <summary>
            /// Notification of stream being closed normally.
            /// </summary>
            public void OnClose()
                => WriteLine("The subscription stream is now closed.");

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason.</param>
            public void OnError(ErrorReason errorReason)
                => WriteLine($"An error has occurred : {errorReason}.");

            /// <summary>
            /// Notification of a successful subscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            public void OnSubscription(string topicPath, ITopicSpecification specification)
                => WriteLine($"Client subscribed to topic '{topicPath}'.");

            /// <summary>
            /// Notification of a successful unsubscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="reason">Reason for the unsubscription.</param>
            public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
                => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'.");

            /// <summary>
            /// Topic update received.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="oldValue">Value prior to update.</param>
            /// <param name="newValue">Value after update.</param>
            public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue)
                => WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}.");
        }
    }
}
Java and Android
import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddCallback; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateCallback; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicType; /** * An example of using a control client to create and update a topic in non * exclusive mode (as opposed to acting as an exclusive update source). In this * mode other clients could update the same topic (on a last update wins basis). * <P> * This uses the 'TopicControl' feature to create a topic and the * 'TopicUpdateControl' feature to send updates to it. * <P> * To send updates to a topic, the client session requires the 'update_topic' * permission for that branch of the topic tree. * * @author DiffusionData Limited * @since 5.3 */ public final class ControlClientUpdatingSingleValueTopic { private static final String TOPIC = "MyTopic"; private final Session session; private final TopicControl topicControl; private final TopicUpdateControl updateControl; /** * Constructor. 
*/ public ControlClientUpdatingSingleValueTopic() { session = Diffusion.sessions().principal("control").password("password") .open("ws://diffusion.example.com:80"); topicControl = session.feature(TopicControl.class); updateControl = session.feature(TopicUpdateControl.class); // Create the topic and request that it is removed when the session // closes topicControl.addTopic( TOPIC, TopicType.SINGLE_VALUE, new AddCallback.Default() { @Override public void onTopicAdded(String topicPath) { topicControl.removeTopicsWithSession( TOPIC, new TopicTreeHandler.Default()); } }); } /** * Update the topic with a string value. * * @param value the update value * @param callback the update callback */ public void update(String value, UpdateCallback callback) { updateControl.updater().update(TOPIC, value, callback); } /** * Close the session. */ public void close() { session.close(); } }
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" static int on_topic_update(void *context) { // topic has been updated return HANDLER_SUCCESS; } static int on_topic_added( SESSION_T *session, TOPIC_ADD_RESULT_CODE result_code, void *context) { // topic has been added return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // Create a session, synchronously session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // Create a JSON topic TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON); ADD_TOPIC_CALLBACK_T add_topic_callback = { .on_topic_added_with_specification = on_topic_added }; add_topic_from_specification(session, topic_path, specification, add_topic_callback); // Sleep for a while sleep(5); // Update the topic with a JSON value BUF_T *value_buf = buf_create(); write_diffusion_json_value("{\"hello\": \"world\"}", 
value_buf); DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T update_params = { .topic_path = topic_path, .datatype = DATATYPE_JSON, .update = value_buf, .on_topic_update = on_topic_update }; diffusion_topic_update_set(session, update_params); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); buf_free(value_buf); credentials_free(credentials); topic_specification_free(specification); return EXIT_SUCCESS; }
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/**
 * An example of using a control client to create and update a topic in non
 * exclusive mode (as opposed to acting as an exclusive update source).
 *
 * In this mode other clients could update the same topic (on a last update wins basis).
 *
 * This uses the 'TopicControl' feature to create a topic and the
 * 'TopicUpdate' feature to send updates to it.
 *
 * To send updates to a topic, the client session requires the 'update_topic'
 * permission for that branch of the topic tree.
 */
class TopicUpdateNonExclusive {

    /// Opens a session, adds a String topic and sets its value.
    ///
    /// Each step runs in the completion handler of the previous one. The
    /// outcome of every step is printed; a failure stops the sequence.
    ///
    /// - Parameters:
    ///   - url: URL used to open the session.
    ///   - topic_path: Path of the String topic to add and update.
    ///   - value: String value to set on the topic.
    func run_example(url: URL, topic_path: String, value: String) {

        // setup credentials and configuration
        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control",
                                                            credentials: credentials)

        // establish a session
        PTDiffusionSession.open(with: url, configuration: configuration) { (session, error) in
            // Swift's print() performs no '%@' substitution, so every message
            // below is built with string interpolation.
            if let error = error {
                print("An error has occurred while establishing a session: \(error.localizedDescription)")
                return
            }

            // Bind the session instead of force-unwrapping it
            guard let session = session else {
                print("An error has occurred while establishing a session: no session was returned")
                return
            }

            // add a String topic at `topic_path`
            session.topicControl.addTopic(withPath: topic_path,
                                          type: PTDiffusionTopicType.string) { (_, error) in
                if let error = error {
                    print("An error has occurred while creating the topic '\(topic_path)': \(error.localizedDescription)")
                    return
                }

                print("Topic '\(topic_path)' has been successfully created")

                // Set the topic value at `topic_path` to `value`.
                // do/catch reports a thrown error instead of crashing as `try!` would.
                do {
                    try session.topicUpdate.setWithPath(topic_path, toStringValue: value) { (error) in
                        if let error = error {
                            print("An error has occurred while setting the topic \(topic_path) to \(value): \(error.localizedDescription)")
                        }
                        else {
                            print("Topic '\(topic_path)' has been successfully updated to \(value)")
                        }
                    }
                }
                catch {
                    print("An error has occurred while setting the topic \(topic_path) to \(value): \(error.localizedDescription)")
                }
            }
        }
    }
}
Replace the URL provided in the example with the URL of your Diffusion server.