Example: Subscribe other clients to topics
The following examples use the SubscriptionControl feature in the Diffusion™ API to subscribe other client sessions to topics.
Java
and Android
package com.pushtechnology.diffusion.examples; import java.util.concurrent.CompletableFuture; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.features.control.topics.SubscriptionControl; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.session.SessionId; /** * This demonstrates using a client to subscribe and unsubscribe other clients * to topics. * <P> * This uses the 'SubscriptionControl' feature. * * @author Push Technology Limited * @since 5.0 */ public class ControlClientSubscriptionControl { private final Session session; private final SubscriptionControl subscriptionControl; /** * Constructor. */ public ControlClientSubscriptionControl() { session = Diffusion.sessions().principal("control").password("password") .open("ws://diffusion.example.com:80"); subscriptionControl = session.feature(SubscriptionControl.class); } /** * Subscribe a client to topics. * * @param sessionId client to subscribe * @param topicSelector topic selector expression * @param callback for subscription result */ public CompletableFuture<?> subscribe( SessionId sessionId, String topicSelector) { // To subscribe a client to a topic, this client session // must have the 'modify_session' permission. return subscriptionControl.subscribe( sessionId, topicSelector); } /** * Unsubscribe a client from topics. * * @param sessionId client to unsubscribe * @param topicSelector topic selector expression * @return a CompletableFuture that completes when a response is received * from the server */ public CompletableFuture<?> unsubscribe( SessionId sessionId, String topicSelector) { // To unsubscribe a client from a topic, this client session // must have the 'modify_session' permission. return subscriptionControl.unsubscribe( sessionId, topicSelector); } /** * Close the session. */ public void close() { session.close(); } }
.NET
C
/* * This example waits to be notified of a client connection, and then * subscribes that client to a named topic. */ #include <stdio.h> #include <unistd.h> #include <apr.h> #include <apr_thread_mutex.h> #include <apr_thread_cond.h> #include "diffusion.h" #include "args.h" ARG_OPTS_T arg_opts[] = { ARG_OPTS_HELP, {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"}, {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'t', "topic_selector", "Topic selector to subscribe/unsubscribe clients from", ARG_OPTIONAL, ARG_HAS_VALUE, ">foo"}, END_OF_ARG_OPTS }; HASH_T *options = NULL; /* * Callback invoked when a client has been successfully subscribed to * a topic. */ static int on_subscription_complete(SESSION_T *session, void *context) { printf("Subscription complete\n"); return HANDLER_SUCCESS; } /* * Callback invoked when a client session has been opened. */ static int on_session_open(SESSION_T *session, const SESSION_PROPERTIES_EVENT_T *request, void *context) { if(session_id_cmp(*session->id, request->session_id) == 0) { // It's our own session, ignore. return HANDLER_SUCCESS; } char *topic_selector = hash_get(options, "topic_selector"); char *sid_str = session_id_to_string(&request->session_id); printf("Subscribing session %s to topic selector %s\n", sid_str, topic_selector); free(sid_str); /* * Subscribe the client session to the topic. */ SUBSCRIPTION_CONTROL_PARAMS_T subscribe_params = { .session_id = request->session_id, .topic_selector = topic_selector, .on_complete = on_subscription_complete }; subscribe_client(session, subscribe_params); return HANDLER_SUCCESS; } int main(int argc, char **argv) { /* * Standard command-line parsing. */ options = parse_cmdline(argc, argv, arg_opts); if(options == NULL || hash_get(options, "help") != NULL) { show_usage(argc, argv, arg_opts); return EXIT_FAILURE; } const char *url = hash_get(options, "url"); const char *principal = hash_get(options, "principal"); CREDENTIALS_T *credentials = NULL; const char *password = hash_get(options, "credentials"); if(password != NULL) { credentials = credentials_create_password(password); } /* * Create a session with Diffusion. */ DIFFUSION_ERROR_T error = { 0 }; SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { fprintf(stderr, "Failed to create session: %s\n", error.message); return EXIT_FAILURE; } /* * Register a session properties listener, so we are notified * of new client connections. * In the callback, we will subscribe the client to topics * according to the topic_selector argument. */ SET_T *required_properties = set_new_string(1); set_add(required_properties, PROPERTIES_SELECTOR_ALL_FIXED_PROPERTIES); SESSION_PROPERTIES_REGISTRATION_PARAMS_T params = { .on_session_open = on_session_open, .required_properties = required_properties }; session_properties_listener_register(session, params); set_free(required_properties); /* * Pretend to do some work. */ sleep(10); /* * Close session and tidy up. */ session_close(session, NULL); session_free(session); return EXIT_SUCCESS; }
Change the URL from that provided in the example to the URL of Diffusion Cloud. Diffusion Cloud service URLs end in diffusion.cloud
This page last modified: 2020/06/25