Interface Topics

This feature allows a client session to subscribe to topics to receive streamed topic updates, fetch the state of topics and/or update topics with new values.

Specifically, the feature provides the ability to:

  • Subscribe to topics and specify streams to receive updates;
  • Fetch the current state of topics (even if not subscribed);
  • By extending the topic update feature, update topics with new values;
  • By extending the topic views feature, manage topic views.

Subscription and unsubscription

A session can issue requests to subscribe to topics at any time, even if the topics do not exist at the server. Each subscription request provides a topic selector that is evaluated by the server to select matching topics. The session will be subscribed to every topic that matches the selector, unless it is already subscribed to that topic or has insufficient permission. The subscription request is also retained at the server, and the session will be automatically subscribed to newly created topics that match the selector (unless a subsequent unsubscription cancels the request).

Sessions receive notifications from topics that they are subscribed to via subscription streams (see below). When a session is subscribed to a topic, all matching streams will first receive a subscription notification that provides details about the topic. If the server has a value for the topic, the value will be delivered to the streams before any other notifications.

A session can unsubscribe from topics at any time. This is also specified using a topic selector. On unsubscription, matching streams are notified via the on('unsubscribe') notification. This notification will give the reason for unsubscription (for example, by request of the session, request of the server, or topic removal).

Subscriptions and unsubscriptions can occur for reasons other than requests from the session. A session can be subscribed to or unsubscribed from a topic by another session using the ClientControl feature. The removal of a topic also automatically causes unsubscription for subscribed sessions.

Subscription requests are subject to authorization checks. The session must have SELECT_TOPIC permission for the path prefix of the topic selector used to subscribe. Matching topics will be further filtered to those for which the session has READ_TOPIC permission.

Subscription streams

A session can listen to subscription events and updates for a selection of topics by adding one or more streams. A stream is registered using a topic selector which specifies the topics that the stream applies to. When an update is received for a topic then it will be routed to every stream that matches both the topic selector and the stream's value type. If more than one stream matches, all will receive the update; the order in which they are notified is not defined.

A stream can be added several times for different selectors. The mapping of topic selectors to streams is maintained locally in the client process.

It is also possible to add fallback streams, which receive updates that do not match any stream registered with a selector. This is useful for default processing or simply to catch unprocessed updates. A fallback stream is added using addFallbackStream, and any number of them may be registered. If there is no fallback stream, updates that are not routed to any other stream are discarded.
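
The routing rules described in this section can be modelled in a few lines. The sketch below is not the client library's implementation: `StreamRegistry`, `register`, `registerFallback` and `route` are hypothetical names, selector matching is reduced to a caller-supplied predicate, and value type matching is left out. It only illustrates the order of decisions: selector-registered streams first, fallback streams otherwise, and discard if neither exists.

```javascript
// Illustrative model only: none of these names are Diffusion APIs.
class StreamRegistry {
    constructor() {
        this.entries = [];   // { matches, stream } pairs, kept locally in the client
        this.fallbacks = []; // streams registered without a selector
    }

    // `matches` stands in for a topic selector: (path) => boolean.
    register(matches, stream) {
        this.entries.push({ matches, stream });
    }

    registerFallback(stream) {
        this.fallbacks.push(stream);
    }

    // Deliver an update to every matching stream. If none match, use the
    // fallback streams. If there are none of those either, discard it.
    route(path, value) {
        // Assumption of this model: a stream registered under several
        // matching selectors is notified once.
        const matched = new Set(
            this.entries
                .filter((entry) => entry.matches(path))
                .map((entry) => entry.stream)
        );
        const targets = matched.size > 0 ? [...matched] : this.fallbacks;
        targets.forEach((stream) => stream(path, value));
        return targets.length; // number of streams notified
    }
}
```

With one stream registered for paths under `foo/` and one fallback stream, an update for `foo/bar` reaches only the first stream, while an update for any other path reaches only the fallback.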

If the session is already subscribed to a topic when a matching stream is added, the stream will immediately receive a subscription notification. For most topic types, the latest value is locally cached and will be provided to the stream following the subscription notification.

A stream will receive a close callback when unregistered and an error(SESSION_CLOSED) callback if the session is closed.

Value streams

A ValueStream receives values for matching topics as and when updates are received from the server. Delta updates received from the server are automatically applied to locally cached values so that the stream always receives full values for each update.

Value streams are typed to a specified value class and only updates for compatible topics will be routed to the stream. The following table shows how the value class maps to compatible topic types that will be routed to the stream:

Value Class   Compatible Topic Types
JSON          JSON, STRING, INT64, DOUBLE
String        STRING
Int64         INT64
Double        DOUBLE
Binary        BINARY
Bytes         JSON, STRING, INT64, DOUBLE, BINARY, RECORD_V2
RecordV2      RECORD_V2
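
The table can also be read as a lookup. The following sketch transcribes it into a map and adds a hypothetical helper, `isCompatible`, that answers whether a stream typed to a given value class would be routed updates from a topic of a given type. The helper and the map name are illustrative only and are not part of the client API.

```javascript
// Value class -> topic types routed to a stream of that class,
// transcribed from the table above.
const COMPATIBLE_TOPIC_TYPES = new Map([
    ['JSON', ['JSON', 'STRING', 'INT64', 'DOUBLE']],
    ['String', ['STRING']],
    ['Int64', ['INT64']],
    ['Double', ['DOUBLE']],
    ['Binary', ['BINARY']],
    ['Bytes', ['JSON', 'STRING', 'INT64', 'DOUBLE', 'BINARY', 'RECORD_V2']],
    ['RecordV2', ['RECORD_V2']]
]);

// Hypothetical helper: would a stream typed to `valueClass` receive
// updates from a topic of `topicType`? Unknown value classes yield false.
function isCompatible(valueClass, topicType) {
    const types = COMPATIBLE_TOPIC_TYPES.get(valueClass);
    return types !== undefined && types.includes(topicType);
}
```

For example, a JSON stream is routed updates from an INT64 topic, whereas a String stream is not routed updates from a JSON topic.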

Value stream implementations can be added using addStream. A value stream can also be added to receive updates from time series topics. The following table shows how the value class specified when adding the stream maps to the event value class of the time series topics that will be routed to the stream:

Event Value Class   Time Series Event Value Class
JSON                JSON, STRING, INT64, DOUBLE
String              STRING
Int64               INT64
Double              DOUBLE
Binary              BINARY
Bytes               JSON, STRING, INT64, DOUBLE, BINARY, RECORD_V2
RecordV2            RECORD_V2

Fetch

A session can issue a request to fetch details of a topic or topics (subject to authorization) at any time. The topics required are specified using a topic selector.

A fetch result contains the topic path and type of each selected topic. It can optionally also contain the topic values and/or properties.

A new request can be created using Topics.fetchRequest and modified to specify additional requirements of the fetch operation. The request is issued to the server using the fetch method on the request. This will return the results via a Promise.
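
As a rough sketch of the flow just described, the helper below builds a request, asks for values, issues it, and flattens the outcome into plain objects. `fetchPathsAndValues` is a hypothetical name. The accessors used on the result (`results()`, `path()`, `type()`, `value()`) are assumptions about the shape of the fetch result and should be checked against the FetchRequest documentation. `session` is taken to be a connected session and `dataType` a data type such as `diffusion.datatypes.json()`.

```javascript
// Sketch under stated assumptions: the result accessors below are not
// confirmed by this page.
function fetchPathsAndValues(session, selector, dataType) {
    return session.fetchRequest()
        .withValues(dataType)   // also return topic values
        .fetch(selector)        // issue the request; returns a Promise
        .then(function (fetchResult) {
            // Convert each topic result into a plain object.
            return fetchResult.results().map(function (topic) {
                return {
                    path: topic.path(),
                    type: topic.type(),
                    value: topic.value()
                };
            });
        });
}
```

Returning plain objects keeps the calling code independent of the result classes, which may be convenient for logging or serialisation.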

Access control

A session must have SELECT_TOPIC permission for the path prefix of the topic selector used to subscribe or fetch. The topics that result from a subscription or fetch request are further filtered using the READ_TOPIC permission.

No access control restrictions are applied to unsubscription.
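
The two permission checks act at different levels, which the following model tries to make concrete. It is purely illustrative: `authorisedTopics` and the `permissions` object with its `canSelect` and `canRead` predicates are hypothetical, and the real checks are performed by the server. How a denied request is reported is not specified here, so the model simply throws.

```javascript
// Illustrative model only; the server performs the real checks.
function authorisedTopics(pathPrefix, matchingTopics, permissions) {
    // Stage 1: SELECT_TOPIC is checked once, against the path prefix
    // of the topic selector.
    if (!permissions.canSelect(pathPrefix)) {
        throw new Error('SELECT_TOPIC denied for ' + pathPrefix);
    }
    // Stage 2: READ_TOPIC filters the individual topics that matched.
    return matchingTopics.filter(function (path) {
        return permissions.canRead(path);
    });
}
```

A session that may select `foo` but may not read `foo/secret` would therefore see `foo/a` and not `foo/secret` in the outcome of a request for topics under `foo`.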

Accessing the feature

This feature is directly available on a session object:

example
session.select('foo/bar');
author

DiffusionData Limited

since

5.0

Methods

addFallbackStream

  • Adds a fallback value stream for a given Data Type. A fallback stream is registered without a selector and receives all events for which no other stream is registered.

    Example:

    // Produce a fallback value stream for receiving JSON values.
    var json = diffusion.datatypes.json();
    
    session.addFallbackStream(json).on('value', function(topic, specification, newValue, oldValue) {
          console.log('New value ', newValue.get());
    });
    throws

    an IllegalArgumentError if dataType is not a valid data type

    Parameters

    Returns ValueStream

    a fallback stream

addStream

  • Create a ValueStream to receive updates from topics that match the provided topic selector.

    This method will not cause the server to send any topic updates unless already subscribed. This allows the registration of listeners prior to subscribing via Session.select, or to add/remove listeners independently of subscriptions on the server.

    The stream will only receive values from topics for which the specified Data Type is compatible. Passing AnyDataType as the second argument will create a polymorphic value stream that receives all data types. It is then up to the value handler to interpret the incoming data.

    The first argument of this function can be a string, a TopicSelector, or a non-empty array of strings and TopicSelectors.

    Example:

    // Produce a value stream for receiving JSON values from the topic 'foo'.
    var json = diffusion.datatypes.json();
    
    session.addStream('foo', json).on('value', function(topic, specification, newValue, oldValue) {
          console.log('New value ', newValue.get());
    });
    throws

    an IllegalArgumentError if dataType is not a valid data type or if the selector could not be parsed

    Parameters

    Returns ValueStream

    a new ValueStream for the provided data type

fetchRequest

  • Creates an unconfigured fetch request.

    The returned request can be invoked with fetch. The server will evaluate the query and return a fetch result that provides the paths and types of the matching topics which the session has permission to read.

    You will usually want to restrict the query to a subset of the topic tree, and to retrieve the topic values and/or properties. This is achieved by applying one or more of the fluent builder methods provided by FetchRequest to produce more refined requests.

    Example:

    // Create and send a fetch request. Then pass the results to a resultHandler
    session.fetchRequest()
           .withValues(diffusion.datatypes.StringDataType)
           .fetch("*A/B//")
           .then(resultHandler);
    see

    diffusion.topics.FetchRequest

    since

    6.2

    Returns FetchRequest

    a new unconfigured fetch request

select

  • Subscribe the session to a topic selector in order to receive updates and subscription events.

    Subscription causes the server to establish a subscription for this session to any topic that matches the specified selector, including topics that are added after the initial call to Session.select.

    If the provided selector string does not begin with one of the prefixes defined by TopicSelectors, it will be treated as a direct topic path.

    This function can take any number of arguments. Each argument can be a string or a TopicSelector. Alternatively, an array of strings and TopicSelectors can be passed as a single argument. At least one valid selector has to be specified.

    The session will become subscribed to each existing topic matching the selector unless the session is already subscribed to the topic, or the session does not have READ_TOPIC permission for the topic path. For each topic to which the session becomes subscribed, a subscription notification and initial value (if any) will be delivered to registered value streams before the returned promise completes.

    The subscription request is also retained at the server and the session will be automatically subscribed to newly created topics that match the selector (unless a subsequent unsubscription cancels the request).

    Example:

    // Subscribe to a topic foo
    session.select("foo").then(function() {
        // Successfully subscribed
    }, function(err) {
        // There was an error with subscribing to topic "foo"
    });
    throws

    an IllegalArgumentError if the selector could not be parsed

    Parameters

    • selector: Array<string | TopicSelector>

      the topic selector to subscribe to.

    Returns Result<void>

    a result that completes when this operation succeeds

  • Parameters

    Returns Result<void>
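
The interaction between addStream and select can be sketched as a small helper that registers a value stream and then subscribes. `subscribeWithStream` and `onValue` are hypothetical names. `session` is taken to be a connected session and `dataType` a data type such as `diffusion.datatypes.json()`. Only calls shown elsewhere on this page are used.

```javascript
// Sketch: register a stream, then subscribe, and hand the stream back.
function subscribeWithStream(session, selector, dataType, onValue) {
    // Register the stream before subscribing, so that the subscription
    // notification and initial value are delivered to it.
    const stream = session.addStream(selector, dataType);
    stream.on('value', function (topic, specification, newValue, oldValue) {
        onValue(topic, newValue);
    });
    // select() completes once the subscription request has succeeded;
    // resolve to the stream so the caller keeps a handle on it.
    return session.select(selector).then(function () {
        return stream;
    });
}
```

A caller might use it as `subscribeWithStream(session, 'foo', diffusion.datatypes.json(), handler)`, where `handler` receives the topic path and the new value.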

unsubscribe

  • Unsubscribe the client from a given topic selector.

    No more updates will be received from the server for any topics matched by the selector. If no topics exist that match the selector, the server will do nothing.

    Each topic that this session is unsubscribed from will cause an unsubscribe event. Any ValueStream objects produced from Session.addStream will remain open, and will continue to emit updates for topics that the session has not been unsubscribed from.

    The returned result will resolve normally when the session has been unsubscribed. It will be rejected with an error if the session is unable to unsubscribe, for instance due to security constraints.

    This function can take any number of arguments. Each argument can be a string or a TopicSelector. Alternatively, an array of strings and TopicSelectors can be passed as a single argument. At least one valid selector has to be specified.

    Example:

    // Unsubscribe from a single topic
    session.unsubscribe('foo');

    Example:

    // Unsubscribe from multiple topics
    session.unsubscribe('?foo/.*');
    throws

    an IllegalArgumentError if the selector could not be parsed

    Parameters

    • selector: Array<string | TopicSelector>

      the topic selector to unsubscribe from.

    Returns Result<void>

    a Result for this operation

  • Parameters

    Returns Result<void>
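
To tie the unsubscribe event and the returned result together, the sketch below listens for unsubscription on an existing stream and reports whether the request succeeded. `unsubscribeAndReport` and `log` are hypothetical names, and the argument order of the 'unsubscribe' event (topic, specification, reason) is an assumption that should be checked against the ValueStream documentation.

```javascript
// Sketch under stated assumptions: the 'unsubscribe' event arguments
// are not confirmed by this page.
function unsubscribeAndReport(session, stream, selector, log) {
    // The stream stays open after unsubscription; it only reports
    // which topics the session was unsubscribed from, and why.
    stream.on('unsubscribe', function (topic, specification, reason) {
        log('unsubscribed', topic, reason);
    });
    // Resolve to true on success and false if the request fails.
    return session.unsubscribe(selector).then(
        function () { return true; },
        function (err) {
            log('unsubscribe failed', selector, err);
            return false;
        }
    );
}
```

Mapping a failure to `false` rather than rethrowing is a design choice for this example only; code that needs the error details would propagate the rejection instead.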