Example: Subscribe other clients to topics via session filter
The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.
Subscribe other clients to topics via session filter
JavaScript
const controlSession = await diffusion.connect({ host : 'host_name', port : 443, secure : true, principal : 'control', credentials : 'password' }); const topic = 'topic/example'; controlSession.topics.add(topic, diffusion.topics.TopicType.STRING); const clientSession = await diffusion.connect({ host : 'host_name', port : 443, secure : true }); const filter = `$SessionId is "${clientSession.sessionId}"`; const topicSelector = '?topic//'; const numSubscribed = await controlSession.clients.subscribe(filter, topicSelector); console.log(`${numSubscribed} sessions satisfying filter "${filter}" have been subscribed to topic "${topicSelector}".`)
.NET
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using PushTechnology.ClientInterface.Client.Callbacks; using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Features.Control.Clients; using PushTechnology.ClientInterface.Client.Features.Control.Topics; using PushTechnology.ClientInterface.Client.Session; using PushTechnology.ClientInterface.Client.Types; using static System.Console; namespace PushTechnology.ClientInterface.Example { /// <summary> /// Client implementation that subscribes to topics with a filter. /// </summary> public sealed class SubscriptionControlSubscribeFilter { public SubscriptionControlSubscribeFilter(string serverUrl) { var topic = $"?topic-example//"; var controlSession = Diffusion.Sessions.Principal("control").Password("password") .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .Open(serverUrl); var clientSession = Diffusion.Sessions.Principal("client").Password("password") .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .Open($"{serverUrl}"); var subscriptionControl = controlSession.SubscriptionControl; var filterCallback = new SubscriptionByFilterCallback(); var filter = "$SessionId is \"" + clientSession.SessionId + "\""; try { subscriptionControl.SubscribeByFilter(filter, topic, filterCallback); WriteLine($"Sessions satisfying filter '{filter}' are subscribed to '{topic}'."); Thread.Sleep(300); } catch (Exception ex) { WriteLine($"Failed to subscribe by filter '{filter}' : {ex}."); controlSession.Close(); return; } try { subscriptionControl.UnsubscribeByFilter(filter, topic, filterCallback); WriteLine($"Sessions satisfying filter '{filter}' are unsubscribed to '{topic}'."); Thread.Sleep(300); } catch (Exception ex) { WriteLine($"Failed to unsubscribe by filter '{filter}' : {ex}."); } controlSession.Close(); clientSession.Close(); } /// <summary> /// The callback for filtered subscriptions and unsubscriptions. /// </summary> private class SubscriptionByFilterCallback : ISubscriptionByFilterCallback { /// <summary> /// Indicates successful processing of the request at the server. /// </summary> /// <param name="numberSelected">Indicates the number of sessions that satisfied the filter and which qualified /// for subscription/unsubscription.</param> public void OnComplete(int numberSelected) { WriteLine($"The number of sessions that qualified for subscription/unsubscription is: {numberSelected}."); } /// <summary> /// The filter was rejected. No sessions were subscribed/unsubscribed. /// </summary> /// <param name="errors">Errors found.</param> public void OnRejected(ICollection<IErrorReport> errors) { WriteLine($"The following errors occured:"); foreach(var error in errors) { WriteLine($"{error}."); } } /// <summary> /// Notification of a contextual error related to this callback. /// </summary> /// <param name="errorReason">Error reason provided.</param> public void OnError(ErrorReason errorReason) { WriteLine($"An error has occured : {errorReason}."); } } } }
Java and Android
/******************************************************************************* * Copyright (C) 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ package com.pushtechnology.client.sdk.manual; import static com.pushtechnology.diffusion.client.Diffusion.newTopicSpecification; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.callbacks.ErrorReason; import com.pushtechnology.diffusion.client.features.TopicUpdate; import com.pushtechnology.diffusion.client.features.Topics; import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; import com.pushtechnology.diffusion.client.topics.details.TopicType; /** * This demonstrates using a client to subscribe and unsubscribe other clients * to topics using a session filter. * <P> * This uses the 'SubscriptionControl' feature. * * @author DiffusionData Limited */ public final class SubscriptionControlSubscribeFilter { public static void main(String[] args) throws Exception { final String myTopicPath = "my/topic/path"; // establish control and client sessions final Session controlSession = Diffusion.sessions() .principal("control") .password("password") .open("ws://localhost:8080"); final Session clientSession = Diffusion.sessions() .principal("client") .password("password") .open("ws://localhost:8080"); // initialise SubscriptionControl and TopicUpdate features final SubscriptionControl subscriptionControl = controlSession .feature(SubscriptionControl.class); final TopicUpdate topicUpdate = controlSession.feature(TopicUpdate.class); // register a stream to receive topic events clientSession.feature(Topics.class) .addStream(myTopicPath, String.class, new MyValueStream()); // subscribe the client session to the topic using a session filter subscriptionControl.subscribeByFilter("$Principal is 'client'", myTopicPath) .join(); // add and set a topic with an initial value topicUpdate.addAndSet( myTopicPath, newTopicSpecification(TopicType.STRING), String.class, "myValue" ).join(); // set a new topic value topicUpdate.set(myTopicPath, String.class, "myNewValue").join(); // unsubscribe the client session from the topic using the session filter subscriptionControl.unsubscribeByFilter("$Principal is 'client'", myTopicPath); Thread.sleep(1000); clientSession.close(); controlSession.close(); } private static class MyValueStream implements Topics.ValueStream<String> { @Override public void onValue(String topicPath, TopicSpecification topicSpecification, String oldValue, String newValue) { System.out.printf("%s set to %s\n", topicPath, newValue); } @Override public void onSubscription(String topicPath, TopicSpecification topicSpecification) { System.out.printf("subscribed to %s\n", topicPath); } @Override public void onUnsubscription(String topicPath, TopicSpecification topicSpecification, Topics.UnsubscribeReason unsubscribeReason) { System.out.printf("unsubscribed from %s\n", topicPath); } @Override public void onClose() { } @Override public void onError(ErrorReason errorReason) { } } }
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" static int on_subscription_by_filter_complete( const int number_selected, void *context) { // clients that matched `session_filter` have been successfully subscribed to `topic_path` return HANDLER_SUCCESS; } static int on_unsubscription_by_filter_complete( const int number_selected, void *context) { // clients that matched `session_filter` have been successfully unsubscribed from `topic_path` return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; const char *session_filter = "$Principal is 'client'"; CREDENTIALS_T *credentials = credentials_create_password(password); // create a session with Diffusion DIFFUSION_ERROR_T error = { 0 }; SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // subscribe sessions that match `session_filter` to `topic_path` DIFFUSION_SUBSCRIBE_BY_FILTER_PARAMS_T sub_params = { .topic_selector = topic_path, .filter = session_filter, .on_subscribe_by_filter = on_subscription_by_filter_complete, }; diffusion_subscribe_by_filter(session, sub_params, NULL); // Sleep for a while sleep(5); // unsubscribe sessions that match `session_filter` from `topic_path` DIFFUSION_UNSUBSCRIBE_BY_FILTER_PARAMS_T unsub_params = { .topic_selector = topic_path, .filter = session_filter, .on_unsubscribe_by_filter = on_unsubscription_by_filter_complete, }; diffusion_unsubscribe_by_filter(session, unsub_params, NULL); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
Apple
// Copyright (C) 2021 Push Technology Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import Foundation import Diffusion class SubscriptionControlFilter { func subscribe(session: PTDiffusionSession, filter: String, topic_path: String) { session.subscriptionControl.subscribe(withFilter: filter, topicSelectorExpression: topic_path) { (selected_sessions, error) in if (error != nil) { print("An error has occurred while subscribing session that match filter '%@' to topic '%@': %@", filter, topic_path, error!.localizedDescription) } else { print("Successfully subscribed %lu matching sessions with filter '%@' to topic '%@'", selected_sessions, filter, topic_path) } } } func unsubscribe(session: PTDiffusionSession, filter: String, topic_path: String) { session.subscriptionControl.unsubscribe(withFilter: filter, topicSelectorExpression: topic_path) { (selected_sessions, error) in if (error != nil) { print("An error has occurred while unsubscribing session that match filter '%@' from topic '%@': %@", filter, topic_path, error!.localizedDescription) } else { print("Successfully unsubscribed %lu matching sessions with filter '%@' from topic '%@'", selected_sessions, filter, topic_path) } } } }
Change the URL from that provided in the example to the URL of the Diffusion server .