Just a second...

Using streams for subscription

Register a stream against a set of topics to access values published to those topics. For a registered stream to access the value of a topic, the topic type must match the stream and the client must be subscribed to the topic.

Subscribing to a topic causes the value of the topic to be sent from the Diffusion™ server to the client. Registering a stream that matches the topic enables the client to access these values. For more information, see Subscribing to topics

Two kinds of stream are provided to receive updates from subscribed topics: value streams and topic streams.

Value streams

Value streams are typed. Register value streams against a set of topics by using a topic selector. A value stream receives updates for any subscribed topics that match the value stream's type and the topic selector used when registering the value stream.

A value stream can have one of the following types:
JSON
JSON topics are routed to this type of stream.
Binary
Binary topics are routed to this type of stream.
String
String topics are routed to this type of stream.
Int64
Int64 topics are routed to this type of stream.
Double
Double topics are routed to this type of stream.
RecordV2
RecordV2 topics are routed to this type of stream.
Content
JSON, binary, string, int64, double, recordV2 and single value topics are routed to this type of stream.

If a value stream receives a delta update, this delta is automatically applied to a locally cached value so that the stream always receives full values.

Using a value stream

Register the typed stream against the topic or topics that you want the stream to receive updates from:

JavaScript
// Register a JSON value stream
session.addStream('topic_selector', diffusion.datatypes.json())
    .on('value', (path, specification, newValue, oldValue) => {
        // Action to take when update is received
    });

// Register a binary value stream
session.addStream('topic_selector', diffusion.datatypes.binary())
    .on('value', (path, specification, newValue, oldValue) => {
        // Action to take when update is received
    });
.NET
var topics = session.Topics;

// Register a JSON value stream
topics.AddStream( "topic_selector", new Topics.DefaultValueStream<IJSON>() );

// Register a binary value stream
topics.AddStream( "topic_selector", new Topics.DefaultValueStream<IBinary>() );
                    
Java and Android
final Topics topics = session.feature(Topics.class);

// Register a JSON value stream
topics.addStream(topic_selector, JSON.class, new Topics.ValueStream.Default<JSON>());

// Register a binary value stream
topics.addStream(topic_selector, Binary.class, new Topics.ValueStream.Default<Binary>());
                    
C
// Register a JSON value stream
VALUE_STREAM_T json_value_stream = {
        .datatype = DATATYPE_JSON,
        .on_subscription = on_subscribe,
        .on_unsubscription = on_unsubscribe,
        .on_value = on_json_value
};
add_stream(session, topic_path, &json_value_stream);

// Register a binary value stream
VALUE_STREAM_T binary_value_stream = {
        .datatype = DATATYPE_BINARY,
        .on_subscription = on_subscribe,
        .on_unsubscription = on_unsubscribe,
        .on_value = on_binary_value
};
add_stream(session, topic_path, &binary_value_stream);
Apple
// register a JSON value stream
let json_value_stream = PTDiffusionJSON.valueStream(with: self)
do {
    try session.topics.add(json_value_stream,
                           withSelectorExpression: ">public/json//",
                           error: ())
} catch {
    print("An error occurred: %@", error)
}

// register a binary value stream
let binary_value_stream = PTDiffusionBinary.valueStream(with: self)
do {
    try session.topics.add(binary_value_stream,
                           withSelectorExpression: ">public/binary//",
                           error: ())
} catch {
    print("An error occurred: %@", error)
}

Use topic selectors to register the stream against multiple topics. For more information, see Topic selectors.

The examples above show how to register a default or no-op value stream against a set of topics. The stream receives values from any topic in the set whose topic data type matches the stream data type.

To make use of the values sent to your client, implement a value stream that takes the required action when an update is received from a subscribed topic that matches the type of the stream:

JavaScript
session.addStream('topic_selector', diffusion.datatypes.json())
    .on({
        value : (topic, specification, newValue, oldValue) => {
            console.log('Update from: ', topic, newValue.get());
        },
        subscribe : (topic, specification) => {
            console.log('Subscribed to: ', topic);
        },
        unsubscribe : (topic, specification, reason) => {
            console.log('Unsubscribed from: ', topic);
        }
    });
.NET
/// Basic implementation of the IValueStream for JSON topics.

private sealed class JSONStream : IValueStream<IJSON> {

/// Notification of stream being closed normally.

public void OnClose()
=> WriteLine( "The subscrption stream is now closed." );

/// Notification of a contextual error related to this callback.

/// Situations in which OnError is called include the session being closed, a communication
/// timeout, or a problem with the provided parameters. No further calls will be made to this callback.

public void OnError( ErrorReason errorReason )
=> WriteLine( $"An error has occured : {errorReason}." );

/// Notification of a successful subscription.

public void OnSubscription( string topicPath, ITopicSpecification specification )
=> WriteLine( $"Client subscribed to {topicPath}." );

/// Notification of a successful unsubscription.

public void OnUnsubscription( string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason )
=> WriteLine( $"Client unsubscribed from {topicPath} : {reason}." );

/// Topic update received.

public void OnValue( string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue )
=> WriteLine( $"New value of {topicPath} is {newValue.ToJSONString()}." );
}
                    
Java and Android
private class JSONStream extends ValueStream.Default<JSON> {
    @Override
    public void onValue(
        String topicPath,
        TopicSpecification specification,
        JSON oldValue,
        JSON newValue) {
        LOG.info(newValue.toJsonString());
    }
}
                    
C
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


static int on_subscribe(
        const char *const topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        void *context)
{
        // subscribed to `topic_path`
        return HANDLER_SUCCESS;
}


static int on_unsubscribe(
        const char *const topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        NOTIFY_UNSUBSCRIPTION_REASON_T reason,
        void *context)
{
        // unsubscribed from `topic_path` due to `reason`
        return HANDLER_SUCCESS;
}


static int on_json_value(
        const char *const topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        DIFFUSION_DATATYPE datatype,
        const DIFFUSION_VALUE_T *const old_value,
        const DIFFUSION_VALUE_T *const new_value,
        void *context)
{
        // handle json topic update
        return HANDLER_SUCCESS;
}


static int on_binary_value(
        const char *const topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        DIFFUSION_DATATYPE datatype,
        const DIFFUSION_VALUE_T *const old_value,
        const DIFFUSION_VALUE_T *const new_value,
        void *context)
{
        // handle binary topic update
        return HANDLER_SUCCESS;
}


void register_value_stream(
        SESSION_T *session,
        char *topic_path)
{
        // Register a JSON value stream
        VALUE_STREAM_T json_value_stream = {
                .datatype = DATATYPE_JSON,
                .on_subscription = on_subscribe,
                .on_unsubscription = on_unsubscribe,
                .on_value = on_json_value
        };
        add_stream(session, topic_path, &json_value_stream);

        // Register a binary value stream
        VALUE_STREAM_T binary_value_stream = {
                .datatype = DATATYPE_BINARY,
                .on_subscription = on_subscribe,
                .on_unsubscription = on_unsubscribe,
                .on_value = on_binary_value
        };
        add_stream(session, topic_path, &binary_value_stream);
}


void register_fallback_stream(SESSION_T *session)
{
        VALUE_STREAM_T value_stream = {
                .datatype = DATATYPE_JSON,
                .on_subscription = on_subscribe,
                .on_unsubscription = on_unsubscribe,
                .on_value = on_json_value
        };
        add_fallback_stream(session, &value_stream);
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        CREDENTIALS_T *credentials = credentials_create_password(password);
        SESSION_T *session =
                session_create(url, principal, credentials, NULL, NULL, NULL);

        char *topic_path = "my/topic/path";

        register_value_stream(session, topic_path);
        register_fallback_stream(session);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);

        return EXIT_SUCCESS;
}
Apple
class RegisterStreams: PTDiffusionJSONValueStreamDelegate, PTDiffusionBinaryValueStreamDelegate {

    func diffusionStream(_ stream: PTDiffusionValueStream,
                         didUpdateTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         oldBinary: PTDiffusionBinary?,
                         newBinary: PTDiffusionBinary) {
        print("Binary topic %@ update: %@ --> %@", topicPath, oldBinary!, newBinary)
    }


    func diffusionStream(_ stream: PTDiffusionValueStream,
                         didUpdateTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         oldJSON oldJson: PTDiffusionJSON?,
                         newJSON newJson: PTDiffusionJSON) {
        print("JSON topic %@ update: %@ --> %@", topicPath, oldJson!, newJson)
    }


    func diffusionStream(_ stream: PTDiffusionStream,
                         didSubscribeToTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification) {
        print("Subscribed to topic %@", topicPath)
    }


    func diffusionStream(_ stream: PTDiffusionStream,
                         didUnsubscribeFromTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         reason: PTDiffusionTopicUnsubscriptionReason) {
        print("Unsubscribed from topic: %@. Reason: %@", topicPath, reason)
    }


    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        print("Failed with error: %@", error.localizedDescription)
    }


    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Value stream closed")
    }

Topic streams

Note: Where a value stream is available for your topic type, we recommend you use a value stream instead of a topic stream.

Topic streams are not typed and are used to receive value and delta updates for all subscribed topics that match the topic selectors used when registering the value stream.

This type of stream provides the value and the deltas but relies upon the application to apply the deltas to a client-maintained current value. It is important, when using a topic stream with a record topic, to register the stream before subscribing to the topic. This ensures that a full value is received by the subscribing client.

Using a topic stream

Register the stream against the topic or topics that you want the stream to receive updates from:

JavaScript
session.stream('topic_selector')
       .on('update', function(update, topic) {
            // Do something
       });
.NET
var topics = session.Topics;

// Add a topic stream that you implemented elsewhere
topics.AddTopicStream( topic_selector, myTopicStream );
                    
Java and Android
Topics topics = session.feature(Topics.class);
// Add a topic stream that you implemented elsewhere
topics.addTopicStream(topic_selector, new myTopicStream(data));
                    

Use topic selectors to register the stream against multiple topics. For more information, see Topic selectors.

Registering a fallback stream

You can register one or more fallback streams to receive updates to subscribed topics that do not have a value stream or topic stream registered against them:

JavaScript
session.addFallbackStream(diffusion.datatypes.json())
    .on('value', (topic, specification, newValue, oldValue) => {
        // Do something
    });
.NET
var topics = session.Topics;

topics.AddFallbackStream<IJSON>( new Topics.DefaultValueStream<IJSON>() );
                    
Java and Android
final Topics topics = session.feature(Topics.class);

topics.addFallbackStream(topic_selector, JSON.class, new Topics.ValueStream.Default());
                    
C
VALUE_STREAM_T value_stream = {
        .datatype = DATATYPE_JSON,
        .on_subscription = on_subscribe,
        .on_unsubscription = on_unsubscribe,
        .on_value = on_json_value
};
add_fallback_stream(session, &value_stream);
Apple
// register self as the fallback handler for JSON value updates
let value_stream = PTDiffusionJSON.valueStream(with: self)

do {
    try session.topics.addFallbackStream(value_stream, error: ())
} catch {
    print ("An error occurred: %@", error)
}

A fallback value stream receives all updates for topics of the matching type that do not have a stream already registered against them.

A fallback topic stream receives all updates for topics of any type that do not have a stream already registered against them.