Example: Subscribe other clients to topics via session ID
The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.
JavaScript
const controlSession = await diffusion.connect({
    host   : 'host_name',
    port   : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
});
const topic = 'topic/example';
controlSession.topics.add(topic, diffusion.topics.TopicType.STRING);
const clientSession = await diffusion.connect({
    host   : 'host_name',
    port   : 443,
    secure : true
});
const clientSessionId = clientSession.sessionId;
const topicSelector = '?topic//';
await controlSession.clients.subscribe(clientSessionId, topicSelector);
console.log(`Subscribed session "${clientSessionId}" to topic "${topicSelector}".`)
.NET
/**
 * Copyright © 2021 - 2025 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that subscribes another session to topics.
    /// </summary>
    public sealed class SubscriptionControlSubscribe
    {
        public async Task SubscriptionControlSubscribeExample(string serverUrl)
        {
            var topic = $"topic/example";
            var controlSession = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);
            var subscriptionControl = controlSession.SubscriptionControl;
            try
            {
                await controlSession.TopicControl.AddTopicAsync(topic, TopicType.STRING, CancellationToken.None);
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to add topic '{topic}' : {ex}.");
                controlSession.Close();
                return;
            }
            ISession session = Diffusion.Sessions.Principal("client")
                .Credentials(Diffusion.Credentials.Password("password"))
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .NoReconnection()
                .Open(serverUrl);
            WriteLine($"Session with id '{session.SessionId}' created.");
            var topicSelector = "?topic//";
            try
            {
                await subscriptionControl.SubscribeAsync(session.SessionId, topicSelector);
                WriteLine($"Session with id '{session.SessionId}' is subscribed to '{topicSelector}'.");
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to subscribe to '{topicSelector}' : {ex}.");
                session.Close();
                controlSession.Close();
                return;
            }
            try
            {
                await subscriptionControl.UnsubscribeAsync(session.SessionId, topicSelector);
                WriteLine($"Session with id '{session.SessionId}' is unsubscribed to '{topicSelector}'.");
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to unsubscribe to '{topicSelector}' : {ex}.");
            }
            // Close the sessions
            session.Close();
            controlSession.Close();
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.client.sdk.manual;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
/**
 * This demonstrates using a client to subscribe and unsubscribe other clients
 * to topics using a session id.
 * <P>
 * This uses the 'SubscriptionControl' feature.
 *
 * @author DiffusionData Limited
 */
public class SubscriptionControlSubscribe {
    public static void main(String[] args) throws Exception {
        final String myTopicPath = "my/topic/path";
        // establish control and client sessions
        final Session controlSession = Diffusion.sessions()
            .principal("control")
            .password("password")
            .open("ws://localhost:8080");
        final Session clientSession = Diffusion.sessions()
            .principal("client")
            .password("password")
            .open("ws://localhost:8080");
        // initialise SubscriptionControl and TopicUpdate features
        final SubscriptionControl subscriptionControl = controlSession
            .feature(SubscriptionControl.class);
        final TopicUpdate topicUpdate = controlSession.feature(TopicUpdate.class);
        // register a stream to receive topic events
        clientSession.feature(Topics.class)
            .addStream(myTopicPath, String.class, new MyValueStream());
        // subscribe the client session to the topic using the session id
        subscriptionControl.subscribe(clientSession.getSessionId(), myTopicPath)
            .join();
        // add and set a topic with an initial value
        topicUpdate.addAndSet(
            myTopicPath,
            Diffusion.newTopicSpecification(TopicType.STRING),
            String.class,
            "myValue"
        ).join();
        // set a new topic value
        topicUpdate.set(myTopicPath, String.class, "myNewValue").join();
        // unsubscribe the client session from the topic using the session id
        subscriptionControl.unsubscribe(clientSession.getSessionId(), myTopicPath);
        Thread.sleep(1000);
        controlSession.close();
        clientSession.close();
    }
    private static final class MyValueStream implements Topics.ValueStream<String> {
        @Override
        public void onValue(String topicPath,
            TopicSpecification topicSpecification,
            String oldValue, String newValue) {
            System.out.printf("%s set to %s\n", topicPath, newValue);
        }
        @Override
        public void onSubscription(String topicPath,
            TopicSpecification topicSpecification) {
            System.out.printf("subscribed to %s\n", topicPath);
        }
        @Override
        public void onUnsubscription(String topicPath,
            TopicSpecification topicSpecification,
            Topics.UnsubscribeReason unsubscribeReason) {
            System.out.printf("unsubscribed from %s\n", topicPath);
        }
        @Override public void onClose() { }
        @Override public void onError(ErrorReason errorReason) { }
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
    #include <unistd.h>
#else
    #define sleep(x) Sleep(1000 * x)
#endif
#include "diffusion.h"
char *g_topic_selector = "my/topic/path";
static int on_subscription_complete(
    SESSION_T *session,
    void *context)
{
    // client has been successfully subscribed to a topic
    return HANDLER_SUCCESS;
}
static int on_session_open(
    SESSION_T *session,
    const SESSION_PROPERTIES_EVENT_T *request,
    void *context)
{
    if (session_id_cmp(*session->id, request->session_id) == 0)
    {
        // It's our own session, ignore.
        return HANDLER_SUCCESS;
    }
    // Subscribe the client session to the topic
    SUBSCRIPTION_CONTROL_PARAMS_T subscribe_params = {
        .session_id = request->session_id,
        .topic_selector = g_topic_selector,
        .on_complete = on_subscription_complete};
    subscribe_client(session, subscribe_params);
    return HANDLER_SUCCESS;
}
int main(
    int argc,
    char **argv)
{
    const char *url = "ws://localhost:8080";
    const char *principal = "control";
    const char *password = "password";
    CREDENTIALS_T *credentials = credentials_create_password(password);
    // create a session with Diffusion
    DIFFUSION_ERROR_T error = {0};
    SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error);
    if (session == NULL)
    {
        printf("Failed to create session: %s\n", error.message);
        free(error.message);
        credentials_free(credentials);
        return EXIT_FAILURE;
    }
    /*
     * Register a session properties listener, so we are notified
     * of new client connections.
     */
    SET_T *required_properties = set_new_string(1);
    set_add(required_properties, PROPERTIES_SELECTOR_ALL_FIXED_PROPERTIES);
    SESSION_PROPERTIES_REGISTRATION_PARAMS_T params = {.on_session_open = on_session_open, .required_properties = required_properties};
    session_properties_listener_register(session, params);
    set_free(required_properties);
    // Sleep for a while
    sleep(5);
    // Close the session, and release resources and memory
    session_close(session, NULL);
    session_free(session);
    credentials_free(credentials);
    return EXIT_SUCCESS;
}
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
import Foundation
import Diffusion
class SubscriptionControl {
    func subscribe(session: PTDiffusionSession,
                   sessionId: PTDiffusionSessionId,
                   topic_path: String) {
        session.subscriptionControl.subscribeSessionId(sessionId,
                                                       withTopicSelectorExpression: topic_path) { error in
            if (error != nil) {
                print("An error has occurred while subscribing session '%@' to topic '%@': %@",
                      sessionId,
                      topic_path,
                      error!.localizedDescription)
            }
            else {
                print("Session '%@' has been successfully subscribed to topic '%@'", sessionId, topic_path)
            }
        }
    }
    func unsubscribe(session: PTDiffusionSession,
                     sessionId: PTDiffusionSessionId,
                     topic_path: String) {
        session.subscriptionControl.unsubscribeSessionId(sessionId,
                                                         withTopicSelectorExpression: topic_path) { error in
            if (error != nil) {
                print("An error has occurred while unsubscribing session '%@' from topic '%@': %@",
                      sessionId,
                      topic_path,
                      error!.localizedDescription)
            }
            else {
                print("Session '%@' has been successfully unsubscribed from topic '%@'", sessionId, topic_path)
            }
        }
    }
}
Change the URL from that provided in the example to the URL of the Diffusion server .