Example: Subscribe other clients to topics via session ID
The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.
.NET
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Clients;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that subscribes another session to topics.
    /// </summary>
    public sealed class SubscriptionControlSubscribe {
        /// <summary>
        /// Opens a control session and a client session against the given server,
        /// adds a string topic, then subscribes the client session to a topic
        /// selector and unsubscribes it again through the control session.
        /// </summary>
        /// <param name="serverUrl">The URL of the server that both sessions connect to.</param>
        /// <returns>A task that completes once the example has run and the sessions have been closed.</returns>
        public async Task SubscriptionControlSubscribeExample(string serverUrl) {
            var topic = "topic/example";
            var topicSelector = "?topic//";

            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            // Declared outside the try block so that the finally block can close it
            // whenever it was opened, whichever path leaves the method.
            ISession session = null;

            try {
                var subscriptionControl = controlSession.SubscriptionControl;

                try {
                    await controlSession.TopicControl.AddTopicAsync(topic, TopicType.STRING, CancellationToken.None);
                } catch (Exception ex) {
                    WriteLine($"Failed to add topic '{topic}' : {ex}.");
                    return;
                }

                session = Diffusion.Sessions.Principal("client")
                    .Credentials(Diffusion.Credentials.Password("password"))
                    .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                    .NoReconnection()
                    .Open(serverUrl);

                WriteLine($"Session with id '{session.SessionId}' created.");

                var subscriptionCallback = new SubscriptionCallback();

                try {
                    // NOTE(review): the message below is written as soon as Subscribe returns;
                    // presumably the server's acknowledgement arrives later through
                    // SubscriptionCallback.OnComplete - confirm against the API documentation.
                    subscriptionControl.Subscribe(session.SessionId, topicSelector, subscriptionCallback);

                    WriteLine($"Session with id '{session.SessionId}' is subscribed to '{topicSelector}'.");
                } catch (Exception ex) {
                    WriteLine($"Failed to subscribe to '{topicSelector}' : {ex}.");
                    return;
                }

                try {
                    subscriptionControl.Unsubscribe(session.SessionId, topicSelector, subscriptionCallback);

                    WriteLine($"Session with id '{session.SessionId}' is unsubscribed to '{topicSelector}'.");
                } catch (Exception ex) {
                    WriteLine($"Failed to unsubscribe to '{topicSelector}' : {ex}.");
                }
            } finally {
                // Close the sessions. Running this in a finally block also closes the
                // control session when opening the client session throws.
                session?.Close();
                controlSession.Close();
            }
        }

        /// <summary>
        /// The callback for subscription operations.
        /// </summary>
        private class SubscriptionCallback : ISubscriptionCallback {
            /// <summary>
            /// Indicates that the session was closed.
            /// </summary>
            public void OnDiscard() {
                WriteLine("The session was closed.");
            }

            /// <summary>
            /// Indicates that a requested operation has been handled by the server.
            /// </summary>
            public void OnComplete() {
                WriteLine("Subscription complete.");
            }
        }
    }
}
Java and Android
package com.pushtechnology.diffusion.examples;

import java.util.concurrent.CompletableFuture;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionId;

/**
 * This demonstrates using a client to subscribe and unsubscribe other clients
 * to topics.
 * <P>
 * This uses the 'SubscriptionControl' feature.
 *
 * @author Push Technology Limited
 * @since 5.0
 */
public class ControlClientSubscriptionControl implements AutoCloseable {

    /** Server URL used by the no-argument constructor. */
    private static final String DEFAULT_SERVER_URL = "ws://diffusion.example.com:80";

    private final Session session;

    private final SubscriptionControl subscriptionControl;

    /**
     * Constructor. Connects to the default example server URL.
     */
    public ControlClientSubscriptionControl() {
        this(DEFAULT_SERVER_URL);
    }

    /**
     * Constructor.
     *
     * @param serverUrl URL of the server to open the control session against
     */
    public ControlClientSubscriptionControl(String serverUrl) {
        session =
            Diffusion.sessions().principal("control").password("password")
                .open(serverUrl);

        subscriptionControl = session.feature(SubscriptionControl.class);
    }

    /**
     * Subscribe a client to topics.
     *
     * @param sessionId client to subscribe
     * @param topicSelector topic selector expression
     * @return the CompletableFuture returned by the SubscriptionControl
     *         feature for this request
     */
    public CompletableFuture<?> subscribe(
        SessionId sessionId,
        String topicSelector) {

        // To subscribe a client to a topic, this client session
        // must have the 'modify_session' permission.
        return subscriptionControl.subscribe(
            sessionId,
            topicSelector);
    }

    /**
     * Unsubscribe a client from topics.
     *
     * @param sessionId client to unsubscribe
     * @param topicSelector topic selector expression
     * @return a CompletableFuture that completes when a response is received
     *         from the server
     */
    public CompletableFuture<?> unsubscribe(
        SessionId sessionId,
        String topicSelector) {

        // To unsubscribe a client from a topic, this client session
        // must have the 'modify_session' permission.
        return subscriptionControl.unsubscribe(
            sessionId,
            topicSelector);
    }

    /**
     * Close the session.
     */
    @Override
    public void close() {
        session.close();
    }
}
C
/** * Copyright © 2021 Push Technology Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" char *g_topic_selector = "my/topic/path"; static int on_subscription_complete( SESSION_T *session, void *context) { // client has been successfully subscribed to a topic return HANDLER_SUCCESS; } static int on_session_open( SESSION_T *session, const SESSION_PROPERTIES_EVENT_T *request, void *context) { if(session_id_cmp(*session->id, request->session_id) == 0) { // It's our own session, ignore. 
return HANDLER_SUCCESS; } // Subscribe the client session to the topic SUBSCRIPTION_CONTROL_PARAMS_T subscribe_params = { .session_id = request->session_id, .topic_selector = g_topic_selector, .on_complete = on_subscription_complete }; subscribe_client(session, subscribe_params); return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; CREDENTIALS_T *credentials = credentials_create_password(password); // create a session with Diffusion DIFFUSION_ERROR_T error = { 0 }; SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } /* * Register a session properties listener, so we are notified * of new client connections. */ SET_T *required_properties = set_new_string(1); set_add(required_properties, PROPERTIES_SELECTOR_ALL_FIXED_PROPERTIES); SESSION_PROPERTIES_REGISTRATION_PARAMS_T params = { .on_session_open = on_session_open, .required_properties = required_properties }; session_properties_listener_register(session, params); set_free(required_properties); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Example wrapper that subscribes and unsubscribes another session
/// through a session's subscription control feature.
class SubscriptionControl {

    /// Requests that the session identified by `sessionId` be subscribed to `topic_path`,
    /// and prints the outcome once the completion handler is invoked.
    ///
    /// - Parameters:
    ///   - session: The session whose `subscriptionControl` issues the request.
    ///   - sessionId: The identifier of the session to subscribe.
    ///   - topic_path: Passed to the API as the topic selector expression.
    func subscribe(session: PTDiffusionSession, sessionId: PTDiffusionSessionId, topic_path: String) {
        session.subscriptionControl.subscribeSessionId(sessionId,
                                                       withTopicSelectorExpression: topic_path) { error in
            // print() performs no format substitution, so the values are interpolated
            // into the message instead of being passed alongside '%@' placeholders.
            if let error = error {
                print("An error has occurred while subscribing session '\(sessionId)' to topic '\(topic_path)': \(error.localizedDescription)")
            } else {
                print("Session '\(sessionId)' has been successfully subscribed to topic '\(topic_path)'")
            }
        }
    }

    /// Requests that the session identified by `sessionId` be unsubscribed from `topic_path`,
    /// and prints the outcome once the completion handler is invoked.
    ///
    /// - Parameters:
    ///   - session: The session whose `subscriptionControl` issues the request.
    ///   - sessionId: The identifier of the session to unsubscribe.
    ///   - topic_path: Passed to the API as the topic selector expression.
    func unsubscribe(session: PTDiffusionSession, sessionId: PTDiffusionSessionId, topic_path: String) {
        session.subscriptionControl.unsubscribeSessionId(sessionId,
                                                         withTopicSelectorExpression: topic_path) { error in
            // print() performs no format substitution, so the values are interpolated
            // into the message instead of being passed alongside '%@' placeholders.
            if let error = error {
                print("An error has occurred while unsubscribing session '\(sessionId)' from topic '\(topic_path)': \(error.localizedDescription)")
            } else {
                print("Session '\(sessionId)' has been successfully unsubscribed from topic '\(topic_path)'")
            }
        }
    }
}
Replace the URL used in the example with the URL of your Diffusion server.