public interface TopicSpecification
A topic is created from a specification using TopicControl.addTopic(String, TopicSpecification, AddCallback).
Topic specifications allow an application to introspect the type and capabilities of a topic. Topic specifications are provided to value streams and topic notification listeners.
A topic specification has a topic type and a map of property settings which define the behavior of the topic. A default topic specification for a topic type can be created using Diffusion.newTopicSpecification(TopicType). Topic specifications with different properties can be derived from a default instance using the withProperties(Map) and withProperty(String, String) builder methods.
Depending on the topic type, some properties must be included in the specification when creating a topic and some properties have no effect. The required and optional properties for each topic type are set out in the following table. Properties unsupported by the topic type are ignored.
Property | Default when optional | STRING JSON BINARY | DOUBLE INT64 | RECORD_V2 | TIME_SERIES |
---|---|---|---|---|---|
COMPRESSION | low | Optional | — | Optional | Optional |
CONFLATION | conflate | Optional | Optional | Optional | † |
DONT_RETAIN_VALUE | false | Optional | Optional | Optional | — |
OWNER | | Optional | Optional | Optional | Optional |
PERSISTENT | true | Optional | Optional | Optional | Optional |
PRIORITY | default | Optional | Optional | Optional | Optional |
PUBLISH_VALUES_ONLY | false | Optional | — | Optional | Optional |
REMOVAL | | Optional | Optional | Optional | Optional |
SCHEMA | | — | — | Optional | — |
TIDY_ON_UNSUBSCRIBE | false | Optional | Optional | Optional | Optional |
TIME_SERIES_EVENT_VALUE_TYPE | | — | — | — | Required |
TIME_SERIES_RETAINED_RANGE | limit 10 | — | — | — | Optional |
TIME_SERIES_SUBSCRIPTION_RANGE | As documented | — | — | — | Optional |
VALIDATE_VALUES | false | Optional | Optional | Optional | Optional |
† TIME_SERIES topics have restricted values for the CONFLATION property. They are only allowed to have the values off or unsubscribe.
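The two TIME_SERIES rules in the table above can be checked on the client before a topic is created. The following is a minimal sketch in plain Java, not part of the Diffusion API: the class name, the check method, and the use of the property names as plain map keys are assumptions made for illustration, and values are compared exactly as written.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical client-side pre-check for TIME_SERIES properties; not Diffusion code. */
final class TimeSeriesSpecCheck {

    // Assumed key names, used here only as plain map keys.
    static final String EVENT_VALUE_TYPE = "TIME_SERIES_EVENT_VALUE_TYPE";
    static final String CONFLATION = "CONFLATION";

    // The only CONFLATION values the table allows for TIME_SERIES topics.
    private static final List<String> ALLOWED_CONFLATION = Arrays.asList("off", "unsubscribe");

    /** Returns null if the properties pass both rules, otherwise a description of the first problem. */
    static String check(Map<String, String> properties) {
        if (!properties.containsKey(EVENT_VALUE_TYPE)) {
            return EVENT_VALUE_TYPE + " is required for TIME_SERIES topics";
        }
        String conflation = properties.get(CONFLATION);
        if (conflation != null && !ALLOWED_CONFLATION.contains(conflation)) {
            return "CONFLATION must be off or unsubscribe for TIME_SERIES topics";
        }
        return null;
    }

    public static void main(String[] args) {
        Map<String, String> properties = new HashMap<>();
        properties.put(CONFLATION, "always");
        System.out.println(check(properties)); // reports the missing event value type
        properties.put(EVENT_VALUE_TYPE, "json"); // example data type name
        System.out.println(check(properties)); // reports the unsupported conflation policy
    }
}
```

The server remains the authority on whether a specification is valid; a check like this only catches the two cases listed in the table earlier.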
Modifier and Type | Field and Description |
---|---|
static String | COMPRESSION - Key of the topic property that allows the compression policy to be set on a per-topic basis. |
static String | CONFLATION - Key of the topic property that specifies the conflation policy of the topic. |
static String | DONT_RETAIN_VALUE - Key of the topic property that specifies a topic should not retain its last value. |
static String | OWNER - Key of the topic property that allows the creator of a topic to extend READ_TOPIC, MODIFY_TOPIC, and UPDATE_TOPIC permissions to a specific principal, in addition to the permissions granted by the authorisation rules in the security store. |
static String | PERSISTENT - Key of the topic property that can be used to prevent a topic from being persisted when the server is configured to enable persistence. |
static String | PRIORITY - Key of the topic property that specifies the topic delivery priority. |
static String | PUBLISH_VALUES_ONLY - Key of the topic property that specifies whether a topic should publish only values. |
static String | REMOVAL - Key of the topic property that specifies a removal policy for automatic removal of the topic (and/or other topics). |
static String | SCHEMA - Key of the topic property that specifies a schema which constrains topic values. |
static String | TIDY_ON_UNSUBSCRIBE - Key of the topic property that specifies the 'tidy on unsubscribe' option for a topic. |
static String | TIME_SERIES_EVENT_VALUE_TYPE - Key of the topic property that specifies the event data type for a time series topic. |
static String | TIME_SERIES_RETAINED_RANGE - Key of the topic property that specifies the range of events retained by a time series topic. |
static String | TIME_SERIES_SUBSCRIPTION_RANGE - Key of the topic property that specifies the range of time series topic events to send to new subscribers. |
static String | VALIDATE_VALUES - Key of the topic property that specifies whether a topic should validate inbound values. |
Modifier and Type | Method and Description |
---|---|
Map<String,String> | getProperties() - Returns the topic properties. |
TopicType | getType() - Returns the topic type. |
TopicSpecification | withoutProperties(String... propertyNames) - Returns a new specification that does not have the specified properties. |
TopicSpecification | withProperties(Map<String,String> properties) - Creates a new specification with the specified properties. |
TopicSpecification | withProperty(String key, String value) - Creates a new specification with the specified property value set. |
static final String PUBLISH_VALUES_ONLY
By default, a topic that supports delta streams will publish the difference between two values (a delta) when doing so is more efficient than publishing the complete new value. Subscribing sessions can use a value stream to automatically apply the delta to a local copy of the topic value to calculate the new value.
Setting PUBLISH_VALUES_ONLY to true disables this behavior so that deltas are never published. Doing so is usually not recommended because it will result in more data being transmitted, less efficient use of network resources, and increased transmission latency. On the other hand, calculating deltas can require significant CPU from the server or, if update streams are used, from the updating client. The CPU cost will be higher if there are many differences between successive values, in which case delta streams confer fewer benefits. If successive values are unrelated to each other, consider setting PUBLISH_VALUES_ONLY to true. Also consider setting PUBLISH_VALUES_ONLY to true if the network capacity is high and the bandwidth savings of deltas are not required.
See also: DONT_RETAIN_VALUE
static final String VALIDATE_VALUES
By default, the server does not validate received values before sending them on to client sessions. Invalid or corrupt values will be stored in the topic and passed on to sessions. If this property is set to "true", the server will perform additional validation on each value to check that it is a valid instance of the data type. If a value is not valid, the server returns an error to the updater and does not update the topic.
If this value is not set (or set to something other than "true"), no server validation of inbound values is performed. This is the recommended setting as there is a performance overhead to validation and a session using topic update cannot send invalid values anyway.
static final String TIDY_ON_UNSUBSCRIBE
By default, if a session unsubscribes from a topic, it will receive any updates for that topic that were previously queued but not sent.
If this property is set to "true", when a session unsubscribes from the topic, any updates for the topic that are still queued for the session are removed. There is a performance overhead to using this option because the client queue must be scanned to find topic updates to remove. However, it may prove useful for preventing unwanted data being sent to sessions.
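The cost described above comes from a linear scan of the session's queue. As a rough illustration only, the sketch below models a session queue as a plain Java Deque and removes the queued updates for one topic; the Update class and the tidy method are hypothetical and do not reflect the server's internal data structures.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Illustrative model of a per-session outbound queue; not Diffusion server code. */
final class TidyOnUnsubscribeSketch {

    /** A queued update: the path of the topic it belongs to and its payload. */
    static final class Update {
        final String topicPath;
        final String value;

        Update(String topicPath, String value) {
            this.topicPath = topicPath;
            this.value = value;
        }
    }

    /** Scans the whole queue, removes updates for the topic, and returns how many were removed. */
    static int tidy(Deque<Update> queue, String topicPath) {
        int before = queue.size();
        queue.removeIf(update -> update.topicPath.equals(topicPath));
        return before - queue.size();
    }

    public static void main(String[] args) {
        Deque<Update> queue = new ArrayDeque<>();
        queue.add(new Update("a/b", "1"));
        queue.add(new Update("x/y", "2"));
        queue.add(new Update("a/b", "3"));
        int removed = tidy(queue, "a/b");
        System.out.println(removed + " removed, " + queue.size() + " left"); // 2 removed, 1 left
    }
}
```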
static final String TIME_SERIES_EVENT_VALUE_TYPE
The value is the type name of a data type.
See also: DataTypes.getByName(String)
static final String TIME_SERIES_RETAINED_RANGE
When a new event is added to the time series, older events that fall outside of the range are discarded.
If the property is not specified, a time series topic will retain the ten most recent events.
The property value is a time series range expression string composed of one or more constraint clauses. Constraints are combined to provide a range of events from the end of the time series.
Durations in a constraint are given as a number followed by a time unit: MS – milliseconds; S – seconds; H – hours.
If a range expression contains multiple constraints, the constraint that selects the smallest range is used.
Property value | Meaning |
---|---|
limit 5 | The five most recent events |
last 10s | All events that are no more than ten seconds older than the latest event |
last 10s limit 5 | The five most recent events that are no more than ten seconds older than the latest event |
Range expressions are not case sensitive: limit 5 last 10s is equivalent to LIMIT 5 LAST 10S.
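To make the "smallest range wins" rule concrete, the sketch below applies a last constraint and a limit constraint to a list of event timestamps and keeps only the events that satisfy both. It is a plain Java model written for this page: the class and method names are hypothetical, timestamps are assumed to be in milliseconds and sorted in ascending order, and the server's actual implementation may differ.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Illustrative model of a "last <duration> limit <n>" range; not Diffusion code. */
final class RetainedRangeSketch {

    /**
     * Returns the newest events that satisfy both constraints.
     *
     * @param timestamps event times in milliseconds, oldest first
     * @param lastMillis maximum age relative to the latest event
     * @param limit maximum number of events to keep
     */
    static List<Long> retain(List<Long> timestamps, long lastMillis, int limit) {
        if (timestamps.isEmpty()) {
            return new ArrayList<>();
        }
        long latest = timestamps.get(timestamps.size() - 1);
        int start = timestamps.size();
        // Walk backwards from the newest event while both constraints still hold.
        while (start > 0
                && timestamps.size() - (start - 1) <= limit
                && latest - timestamps.get(start - 1) <= lastMillis) {
            start--;
        }
        return new ArrayList<>(timestamps.subList(start, timestamps.size()));
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(0L, 2000L, 9000L, 10000L, 11000L, 12000L, 13000L, 14000L);
        // "last 10s limit 5": six events are within ten seconds, so the limit is the tighter constraint.
        System.out.println(retain(events, 10_000, 5)); // [10000, 11000, 12000, 13000, 14000]
        // "last 3s limit 5": only four events are within three seconds, so the duration is tighter.
        System.out.println(retain(events, 3_000, 5)); // [11000, 12000, 13000, 14000]
    }
}
```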
static final String TIME_SERIES_SUBSCRIPTION_RANGE
The property value is a time series range expression, following the format used for TIME_SERIES_RETAINED_RANGE.
If the property is not specified, new subscribers will be sent the latest event if delta streams are enabled and no events if delta streams are disabled. See the description of Subscription range in the time series feature documentation.
static final String SCHEMA
This property is only used by RECORD_V2 topics. The property value can be generated using the Schema.asJSONString() method of a Schema created using a SchemaBuilder.
static final String DONT_RETAIN_VALUE
By default, a topic will retain its latest value. The latest value will be sent to new subscribers. Setting this property to true disables this behavior. New subscribers will not be sent an initial value. No value will be returned for fetch operations that select the topic. This is useful for data streams where the values are only transiently valid.
Setting DONT_RETAIN_VALUE to true also disables delta streams, regardless of the PUBLISH_VALUES_ONLY value. If subsequent values are likely to be related, delta streams usually provide performance benefits (see PUBLISH_VALUES_ONLY). Consider leaving DONT_RETAIN_VALUE set to false to benefit from delta streams, even if there is no other requirement to retain the last value.
Bearing in mind the performance trade-offs of disabling delta streams, there are two reasons to consider setting DONT_RETAIN_VALUE to true. First, it stops the server and each subscribed client from keeping a copy of the value, reducing their memory requirements. Second, when a topic has a high update rate and is replicated across a cluster, it can significantly improve throughput because the values need not be persisted to the cluster.
Time series topics ignore this property and always retain the latest value.
static final String PERSISTENT
By default, a topic will be persisted if persistence is enabled at the server and the topic type supports persistence.
Setting PERSISTENT to false will prevent the topic from being persisted.
static final String REMOVAL
This property is specified as an expression which defines one or more conditions that are to be satisfied before automatic removal occurs.
The expression takes the form:
when conditions [remove "selector"]
At least one condition must be supplied. If more than one is supplied, they must be separated by logical operators ('and' or 'or'). The natural evaluation order of the operators may be changed by surrounding conditions with parentheses (e.g. (condition and condition)).
The remove clause is optional. It provides a TopicSelector expression representing the topics to be removed. If a remove clause is specified, the topic with the removal policy will only be removed if its path matches the selector expression. The selector must be surrounded by either double or single quotes.
When many topics have the same removal policy, it is better to set the REMOVAL property for one of them, using a remove clause that selects all of the topics. This is more efficient because it allows the server to avoid evaluating the same condition many times.
The permissions that are applied at the time of removal are those defined by the roles of the principal that created the topic at the time of creation. The roles of that principal may therefore change before the removal with no effect, but if the permissions given to the roles change it may have an effect upon the final removal.
Only one occurrence of each of the following condition types may be included within the expression:
Condition Type | Format | Usage |
---|---|---|
time after | time after absoluteTime | Removal should occur after a specified absolute time. Absolute time may be specified as a number of milliseconds since the epoch (00:00:00 on 1 January 1970) or as a quoted date and time formatted in RFC_1123 date time format. Either single or double quotes may be used. |
subscriptions less than | [local] subscriptions < n for forPeriod [after afterPeriod] | Removal should occur when the topic has had fewer than the specified number (n) of subscriptions for a given period (forPeriod) of time. Optionally, an initial period (afterPeriod) may be specified by which to delay the initial checking of this condition. See below for period formats. The optional local keyword restricts evaluation to the local cluster, ignoring fanout replicas. |
no updates for | no updates for forPeriod [after afterPeriod] | Removal should occur when the topic has had no updates for a given period (forPeriod) of time. Optionally, an initial period (afterPeriod) may be specified by which to delay the initial checking of this condition. See below for period formats. |
Multiple occurrences of the following condition types may be included within the expression:
Condition Type | Format | Usage |
---|---|---|
no session has | no [local] session has "criteria" [for forPeriod] [after afterPeriod] | Removal should occur when there are no sessions satisfying certain criteria. Optionally the criteria can be required to be satisfied for a period of time (forPeriod). Optionally, an initial period (afterPeriod) can be specified to delay the initial check of the criteria. Session selection criteria are specified as defined in session filters and must be surrounded by single or double quotes. See below for period formats. The optional local keyword restricts evaluation to the local cluster, ignoring fanout replicas. |
this session closes | this session closes | This is a shorthand form of no local session has that may be used to indicate that the topic is to be removed when the session that created it closes. |
Periods are specified as a number followed by a unit: s (seconds), m (minutes), h (hours) or d (days). For example, 10 minutes would be specified as 10m.
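When periods are held as java.time.Duration values in application code, they need converting to this number-plus-unit form before being placed in an expression. The helper below is a hypothetical sketch rather than a Diffusion method; it picks the largest unit that divides the duration exactly, ignores any sub-second part, and rejects durations of less than one second as a choice made for this example.

```java
import java.time.Duration;

/** Hypothetical helper that renders a Duration as a removal-policy period such as 10m. */
final class RemovalPeriodSketch {

    /** Formats the duration using the largest of d, h, m, s that divides it exactly. */
    static String format(Duration period) {
        long seconds = period.getSeconds();
        if (seconds <= 0) {
            throw new IllegalArgumentException("period must be at least one second");
        }
        if (seconds % 86_400 == 0) {
            return (seconds / 86_400) + "d";
        }
        if (seconds % 3_600 == 0) {
            return (seconds / 3_600) + "h";
        }
        if (seconds % 60 == 0) {
            return (seconds / 60) + "m";
        }
        return seconds + "s";
    }

    public static void main(String[] args) {
        System.out.println(format(Duration.ofMinutes(10))); // 10m
        System.out.println(format(Duration.ofSeconds(90))); // 90s
        System.out.println("when no updates for " + format(Duration.ofHours(3))); // when no updates for 3h
    }
}
```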
If quotes or backslashes (\) are required within quoted values such as selectors or session criteria then they may be escaped by preceding with \. The convenience method Diffusion.escape(String) is provided to escape such characters in a value. The expression is validated only by the server and therefore if an invalid expression is specified it will be reported as a TopicControl.InvalidTopicSpecificationException.
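The escaping rule can be illustrated with a few lines of plain Java. The method below is a stand-in written from the description above, assuming that single quotes, double quotes and backslashes are the characters that need escaping; application code should call Diffusion.escape(String) rather than copy this.

```java
/** Stand-in that mirrors the escaping rule described above; use Diffusion.escape(String) in real code. */
final class EscapeSketch {

    /** Inserts a backslash before each single quote, double quote, and backslash. */
    static String escape(String value) {
        StringBuilder escaped = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '\'' || c == '"' || c == '\\') {
                escaped.append('\\');
            }
            escaped.append(c);
        }
        return escaped.toString();
    }

    public static void main(String[] args) {
        System.out.println(escape("O'Brien")); // O\'Brien
        System.out.println(escape("a\\b"));    // a\\b
    }
}
```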
Examples:
when time after 1518780068112
The topic will be removed when the date and time indicated by the
specified number of milliseconds since the epoch has passed.
when time after "Sun, 3 Jun 2018 11:05:30 GMT"
The topic will be removed when the specified date and time has passed.
when time after "Sun, 3 Jun 2018 11:05:30 GMT" remove "*alpha/beta//"
The topic alpha/beta and all topics subordinate to it will be removed when the specified date and time has passed.
when subscriptions < 1 for 20m
The topic will be removed when it has had no subscriptions for a
continuous period of 20 minutes.
when subscriptions < 2 for 20m after 1h
The topic will be removed when it has had less than 2 subscriptions for a
continuous period of 20 minutes after one hour has passed since its
creation.
when no updates for 3h
The topic will be removed when it has had no updates for a continuous
period of 3 hours.
when no updates for 15m after 1d
The topic will be removed when it has had no updates for a continuous
period of 15 minutes after one day has passed since its creation.
when this session closes
The topic will be removed when the session creating it closes.
when no session has '$Principal is "Alice"'
The topic will be removed when there are no sessions with the principal
'Alice'.
when no session has '$Principal is "Alice"' for 10m
The topic will be removed when there are no sessions with the principal
'Alice' for a continuous period of 10 minutes.
when no session has 'Department is "Accounts"' for 30m after 2h
The topic will be removed when there have been no sessions from the
Accounts department for a continuous period of 30 minutes after 2 hours
have passed since its creation.
when time after "Sun, 3 Jun 2018 11:05:30 GMT" and subscriptions < 1 for 30m
The topic will be removed when the specified date and time has passed and the topic has had no subscriptions for a continuous period of 30 minutes after that time.
when time after "Sun, 3 Jun 2018 11:05:30 GMT" and subscriptions < 2 for 10m after 1h
The topic will be removed when the specified date and time has passed and the topic has had fewer than 2 subscriptions for a continuous period of 10 minutes after that time plus one hour.
when time after "Sun, 3 Jun 2018 11:05:30 GMT" or subscriptions < 2 for 10m after 1h
The topic will be removed when the specified date and time has passed or the topic has had fewer than 2 subscriptions for a continuous period of 10 minutes after one hour from its creation.
when time after "Sun, 3 Jun 2018 11:05:30 GMT" and (subscriptions < 2 for 10m after 1h or no updates for 20m)
The topic will be removed when the specified date and time has passed and either the topic has had fewer than 2 subscriptions for a continuous period of 10 minutes after that time plus one hour or it has had no updates for a continuous period of 20 minutes. Note that the parentheses are significant here: without them the topic would be removed if it had had no updates for 20 minutes, regardless of the time and subscriptions clauses.
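Both forms of absolute time used in the time after examples can be produced with the standard java.time classes. The sketch below is illustrative only: the class and method names are hypothetical, and it simply builds the expression string, leaving validation to the server. Letting DateTimeFormatter.RFC_1123_DATE_TIME generate the text ensures that the day name always agrees with the date.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;

/** Hypothetical helper that builds "when time after ..." removal expressions. */
final class RemovalTimeSketch {

    /** Builds the quoted RFC 1123 form, normalised to GMT. */
    static String timeAfter(ZonedDateTime when) {
        String formatted = DateTimeFormatter.RFC_1123_DATE_TIME
                .format(when.withZoneSameInstant(ZoneOffset.UTC));
        return "when time after \"" + formatted + "\"";
    }

    /** Builds the unquoted form that uses milliseconds since the epoch. */
    static String timeAfterEpochMillis(Instant when) {
        return "when time after " + when.toEpochMilli();
    }

    public static void main(String[] args) {
        ZonedDateTime when = ZonedDateTime.of(2018, 6, 3, 11, 5, 30, 0, ZoneOffset.UTC);
        System.out.println(timeAfter(when)); // when time after "Sun, 3 Jun 2018 11:05:30 GMT"
        System.out.println(timeAfterEpochMillis(Instant.ofEpochMilli(1518780068112L)));
        // when time after 1518780068112
    }
}
```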
Notes and restrictions on use
The after time periods refer to the period since the topic was created or restored from the persistence store after a server restart. They are designed as a 'grace' period after the topic comes into existence before the related condition starts to be evaluated. When not specified, the conditions start to be evaluated as soon as the topic is created or restored.
The server evaluates conditions periodically (every few seconds), so the removal time is only approximate. The imprecision is most noticeable when short periods are specified.
The meaning of the for period in a no session has condition is subtly different from its use in other conditions. It does not guarantee that there has been no session satisfying the condition at some point between evaluations, only that when evaluated the given period of time has passed since it was last evaluated and found to have no matching sessions.
Subscriptions is the number of subscriptions to a topic.
Automatic topic removal is supported for a topic that is replicated across the local cluster, and for a topic with fanout replicas on downstream remote servers. A subscriptions less than condition will be evaluated against the total number of subscriptions across the cluster and on all fanout replicas on downstream remote servers. A no session has condition will consider all sessions hosted across the cluster and all sessions hosted by downstream remote servers that have a fanout replica of the topic. The local keyword can be used to restrict evaluation to the local cluster, ignoring fanout replicas.
static final String CONFLATION
Conflation is the process of merging or discarding topic updates queued for a session to reduce the server memory footprint and network data. The server will conflate sessions that have a large number of queued messages to meet configured queue size targets. The sessions with the largest queues are typically slow consumers or have been disconnected – both will benefit from conflation. This property allows conflation behavior to be tuned on a topic-by-topic basis.
The supported policies are:
off
conflate
unsubscribe
always
The default policy used when the property is not specified and the topic type is not time series is conflate. The default policy used when the property is not specified and the topic type is time series is off.
The policy off disables all conflation for the topic, so topic updates will never be merged or discarded.
The policy conflate automatically conflates topic updates when back pressure is detected by the server.
The policy unsubscribe automatically unsubscribes the topic when back pressure is detected by the server. The unsubscription is not persisted to the cluster. If a session fails over to a different server it will be resubscribed to the topic.
The policy always automatically conflates topic updates as they are queued for the session. This is an eager policy that ensures only the latest update is queued for the topic, minimising the server memory and network bandwidth used by the session.
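The effect of the always policy on a session queue can be pictured with a map keyed by topic path. The following is a simplified model, not the server's implementation: the class is hypothetical, and the choice to keep a conflated update at the queue position of the update it replaces is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified model of eager conflation: at most one queued update per topic. */
final class AlwaysConflateSketch {

    // Insertion-ordered map; replacing a value keeps the key's original position.
    private final Map<String, String> pending = new LinkedHashMap<>();

    /** Queues an update, replacing any update already queued for the same topic. */
    void enqueue(String topicPath, String value) {
        pending.put(topicPath, value);
    }

    /** Removes and returns everything queued, formatted as path=value. */
    List<String> drain() {
        List<String> delivered = new ArrayList<>();
        for (Map.Entry<String, String> entry : pending.entrySet()) {
            delivered.add(entry.getKey() + "=" + entry.getValue());
        }
        pending.clear();
        return delivered;
    }

    public static void main(String[] args) {
        AlwaysConflateSketch queue = new AlwaysConflateSketch();
        queue.enqueue("a/b", "1");
        queue.enqueue("x/y", "2");
        queue.enqueue("a/b", "3"); // replaces the queued value 1
        System.out.println(queue.drain()); // [a/b=3, x/y=2]
    }
}
```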
The conflate and unsubscribe policies are applied when the server detects back pressure for a session. The server configuration places limits on the data queued for each session. If these limits are breached, the server will conflate the session queue to attempt to reduce its size. If the session queue still exceeds the limits after conflation, the session will be terminated.
Conflation can be disabled on a session-by-session basis. If conflation is disabled for a session the policy will not be applied to topic updates queued for the session but will be for other sessions that have conflation enabled.
The policies conflate and always are not supported for time series topics as they would cause missing events. Attempts to enable these policies with time series topics will cause the creation of the topic to fail, reporting that the specification is invalid.
static final String OWNER
A session that has authenticated using the principal can update and remove the topic, so the principal can be considered the topic owner. To fetch or subscribe to the topic, the principal must also be granted the SELECT_TOPIC permission by the security store rules.
This may be used in the following cases:
1) A session creates a topic and makes its own principal the owner.
2) A session creates a topic and makes another principal the owner.
The format of the property value is:
$Principal is "name"
where name is the name of the principal. Single quotes may be used instead of double quotes and special characters can be escaped using Diffusion.escape(String) if required.
The purpose of this property is to allow a client to create topics on behalf of other users. This can be used in conjunction with the REMOVAL property so that such topics are removed when there are no longer any sessions for the named principal.
For example:
specification
.withProperty(OWNER, "$Principal is 'myPrincipal'")
.withProperty(REMOVAL, "when no session has '$Principal is \"myPrincipal\"' for 5s");
static final String COMPRESSION
Compression reduces the bandwidth required to broadcast topic updates to subscribed sessions, at the cost of increased server CPU.
Changes to a topic's value are published to each subscribed session as a sequence of topic messages. A topic message can carry the latest value or the difference between the latest value and the previous value (a delta). The compression policy determines if and how published topic messages are compressed. Topic messages are not exposed through the client API; the client library handles decompression and decodes deltas automatically, passing reconstructed values to the application.
The compression policy for a topic is specified by setting this property to one of several values:
off
low
medium
high
The policies are listed in the order of increasing compression and increasing CPU cost. off disables compression completely for the topic and requires no additional CPU; high compresses the topic messages to the smallest number of bytes, but has the highest CPU cost. Generally some compression is beneficial, so the default value for this property is low.
Prior to version 6.4, only two values were allowed: true (equivalent to medium, and the previous default policy) and false (equivalent to off). These values are still supported.
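An application that stores compression settings written for older servers may want to normalise them to the four named policies. The sketch below shows that mapping in plain Java; the class and method are hypothetical, values are matched exactly as written, and whether the server accepts other spellings is not covered here.

```java
/** Hypothetical helper that maps legacy compression values onto the named policies. */
final class CompressionPolicySketch {

    /** Returns off, low, medium or high; maps the legacy values true and false. */
    static String normalize(String value) {
        switch (value) {
            case "true":
                return "medium"; // legacy value, equivalent to medium
            case "false":
                return "off";    // legacy value, equivalent to off
            case "off":
            case "low":
            case "medium":
            case "high":
                return value;
            default:
                throw new IllegalArgumentException("Unknown compression policy: " + value);
        }
    }

    public static void main(String[] args) {
        System.out.println(normalize("true"));  // medium
        System.out.println(normalize("false")); // off
        System.out.println(normalize("high"));  // high
    }
}
```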
This property is only one factor that determines whether a topic message will be compressed. Other factors include:
static final String PRIORITY
The supported delivery priorities are:
low
default
high
The delivery priority affects the order of topic updates sent to a subscribed client session. When there are multiple topic updates for topics with different priorities in a session's outbound queue, updates for high priority topics will be delivered first, followed by updates for default priority topics, followed by updates for low priority topics. Topic subscription and unsubscription notifications are also delivered according to the topic delivery priority.
Using different delivery priorities is most beneficial when there is a large backlog of queued updates to deliver to a client session. On lightly loaded systems, updates typically remain in the outbound queue for a few milliseconds and so there is a lower chance of topic updates being reordered based on their priority. The backlog will be larger if the topic update rate is higher; the server or the client are more heavily loaded; the client session becomes temporarily disconnected; or if there is poor network connectivity between the server and the client.
Messages from the server to the client that are not topic updates, for example ping requests and responses, are queued with the default delivery priority.
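The ordering described for a session's outbound queue can be modelled with a standard priority queue. This is a sketch for illustration, not the server's queue: the Update class and its sequence number are hypothetical, and delivering updates of equal priority in arrival order is an assumption of the sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

/** Illustrative model of priority-ordered delivery from a backlog of queued updates. */
final class PriorityQueueSketch {

    /** Declared in delivery order, so the natural enum ordering puts HIGH first. */
    enum Priority { HIGH, DEFAULT, LOW }

    static final class Update {
        final Priority priority;
        final long sequence; // arrival order, used to break ties (an assumption)
        final String topicPath;

        Update(Priority priority, long sequence, String topicPath) {
            this.priority = priority;
            this.sequence = sequence;
            this.topicPath = topicPath;
        }
    }

    /** Returns the topic paths in the order the queued updates would be delivered. */
    static List<String> deliveryOrder(List<Update> queued) {
        PriorityQueue<Update> queue = new PriorityQueue<>(
                Comparator.comparing((Update u) -> u.priority)
                        .thenComparingLong(u -> u.sequence));
        queue.addAll(queued);
        List<String> order = new ArrayList<>();
        while (!queue.isEmpty()) {
            order.add(queue.poll().topicPath);
        }
        return order;
    }

    public static void main(String[] args) {
        List<Update> backlog = Arrays.asList(
                new Update(Priority.LOW, 1, "audit/log"),
                new Update(Priority.DEFAULT, 2, "news/latest"),
                new Update(Priority.HIGH, 3, "prices/a"),
                new Update(Priority.HIGH, 4, "prices/b"));
        System.out.println(deliveryOrder(backlog)); // [prices/a, prices/b, news/latest, audit/log]
    }
}
```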
TopicType getType()
Map<String,String> getProperties()
TopicSpecification withProperty(String key, String value) throws IllegalArgumentException
Parameters:
key - the property key
value - the property value
Throws:
IllegalArgumentException - if key is not a supported property key
NullPointerException - if either the key or value is null
TopicSpecification withProperties(Map<String,String> properties) throws IllegalArgumentException
Parameters:
properties - map of properties
Throws:
NullPointerException - if properties is null
IllegalArgumentException - if any key is not a supported property key
NullPointerException - if any key or value in properties is null
TopicSpecification withoutProperties(String... propertyNames)
Parameters:
propertyNames - a list of the property names that should be excluded
Copyright © 2024 DiffusionData Ltd. All Rights Reserved.