Receiving topic notifications
Receive topic notifications using topic selectors. This enables a client to receive updates when topics are added or removed, without the topic values.
Note: Topic notifications are supported by the
Android™
API,
Java™
API and
JavaScript®
API.
The client must register a listener object to receive notifications about selected topics. Use a topic selector to specify the topics.
For more details about topic notifications, see Topic notifications.
Required permissions: and permissions for the specified topics
Receiving topic notifications
A client can register to receive notifications about a set of topics via a listener object.
JavaScript
const listener = {
onDescendantNotification: (topicPath, type) => {
// notification for an immediate descendant
},
onTopicNotification: (topicPath, topicSpecification, type) => {
// notification for the topic
},
onClose: () => {
// the handler is closed
},
onError: (error) => {
// an error occured
}
};
const registration = await session.notifications.addListener(listener);
registration.select('foo');
.NET
registration = await notifications.AddListenerAsync(new Listener());
...
/// <summary>
/// The listener for topic notifications.
/// </summary>
private class Listener : ITopicNotificationListener {
/// <summary>
/// Indicates that the stream was closed.
/// </summary>
public void OnClose()
{
WriteLine("The listener was closed.");
}
/// <summary>
/// Notification for an immediate descendant of a selected topic path.
/// </summary>
public void OnDescendantNotification(string topicPath, NotificationType type)
{
WriteLine($"Descendant topic '{topicPath}' has been {type}.");
}
/// <summary>
/// Indicates an error received by the callback.
/// </summary>
public void OnError(ErrorReason errorReason)
{
WriteLine($"The listener received the error: '{errorReason}'.");
}
/// <summary>
/// Notification for a selected topic.
/// </summary>
public void OnTopicNotification(string topicPath, ITopicSpecification specification,
NotificationType type)
{
WriteLine($"Topic '{topicPath}' has been {type}.");
}
}
Java and Android
final Session session = Diffusion.sessions().open("ws://localhost:8080");
final TopicNotifications notifications = session.feature(TopicNotifications.class);
final TopicNotificationListener listener = new TopicNotificationListener() {
@Override
public void onTopicNotification(String topicPath,
TopicSpecification specification,
NotificationType type) {
// Handle notifications for selected/deselected topics
}
@Override
public void onDescendantNotification(String topicPath, NotificationType type) {
// Handle notifications for immediate descendants
}
@Override
public void onClose() {
// The listener has been closed
}
@Override
public void onError(ErrorReason error) {
// The listener has encountered an error
}
};
final CompletableFuture<NotificationRegistration> future = notifications.addListener(listener);
final NotificationRegistration registration = future.join();
registration.select("foo");
C
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
#include <unistd.h>
#else
#define sleep(x) Sleep(1000 * x)
#endif
#include "diffusion.h"
DIFFUSION_REGISTRATION_T *g_registration;
static int on_listener_registered(
const DIFFUSION_REGISTRATION_T *registration,
void *context)
{
// listener has been registered
g_registration = diffusion_registration_dup(registration);
return HANDLER_SUCCESS;
}
static int on_descendant_notification(
const char *topic_path,
const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type,
void *context)
{
// handle topic notification for descendant of `topic_path`
return HANDLER_SUCCESS;
}
static int on_topic_notification(
const char *topic_path,
const TOPIC_SPECIFICATION_T *specification,
const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type,
void *context)
{
// handle topic notification for `topic_path`
return HANDLER_SUCCESS;
}
static void on_close()
{
// topic notification listener has been closed
}
static int on_selected(
void *context)
{
// topic notification listener has been registered
return HANDLER_SUCCESS;
}
int main(
int argc,
char **argv)
{
const char *url = "ws://localhost:8080";
const char *principal = "control";
const char *password = "password";
const char *topic_path = "my/topic/path";
CREDENTIALS_T *credentials = credentials_create_password(password);
SESSION_T *session;
DIFFUSION_ERROR_T error = {0};
// Create a session, synchronously
session = session_create(url, principal, credentials, NULL, NULL, &error);
if (session == NULL)
{
printf("Failed to create session: %s\n", error.message);
free(error.message);
credentials_free(credentials);
return EXIT_FAILURE;
}
// create a topic notification listener
DIFFUSION_TOPIC_NOTIFICATION_LISTENER_T listener = {
.on_registered = on_listener_registered,
.on_descendant_notification = on_descendant_notification,
.on_topic_notification = on_topic_notification,
.on_close = on_close};
diffusion_topic_notification_add_listener(session, listener, NULL);
// Sleep for a while
sleep(5);
// register the topic notification listener
DIFFUSION_TOPIC_NOTIFICATION_REGISTRATION_PARAMS_T params = {
.on_selected = on_selected,
.registration = g_registration,
.selector = topic_path};
diffusion_topic_notification_registration_select(session, params, NULL);
// Sleep for a while
sleep(5);
// Close the session, and release resources and memory
session_close(session, NULL);
session_free(session);
credentials_free(credentials);
diffusion_registration_free(g_registration);
return EXIT_SUCCESS;
}
Apple
class TopicNotificationListener: PTDiffusionTopicNotificationListener {
func diffusionStream(_ stream: PTDiffusionStream,
didReceiveDescendantNotificationWith type: PTDiffusionTopicNotificationType,
topicPath: String) {
// Handle notifications for immediate descendants
}
func diffusionStream(_ stream: PTDiffusionStream,
didReceiveNotificationWith type: PTDiffusionTopicNotificationType,
topicPath: String, specification: PTDiffusionTopicSpecification) {
// Handle notifications for selected/deselected topics
}
func diffusionStream(_ stream: PTDiffusionStream,
didFailWithError error: Error) {
// The listener has encountered an error
}
func diffusionDidClose(_ stream: PTDiffusionStream) {
// The listener has been closed
}
}
func register_topic_notification_listener(session: PTDiffusionSession) {
let listener = TopicNotificationListener()
session.topicNotifications.add(listener) { (registration, error) in
if (error != nil) {
print("An error occurred: %@", error!.localizedDescription)
return
}
registration?.select(withTopicSelectorExpression: "foo") { _ in }
}
}