Just a second...

Receiving topic notifications

Receive topic notifications using topic selectors. This enables a client to receive updates when topics are added or removed, without the topic values.

Note: Topic notifications are supported by the Android™ API, Java™ API and JavaScript® API.

The client must register a listener object to receive notifications about selected topics. Use a topic selector to specify the topics.

For more details about topic notifications, see Topic notifications.

Required permissions:select_topic and read_topic permissions for the specified topics

Receiving topic notifications

A client can register to receive notifications about a set of topics via a listener object.

JavaScript
const listener = {
    onDescendantNotification: (topicPath, type) => {
        // notification for an immediate descendant
    },
    onTopicNotification: (topicPath, topicSpecification, type) => {
        // notification for the topic
    },
    onClose: () => {
    // the handler is closed
    },
    onError: (error) => {
        // an error occured
    }
};

const registration = await session.notifications.addListener(listener);
registration.select('foo');

.NET
registration = await notifications.AddListenerAsync(new Listener());


...


/// <summary>
/// The listener for topic notifications.
/// </summary>
private class Listener : ITopicNotificationListener {
    /// <summary>
    /// Indicates that the stream was closed.
    /// </summary>
    public void OnClose()
    {
        WriteLine("The listener was closed.");
    }

    /// <summary>
    /// Notification for an immediate descendant of a selected topic path.
    /// </summary>
    public void OnDescendantNotification(string topicPath, NotificationType type)
    {
        WriteLine($"Descendant topic '{topicPath}' has been {type}.");
    }

    /// <summary>
    /// Indicates an error received by the callback.
    /// </summary>
    public void OnError(ErrorReason errorReason)
    {
        WriteLine($"The listener received the error: '{errorReason}'.");
    }

    /// <summary>
    /// Notification for a selected topic.
    /// </summary>
    public void OnTopicNotification(string topicPath, ITopicSpecification specification, 
                                    NotificationType type)
    {
        WriteLine($"Topic '{topicPath}' has been {type}.");
    }
}
Java and Android
final Session session = Diffusion.sessions().open("ws://localhost:8080");
final TopicNotifications notifications = session.feature(TopicNotifications.class);

final TopicNotificationListener listener = new TopicNotificationListener() {
    @Override
    public void onTopicNotification(String topicPath,
        TopicSpecification specification,
        NotificationType type) {
        // Handle notifications for selected/deselected topics
    }

    @Override
    public void onDescendantNotification(String topicPath, NotificationType type) {
        // Handle notifications for immediate descendants
    }

    @Override
    public void onClose() {
        // The listener has been closed
    }

    @Override
    public void onError(ErrorReason error) {
        // The listener has encountered an error
    }
};

final CompletableFuture<NotificationRegistration> future = notifications.addListener(listener);
final NotificationRegistration registration = future.join();

registration.select("foo");
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
    #include <unistd.h>
#else
    #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"

DIFFUSION_REGISTRATION_T *g_registration;

static int on_listener_registered(
    const DIFFUSION_REGISTRATION_T *registration,
    void *context)
{
    // listener has been registered
    g_registration = diffusion_registration_dup(registration);
    return HANDLER_SUCCESS;
}

static int on_descendant_notification(
    const char *topic_path,
    const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type,
    void *context)
{
    // handle topic notification for descendant of `topic_path`
    return HANDLER_SUCCESS;
}

static int on_topic_notification(
    const char *topic_path,
    const TOPIC_SPECIFICATION_T *specification,
    const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type,
    void *context)
{
    // handle topic notification for `topic_path`
    return HANDLER_SUCCESS;
}

static void on_close()
{
    // topic notification listener has been closed
}

static int on_selected(
    void *context)
{
    // topic notification listener has been registered
    return HANDLER_SUCCESS;
}

int main(
    int argc,
    char **argv)
{
    const char *url = "ws://localhost:8080";
    const char *principal = "control";
    const char *password = "password";
    const char *topic_path = "my/topic/path";

    CREDENTIALS_T *credentials = credentials_create_password(password);

    SESSION_T *session;
    DIFFUSION_ERROR_T error = {0};

    // Create a session, synchronously
    session = session_create(url, principal, credentials, NULL, NULL, &error);
    if (session == NULL)
    {
        printf("Failed to create session: %s\n", error.message);
        free(error.message);
        credentials_free(credentials);
        return EXIT_FAILURE;
    }

    // create a topic notification listener
    DIFFUSION_TOPIC_NOTIFICATION_LISTENER_T listener = {
        .on_registered = on_listener_registered,
        .on_descendant_notification = on_descendant_notification,
        .on_topic_notification = on_topic_notification,
        .on_close = on_close};
    diffusion_topic_notification_add_listener(session, listener, NULL);

    // Sleep for a while
    sleep(5);

    // register the topic notification listener
    DIFFUSION_TOPIC_NOTIFICATION_REGISTRATION_PARAMS_T params = {
        .on_selected = on_selected,
        .registration = g_registration,
        .selector = topic_path};
    diffusion_topic_notification_registration_select(session, params, NULL);

    // Sleep for a while
    sleep(5);

    // Close the session, and release resources and memory
    session_close(session, NULL);
    session_free(session);

    credentials_free(credentials);
    diffusion_registration_free(g_registration);
    return EXIT_SUCCESS;
}
Apple
class TopicNotificationListener: PTDiffusionTopicNotificationListener {
    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveDescendantNotificationWith type: PTDiffusionTopicNotificationType,
                         topicPath: String) {
        // Handle notifications for immediate descendants
    }


    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveNotificationWith type: PTDiffusionTopicNotificationType,
                         topicPath: String, specification: PTDiffusionTopicSpecification) {
        // Handle notifications for selected/deselected topics
    }


    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        // The listener has encountered an error
    }


    func diffusionDidClose(_ stream: PTDiffusionStream) {
        // The listener has been closed
    }
}


func register_topic_notification_listener(session: PTDiffusionSession) {
    let listener = TopicNotificationListener()

    session.topicNotifications.add(listener) { (registration, error) in
        if (error != nil) {
            print("An error occurred: %@", error!.localizedDescription)
            return
        }

        registration?.select(withTopicSelectorExpression: "foo") { _ in }
    }
}