Just a second...

Example: Make exclusive updates to a topic

The following examples use the API to register as the update source of a topic and to update that topic with content. A client that updates a topic using this method locks the topic and prevents other clients from updating the topic.

JavaScript
// Connect as the 'control' principal and, once the session is open, register it as the exclusive update source
// for the 'exclusive/topic' branch.
diffusion.connect({
    host   : 'diffusion.example.com',
    port   : 443,
    secure : true,
    principal : 'control',
    credentials : 'password'
}).then(function(session) {
    // A session may establish an exclusive update source. Once active, only this session may update topics at or
    // under the registration branch.

    session.topics.registerUpdateSource('exclusive/topic', {
        onRegister : function(topic, deregister) {
            // The handler provides a deregistration function to remove this registration and allow other sessions to
            // update topics under the registered path.
        },
        onActive : function(topic, updater) {
            // Once active, a handler may use the provided updater to update any topics at or under the registered path
            updater.update('exclusive/topic/bar', 123).then(function() {
                // The update was successful.
            }, function(err) {
                // There was an error updating the topic
            });
        },
        onStandBy : function(topic) {
            // If there is another update source registered for the same topic path, any subsequent registrations will
            // be put into a standby state. The registration is still held by the server, and the 'onActive' function
            // will be called if the pre-existing registration is closed at a later point in time
        },
        onClose : function(topic, err) {
            // The 'onClose' function will be called once the registration is closed, either by the session being closed
            // or the 'deregister' function being called.
        }
    });
}, function(err) {
    // The connection attempt was rejected. Without this handler the failure would go unreported and no update
    // source would ever be registered.
    console.error('Failed to connect to Diffusion', err);
});
Apple
@import Diffusion;

// Category declaring that TopicUpdateSourceExample adopts the PTDiffusionTopicUpdateSource protocol.
// The protocol method is implemented in the category @implementation of the same name in this file.
@interface TopicUpdateSourceExample (PTDiffusionTopicUpdateSource) <PTDiffusionTopicUpdateSource>
@end

@implementation TopicUpdateSourceExample {
    // Strong reference to the open session, assigned once the connection succeeds.
    PTDiffusionSession* _session;
}

/**
 Opens a session to the given URL as principal "control" and, when the
 session is connected, goes on to add the example topic.

 @param url The URL passed to the session open request.
 */
-(void)startWithURL:(NSURL*)url {
    PTDiffusionCredentials *const passwordCredentials =
        [[PTDiffusionCredentials alloc] initWithPassword:@"password"];
    PTDiffusionSessionConfiguration *const configuration =
        [[PTDiffusionSessionConfiguration alloc] initWithPrincipal:@"control"
                                                       credentials:passwordCredentials];

    NSLog(@"Connecting...");

    [PTDiffusionSession openWithURL:url
                      configuration:configuration
                  completionHandler:^(PTDiffusionSession *openedSession, NSError *openError)
    {
        // No session means the open request failed; report it and stop here.
        if (openedSession == nil) {
            NSLog(@"Failed to open session: %@", openError);
            return;
        }

        NSLog(@"Connected.");

        // Keep the session in the ivar so that a strong reference is maintained.
        self->_session = openedSession;

        // Next step: create the topic that will be updated.
        [self addTopicForSession:openedSession];
    }];
}

static NSString *const _TopicPath = @"Example/Exclusively Updating";

/**
 Adds a single value topic at the example topic path, without an initial
 value, then registers as its update source if the add succeeded.

 @param session The session whose topic control feature is used.
 */
-(void)addTopicForSession:(PTDiffusionSession *const)session {
    [session.topicControl addWithTopicPath:_TopicPath
                                      type:PTDiffusionTopicType_SingleValue
                                     value:nil
                         completionHandler:^(NSError * _Nullable addError)
    {
        if (addError != nil) {
            NSLog(@"Failed to add topic. Error: %@", addError);
            return;
        }

        NSLog(@"Topic created.");

        // The topic exists, so register as its exclusive update source.
        [self registerAsUpdateSourceForSession:session];
    }];
}

/**
 Registers this object as the update source for the example topic path and
 logs the outcome of the registration request.

 @param session The session whose topic update control feature is used.
 */
-(void)registerAsUpdateSourceForSession:(PTDiffusionSession *const)session {
    [session.topicUpdateControl registerUpdateSource:self
                                        forTopicPath:_TopicPath
                                   completionHandler:^(PTDiffusionTopicTreeRegistration *const registration, NSError *const error)
    {
        if (registration == nil) {
            NSLog(@"Failed to register as an update source. Error: %@", error);
            return;
        }

        NSLog(@"Registered as an update source.");
    }];
}

/**
 Sends "Update #<value>" to the example topic and, after each successful
 update, schedules the next one (value + 1) one second later on the main
 queue. A failed update is logged and ends the sequence.

 @param updater The updater used to send the update.
 @param value The number embedded in the update text.
 */
-(void)updateTopicWithUpdater:(PTDiffusionTopicUpdater *const)updater
                        value:(const NSUInteger)value {
    // Build the UTF-8 encoded content for this update.
    NSString *const text =
        [NSString stringWithFormat:@"Update #%lu", (unsigned long)value];
    NSData *const encodedText = [text dataUsingEncoding:NSUTF8StringEncoding];
    PTDiffusionContent *const content =
        [[PTDiffusionContent alloc] initWithData:encodedText];

    [updater updateWithTopicPath:_TopicPath
                           value:content
               completionHandler:^(NSError *const updateError)
    {
        if (updateError != nil) {
            NSLog(@"Failed to update topic. Error: %@", updateError);
            return;
        }

        NSLog(@"Topic updated to \"%@\"", text);

        // Send the next update after a short wait.
        const dispatch_time_t nextUpdateTime =
            dispatch_time(DISPATCH_TIME_NOW, (int64_t)(1.0 * NSEC_PER_SEC));
        dispatch_after(nextUpdateTime, dispatch_get_main_queue(), ^{
            [self updateTopicWithUpdater:updater value:value + 1];
        });
    }];
}

@end

@implementation TopicUpdateSourceExample (PTDiffusionTopicUpdateSource)

/**
 Called with an updater when the registration becomes active. Logs the
 event and starts the sequence of topic updates at number 1.

 @param registration The registration that is now active (not used here).
 @param updater The updater handed on to the update loop.
 */
-(void)diffusionTopicTreeRegistration:(PTDiffusionTopicTreeRegistration *const)registration
                  isActiveWithUpdater:(PTDiffusionTopicUpdater *const)updater {
    NSLog(@"Registration is active.");

    // Begin the update sequence with the first update number.
    const NSUInteger firstUpdateNumber = 1;
    [self updateTopicWithUpdater:updater value:firstUpdateNumber];
}

@end
Java and Android
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.TopicTreeHandler;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl.AddCallback;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.UpdateSource;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater;
import com.pushtechnology.diffusion.client.features.control.topics.TopicUpdateControl.Updater.UpdateCallback;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.SingleValueTopicDetails;
import com.pushtechnology.diffusion.client.topics.details.TopicDetails;

/**
 * Demonstrates a control client acting as the event feed for a topic.
 * <P>
 * The topic is created through the 'TopicControl' feature and updated through
 * the 'TopicUpdateControl' feature.
 * <P>
 * The client session needs the 'update_topic' permission for the relevant
 * branch of the topic tree in order to send updates.
 *
 * @author Push Technology Limited
 * @since 5.0
 */
public class ControlClientAsUpdateSource {

    private static final String TOPIC_NAME = "Feeder";

    private final Session session;
    private final TopicControl topicControl;
    private final TopicUpdateControl updateControl;
    private final UpdateCallback updateCallback;

    /**
     * Opens a session as principal "control" and obtains the topic control
     * and topic update control features from it.
     *
     * @param callback the callback passed with every update that is sent
     */
    public ControlClientAsUpdateSource(UpdateCallback callback) {

        this.updateCallback = callback;

        this.session =
            Diffusion.sessions()
                .principal("control")
                .password("password")
                .open("ws://diffusion.example.com:80");
        this.topicControl = this.session.feature(TopicControl.class);
        this.updateControl = this.session.feature(TopicUpdateControl.class);
    }

    /**
     * Creates the feed topic and, once it has been added, registers an update
     * source that publishes prices from the provider.
     *
     * @param provider the provider of prices
     * @param scheduler a scheduler service used to run the periodic feeder task
     */
    public void start(
        final PriceProvider provider,
        final ScheduledExecutorService scheduler) {

        // Describe the topic: a single value topic with decimal "Price" metadata.
        final SingleValueTopicDetails.Builder detailsBuilder =
            topicControl.newDetailsBuilder(
                SingleValueTopicDetails.Builder.class);
        final TopicDetails priceDetails =
            detailsBuilder
                .metadata(Diffusion.metadata().decimal("Price"))
                .build();

        // The source starts feeding when it becomes active and stops when closed.
        final UpdateSource feedSource =
            new ScheduledFeedSource(provider, scheduler);

        // Add the topic. In the added callback, request removal of the topic
        // when the session closes and then register the update source for it.
        topicControl.addTopic(
            TOPIC_NAME,
            priceDetails,
            new AddCallback.Default() {
                @Override
                public void onTopicAdded(String addedPath) {
                    topicControl.removeTopicsWithSession(
                        addedPath,
                        new TopicTreeHandler.Default());
                    updateControl.registerUpdateSource(addedPath, feedSource);
                }
            });
    }

    /**
     * Closes the session.
     */
    public void close() {
        session.close();
    }

    /**
     * Update source that schedules a feeder task, run every second after an
     * initial one second delay, while it is active, and cancels that task
     * when it is closed.
     */
    private final class ScheduledFeedSource extends UpdateSource.Default {

        private final PriceProvider feedProvider;
        private final ScheduledExecutorService feedScheduler;
        private ScheduledFuture<?> scheduledFeed;

        private ScheduledFeedSource(
            PriceProvider provider,
            ScheduledExecutorService scheduler) {
            feedProvider = provider;
            feedScheduler = scheduler;
        }

        @Override
        public void onActive(String topicPath, Updater updater) {
            final Runnable feeder = new FeederTask(feedProvider, updater);
            scheduledFeed =
                feedScheduler.scheduleAtFixedRate(
                    feeder, 1, 1, TimeUnit.SECONDS);
        }

        @Override
        public void onClose(String topicPath) {
            // Nothing to cancel if the source never became active.
            if (scheduledFeed == null) {
                return;
            }
            scheduledFeed.cancel(true);
        }
    }

    /**
     * Task that reads the current price from the provider and sends it to the
     * feed topic as an update.
     */
    private final class FeederTask implements Runnable {

        private final PriceProvider priceProvider;
        private final Updater priceUpdater;

        private FeederTask(PriceProvider provider, Updater updater) {
            this.priceProvider = provider;
            this.priceUpdater = updater;
        }

        @Override
        public void run() {
            final String currentPrice = priceProvider.getPrice();
            priceUpdater.update(
                TOPIC_NAME,
                Diffusion.content().newContent(currentPrice),
                updateCallback);
        }

    }

    /**
     * A source of prices that can be polled periodically.
     */
    public interface PriceProvider {
        /**
         * Get the current price.
         *
         * @return current price as a decimal string
         */
        String getPrice();
    }
}
.NET
using System.Threading;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features.Control.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;

namespace Examples {
    /// <summary>
    /// An example of using a control client as an event feed to a topic.
    ///
    /// This uses the <see cref="ITopicControl"/> feature to create a topic and the <see cref="ITopicUpdateControl"/>
    /// feature to send updates to it.
    ///
    /// To send updates to a topic, the client session requires the <see cref="TopicPermission.UPDATE_TOPIC"/>
    /// permission for that branch of the topic tree.
    /// </summary>
    public class ControlClientAsUpdateSource {
        private const string TopicName = "Feeder";
        private readonly ISession session;
        private readonly ITopicControl topicControl;
        private readonly ITopicUpdateControl updateControl;
        private readonly ITopicUpdaterUpdateCallback updateCallback;

        /// <summary>
        /// Constructor. Opens a session as principal "control" and obtains the topic control and topic update
        /// control features from it.
        /// </summary>
        /// <param name="callback">The callback for updates.</param>
        public ControlClientAsUpdateSource( ITopicUpdaterUpdateCallback callback ) {
            updateCallback = callback;

            // The host and port are separated by ':' (the original text had ';', which is not a valid port
            // separator in a URL).
            session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .Open( "ws://diffusion.example.com:80" );

            topicControl = session.GetTopicControlFeature();
            updateControl = session.GetTopicUpdateControlFeature();
        }

        /// <summary>
        /// Start the feed: create the topic and, once it has been added, register an update source for it.
        /// </summary>
        /// <param name="provider">The provider of prices.</param>
        public void Start( IPriceProvider provider ) {
            // Set up topic details
            var builder = topicControl.CreateDetailsBuilder<ISingleValueTopicDetailsBuilder>();
            var details = builder.Metadata( Diffusion.Metadata.Decimal( "Price" ) ).Build();

            // Declare a custom update source implementation. When the source is set as active, start a periodic task
            // to poll the provider every second and update the topic. When the source is closed, stop the scheduled
            // task.
            var source = new UpdateSource( provider, updateCallback );

            // Create the topic. When the callback indicates that the topic has been created, register the topic
            // source for the topic.
            topicControl.AddTopicFromValue( TopicName, details, new AddCallback( updateControl, source ) );
        }

        /// <summary>
        /// Request removal of the feed topic. The session is closed by the removal callback once the request
        /// completes or is discarded.
        /// </summary>
        public void Close() {
            // Remove our topic and close the session when done.
            topicControl.RemoveTopics( ">" + TopicName, new RemoveCallback( session ) );
        }

        /// <summary>
        /// Removal callback that closes the session whichever way the removal request ends.
        /// </summary>
        private class RemoveCallback : TopicControlRemoveCallbackDefault {
            private readonly ISession theSession;

            /// <summary>
            /// Constructor.
            /// </summary>
            /// <param name="session">The session to close when the removal request ends.</param>
            public RemoveCallback( ISession session ) {
                theSession = session;
            }

            /// <summary>
            /// Notification that a call context was closed prematurely, typically due to a timeout or the session being
            /// closed. No further calls will be made for the context.
            /// </summary>
            public override void OnDiscard() {
                theSession.Close();
            }

            /// <summary>
            /// Topic(s) have been removed.
            /// </summary>
            public override void OnTopicsRemoved() {
                theSession.Close();
            }
        }

        /// <summary>
        /// Add callback that registers the update source for the topic once the topic has been added.
        /// </summary>
        private class AddCallback : TopicControlAddCallbackDefault {
            private readonly ITopicUpdateControl updateControl;
            private readonly UpdateSource updateSource;

            /// <summary>
            /// Constructor.
            /// </summary>
            /// <param name="updater">The update control feature used to register the source.</param>
            /// <param name="source">The update source to register.</param>
            public AddCallback( ITopicUpdateControl updater, UpdateSource source ) {
                updateControl = updater;
                updateSource = source;
            }

            /// <summary>
            /// Topic has been added.
            /// </summary>
            /// <param name="topicPath">The full path of the topic that was added.</param>
            public override void OnTopicAdded( string topicPath ) {
                updateControl.RegisterUpdateSource( topicPath, updateSource );
            }
        }

        /// <summary>
        /// Update source that starts a periodic task publishing the provider's price when it becomes active, and
        /// cancels that task when it is closed.
        /// </summary>
        private class UpdateSource : TopicUpdateSourceDefault {
            private readonly IPriceProvider thePriceProvider;
            private readonly ITopicUpdaterUpdateCallback theUpdateCallback;

            // Source of the token handed to the periodic task; cancelled in OnClose.
            private readonly CancellationTokenSource cancellationToken = new CancellationTokenSource();

            /// <summary>
            /// Constructor.
            /// </summary>
            /// <param name="provider">The provider of prices.</param>
            /// <param name="callback">The callback passed with every update.</param>
            public UpdateSource( IPriceProvider provider, ITopicUpdaterUpdateCallback callback ) {
                thePriceProvider = provider;
                theUpdateCallback = callback;
            }

            /// <summary>
            /// State notification that this source is now active for the specified topic path, and is therefore in a
            /// valid state to send updates on topics at or below the registered topic path.
            /// </summary>
            /// <param name="topicPath">The registration path.</param>
            /// <param name="updater">An updater that may be used to update topics at or below the registered path.</param>
            public override void OnActive( string topicPath, ITopicUpdater updater ) {
                // The interval argument is 1000, matching the "every second" polling described in Start.
                PeriodicTaskFactory.Start( () => {
                    updater.Update(
                        TopicName, Diffusion.Content.NewContent( thePriceProvider.Price ), theUpdateCallback );
                }, 1000, cancelToken: cancellationToken.Token );
            }

            /// <summary>
            /// Called if the handler is closed. The handler will be closed if the
            /// session is closed after the handler has been registered, or if the
            /// handler is unregistered using <see cref="IRegistration.Close">close</see>.
            ///
            /// No further calls will be made for the handler.
            /// </summary>
            /// <param name="topicPath">the branch of the topic tree for which the handler was registered</param>
            public override void OnClose( string topicPath ) {
                cancellationToken.Cancel();
            }
        }

        /// <summary>
        /// Interface of a price provider that can periodically be polled for a price.
        /// </summary>
        public interface IPriceProvider {
            /// <summary>
            /// Get the current price as a decimal string.
            /// </summary>
            string Price {
                get;
            }
        }
    }
}
C
/**
 * Copyright © 2014, 2016 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 * This example is written in C99. Please use an appropriate C99 capable compiler
 *
 * @author Push Technology Limited
 * @since 5.0
 */

/*
 * This example creates a simple single-value topic and periodically updates
 * the data it contains.
 */
#include <stdio.h>
#include <stdlib.h>
#include <time.h>
#include <unistd.h>

#include <apr.h>
#include <apr_thread_mutex.h>
#include <apr_thread_cond.h>

#include "diffusion.h"
#include "args.h"
#include "conversation.h"
#include "service/svc-update.h"

/*
 * Flag set to 1 by on_update_source_active() and polled by main() to decide
 * whether to send updates and whether to deregister on exit.
 */
int active = 0;

/*
 * APR pool, mutex and condition variable; all three are created in main().
 * The callbacks broadcast on cond while holding mutex, and main() waits on it.
 */
apr_pool_t *pool = NULL;
apr_thread_mutex_t *mutex = NULL;
apr_thread_cond_t *cond = NULL;

/*
 * Command-line option table passed to parse_cmdline() and show_usage().
 * Each entry gives a short option letter, a long name and a description;
 * NOTE(review): the last field looks like the option's default value
 * (NULL where there is none) - confirm against args.h.
 */
ARG_OPTS_T arg_opts[] = {
        ARG_OPTS_HELP,
        {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"},
        {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL},
        {'t', "topic", "Topic name to create and update", ARG_OPTIONAL, ARG_HAS_VALUE, "time"},
        {'s', "seconds", "Number of seconds to run for before exiting", ARG_OPTIONAL, ARG_HAS_VALUE, "30"},
        END_OF_ARG_OPTS
};

/*
 * Handlers for add topic feature.
 */
/*
 * Add-topic success callback: reports the addition and wakes the thread
 * waiting on the condition variable.
 */
static int
on_topic_added(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        /* None of the callback arguments are needed here. */
        (void)session;
        (void)response;
        (void)context;

        printf("Added topic\n");

        /* Signal the waiter while holding the mutex. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

/*
 * Add-topic failure callback: prints the response code and wakes the thread
 * waiting on the condition variable.
 */
static int
on_topic_add_failed(SESSION_T *session, const SVC_ADD_TOPIC_RESPONSE_T *response, void *context)
{
        /* Only the response is used. */
        (void)session;
        (void)context;

        printf("Failed to add topic (%d)\n", response->response_code);

        /* Signal the waiter while holding the mutex. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

/*
 * Add-topic discard callback: prints nothing, just wakes the thread waiting
 * on the condition variable.
 */
static int
on_topic_add_discard(SESSION_T *session, void *context)
{
        /* Neither argument is needed here. */
        (void)session;
        (void)context;

        /* Signal the waiter while holding the mutex. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

/*
 * Handlers for registration of update source feature
 */
/*
 * Update source "init" callback: logs the updater's conversation ID and
 * wakes the thread waiting on the condition variable.
 */
static int
on_update_source_init(SESSION_T *session,
                      const CONVERSATION_ID_T *updater_id,
                      const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
                      void *context)
{
        /* Only the updater ID is used. */
        (void)session;
        (void)response;
        (void)context;

        /* The string form of the ID is released with free() once printed. */
        char *source_name = conversation_id_to_string(*updater_id);
        printf("Topic source \"%s\" in init state\n", source_name);
        free(source_name);

        /* Signal the waiter while holding the mutex. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

/*
 * Update source "registered" callback: logs the updater's conversation ID
 * and wakes the thread waiting on the condition variable.
 */
static int
on_update_source_registered(SESSION_T *session,
                            const CONVERSATION_ID_T *updater_id,
                            const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
                            void *context)
{
        /* Only the updater ID is used. */
        (void)session;
        (void)response;
        (void)context;

        /* The string form of the ID is released with free() once printed. */
        char *source_name = conversation_id_to_string(*updater_id);
        printf("Registered update source \"%s\"\n", source_name);
        free(source_name);

        /* Signal the waiter while holding the mutex. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

/*
 * Deregistration callback: logs the updater's conversation ID and wakes the
 * thread waiting on the condition variable.
 */
static int
on_update_source_deregistered(SESSION_T *session,
                              const CONVERSATION_ID_T *updater_id,
                              void *context)
{
        /* Only the updater ID is used. */
        (void)session;
        (void)context;

        /* The string form of the ID is released with free() once printed. */
        char *source_name = conversation_id_to_string(*updater_id);
        printf("Deregistered update source \"%s\"\n", source_name);
        free(source_name);

        /* Signal the waiter while holding the mutex. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}


/*
 * Update source "active" callback: logs the updater's conversation ID, sets
 * the global 'active' flag and wakes the thread waiting on the condition
 * variable.
 */
static int
on_update_source_active(SESSION_T *session,
                        const CONVERSATION_ID_T *updater_id,
                        const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
                        void *context)
{
        /* Only the updater ID is used. */
        (void)session;
        (void)response;
        (void)context;

        /* The string form of the ID is released with free() once printed. */
        char *source_name = conversation_id_to_string(*updater_id);
        printf("Topic source \"%s\" active\n", source_name);
        free(source_name);

        /* From now on the main loop is allowed to send updates. */
        active = 1;

        /* Signal the waiter while holding the mutex. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

/*
 * Update source "standby" callback: logs the updater's conversation ID and
 * wakes the thread waiting on the condition variable.
 */
static int
on_update_source_standby(SESSION_T *session,
                         const CONVERSATION_ID_T *updater_id,
                         const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
                         void *context)
{
        /* Only the updater ID is used. */
        (void)session;
        (void)response;
        (void)context;

        /* The string form of the ID is released with free() once printed. */
        char *source_name = conversation_id_to_string(*updater_id);
        printf("Topic source \"%s\" on standby\n", source_name);
        free(source_name);

        /* Signal the waiter while holding the mutex. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

/*
 * Update source "close" callback: logs the updater's conversation ID and
 * wakes the thread waiting on the condition variable.
 */
static int
on_update_source_closed(SESSION_T *session,
                        const CONVERSATION_ID_T *updater_id,
                        const SVC_UPDATE_REGISTRATION_RESPONSE_T *response,
                        void *context)
{
        /* Only the updater ID is used. */
        (void)session;
        (void)response;
        (void)context;

        /* The string form of the ID is released with free() once printed. */
        char *source_name = conversation_id_to_string(*updater_id);
        printf("Topic source \"%s\" closed\n", source_name);
        free(source_name);

        /* Signal the waiter while holding the mutex. */
        apr_thread_mutex_lock(mutex);
        apr_thread_cond_broadcast(cond);
        apr_thread_mutex_unlock(mutex);

        return HANDLER_SUCCESS;
}

/*
 * Handlers for update of data.
 */
/*
 * Update success callback: logs the conversation ID of the updater whose
 * update succeeded. It does not signal the condition variable.
 */
static int
on_update_success(SESSION_T *session,
                  const CONVERSATION_ID_T *updater_id,
                  const SVC_UPDATE_RESPONSE_T *response,
                  void *context)
{
        /* Only the updater ID is used. */
        (void)session;
        (void)response;
        (void)context;

        /* The string form of the ID is released with free() once printed. */
        char *updater_name = conversation_id_to_string(*updater_id);
        printf("on_update_success for updater \"%s\"\n", updater_name);
        free(updater_name);

        return HANDLER_SUCCESS;
}

/*
 * Update failure callback: logs the conversation ID of the updater whose
 * update failed. It does not signal the condition variable.
 */
static int
on_update_failure(SESSION_T *session,
                  const CONVERSATION_ID_T *updater_id,
                  const SVC_UPDATE_RESPONSE_T *response,
                  void *context)
{
        /* Only the updater ID is used. */
        (void)session;
        (void)response;
        (void)context;

        /* The string form of the ID is released with free() once printed. */
        char *updater_name = conversation_id_to_string(*updater_id);
        printf("on_update_failure for updater \"%s\"\n", updater_name);
        free(updater_name);

        return HANDLER_SUCCESS;
}

/*
 * Program entry point.
 */
/*
 * Parses the command line, opens a session, adds a single-value string
 * topic, registers as its update source and then, for the requested number
 * of seconds, writes the current time to the topic once a second whenever
 * the source is active. Finally deregisters (if active), closes the session
 * and releases resources.
 *
 * Returns EXIT_FAILURE if help was requested, the command line could not be
 * parsed or the session could not be created; EXIT_SUCCESS otherwise.
 */
int
main(int argc, char** argv)
{
        /*
         * Standard command-line parsing.
         */
        const HASH_T *options = parse_cmdline(argc, argv, arg_opts);
        if(options == NULL || hash_get(options, "help") != NULL) {
                show_usage(argc, argv, arg_opts);
                return EXIT_FAILURE;
        }

        const char *url = hash_get(options, "url");
        const char *principal = hash_get(options, "principal");
        CREDENTIALS_T *credentials = NULL;
        const char *password = hash_get(options, "credentials");
        if(password != NULL) {
                credentials = credentials_create_password(password);
        }
        const char *topic_name = hash_get(options, "topic");
        const long seconds = atol(hash_get(options, "seconds"));

        /*
         * Setup for condition variable.
         */
        apr_initialize();
        apr_pool_create(&pool, NULL);
        apr_thread_mutex_create(&mutex, APR_THREAD_MUTEX_UNNESTED, pool);
        apr_thread_cond_create(&cond, pool);

        /*
         * Create a session with the Diffusion server.
         */
        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                fprintf(stderr, "TEST: Failed to create session\n");
                fprintf(stderr, "ERR : %s\n", error.message);
                return EXIT_FAILURE;
        }

        /*
         * Create a topic holding simple string content.
         */
        TOPIC_DETAILS_T *string_topic_details = create_topic_details_single_value(M_DATA_TYPE_STRING);
        const ADD_TOPIC_PARAMS_T add_topic_params = {
                .topic_path = topic_name,
                .details = string_topic_details,
                .on_topic_added = on_topic_added,
                .on_topic_add_failed = on_topic_add_failed,
                .on_discard = on_topic_add_discard,
        };

        /*
         * The mutex is taken before the request is issued; the add-topic
         * handlers must take the same mutex before broadcasting, so the
         * wake-up cannot be sent before this thread is waiting.
         */
        apr_thread_mutex_lock(mutex);
        add_topic(session, add_topic_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        topic_details_free(string_topic_details);

        /*
         * Define the handlers for add_update_source()
         */
        const UPDATE_SOURCE_REGISTRATION_PARAMS_T update_reg_params = {
                .topic_path = topic_name,
                .on_init = on_update_source_init,
                .on_registered = on_update_source_registered,
                .on_active = on_update_source_active,
                .on_standby = on_update_source_standby,
                .on_close = on_update_source_closed
        };

        /*
         * Register an updater. Only the first registration callback is
         * waited for; the loop below checks 'active' before every update.
         */
        apr_thread_mutex_lock(mutex);
        CONVERSATION_ID_T *updater_id = register_update_source(session, update_reg_params);
        apr_thread_cond_wait(cond, mutex);
        apr_thread_mutex_unlock(mutex);

        /*
         * Define default parameters for an update source.
         */
        UPDATE_SOURCE_PARAMS_T update_source_params_base = {
                .updater_id = updater_id,
                .topic_path = topic_name,
                .on_success = on_update_success,
                .on_failure = on_update_failure
        };

        time_t end_time = time(NULL) + seconds;

        /*
         * The comparison operator is '<'; the original text carried the
         * HTML entity for it, which is not valid C.
         */
        while(time(NULL) < end_time) {

                if(active) {
                        /*
                         * Create an update structure containing the current time.
                         */
                        BUF_T *buf = buf_create();
                        const time_t time_now = time(NULL);
                        buf_write_string(buf, ctime(&time_now));

                        CONTENT_T *content = content_create(CONTENT_ENCODING_NONE, buf);

                        UPDATE_T *upd = update_create(UPDATE_ACTION_REFRESH,
                                                      UPDATE_TYPE_CONTENT,
                                                      content);

                        /* Copy the shared parameters and attach this update. */
                        UPDATE_SOURCE_PARAMS_T update_source_params = update_source_params_base;
                        update_source_params.update = upd;

                        /*
                         * Update the topic.
                         */
                        update(session, update_source_params);

                        content_free(content);
                        update_free(upd);
                        buf_free(buf);
                }

                sleep(1);
        }

        /*
         * Deregister only if the source became active, waiting for the
         * deregistration handler to signal completion.
         */
        if(active) {
                UPDATE_SOURCE_DEREGISTRATION_PARAMS_T update_dereg_params = {
                        .updater_id = updater_id,
                        .on_deregistered = on_update_source_deregistered
                };

                apr_thread_mutex_lock(mutex);
                deregister_update_source(session, update_dereg_params);
                apr_thread_cond_wait(cond, mutex);
                apr_thread_mutex_unlock(mutex);
        }

        /*
         * Close session and free resources.
         */
        session_close(session, NULL);
        session_free(session);

        conversation_id_free(updater_id);
        credentials_free(credentials);

        apr_thread_mutex_destroy(mutex);
        apr_thread_cond_destroy(cond);
        apr_pool_destroy(pool);
        apr_terminate();

        return EXIT_SUCCESS;

}

Change the URL in each example to the URL of your Diffusion™ Cloud service. Diffusion Cloud service URLs end in diffusion.cloud.