Just a second...

Example: Subscribe other clients to topics via session ID

The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.

JavaScript
// Connect the control session, authenticated as the 'control' principal.
const controlSession = await diffusion.connect({
    host        : 'host_name',
    port        : 443,
    secure      : true,
    principal   : 'control',
    credentials : 'password'
});

// Create the example topic. The returned promise is awaited so that a failed
// topic creation surfaces here instead of becoming an unhandled rejection
// that races with the subscription below.
const topic = 'topic/example';
await controlSession.topics.add(topic, diffusion.topics.TopicType.STRING);

// Connect the session that is going to be subscribed by the control session.
// No principal or credentials are supplied for this connection.
const clientSession = await diffusion.connect({
    host   : 'host_name',
    port   : 443,
    secure : true
});

// Subscribe the client session, identified by its session ID, using a topic
// selector expression rather than a single topic path.
const clientSessionId = clientSession.sessionId;
const topicSelector = '?topic//';
await controlSession.clients.subscribe(clientSessionId, topicSelector);

console.log(`Subscribed session "${clientSessionId}" to topic "${topicSelector}".`);
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Clients;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that subscribes another session to topics.
    /// </summary>
    public sealed class SubscriptionControlSubscribe
    {
        /// <summary>
        /// Opens a control session and a client session, adds a string topic, then uses the
        /// control session's subscription control feature to subscribe and unsubscribe the
        /// client session by its session ID.
        /// </summary>
        /// <param name="serverUrl">The URL of the Diffusion server both sessions connect to.</param>
        /// <returns>A task that completes once the example has run and the sessions are closed.</returns>
        /// <remarks>
        /// Failures while adding the topic, subscribing or unsubscribing are written to the
        /// console rather than rethrown. Both sessions are closed on every exit path,
        /// including when opening the client session throws.
        /// </remarks>
        public async Task SubscriptionControlSubscribeExample(string serverUrl)
        {
            const string topic = "topic/example";
            const string topicSelector = "?topic//";

            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            // Declared outside the try block so the finally block can close it if it was opened.
            ISession session = null;

            try
            {
                var subscriptionControl = controlSession.SubscriptionControl;

                try
                {
                    await controlSession.TopicControl.AddTopicAsync(topic, TopicType.STRING, CancellationToken.None);
                }
                catch (Exception ex)
                {
                    WriteLine($"Failed to add topic '{topic}' : {ex}.");
                    return;
                }

                session = Diffusion.Sessions.Principal("client")
                    .Credentials(Diffusion.Credentials.Password("password"))
                    .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                    .NoReconnection()
                    .Open(serverUrl);

                WriteLine($"Session with id '{session.SessionId}' created.");

                var subscriptionCallback = new SubscriptionCallback();

                try
                {
                    // NOTE(review): the callback overload presumably reports the outcome later
                    // through SubscriptionCallback, so the message below may be printed before
                    // the server has handled the request - confirm against the API documentation.
                    subscriptionControl.Subscribe(session.SessionId, topicSelector, subscriptionCallback);

                    WriteLine($"Session with id '{session.SessionId}' is subscribed to '{topicSelector}'.");
                }
                catch (Exception ex)
                {
                    WriteLine($"Failed to subscribe to '{topicSelector}' : {ex}.");
                    return;
                }

                try
                {
                    subscriptionControl.Unsubscribe(session.SessionId, topicSelector, subscriptionCallback);

                    WriteLine($"Session with id '{session.SessionId}' is unsubscribed to '{topicSelector}'.");
                }
                catch (Exception ex)
                {
                    // An unsubscription failure is reported but does not abort the example.
                    WriteLine($"Failed to unsubscribe to '{topicSelector}' : {ex}.");
                }
            }
            finally
            {
                // Close the sessions: the client session first (when it was opened), then the control session.
                session?.Close();
                controlSession.Close();
            }
        }

        /// <summary>
        /// The callback for subscription operations.
        /// </summary>
        private class SubscriptionCallback : ISubscriptionCallback
        {
            /// <summary>
            /// Indicates that the session was closed.
            /// </summary>
            public void OnDiscard()
            {
                WriteLine("The session was closed.");
            }

            /// <summary>
            /// Indicates that a requested operation has been handled by the server.
            /// </summary>
            public void OnComplete()
            {
                WriteLine("Subscription complete.");
            }
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.manual;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;

/**
 * This demonstrates using a client to subscribe and unsubscribe other clients
 * to topics using a session id.
 * <P>
 * This uses the 'SubscriptionControl' feature.
 *
 * @author DiffusionData Limited
 */
public class SubscriptionControlSubscribe {

    /**
     * Runs the example: opens a control session and a client session,
     * subscribes the client session to a topic by session id, creates and
     * updates that topic, then unsubscribes the client session again.
     *
     * @param args command line arguments (not used)
     * @throws Exception if connecting, or any of the joined operations, fails
     */
    public static void main(String[] args) throws Exception {

        final String myTopicPath = "my/topic/path";

        // establish the control session
        final Session controlSession = Diffusion.sessions()
            .principal("control")
            .password("password")
            .open("ws://localhost:8080");

        // assigned inside the try block so that the control session is still
        // closed if opening the client session fails
        Session clientSession = null;

        try {
            // establish the client session
            clientSession = Diffusion.sessions()
                .principal("client")
                .password("password")
                .open("ws://localhost:8080");

            // initialise SubscriptionControl and TopicUpdate features
            final SubscriptionControl subscriptionControl = controlSession
                .feature(SubscriptionControl.class);
            final TopicUpdate topicUpdate = controlSession.feature(TopicUpdate.class);

            // register a stream to receive topic events
            clientSession.feature(Topics.class)
                .addStream(myTopicPath, String.class, new MyValueStream());

            // subscribe the client session to the topic using the session id
            subscriptionControl.subscribe(clientSession.getSessionId(), myTopicPath)
                .join();

            // add and set a topic with an initial value
            topicUpdate.addAndSet(
                myTopicPath,
                Diffusion.newTopicSpecification(TopicType.STRING),
                String.class,
                "myValue"
            ).join();

            // set a new topic value
            topicUpdate.set(myTopicPath, String.class, "myNewValue").join();

            // unsubscribe the client session from the topic using the session id;
            // joined like the subscription so that a failure is not silently dropped
            subscriptionControl.unsubscribe(clientSession.getSessionId(), myTopicPath)
                .join();

            // presumably gives the client session's stream time to report the
            // unsubscription before the sessions are closed
            Thread.sleep(1000);
        }
        finally {
            // always release both sessions, even when a step above failed
            controlSession.close();
            if (clientSession != null) {
                clientSession.close();
            }
        }
    }

    /**
     * Value stream that prints the subscription, value and unsubscription
     * events it receives to standard output.
     */
    private static final class MyValueStream implements Topics.ValueStream<String> {
        @Override
        public void onValue(String topicPath,
            TopicSpecification topicSpecification,
            String oldValue, String newValue) {
            System.out.printf("%s set to %s\n", topicPath, newValue);
        }

        @Override
        public void onSubscription(String topicPath,
            TopicSpecification topicSpecification) {
            System.out.printf("subscribed to %s\n", topicPath);
        }

        @Override
        public void onUnsubscription(String topicPath,
            TopicSpecification topicSpecification,
            Topics.UnsubscribeReason unsubscribeReason) {
            System.out.printf("unsubscribed from %s\n", topicPath);
        }

        // stream closure is deliberately ignored in this example
        @Override public void onClose() { }

        // stream errors are deliberately ignored in this example
        @Override public void onError(ErrorReason errorReason) { }
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"


// Topic selector that on_session_open() passes to subscribe_client() for
// every newly reported session.
char *g_topic_selector = "my/topic/path";


/*
 * Completion handler installed as `on_complete` on the subscription
 * request; it is invoked once the client has been successfully subscribed
 * to a topic. This example takes no further action.
 */
static int on_subscription_complete(
        SESSION_T *session,
        void *context)
{
        /* Neither argument is needed by this example. */
        (void) session;
        (void) context;

        return HANDLER_SUCCESS;
}


/*
 * Session properties listener callback, invoked when a session open event
 * is reported. Every session other than our own is subscribed to
 * g_topic_selector; on_subscription_complete() is installed as the
 * completion handler of that request.
 */
static int on_session_open(
        SESSION_T *session,
        const SESSION_PROPERTIES_EVENT_T *request,
        void *context)
{
        // A comparison result of zero means the event refers to our own session.
        const int is_other_session =
                session_id_cmp(*session->id, request->session_id) != 0;

        if(is_other_session) {
                // Subscribe the client session to the topic
                SUBSCRIPTION_CONTROL_PARAMS_T subscribe_params = {
                        .session_id = request->session_id,
                        .topic_selector = g_topic_selector,
                        .on_complete = on_subscription_complete
                };
                subscribe_client(session, subscribe_params);
        }

        return HANDLER_SUCCESS;
}


/*
 * Entry point: connects to Diffusion as the "control" principal, registers
 * a session properties listener so that on_session_open() is called for new
 * client connections, waits for five seconds and then shuts down.
 *
 * Returns EXIT_SUCCESS after a normal run, or EXIT_FAILURE when the session
 * could not be created.
 */
int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        int exit_code = EXIT_FAILURE;
        DIFFUSION_ERROR_T error = { 0 };
        CREDENTIALS_T *credentials = credentials_create_password(password);

        // create a session with Diffusion
        SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error);

        if(session != NULL) {
                /*
                 * Register a session properties listener, so we are notified
                 * of new client connections.
                 */
                SET_T *required_properties = set_new_string(1);
                set_add(required_properties, PROPERTIES_SELECTOR_ALL_FIXED_PROPERTIES);

                SESSION_PROPERTIES_REGISTRATION_PARAMS_T params = {
                        .on_session_open = on_session_open,
                        .required_properties = required_properties
                };
                session_properties_listener_register(session, params);
                set_free(required_properties);

                // Sleep for a while
                sleep(5);

                // Close the session, and release resources and memory
                session_close(session, NULL);
                session_free(session);

                exit_code = EXIT_SUCCESS;
        }
        else {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
        }

        // The credentials are released on both the success and failure paths.
        credentials_free(credentials);

        return exit_code;
}
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Example wrapper around the subscription control feature that subscribes
/// and unsubscribes another session, identified by its session ID.
class SubscriptionControl {

    /// Subscribes the session identified by `sessionId` to the topics
    /// matching `topic_path` and prints the outcome.
    ///
    /// - Parameters:
    ///   - session: The session whose subscription control feature issues the request.
    ///   - sessionId: The ID of the session to subscribe.
    ///   - topic_path: The topic selector expression to subscribe the session to.
    func subscribe(session: PTDiffusionSession,
                   sessionId: PTDiffusionSessionId,
                   topic_path: String) {

        session.subscriptionControl.subscribeSessionId(sessionId,
                                                       withTopicSelectorExpression: topic_path) { error in
            // Swift's print() does not expand '%@' placeholders, so the
            // messages are built with string interpolation.
            if let error = error {
                print("An error has occurred while subscribing session '\(sessionId)' to topic '\(topic_path)': \(error.localizedDescription)")
            }
            else {
                print("Session '\(sessionId)' has been successfully subscribed to topic '\(topic_path)'")
            }
        }
    }


    /// Unsubscribes the session identified by `sessionId` from the topics
    /// matching `topic_path` and prints the outcome.
    ///
    /// - Parameters:
    ///   - session: The session whose subscription control feature issues the request.
    ///   - sessionId: The ID of the session to unsubscribe.
    ///   - topic_path: The topic selector expression to unsubscribe the session from.
    func unsubscribe(session: PTDiffusionSession,
                     sessionId: PTDiffusionSessionId,
                     topic_path: String) {

        session.subscriptionControl.unsubscribeSessionId(sessionId,
                                                         withTopicSelectorExpression: topic_path) { error in
            // Swift's print() does not expand '%@' placeholders, so the
            // messages are built with string interpolation.
            if let error = error {
                print("An error has occurred while unsubscribing session '\(sessionId)' from topic '\(topic_path)': \(error.localizedDescription)")
            }
            else {
                print("Session '\(sessionId)' has been successfully unsubscribed from topic '\(topic_path)'")
            }
        }
    }



}

Change the URL provided in the example to the URL of your Diffusion server.