Interface Topics
- All Superinterfaces:
- Feature, SessionTrees, TopicUpdate, TopicViews
Specifically, the feature provides the ability to:
- Subscribe to topics and specify streams to receive updates;
- Fetch the current state of topics (even if not subscribed);
- By extending the topic update feature, update topics with new values;
- By extending the topic views feature, manage topic views.
Subscription and unsubscription
A session can issue requests to subscribe to topics at any time, even if the topics do not exist at the server. Each subscription request provides a topic selector that is evaluated by the server to select matching topics. The session will be subscribed to any topics that match the selector, unless it is already subscribed to them or has insufficient permission. The subscription request is also retained at the server and the session will be automatically subscribed to newly created topics that match the selector (unless a subsequent unsubscription cancels the request).
Sessions receive notifications from topics that they are subscribed to via subscription streams (see below). When a session is subscribed to a topic, all matching streams will first receive a subscription notification that provides details about the topic. If the server has a value for the topic, the value will be delivered to the streams before any other notifications.
A session can unsubscribe from topics at any time. This is also specified using a topic selector. On unsubscription, matching streams are notified via the onUnsubscription notification. This notification will give the reason for unsubscription (for example, by request of the session, request of the server, or topic removal).
Subscriptions and unsubscriptions can occur for reasons other than requests from the session. A session can be subscribed to or unsubscribed from a topic by another session using the SubscriptionControl feature. The removal of a topic also automatically causes unsubscription for subscribed sessions.
Subscription requests are subject to authorisation checks. The session must have SELECT_TOPIC permission for the topic selector used to subscribe. Matching topics will be further filtered to those for which the session has READ_TOPIC permission.
Topic selection scopes
Topic selection scopes allow an application with multiple components to use a single Diffusion session. An application component can use a topic selection scope to manage a set of selectors that is unaffected by unsubscriptions performed by other application components. The session will be subscribed to all topics with paths matching a selector in any scope. The unsubscribe operation removes a selector from specific scopes.
A scope may be specified to a subscribe or unsubscribe method, indicating that the selection only applies to that scope. The server manages scopes to ensure that unsubscriptions applied to one scope do not affect another.
Scope names are case sensitive. A scope name may not begin with the character $ as this is reserved for internal use.
Unsubscription using a wildcard selector that indicates all topics (such as "?.*//") effectively removes the scope.
An application can request unsubscription from all scopes using unsubscribeAllScopes(String).
The default selection scope is used for all methods that do not explicitly specify a scope.
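The scope rules above can be pictured with a small, self-contained model. This is only a sketch under stated assumptions: it does not use the Diffusion client library, the class name ScopeModel is hypothetical, and selectors are simplified to plain path prefixes rather than real topic selector expressions. It shows that a session stays subscribed while any scope still holds a matching selector, and that unsubscribing in one scope leaves other scopes untouched.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Illustrative model only (hypothetical class); selectors are plain path prefixes. */
class ScopeModel {
    // scope name -> selectors (path prefixes) registered in that scope
    private final Map<String, Set<String>> scopes = new HashMap<>();

    void subscribe(String prefix, String scope) {
        // Scope names starting with '$' are reserved for internal use.
        if (scope.startsWith("$")) {
            throw new IllegalArgumentException("scope names beginning with $ are reserved");
        }
        scopes.computeIfAbsent(scope, s -> new HashSet<>()).add(prefix);
    }

    /** Removes the selector from one scope only; other scopes are unaffected. */
    void unsubscribe(String prefix, String scope) {
        Set<String> selectors = scopes.get(scope);
        if (selectors != null) {
            selectors.remove(prefix);
            if (selectors.isEmpty()) {
                scopes.remove(scope);
            }
        }
    }

    /** Removes the selector from every scope in use. */
    void unsubscribeAllScopes(String prefix) {
        for (String scope : new HashSet<>(scopes.keySet())) {
            unsubscribe(prefix, scope);
        }
    }

    /** The session is subscribed if a selector in any scope matches the path. */
    boolean isSubscribed(String path) {
        return scopes.values().stream()
            .flatMap(Set::stream)
            .anyMatch(path::startsWith);
    }
}
```

In this model, two application components that each subscribe to the same prefix under their own scope name can unsubscribe independently; the path only stops matching once both scopes have dropped the selector.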
Subscription streams
A session can listen to subscription events and updates for a selection of topics by adding one or more streams. A stream is registered using a topic selector which specifies the topics that the stream applies to. When an update is received for a topic then it will be routed to every stream that matches both the topic selector and the stream's value type. If more than one stream matches, all will receive the update; the order in which they are notified is not defined.
A stream can be added several times for different selectors. If the same stream (determined by equals) is registered for several selectors that match an event, the stream will only be notified of the event once. The mapping of topic selectors to streams is maintained locally in the client process.
It is also possible to add one or more fallback streams which will receive updates that do not match any stream registered with a selector. This is useful for default processing or simply to catch unprocessed updates. A fallback stream can be added using addFallbackStream. Zero, one, or more fallback streams may be assigned. If no fallback stream is specified, any updates that are not routed to any other stream will simply be discarded.
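The routing rules described above can be sketched as a small stand-alone model. It is not the library's implementation: StreamRouter is a hypothetical class, selectors are modelled as Predicate<String> over topic paths, streams as BiConsumer<String, String>, and value type matching is left out for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.function.BiConsumer;
import java.util.function.Predicate;

/** Illustrative model of client-side stream routing (hypothetical class). */
class StreamRouter {
    /** One (selector, stream) registration. */
    private static final class Registration {
        final Predicate<String> selector;
        final BiConsumer<String, String> stream;

        Registration(Predicate<String> selector, BiConsumer<String, String> stream) {
            this.selector = selector;
            this.stream = stream;
        }
    }

    private final List<Registration> registrations = new ArrayList<>();
    private final Set<BiConsumer<String, String>> fallbacks = new LinkedHashSet<>();

    void addStream(Predicate<String> selector, BiConsumer<String, String> stream) {
        registrations.add(new Registration(selector, stream));
    }

    void addFallbackStream(BiConsumer<String, String> stream) {
        fallbacks.add(stream);
    }

    /** Routes one update and returns the number of streams notified. */
    int route(String path, String value) {
        // A set ensures a stream registered under several matching
        // selectors is notified only once per event.
        Set<BiConsumer<String, String>> targets = new LinkedHashSet<>();
        for (Registration r : registrations) {
            if (r.selector.test(path)) {
                targets.add(r.stream);
            }
        }
        // Fallback streams only see updates that matched no other stream.
        if (targets.isEmpty()) {
            targets = fallbacks;
        }
        for (BiConsumer<String, String> s : targets) {
            s.accept(path, value);
        }
        return targets.size(); // 0 means the update was discarded
    }
}
```

A return value of zero corresponds to the case where no selector matches and no fallback stream is registered, so the update is dropped.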
If the session is already subscribed to a topic when a matching stream is added, the stream will immediately receive a subscription notification. For most topic types, the latest value is locally cached and will be provided to the stream following the subscription notification.
A stream will receive an onClose callback when unregistered and an onError(SESSION_CLOSED) callback if the session is closed.
Value streams
A ValueStream receives values for matching topics as and when updates are received from the server. Delta updates received from the server are automatically applied to locally cached values so that the stream always receives full values for each update.
Value streams are typed to a specified value class and only updates for compatible topics will be routed to the stream. The following table shows how the value class maps to compatible topic types that will be routed to the stream:
Value Class | Compatible Topic Types |
---|---|
JSON | JSON, STRING, INT64, DOUBLE |
String | STRING |
Long | INT64 |
Double | DOUBLE |
Binary | BINARY |
Bytes | JSON, STRING, INT64, DOUBLE, BINARY, RECORD_V2 |
RecordV2 | RECORD_V2 |
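As a quick way to reason about the table, the mapping can be encoded as a lookup. The sketch below is a plain-Java transcription of the rows above and nothing more: the class ValueClassCompatibility is hypothetical, and value classes and topic types are represented by their names as strings rather than by the library's own types.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Transcription of the compatibility table (hypothetical helper class). */
class ValueClassCompatibility {
    // value class name -> names of the topic types routed to such a stream
    private static final Map<String, Set<String>> TABLE = new LinkedHashMap<>();

    static {
        put("JSON", "JSON", "STRING", "INT64", "DOUBLE");
        put("String", "STRING");
        put("Long", "INT64");
        put("Double", "DOUBLE");
        put("Binary", "BINARY");
        put("Bytes", "JSON", "STRING", "INT64", "DOUBLE", "BINARY", "RECORD_V2");
        put("RecordV2", "RECORD_V2");
    }

    private static void put(String valueClass, String... topicTypes) {
        TABLE.put(valueClass, new HashSet<>(Arrays.asList(topicTypes)));
    }

    /** True if updates for the topic type would be routed to a stream of the value class. */
    static boolean isCompatible(String valueClass, String topicType) {
        Set<String> types = TABLE.get(valueClass);
        return types != null && types.contains(topicType);
    }
}
```

For example, a stream typed to JSON would also receive INT64 topic updates, whereas a stream typed to String would not receive JSON topic updates.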
Value stream implementations can be added using addStream.
A value stream can be added to receive updates from time series topics using addTimeSeriesStream. The following table shows how the value class specified when adding the stream maps to the event value class of time series topics that will be routed to the stream:
Event Value Class | Time Series Event Value Class |
---|---|
JSON | JSON, STRING, INT64, DOUBLE |
String | STRING |
Long | INT64 |
Double | DOUBLE |
Binary | BINARY |
Bytes | JSON, STRING, INT64, DOUBLE, BINARY, RECORD_V2 |
RecordV2 | RECORD_V2 |
Fetch
A session can issue a request to fetch details of a topic or topics (subject to authorization) at any time. The topics required are specified using a topic selector.
The results of a fetch will return the topic path and type of each selected topic. The results may also optionally return the topic values and/or properties.
A new request can be created using fetchRequest() and modified to specify additional requirements of the fetch operation. The request is issued to the server using the fetch method on the request. This will return the results via a CompletableFuture.
Access control
A session must have SELECT_TOPIC permission for the topic selector used to subscribe or fetch. The topics that result from a subscription or fetch request are further filtered using the READ_TOPIC permission.
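The two-stage check can be illustrated with a small model. This is a sketch under stated assumptions and not how the server is implemented: AccessControlModel is a hypothetical class, the selector is simplified to a path prefix, permissions are supplied as predicates, and SecurityException stands in for the library's own permission failure.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

/** Illustrative two-stage permission check (hypothetical class). */
class AccessControlModel {
    /**
     * Stage 1: the selector itself needs SELECT_TOPIC permission.
     * Stage 2: each matching topic is kept only if READ_TOPIC is granted for its path.
     */
    static List<String> select(
            String selectorPrefix,
            List<String> topicPaths,
            Predicate<String> hasSelectTopic,
            Predicate<String> hasReadTopic) {
        if (!hasSelectTopic.test(selectorPrefix)) {
            // Stand-in for a permission failure reported to the caller.
            throw new SecurityException("no SELECT_TOPIC permission for " + selectorPrefix);
        }
        List<String> result = new ArrayList<>();
        for (String path : topicPaths) {
            if (path.startsWith(selectorPrefix) && hasReadTopic.test(path)) {
                result.add(path);
            }
        }
        return result;
    }
}
```

The point of the model is that a missing SELECT_TOPIC permission fails the whole request, whereas a missing READ_TOPIC permission silently removes individual topics from the result.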
No access control restrictions are applied to unsubscription.
Accessing the feature
This feature can be obtained from a session as follows:
Topics topics = session.feature(Topics.class);
- Since:
- 5.0
- Author:
- DiffusionData Limited
Nested Class Summary
Modifier and Type | Description |
---|---|
static interface | A parameterised query that can be used to search the topic tree. |
static interface | Encapsulates the results from a fetch operation issued to the server. |
static interface | Base subscriber stream interface. |
static enum | The reason that an unsubscription occurred. |
static interface | Stream interface that can be registered to receive subscription and value events whenever an update is received from the server. |
Nested classes/interfaces inherited from interface com.pushtechnology.diffusion.client.features.control.topics.SessionTrees
SessionTrees.BranchMapping, SessionTrees.BranchMappingTable, SessionTrees.InvalidBranchMappingException
Nested classes/interfaces inherited from interface com.pushtechnology.diffusion.client.features.TopicUpdate
TopicUpdate.FailedPatchException, TopicUpdate.InvalidPatchException, TopicUpdate.JsonPatchResult
Field Summary
Field | Description |
---|---|
DEFAULT_SELECTION_SCOPE | The default topic selection scope. |
Method Summary
Modifier and Type | Method | Description |
---|---|---|
<V> void | addFallbackStream(Class<? extends V> valueClass, Topics.ValueStream<V> stream) | Add a fallback stream. |
<V> void | addStream(TopicSelector topics, Class<? extends V> valueClass, Topics.ValueStream<V> stream) | Add a value stream to receive topic events for topics that match a given TopicSelector and have a value class that matches a specified type. |
<V> void | addStream(String topics, Class<? extends V> valueClass, Topics.ValueStream<V> stream) | Add a value stream to receive topic events for topics that match a given TopicSelector expression and have a value class that matches a specified type. |
<V> void | addTimeSeriesStream(TopicSelector topics, Class<? extends V> eventValueClass, Topics.ValueStream<TimeSeries.Event<V>> stream) | Add a value stream to receive topic events for time series topics that match a given TopicSelector and have a compatible time series event value class. |
<V> void | addTimeSeriesStream(String topics, Class<? extends V> eventValueClass, Topics.ValueStream<TimeSeries.Event<V>> stream) | Add a value stream to receive topic events for time series topics that match a given TopicSelector expression and have a compatible time series value class. |
Topics.FetchRequest<Void> | fetchRequest() | Creates an unconfigured fetch request. |
void | removeStream(Stream stream) | Remove a stream. |
default CompletableFuture<?> | subscribe(TopicSelector topics) | Request subscription to topics for the default topic selection scope. |
CompletableFuture<?> | subscribe(TopicSelector topics, String scope) | Request subscription to topics. |
default CompletableFuture<?> | subscribe(String topics) | Request subscription to topics for the default topic selection scope. |
CompletableFuture<?> | subscribe(String topics, String scope) | Request subscription to topics. |
default CompletableFuture<?> | unsubscribe(TopicSelector topics) | Unsubscribe from topics for the default topic selection scope. |
CompletableFuture<?> | unsubscribe(TopicSelector topics, String scope) | Unsubscribe from topics. |
default CompletableFuture<?> | unsubscribe(String topics) | Unsubscribe from topics for the default topic selection scope. |
CompletableFuture<?> | unsubscribe(String topics, String scope) | Unsubscribe from topics. |
CompletableFuture<?> | unsubscribeAllScopes(TopicSelector topics) | Unsubscribe topics from all topic selection scopes. |
CompletableFuture<?> | unsubscribeAllScopes(String topics) | Unsubscribe topics from all topic selection scopes. |
Methods inherited from interface com.pushtechnology.diffusion.client.session.Feature
getSession
Methods inherited from interface com.pushtechnology.diffusion.client.features.control.topics.SessionTrees
getBranchMappingTable, listSessionTreeBranchesWithMappings, putBranchMappingTable
Methods inherited from interface com.pushtechnology.diffusion.client.features.TopicUpdate
addAndSet, addAndSet, applyJsonPatch, applyJsonPatch, createUpdateStream, createUpdateStream, createUpdateStream, createUpdateStream, newUpdateStreamBuilder, set, set
Methods inherited from interface com.pushtechnology.diffusion.client.features.control.topics.views.TopicViews
createTopicView, getTopicView, listTopicViews, removeTopicView
Field Details
DEFAULT_SELECTION_SCOPE
The default topic selection scope.
This is used by subscribe and unsubscribe methods that do not explicitly specify a topic selection scope.
Method Details
addStream
<V> void addStream(TopicSelector topics, Class<? extends V> valueClass, Topics.ValueStream<V> stream) throws IllegalArgumentException
Add a value stream to receive topic events for topics that match a given TopicSelector and have a value class that matches a specified type. If the stream matches a time series topic with a compatible time series event type it will receive onValue events without metadata.
See the Topics class documentation for full details of the use of value streams.
- Type Parameters:
- V - the value class
- Parameters:
- topics - selector of one or more topics
- valueClass - the class of values that the stream accepts
- stream - the stream to add
- Throws:
- IllegalArgumentException - if valueClass is neither a valid Diffusion DataType nor a superclass of one.
- Since:
- 5.7
addStream
<V> void addStream(String topics, Class<? extends V> valueClass, Topics.ValueStream<V> stream) throws IllegalArgumentException
Add a value stream to receive topic events for topics that match a given TopicSelector expression and have a value class that matches a specified type. If the stream matches a time series topic with a compatible time series event type it will receive onValue events without metadata.
See the Topics class documentation for full details of the use of value streams.
- Type Parameters:
- V - the value class
- Parameters:
- topics - as a TopicSelector expression
- valueClass - the class of values that the stream accepts
- stream - the stream to add
- Throws:
- IllegalArgumentException - if topics is not a valid selector expression or if valueClass is neither a valid Diffusion DataType nor a superclass of one.
- Since:
- 5.7
addFallbackStream
<V> void addFallbackStream(Class<? extends V> valueClass, Topics.ValueStream<V> stream) throws IllegalArgumentException
Add a fallback stream.
See the Topics class documentation for full details regarding the use of fallback streams.
- Parameters:
- valueClass - the class of values that the stream accepts
- stream - the stream to add
- Throws:
- IllegalArgumentException - if valueClass is neither a valid Diffusion DataType nor a superclass of one.
- Since:
- 5.7
addTimeSeriesStream
<V> void addTimeSeriesStream(TopicSelector topics, Class<? extends V> eventValueClass, Topics.ValueStream<TimeSeries.Event<V>> stream) throws IllegalArgumentException
Add a value stream to receive topic events for time series topics that match a given TopicSelector and have a compatible time series event value class.
See the Topics class documentation for details of the use of value streams, and the TimeSeries class documentation for details of time series topics.
This method must be used instead of addStream to add a ValueStream<TimeSeries.Event<V>> because Java has no way to express a class literal of type Class<TimeSeries.Event<V>>. The stream can be removed with removeStream(Stream).
- Type Parameters:
- V - the time series value class
- Parameters:
- topics - selector of one or more topics
- eventValueClass - the type of event values accepted by this stream. The registration will match time series topics with a compatible event value class. See the Topics class documentation for details.
- stream - the stream to add
- Throws:
- IllegalArgumentException - if eventValueClass is neither a valid Diffusion DataType nor a superclass of one.
- Since:
- 6.0
addTimeSeriesStream
<V> void addTimeSeriesStream(String topics, Class<? extends V> eventValueClass, Topics.ValueStream<TimeSeries.Event<V>> stream) throws IllegalArgumentException
Add a value stream to receive topic events for time series topics that match a given TopicSelector expression and have a compatible time series event value class.
See the Topics class documentation for details of the use of value streams, and the TimeSeries class documentation for details of time series topics.
This method must be used instead of addStream to add a ValueStream<TimeSeries.Event<V>> because Java has no way to express a class literal of type Class<TimeSeries.Event<V>>. The stream can be removed with removeStream(Stream).
- Type Parameters:
- V - the time series value class
- Parameters:
- topics - as a TopicSelector expression
- eventValueClass - the type of event values accepted by this stream. The registration will match time series topics with a compatible event value class. See the Topics class documentation for details.
- stream - the stream to add
- Throws:
- IllegalArgumentException - if topics is not a valid selector expression or if eventValueClass is neither a valid Diffusion DataType nor a superclass of one.
- Since:
- 6.0
removeStream
void removeStream(Stream stream)
Remove a stream.
More formally, this method removes all streams that compare equal to stream, regardless of the topic selector for which they are registered. It will also remove any fallback stream equal to stream. If there are no such streams, no changes are made.
- Parameters:
- stream - the value stream to remove
- Since:
- 5.7
subscribe
default CompletableFuture<?> subscribe(TopicSelector topics)
Request subscription to topics for the default topic selection scope.
This is equivalent to calling subscribe(TopicSelector, String) specifying the default selection scope.
- Since:
- 6.0
subscribe
CompletableFuture<?> subscribe(TopicSelector topics, String scope)
Request subscription to topics.
The session will become subscribed to each existing topic matching the selector unless the session is already subscribed to the topic, or the session does not have READ_TOPIC permission for the topic path. For each topic to which the session becomes subscribed, a subscription notification and initial value (if any) will be delivered to registered value streams before the returned CompletableFuture completes.
The subscription request is also retained at the server and the session will be automatically subscribed to newly created topics that match the selector (unless a subsequent unsubscription cancels the request).
- Parameters:
- topics - specifies the topics to request subscription to
- scope - specifies the scope of the selection. See Topic Selection Scopes.
- Returns:
- a CompletableFuture that completes when a response is received from the server.
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
- PermissionsException – if the calling session does not have SELECT_TOPIC permission for the selector expression;
- SessionClosedException – if the session is closed.
- Since:
- 6.12
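Because failures are reported through the cause of a CompletionException, callers generally need to unwrap it. The sketch below shows that pattern using only java.util.concurrent; it does not call the Diffusion API. SubscribeOutcome and SessionClosedStandIn are hypothetical names, with the latter standing in for whatever exception the library actually reports, and cancellation of the future is not handled.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

/** Illustrative handling of a subscription-style future (hypothetical class). */
class SubscribeOutcome {
    /** Hypothetical stand-in for a library exception reported as the cause. */
    static class SessionClosedStandIn extends RuntimeException {
        SessionClosedStandIn() {
            super("session closed");
        }
    }

    /** Waits for the future and describes how it completed. */
    static String describe(CompletableFuture<?> future) {
        try {
            // On success the result is null, so it is ignored here.
            future.join();
            return "subscribed";
        } catch (CompletionException e) {
            // The reason for failure is carried as the cause.
            Throwable cause = e.getCause();
            return "failed: " + cause.getClass().getSimpleName();
        }
    }
}
```

Blocking with join() keeps the sketch short; in application code a non-blocking callback such as whenComplete on the returned future may be a better fit.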
subscribe
default CompletableFuture<?> subscribe(String topics)
Request subscription to topics for the default topic selection scope.
This is equivalent to calling subscribe(String, String) specifying the default selection scope.
- Since:
- 6.0
subscribe
CompletableFuture<?> subscribe(String topics, String scope)
Request subscription to topics.
This is equivalent to calling subscribe(TopicSelector, String) with a selector parsed using TopicSelectors.parse(String).
- Parameters:
- topics - specifies the topics to request subscription to
- scope - specifies the scope of the selection. See Topic Selection Scopes.
- Returns:
- a CompletableFuture that completes when a response is received from the server.
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
- PermissionsException – if the calling session does not have SELECT_TOPIC permission for the selector expression;
- SessionClosedException – if the session is closed.
- Throws:
- IllegalArgumentException - if topics is an invalid topic selector expression
- Since:
- 6.12
unsubscribe
default CompletableFuture<?> unsubscribe(TopicSelector topics)
Unsubscribe from topics for the default topic selection scope.
This is equivalent to calling unsubscribe(TopicSelector, String) specifying the default selection scope.
- Since:
- 6.0
unsubscribe
CompletableFuture<?> unsubscribe(TopicSelector topics, String scope)
Unsubscribe from topics.
This can be used at any time whilst connected to reduce the set of topics to which the session is subscribed or negate earlier subscription requests.
- Parameters:
- topics - the topics to unsubscribe from
- scope - specifies the scope of the selection. See Topic Selection Scopes.
- Returns:
- a CompletableFuture that completes when a response is received from the server.
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
- SessionClosedException – if the session is closed.
- Since:
- 6.12
unsubscribe
default CompletableFuture<?> unsubscribe(String topics)
Unsubscribe from topics for the default topic selection scope.
This is equivalent to calling unsubscribe(String, String) specifying the default selection scope.
- Since:
- 6.0
unsubscribe
CompletableFuture<?> unsubscribe(String topics, String scope)
Unsubscribe from topics.
This is equivalent to calling unsubscribe(TopicSelector, String) with a selector parsed using TopicSelectors.parse(String).
- Parameters:
- topics - the topics to unsubscribe from
- scope - specifies the scope of the selection. See Topic Selection Scopes.
- Returns:
- a CompletableFuture that completes when a response is received from the server.
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
- SessionClosedException – if the session is closed.
- Throws:
- IllegalArgumentException - if topics is an invalid topic selector expression
- Since:
- 6.12
unsubscribeAllScopes
CompletableFuture<?> unsubscribeAllScopes(String topics)
Unsubscribe topics from all topic selection scopes.
This is equivalent to calling unsubscribeAllScopes(TopicSelector) with a selector parsed using TopicSelectors.parse(String).
- Parameters:
- topics - the topics to unsubscribe from
- Returns:
- a CompletableFuture that completes when a response is received from the server.
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
- SessionClosedException – if the session is closed.
- Throws:
- IllegalArgumentException - if topics is an invalid topic selector expression
- Since:
- 6.12
unsubscribeAllScopes
CompletableFuture<?> unsubscribeAllScopes(TopicSelector topics)
Unsubscribe topics from all topic selection scopes.
This can be used at any time whilst connected to reduce the set of topics to which the session is subscribed or negate earlier subscription requests, and will apply to all scopes in use.
- Parameters:
- topics - the topics to unsubscribe from
- Returns:
- a CompletableFuture that completes when a response is received from the server.
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
Otherwise, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
- SessionClosedException – if the session is closed.
- Since:
- 6.12
fetchRequest
Topics.FetchRequest<Void> fetchRequest()
Creates an unconfigured fetch request.
The returned request can be invoked with fetch. The server will evaluate the query and return a fetch result that provides the paths and types of the matching topics for which the session has READ_TOPIC permission.
You will usually want to restrict the query to a subset of the topic tree, and to retrieve the topic values and/or properties. This is achieved by applying one or more of the fluent builder methods provided by FetchRequest to produce more refined requests.
For example:
FetchResult<String> result = topics.fetchRequest().withValues(String.class).fetch("*A/B//").get();
- Returns:
- a new unconfigured fetch request
- Since:
- 6.2