Using streams for subscription
Register a stream against a set of topics to access values published to those topics. For a registered stream to access the value of a topic, the topic type must match the stream and the client must be subscribed to the topic.
Subscribing to a topic causes the value of the topic to be sent from Diffusion™ Cloud to the client. Registering a stream that matches the topic enables the client to access these values. For more information, see Subscribing to topics.
Two kinds of stream are provided to receive updates from subscribed topics: value streams and topic streams.
Value streams
Value streams are typed. Register value streams against a set of topics by using a topic selector. A value stream receives updates for any subscribed topics that match the value stream's type and the topic selector used when registering the value stream.
The following value stream types are available:
- JSON: JSON topics are routed to this type of stream.
- Binary: Binary topics are routed to this type of stream.
- String: String topics are routed to this type of stream.
- Int64: Int64 topics are routed to this type of stream.
- Double: Double topics are routed to this type of stream.
- RecordV2: RecordV2 topics are routed to this type of stream.
- Content: JSON, binary, string, int64, double, recordV2 and single value topics are routed to this type of stream.
If a value stream receives a delta update, this delta is automatically applied to a locally cached value so that the stream always receives full values.
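The caching behavior described above can be sketched in plain JavaScript. This is only an illustration: `makeCachingStream`, `onFullValue` and `onDelta` are hypothetical names, and the "delta" here is simply an object of changed fields, which is an assumption made for readability and not the format Diffusion uses internally.

```javascript
// Hypothetical sketch, not Diffusion API: keeps the last full value per
// topic and merges each delta into it before calling the handler.
function makeCachingStream(onValue) {
  const cache = new Map(); // topic path -> last full value

  return {
    // A full value replaces whatever is cached for the topic.
    onFullValue(path, value) {
      const oldValue = cache.get(path);
      cache.set(path, value);
      onValue(path, value, oldValue);
    },
    // A delta (assumed here to be an object of changed fields) is applied
    // to the cached value, so the handler still sees a full value.
    onDelta(path, delta) {
      const oldValue = cache.get(path);
      if (oldValue === undefined) {
        throw new Error('No cached value for ' + path);
      }
      const newValue = { ...oldValue, ...delta };
      cache.set(path, newValue);
      onValue(path, newValue, oldValue);
    },
  };
}

const seenValues = [];
const cachingStream = makeCachingStream((path, newValue) => seenValues.push(newValue));
cachingStream.onFullValue('rates/GBP', { USD: 1.25, EUR: 1.17 });
cachingStream.onDelta('rates/GBP', { USD: 1.26 });
```

After the delta arrives, the handler has been called twice, and the second call carries both the changed `USD` field and the unchanged `EUR` field.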
Using a value stream
Register the typed stream against the topic or topics that you want the stream to receive updates from:
JavaScript:

// Register a JSON value stream
session.addStream('topic_selector', diffusion.datatypes.json())
    .on('value', function(path, specification, newValue, oldValue) {
        // Action to take when update is received
    });

// Register a binary value stream
session.addStream('topic_selector', diffusion.datatypes.binary())
    .on('value', function(path, specification, newValue, oldValue) {
        // Action to take when update is received
    });

Apple (Objective-C):

// Register a JSON value stream
PTDiffusionValueStream *const jsonValueStream = [PTDiffusionJSON valueStreamWithDelegate:self];
[session.topics addStream:jsonValueStream withSelector:topic_selector];

// Register a binary value stream
PTDiffusionValueStream *const binaryValueStream = [PTDiffusionBinary valueStreamWithDelegate:self];
[session.topics addStream:binaryValueStream withSelector:topic_selector];

Java:

final Topics topics = session.feature(Topics.class);

// Register a JSON value stream
topics.addStream(topic_selector, JSON.class, new Topics.ValueStream.Default<JSON>());

// Register a binary value stream
topics.addStream(topic_selector, Binary.class, new Topics.ValueStream.Default<Binary>());

.NET (C#):

var topics = session.Topics;

// Register a JSON value stream
topics.AddStream( "topic_selector", new Topics.DefaultValueStream<IJSON>() );

// Register a binary value stream
topics.AddStream( "topic_selector", new Topics.DefaultValueStream<IBinary>() );
Use topic selectors to register the stream against multiple topics. For more information, see Topic selectors.
The examples above show how to register a default or no-op value stream against a set of topics. The stream receives values from any topic in the set whose topic data type matches the stream data type.
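The matching rule can be modeled in a few lines of plain JavaScript. The sketch below is an illustration only: `makeRouter` and its methods are hypothetical, and a simple predicate on the path stands in for a real topic selector. It shows that an update reaches a stream only when the client is subscribed to the topic, the selector matches the path, and the topic type equals the stream type.

```javascript
// Hypothetical model of value-stream routing; not part of the Diffusion API.
function makeRouter() {
  const subscribed = new Set(); // topic paths the client is subscribed to
  const streams = [];           // registered value streams

  return {
    subscribe(path) {
      subscribed.add(path);
    },
    // `matches` stands in for a topic selector: a predicate on the path.
    addStream(matches, type, onValue) {
      streams.push({ matches, type, onValue });
    },
    // Deliver an update and return how many streams received it.
    publish(path, type, value) {
      if (!subscribed.has(path)) {
        return 0; // not subscribed: no value is sent to the client
      }
      let delivered = 0;
      for (const s of streams) {
        if (s.type === type && s.matches(path)) {
          s.onValue(path, value);
          delivered += 1;
        }
      }
      return delivered;
    },
  };
}

const router = makeRouter();
const routed = [];
router.addStream((p) => p.startsWith('rates/'), 'json', (p, v) => routed.push([p, v]));
router.subscribe('rates/GBP');
router.subscribe('rates/logo');

const deliveredJson = router.publish('rates/GBP', 'json', { USD: 1.25 }); // selector and type match
const deliveredBinary = router.publish('rates/logo', 'binary', [0xff]);   // type does not match
const deliveredUnsub = router.publish('rates/EUR', 'json', { USD: 1.1 }); // client is not subscribed
```

Only the first update is passed to the stream; the other two are filtered out by the type check and the subscription check respectively.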
To make use of the values sent to your client, implement a value stream that takes the required action when an update is received from a subscribed topic that matches the type of the stream:
JavaScript:

session.addStream('topic_selector', diffusion.datatypes.json())
    .on({
        value : function(topic, specification, newValue, oldValue) {
            console.log('Update from: ', topic, newValue.get());
        },
        subscribe : function(topic, specification) {
            console.log('Subscribed to: ', topic);
        },
        unsubscribe : function(topic, specification, reason) {
            console.log('Unsubscribed from: ', topic);
        }
    });

Apple (Objective-C):

@implementation JSONSubscribeExample (PTDiffusionJSONValueStreamDelegate)

-(void)diffusionStream:(PTDiffusionStream *const)stream
    didSubscribeToTopicPath:(NSString *const)topicPath
              specification:(PTDiffusionTopicSpecification *const)specification {
    NSLog(@"Subscribed: %@", topicPath);
}

-(void)diffusionStream:(PTDiffusionValueStream *const)stream
    didUpdateTopicPath:(NSString *const)topicPath
         specification:(PTDiffusionTopicSpecification *const)specification
               oldJSON:(PTDiffusionJSON *const)oldJSON
               newJSON:(PTDiffusionJSON *const)newJSON {
    NSError * error;
    NSDictionary *const map = [newJSON objectWithError:&error];
    if (!map) {
        NSLog(@"Failed to create map from received JSON. Error: %@", error);
        return;
    }

    // Assumption for this example: the currency code is the last
    // component of the topic path.
    NSString *const currency = topicPath.lastPathComponent;

    // For the purposes of a meaningful example, only emit a log line if we
    // have a rate for GBP to USD.
    if ([currency isEqualToString:@"GBP"]) {
        const id rate = map[@"USD"];
        if (rate) {
            NSLog(@"Rate for GBP to USD: %@", rate);
        }
    }
}

-(void)diffusionStream:(PTDiffusionStream *const)stream
    didUnsubscribeFromTopicPath:(NSString *const)topicPath
                  specification:(PTDiffusionTopicSpecification *const)specification
                         reason:(const PTDiffusionTopicUnsubscriptionReason)reason {
    NSLog(@"Unsubscribed: %@", topicPath);
}

@end

Java:

private class JSONStream extends ValueStream.Default<JSON> {
    @Override
    public void onValue(
        String topicPath,
        TopicSpecification specification,
        JSON oldValue,
        JSON newValue) {
        LOG.info(newValue.toJsonString());
    }
}

.NET (C#):

/// Basic implementation of the IValueStream for JSON topics.
private sealed class JSONStream : IValueStream<IJSON> {
    /// Notification of stream being closed normally.
    public void OnClose()
        => WriteLine( "The subscription stream is now closed." );

    /// Notification of a contextual error related to this callback.
    /// Situations in which OnError is called include the session being closed, a communication
    /// timeout, or a problem with the provided parameters. No further calls will be made to this callback.
    public void OnError( ErrorReason errorReason )
        => WriteLine( $"An error has occurred : {errorReason}." );

    /// Notification of a successful subscription.
    public void OnSubscription( string topicPath, ITopicSpecification specification )
        => WriteLine( $"Client subscribed to {topicPath}." );

    /// Notification of a successful unsubscription.
    public void OnUnsubscription( string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason )
        => WriteLine( $"Client unsubscribed from {topicPath} : {reason}." );

    /// Topic update received.
    public void OnValue( string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue )
        => WriteLine( $"New value of {topicPath} is {newValue.ToJSONString()}." );
}
Topic streams
Topic streams are not typed. They receive value and delta updates for all subscribed topics that match the topic selector used when registering the topic stream.
This type of stream provides the value and the deltas but relies upon the application to apply the deltas to a client-maintained current value. It is important, when using a topic stream with a record topic, to register the stream before subscribing to the topic. This ensures that a full value is received by the subscribing client.
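The following plain JavaScript sketch illustrates why registration order matters when the application maintains the current value itself. It is a simplified model under stated assumptions: the simulated server sends one full value on subscription followed by deltas, a delta is an object of changed fields, and updates that arrive before a stream is registered are dropped. All names (`makeClient`, `makeTrackingStream`, and so on) are hypothetical.

```javascript
// Hypothetical client model, not Diffusion API: updates are passed to the
// registered topic stream, or dropped if none is registered yet.
function makeClient() {
  let topicStream = null;
  return {
    registerStream(stream) {
      topicStream = stream;
    },
    receive(update) {
      if (topicStream) {
        topicStream(update);
      }
    },
  };
}

// A topic stream that maintains the current value in application code.
function makeTrackingStream() {
  const state = { current: null, unusableDeltas: 0 };
  const stream = (update) => {
    if (update.kind === 'value') {
      state.current = update.value;
    } else if (state.current === null) {
      state.unusableDeltas += 1; // no base value to apply the delta to
    } else {
      state.current = { ...state.current, ...update.delta };
    }
  };
  return { state, stream };
}

// Simulated server behavior: a full value on subscription, then a delta.
const sendInitialValue = (client) => client.receive({ kind: 'value', value: { a: 1, b: 2 } });
const sendDelta = (client) => client.receive({ kind: 'delta', delta: { b: 3 } });

// Stream registered before subscribing: the full value is seen.
const early = makeTrackingStream();
const clientA = makeClient();
clientA.registerStream(early.stream);
sendInitialValue(clientA);
sendDelta(clientA);

// Stream registered after subscribing: the full value was missed.
const late = makeTrackingStream();
const clientB = makeClient();
sendInitialValue(clientB);
clientB.registerStream(late.stream);
sendDelta(clientB);
```

In this model the early stream ends with a complete current value, while the late stream receives a delta it cannot apply because it never saw a full value.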
Using a topic stream
Register the stream against the topic or topics that you want the stream to receive updates from:
JavaScript:

session.stream('topic_selector')
    .on('update', function(update, topic) {
        // Do something
    });

Apple (Objective-C):

// Register self as the handler for topic updates on a set of topics.
[session.topics addTopicStreamWithSelector:topic_selector delegate:self];

Java:

Topics topics = session.feature(Topics.class);

// Add a topic stream that you implemented elsewhere
topics.addTopicStream(topic_selector, new myTopicStream(data));

.NET (C#):

var topics = session.Topics;

// Add a topic stream that you implemented elsewhere
topics.AddTopicStream( topic_selector, myTopicStream );
Use topic selectors to register the stream against multiple topics. For more information, see Topic selectors.
Registering a fallback stream
You can register one or more fallback streams to receive updates to subscribed topics that do not have a value stream or topic stream registered against them:
JavaScript:

session.addFallbackStream(diffusion.datatypes.json())
    .on('value', function(topic, specification, newValue, oldValue) {
        // Do something
    });

Apple (Objective-C):

// Register self as the fallback handler for JSON value updates.
PTDiffusionValueStream *const valueStream = [PTDiffusionJSON valueStreamWithDelegate:self];
[session.topics addFallbackStream:valueStream];

Java:

final Topics topics = session.feature(Topics.class);

// A fallback stream is registered by value type only; it takes no topic selector.
topics.addFallbackStream(JSON.class, new Topics.ValueStream.Default<JSON>());

.NET (C#):

var topics = session.Topics;
topics.AddFallbackStream<IJSON>( new Topics.DefaultValueStream<IJSON>() );

C:

/*
 * Install a global topic handler to capture messages for
 * topics we haven't explicitly subscribed to, and therefore
 * don't have a specific handler for.
 */
session->global_topic_handler = on_unexpected_topic_message;
A fallback value stream receives all updates for topics of the matching type that do not have a stream already registered against them.
A fallback topic stream receives all updates for topics of any type that do not have a stream already registered against them.
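The fallback rules can be modeled as a small dispatcher in plain JavaScript. This is a sketch under assumptions, not the library's implementation: the names are hypothetical, a predicate stands in for a topic selector, and "a stream already registered against them" is interpreted as a stream whose selector and type both match the topic.

```javascript
// Hypothetical dispatcher illustrating fallback behavior; not Diffusion API.
function makeDispatcher() {
  const streams = [];        // { matches, type, handler }
  const valueFallbacks = []; // { type, handler }
  const topicFallbacks = []; // handler

  return {
    addStream(matches, type, handler) {
      streams.push({ matches, type, handler });
    },
    addFallbackValueStream(type, handler) {
      valueFallbacks.push({ type, handler });
    },
    addFallbackTopicStream(handler) {
      topicFallbacks.push(handler);
    },
    dispatch(path, type, value) {
      const matched = streams.filter((s) => s.matches(path) && s.type === type);
      if (matched.length > 0) {
        matched.forEach((s) => s.handler(path, value));
        return;
      }
      // No registered stream matched: fall back by type, then to any type.
      valueFallbacks
        .filter((f) => f.type === type)
        .forEach((f) => f.handler(path, value));
      topicFallbacks.forEach((handler) => handler(path, value));
    },
  };
}

const dispatchLog = [];
const dispatcher = makeDispatcher();
dispatcher.addStream((p) => p === 'rates/GBP', 'json', (p) => dispatchLog.push('stream:' + p));
dispatcher.addFallbackValueStream('json', (p) => dispatchLog.push('json-fallback:' + p));
dispatcher.addFallbackTopicStream((p) => dispatchLog.push('topic-fallback:' + p));

dispatcher.dispatch('rates/GBP', 'json', {});     // handled by the registered stream
dispatcher.dispatch('rates/EUR', 'json', {});     // both fallbacks receive it
dispatcher.dispatch('images/logo', 'binary', []); // only the untyped fallback receives it
```

The registered stream shields `rates/GBP` from the fallbacks; the JSON fallback sees only the unmatched JSON topic, while the untyped fallback sees every unmatched topic.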
This page last modified: 2020/06/25