Example: Update a JSON topic
The following examples update a JSON topic with values.
JavaScript
// 1. Data Types are exposed from the top level Diffusion namespace. It is often easier // to assign these directly to a local variable. const jsonDataType = diffusion.datatypes.json(); // 2. Data Types are currently provided for JSON and Binary topic types. session.topics.add('topic/json', diffusion.topics.TopicType.JSON); // 3. Values can be created directly from the data type. const jsonValue = jsonDataType.from({ 'foo' : 'bar' }); // Topics are updated using the standard update mechanisms session.topicUpdate.set('topic/json', jsonDataType, jsonValue); // Subscriptions are performed normally session.select('topic/json'); // 4. Streams can be specialised to provide values from a specific datatype. session.addStream('topic/json', jsonDataType).on('value', (topic, specification, newValue, oldValue) => { // When a JSON or Binary topic is updated, any value handlers on a subscription will be called with both the // new value, and the old value. // The oldValue parameter will be undefined if this is the first value received for a topic. // For JSON topics, value.get returns a JavaScript object // For Binary topics, value.get returns a Buffer instance console.log('Update for ' + topic, newValue.get()); }); // 5. Raw values of an appropriate type can also be used for JSON and Binary topics. // For example, plain JSON objects can be used to update JSON topics. session.topicUpdate.set('topic/json', jsonDataType, { 'foo' : 'baz', 'numbers' : [1, 2, 3] });
.NET
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using System.Threading.Tasks; using PushTechnology.ClientInterface.Client.Callbacks; using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Features; using PushTechnology.ClientInterface.Client.Features.Topics; using PushTechnology.ClientInterface.Client.Session; using PushTechnology.ClientInterface.Client.Topics; using PushTechnology.ClientInterface.Client.Topics.Details; using PushTechnology.ClientInterface.Data.JSON; using static System.Console; namespace PushTechnology.ClientInterface.Example { /// <summary> /// Client implementation that adds and updates a JSON topic. /// </summary> public sealed class JSONTopicsManager { public async Task JSONTopicsManagerExample(string serverUrl) { var session = Diffusion.Sessions.Principal("control").Password("password") .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .Open(serverUrl); var topicControl = session.TopicControl; var topicUpdate = session.TopicUpdate; // Create a JSON topic 'random/JSON' string topic = "random/JSON"; try { await topicControl.AddTopicAsync(topic, TopicType.JSON); WriteLine($"Topic '{topic}' added successfully."); } catch (Exception ex) { WriteLine( $"Failed to add topic '{topic}' : {ex}." ); session.Close(); return; } WriteLine($"Updating topic '{topic}' with a new value:"); var newValue = Diffusion.DataTypes.JSON.FromJSONString( "{\"date\":\"" + DateTime.Today.Date.ToString("D") + "\"," + "\"time\":\"" + DateTime.Now.TimeOfDay.ToString("g") + "\"}"); try { await topicUpdate.SetAsync(topic, newValue); await Task.Delay(TimeSpan.FromMilliseconds(300)); } catch (Exception ex) { WriteLine($"Topic {topic} could not be updated : {ex}."); } // Remove the JSON topic 'random/JSON' try { await topicControl.RemoveTopicsAsync(topic); } catch (Exception ex) { WriteLine( $"Failed to remove topic '{topic}' : {ex}." ); } // Close the session session.Close(); } } /// <summary> /// Client implementation that subscribes to a JSON topic and consumes the data it receives. /// </summary> public sealed class JSONTopicsConsumer { public async Task JSONTopicsConsumerExample(string serverUrl) { // Connect anonymously var session = Diffusion.Sessions.Open(serverUrl); // Get the Topics feature to subscribe to topics var topics = session.Topics; string topic = "random/JSON"; // Add a topic stream for 'random/JSON' var jsonStream = new JSONStream(); topics.AddStream(topic, jsonStream); try { // Subscribe to 'random/JSON' topic await topics.SubscribeAsync(topic); await Task.Delay(TimeSpan.FromMilliseconds(300)); } catch (Exception ex) { WriteLine($"Failed to subscribe to topic '{topic}' : {ex}."); } finally { // Note that closing the session, will automatically unsubscribe from all topics // the client is subscribed to. topics.RemoveStream(jsonStream); session.Close(); } } /// <summary> /// Basic implementation of the IValueStream for JSON topics. /// </summary> private sealed class JSONStream : IValueStream<IJSON> { /// <summary> /// Notification of stream being closed normally. /// </summary> public void OnClose() => WriteLine("The subscrption stream is now closed."); /// <summary> /// Notification of a contextual error related to this callback. /// </summary> /// <param name="errorReason">Error reason.</param> public void OnError(ErrorReason errorReason) => WriteLine($"An error has occured : {errorReason}."); /// <summary> /// Notification of a successful subscription. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> public void OnSubscription(string topicPath, ITopicSpecification specification) => WriteLine($"Client subscribed to topic '{topicPath}'."); /// <summary> /// Notification of a successful unsubscription. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> /// <param name="reason">Error reason.</param> public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason) => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'."); /// <summary> /// Topic update received. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> /// <param name="oldValue">Value prior to update.</param> /// <param name="newValue">Value after update.</param> public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue) => WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}."); } } }
Java and Android
/******************************************************************************* * Copyright (C) 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ package com.pushtechnology.client.sdk.manual; import static com.pushtechnology.diffusion.client.Diffusion.dataTypes; import static com.pushtechnology.diffusion.client.Diffusion.newTopicSpecification; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.callbacks.ErrorReason; import com.pushtechnology.diffusion.client.features.TopicUpdate; import com.pushtechnology.diffusion.client.features.Topics; import com.pushtechnology.diffusion.client.features.UpdateConstraint; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; import com.pushtechnology.diffusion.client.topics.details.TopicType; import com.pushtechnology.diffusion.datatype.json.JSON; import com.pushtechnology.diffusion.datatype.json.JSONDataType; import java.util.concurrent.CompletionException; /** * An example of using a client to update a JSON topic * * This uses the 'TopicUpdate' feature. * * @author DiffusionData Limited */ public final class TopicUpdateJSONExample { public static void main(String[] args) { final Session session = Diffusion.sessions() .principal("admin") .password("password") .open("ws://localhost:8080"); final TopicUpdate topicUpdate = session.feature(TopicUpdate.class); final JSONDataType jsonDataType = dataTypes().json(); final String path = "my/topic/path"; // add a value stream and subscribe to receive topic updates session.feature(Topics.class) .addStream(">my/topic/path", JSON.class, new MyValueStream()); session.feature(Topics.class).subscribe(">my/topic/path"); // add a new JSON topic and set an initial value topicUpdate.addAndSet( path, newTopicSpecification(TopicType.JSON), JSON.class, jsonDataType.fromJsonString("{ \"diffusion\": \"data\" }") ).join(); // set a new value for our JSON topic topicUpdate.set( path, JSON.class, jsonDataType.fromJsonString("{ \"hello\": \"world\" }") ).join(); // update the topic only if the JSON value of our topic matches // our constraint value, we expect this to fail final UpdateConstraint.Factory constraints = Diffusion.updateConstraints(); try { topicUpdate.set( path, JSON.class, jsonDataType.fromJsonString("{ \"new\": \"data\" }"), constraints.jsonValue().with("/hello", String.class, "darkness") ).join(); } catch (CompletionException e) { System.out.println("Expected failure: " + e.getCause()); } // apply a JSON patch to replace the value of our JSON data topicUpdate.applyJsonPatch( path, "[{\"op\": \"replace\", \"path\": \"/hello\", \"value\":\"there\" }]" ).join(); session.close(); } private static class MyValueStream implements Topics.ValueStream<JSON> { @Override public void onValue(String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) { System.out.printf("%s updated : %s\n", topicPath, newValue.toJsonString()); } @Override public void onSubscription(String topicPath, TopicSpecification specification) { } @Override public void onUnsubscription(String topicPath, TopicSpecification specification, Topics.UnsubscribeReason reason) { } @Override public void onClose() { } @Override public void onError(ErrorReason errorReason) { } } }
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" static int on_subscription( const char* topic_path, const TOPIC_SPECIFICATION_T *specification, void *context) { // subscribed to `topic_path` return HANDLER_SUCCESS; } static int on_unsubscription( const char* topic_path, const TOPIC_SPECIFICATION_T *specification, NOTIFY_UNSUBSCRIPTION_REASON_T reason, void *context) { // unsubscribed from `topic_path` return HANDLER_SUCCESS; } static int on_value( const char* topic_path, const TOPIC_SPECIFICATION_T * specification, const DIFFUSION_DATATYPE datatype, const DIFFUSION_VALUE_T * old_value, const DIFFUSION_VALUE_T * new_value, void *context) { DIFFUSION_API_ERROR api_error; char *result; bool success = to_diffusion_json_string(new_value, &result, &api_error); if(success) { printf("Received value: %s\n", result); free(result); } else { const char *description = get_diffusion_api_error_description(api_error); printf("Error during diffusion value read: %s\n", description); diffusion_api_error_free(api_error); } return HANDLER_SUCCESS; } static void on_close() { // value stream is now closed } static int on_topic_added( SESSION_T *session, TOPIC_ADD_RESULT_CODE result_code, void *context) { // topic was added return HANDLER_SUCCESS; } static int on_topic_removed( SESSION_T *session, const DIFFUSION_TOPIC_REMOVAL_RESULT_T *result, void *context) { // topic was removed return HANDLER_SUCCESS; } static int on_topic_update(void *context) { // topic has been updated return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // Create a session, synchronously session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // Create a value stream VALUE_STREAM_T value_stream = { .datatype = DATATYPE_JSON, .on_subscription = on_subscription, .on_unsubscription = on_unsubscription, .on_value = on_value, .on_close = on_close }; add_stream(session, topic_path, &value_stream); // Subscribe to topics matching the selector SUBSCRIPTION_PARAMS_T subscribe_params = { .topic_selector = topic_path }; subscribe(session, subscribe_params); // Sleep for a while sleep(5); // create JSON topic TOPIC_SPECIFICATION_T *specification = topic_specification_init(TOPIC_TYPE_JSON); ADD_TOPIC_CALLBACK_T add_topic_callback = { .on_topic_added_with_specification = on_topic_added, }; add_topic_from_specification(session, topic_path, specification, add_topic_callback); topic_specification_free(specification); // Sleep for while sleep(5); // Update the topic with a JSON value BUF_T *value_buf = buf_create(); write_diffusion_json_value("{\"hello\": \"world\"}", value_buf); DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T update_params = { .topic_path = topic_path, .datatype = DATATYPE_JSON, .update = value_buf, .on_topic_update = on_topic_update }; diffusion_topic_update_set(session, update_params); buf_free(value_buf); // Sleep for a while sleep(5); // remove JSON topic TOPIC_REMOVAL_PARAMS_T remove_params = { .topic_selector = topic_path, .on_removed = on_topic_removed }; topic_removal(session, remove_params); // Sleep for a while sleep(5); // Unsubscribe from topics matching the selector UNSUBSCRIPTION_PARAMS_T unsub_params = { .topic_selector = topic_path }; unsubscribe(session, unsub_params); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
Apple
// Copyright (C) 2021 Push Technology Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import Foundation import Diffusion /** This example shows a control client creating a JSON topic and sending updates to it. There will be a topic for each currency for which rates are provided. The topic will be created under the FX topic - so, for example FX/GBP will contain a map of all rate conversions from the base GBP currency. The rates are represented as string decimal values (e.g. "12.457"). The {@code add_rates} method shows how to create a new rates topic, specifying its initial map of values. The {@code change_rates} method which takes a map shows how to completely replace the set of rates for a currency with a new map of rates. The {@code change_rates} method which takes a string shows an alternative mechanism where the new rates are simply supplied as a JSON string. Either of the change_rates methods could be used and after the first usage for any topic the values is cached, and so subsequent set calls can compare with the last value and send only the differences to the server. */ class TopicUpdateJson { let root_topic : String = "FX" var session : Optional<PTDiffusionSession> init(serverURL: String) { PTDiffusionSession.open(with: URL(string: serverURL)!) { (session, error) in if (error != nil) { print("An error has occurred: %@", error!.localizedDescription) return } self.session = session! } } /** @brief Add a new rates topic @param currency the base currency @param values the full map of initial rates values @param callback reports outcome of operations invoked */ func add_rates(currency: String, values: Dictionary<String, String>, callback: @escaping (PTDiffusionTopicCreationResult?, Error?) -> Void) { let topic_name = self.rate_topic_name(currency: currency) let json_value = self.map_to_json(values: values) let specification = PTDiffusionTopicSpecification(type: PTDiffusionTopicType.JSON) self.session?.topicUpdate.add(withPath: topic_name, specification: specification, andSetToJSONValue: json_value, completionHandler: callback) } /** Update an existing rates topic, replacing the rates mappings with a new set of mappings. @param currency the base currency @param values the full map of initial rates values @param callback reports outcome of operations invoked */ func change_rates(currency: String, values: Dictionary<String, String>, callback: @escaping (Error?) -> Void) { let topic_name = self.rate_topic_name(currency: currency) let json_value = self.map_to_json(values: values) self.session?.topicUpdate.setWithPath(topic_name, toJSONValue: json_value, completionHandler: callback) } /** Update an existing rates topic, replacing the rates mappings with a new set of mappings specified as a JSON string. e.g. {"USD":"123.45","HKD":"456.3"} @param currency the base currency @param values the full map of initial rates values @param callback reports outcome of operations invoked */ func change_rates(currency: String, json_string: String, callback: @escaping (Error?) -> Void) { let topic_name = self.rate_topic_name(currency: currency) let json_value = try! PTDiffusionJSON(jsonString: json_string) self.session?.topicUpdate.setWithPath(topic_name, toJSONValue: json_value, completionHandler: callback) } /** Close the session */ func close() { self.session?.close() } /** Convert a given map to a JSON object */ func map_to_json(values: Dictionary<String, String>) -> PTDiffusionJSON { return try! PTDiffusionJSON(object: values) } /** @brief Generate a hierarchical topic name for a rates topic. e.g. for currency = GBP would return "FB/GBP" @param currency the currency @return the topic name */ func rate_topic_name(currency: String) -> String { return String(format: "%@/%@", self.root_topic, currency); } }
Change the URL from that provided in the example to the URL of the Diffusion™ server .