Example: Subscribe other clients to topics via session ID
The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.
JavaScript
const controlSession = await diffusion.connect({ host : 'host_name', port : 443, secure : true, principal : 'control', credentials : 'password' }); const topic = 'topic/example'; controlSession.topics.add(topic, diffusion.topics.TopicType.STRING); const clientSession = await diffusion.connect({ host : 'host_name', port : 443, secure : true }); const clientSessionId = clientSession.sessionId; const topicSelector = '?topic//'; await controlSession.clients.subscribe(clientSessionId, topicSelector); console.log(`Subscribed session "${clientSessionId}" to topic "${topicSelector}".`)
.NET
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using System.Threading.Tasks; using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Features.Control.Clients; using PushTechnology.ClientInterface.Client.Features.Control.Topics; using PushTechnology.ClientInterface.Client.Session; using PushTechnology.ClientInterface.Client.Topics; using static System.Console; namespace PushTechnology.ClientInterface.Example { /// <summary> /// Client implementation that subscribes another session to topics. /// </summary> public sealed class SubscriptionControlSubscribe { public async Task SubscriptionControlSubscribeExample(string serverUrl) { var topic = $"topic/example"; var controlSession = Diffusion.Sessions.Principal("control").Password("password") .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .Open(serverUrl); var subscriptionControl = controlSession.SubscriptionControl; try { await controlSession.TopicControl.AddTopicAsync(topic, TopicType.STRING, CancellationToken.None); } catch (Exception ex) { WriteLine($"Failed to add topic '{topic}' : {ex}."); controlSession.Close(); return; } ISession session = Diffusion.Sessions.Principal("client") .Credentials(Diffusion.Credentials.Password("password")) .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .NoReconnection() .Open(serverUrl); WriteLine($"Session with id '{session.SessionId}' created."); var subscriptionCallback = new SubscriptionCallback(); var topicSelector = "?topic//"; try { subscriptionControl.Subscribe(session.SessionId, topicSelector, subscriptionCallback); WriteLine($"Session with id '{session.SessionId}' is subscribed to '{topicSelector}'."); } catch (Exception ex) { WriteLine($"Failed to subscribe to '{topicSelector}' : {ex}."); session.Close(); controlSession.Close(); return; } try { subscriptionControl.Unsubscribe(session.SessionId, topicSelector, subscriptionCallback); WriteLine($"Session with id '{session.SessionId}' is unsubscribed to '{topicSelector}'."); } catch (Exception ex) { WriteLine($"Failed to unsubscribe to '{topicSelector}' : {ex}."); } // Close the sessions session.Close(); controlSession.Close(); } /// <summary> /// The callback for subscription operations. /// </summary> private class SubscriptionCallback : ISubscriptionCallback { /// <summary> /// Indicates that the session was closed. /// </summary> public void OnDiscard() { WriteLine("The session was closed."); } /// <summary> /// Indicates that a requested operation has been handled by the server. /// </summary> public void OnComplete() { WriteLine("Subscription complete."); } } } }
Java and Android
/******************************************************************************* * Copyright (C) 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ package com.pushtechnology.client.sdk.manual; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.callbacks.ErrorReason; import com.pushtechnology.diffusion.client.features.TopicUpdate; import com.pushtechnology.diffusion.client.features.Topics; import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; import com.pushtechnology.diffusion.client.topics.details.TopicType; /** * This demonstrates using a client to subscribe and unsubscribe other clients * to topics using a session id. * <P> * This uses the 'SubscriptionControl' feature. * * @author DiffusionData Limited */ public class SubscriptionControlSubscribe { public static void main(String[] args) throws Exception { final String myTopicPath = "my/topic/path"; // establish control and client sessions final Session controlSession = Diffusion.sessions() .principal("control") .password("password") .open("ws://localhost:8080"); final Session clientSession = Diffusion.sessions() .principal("client") .password("password") .open("ws://localhost:8080"); // initialise SubscriptionControl and TopicUpdate features final SubscriptionControl subscriptionControl = controlSession .feature(SubscriptionControl.class); final TopicUpdate topicUpdate = controlSession.feature(TopicUpdate.class); // register a stream to receive topic events clientSession.feature(Topics.class) .addStream(myTopicPath, String.class, new MyValueStream()); // subscribe the client session to the topic using the session id subscriptionControl.subscribe(clientSession.getSessionId(), myTopicPath) .join(); // add and set a topic with an initial value topicUpdate.addAndSet( myTopicPath, Diffusion.newTopicSpecification(TopicType.STRING), String.class, "myValue" ).join(); // set a new topic value topicUpdate.set(myTopicPath, String.class, "myNewValue").join(); // unsubscribe the client session from the topic using the session id subscriptionControl.unsubscribe(clientSession.getSessionId(), myTopicPath); Thread.sleep(1000); controlSession.close(); clientSession.close(); } private static final class MyValueStream implements Topics.ValueStream<String> { @Override public void onValue(String topicPath, TopicSpecification topicSpecification, String oldValue, String newValue) { System.out.printf("%s set to %s\n", topicPath, newValue); } @Override public void onSubscription(String topicPath, TopicSpecification topicSpecification) { System.out.printf("subscribed to %s\n", topicPath); } @Override public void onUnsubscription(String topicPath, TopicSpecification topicSpecification, Topics.UnsubscribeReason unsubscribeReason) { System.out.printf("unsubscribed from %s\n", topicPath); } @Override public void onClose() { } @Override public void onError(ErrorReason errorReason) { } } }
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" char *g_topic_selector = "my/topic/path"; static int on_subscription_complete( SESSION_T *session, void *context) { // client has been successfully subscribed to a topic return HANDLER_SUCCESS; } static int on_session_open( SESSION_T *session, const SESSION_PROPERTIES_EVENT_T *request, void *context) { if(session_id_cmp(*session->id, request->session_id) == 0) { // It's our own session, ignore. return HANDLER_SUCCESS; } // Subscribe the client session to the topic SUBSCRIPTION_CONTROL_PARAMS_T subscribe_params = { .session_id = request->session_id, .topic_selector = g_topic_selector, .on_complete = on_subscription_complete }; subscribe_client(session, subscribe_params); return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; CREDENTIALS_T *credentials = credentials_create_password(password); // create a session with Diffusion DIFFUSION_ERROR_T error = { 0 }; SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } /* * Register a session properties listener, so we are notified * of new client connections. */ SET_T *required_properties = set_new_string(1); set_add(required_properties, PROPERTIES_SELECTOR_ALL_FIXED_PROPERTIES); SESSION_PROPERTIES_REGISTRATION_PARAMS_T params = { .on_session_open = on_session_open, .required_properties = required_properties }; session_properties_listener_register(session, params); set_free(required_properties); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
Apple
// Copyright (C) 2021 Push Technology Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import Foundation import Diffusion class SubscriptionControl { func subscribe(session: PTDiffusionSession, sessionId: PTDiffusionSessionId, topic_path: String) { session.subscriptionControl.subscribeSessionId(sessionId, withTopicSelectorExpression: topic_path) { error in if (error != nil) { print("An error has occurred while subscribing session '%@' to topic '%@': %@", sessionId, topic_path, error!.localizedDescription) } else { print("Session '%@' has been successfully subscribed to topic '%@'", sessionId, topic_path) } } } func unsubscribe(session: PTDiffusionSession, sessionId: PTDiffusionSessionId, topic_path: String) { session.subscriptionControl.unsubscribeSessionId(sessionId, withTopicSelectorExpression: topic_path) { error in if (error != nil) { print("An error has occurred while unsubscribing session '%@' from topic '%@': %@", sessionId, topic_path, error!.localizedDescription) } else { print("Session '%@' has been successfully unsubscribed from topic '%@'", sessionId, topic_path) } } } }
Change the URL from that provided in the example to the URL of the Diffusion server .