Example: Make exclusive updates to a topic
The following examples use the Diffusion™ API to register as the update source of a topic and to update that topic with content. A client that updates a topic using this method locks the topic and prevents other clients from updating the topic.
JavaScript
// Connect to the Diffusion server as the 'control' principal over a secure connection.
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {
    // A session may establish an exclusive update source. Once active, only this session may update topics at or
    // under the registration branch.
    session.topics.registerUpdateSource('exclusive/topic', {
        onRegister : function(topic, deregister) {
            // The handler provides a deregistration function to remove this registration and allow other sessions to
            // update topics under the registered path.
        },
        onActive : function(topic, updater) {
            // Once active, a handler may use the provided updater to update any topics at or under the registered path
            updater.update('exclusive/topic/bar', 123).then(function() {
                // The update was successful.
            }, function(err) {
                // There was an error updating the topic
            });
        },
        onStandBy : function(topic) {
            // If there is another update source registered for the same topic path, any subsequent registrations will
            // be put into a standby state. The registration is still held by the server, and the 'onActive' function
            // will be called if the pre-existing registration is closed at a later point in time
        },
        onClose : function(topic, err) {
            // The 'onClose' function will be called once the registration is closed, either by the session being closed
            // or the 'deregister' function being called.
        }
    });
}, function(err) {
    // The connection attempt was rejected. Report it instead of leaving the rejection unhandled;
    // no update source is registered in this case.
    console.error('Failed to connect to the Diffusion server:', err);
});
.NET
/**
 * Copyright © 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using PushTechnology.ClientInterface.Data.JSON;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that adds and updates a JSON topic exclusively.
    /// </summary>
    public sealed class JSONTopicsExclusiveUpdate {
        /// <summary>
        /// Adds the JSON topic 'random/JSON', acquires a session lock, updates the topic under a
        /// constraint that requires that lock, removes the topic, releases the lock and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL of the Diffusion server to connect to.</param>
        /// <returns>A task that completes when the example has finished.</returns>
        public async Task JSONTopicsExclusiveUpdateExample(string serverUrl) {
            // The validation callback accepts every certificate it is given.
            var session = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            // Everything after the session is opened runs inside try/finally so that the session
            // is closed on every path, including failures while acquiring or releasing the lock.
            try {
                var topicControl = session.TopicControl;
                var topicUpdate = session.TopicUpdate;

                // Create a JSON topic 'random/JSON'
                string topic = "random/JSON";

                try {
                    await topicControl.AddTopicAsync(topic, TopicType.JSON);
                    WriteLine($"Topic '{topic}' added successfully.");
                } catch (Exception ex) {
                    WriteLine($"Failed to add topic '{topic}' : {ex}.");
                    // Nothing more can be done without the topic; the finally block closes the session.
                    return;
                }

                WriteLine($"Updating topic '{topic}' with a new value:");

                var newValue = Diffusion.DataTypes.JSON.FromJSONString(
                    "{\"date\":\"" + DateTime.Today.Date.ToString("D") + "\"," +
                    "\"time\":\"" + DateTime.Now.TimeOfDay.ToString("g") + "\"}");

                // The lock is requested using the topic path as the lock name.
                var sessionLock = await session.LockAsync(topic);

                try {
                    try {
                        // Only apply the update while this session holds the lock.
                        var constraint = Diffusion.UpdateConstraints.Locked(sessionLock);
                        await topicUpdate.SetAsync(topic, newValue, constraint);

                        await Task.Delay(TimeSpan.FromMilliseconds(300));
                    } catch (Exception ex) {
                        WriteLine($"Topic {topic} could not be updated : {ex}.");
                    }

                    // Remove the JSON topic 'random/JSON'
                    try {
                        await topicControl.RemoveTopicsAsync(topic);
                    } catch (Exception ex) {
                        WriteLine($"Failed to remove topic '{topic}' : {ex}.");
                    }
                } finally {
                    // Always release the lock once it has been acquired.
                    await sessionLock.UnlockAsync();
                }
            } finally {
                // Close the session
                session.Close();
            }
        }
    }

    /// <summary>
    /// Client implementation that subscribes to a JSON topic and consumes the data it receives.
    /// </summary>
    public sealed class JSONTopics {
        /// <summary>
        /// Connects anonymously, registers a value stream for 'random/JSON', subscribes to the topic,
        /// waits briefly, then removes the stream and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL of the Diffusion server to connect to.</param>
        /// <returns>A task that completes when the example has finished.</returns>
        public async Task JSONTopicsConsumerExample(string serverUrl) {
            // Connect anonymously
            var session = Diffusion.Sessions.Open(serverUrl);

            // Get the Topics feature to subscribe to topics
            var topics = session.Topics;
            string topic = "random/JSON";

            // Add a topic stream for 'random/JSON'
            var jsonStream = new JSONStream();
            topics.AddStream(topic, jsonStream);

            try {
                // Subscribe to 'random/JSON' topic
                await topics.SubscribeAsync(topic);

                await Task.Delay(TimeSpan.FromMilliseconds(300));
            } catch (Exception ex) {
                WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
            } finally {
                // Note that closing the session will automatically unsubscribe from all topics
                // the client is subscribed to.
                topics.RemoveStream(jsonStream);
                session.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the IValueStream for JSON topics.
        /// Every notification is written to the console.
        /// </summary>
        private sealed class JSONStream : IValueStream<IJSON> {
            /// <summary>
            /// Notification of stream being closed normally.
            /// </summary>
            public void OnClose()
                => WriteLine("The subscrption stream is now closed.");

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason.</param>
            public void OnError(ErrorReason errorReason)
                => WriteLine($"An error has occured : {errorReason}.");

            /// <summary>
            /// Notification of a successful subscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            public void OnSubscription(string topicPath, ITopicSpecification specification)
                => WriteLine($"Client subscribed to topic '{topicPath}'.");

            /// <summary>
            /// Notification of a successful unsubscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="reason">Reason for the unsubscription.</param>
            public void OnUnsubscription(string topicPath, ITopicSpecification specification,
                TopicUnsubscribeReason reason)
                => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'.");

            /// <summary>
            /// Topic update received.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="oldValue">Value prior to update.</param>
            /// <param name="newValue">Value after update.</param>
            public void OnValue(string topicPath, ITopicSpecification specification,
                IJSON oldValue, IJSON newValue)
                => WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}.");
        }
    }
}
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" DIFFUSION_SESSION_LOCK_T *g_session_lock; static int on_topic_update(void *context) { // topic has been updated return HANDLER_SUCCESS; } static int on_topic_added( SESSION_T *session, TOPIC_ADD_RESULT_CODE result_code, void *context) { // topic has been added return HANDLER_SUCCESS; } static int on_lock_acquired( const DIFFUSION_SESSION_LOCK_T *session_lock, void *context) { // lock has been acquired g_session_lock = diffusion_session_lock_dup(session_lock); return HANDLER_SUCCESS; } static int on_unlock( bool lock_owned, void *context) { // lock has been released diffusion_session_lock_free(g_session_lock); g_session_lock = NULL; return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // Create a session, synchronously session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // Create a JSON topic 
TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON); ADD_TOPIC_CALLBACK_T add_topic_callback = { .on_topic_added_with_specification = on_topic_added }; add_topic_from_specification(session, topic_path, specification, add_topic_callback); // Sleep for a while sleep(5); // acquire a session lock DIFFUSION_SESSION_LOCK_PARAMS_T lock_params = { .on_lock_acquired = on_lock_acquired }; diffusion_session_lock(session, "lock_a", lock_params); // Sleep for a while sleep(5); // create update constraint that requires session lock to update topic DIFFUSION_TOPIC_UPDATE_CONSTRAINT_T *update_constraint = diffusion_topic_update_constraint_locked(g_session_lock); // Update the topic with a JSON value BUF_T *value_buf = buf_create(); write_diffusion_json_value("{\"hello\": \"world\"}", value_buf); DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T update_params = { .topic_path = topic_path, .datatype = DATATYPE_JSON, .update = value_buf, .on_topic_update = on_topic_update }; diffusion_topic_update_set_with_constraint(session, update_constraint, update_params); // Sleep for a while sleep(5); // release the session lock DIFFUSION_SESSION_LOCK_UNLOCK_PARAMS_T unlock_params = { .on_unlock = on_unlock }; diffusion_session_lock_unlock(session, g_session_lock, unlock_params); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); buf_free(value_buf); credentials_free(credentials); topic_specification_free(specification); return EXIT_SUCCESS; }
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/**
 * An example of using a control client to create and update a topic in exclusive mode
 */
class TopicUpdateExclusive {

    /**
     * Opens a session, creates a String topic, acquires a session lock, updates the topic
     * under a constraint that requires the lock, and then releases the lock.
     *
     * Each step runs in the completion handler of the previous one; a failure at any step is
     * printed and the remaining steps are skipped.
     *
     * - Parameters:
     *   - url: URL of the Diffusion server.
     *   - topic_path: path of the String topic to create and update.
     *   - value: value to set the topic to.
     *   - lock_name: name of the session lock to acquire.
     */
    func run_example(url: URL, topic_path: String, value: String, lock_name: String) {

        // setup credentials and configuration
        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control", credentials: credentials)

        // establish the session
        PTDiffusionSession.open(with: url, configuration: configuration) { (session, error) in
            if let error = error {
                print("An error has occurred while establishing a session: \(error.localizedDescription)")
                return
            }
            guard let session = session else {
                print("An error has occurred while establishing a session: no session was returned")
                return
            }

            // create a new String Topic at `topic_path`
            session.topicControl.addTopic(withPath: topic_path, type: PTDiffusionTopicType.string) { (_, error) in
                if let error = error {
                    print("An error has occurred while creating the topic '\(topic_path)': \(error.localizedDescription)")
                    return
                }
                print("Topic \(topic_path) has been successfully created")

                // acquire the lock `lock_name`
                session.lock(withName: lock_name) { (lock, error) in
                    if let error = error {
                        print("An error has occurred while attempting to retrieve lock '\(lock_name)': \(error.localizedDescription)")
                        return
                    }
                    guard let lock = lock else {
                        print("An error has occurred while attempting to retrieve lock '\(lock_name)': no lock was returned")
                        return
                    }
                    print("Successfully acquired lock '\(lock_name)'")

                    // Releases the lock. Called only after the update attempt has finished, so the
                    // lock is still held while the constrained update is processed.
                    let release_lock = {
                        lock.unlock { (_, error) in
                            if let error = error {
                                print("An error has occurred while releasing the lock: \(error.localizedDescription)")
                            }
                            else {
                                print("Lock has been successfully released")
                            }
                        }
                    }

                    // add an update constraint that requires the calling session to hold the
                    // lock `lock_name` to update topic `topic_path`
                    let update_constraint = PTDiffusionUpdateConstraint.locked(with: lock)

                    // update `topic_path` with `value`
                    do {
                        try session.topicUpdate.setWithPath(topic_path,
                                                            toStringValue: value,
                                                            constraint: update_constraint) { (error) in
                            if let error = error {
                                print("An error has occurred while set the topic '\(topic_path)' to '\(value)': \(error.localizedDescription)")
                            }
                            else {
                                print("Topic '\(topic_path)' has been successfully updated to \(value)")
                            }

                            // there are no more updates to be done by this session, release the lock
                            release_lock()
                        }
                    }
                    catch {
                        // The call threw before the update was issued; report it and still release the lock.
                        print("An error has occurred while set the topic '\(topic_path)' to '\(value)': \(error.localizedDescription)")
                        release_lock()
                    }
                }
            }
        }
    }
}
Change the URL provided in the example to the URL of your Diffusion server.