The following example demonstrates how to create and update recordV2 topics, including the use of a schema.

This example demonstrates a Java™ control client updating recordV2 topics containing currency exchange rate information.

Each topic contains a record with two decimal fields, representing the buy and sell rates between a pair of currencies.

The example can be run either with or without a schema.

package com.pushtechnology.diffusion.examples;

import static com.pushtechnology.diffusion.client.topics.details.TopicSpecification.REMOVAL;
import static com.pushtechnology.diffusion.client.topics.details.TopicSpecification.SCHEMA;
import static com.pushtechnology.diffusion.client.topics.details.TopicType.RECORD_V2;
import static com.pushtechnology.diffusion.client.topics.details.TopicType.STRING;
import static java.util.concurrent.TimeUnit.SECONDS;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeoutException;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.Registration;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.datatype.recordv2.RecordV2;
import com.pushtechnology.diffusion.datatype.recordv2.RecordV2DataType;
import com.pushtechnology.diffusion.datatype.recordv2.model.MutableRecordModel;
import com.pushtechnology.diffusion.datatype.recordv2.schema.Schema;

 * An example of using a control client to create and update a RecordV2 topic in
 * exclusive mode.
 * <P>
 * This uses the 'TopicControl' feature to create a topic and the
 * 'TopicUpdate' feature to send updates to it.
 * <P>
 * To send updates to a topic, the client session requires the 'update_topic'
 * permission for that branch of the topic tree.
 * <P>
 * The example can be used with or without the use of a schema. This is simply
 * to demonstrate the different mechanisms and is not necessarily demonstrating
 * the most efficient way to update such a topic.
 * @author Push Technology Limited
 * @since 6.0
 * @see ClientConsumingRecordV2Topics
public final class ControlClientUpdatingRecordV2Topics {

    private static final String ROOT_TOPIC = "FX";

    private final Session session;
    private final TopicControl topicControl;
    private final TopicSpecification topicSpecification;
    private volatile Registration updateSourceRegistration;
    private final Schema schema;
    private final RecordV2DataType dataType;

     * Constructor.
     * @param serverUrl for example "ws://diffusion.example.com:80"
    public ControlClientUpdatingRecordV2Topics(
        String serverUrl,
        boolean withSchema)
            throws InterruptedException, ExecutionException, TimeoutException {

        session =

        topicControl = session.feature(TopicControl.class);

        // Create the root topic that will remove itself when the session closes
        final TopicSpecification specification =
                    "when this session closes remove '?" + ROOT_TOPIC + "//'");

        topicControl.addTopic(ROOT_TOPIC, specification).get(5, SECONDS);

        dataType = Diffusion.dataTypes().recordV2();

        if (withSchema) {
            schema = dataType.schemaBuilder()
                .record("Rates").decimal("Bid", 5).decimal("Ask", 5).build();
            // Create the topic specification to be used for all rates topics
            topicSpecification =
        else {
            schema = null;
            // Create the topic specification to be used for all rates topics
            topicSpecification =

     * Adds a new conversion rate in terms of base currency and target currency.
     * The bid and ask rates are entered as strings which may be a decimal value
     * which will be parsed and validated, rounding to 5 decimal places.
     * @param currency the base currency (e.g. GBP)
     * @param targetCurrency the target currency (e.g. USD)
    public void addRateTopic(
        String currency,
        String targetCurrency)
        throws InterruptedException, ExecutionException, TimeoutException {

            rateTopicName(currency, targetCurrency),
            topicSpecification).get(5, SECONDS);

     * Set a rate.
     * <P>
     * The rate topic in question must have been added first using
     * {@link #addRateTopic} otherwise this will fail.
     * @param currency the base currency
     * @param targetCurrency the target currency
     * @param bid the new bid rate
     * @param ask the new ask rate
     * @return a CompletableFuture that completes when a response is received
     *         from the server
    public CompletableFuture<?> setRate(
        String currency,
        String targetCurrency,
        String bid,
        String ask) {

        final RecordV2 value;
        if (schema == null) {
            value = dataType.valueBuilder().addFields(bid, ask).build();
        else {
            // Mutable models could be kept and reused but for this simple
            // example one is created every time
            final MutableRecordModel model =
            model.set("Bid", bid);
            model.set("Ask", ask);
            value = model.asValue();

        return session.feature(TopicUpdate.class).set(
            rateTopicName(currency, targetCurrency),

     * Remove a rate (removes its topic).
     * @param currency the base currency
     * @param targetCurrency the target currency
    public void removeRate(
        String currency,
        String targetCurrency)
        throws InterruptedException, ExecutionException, TimeoutException {

            rateTopicName(currency, targetCurrency))
            .get(5, SECONDS);

     * Removes a currency (removes its topic and all subordinate rate topics).
     * @param currency the base currency
    public void removeCurrency(String currency)
        throws InterruptedException, ExecutionException, TimeoutException {
            .removeTopics(String.format("?%s/%s//", ROOT_TOPIC, currency))
            .get(5, SECONDS);

     * Close the session.
    public void close() throws InterruptedException {
        // Close the registered update source
        final Registration registration = this.updateSourceRegistration;
        if (registration != null) {

     * Generates a hierarchical topic name for a rate topic.
     * <P>
     * e.g. for currency=GBP and targetCurrency=USD would return "FX/GBP/USD".
     * @param currency the base currency
     * @param targetCurrency the target currency
     * @return the topic name
    private static String rateTopicName(String currency,
        String targetCurrency) {
        return String.format("%s/%s/%s", ROOT_TOPIC, currency, targetCurrency);
