Just a second...

Example: Receive missing topic notifications

The following examples use the TopicControl feature in the Diffusion™ API to register a missing topic notification handler.

JavaScript
var diffusion = require('diffusion');

// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true
}).then(function(session) {

	// Register a missing topic handler on the "example" root topic
	// Any subscriptions to missing topics along this path will invoke this handler
	session.topics.addMissingTopicHandler("example", {
		// Called when a handler is successfully registered
		onRegister : function(path, close) {
			console.log("Registered missing topic handler on path: " + path);
			// Once we've registered the handler, we initiate a subscription with the selector "?example/topic/.*"
			// This will invoke the handler.
			session.select("?example/topic/.*");
			session.addStream("?example/topic/.*", diffucion.datatypes.string()).on('subscribe', function(path) {
				console.log("Subscribed to topic: " + path);
			});
		},
		// Called when the handler is closed
		onClose : function(path) {
			console.log("Missing topic handler on path '" + path + "' has been closed");
		},
		// Called if there is an error on the handler
		onError : function(path, error) {
			console.log("Error on missing topic handler");
		},
		// Called when we've received a missing topic notification on our registered handler path
		onMissingTopic : function(notification) {
			console.log("Received missing topic notification with selector: " + notification.selector);
			// Once we've received the missing topic notification initiated from subscribing to "?example/topic/.*",
			// we add a topic that will match the selector

			var topic = "example/topic/foo";

			session.topics.add(topic, diffusion.topics.TopicType.STRING).then(function(result) {
				console.log("Topic add success: " + topic);
				// If the topic addition is successful, we proceed() with the session's subscription.
				// The client will now be subscribed to the topic
				notification.proceed();
			}, function(reason) {
				console.log("Topic add failed: " + reason);
				// If the topic addition fails, we cancel() the session's subscription request.
				notification.cancel();
			});
		}
	});
});
                    
.NET
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
 
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;

namespace PushTechnology.ClientInterface.Example
{
    /// <summary>
    /// Implementation of a missing topic handler.
    /// </summary>
    public sealed class AddMissingTopicHandler
    {
        public AddMissingTopicHandler()
        {
            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            var clientSession = Diffusion.Sessions.Principal("client").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            string topicPath = "Example/Some Topic";
            
            WriteLine($"Adding missing topic handler for topic '{topicPath}'.");

            var registration = 
                await controlSession.TopicControl.AddMissingTopicHandlerAsync(
                        topicPath, 
                        new MissingTopicNotificationStream(controlSession));

            WriteLine($"Subscribing to topic '{topicPath}'.");
            
            await clientSession.Topics.SubscribeAsync("?Example/Some Topic//");

            await Task.Delay(TimeSpan.FromSeconds(1));
            
            // Clean up
            await session.TopicControl.RemoveTopicsAsync(topicPath);

            WriteLine($"Topic '{topicPath}' removed.");

            await clientSession.Topics.UnsubscribeAsync(selector, cancellationToken);

            WriteLine($"Unsubscribing to topic '{topicPath}'.");

            await registration.CloseAsync();
            
            clientSession.Close();
            controlSession.Close();
        }

        /// <summary>
        /// Basic implementation of the stream that will be called when a session subscribes using
        /// a topic selector that matches no topics.
        /// </summary>
        private sealed class MissingTopicNotificationStream : IMissingTopicNotificationStream
        {
            private ISession session;

            public MissingTopicNotificationStream(ISession session) => this.session = session;

            public void OnClose() => WriteLine("Handler is removed.");

            public void OnError(ErrorReason errorReason) 
                        => WriteLine($"An error has occured : {errorReason}.");

            public void OnMissingTopic(IMissingTopicNotification notification)
            {
                WriteLine($"Topic '{notification.TopicPath}' does not exist.");

                session.TopicControl.AddTopic(
                        notification.TopicPath, 
                        session.TopicControl.NewSpecification(TopicType.STRING), 
                        new TopicControlAddCallback(notification));
            }
        }
        
        /// <summary>
        /// Implementation of a callback interface for adding topics.
        /// </summary>
        private sealed class TopicControlAddCallback : ITopicControlAddCallback
        {
            IMissingTopicNotification notification;

            public TopicControlAddCallback(IMissingTopicNotification notification) 
                        => this.notification = notification;

            public void OnDiscard() => WriteLine("The stream is now closed.");

            public void OnTopicAdded(string topicPath) => WriteLine($"Topic '{topicPath}' added.");

            public void OnTopicAddFailed(string topicPath, TopicAddFailReason reason) 
                        => WriteLine($"The topic '{topicPath}' could not be added - reason: {reason}.");
        }    
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2014, 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.examples;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotification;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotificationStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.TopicSelector.Type;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * An example of registering a missing topic notification handler and processing
 * notifications using a control client.
 *
 * @author Push Technology Limited
 */
public final class ControlClientHandlingMissingTopicNotification {

    private static final Logger LOG =
        LoggerFactory.getLogger(ControlClientHandlingMissingTopicNotification.class);

    private final Session session;
    private final TopicControl topicControl;

    /**
     * Constructor.
     */
    public ControlClientHandlingMissingTopicNotification(String serverUrl)
        throws InterruptedException, ExecutionException, TimeoutException {
        // Create a session
        session = Diffusion.sessions().password("password").principal("admin")
            .open(serverUrl);

        topicControl = session.feature(TopicControl.class);

        // Registers a missing topic notification on a topic path
        topicControl.addMissingTopicHandler(
            "Accounts",
            new NotificationStream()).get(5, TimeUnit.SECONDS);

    }

    private final class NotificationStream implements
        MissingTopicNotificationStream {
        @Override
        public void onClose() {
        }

        @Override
        public void onError(ErrorReason errorReason) {
        }

        @Override
        public void onMissingTopic(MissingTopicNotification notification) {
            // This handler will create a missing topic if a path selector
            // requesting a topic starting with "Accounts/" is selected and
            // the requesting session has the principal 'control'.
            if (notification.getTopicSelector().getType() == Type.PATH) {
                final String path = notification.getTopicPath();
                if (path.startsWith("Accounts/") &&
                    "control".equals(
                        notification.getSessionProperties().get(Session.PRINCIPAL))) {

                    topicControl.addTopic(
                        path,
                        TopicType.STRING).whenComplete((result, ex) -> {
                            if (ex == null) {
                                LOG.info("Missing topic " + path + " " + result);
                            }
                            else {
                                LOG.warn("Failed to create missing topic " + path, ex);
                            }
                        });
                }
            }
        }
    }

}
C
/*
 * This example shows how to register a missing topic notification
 * handler and return a missing topic notification response - calling
 * missing_topic_proceed() once we've created the topic.
 */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"

ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'r', "topic_root", "Topic root to process missing topic notifications on", ARG_OPTIONAL, ARG_HAS_VALUE, "foo"},
        END_OF_ARG_OPTS
};

static int
on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        puts("Topic added");
        return HANDLER_SUCCESS;
}

static int
on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        puts("Topic add failed");
        printf("Reason code: %d\n", response->reason);
        return HANDLER_SUCCESS;
}

static int
on_topic_add_discard(SESSION_T *session, void *context)
{
        puts("Topic add discarded");
        return HANDLER_SUCCESS;
}

/*
 * A request has been made for a topic that doesn't exist; create it
 * and inform Diffusion that the client's subcription request can
 * proceed.
 */
static int
on_missing_topic(SESSION_T *session, const SVC_MISSING_TOPIC_REQUEST_T *request, void *context)
{
        printf("Missing topic: %s\n", request->topic_selector);

        BUF_T *sample_data_buf = buf_create();
        buf_write_string(sample_data_buf, "Hello, world");

        // Add the missing topic.
        ADD_TOPIC_PARAMS_T topic_params = {
                .on_topic_added = on_topic_added,
                .on_topic_add_failed = on_topic_add_failed,
                .on_discard = on_topic_add_discard,
                .topic_path = strdup(request->topic_selector+1),
                .details = create_topic_details_single_value(M_DATA_TYPE_STRING),
                .content = content_create(CONTENT_ENCODING_NONE, sample_data_buf)
        };

        add_topic(session, topic_params);

        // Proceed with the client's subscription to the topic
        missing_topic_proceed(session, (SVC_MISSING_TOPIC_REQUEST_T *) request);

        return HANDLER_SUCCESS;
}

/*
 * Entry point for the example.
 */
int
main(int argc, char **argv)
{
        /*
         * Standard command-line parsing.
         */
        HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        const char *topic_root = hash_get(options, "topic_root");

        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session != NULL) {
                printf("Session created (state=%d, id=%s)\n",
                       session_state_get(session),
                       session_id_to_string(session->id));
        }
        else {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                return EXIT_FAILURE;
        }

        /*
         * Register the missing topic handler
         */
        MISSING_TOPIC_PARAMS_T handler = {
                .on_missing_topic = on_missing_topic,
                .topic_path = topic_root,
                .context = NULL
        };

        missing_topic_register_handler(session, handler);

        /*
         * Run for 5 minutes.
         */
        sleep(5 * 60);

        /*
         * Close session and clean up.
         */
        session_close(session, NULL);
        session_free(session);

        hash_free(options, NULL, free);

        return EXIT_SUCCESS;
}
                    
Apple
//  Diffusion Client Library for iOS and OS X - User Manual Code Snippets
//
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

class MissingTopicHandlerExample: PTDiffusionMissingTopicHandler  {
    var session: PTDiffusionSession?

    func startWithURL(url: URL) {
        let credentials = PTDiffusionCredentials(password: "password")

        let sessionConfiguration =
            PTDiffusionSessionConfiguration(principal: "control",
                                            credentials:credentials)

        print("Connecting...")

        PTDiffusionSession.open(with: url,
                                configuration: sessionConfiguration) { (session, error) in

            if (session == nil) {
                print("Failed to open session: %@", error!.localizedDescription)
                return
            }

            // At this point we now have a connected session.
            print("Connected.")

            // To maintain a strong reference to the session.
            self.session = session!

            self.registerAsMissingTopicHandler(session: session!)
        }
    }

    func registerAsMissingTopicHandler(session: PTDiffusionSession) {
        session.topicControl.add(self, forTopicPath: "Example/Control Client Handler") { (registration, error) in

            if (registration != nil) {
                print("Registered as missing topic handler.")
            }
            else {
                print("Failed to register as missing topic handler: %@", error!.localizedDescription)
            }
        }
    }

    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        hadMissingTopicNotification notification: PTDiffusionMissingTopicNotification) {
        print("Received Missing Topic Notification: %@", notification);

        let expression: String = notification.topicSelectorExpression

        // Expect a path pattern expression.
        if (!expression.hasPrefix(">")) {
            print("Topic selector expression is not a path pattern.")
            return
        }

        // extract topic path from path pattern expression
        let index = expression.index(expression.startIndex, offsetBy: 1)
        let topicPath = String(expression[index...])

        // Add a stateless topic at this topic path.
        self.session?.topicControl.addTopic(withPath: topicPath,
                                            type:PTDiffusionTopicType.string,
                                            completionHandler: { (result, error) in
            if (result == nil) {
                print("Error occurred while creating topic: %@", error!.localizedDescription)
            }
            else {
                print("Topic created.")
            }
        })
    }

    func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
        print("Registration %@ closed.")
    }

    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration, didFailWithError error: Error) {
        print("Registration %@ failed: %@", registration, error.localizedDescription)
    }

}


Change the URL from that provided in the example to the URL of the Diffusion server.