Example: Create a JSON topic
The following examples create a JSON topic and receive a stream of values from the topic.
JavaScript
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {
    // Every operation below targets this single JSON topic.
    var topicPath = 'topic/json';

    // 1. Data types hang off the top-level diffusion namespace; keeping a
    //    local reference to the JSON data type avoids repeating the lookup.
    var json = diffusion.datatypes.json();

    // 2. Create the topic. Data types are currently provided for the JSON
    //    and Binary topic types.
    session.topics.add(topicPath, diffusion.topics.TopicType.JSON);

    // 3. Build a value directly from the data type and publish it through
    //    the standard update mechanism.
    var initialValue = json.from({ "foo" : "bar" });
    session.topics.update(topicPath, initialValue);

    // Subscribing works the same way as for any other topic.
    session.select(topicPath);

    // 4. A stream can be specialised with a data type so that its 'value'
    //    events carry typed values.
    //
    //    The handler is given both the new and the previous value. oldValue
    //    is undefined for the first value received for a topic.
    //    For JSON topics value#get returns a JavaScript object; for Binary
    //    topics it returns a Buffer instance.
    function logUpdate(topic, specification, newValue, oldValue) {
        console.log("Update for " + topic, newValue.get());
    }

    session.addStream(topicPath, json).on('value', logUpdate);

    // 5. Raw values of an appropriate type are accepted too: a plain
    //    JavaScript object can be used to update a JSON topic.
    session.topics.update(topicPath, { "foo" : "baz", "numbers" : [1, 2, 3] });
});
Java and Android
package com.pushtechnology.diffusion.examples;

import static java.util.Objects.requireNonNull;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.util.Map;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.cbor.CBORFactory;
import com.fasterxml.jackson.dataformat.cbor.CBORGenerator;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.Registration;
import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddContextCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.RemovalContextCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.UpdateSource;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateContextCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionClosedException;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.json.JSON;
import com.pushtechnology.diffusion.datatype.json.JSONDataType;

/**
 * This example shows a control client creating a JSON topic and sending updates
 * to it.
 * <P>
 * There will be a topic for each currency for which rates are provided. The
 * topic will be created under the FX topic - so, for example FX/GBP will
 * contain a map of all rate conversions from the base GBP currency. The rates
 * are represented as string decimal values (e.g. "12.457").
 * <P>
 * The {@code addRates} method shows how to create a new rates topic, specifying
 * its initial map of values.
 * <P>
 * The {@code changeRates} method which takes a map shows how to completely
 * replace the set of rates for a currency with a new map of rates.
 * <P>
 * The {@code changeRates} method which takes a string shows an alternative
 * mechanism where the new rates are simply supplied as a JSON string.
 * <P>
 * Either of the changeRates methods could be used and after the first usage for
 * any topic the value is cached, and so subsequent set calls can compare with
 * the last value and send only the differences to the server.
 *
 * @author Push Technology Limited
 * @since 5.7
 * @see ClientConsumingJSONTopics
 */
public final class ControlClientUpdatingJSONTopics {

    private static final String ROOT_TOPIC = "FX";

    private final Session session;
    private final TopicControl topicControl;

    // Both fields are assigned from update source callbacks, hence volatile.
    // They stay null until the corresponding callback has been invoked.
    private volatile TopicUpdateControl.ValueUpdater<JSON> valueUpdater;
    private volatile Registration updateSourceRegistration;

    private final CBORFactory cborFactory = new CBORFactory();
    private final JSONDataType jsonDataType = Diffusion.dataTypes().json();

    /**
     * Constructor.
     *
     * @param serverUrl for example "ws://diffusion.example.com:80"
     */
    public ControlClientUpdatingJSONTopics(String serverUrl) {

        // The generator needs a codec to be able to serialise a Map.
        cborFactory.setCodec(new ObjectMapper());

        session =
            Diffusion.sessions().principal("control").password("password")
                .open(serverUrl);

        topicControl = session.feature(TopicControl.class);

        // Register as an updater for all topics under the root and request
        // that all topics created are removed when the session closes
        session.feature(TopicUpdateControl.class).registerUpdateSource(
            ROOT_TOPIC,
            new UpdateSource.Default() {
                @Override
                public void onRegistered(
                    String topicPath,
                    Registration registration) {
                    updateSourceRegistration = registration;
                }

                @Override
                public void onActive(String topicPath, Updater updater) {
                    topicControl.removeTopicsWithSession(
                        ROOT_TOPIC,
                        new TopicTreeHandler.Default());
                    valueUpdater = updater.valueUpdater(JSON.class);
                }

                @Override
                public void onClose(String topicPath) {
                    session.close();
                }
            });
    }

    /**
     * Add a new rates topic.
     *
     * @param currency the base currency
     * @param values the full map of initial rates values
     * @param callback reports outcome
     * @throws IOException if unable to convert rates map
     */
    public void addRates(
        String currency,
        Map<String, String> values,
        AddContextCallback<String> callback) throws IOException {

        topicControl.addTopic(
            rateTopicName(currency),
            TopicType.JSON,
            mapToJSON(values),
            currency,
            callback);
    }

    /**
     * Update an existing rates topic, replacing the rates mappings with a new
     * set of mappings.
     *
     * @param currency the base currency
     * @param values the new rates values
     * @param callback reports outcome
     * @throws IOException if unable to convert rates map
     * @throws IllegalStateException if this client has not (yet) been given
     *         an updater for the rates topics
     */
    public void changeRates(
        String currency,
        Map<String, String> values,
        UpdateContextCallback<String> callback) throws IOException {

        final TopicUpdateControl.ValueUpdater<JSON> updater = requireUpdater();

        updater.update(
            rateTopicName(currency),
            mapToJSON(values),
            currency,
            callback);
    }

    /**
     * Update an existing rates topic, replacing the rates mappings with a new
     * set of mappings specified as a JSON string, for example
     * {@code {"USD":"123.45","HKD":"456.3"}}.
     *
     * @param currency the base currency
     * @param jsonString a JSON string specifying the map of currency rates
     * @param callback reports the outcome
     * @throws IOException if unable to convert string
     * @throws IllegalStateException if this client has not (yet) been given
     *         an updater for the rates topics
     */
    public void changeRates(
        String currency,
        String jsonString,
        UpdateContextCallback<String> callback)
        throws SessionClosedException, IllegalArgumentException,
        IOException {

        final TopicUpdateControl.ValueUpdater<JSON> updater = requireUpdater();

        updater.update(
            rateTopicName(currency),
            jsonDataType.fromJsonString(jsonString),
            currency,
            callback);
    }

    /**
     * Return the value updater, failing if none has been supplied yet.
     * <P>
     * The volatile field is read exactly once so that the null check and the
     * subsequent use operate on the same reference.
     */
    private TopicUpdateControl.ValueUpdater<JSON> requireUpdater() {
        final TopicUpdateControl.ValueUpdater<JSON> updater = valueUpdater;
        if (updater == null) {
            throw new IllegalStateException("Not registered as updater");
        }
        return updater;
    }

    /**
     * Convert a given map to a JSON object.
     *
     * @param values the map to convert
     * @return the JSON value read back from the CBOR encoding of the map
     * @throws IOException if the map cannot be written as CBOR
     */
    private JSON mapToJSON(Map<String, String> values) throws IOException {
        // Use the third-party Jackson library to write out the values map as a
        // CBOR-format binary.
        final ByteArrayOutputStream baos = new ByteArrayOutputStream();

        // Closing the generator guarantees that everything it has buffered
        // reaches the stream before the bytes are read, and releases the
        // generator's resources even if serialisation fails.
        try (CBORGenerator generator = cborFactory.createGenerator(baos)) {
            generator.writeObject(values);
        }

        return jsonDataType.readValue(baos.toByteArray());
    }

    /**
     * Remove a rates entry (removes its topic) and clear cached value for the
     * topic.
     *
     * @param currency the currency
     *
     * @param callback reports the outcome
     */
    public void removeRates(
        String currency,
        RemovalContextCallback<String> callback) {

        final String topicName = rateTopicName(currency);

        // Single read of the volatile field; there is nothing cached to
        // clear if no updater has been supplied yet.
        final TopicUpdateControl.ValueUpdater<JSON> updater = valueUpdater;
        if (updater != null) {
            updater.removeCachedValues(topicName);
        }

        topicControl.remove(topicName, currency, callback);
    }

    /**
     * Close the session.
     * <P>
     * If the update source has been registered, its registration is closed;
     * the {@code onClose} callback installed by the constructor then closes
     * the session (NOTE(review): this assumes closing the registration
     * results in {@code onClose} being called - confirm against the API
     * documentation). If registration has not completed yet there is no
     * registration to close, so the session is closed directly.
     */
    public void close() {
        final Registration registration = updateSourceRegistration;
        if (registration != null) {
            registration.close();
        }
        else {
            session.close();
        }
    }

    /**
     * Generate a hierarchical topic name for a rates topic.
     * <P>
     * e.g. for currency=GBP would return "FX/GBP".
     *
     * @param currency the currency
     * @return the topic name
     */
    private static String rateTopicName(String currency) {
        return String.format("%s/%s", ROOT_TOPIC, requireNonNull(currency));
    }
}
.NET
Change the URL provided in the example to the URL of your Diffusion™ Cloud service. Diffusion Cloud service URLs end in diffusion.cloud.
This page last modified: 2020/06/25