Diffusion C API 6.12.0
Subscription to topics and fetching topic data.
Data Structures | |
struct | subscription_handlers_s |
struct | subscription_params_s |
Structure supplied when subscribing to a topic. | |
struct | svc_notify_subscription_request_s |
Structure describing an incoming notification that the client has been subscribed to a topic. | |
struct | notify_subscription_handlers_s |
struct | notify_subscription_params_s |
Structure supplied when registering to receive topic subscription notifications. | |
struct | unsubscription_handlers_s |
struct | unsubscription_params_s |
Structure supplied when unsubscribing from a topic. | |
struct | svc_notify_unsubscription_request_s |
Structure describing an incoming unsubscription notification. | |
struct | notify_unsubscription_handlers_s |
struct | notify_unsubscription_params_s |
Structure supplied when registering to receive topic unsubscription notifications. | |
Typedefs | |
typedef int(* | on_subscribe_cb) (SESSION_T *session, void *context) |
Callback for on_subscribe(). | |
typedef struct subscription_params_s | SUBSCRIPTION_PARAMS_T |
Structure supplied when subscribing to a topic. | |
typedef struct svc_notify_subscription_request_s | SVC_NOTIFY_SUBSCRIPTION_REQUEST_T |
Structure describing an incoming notification that the client has been subscribed to a topic. | |
typedef int(* | on_notify_subscription_cb) (SESSION_T *session, const SVC_NOTIFY_SUBSCRIPTION_REQUEST_T *request, void *context) |
Callback for notify_subscription_register(). | |
typedef struct notify_subscription_params_s | NOTIFY_SUBSCRIPTION_PARAMS_T |
Structure supplied when registering to receive topic subscription notifications. | |
typedef int(* | on_unsubscribe_cb) (SESSION_T *session, void *context) |
Callback for on_unsubscribe(). | |
typedef struct unsubscription_params_s | UNSUBSCRIPTION_PARAMS_T |
Structure supplied when unsubscribing from a topic. | |
typedef UNSUBSCRIPTION_PARAMS_T | UNSUBSCRIPTION_ALL_SCOPES_PARAMS_T |
Structure supplied when unsubscribing from a topic for all scopes. | |
typedef struct svc_notify_unsubscription_request_s | SVC_NOTIFY_UNSUBSCRIPTION_REQUEST_T |
Structure describing an incoming unsubscription notification. | |
typedef int(* | on_notify_unsubscription_cb) (SESSION_T *session, const SVC_NOTIFY_UNSUBSCRIPTION_REQUEST_T *request, void *context) |
Callback for notify_unsubscription_register(). | |
typedef struct notify_unsubscription_params_s | NOTIFY_UNSUBSCRIPTION_PARAMS_T |
Structure supplied when registering to receive topic unsubscription notifications. | |
Functions | |
TOPIC_HANDLER_T | subscribe (SESSION_T *session, const SUBSCRIPTION_PARAMS_T params) |
Request subscription to topic. | |
TOPIC_HANDLER_T | diffusion_subscribe_with_scope (SESSION_T *session, const char *selection_scope, const SUBSCRIPTION_PARAMS_T params, DIFFUSION_API_ERROR *api_error) |
Request subscription to topics using a scope selection. | |
void | unsubscribe (SESSION_T *session, const UNSUBSCRIPTION_PARAMS_T params) |
Unsubscribe from one or more topics. | |
bool | diffusion_unsubscribe_with_scope (SESSION_T *session, const char *selection_scope, const UNSUBSCRIPTION_PARAMS_T params, DIFFUSION_API_ERROR *api_error) |
Unsubscribe from topics using a selection scope. | |
bool | diffusion_unsubscribe_all_scopes (SESSION_T *session, const UNSUBSCRIPTION_ALL_SCOPES_PARAMS_T params, DIFFUSION_API_ERROR *api_error) |
Unsubscribe topics from all topic selection scopes. | |
void | notify_subscription_register (SESSION_T *session, const NOTIFY_SUBSCRIPTION_PARAMS_T params) |
Register to receive subscription notifications. | |
void | notify_unsubscription_register (SESSION_T *session, const NOTIFY_UNSUBSCRIPTION_PARAMS_T params) |
Register to receive unsubscription notifications. | |
This feature allows a client session to subscribe to topics to receive streamed topic updates, fetch the state of topics and/or update topics with new values.
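Specifically, the feature provides the ability to subscribe to and unsubscribe from topics, register streams that receive subscription events and updates, and fetch the current state of topics.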
A session can issue requests to subscribe to topics at any time, even if the topics do not exist at the server. Each subscription request provides a topic selector that is evaluated by the server to select matching topics. The session will be subscribed to any topics that match the selector unless they are already subscribed, or the session has insufficient permission. The subscription request is also retained at the server and the session will be automatically subscribed to newly created topics that match the selector (unless a subsequent unsubscription cancels the request).
Sessions receive notifications from topics that they are subscribed to via subscription streams (see below). When a session is subscribed to a topic, all matching streams will first receive a subscription notification that provides details about the topic. If the server has a value for the topic, the value will be delivered to the streams before any other notifications.
A session can unsubscribe from topics at any time. This is also specified using a topic selector. On unsubscription, matching streams are notified via the on_unsubscription notification, which gives the reason for unsubscription (for example, by request of the session, request of the server, or topic removal).
Subscriptions and unsubscriptions can occur for reasons other than requests from the session. A session can be subscribed to or unsubscribed from a topic by another session using the Subscription Control feature. The removal of a topic also automatically causes unsubscription for subscribed sessions.
Subscription requests are subject to authorisation checks. The session must have SELECT_TOPIC permission for the topic selector used to subscribe. Matching topics will be further filtered to those for which the session has READ_TOPIC permission.
Topic selection scopes allow an application with multiple components to use a single Diffusion session. An application component can use a topic selection scope to manage a set of selectors that is unaffected by unsubscriptions performed by other application components. The session will be subscribed to all topics with paths matching a selector in any scope. The unsubscribe operation removes a selector from specific scopes.
A scope may be specified to a subscribe or unsubscribe method, indicating that the selection only applies to that scope. The server manages scopes to ensure that unsubscriptions applied to one scope do not affect another.
Scope names are case sensitive. A scope name may not begin with the character $ as this is reserved for internal use.
Unsubscription using a wildcard selector that indicates all topics effectively removes the scope. An application can request unsubscription from all scopes using diffusion_unsubscribe_all_scopes. The DEFAULT_SELECTION_SCOPE is used for all methods that do not explicitly specify a scope.
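The scope rules above can be sketched with a small local model. This is only an illustration of the described behaviour, not the server's implementation: the `selection_table_t` type and the `selection_*` functions are hypothetical names, and selectors are compared as plain strings rather than evaluated against topic paths.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_SELECTIONS 32

/* One retained (scope, selector) pair. Hypothetical model, not an API type. */
typedef struct {
    const char *scope;
    const char *selector;
    bool in_use;
} selection_t;

typedef struct {
    selection_t entries[MAX_SELECTIONS];
} selection_table_t;

/* A scope name may not begin with '$' (reserved for internal use). */
static bool scope_name_is_valid(const char *scope)
{
    return scope != NULL && scope[0] != '$';
}

/* Record a selector in one scope. Returns false for an invalid scope name
 * or a full table. Scope comparison is case sensitive (strcmp). */
static bool selection_add(selection_table_t *t, const char *scope,
                          const char *selector)
{
    assert(t != NULL && selector != NULL);
    if (!scope_name_is_valid(scope)) {
        return false;
    }
    selection_t *free_slot = NULL;
    for (size_t i = 0; i < MAX_SELECTIONS; i++) {
        selection_t *e = &t->entries[i];
        if (!e->in_use) {
            if (free_slot == NULL) {
                free_slot = e;
            }
        } else if (strcmp(e->scope, scope) == 0 &&
                   strcmp(e->selector, selector) == 0) {
            return true; /* already recorded in this scope */
        }
    }
    if (free_slot == NULL) {
        return false;
    }
    free_slot->scope = scope;
    free_slot->selector = selector;
    free_slot->in_use = true;
    return true;
}

/* Remove a selector from one scope, or from every scope when scope is NULL. */
static void selection_remove(selection_table_t *t, const char *scope,
                             const char *selector)
{
    assert(t != NULL && selector != NULL);
    for (size_t i = 0; i < MAX_SELECTIONS; i++) {
        selection_t *e = &t->entries[i];
        if (e->in_use && strcmp(e->selector, selector) == 0 &&
            (scope == NULL || strcmp(e->scope, scope) == 0)) {
            e->in_use = false;
        }
    }
}

/* The selector stays in effect while any scope still holds it. */
static bool selection_is_active(const selection_table_t *t,
                                const char *selector)
{
    assert(t != NULL && selector != NULL);
    for (size_t i = 0; i < MAX_SELECTIONS; i++) {
        const selection_t *e = &t->entries[i];
        if (e->in_use && strcmp(e->selector, selector) == 0) {
            return true;
        }
    }
    return false;
}
```

In this model, removing a selector from one scope leaves it active while another scope still holds it, which is the isolation between application components that scopes are meant to provide.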
A session can listen to subscription events and updates for a selection of topics by adding one or more streams. A stream is registered using a topic selector which specifies the topics that the stream applies to. When an update is received for a topic then it will be routed to every stream that matches both the topic selector and the stream's value type. If more than one stream matches, all will receive the update; the order in which they are notified is not defined.
A stream can be added several times for different selectors. If the same stream is registered for several selectors that match an event, the stream will only be notified of the event once. The mapping of topic selectors to streams is maintained locally in the client process.
It is also possible to add one or more fallback streams which will receive updates that do not match any stream registered with a selector. This is useful for default processing or simply to catch unprocessed updates. A fallback stream can be added using add_fallback_stream. Zero, one, or more fallback streams may be assigned. If no fallback stream is specified, any updates that are not routed to any other stream will simply be discarded.
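The routing rules for streams and fallback streams can be illustrated with a minimal local model. It is a sketch under stated assumptions, not the client library's code: `stream_registry_t`, `registry_add` and `registry_route` are hypothetical names, a selector is simplified to a topic path prefix, and value type matching is left out.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

#define MAX_REGISTRATIONS 16

/* A stream that only counts how often it was notified. */
typedef struct {
    int notified;
} stream_t;

/* path_prefix is a simplified stand-in for a topic selector.
 * NULL marks a fallback stream. */
typedef struct {
    stream_t *stream;
    const char *path_prefix;
} registration_t;

typedef struct {
    registration_t regs[MAX_REGISTRATIONS];
    size_t count;
} stream_registry_t;

static bool registry_add(stream_registry_t *r, stream_t *s,
                         const char *path_prefix)
{
    assert(r != NULL && s != NULL);
    if (r->count == MAX_REGISTRATIONS) {
        return false;
    }
    r->regs[r->count].stream = s;
    r->regs[r->count].path_prefix = path_prefix;
    r->count++;
    return true;
}

/* Notify a stream unless it was already notified for this update. */
static size_t notify_once(stream_t **seen, size_t n, stream_t *s)
{
    for (size_t i = 0; i < n; i++) {
        if (seen[i] == s) {
            return n;
        }
    }
    seen[n] = s;
    s->notified++;
    return n + 1;
}

/* Route one update. Returns the number of streams notified;
 * 0 means the update was discarded. */
static size_t registry_route(stream_registry_t *r, const char *topic_path)
{
    assert(r != NULL && topic_path != NULL);
    stream_t *seen[MAX_REGISTRATIONS];
    size_t n = 0;

    /* Pass 1: every stream registered with a matching selector. */
    for (size_t i = 0; i < r->count; i++) {
        const char *p = r->regs[i].path_prefix;
        if (p != NULL && strncmp(p, topic_path, strlen(p)) == 0) {
            n = notify_once(seen, n, r->regs[i].stream);
        }
    }
    if (n > 0) {
        return n;
    }

    /* Pass 2: fallback streams see only otherwise unmatched updates. */
    for (size_t i = 0; i < r->count; i++) {
        if (r->regs[i].path_prefix == NULL) {
            n = notify_once(seen, n, r->regs[i].stream);
        }
    }
    return n;
}
```

A stream registered under two matching selectors is still notified once per update, and a fallback stream is reached only when no selector-registered stream matched.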
If the session is already subscribed to a topic when a matching stream is added, the stream will immediately receive a subscription notification. For most topic types, the latest value is locally cached and will be provided to the stream following the subscription notification.
A stream will receive an on_close callback when unregistered and an on_error callback if the session is closed.
A value stream receives values for matching topics as and when updates are received from the server. Delta updates received from the server are automatically applied to locally cached values so that the stream always receives full values for each update.
Value streams are typed to a specified value class and only updates for compatible topics will be routed to the stream. The following table shows how the value class maps to compatible topic types that will be routed to the stream:
Value Class | Compatible Topic Types |
---|---|
JSON | JSON STRING INT64 DOUBLE |
String | STRING |
Long | INT64 |
Double | DOUBLE |
Binary | BINARY |
Bytes | JSON STRING INT64 DOUBLE BINARY RECORD_V2 |
RecordV2 | RECORD_V2 |
Value stream implementations can be added using add_stream.
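The compatibility table can be encoded as a lookup of bit masks. The sketch below is illustrative only: the `value_class_t` and `topic_type_t` enums and `value_class_accepts` are hypothetical names chosen for this example and are not the library's own type constants.

```c
#include <assert.h>
#include <stdbool.h>

/* Topic types as bit flags. Hypothetical names for illustration. */
typedef enum {
    TT_JSON      = 1 << 0,
    TT_STRING    = 1 << 1,
    TT_INT64     = 1 << 2,
    TT_DOUBLE    = 1 << 3,
    TT_BINARY    = 1 << 4,
    TT_RECORD_V2 = 1 << 5
} topic_type_t;

/* Value classes a stream can be typed to. Hypothetical names. */
typedef enum {
    VC_JSON,
    VC_STRING,
    VC_LONG,
    VC_DOUBLE,
    VC_BINARY,
    VC_BYTES,
    VC_RECORD_V2,
    VC_COUNT
} value_class_t;

/* One row of the table per value class. */
static const unsigned compatible_types[VC_COUNT] = {
    [VC_JSON]      = TT_JSON | TT_STRING | TT_INT64 | TT_DOUBLE,
    [VC_STRING]    = TT_STRING,
    [VC_LONG]      = TT_INT64,
    [VC_DOUBLE]    = TT_DOUBLE,
    [VC_BINARY]    = TT_BINARY,
    [VC_BYTES]     = TT_JSON | TT_STRING | TT_INT64 | TT_DOUBLE |
                     TT_BINARY | TT_RECORD_V2,
    [VC_RECORD_V2] = TT_RECORD_V2
};

/* True if updates for the topic type are routed to a stream of this class. */
static bool value_class_accepts(value_class_t vc, topic_type_t tt)
{
    assert((unsigned)vc < (unsigned)VC_COUNT);
    return (compatible_types[vc] & (unsigned)tt) != 0;
}
```

For example, a JSON stream accepts INT64 topics but not BINARY topics, while a Bytes stream accepts every type listed in the table.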
A value stream can be added to receive updates from time series topics using add_time_series_stream. The following table shows how the value class specified when adding the stream maps to the event value class of time series topics that will be routed to the stream:
Event Value Class | Time Series Event Value Class |
---|---|
JSON | JSON STRING INT64 DOUBLE |
String | STRING |
Long | INT64 |
Double | DOUBLE |
Binary | BINARY |
Bytes | JSON STRING INT64 DOUBLE BINARY RECORD_V2 |
RecordV2 | RECORD_V2 |
A session can issue a request to fetch details of a topic or topics (subject to authorization) at any time. The topics required are specified using a topic selector.
The results of a fetch will return the topic path and type of each selected topic. The results may also optionally return the topic values and/or properties.
A new request can be created using diffusion_fetch_request_init and modified to specify additional requirements of the fetch operation. The request is issued to the server using the diffusion_fetch_request_fetch method on the request.
A session must have SELECT_TOPIC permission for the topic selector passed to subscribe or diffusion_fetch_request_fetch. The topics that result from a subscription or fetch request are further filtered using the READ_TOPIC permission.
No access control restrictions are applied to unsubscription.
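The two-stage check described above (SELECT_TOPIC on the selector, then READ_TOPIC on each matching topic) can be modelled as follows. This is a hedged sketch rather than the server's logic: `topic_t` and `select_readable` are hypothetical names, permissions are reduced to booleans, and the selector is simplified to a path prefix.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <string.h>

/* A topic plus whether the session holds READ_TOPIC for it. */
typedef struct {
    const char *path;
    bool readable;
} topic_t;

/* Stage 1: reject the request outright without SELECT_TOPIC.
 * Stage 2: keep only matching topics the session may read.
 * Returns false if the request is rejected; otherwise writes up to
 * out_cap matching paths to out and the number written to *out_count. */
static bool select_readable(bool has_select_topic,
                            const topic_t *topics, size_t topic_count,
                            const char *path_prefix,
                            const char **out, size_t out_cap,
                            size_t *out_count)
{
    assert(topics != NULL && path_prefix != NULL);
    assert(out != NULL && out_count != NULL);

    *out_count = 0;
    if (!has_select_topic) {
        return false;
    }

    size_t prefix_len = strlen(path_prefix);
    for (size_t i = 0; i < topic_count; i++) {
        if (strncmp(topics[i].path, path_prefix, prefix_len) != 0) {
            continue; /* not selected */
        }
        if (!topics[i].readable) {
            continue; /* selected, but filtered out by READ_TOPIC */
        }
        if (*out_count < out_cap) {
            out[(*out_count)++] = topics[i].path;
        }
    }
    return true;
}
```

Note the difference between the two outcomes in this model: a missing SELECT_TOPIC permission fails the whole request, whereas a missing READ_TOPIC permission silently removes individual topics from the result.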
typedef int(* on_notify_subscription_cb) (SESSION_T *session, const SVC_NOTIFY_SUBSCRIPTION_REQUEST_T *request, void *context) |
Callback for notify_subscription_register().
session | The current active session. |
request | The incoming notification request. |
context | User-supplied context from the initial registration call. |
typedef int(* on_notify_unsubscription_cb) (SESSION_T *session, const SVC_NOTIFY_UNSUBSCRIPTION_REQUEST_T *request, void *context) |
Callback for notify_unsubscription_register().
session | The current active session. |
request | The incoming notification message. |
context | User-supplied context from the initial registration call. |
typedef int(* on_subscribe_cb) (SESSION_T *session, void *context) |
Callback for on_subscribe().
session | The current active session. |
context | User-supplied context from the initial subscribe() call. |
typedef int(* on_unsubscribe_cb) (SESSION_T *session, void *context) |
Callback for on_unsubscribe().
session | The current active session. |
context | User-supplied context from the initial unsubscribe() call. |
typedef UNSUBSCRIPTION_PARAMS_T UNSUBSCRIPTION_ALL_SCOPES_PARAMS_T |
Structure supplied when unsubscribing from a topic for all scopes.
Reason for unsubscription.
Enumerator | |
---|---|
UNSUBSCRIPTION_REASON_REQUESTED | Unsubscribed by the subscribing client. Since 5.6. |
UNSUBSCRIPTION_REASON_CONTROL | The unsubscription was requested either by another client or by the server. |
UNSUBSCRIPTION_REASON_REMOVAL | The unsubscription occurred because the topic was removed. Since 5.6. |
UNSUBSCRIPTION_REASON_AUTHORIZATION | The unsubscription occurred because the session is no longer authorized to access the topic. |
UNSUBSCRIPTION_REASON_UNKNOWN_UNSUBSCRIBE_REASON | A reason that is unsupported by the session. Since 6.1. |
UNSUBSCRIPTION_REASON_BACK_PRESSURE | The server has a significant backlog of messages for the session, and the topic specification is configured to unsubscribe the session in that case. The session can resubscribe to the topic. The unsubscription is not persisted to the cluster; if the session fails over to a different server, it will be resubscribed to the topic. |
UNSUBSCRIPTION_REASON_BRANCH_MAPPINGS | The unsubscription occurred because branch mapping rules changed. Since 6.7. |
UNSUBSCRIPTION_REASON_SUBSCRIPTION_REFRESH | The server has re-subscribed this session to the topic. Existing streams are unsubscribed because the topic type and other attributes may have changed. This can happen if a set of servers is configured to use session replication, and a session connected to one server reconnects ("fails over") to a different server. A stream that receives an unsubscription notification with this reason will also receive a subscription notification with the new topic specification. |
UNSUBSCRIPTION_REASON_STREAM_CHANGE | A fallback stream has been unsubscribed due to the addition of a stream that selects the topic. |
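A stream's unsubscription handler will often branch on the reason. The sketch below shows one way to do that with a local enum that mirrors the reasons in the table. The `reason_t` type, its enumerator names and values, and both helper functions are hypothetical and stand in for the library's own definitions.

```c
#include <assert.h>
#include <stdbool.h>
#include <string.h>

/* Local mirror of the unsubscription reasons. Hypothetical names and
 * values; the library's enum may differ. */
typedef enum {
    REASON_REQUESTED,
    REASON_CONTROL,
    REASON_REMOVAL,
    REASON_AUTHORIZATION,
    REASON_UNKNOWN,
    REASON_BACK_PRESSURE,
    REASON_BRANCH_MAPPINGS,
    REASON_SUBSCRIPTION_REFRESH,
    REASON_STREAM_CHANGE
} reason_t;

/* Short log text for each reason, paraphrasing the table. */
static const char *reason_text(reason_t reason)
{
    switch (reason) {
    case REASON_REQUESTED:            return "requested by this session";
    case REASON_CONTROL:              return "requested by another client or the server";
    case REASON_REMOVAL:              return "topic removed";
    case REASON_AUTHORIZATION:        return "session no longer authorized";
    case REASON_BACK_PRESSURE:        return "server message backlog";
    case REASON_BRANCH_MAPPINGS:      return "branch mapping rules changed";
    case REASON_SUBSCRIPTION_REFRESH: return "server re-subscribed the session";
    case REASON_STREAM_CHANGE:        return "fallback stream replaced by a selecting stream";
    case REASON_UNKNOWN:              break;
    }
    return "reason unsupported by the session";
}

/* Only a subscription refresh is documented as being followed by a new
 * subscription notification for the same stream. */
static bool reason_implies_new_subscription(reason_t reason)
{
    return reason == REASON_SUBSCRIPTION_REFRESH;
}
```

A handler could use `reason_implies_new_subscription` to avoid tearing down per-topic state that is about to be rebuilt by the following subscription notification.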