Handling subscriptions to missing topics
A client can handle subscription requests for topics that do not exist and can act on those notifications, for example, by creating a topic on demand that matches the request.
Required permissions:
,The client can register itself as a handler for missing topics for any branch of the topic tree. The client is notified of any subscription using a selector that does not resolve to any existing topics. This enables the client to create the missing topics if your application requires this.
As of Diffusion™ 6.7, the notification contains the full set of session properties of the requesting session as well as the names of the servers that the request has passed through.
The missing topic handler is removed when the registering session is closed. If the registering session loses connection, it goes into DISCONNECTED state. When in DISCONNECTED state the handler remains active but cannot pass on the notifications to the client. If the client then closes, these notifications are discarded.
To ensure that missing topic notifications are always received by your solution, you can use multiple clients to register missing topic handlers. Ensure that if any of these clients lose connection they go straight to CLOSED state by setting the reconnection timeout to zero so that when the client loses connection it closes straight away, and further missing topic notifications are routed to a handler registered by another client.
Identifying when a particular client connects
You can use missing topic notifications when your application needs to detect when a particular uniquely identifiable client connects.
If a client session subscribes to a topic with a path that does not exist and is unique to that client (for example, the path contains a user ID), this will trigger a missing topic notification, informing any handler that the client has connected.
Registering a missing topic notification handler
Register a handler against a branch of the topic tree:
session.topics.addMissingTopicHandler('topic_branch', { // Implement handling code onMissingTopic: (notification) => { // a missing topic notification has been received }, onClose: (path) => { // the handler is closed }, onError: (path, error) => { // an error occured }, onRegister: (path, deregister) => { // the missing topic notification handler has been registered } });
string topicPath = "Example/Some Topic"; WriteLine($"Adding missing topic handler for topic '{topicPath}'."); var registration = await controlSession.TopicControl.AddMissingTopicHandlerAsync( topicPath, new MissingTopicNotificationStream(controlSession)); ... /// <summary> /// Basic implementation of the stream that will be called when a session subscribes using /// a topic selector that matches no topics. /// </summary> private sealed class MissingTopicNotificationStream : IMissingTopicNotificationStream { private ISession session; public MissingTopicNotificationStream(ISession session) => this.session = session; public void OnClose() => WriteLine("Handler is removed."); public void OnError(ErrorReason errorReason) => WriteLine($"An error has occured : {errorReason}."); public void OnMissingTopic(IMissingTopicNotification notification) { WriteLine($"Topic '{notification.TopicPath}' does not exist."); session.TopicControl.AddTopic( notification.TopicPath, session.TopicControl.NewSpecification(TopicType.STRING), new TopicControlAddCallback(notification)); } }
final Session session = Diffusion.sessions().open("ws://localhost:8080"); final TopicControl topicControl = session.feature(TopicControl.class); topicControl.addMissingTopicHandler("my/topic/path", new MyNotificationStream());
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" static int on_missing_topic( SESSION_T *session, const SVC_MISSING_TOPIC_REQUEST_T *request, void *context) { // handle missing topic notification return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // Create a session, synchronously session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // register missing topic handler MISSING_TOPIC_PARAMS_T params = { .on_missing_topic = on_missing_topic, .topic_path = topic_path, .context = NULL }; missing_topic_register_handler(session, params); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
class MissingTopicNotify : PTDiffusionMissingTopicHandler { func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration, hadMissingTopicNotification notification: PTDiffusionMissingTopicNotification) { // handle missing topic notification } func diffusionTopicTreeRegistrationDidClose(_ registration: PTDiffusionTopicTreeRegistration) { // registration has closed } func diffusionTopicTreeRegistration(_ registration: PTDiffusionTopicTreeRegistration, didFailWithError error: Error) { // registration failed with error } func handler_register(session: PTDiffusionSession) { session.topicControl.add(self, forTopicPath: "topic_branch") { (registration, error) in if (error != nil) { print("An error has occurred: %@", error!.localizedDescription) return } // missing topic handler has been registered } } }