Using streams for subscription
Register a stream against a set of topics to access values published to those topics. For a registered stream to access the value of a topic, the topic type must match the stream and the client must be subscribed to the topic. Diffusion has two types of streams: Value streams and Fallbacl streams.
Subscribing to a topic causes the value of the topic to be sent from Diffusion™ Cloud to the client. Registering a stream that matches the topic enables the client to access these values. For more information, see Subscribing to topics
Value streams
Value streams are typed. Register value streams against a set of topics by using a topic selector. A value stream receives updates for any subscribed topics that match the value stream's type and the topic selector used when registering the value stream.
- JSON
- JSON topics are routed to this type of stream.
- Binary
- Binary topics are routed to this type of stream.
- String
- String topics are routed to this type of stream.
- Int64
- Int64 topics are routed to this type of stream.
- Double
- Double topics are routed to this type of stream.
- RecordV2
- RecordV2 topics are routed to this type of stream.
- Content
- JSON, binary, string, int64, double, recordV2 and single value topics are routed to this type of stream.
If a value stream receives a delta update, this delta is automatically applied to a locally cached value so that the stream always receives full values.
Using a value stream
Register the typed stream against the topic or topics that you want the stream to receive updates from:
// Register a JSON value stream session.addStream('topic_selector', diffusion.datatypes.json()) .on('value', (path, specification, newValue, oldValue) => { // Action to take when update is received }); // Register a binary value stream session.addStream('topic_selector', diffusion.datatypes.binary()) .on('value', (path, specification, newValue, oldValue) => { // Action to take when update is received });
var session = Diffusion.Sessions .Principal("admin") .Credentials(Diffusion.Credentials.Password("password")) .Open(serverUrl); // Register a JSON value stream session.Topics.AddStream("topic_selector", new DefaultValueStream<IJSON>() ); // Register a binary value stream session.Topics.AddStream("topic_selector", new DefaultValueStream<IBinary>() );
final Session session = Diffusion.sessions().open("ws://localhost:8080"); final Topics topics = session.feature(Topics.class); // Register a JSON value stream topics.addStream(">my/topic/path", JSON.class, new Topics.ValueStream.Default<>()); // Register a binary value stream topics.addStream(">my/topic/path", Binary.class, new Topics.ValueStream.Default<>());
// Register a JSON value stream VALUE_STREAM_T json_value_stream = { .datatype = DATATYPE_JSON, .on_subscription = on_subscribe, .on_unsubscription = on_unsubscribe, .on_value = on_json_value }; add_stream(session, topic_path, &json_value_stream); // Register a binary value stream VALUE_STREAM_T binary_value_stream = { .datatype = DATATYPE_BINARY, .on_subscription = on_subscribe, .on_unsubscription = on_unsubscribe, .on_value = on_binary_value }; add_stream(session, topic_path, &binary_value_stream);
// register a JSON value stream let json_value_stream = PTDiffusionJSON.valueStream(with: self) do { try session.topics.add(json_value_stream, withSelectorExpression: ">public/json//", error: ()) } catch { print("An error occurred: %@", error) } // register a binary value stream let binary_value_stream = PTDiffusionBinary.valueStream(with: self) do { try session.topics.add(binary_value_stream, withSelectorExpression: ">public/binary//", error: ()) } catch { print("An error occurred: %@", error) }
Use topic selectors to register the stream against multiple topics. For more information, see Topic selectors.
The examples above show how to register a default or no-op value stream against a set of topics. The stream receives values from any topic in the set whose topic data type matches the stream data type.
To make use of the values sent to your client, implement a value stream that takes the required action when an update is received from a subscribed topic that matches the type of the stream:
session.addStream('topic_selector', diffusion.datatypes.json()) .on({ value : (topic, specification, newValue, oldValue) => { console.log('Update from: ', topic, newValue.get()); }, subscribe : (topic, specification) => { console.log('Subscribed to: ', topic); }, unsubscribe : (topic, specification, reason) => { console.log('Unsubscribed from: ', topic); } });
/// <summary> /// Basic implementation of the IValueStream for JSON topics. /// </summary> private sealed class JSONStream : IValueStream<IJSON> { /// <summary> /// Notification of stream being closed normally. /// </summary> public void OnClose() => WriteLine("The subscrption stream is now closed."); /// <summary> /// Notification of a contextual error related to this callback. /// </summary> /// <param name="errorReason">Error reason.</param> public void OnError(ErrorReason errorReason) => WriteLine($"An error has occured : {errorReason}."); /// <summary> /// Notification of a successful subscription. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> public void OnSubscription(string topicPath, ITopicSpecification specification) => WriteLine($"Client subscribed to topic '{topicPath}'."); /// <summary> /// Notification of a successful unsubscription. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> /// <param name="reason">Error reason.</param> public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason) => WriteLine($"Client unsubscribed from topic '{topicPath}' with reason '{reason}'."); /// <summary> /// Topic update received. /// </summary> /// <param name="topicPath">Topic path.</param> /// <param name="specification">Topic specification.</param> /// <param name="oldValue">Value prior to update.</param> /// <param name="newValue">Value after update.</param> public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue) => WriteLine($"New value of topic '{topicPath}' is {newValue.ToJSONString()}."); }
private static class JSONStream extends ValueStream.Default<JSON> { @Override public void onValue( String topicPath, TopicSpecification specification, JSON oldValue, JSON newValue) { System.out.println(newValue.toJsonString()); } }
static int on_subscribe( const char *const topic_path, const TOPIC_SPECIFICATION_T * specification, void *context) { // subscribed to `topic_path` return HANDLER_SUCCESS; } static int on_unsubscribe( const char *const topic_path, const TOPIC_SPECIFICATION_T * specification, NOTIFY_UNSUBSCRIPTION_REASON_T reason, void *context) { // unsubscribed from `topic_path` due to `reason` return HANDLER_SUCCESS; } static int on_json_value( const char *const topic_path, const TOPIC_SPECIFICATION_T * specification, DIFFUSION_DATATYPE datatype, const DIFFUSION_VALUE_T * old_value, const DIFFUSION_VALUE_T * new_value, void *context) { // handle json topic update return HANDLER_SUCCESS; }
class RegisterStreams: PTDiffusionJSONValueStreamDelegate, PTDiffusionBinaryValueStreamDelegate { func diffusionStream(_ stream: PTDiffusionValueStream, didUpdateTopicPath topicPath: String, specification: PTDiffusionTopicSpecification, oldBinary: PTDiffusionBinary?, newBinary: PTDiffusionBinary) { print("Binary topic %@ update: %@ --> %@", topicPath, oldBinary!, newBinary) } func diffusionStream(_ stream: PTDiffusionValueStream, didUpdateTopicPath topicPath: String, specification: PTDiffusionTopicSpecification, oldJSON oldJson: PTDiffusionJSON?, newJSON newJson: PTDiffusionJSON) { print("JSON topic %@ update: %@ --> %@", topicPath, oldJson!, newJson) } func diffusionStream(_ stream: PTDiffusionStream, didSubscribeToTopicPath topicPath: String, specification: PTDiffusionTopicSpecification) { print("Subscribed to topic %@", topicPath) } func diffusionStream(_ stream: PTDiffusionStream, didUnsubscribeFromTopicPath topicPath: String, specification: PTDiffusionTopicSpecification, reason: PTDiffusionTopicUnsubscriptionReason) { print("Unsubscribed from topic: %@. Reason: %@", topicPath, reason) } func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) { print("Failed with error: %@", error.localizedDescription) } func diffusionDidClose(_ stream: PTDiffusionStream) { print("Value stream closed") }
Registering a fallback stream
You can register one or more fallback streams to receive updates to subscribed topics that do not have a value stream registered against them:
session.addFallbackStream(diffusion.datatypes.json()) .on('value', (topic, specification, newValue, oldValue) => { // Do something });
var session = Diffusion.Sessions .Principal("admin") .Credentials(Diffusion.Credentials.Password("password")) .Open(serverUrl); // Register a json fallback stream session.Topics.AddFallbackStream<IJSON>( new DefaultValueStream<IJSON>() );
topics.addFallbackStream(JSON.class, new Topics.ValueStream.Default<>());
VALUE_STREAM_T value_stream = { .datatype = DATATYPE_JSON, .on_subscription = on_subscribe, .on_unsubscription = on_unsubscribe, .on_value = on_json_value }; add_fallback_stream(session, &value_stream);
// register self as the fallback handler for JSON value updates let value_stream = PTDiffusionJSON.valueStream(with: self) do { try session.topics.addFallbackStream(value_stream, error: ()) } catch { print ("An error occurred: %@", error) }
A fallback value stream receives all updates for topics of the matching type that do not have a stream already registered against them.