Example: Receive missing topic notifications
The following examples use the TopicControl feature in the Diffusion™ API to register a missing topic notification handler.
JavaScript
// Connect to the server. Change these options to suit your own environment.
// Node.js will not accept self-signed certificates by default. If you have
// one of these, set the environment variable NODE_TLS_REJECT_UNAUTHORIZED=0
// before running this example.
const session = await diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true
});

// Register a missing topic handler on the 'example' root topic
// Any subscriptions to missing topics along this path will invoke this handler
session.topics.addMissingTopicHandler('example', {
    // Called when a handler is successfully registered
    onRegister : (path, close) => {
        console.log('Registered missing topic handler on path: ' + path);

        // Once we've registered the handler, we initiate a subscription with the selector '?example/topic/.*'
        // This will invoke the handler.
        session.select('?example/topic/.*');

        // The callback parameter is named topicPath so that it does not shadow
        // the handler path passed to onRegister.
        session.addStream('?example/topic/.*', diffusion.datatypes.string()).on('subscribe', (topicPath) => {
            console.log('Subscribed to topic: ' + topicPath);
        });
    },

    // Called when the handler is closed
    onClose : (path) => {
        console.log(`Missing topic handler on path '${path}' has been closed`);
    },

    // Called if there is an error on the handler.
    // Report the path and the error itself so that the failure can be diagnosed.
    onError : (path, error) => {
        console.log(`Error on missing topic handler on path '${path}':`, error);
    },

    // Called when we've received a missing topic notification on our registered handler path
    onMissingTopic : (notification) => {
        console.log('Received missing topic notification with selector: ' + notification.selector);

        // Once we've received the missing topic notification initiated from subscribing to '?example/topic/.*',
        // we add a topic that will match the selector
        const topic = 'example/topic/foo';

        session.topics.add(topic, diffusion.topics.TopicType.STRING).then((result) => {
            console.log('Topic add success: ' + topic);
        }, (reason) => {
            console.log('Topic add failed: ' + reason);
        });
    }
});
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Implementation of a missing topic handler.
    /// </summary>
    public sealed class AddMissingTopicHandler {
        /// <summary>
        /// Opens a control session and a client session, registers a missing topic
        /// handler with the control session, subscribes the client session using a
        /// selector, waits one second, and then cleans up.
        /// </summary>
        /// <param name="serverUrl">The URL passed to <c>Open</c> for both sessions.</param>
        /// <returns>A task that completes once the example has finished and cleaned up.</returns>
        public async Task AddMissingTopicHandlerExample(string serverUrl) {
            ISession controlSession = null;
            ISession clientSession = null;

            try {
                controlSession = Diffusion.Sessions.Principal("control").Password("password")
                    .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                    .Open(serverUrl);

                clientSession = Diffusion.Sessions.Principal("client").Password("password")
                    .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                    .Open(serverUrl);

                string selector = "?Example/Some Topic//";
                string topicPath = "Example/Some Topic";

                WriteLine($"Adding missing topic handler for topic '{topicPath}'.");

                var registration = await controlSession.TopicControl.AddMissingTopicHandlerAsync(
                    topicPath, new MissingTopicNotificationStream(controlSession));

                WriteLine($"Subscribing to topic '{topicPath}'.");

                // Subscribe and unsubscribe with the same selector variable so the
                // two calls cannot drift apart.
                await clientSession.Topics.SubscribeAsync(selector);

                await Task.Delay(TimeSpan.FromSeconds(1));

                // Clean up
                await controlSession.TopicControl.RemoveTopicsAsync(topicPath);

                WriteLine($"Topic '{topicPath}' removed.");

                await clientSession.Topics.UnsubscribeAsync(selector, CancellationToken.None);

                WriteLine($"Unsubscribing to topic '{topicPath}'.");

                await registration.CloseAsync();
            } finally {
                // Close whichever sessions were opened, even when an earlier step threw.
                clientSession?.Close();
                controlSession?.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the stream that will be called when a session subscribes using
        /// a topic selector that matches no topics.
        /// </summary>
        private sealed class MissingTopicNotificationStream : IMissingTopicNotificationStream {
            private readonly ISession session;

            /// <summary>
            /// Creates the stream.
            /// </summary>
            /// <param name="session">The session used to add the missing topic.</param>
            public MissingTopicNotificationStream(ISession session) => this.session = session;

            /// <summary>
            /// Writes a message saying that the handler is removed.
            /// </summary>
            public void OnClose() => WriteLine("Handler is removed.");

            /// <summary>
            /// Writes the given error reason to the console.
            /// </summary>
            /// <param name="errorReason">The reason reported for the error.</param>
            public void OnError(ErrorReason errorReason)
                => WriteLine($"An error has occured : {errorReason}.");

            /// <summary>
            /// Reports the missing topic path and adds a string topic at that path.
            /// </summary>
            /// <param name="notification">The missing topic notification.</param>
            public void OnMissingTopic(IMissingTopicNotification notification) {
                WriteLine($"Topic '{notification.TopicPath}' does not exist.");

                session.TopicControl.AddTopic(
                    notification.TopicPath,
                    session.TopicControl.NewSpecification(TopicType.STRING),
                    new TopicControlAddCallback(notification));
            }
        }

        /// <summary>
        /// Implementation of a callback interface for adding topics.
        /// </summary>
        private sealed class TopicControlAddCallback : ITopicControlAddCallback {
            // Kept from the original example; no callback below reads it.
            private readonly IMissingTopicNotification notification;

            /// <summary>
            /// Creates the callback.
            /// </summary>
            /// <param name="notification">The notification that triggered the topic add.</param>
            public TopicControlAddCallback(IMissingTopicNotification notification)
                => this.notification = notification;

            /// <summary>
            /// Writes a message saying that the stream is closed.
            /// </summary>
            public void OnDiscard() => WriteLine("The stream is now closed.");

            /// <summary>
            /// Writes a message saying that the topic was added.
            /// </summary>
            /// <param name="topicPath">The path of the added topic.</param>
            public void OnTopicAdded(string topicPath) => WriteLine($"Topic '{topicPath}' added.");

            /// <summary>
            /// Writes a message saying that the topic could not be added, with the reason.
            /// </summary>
            /// <param name="topicPath">The path of the topic that could not be added.</param>
            /// <param name="reason">The reason reported for the failure.</param>
            public void OnTopicAddFailed(string topicPath, TopicAddFailReason reason)
                => WriteLine($"The topic '{topicPath}' could not be added - reason: {reason}.");
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2014, 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.client.sdk.manual;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotification;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.MissingTopicNotificationStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.TopicSelector.Type;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * An example of registering a missing topic notification handler and processing
 * notifications using a control client.
 *
 * @author DiffusionData Limited
 */
public final class ControlClientHandlingMissingTopicNotification {

    private static final Logger LOG =
        LoggerFactory.getLogger(ControlClientHandlingMissingTopicNotification.class);

    private final Session session;
    private final TopicControl topicControl;

    /**
     * Constructor.
     * <p>
     * Opens a session and registers a missing topic handler on the
     * "Accounts" path, waiting up to 5 seconds for the registration.
     *
     * @param serverUrl the URL used to open the session
     * @throws InterruptedException if interrupted while waiting for the
     *         registration
     * @throws ExecutionException if the registration fails
     * @throws TimeoutException if the registration does not complete within
     *         5 seconds
     */
    public ControlClientHandlingMissingTopicNotification(String serverUrl)
        throws InterruptedException, ExecutionException, TimeoutException {

        // Create a session
        session = Diffusion.sessions().password("password").principal("admin")
            .open(serverUrl);

        topicControl = session.feature(TopicControl.class);

        // Registers a missing topic notification on a topic path
        topicControl.addMissingTopicHandler(
            "Accounts",
            new NotificationStream()).get(5, TimeUnit.SECONDS);
    }

    /**
     * Stream that receives the missing topic notifications for the handler
     * registered in the constructor.
     */
    private final class NotificationStream implements
        MissingTopicNotificationStream {

        @Override
        public void onClose() {
            // Log the closure so that it is visible when the handler stops.
            LOG.info("Missing topic notification stream closed");
        }

        @Override
        public void onError(ErrorReason errorReason) {
            // Log the reason instead of discarding it.
            LOG.warn("Missing topic notification stream failed: {}", errorReason);
        }

        @Override
        public void onMissingTopic(MissingTopicNotification notification) {
            // This handler will create a missing topic if a path selector
            // requesting a topic starting with "Accounts/" is selected and
            // the requesting session has the principal 'control'.
            if (notification.getTopicSelector().getType() != Type.PATH) {
                return;
            }

            final String path = notification.getTopicPath();
            if (!path.startsWith("Accounts/")) {
                return;
            }

            final String principal =
                notification.getSessionProperties().get(Session.PRINCIPAL);
            if (!"control".equals(principal)) {
                return;
            }

            topicControl.addTopic(path, TopicType.STRING)
                .whenComplete((result, ex) -> {
                    if (ex == null) {
                        LOG.info("Missing topic {} {}", path, result);
                    }
                    else {
                        // SLF4J logs a trailing Throwable argument as the exception.
                        LOG.warn("Failed to create missing topic {}", path, ex);
                    }
                });
        }
    }
}
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" static int on_missing_topic( SESSION_T *session, const SVC_MISSING_TOPIC_REQUEST_T *request, void *context) { // handle missing topic notification return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // Create a session, synchronously session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // register missing topic handler MISSING_TOPIC_PARAMS_T params = { .on_missing_topic = on_missing_topic, .topic_path = topic_path, .context = NULL }; missing_topic_register_handler(session, params); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Example that opens a session, registers itself as a missing topic handler,
/// and adds a string topic when a path-pattern selector matches no topic.
class MissingTopicHandlerExample: PTDiffusionMissingTopicHandler {
    var session: PTDiffusionSession?

    /// Opens a session to `url` and, on success, registers this object as a
    /// missing topic handler.
    func startWithURL(url: URL) {
        let credentials = PTDiffusionCredentials(password: "password")

        let sessionConfiguration = PTDiffusionSessionConfiguration(principal: "control",
                                                                   credentials: credentials)

        print("Connecting...")

        PTDiffusionSession.open(with: url,
                                configuration: sessionConfiguration) { (session, error) in
            // Swift's print() does not expand NSLog-style "%@" specifiers, so
            // values are embedded with string interpolation.
            guard let session = session else {
                let reason = error?.localizedDescription ?? "unknown error"
                print("Failed to open session: \(reason)")
                return
            }

            // At this point we now have a connected session.
            print("Connected.")

            // To maintain a strong reference to the session.
            self.session = session

            self.registerAsMissingTopicHandler(session: session)
        }
    }

    /// Registers this object as the missing topic handler for the
    /// "Example/Control Client Handler" topic path and reports the outcome.
    func registerAsMissingTopicHandler(session: PTDiffusionSession) {
        session.topicControl.add(self, forTopicPath: "Example/Control Client Handler") { (registration, error) in
            if registration != nil {
                print("Registered as missing topic handler.")
            } else {
                let reason = error?.localizedDescription ?? "unknown error"
                print("Failed to register as missing topic handler: \(reason)")
            }
        }
    }

    /// Handles a missing topic notification by adding a string topic at the
    /// path taken from a ">"-prefixed selector expression.
    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        hadMissingTopicNotification notification: PTDiffusionMissingTopicNotification) {
        print("Received Missing Topic Notification: \(notification)")

        let expression: String = notification.topicSelectorExpression

        // Expect a path pattern expression.
        if !expression.hasPrefix(">") {
            print("Topic selector expression is not a path pattern.")
            return
        }

        // extract topic path from path pattern expression
        let index = expression.index(expression.startIndex, offsetBy: 1)
        let topicPath = String(expression[index...])

        // Add a string topic at this topic path.
        self.session?.topicControl.addTopic(withPath: topicPath,
                                            type: PTDiffusionTopicType.string) { (result, error) in
            if result == nil {
                let reason = error?.localizedDescription ?? "unknown error"
                print("Error occurred while creating topic: \(reason)")
            } else {
                print("Topic created.")
            }
        }
    }

    /// Reports that the registration has closed.
    func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) {
        print("Registration \(registration) closed.")
    }

    /// Reports that the registration has failed, with the error description.
    func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration,
                                        didFailWithError error: Error) {
        print("Registration \(registration) failed: \(error.localizedDescription)")
    }
}
Change the URL provided in the example to the URL of your Diffusion server.