Example: Make exclusive updates to a topic
The following examples use the Diffusion™ API to register as the update source of a topic and to update that topic with content. A client that updates a topic using this method locks the topic and prevents other clients from updating the topic.
JavaScript
diffusion.connect({ host : 'diffusion.example.com', port : 443, secure : true, principal : 'control', credentials : 'password' }).then(function(session) { // A session may establish an exclusive update source. Once active, only this session may update topics at or // under the registration branch. session.topics.registerUpdateSource('exclusive/topic', { onRegister : function(topic, deregister) { // The handler provides a deregistration function to remove this registration and allow other sessions to // update topics under the registered path. }, onActive : function(topic, updater) { // Once active, a handler may use the provided updater to update any topics at or under the registered path updater.update('exclusive/topic/bar', 123).then(function() { // The update was successful. }, function(err) { // There was an error updating the topic }); }, onStandBy : function(topic) { // If there is another update source registered for the same topic path, any subsequent registrations will // be put into a standby state. The registration is still held by the server, and the 'onActive' function // will be called if the pre-existing registration is closed at a later point in time }, onClose : function(topic, err) { // The 'onClose' function will be called once the registration is closed, either by the session being closed // or the 'deregister' function being called. } }); });
Apple
@import Diffusion; @interface TopicUpdateSourceExample (PTDiffusionTopicUpdateSource) <PTDiffusionTopicUpdateSource> @end @implementation TopicUpdateSourceExample { PTDiffusionSession* _session; } -(void)startWithURL:(NSURL*)url { PTDiffusionCredentials *const credentials = [[PTDiffusionCredentials alloc] initWithPassword:@"password"]; PTDiffusionSessionConfiguration *const sessionConfiguration = [[PTDiffusionSessionConfiguration alloc] initWithPrincipal:@"control" credentials:credentials]; NSLog(@"Connecting..."); [PTDiffusionSession openWithURL:url configuration:sessionConfiguration completionHandler:^(PTDiffusionSession *session, NSError *error) { if (!session) { NSLog(@"Failed to open session: %@", error); return; } // At this point we now have a connected session. NSLog(@"Connected."); // Set ivar to maintain a strong reference to the session. _session = session; // Add topic. [self addTopicForSession:session]; }]; } static NSString *const _TopicPath = @"Example/Exclusively Updating"; -(void)addTopicForSession:(PTDiffusionSession *const)session { // Add a single value topic without an initial value. [session.topicControl addWithTopicPath:_TopicPath type:PTDiffusionTopicType_SingleValue value:nil completionHandler:^(NSError * _Nullable error) { if (error) { NSLog(@"Failed to add topic. Error: %@", error); } else { NSLog(@"Topic created."); // Register as an exclusive update source. [self registerAsUpdateSourceForSession:session]; } }]; } -(void)registerAsUpdateSourceForSession:(PTDiffusionSession *const)session { [session.topicUpdateControl registerUpdateSource:self forTopicPath:_TopicPath completionHandler:^(PTDiffusionTopicTreeRegistration *const registration, NSError *const error) { if (registration) { NSLog(@"Registered as an update source."); } else { NSLog(@"Failed to register as an update source. Error: %@", error); } }]; } -(void)updateTopicWithUpdater:(PTDiffusionTopicUpdater *const)updater value:(const NSUInteger)value { // Prepare data to update topic with. NSString *const string = [NSString stringWithFormat:@"Update #%lu", (unsigned long)value]; NSData *const data = [string dataUsingEncoding:NSUTF8StringEncoding]; PTDiffusionContent *const content = [[PTDiffusionContent alloc] initWithData:data]; // Update the topic. [updater updateWithTopicPath:_TopicPath value:content completionHandler:^(NSError *const error) { if (error) { NSLog(@"Failed to update topic. Error: %@", error); } else { NSLog(@"Topic updated to \"%@\"", string); // Update topic after a short wait. dispatch_after(dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1.0 * NSEC_PER_SEC)), dispatch_get_main_queue(), ^ { [self updateTopicWithUpdater:updater value:value + 1]; }); } }]; } @end @implementation TopicUpdateSourceExample (PTDiffusionTopicUpdateSource) -(void)diffusionTopicTreeRegistration:(PTDiffusionTopicTreeRegistration *const)registration isActiveWithUpdater:(PTDiffusionTopicUpdater *const)updater { NSLog(@"Registration is active."); // Start updating. [self updateTopicWithUpdater:updater value:1]; } @end
Java
and Android
.NET
C
/** * Copyright © 2014, 2016 Push Technology Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * * This example is written in C99. Please use an appropriate C99 capable compiler * * @author Push Technology Limited * @since 5.0 */ /* * This example creates a simple single-value topic and periodically updates * the data it contains. */ #include <stdio.h> #include <stdlib.h> #include <time.h> #include <unistd.h> #include <apr.h> #include <apr_thread_mutex.h> #include <apr_thread_cond.h> #include "diffusion.h" #include "args.h" #include "conversation.h" #include "service/svc-update.h" int active = 0; apr_pool_t *pool = NULL; apr_thread_mutex_t *mutex = NULL; apr_thread_cond_t *cond = NULL; ARG_OPTS_T arg_opts[] = { ARG_OPTS_HELP, {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"}, {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'t', "topic", "Topic name to create and update", ARG_OPTIONAL, ARG_HAS_VALUE, "time"}, {'s', "seconds", "Number of seconds to run for before exiting", ARG_OPTIONAL, ARG_HAS_VALUE, "30"}, END_OF_ARG_OPTS }; /* * Handlers for add topic feature. */ static int on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context) { printf("Added topic\n"); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context) { printf("Failed to add topic (%d)\n", response->response_code); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_topic_add_discard(SESSION_T *session, void *context) { apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } /* * Handlers for registration of update source feature */ static int on_update_source_init(SESSION_T *session, const CONVERSATION_ID_T *updater_id, const SVC_UPDATE_REGISTRATION_RESPONSE_T *response, void *context) { char *id_str = conversation_id_to_string(*updater_id); printf("Topic source \"%s\" in init state\n", id_str); free(id_str); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_update_source_registered(SESSION_T *session, const CONVERSATION_ID_T *updater_id, const SVC_UPDATE_REGISTRATION_RESPONSE_T *response, void *context) { char *id_str = conversation_id_to_string(*updater_id); printf("Registered update source \"%s\"\n", id_str); free(id_str); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_update_source_deregistered(SESSION_T *session, const CONVERSATION_ID_T *updater_id, void *context) { char *id_str = conversation_id_to_string(*updater_id); printf("Deregistered update source \"%s\"\n", id_str); free(id_str); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS;} static int on_update_source_active(SESSION_T *session, const CONVERSATION_ID_T *updater_id, const SVC_UPDATE_REGISTRATION_RESPONSE_T *response, void *context) { char *id_str = conversation_id_to_string(*updater_id); printf("Topic source \"%s\" active\n", id_str); free(id_str); active = 1; apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_update_source_standby(SESSION_T *session, const CONVERSATION_ID_T *updater_id, const SVC_UPDATE_REGISTRATION_RESPONSE_T *response, void *context) { char *id_str = conversation_id_to_string(*updater_id); printf("Topic source \"%s\" on standby\n", id_str); free(id_str); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } static int on_update_source_closed(SESSION_T *session, const CONVERSATION_ID_T *updater_id, const SVC_UPDATE_REGISTRATION_RESPONSE_T *response, void *context) { char *id_str = conversation_id_to_string(*updater_id); printf("Topic source \"%s\" closed\n", id_str); free(id_str); apr_thread_mutex_lock(mutex); apr_thread_cond_broadcast(cond); apr_thread_mutex_unlock(mutex); return HANDLER_SUCCESS; } /* * Handlers for update of data. */ static int on_update_success(SESSION_T *session, const CONVERSATION_ID_T *updater_id, const SVC_UPDATE_RESPONSE_T *response, void *context) { char *id_str = conversation_id_to_string(*updater_id); printf("on_update_success for updater \"%s\"\n", id_str); free(id_str); return HANDLER_SUCCESS; } static int on_update_failure(SESSION_T *session, const CONVERSATION_ID_T *updater_id, const SVC_UPDATE_RESPONSE_T *response, void *context) { char *id_str = conversation_id_to_string(*updater_id); printf("on_update_failure for updater \"%s\"\n", id_str); free(id_str); return HANDLER_SUCCESS; } /* * Program entry point. */ int main(int argc, char** argv) { /* * Standard command-line parsing. */ const HASH_T *options = parse_cmdline(argc, argv, arg_opts); if(options == NULL || hash_get(options, "help") != NULL) { show_usage(argc, argv, arg_opts); return EXIT_FAILURE; } const char *url = hash_get(options, "url"); const char *principal = hash_get(options, "principal"); CREDENTIALS_T *credentials = NULL; const char *password = hash_get(options, "credentials"); if(password != NULL) { credentials = credentials_create_password(password); } const char *topic_name = hash_get(options, "topic"); const long seconds = atol(hash_get(options, "seconds")); /* * Setup for condition variable. */ apr_initialize(); apr_pool_create(&pool, NULL); apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool); apr_thread_cond_create(&cond, pool); /* * Create a session with the Diffusion server. */ SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { fprintf(stderr, "TEST: Failed to create session\n"); fprintf(stderr, "ERR : %s\n", error.message); return EXIT_FAILURE; } /* * Create a topic holding simple string content. */ TOPIC_DETAILS_T *string_topic_details = create_topic_details_single_value(M_DATA_TYPE_STRING); const ADD_TOPIC_PARAMS_T add_topic_params = { .topic_path = topic_name, .details = string_topic_details, .on_topic_added = on_topic_added, .on_topic_add_failed = on_topic_add_failed, .on_discard = on_topic_add_discard, }; apr_thread_mutex_lock(mutex); add_topic(session, add_topic_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); topic_details_free(string_topic_details); /* * Define the handlers for add_update_source() */ const UPDATE_SOURCE_REGISTRATION_PARAMS_T update_reg_params = { .topic_path = topic_name, .on_init = on_update_source_init, .on_registered = on_update_source_registered, .on_active = on_update_source_active, .on_standby = on_update_source_standby, .on_close = on_update_source_closed }; /* * Register an updater. */ apr_thread_mutex_lock(mutex); CONVERSATION_ID_T *updater_id = register_update_source(session, update_reg_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); /* * Define default parameters for an update source. */ UPDATE_SOURCE_PARAMS_T update_source_params_base = { .updater_id = updater_id, .topic_path = topic_name, .on_success = on_update_success, .on_failure = on_update_failure }; time_t end_time = time(NULL) + seconds; while(time(NULL) < end_time) { if(active) { /* * Create an update structure containing the current time. */ BUF_T *buf = buf_create(); const time_t time_now = time(NULL); buf_write_string(buf, ctime(&time_now)); CONTENT_T *content = content_create(CONTENT_ENCODING_NONE, buf); UPDATE_T *upd = update_create(UPDATE_ACTION_REFRESH, UPDATE_TYPE_CONTENT, content); UPDATE_SOURCE_PARAMS_T update_source_params = update_source_params_base; update_source_params.update = upd; /* * Update the topic. */ update(session, update_source_params); content_free(content); update_free(upd); buf_free(buf); } sleep(1); } if(active) { UPDATE_SOURCE_DEREGISTRATION_PARAMS_T update_dereg_params = { .updater_id = updater_id, .on_deregistered = on_update_source_deregistered }; apr_thread_mutex_lock(mutex); deregister_update_source(session, update_dereg_params); apr_thread_cond_wait(cond, mutex); apr_thread_mutex_unlock(mutex); } /* * Close session and free resources. */ session_close(session, NULL); session_free(session); conversation_id_free(updater_id); credentials_free(credentials); apr_thread_mutex_destroy(mutex); apr_thread_cond_destroy(cond); apr_pool_destroy(pool); apr_terminate(); return EXIT_SUCCESS; }
Change the URL from that provided in the example to the URL of Diffusion Cloud. Diffusion Cloud service URLs end in diffusion.cloud
This page last modified: 2020/06/25