Just a second...

Receiving topic notifications

Receive topic notifications using topic selectors. This enables a client to receive updates when topics are added or removed, without the topic values.

Note: Topic notifications are supported by the Android™ API, Java™ API and JavaScript® API.

The client must register a listener object to receive notifications about selected topics. Use a topic selector to specify the topics.

For more details about topic notifications, see Topic notifications.

Required permissions: select_topic and read_topic permissions for the specified topics

Receiving topic notifications

A client can register to receive notifications about a set of topics via a listener object.

JavaScript
const listener = {
    onDescendantNotification: (topicPath, type) => {
        // notification for an immediate descendant
    },
    onTopicNotification: (topicPath, topicSpecification, type) => {
        // notification for the topic
    },
    onClose: () => {
    // the handler is closed
    },
    onError: (error) => {
        // an error occured
    }
};

const registration = await session.notifications.addListener(listener);
registration.select('foo');

.NET
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation of a listener for topic notifications.
    /// </summary>
    public sealed class TopicNotificationListener {
        private const string TOPIC_PREFIX = "topic-notifications";

        public async Task TopicNotificationListenerExample(string serverUrl) {
            var selector = $"?{TOPIC_PREFIX}//";

            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open( serverUrl );

            var notifications = session.TopicNotifications;

            INotificationRegistration registration = null;
            string path = string.Empty;

            // Register a listener to receive topic notifications.
            try
            {
                registration = await notifications.AddListenerAsync(new Listener());
            }
            catch(Exception ex)
            {
                WriteLine($"Failed to add listener : {ex}.");
                session.Close();
                return;
            }

            // Start receiving notifications.
            try
            {
                await registration.SelectAsync(selector);
            }
            catch (Exception ex)
            {
                WriteLine($"Selector '{selector}' registration failed : {ex}.");
                session.Close();
                return;
            }

            // Add a topic.
            try
            {
                path = $"{TOPIC_PREFIX}/{DateTime.Now.ToFileTimeUtc()}";
                var specification = session.TopicControl.NewSpecification(TopicType.STRING);

                await session.TopicControl.AddTopicAsync(path, specification);
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to add topic '{path}' : {ex}.");
                session.Close();
                return;
            }

            // Remove the topic.
            try
            {
                await session.TopicControl.RemoveTopicsAsync(path);
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to remove topic '{path}' : {ex}.");
                session.Close();
                return;
            }

            // Stop receiving notifications.
            try
            {
                await registration.DeselectAsync(selector);
            }
            catch (Exception ex)
            {
                WriteLine($"Deselection failed for selector '{selector}' : {ex}.");
                session.Close();
                return;
            }

            // Unregister the listener.
            try
            {
                await registration.CloseAsync();
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to unregister the listener : {ex}.");
            }

            // Close the session
            session.Close();
        }

        /// <summary>
        /// The listener for topic notifications.
        /// </summary>
        private class Listener : ITopicNotificationListener {
            /// <summary>
            /// Indicates that the stream was closed.
            /// </summary>
            public void OnClose()
            {
                WriteLine("The listener was closed.");
            }

            /// <summary>
            /// Notification for an immediate descendant of a selected topic path.
            /// </summary>
            public void OnDescendantNotification(string topicPath, NotificationType type)
            {
                WriteLine($"Descendant topic '{topicPath}' has been {type}.");
            }

            /// <summary>
            /// Indicates an error received by the callback.
            /// </summary>
            public void OnError(ErrorReason errorReason)
            {
                WriteLine($"The listener received the error: '{errorReason}'.");
            }

            /// <summary>
            /// Notification for a selected topic.
            /// </summary>
            public void OnTopicNotification(string topicPath, ITopicSpecification specification, 
                                            NotificationType type)
            {
                WriteLine($"Topic '{topicPath}' has been {type}.");
            }
        }
    }
}
Java and Android
final TopicNotifications notifications = session.feature(TopicNotifications.class);

final TopicNotificationListener listener = new TopicNotificationListener() {
    @Override
    public void onTopicNotification(String topicPath, TopicSpecification specification, NotificationType type) {
        // Handle notifications for selected/deselected topics
    }

    @Override
    public void onDescendantNotification(String topicPath, NotificationType type) {
        // Handle notifications for immediate descendants
    }


    @Override
    public void onClose() {
        // The listener has been closed
    }

    @Override
    public void onError(ErrorReason error) {
        // The listener has encountered an error
    }
};

final CompletableFuture<NotificationRegistration> future = notifications.addListener(listener);
final NotificationRegistration registration = future.get();

registration.select("foo");
                    
C
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"

DIFFUSION_REGISTRATION_T *g_registration;

static int on_listener_registered(
        const DIFFUSION_REGISTRATION_T *registration,
        void *context)
{
        // listener has been registered
        g_registration = diffusion_registration_dup(registration);
        return HANDLER_SUCCESS;
}


static int on_descendant_notification(
        const char *topic_path,
        const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type,
        void *context)
{
        // handle topic notification for descendant of `topic_path`
        return HANDLER_SUCCESS;
}


static int on_topic_notification(
        const char *topic_path,
        const TOPIC_SPECIFICATION_T *specification,
        const DIFFUSION_TOPIC_NOTIFICATION_TYPE_T type,
        void *context)
{
        // handle topic notification for `topic_path`
        return HANDLER_SUCCESS;
}


static void on_close()
{
        // topic notification listener has been closed
}


static int on_selected(void *context)
{
        // topic notification listener has been registered
        return HANDLER_SUCCESS;
}


int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        // Create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // create a topic notification listener
        DIFFUSION_TOPIC_NOTIFICATION_LISTENER_T listener = {
                .on_registered = on_listener_registered,
                .on_descendant_notification = on_descendant_notification,
                .on_topic_notification = on_topic_notification,
                .on_close = on_close
        };
        diffusion_topic_notification_add_listener(session, listener, NULL);

        // Sleep for a while
        sleep(5);

        // register the topic notification listener
        DIFFUSION_TOPIC_NOTIFICATION_REGISTRATION_PARAMS_T params = {
                .on_selected = on_selected,
                .registration = g_registration,
                .selector = topic_path
        };
        diffusion_topic_notification_registration_select(session, params, NULL);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        diffusion_registration_free(g_registration);
        return EXIT_SUCCESS;
}
Apple
class TopicNotificationListener: PTDiffusionTopicNotificationListener {
    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveDescendantNotificationWith type: PTDiffusionTopicNotificationType,
                         topicPath: String) {
        // Handle notifications for immediate descendants
    }

    
    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveNotificationWith type: PTDiffusionTopicNotificationType,
                         topicPath: String, specification: PTDiffusionTopicSpecification) {
        // Handle notifications for selected/deselected topics
    }


    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        // The listener has encountered an error
    }


    func diffusionDidClose(_ stream: PTDiffusionStream) {
        // The listener has been closed
    }
}


func register_topic_notification_listener(session: PTDiffusionSession) {
    let listener = TopicNotificationListener()

    session.topicNotifications.add(listener) { (registration, error) in
        if (error != nil) {
            print("An error occurred: %@", error!.localizedDescription)
            return
        }

        registration?.select(withTopicSelectorExpression: "foo") { _ in }
    }
}