All Superinterfaces:
Feature, SessionTrees, TopicUpdate, TopicViews

public interface Topics extends TopicUpdate, TopicViews, SessionTrees
This feature allows a client session to subscribe to topics to receive streamed topic updates, fetch the state of topics and/or update topics with new values.

Specifically, the feature provides the ability to:

  • Subscribe to topics and specify streams to receive updates;
  • Fetch the current state of topics (even if not subscribed);
  • By extending the topic update feature, update topics with new values;
  • By extending the topic views feature, manage topic views.

Subscription and unsubscription

A session can issue requests to subscribe to topics at any time, even if the topics do not exist at the server. Each subscription request provides a topic selector that is evaluated by the server to select matching topics. The session will be subscribed to any topics that match the selector unless it is already subscribed to them, or it has insufficient permission. The subscription request is also retained at the server and the session will be automatically subscribed to newly created topics that match the selector (unless a subsequent unsubscription cancels the request).

Sessions receive notifications from topics that they are subscribed to via subscription streams (see below). When a session is subscribed to a topic, all matching streams will first receive a subscription notification that provides details about the topic. If the server has a value for the topic, the value will be delivered to the streams before any other notifications.

A session can unsubscribe from topics at any time. This is also specified using a topic selector. On unsubscription, matching streams are notified via the onUnsubscription notification. This notification will give the reason for unsubscription (for example, by request of the session, request of the server, or topic removal).

Subscriptions and unsubscriptions can occur for reasons other than requests from the session. A session can be subscribed to or unsubscribed from a topic by another session using the SubscriptionControl feature. The removal of a topic also automatically causes unsubscription for subscribed sessions.

Subscription requests are subject to authorisation checks. The session must have SELECT_TOPIC permission for the topic selector used to subscribe. Matching topics will be further filtered to those for which the session has READ_TOPIC permission.

Topic selection scopes

Topic selection scopes allow an application with multiple components to use a single Diffusion session. An application component can use a topic selection scope to manage a set of selectors that is unaffected by unsubscriptions performed by other application components. The session will be subscribed to all topics with paths matching a selector in any scope. The unsubscribe operation removes a selector from specific scopes.

A scope may be specified to a subscribe or unsubscribe method, indicating that the selection only applies to that scope. The server manages scopes to ensure that unsubscriptions applied to one scope do not affect another.

Scope names are case sensitive. A scope name may not begin with the character $ as this is reserved for internal use.

Unsubscription using a wildcard selector that indicates all topics (such as "?.*//") effectively removes the scope.

An application can request unsubscription from all scopes using unsubscribeAllScopes(java.lang.String).

The default selection scope is used for all methods that do not explicitly specify a scope.
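
The scope rules above can be pictured with a small in-memory model. This is an illustrative sketch only, not the server's implementation: the class ScopeModel and its methods are hypothetical, a "selector" is reduced to a plain path prefix, and unsubscription removes only an identical prefix, whereas real topic selectors are evaluated by the server and have a much richer syntax.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical, simplified model of topic selection scopes.
 * A "selector" here is just a path prefix (an assumption made for brevity).
 */
final class ScopeModel {
    // scope name -> selectors (prefixes) retained for that scope
    private final Map<String, Set<String>> selectorsByScope = new HashMap<>();

    void subscribe(String prefix, String scope) {
        // Scope names beginning with '$' are reserved for internal use.
        if (scope.startsWith("$")) {
            throw new IllegalArgumentException("scope names must not begin with $");
        }
        selectorsByScope.computeIfAbsent(scope, s -> new HashSet<>()).add(prefix);
    }

    /** Removes the selector from one scope only; other scopes are untouched. */
    void unsubscribe(String prefix, String scope) {
        Set<String> selectors = selectorsByScope.get(scope);
        if (selectors != null) {
            selectors.remove(prefix);
            if (selectors.isEmpty()) {
                selectorsByScope.remove(scope);
            }
        }
    }

    /** Removes the selector from every scope. */
    void unsubscribeAllScopes(String prefix) {
        selectorsByScope.values().forEach(selectors -> selectors.remove(prefix));
        selectorsByScope.values().removeIf(Set::isEmpty);
    }

    /** A topic is subscribed if a selector in ANY scope matches its path. */
    boolean isSubscribed(String topicPath) {
        return selectorsByScope.values().stream()
            .flatMap(Set::stream)
            .anyMatch(topicPath::startsWith);
    }
}
```

In this model, two components that both select the prefix "prices/" under different scope names keep the session subscribed until each of them has unsubscribed, or until unsubscribeAllScopes is called.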

Subscription streams

A session can listen to subscription events and updates for a selection of topics by adding one or more streams. A stream is registered using a topic selector which specifies the topics that the stream applies to. When an update is received for a topic then it will be routed to every stream that matches both the topic selector and the stream's value type. If more than one stream matches, all will receive the update; the order in which they are notified is not defined.

A stream can be added several times for different selectors. If the same stream (determined by equals) is registered for several selectors that match an event, the stream will only be notified of the event once. The mapping of topic selectors to streams is maintained locally in the client process.

It is also possible to add one or more fallback streams which will receive updates that do not match any stream registered with a selector. This is useful for default processing or simply to catch unprocessed updates. A fallback stream can be added using addFallbackStream. Zero, one, or more fallback streams may be assigned. If no fallback stream is specified, any updates that are not routed to any other stream will simply be discarded.
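
The routing rules described in this section (delivery to every matching stream, a single notification per stream even when it is registered under several matching selectors, and fallback streams for otherwise unrouted updates) can be sketched with a small local registry. This is a hypothetical model rather than the client library's code: StreamRegistryModel is an invented name, selectors are reduced to path prefixes, streams are represented by BiConsumer callbacks, and value type matching is left out.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.BiConsumer;

/** Hypothetical model of client-side stream routing; not the Diffusion implementation. */
final class StreamRegistryModel {
    private final Map<String, List<BiConsumer<String, String>>> streamsByPrefix = new LinkedHashMap<>();
    private final List<BiConsumer<String, String>> fallbackStreams = new ArrayList<>();

    void addStream(String prefix, BiConsumer<String, String> stream) {
        streamsByPrefix.computeIfAbsent(prefix, p -> new ArrayList<>()).add(stream);
    }

    void addFallbackStream(BiConsumer<String, String> stream) {
        fallbackStreams.add(stream);
    }

    /** Removes every registration equal to the stream, including fallback registrations. */
    void removeStream(BiConsumer<String, String> stream) {
        streamsByPrefix.values().forEach(list -> list.removeIf(stream::equals));
        fallbackStreams.removeIf(stream::equals);
    }

    /** Routes one update and returns the number of streams notified. */
    int route(String topicPath, String value) {
        // A set de-duplicates a stream registered under several matching prefixes.
        Set<BiConsumer<String, String>> targets = new LinkedHashSet<>();
        streamsByPrefix.forEach((prefix, streams) -> {
            if (topicPath.startsWith(prefix)) {
                targets.addAll(streams);
            }
        });
        // Fallback streams only see updates that no selector-registered stream matched.
        if (targets.isEmpty()) {
            targets.addAll(fallbackStreams);
        }
        targets.forEach(stream -> stream.accept(topicPath, value));
        return targets.size();
    }
}
```

The model notifies streams in registration order purely as a side effect of the collections used; as stated above, the real notification order is not defined and should not be relied upon.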

If the session is already subscribed to a topic when a matching stream is added, the stream will immediately receive a subscription notification. For most topic types, the latest value is locally cached and will be provided to the stream following the subscription notification.

A stream will receive an onClose callback when unregistered and an onError(SESSION_CLOSED) callback if the session is closed.

Value streams

A ValueStream receives values for matching topics as and when updates are received from the server. Delta updates received from the server are automatically applied to locally cached values so that the stream always receives full values for each update.

Value streams are typed to a specified value class and only updates for compatible topics will be routed to the stream. The following table shows how the value class maps to compatible topic types that will be routed to the stream:

Value Class    Compatible Topic Types
JSON           JSON, STRING, INT64, DOUBLE
String         STRING
Long           INT64
Double         DOUBLE
Binary         BINARY
Bytes          JSON, STRING, INT64, DOUBLE, BINARY, RECORD_V2
RecordV2       RECORD_V2
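
As a reading aid, the table can be expressed as a lookup. The sketch below is a hypothetical model with no dependency on the client library: the class name, the TopicType enum and the use of plain strings for value class names are all assumptions made for illustration.

```java
import java.util.EnumSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical lookup mirroring the value class compatibility table. */
final class ValueClassCompatibility {
    enum TopicType { JSON, STRING, INT64, DOUBLE, BINARY, RECORD_V2 }

    // Value class names are kept as strings so the sketch stays self-contained.
    static final Map<String, Set<TopicType>> COMPATIBLE = Map.of(
        "JSON", EnumSet.of(TopicType.JSON, TopicType.STRING, TopicType.INT64, TopicType.DOUBLE),
        "String", EnumSet.of(TopicType.STRING),
        "Long", EnumSet.of(TopicType.INT64),
        "Double", EnumSet.of(TopicType.DOUBLE),
        "Binary", EnumSet.of(TopicType.BINARY),
        "Bytes", EnumSet.allOf(TopicType.class),
        "RecordV2", EnumSet.of(TopicType.RECORD_V2));

    /** Returns true if a stream typed to valueClass would be routed updates for the topic type. */
    static boolean routes(String valueClass, TopicType topicType) {
        return COMPATIBLE.getOrDefault(valueClass, EnumSet.noneOf(TopicType.class))
            .contains(topicType);
    }
}
```

Read this way, a stream typed to Bytes is compatible with every topic type in the table, while a stream typed to String only receives updates for STRING topics.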

Value stream implementations can be added using addStream.

A value stream can be added to receive updates from time series topics using addTimeSeriesStream. The following table shows how the value class specified when adding the stream maps to the event value class of time series topics that will be routed to the stream:

Event Value Class    Time Series Event Value Class
JSON                 JSON, STRING, INT64, DOUBLE
String               STRING
Long                 INT64
Double               DOUBLE
Binary               BINARY
Bytes                JSON, STRING, INT64, DOUBLE, BINARY, RECORD_V2
RecordV2             RECORD_V2

Fetch

A session can issue a request to fetch details of a topic or topics (subject to authorization) at any time. The topics required are specified using a topic selector.

The results of a fetch contain the topic path and type of each selected topic. They may optionally also include the topic values and/or properties.

A new request can be created using fetchRequest() and modified to specify additional requirements of the fetch operation. The request is issued to the server using the fetch method on the request. This will return the results via a CompletableFuture.

Access control

A session must have SELECT_TOPIC permission for the topic selector used to subscribe or fetch. The topics that result from a subscription or fetch request are further filtered using the READ_TOPIC permission.

No access control restrictions are applied to unsubscription.
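
The two-stage check can be sketched as follows. This is a deliberately simplified, hypothetical model: AccessModel is an invented name, a selector is reduced to a path prefix, and permissions are represented as plain sets, whereas the server evaluates permissions against its own security configuration.

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

/** Hypothetical model of the SELECT_TOPIC then READ_TOPIC checks. */
final class AccessModel {
    static List<String> select(String prefix, List<String> allTopics,
                               Set<String> selectablePrefixes, Set<String> readablePaths) {
        // Stage 1: the request is rejected outright without SELECT_TOPIC for the selector.
        if (!selectablePrefixes.contains(prefix)) {
            throw new SecurityException("no SELECT_TOPIC permission for " + prefix);
        }
        // Stage 2: matching topics are silently filtered to those with READ_TOPIC.
        return allTopics.stream()
            .filter(path -> path.startsWith(prefix))
            .filter(readablePaths::contains)
            .collect(Collectors.toList());
    }
}
```

The point of the model is the asymmetry between the stages: a missing SELECT_TOPIC permission fails the whole request, while a missing READ_TOPIC permission only removes individual topics from the result.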

Accessing the feature

This feature can be obtained from a session as follows:

 Topics topics = session.feature(Topics.class);
 
Since:
5.0
Author:
DiffusionData Limited
  • Method Details

    • addStream

      <V> void addStream(TopicSelector topics, Class<? extends V> valueClass, Topics.ValueStream<V> stream) throws IllegalArgumentException
      Add a value stream to receive topic events for topics that match a given TopicSelector and have a value class that matches a specified type. If the stream matches a time series topic with a compatible time series event type it will receive onValue events without metadata.

      See Topics class documentation for full details of the use of value streams.

      Type Parameters:
      V - the value class
      Parameters:
      topics - selector of one or more topics
      valueClass - the class of values that the stream accepts
      stream - the stream to add
      Throws:
      IllegalArgumentException - if valueClass is neither a valid Diffusion DataType nor a superclass of one.
      Since:
      5.7
    • addStream

      <V> void addStream(String topics, Class<? extends V> valueClass, Topics.ValueStream<V> stream) throws IllegalArgumentException
      Add a value stream to receive topic events for topics that match a given TopicSelector expression and have a value class that matches a specified type. If the stream matches a time series topic with a compatible time series event type it will receive onValue events without metadata.

      See Topics class documentation for full details of the use of value streams.

      Type Parameters:
      V - the value class
      Parameters:
      topics - as a TopicSelector expression
      valueClass - the class of values that the stream accepts
      stream - the stream to add
      Throws:
      IllegalArgumentException - if topics is not a valid selector expression or if valueClass is neither a valid Diffusion DataType nor a superclass of one.
      Since:
      5.7
    • addFallbackStream

      <V> void addFallbackStream(Class<? extends V> valueClass, Topics.ValueStream<V> stream) throws IllegalArgumentException
      Add a fallback stream.

      See Topics class documentation for full details regarding the use of fallback streams.

      Parameters:
      valueClass - the class of values that the stream accepts
      stream - the stream to add
      Throws:
      IllegalArgumentException - if valueClass is neither a valid Diffusion DataType nor a superclass of one.
      Since:
      5.7
    • addTimeSeriesStream

      <V> void addTimeSeriesStream(TopicSelector topics, Class<? extends V> eventValueClass, Topics.ValueStream<TimeSeries.Event<V>> stream) throws IllegalArgumentException
      Add a value stream to receive topic events for time series topics that match a given TopicSelector and have a compatible time series event value class.

      See the Topics class documentation for details of the use of value streams, and the TimeSeries class documentation for details of time series topics.

      This method must be used instead of addStream to add a ValueStream<TimeSeries.Event<V>> because there is no way to express a class literal of type Class<TimeSeries.Event<V>>. The stream can be removed with removeStream(com.pushtechnology.diffusion.client.callbacks.Stream).

      Type Parameters:
      V - the time series value class
      Parameters:
      topics - selector of one or more topics
      eventValueClass - The type of event values accepted by this stream. The registration will match time series topics with a compatible event value class. See the Topics class documentation for details.
      stream - the stream to add
      Throws:
      IllegalArgumentException - if eventValueClass is neither a valid Diffusion DataType nor a superclass of one.
      Since:
      6.0
    • addTimeSeriesStream

      <V> void addTimeSeriesStream(String topics, Class<? extends V> eventValueClass, Topics.ValueStream<TimeSeries.Event<V>> stream) throws IllegalArgumentException
      Add a value stream to receive topic events for time series topics that match a given TopicSelector expression and have a compatible time series event value class.

      See the Topics class documentation for details of the use of value streams, and the TimeSeries class documentation for details of time series topics.

      This method must be used instead of addStream to add a ValueStream<TimeSeries.Event<V>> because Java has no way to express a class literal of type Class<TimeSeries.Event<V>>. The stream can be removed with removeStream(com.pushtechnology.diffusion.client.callbacks.Stream).

      Type Parameters:
      V - the time series value class
      Parameters:
      topics - as a TopicSelector expression
      eventValueClass - The type of event values accepted by this stream. The registration will match time series topics with a compatible event value class. See the Topics class documentation for details.
      stream - the stream to add
      Throws:
      IllegalArgumentException - if topics is not a valid selector expression or if eventValueClass is neither a valid Diffusion DataType nor a superclass of one.
      Since:
      6.0
    • removeStream

      void removeStream(Stream stream)
      Remove a stream.

      More formally, this method removes all streams that compare equal to stream, regardless of the topic selector for which they are registered. It will also remove any fallback stream equal to stream. If there are no such streams, no changes are made.

      Parameters:
      stream - the value stream to remove
      Since:
      5.7
    • subscribe

      default CompletableFuture<?> subscribe(TopicSelector topics)
      Request subscription to topics for the default topic selection scope.

      This is equivalent to calling subscribe(TopicSelector, String) specifying the default selection scope.

      Since:
      6.0
    • subscribe

      CompletableFuture<?> subscribe(TopicSelector topics, String scope)
      Request subscription to topics.

      The session will become subscribed to each existing topic matching the selector unless the session is already subscribed to the topic, or the session does not have READ_TOPIC permission for the topic path. For each topic to which the session becomes subscribed, a subscription notification and initial value (if any) will be delivered to registered value streams before the returned CompletableFuture completes.

      The subscription request is also retained at the server and the session will be automatically subscribed to newly created topics that match the selector (unless a subsequent unsubscription cancels the request).

      Parameters:
      topics - specifies the topics to request subscription to
      scope - specifies the scope of the selection. See Topic Selection Scopes
      Returns:
      a CompletableFuture that completes when a response is received from the server.

      If the task completes successfully, the CompletableFuture result will be null. The result type is a wildcard (?) rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.

      Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:

      Since:
      6.12
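
The completion contract described above (a null result on success, otherwise exceptional completion with a CompletionException whose cause carries the reason) can be handled with standard java.util.concurrent calls. The helper below is a minimal sketch: FutureOutcome and describe are hypothetical names, and the exception type used in the usage note is a stand-in chosen for illustration, not one documented for this API.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Hypothetical helper that waits for a request future and describes its outcome. */
final class FutureOutcome {
    static String describe(CompletableFuture<?> future) {
        try {
            // join() blocks until completion; the result is expected to be null on success.
            Object result = future.join();
            return "ok:" + result;
        } catch (CompletionException e) {
            // The underlying reason for failure is reported as the cause.
            Throwable cause = e.getCause() != null ? e.getCause() : e;
            return "failed:" + cause.getClass().getSimpleName();
        }
    }
}
```

For example, a future completed with null yields "ok:null", and one completed exceptionally with an IllegalStateException yields "failed:IllegalStateException". Blocking with join() is only suitable off the client's callback threads; non-blocking code would typically attach a whenComplete or exceptionally stage instead.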
    • subscribe

      default CompletableFuture<?> subscribe(String topics)
      Request subscription to topics for the default topic selection scope.

      This is equivalent to calling subscribe(String, String) specifying the default selection scope.

      Since:
      6.0
    • subscribe

      CompletableFuture<?> subscribe(String topics, String scope)
      Request subscription to topics.

      This is equivalent to calling subscribe(TopicSelector, String) with a selector parsed using TopicSelectors.parse(String).

      Parameters:
      topics - specifies the topics to request subscription to
      scope - specifies the scope of the selection. See Topic Selection Scopes
      Returns:
      a CompletableFuture that completes when a response is received from the server.

      If the task completes successfully, the CompletableFuture result will be null. The result type is a wildcard (?) rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.

      Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:

      Throws:
      IllegalArgumentException - if topics is an invalid topic selector expression
      Since:
      6.12
    • unsubscribe

      default CompletableFuture<?> unsubscribe(TopicSelector topics)
      Unsubscribe from topics for the default topic selection scope.

      This is equivalent to calling unsubscribe(TopicSelector, String) specifying the default selection scope.

      Since:
      6.0
    • unsubscribe

      CompletableFuture<?> unsubscribe(TopicSelector topics, String scope)
      Unsubscribe from topics.

      This can be used at any time whilst connected to reduce the set of topics to which the session is subscribed or negate earlier subscription requests.

      Parameters:
      topics - the topics to unsubscribe from
      scope - specifies the scope of the selection. See Topic Selection Scopes
      Returns:
      a CompletableFuture that completes when a response is received from the server.

      If the task completes successfully, the CompletableFuture result will be null. The result type is a wildcard (?) rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.

      Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:

      Since:
      6.12
    • unsubscribe

      default CompletableFuture<?> unsubscribe(String topics)
      Unsubscribe from topics for the default topic selection scope.

      This is equivalent to calling unsubscribe(String, String) specifying the default selection scope.

      Since:
      6.0
    • unsubscribe

      CompletableFuture<?> unsubscribe(String topics, String scope)
      Unsubscribe from topics.

      This is equivalent to calling unsubscribe(TopicSelector, String) with a selector parsed using TopicSelectors.parse(String).

      Parameters:
      topics - the topics to unsubscribe from
      scope - specifies the scope of the selection. See Topic Selection Scopes
      Returns:
      a CompletableFuture that completes when a response is received from the server.

      If the task completes successfully, the CompletableFuture result will be null. The result type is a wildcard (?) rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.

      Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:

      Throws:
      IllegalArgumentException - if topics is an invalid topic selector expression
      Since:
      6.12
    • unsubscribeAllScopes

      CompletableFuture<?> unsubscribeAllScopes(String topics)
      Unsubscribe topics from all topic selection scopes.

      This is equivalent to calling unsubscribeAllScopes(TopicSelector) with a selector parsed using TopicSelectors.parse(String).

      Parameters:
      topics - the topics to unsubscribe from
      Returns:
      a CompletableFuture that completes when a response is received from the server.

      If the task completes successfully, the CompletableFuture result will be null. The result type is a wildcard (?) rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.

      Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:

      Throws:
      IllegalArgumentException - if topics is an invalid topic selector expression
      Since:
      6.12
    • unsubscribeAllScopes

      CompletableFuture<?> unsubscribeAllScopes(TopicSelector topics)
      Unsubscribe topics from all topic selection scopes.

      This can be used at any time whilst connected to reduce the set of topics to which the session is subscribed or negate earlier subscription requests and will apply to all scopes in use.

      Parameters:
      topics - the topics to unsubscribe from
      Returns:
      a CompletableFuture that completes when a response is received from the server.

      If the task completes successfully, the CompletableFuture result will be null. The result type is a wildcard (?) rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.

      Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:

      Since:
      6.12
    • fetchRequest

      Topics.FetchRequest<Void> fetchRequest()
      Creates an unconfigured fetch request.

      The returned request can be invoked with fetch. The server will evaluate the query and return a fetch result that provides the paths and types of the matching topics for which the session has READ_TOPIC permission.

      You will usually want to restrict the query to a subset of the topic tree, and to retrieve the topic values and/or properties. This is achieved by applying one or more of the fluent builder methods provided by FetchRequest to produce more refined requests.

      For example:

      FetchResult result = topics.fetchRequest().withValues(String.class).fetch("*A/B//").get();

      Returns:
      a new unconfigured fetch request
      Since:
      6.2