Example: Create a JSON topic
The following examples create a JSON topic and receive a stream of values from the topic.
JavaScript
diffusion.connect({ host : 'diffusion.example.com', port : 443, secure : true, principal : 'control', credentials : 'password' }).then(function(session) { // 1. Data Types are exposed from the top level Diffusion namespace. It is often easier // to assign these directly to a local variable. var jsonDataType = diffusion.datatypes.json(); // 2. Data Types are currently provided for JSON and Binary topic types. session.topics.add('topic/json', diffusion.topics.TopicType.JSON); // 3. Values can be created directly from the data type. var jsonValue = jsonDataType.from({ "foo" : "bar" }); // Topics are updated using the standard update mechanisms session.topics.update('topic/json', jsonValue); // Subscriptions are performed normally session.select('topic/json'); // 4. Streams can be specialised to provide values from a specific datatype. session.addStream('topic/json', jsonDataType).on('value', function(topic, specification, newValue, oldValue) { // When a JSON or Binary topic is updated, any value handlers on a subscription will be called with both the // new value, and the old value. // The oldValue parameter will be undefined if this is the first value received for a topic. // For JSON topics, value#get returns a JavaScript object // For Binary topics, value#get returns a Buffer instance console.log("Update for " + topic, newValue.get()); }); // 5. Raw values of an appropriate type can also be used for JSON and Binary topics. // For example, plain JSON objects can be used to update JSON topics. session.topics.update('topic/json', { "foo" : "baz", "numbers" : [1, 2, 3] }); });
.NET
/** * Copyright © 2021 Push Technology Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using System.Threading.Tasks; using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Session; using PushTechnology.ClientInterface.Client.Topics; using static System.Console; namespace PushTechnology.ClientInterface.Example { /// <summary> /// Client implementation that adds and updates a JSON topic. /// </summary> public sealed class JSONTopicsManager { public JSONTopicsManager(string serverUrl) { var session = Diffusion.Sessions.Principal("control").Password("password") .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .Open(serverUrl); var topicControl = session.TopicControl; var topicUpdate = session.TopicUpdate; // Create a JSON topic 'random/JSON' string topic = "random/JSON"; try { await topicControl.AddTopicAsync(topic, TopicType.JSON); WriteLine($"Topic '{topic}' added successfully."); } catch (Exception ex) { WriteLine( $"Failed to add topic '{topic}' : {ex}." ); session.Close(); return; } WriteLine($"Updating topic '{topic}' with a new value:"); var newValue = Diffusion.DataTypes.JSON.FromJSONString( "{\"date\":\"" + DateTime.Today.Date.ToString("D") + "\"," + "\"time\":\"" + DateTime.Now.TimeOfDay.ToString("g") + "\"}"); try { await topicUpdate.SetAsync(topic, newValue); await Task.Delay(TimeSpan.FromMilliseconds(300)); } catch (Exception ex) { WriteLine($"Topic {topic} could not be updated : {ex}."); } // Remove the JSON topic 'random/JSON' try { await topicControl.RemoveTopicsAsync(topic); } catch (Exception ex) { WriteLine( $"Failed to remove topic '{topic}' : {ex}." ); } // Close the session session.Close(); } } /// <summary> /// Client implementation that subscribes to a JSON topic and consumes the data it receives. /// </summary> public sealed class JSONTopicsConsumer { public JSONTopicsConsumer(string serverUrl) { // Connect anonymously var session = Diffusion.Sessions.Open(serverUrl); // Get the Topics feature to subscribe to topics var topics = session.Topics; string topic = "random/JSON"; // Add a topic stream for 'random/JSON' var jsonStream = new JSONStream(); topics.AddStream(topic, jsonStream); try { // Subscribe to 'random/JSON' topic await topics.SubscribeAsync(topic); await Task.Delay(TimeSpan.FromMilliseconds(300)); } catch (Exception ex) { WriteLine($"Failed to subscribe to topic '{topic}' : {ex}."); } finally { // Note that closing the session, will automatically unsubscribe from all topics // the client is subscribed to. topics.RemoveStream(jsonStream); session.Close(); } } /// <summary> /// Basic implementation of the IValueStream for JSON topics. /// </summary> private sealed class JSONStream : IValueStream<IJSON> { /// <summary> /// Notification of stream being closed normally. /// </summary> public void OnClose() => WriteLine("The subscrption stream is now closed."); /// <summary> /// Notification of a contextual error related to this callback. /// </summary> /// <param name="errorReason">Error reason.</param> public void OnError(ErrorReason errorReason) => WriteLine($"An error has occured : {errorReason}."); /// <summary> /// Notification of a successful subscription. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> public void OnSubscription(string topicPath, ITopicSpecification specification) => WriteLine($"Client subscribed to topic '{topicPath}'."); /// <summary> /// Notification of a successful unsubscription. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> /// <param name="reason">Error reason.</param> public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason) => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'."); /// <summary> /// Topic update received. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> /// <param name="oldValue">Value prior to update.</param> /// <param name="newValue">Value after update.</param> public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue) => WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}."); } } }
Java and Android
package com.pushtechnology.diffusion.examples; import static java.util.Objects.requireNonNull; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.util.Map; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.dataformat.cbor.CBORFactory; import com.fasterxml.jackson.dataformat.cbor.CBORGenerator; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.callbacks.Registration; import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddContextCallback; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.RemovalContextCallback; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.UpdateSource; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater; import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateContextCallback; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.session.SessionClosedException; import com.pushtechnology.diffusion.client.topics.details.TopicType; import com.pushtechnology.diffusion.datatype.json.JSON; import com.pushtechnology.diffusion.datatype.json.JSONDataType; /** * This example shows a control client creating a JSON topic and sending updates * to it. * <P> * There will be a topic for each currency for which rates are provided. The * topic will be created under the FX topic - so, for example FX/GBP will * contain a map of all rate conversions from the base GBP currency. The rates * are represented as string decimal values (e.g. "12.457"). * <P> * The {@code addRates} method shows how to create a new rates topic, specifying * its initial map of values. * <P> * The {@code changeRates} method which takes a map shows how to completely * replace the set of rates for a currency with a new map of rates. * <P> * The {@code changeRates} method which takes a string shows an alternative * mechanism where the new rates are simply supplied as a JSON string. * <P> * Either of the changeRates methods could be used and after the first usage for * any topic the values is cached, and so subsequent set calls can compare with * the last value and send only the differences to the server. * * @author Push Technology Limited * @since 5.7 * @see ClientConsumingJSONTopics */ public final class ControlClientUpdatingJSONTopics { private static final String ROOT_TOPIC = "FX"; private final Session session; private final TopicControl topicControl; private volatile TopicUpdateControl.ValueUpdater<JSON> valueUpdater; private volatile Registration updateSourceRegistration; private final CBORFactory cborFactory = new CBORFactory(); private final JSONDataType jsonDataType = Diffusion.dataTypes().json(); /** * Constructor. * * @param serverUrl for example "ws://diffusion.example.com:80" */ public ControlClientUpdatingJSONTopics(String serverUrl) { cborFactory.setCodec(new ObjectMapper()); session = Diffusion.sessions().principal("control").password("password") .open(serverUrl); topicControl = session.feature(TopicControl.class); // Register as an updater for all topics under the root and request // that all topics created are removed when the session closes session.feature(TopicUpdateControl.class).registerUpdateSource( ROOT_TOPIC, new UpdateSource.Default() { @Override public void onRegistered( String topicPath, Registration registration) { updateSourceRegistration = registration; } @Override public void onActive(String topicPath, Updater updater) { topicControl.removeTopicsWithSession( ROOT_TOPIC, new TopicTreeHandler.Default()); valueUpdater = updater.valueUpdater(JSON.class); } @Override public void onClose(String topicPath) { session.close(); } }); } /** * Add a new rates topic. * * @param currency the base currency * @param values the full map of initial rates values * @param callback reports outcome * @throws IOException if unable to convert rates map */ public void addRates( String currency, Map<String, String> values, AddContextCallback<String> callback) throws IOException { topicControl.addTopic( rateTopicName(currency), TopicType.JSON, mapToJSON(values), currency, callback); } /** * Update an existing rates topic, replacing the rates mappings with a new * set of mappings. * * @param currency the base currency * @param values the new rates values * @param callback reports outcome * @throws IOException if unable to convert rates map */ public void changeRates( String currency, Map<String, String> values, UpdateContextCallback<String> callback) throws IOException { if (valueUpdater == null) { throw new IllegalStateException("Not registered as updater"); } valueUpdater.update( rateTopicName(currency), mapToJSON(values), currency, callback); } /** * Update an existing rates topic, replacing the rates mappings with a new * set of mappings specified as a JSON string, for example * {"USD":"123.45","HKD":"456.3"}. * * @param currency the base currency * @param jsonString a JSON string specifying the map of currency rates * @param callback reports the outcome * @throws IOException if unable to convert string */ public void changeRates( String currency, String jsonString, UpdateContextCallback<String> callback) throws SessionClosedException, IllegalArgumentException, IOException { if (valueUpdater == null) { throw new IllegalStateException("Not registered as updater"); } valueUpdater.update( rateTopicName(currency), jsonDataType.fromJsonString(jsonString), currency, callback); } /** * Convert a given map to a JSON object. */ private JSON mapToJSON(Map<String, String> values) throws IOException { // Use the third-party Jackson library to write out the values map as a // CBOR-format binary. final ByteArrayOutputStream baos = new ByteArrayOutputStream(); final CBORGenerator generator = cborFactory.createGenerator(baos); generator.writeObject(values); return jsonDataType.readValue(baos.toByteArray()); } /** * Remove a rates entry (removes its topic) and clear cached value for the * topic. * * @param currency the currency * * @param callback reports the outcome */ public void removeRates( String currency, RemovalContextCallback<String> callback) { final String topicName = rateTopicName(currency); if (valueUpdater != null) { valueUpdater.removeCachedValues(topicName); } topicControl.remove(topicName, currency, callback); } /** * Close the session. */ public void close() { updateSourceRegistration.close(); } /** * Generate a hierarchical topic name for a rates topic. * <P> * e.g. for currency=GBP would return "FX/GBP". * * @param currency the currency * @return the topic name */ private static String rateTopicName(String currency) { return String.format("%s/%s", ROOT_TOPIC, requireNonNull(currency)); } }
Change the URL from that provided in the example to the URL of the Diffusion™ server.