Receiving topic notifications
Receive topic notifications using topic selectors. This enables a client to receive updates when topics are added or removed, without the topic values.
Note: Topic notifications are supported by the Android™ API, Java™ API and JavaScript® API.
The client must register a listener object to receive notifications about selected topics. Use a topic selector to specify the topics.
For more details about topic notifications, see Topic notifications.
Required permissions: select_topic and read_topic permissions for the specified topics
Receiving topic notifications
A client can register to receive notifications about a set of topics via a listener object.
JavaScript
// Listener object whose callbacks are invoked with topic notifications.
const listener = {
    onDescendantNotification: (topicPath, type) => {
        // notification for an immediate descendant
    },
    onTopicNotification: (topicPath, topicSpecification, type) => {
        // notification for the topic
    },
    onClose: () => {
        // the handler is closed
    },
    onError: (error) => {
        // an error occurred
    }
};

// Register the listener first, then use the returned registration to
// select the topics that notifications should be delivered for.
const registration = await session.notifications.addListener(listener);
registration.select('foo');
.NET
// Register a listener instance; the result is kept in `registration`.
registration = await notifications.AddListenerAsync(new Listener());

...

/// <summary>
/// The listener for topic notifications.
/// </summary>
private class Listener : ITopicNotificationListener
{
    /// <summary>
    /// Indicates that the stream was closed.
    /// </summary>
    public void OnClose()
    {
        WriteLine("The listener was closed.");
    }

    /// <summary>
    /// Notification for an immediate descendant of a selected topic path.
    /// </summary>
    /// <param name="topicPath">The path of the descendant topic.</param>
    /// <param name="type">The type of notification.</param>
    public void OnDescendantNotification(string topicPath, NotificationType type)
    {
        WriteLine($"Descendant topic '{topicPath}' has been {type}.");
    }

    /// <summary>
    /// Indicates an error received by the callback.
    /// </summary>
    /// <param name="errorReason">The reason for the error.</param>
    public void OnError(ErrorReason errorReason)
    {
        WriteLine($"The listener received the error: '{errorReason}'.");
    }

    /// <summary>
    /// Notification for a selected topic.
    /// </summary>
    /// <param name="topicPath">The path of the topic.</param>
    /// <param name="specification">The specification of the topic.</param>
    /// <param name="type">The type of notification.</param>
    public void OnTopicNotification(string topicPath, ITopicSpecification specification, NotificationType type)
    {
        WriteLine($"Topic '{topicPath}' has been {type}.");
    }
}
Java and Android
final Session session = Diffusion.sessions().open("ws://localhost:8080"); final TopicNotifications notifications = session.feature(TopicNotifications.class); final TopicNotificationListener listener = new TopicNotificationListener() { @Override public void onTopicNotification(String topicPath, TopicSpecification specification, NotificationType type) { // Handle notifications for selected/deselected topics } @Override public void onDescendantNotification(String topicPath, NotificationType type) { // Handle notifications for immediate descendants } @Override public void onClose() { // The listener has been closed } @Override public void onError(ErrorReason error) { // The listener has encountered an error } }; final CompletableFuture<NotificationRegistration> future = notifications.addListener(listener); final NotificationRegistration registration = future.join(); registration.select("foo");
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" DIFFUSION_REGISTRATION_T *g_registration; static int on_listener_registered( const DIFFUSION_REGISTRATION_T *registration, void *context) { // listener has been registered g_registration = diffusion_registration_dup(registration); return HANDLER_SUCCESS; } static int on_descendant_notification( const char *topic_path, const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type, void *context) { // handle topic notification for descendant of `topic_path` return HANDLER_SUCCESS; } static int on_topic_notification( const char *topic_path, const TOPIC_SPECIFICATION_T *specification, const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type, void *context) { // handle topic notification for `topic_path` return HANDLER_SUCCESS; } static void on_close() { // topic notification listener has been closed } static int on_selected(void *context) { // topic notification listener has been registered return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // Create a session, synchronously session = 
session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // create a topic notification listener DIFFUSION_TOPIC_NOTIFICATION_LISTENER_T listener = { .on_registered = on_listener_registered, .on_descendant_notification = on_descendant_notification, .on_topic_notification = on_topic_notification, .on_close = on_close }; diffusion_topic_notification_add_listener(session, listener, NULL); // Sleep for a while sleep(5); // register the topic notification listener DIFFUSION_TOPIC_NOTIFICATION_REGISTRATION_PARAMS_T params = { .on_selected = on_selected, .registration = g_registration, .selector = topic_path }; diffusion_topic_notification_registration_select(session, params, NULL); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); diffusion_registration_free(g_registration); return EXIT_SUCCESS; }
Apple
/// Listener whose callbacks are invoked with topic notifications.
class TopicNotificationListener: PTDiffusionTopicNotificationListener {

    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveDescendantNotificationWith type: PTDiffusionTopicNotificationType,
                         topicPath: String) {
        // Handle notifications for immediate descendants
    }

    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveNotificationWith type: PTDiffusionTopicNotificationType,
                         topicPath: String,
                         specification: PTDiffusionTopicSpecification) {
        // Handle notifications for selected/deselected topics
    }

    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        // The listener has encountered an error
    }

    func diffusionDidClose(_ stream: PTDiffusionStream) {
        // The listener has been closed
    }
}

/// Adds a `TopicNotificationListener` to the session and, once the
/// registration is available, selects the topics matching `"foo"`.
///
/// - Parameter session: The session to add the listener to.
func register_topic_notification_listener(session: PTDiffusionSession) {
    let listener = TopicNotificationListener()

    session.topicNotifications.add(listener) { (registration, error) in
        // Bind the optional instead of force-unwrapping it.
        if let error = error {
            // `print` does not expand `%@`, so the description is interpolated.
            print("An error occurred: \(error.localizedDescription)")
            return
        }

        // The outcome of the select request is ignored in this example.
        registration?.select(withTopicSelectorExpression: "foo") { _ in }
    }
}