Example: Receive missing topic notifications
The following examples use the TopicControl feature in the Diffusion™ API to register a missing topic notification handler.
JavaScript
// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
const session = await diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true
});

// Selector used to trigger the missing topic notification. No topic matches it
// until the handler below creates one.
const selector = '?example/topic/.*';

// Register a missing topic handler on the 'example' root topic.
// Any subscriptions to missing topics along this path will invoke this handler.
session.topics.addMissingTopicHandler('example', {
    // Called when a handler is successfully registered
    onRegister : (path, close) => {
        console.log('Registered missing topic handler on path: ' + path);

        // Attach the value stream before requesting the subscription so the
        // 'subscribe' event cannot be missed.
        session.addStream(selector, diffusion.datatypes.string()).on('subscribe', (path) => {
            console.log('Subscribed to topic: ' + path);
        });

        // Once we've registered the handler, we initiate a subscription with the
        // selector. This will invoke the handler. A rejection is reported
        // rather than left as an unhandled promise rejection.
        session.select(selector).then(undefined, (reason) => {
            console.log('Subscription failed: ' + reason);
        });
    },
    // Called when the handler is closed
    onClose : (path) => {
        console.log(`Missing topic handler on path '${path}' has been closed`);
    },
    // Called if there is an error on the handler; include the error so the
    // failure can be diagnosed
    onError : (path, error) => {
        console.log('Error on missing topic handler: ' + error);
    },
    // Called when we've received a missing topic notification on our registered handler path
    onMissingTopic : (notification) => {
        console.log('Received missing topic notification with selector: ' + notification.selector);

        // Once we've received the missing topic notification initiated from
        // subscribing with the selector, we add a topic that will match it
        const topic = 'example/topic/foo';
        session.topics.add(topic, diffusion.topics.TopicType.STRING).then((result) => {
            console.log('Topic add success: ' + topic);
        }, (reason) => {
            console.log('Topic add failed: ' + reason);
        });
    }
});
.NET
/**
* Copyright © 2021 - 2025 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Implementation of a missing topic handler.
    /// </summary>
    public sealed class AddMissingTopicHandler
    {
        /// <summary>
        /// Opens a control session and a client session, registers a missing topic handler
        /// with the control session, subscribes with the client session using a selector,
        /// and then cleans up.
        /// </summary>
        /// <param name="serverUrl">The URL of the server to connect both sessions to.</param>
        /// <returns>A task that completes when the example has finished and cleaned up.</returns>
        public async Task AddMissingTopicHandlerExample(string serverUrl)
        {
            string selector = "?Example/Some Topic//";
            string topicPath = "Example/Some Topic";

            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            // Nested try/finally blocks guarantee that both sessions and the handler
            // registration are released even when one of the awaited calls throws.
            try
            {
                var clientSession = Diffusion.Sessions.Principal("client").Password("password")
                    .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                    .Open(serverUrl);

                try
                {
                    WriteLine($"Adding missing topic handler for topic '{topicPath}'.");

                    var registration =
                        await controlSession.TopicControl.AddMissingTopicHandlerAsync(
                            topicPath,
                            new MissingTopicNotificationStream(controlSession));

                    try
                    {
                        WriteLine($"Subscribing to topic '{topicPath}'.");

                        // Use the selector declared above so that the subscribe and
                        // unsubscribe calls cannot drift apart.
                        await clientSession.Topics.SubscribeAsync(selector);

                        // Give the notification stream time to receive the notification
                        // and add the topic.
                        await Task.Delay(TimeSpan.FromSeconds(1));

                        // Clean up
                        await controlSession.TopicControl.RemoveTopicsAsync(topicPath);
                        WriteLine($"Topic '{topicPath}' removed.");

                        WriteLine($"Unsubscribing to topic '{topicPath}'.");
                        await clientSession.Topics.UnsubscribeAsync(selector, CancellationToken.None);
                    }
                    finally
                    {
                        await registration.CloseAsync();
                    }
                }
                finally
                {
                    clientSession.Close();
                }
            }
            finally
            {
                controlSession.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the stream that will be called when a session subscribes using
        /// a topic selector that matches no topics.
        /// </summary>
        private sealed class MissingTopicNotificationStream : IMissingTopicNotificationStream
        {
            // Session used to add the topic named in a notification.
            private readonly ISession session;

            /// <summary>
            /// Creates the stream.
            /// </summary>
            /// <param name="session">The session used to add missing topics.</param>
            public MissingTopicNotificationStream(ISession session) => this.session = session;

            /// <summary>
            /// Reports that the handler has been removed.
            /// </summary>
            public void OnClose() => WriteLine("Handler is removed.");

            /// <summary>
            /// Reports an error on the handler.
            /// </summary>
            /// <param name="errorReason">The reason for the error.</param>
            public void OnError(ErrorReason errorReason)
                => WriteLine($"An error has occured : {errorReason}.");

            /// <summary>
            /// Adds a string topic at the path carried by the notification, waiting up to
            /// ten seconds for the add to complete.
            /// </summary>
            /// <param name="notification">The missing topic notification.</param>
            public void OnMissingTopic(IMissingTopicNotification notification)
            {
                WriteLine($"Topic '{notification.TopicPath}' does not exist.");

                var taskAdd = session.TopicControl.AddTopicAsync(
                    notification.TopicPath,
                    Diffusion.NewSpecification(TopicType.STRING));

                try
                {
                    if (taskAdd.Wait(TimeSpan.FromSeconds(10)))
                    {
                        WriteLine($"Topic '{notification.TopicPath}' added.");
                    }
                    else
                    {
                        // Previously a timeout was silent, which made failures look like success.
                        WriteLine($"Timed out adding topic '{notification.TopicPath}'.");
                    }
                }
                catch (AggregateException ex)
                {
                    // Task.Wait wraps a failed add in AggregateException; report it instead
                    // of letting it escape from this callback.
                    WriteLine($"Failed to add topic '{notification.TopicPath}' : {ex.GetBaseException().Message}.");
                }
            }
        }
    }
}
Java and Android
/*******************************************************************************
* Copyright (C) 2014, 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package com.pushtechnology.client.sdk.manual;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotification;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotificationStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.TopicSelector.Type;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
/**
 * An example of registering a missing topic notification handler and processing
 * notifications using a control client.
 *
 * @author DiffusionData Limited
 */
public final class ControlClientHandlingMissingTopicNotification {

    private static final Logger LOG =
        LoggerFactory.getLogger(ControlClientHandlingMissingTopicNotification.class);

    private final Session session;
    private final TopicControl topicControl;

    /**
     * Constructor.
     * <p>
     * Opens a session and registers a missing topic notification stream for
     * the "Accounts" topic path, waiting up to five seconds for the
     * registration to complete. If registration fails, the session is closed
     * before the exception is propagated.
     *
     * @param serverUrl the URL of the server to connect to
     * @throws InterruptedException if interrupted while waiting for registration
     * @throws ExecutionException if the registration fails
     * @throws TimeoutException if the registration does not complete in time
     */
    public ControlClientHandlingMissingTopicNotification(String serverUrl)
        throws InterruptedException, ExecutionException, TimeoutException {

        // Create a session
        session = Diffusion.sessions().password("password").principal("admin")
            .open(serverUrl);

        topicControl = session.feature(TopicControl.class);

        try {
            // Registers a missing topic notification on a topic path
            topicControl.addMissingTopicHandler(
                "Accounts",
                new NotificationStream()).get(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException | RuntimeException e) {
            // Do not leak the session when the object cannot be constructed.
            session.close();
            throw e;
        }
    }

    /**
     * Close the session, which also ends the handler registration made by
     * this client.
     */
    public void close() {
        session.close();
    }

    /**
     * Stream that receives missing topic notifications and creates the
     * requested topic when the request meets the conditions checked in
     * {@link #onMissingTopic(MissingTopicNotification)}.
     */
    private final class NotificationStream implements
        MissingTopicNotificationStream {

        @Override
        public void onClose() {
            LOG.info("Missing topic notification stream closed");
        }

        @Override
        public void onError(ErrorReason errorReason) {
            // Report the failure rather than silently ignoring it.
            LOG.warn("Missing topic notification stream error: {}", errorReason);
        }

        @Override
        public void onMissingTopic(MissingTopicNotification notification) {
            // This handler will create a missing topic if a path selector
            // requesting a topic starting with "Accounts/" is selected and
            // the requesting session has the principal 'control'.
            if (notification.getTopicSelector().getType() == Type.PATH) {
                final String path = notification.getTopicPath();
                if (path.startsWith("Accounts/") &&
                    "control".equals(
                        notification.getSessionProperties().get(Session.PRINCIPAL))) {
                    topicControl.addTopic(
                        path,
                        TopicType.STRING).whenComplete((result, ex) -> {
                            if (ex == null) {
                                LOG.info("Missing topic " + path + " " + result);
                            }
                            else {
                                LOG.warn("Failed to create missing topic " + path, ex);
                            }
                        });
                }
            }
        }
    }
}
C
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
#include <unistd.h>
#else
#define sleep(x) Sleep(1000 * x)
#endif
#include "diffusion.h"
/*
 * Callback supplied to the missing topic handler registration.
 * This example performs no work and reports success.
 */
static int on_missing_topic(
    SESSION_T *session,
    const SVC_MISSING_TOPIC_REQUEST_T *request,
    void *context)
{
    /* The arguments are not used by this example. */
    (void) session;
    (void) request;
    (void) context;

    /* A real handler would process the missing topic notification here. */
    return HANDLER_SUCCESS;
}
int main(
int argc,
char **argv)
{
const char *url = "ws://localhost:8080";
const char *principal = "control";
const char *password = "password";
const char *topic_path = "my/topic/path";
CREDENTIALS_T *credentials = credentials_create_password(password);
SESSION_T *session;
DIFFUSION_ERROR_T error = {0};
// Create a session, synchronously
session = session_create(url, principal, credentials, NULL, NULL, &error);
if (session == NULL)
{
printf("Failed to create session: %s\n", error.message);
free(error.message);
credentials_free(credentials);
return EXIT_FAILURE;
}
// register missing topic handler
MISSING_TOPIC_PARAMS_T params = {.on_missing_topic = on_missing_topic, .topic_path = topic_path, .context = NULL};
missing_topic_register_handler(session, params);
// Sleep for a while
sleep(5);
// Close the session, and release resources and memory
session_close(session, NULL);
session_free(session);
credentials_free(credentials);
return EXIT_SUCCESS;
}
Apple
// Copyright (C) 2021 Push Technology Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import Foundation
import Diffusion
/// Example that opens a session and registers itself as a missing topic
/// handler, creating a string topic for each path-pattern notification it
/// receives.
class MissingTopicHandlerExample: PTDiffusionMissingTopicHandler {
    /// Strong reference that keeps the connected session alive.
    var session: PTDiffusionSession?

    /// Opens a session to `url` and, once connected, registers this object as
    /// a missing topic handler.
    func startWithURL(url: URL) {
        let credentials = PTDiffusionCredentials(password: "password")

        let sessionConfiguration =
            PTDiffusionSessionConfiguration(principal: "control",
                                            credentials: credentials)

        print("Connecting...")

        PTDiffusionSession.open(with: url,
                                configuration: sessionConfiguration) { (session, error) in
            // Swift's print() does not expand "%@" placeholders, so messages
            // are built with string interpolation instead.
            guard let session = session else {
                print("Failed to open session: \(error?.localizedDescription ?? "unknown error")")
                return
            }

            // At this point we now have a connected session.
            print("Connected.")

            // To maintain a strong reference to the session.
            self.session = session

            self.registerAsMissingTopicHandler(session: session)
        }
    }

    /// Registers this object as the missing topic handler for a fixed topic path.
    func registerAsMissingTopicHandler(session: PTDiffusionSession) {
        session.topicControl.add(self, forTopicPath: "Example/Control Client Handler") { (registration, error) in
            if registration != nil {
                print("Registered as missing topic handler.")
            }
            else {
                print("Failed to register as missing topic handler: \(error?.localizedDescription ?? "unknown error")")
            }
        }
    }

    /// Called when a missing topic notification is received. Only path
    /// pattern expressions (prefixed with ">") are handled.
    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        hadMissingTopicNotification notification: PTDiffusionMissingTopicNotification) {
        print("Received Missing Topic Notification: \(notification)")

        let expression: String = notification.topicSelectorExpression

        // Expect a path pattern expression.
        guard expression.hasPrefix(">") else {
            print("Topic selector expression is not a path pattern.")
            return
        }

        // Extract the topic path from the path pattern expression by dropping
        // the leading ">" character.
        let topicPath = String(expression.dropFirst())

        // Add a string topic at this topic path.
        self.session?.topicControl.addTopic(withPath: topicPath,
                                            type: PTDiffusionTopicType.string) { (result, error) in
            if result == nil {
                print("Error occurred while creating topic: \(error?.localizedDescription ?? "unknown error")")
            }
            else {
                print("Topic created.")
            }
        }
    }

    /// Called when the registration has been closed.
    func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
        print("Registration \(registration) closed.")
    }

    /// Called when the registration has failed.
    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        didFailWithError error: Error) {
        print("Registration \(registration) failed: \(error.localizedDescription)")
    }
}
Change the URL provided in the example to the URL of your Diffusion server.