Example: Subscribe other clients to topics via session ID
The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.
JavaScript
// Open the control session, authenticated as the 'control' principal.
const controlSession = await diffusion.connect({
    host : 'host_name',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
});

// Create the example topic. The add operation is asynchronous; awaiting it
// makes a failure surface here instead of as an unhandled promise rejection.
const topic = 'topic/example';
await controlSession.topics.add(topic, diffusion.topics.TopicType.STRING);

// Open the session that will be subscribed by the control session.
// No principal is supplied for this connection.
const clientSession = await diffusion.connect({
    host : 'host_name',
    port : 443,
    secure : true
});

// Subscribe the client session, identified by its session ID, to the selector.
const clientSessionId = clientSession.sessionId;
const topicSelector = '?topic//';
await controlSession.clients.subscribe(clientSessionId, topicSelector);
console.log(`Subscribed session "${clientSessionId}" to topic "${topicSelector}".`);
.NET
/**
* Copyright © 2021 - 2025 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
/// <summary>
/// Client implementation that subscribes another session to topics.
/// </summary>
public sealed class SubscriptionControlSubscribe
{
    /// <summary>
    /// Adds a string topic through a control session, opens a second session, and then
    /// uses the control session's subscription control feature to subscribe and
    /// unsubscribe that second session by its session ID.
    /// </summary>
    /// <param name="serverUrl">URL passed to <c>Open</c> for both sessions.</param>
    /// <returns>A task that completes after the opened sessions have been closed.</returns>
    public async Task SubscriptionControlSubscribeExample(string serverUrl)
    {
        const string topic = "topic/example";
        const string topicSelector = "?topic//";

        // NOTE: the validation callback accepts every certificate. That keeps the
        // example self-contained but should not be copied into production code.
        var controlSession = Diffusion.Sessions.Principal("control").Password("password")
            .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
            .Open(serverUrl);

        try
        {
            var subscriptionControl = controlSession.SubscriptionControl;

            try
            {
                await controlSession.TopicControl.AddTopicAsync(topic, TopicType.STRING, CancellationToken.None);
            }
            catch (Exception ex)
            {
                WriteLine($"Failed to add topic '{topic}' : {ex}.");
                return;
            }

            ISession session = Diffusion.Sessions.Principal("client")
                .Credentials(Diffusion.Credentials.Password("password"))
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .NoReconnection()
                .Open(serverUrl);

            try
            {
                WriteLine($"Session with id '{session.SessionId}' created.");

                try
                {
                    await subscriptionControl.SubscribeAsync(session.SessionId, topicSelector);
                    WriteLine($"Session with id '{session.SessionId}' is subscribed to '{topicSelector}'.");
                }
                catch (Exception ex)
                {
                    // Nothing to unsubscribe if the subscription itself failed.
                    WriteLine($"Failed to subscribe to '{topicSelector}' : {ex}.");
                    return;
                }

                try
                {
                    await subscriptionControl.UnsubscribeAsync(session.SessionId, topicSelector);
                    WriteLine($"Session with id '{session.SessionId}' is unsubscribed to '{topicSelector}'.");
                }
                catch (Exception ex)
                {
                    WriteLine($"Failed to unsubscribe to '{topicSelector}' : {ex}.");
                }
            }
            finally
            {
                // Runs on every exit path, including early returns and exceptions.
                session.Close();
            }
        }
        finally
        {
            // Closed last, and also when opening the client session throws.
            controlSession.Close();
        }
    }
}
}
Java and Android
/*******************************************************************************
* Copyright (C) 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package com.pushtechnology.client.sdk.manual;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
/**
 * This demonstrates using a client to subscribe and unsubscribe other clients
 * to topics using a session id.
 * <P>
 * This uses the 'SubscriptionControl' feature.
 *
 * @author DiffusionData Limited
 */
public class SubscriptionControlSubscribe {

    /**
     * Opens a control session and a client session, subscribes the client
     * session to a topic path by session id, creates and updates the topic,
     * then unsubscribes the client session again.
     *
     * @param args unused
     * @throws Exception if a session cannot be opened, an operation fails,
     *         or the final pause is interrupted
     */
    public static void main(String[] args) throws Exception {
        final String myTopicPath = "my/topic/path";

        // establish the control session
        final Session controlSession = Diffusion.sessions()
            .principal("control")
            .password("password")
            .open("ws://localhost:8080");

        try {
            // establish the client session
            final Session clientSession = Diffusion.sessions()
                .principal("client")
                .password("password")
                .open("ws://localhost:8080");

            try {
                // initialise SubscriptionControl and TopicUpdate features
                final SubscriptionControl subscriptionControl = controlSession
                    .feature(SubscriptionControl.class);
                final TopicUpdate topicUpdate = controlSession.feature(TopicUpdate.class);

                // register a stream to receive topic events
                clientSession.feature(Topics.class)
                    .addStream(myTopicPath, String.class, new MyValueStream());

                // subscribe the client session to the topic using the session id
                subscriptionControl.subscribe(clientSession.getSessionId(), myTopicPath)
                    .join();

                // add and set a topic with an initial value
                topicUpdate.addAndSet(
                    myTopicPath,
                    Diffusion.newTopicSpecification(TopicType.STRING),
                    String.class,
                    "myValue"
                ).join();

                // set a new topic value
                topicUpdate.set(myTopicPath, String.class, "myNewValue").join();

                // unsubscribe the client session from the topic using the session id;
                // joined like the other operations so that a failure is reported
                // instead of being silently dropped
                subscriptionControl.unsubscribe(clientSession.getSessionId(), myTopicPath)
                    .join();

                // presumably gives the client stream time to print its final
                // notification before the sessions are closed
                Thread.sleep(1000);
            }
            finally {
                clientSession.close();
            }
        }
        finally {
            controlSession.close();
        }
    }

    /**
     * Value stream that prints subscription, value and unsubscription events
     * to standard output. Close and error events are ignored.
     */
    private static final class MyValueStream implements Topics.ValueStream<String> {
        @Override
        public void onValue(String topicPath,
            TopicSpecification topicSpecification,
            String oldValue, String newValue) {
            System.out.printf("%s set to %s\n", topicPath, newValue);
        }

        @Override
        public void onSubscription(String topicPath,
            TopicSpecification topicSpecification) {
            System.out.printf("subscribed to %s\n", topicPath);
        }

        @Override
        public void onUnsubscription(String topicPath,
            TopicSpecification topicSpecification,
            Topics.UnsubscribeReason unsubscribeReason) {
            System.out.printf("unsubscribed from %s\n", topicPath);
        }

        @Override public void onClose() { }

        @Override public void onError(ErrorReason errorReason) { }
    }
}
C
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
#include <unistd.h>
#else
#define sleep(x) Sleep(1000 * x)
#endif
#include "diffusion.h"
// Topic selector passed to subscribe_client() by on_session_open().
char *g_topic_selector = "my/topic/path";
/*
 * Completion callback supplied to subscribe_client() by on_session_open().
 * It takes no action and simply reports success.
 */
static int on_subscription_complete(SESSION_T *session, void *context)
{
    /* Neither argument is needed here. */
    (void)session;
    (void)context;

    return HANDLER_SUCCESS;
}
/*
 * Session properties listener callback, registered in main() as
 * on_session_open. Every session other than our own is subscribed to
 * g_topic_selector. Always returns HANDLER_SUCCESS.
 */
static int on_session_open(SESSION_T *session,
                           const SESSION_PROPERTIES_EVENT_T *request,
                           void *context)
{
    const int is_own_session =
        (session_id_cmp(*session->id, request->session_id) == 0);

    if (!is_own_session) {
        /* A different session has opened: subscribe it to the topic. */
        SUBSCRIPTION_CONTROL_PARAMS_T subscribe_params = {
            .session_id = request->session_id,
            .topic_selector = g_topic_selector,
            .on_complete = on_subscription_complete
        };

        subscribe_client(session, subscribe_params);
    }

    return HANDLER_SUCCESS;
}
/*
 * Connects to the server as the "control" principal, registers a session
 * properties listener whose on_session_open callback subscribes other
 * sessions to a topic, waits five seconds, then closes the session.
 *
 * Returns EXIT_FAILURE if the session cannot be created, EXIT_SUCCESS otherwise.
 */
int main(int argc, char **argv)
{
    const char *url = "ws://localhost:8080";
    const char *principal = "control";
    const char *password = "password";

    DIFFUSION_ERROR_T error = {0};
    CREDENTIALS_T *credentials = credentials_create_password(password);

    /* Create a session with Diffusion. */
    SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error);
    if (!session) {
        printf("Failed to create session: %s\n", error.message);
        free(error.message);
        credentials_free(credentials);
        return EXIT_FAILURE;
    }

    /*
     * Register the session properties listener with all fixed
     * properties requested.
     */
    SET_T *fixed_properties = set_new_string(1);
    set_add(fixed_properties, PROPERTIES_SELECTOR_ALL_FIXED_PROPERTIES);

    SESSION_PROPERTIES_REGISTRATION_PARAMS_T listener_params = {
        .on_session_open = on_session_open,
        .required_properties = fixed_properties
    };
    session_properties_listener_register(session, listener_params);
    set_free(fixed_properties);

    /* Pause for five seconds before shutting down. */
    sleep(5);

    /* Close the session, then release the session and the credentials. */
    session_close(session, NULL);
    session_free(session);
    credentials_free(credentials);

    return EXIT_SUCCESS;
}
Apple
// Copyright (C) 2021 Push Technology Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import Foundation
import Diffusion
/// Examples of subscribing and unsubscribing another session, identified by
/// its session ID, using the subscription control feature of a session.
class SubscriptionControl {

    /// Subscribes the session identified by `sessionId` to a topic selector
    /// and prints the outcome.
    ///
    /// - Parameters:
    ///   - session: The session whose subscription control feature is used.
    ///   - sessionId: The ID of the session to subscribe.
    ///   - topic_path: The topic selector expression to subscribe to.
    func subscribe(session: PTDiffusionSession,
                   sessionId: PTDiffusionSessionId,
                   topic_path: String) {
        session.subscriptionControl.subscribeSessionId(sessionId,
                                                       withTopicSelectorExpression: topic_path) { error in
            // `print` does not expand printf-style placeholders, so the
            // values are embedded with string interpolation.
            if let error = error {
                print("An error has occurred while subscribing session '\(sessionId)' to topic '\(topic_path)': \(error.localizedDescription)")
            }
            else {
                print("Session '\(sessionId)' has been successfully subscribed to topic '\(topic_path)'")
            }
        }
    }

    /// Unsubscribes the session identified by `sessionId` from a topic
    /// selector and prints the outcome.
    ///
    /// - Parameters:
    ///   - session: The session whose subscription control feature is used.
    ///   - sessionId: The ID of the session to unsubscribe.
    ///   - topic_path: The topic selector expression to unsubscribe from.
    func unsubscribe(session: PTDiffusionSession,
                     sessionId: PTDiffusionSessionId,
                     topic_path: String) {
        session.subscriptionControl.unsubscribeSessionId(sessionId,
                                                         withTopicSelectorExpression: topic_path) { error in
            // `print` does not expand printf-style placeholders, so the
            // values are embedded with string interpolation.
            if let error = error {
                print("An error has occurred while unsubscribing session '\(sessionId)' from topic '\(topic_path)': \(error.localizedDescription)")
            }
            else {
                print("Session '\(sessionId)' has been successfully unsubscribed from topic '\(topic_path)'")
            }
        }
    }
}
Replace the URL used in the examples with the URL of your Diffusion server.