Just a second...

Update recordV2 topics

The following example demonstrates how to create and update recordV2 topics, including the use of a schema.

This example demonstrates a Java™ control client updating recordV2 topics.

Java and Android
/*******************************************************************************
 * Copyright (C) 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.diffusion.manual;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.recordv2.RecordV2;
import com.pushtechnology.diffusion.datatype.recordv2.schema.Schema;
import com.pushtechnology.diffusion.datatype.recordv2.schema.SchemaBuilder;

/**
 * Client creating and updating RecordV2 topics
 *
 * @author DiffusionData Limited
 */
public final class RecordV2Updating {

    public static void main(String[] args) {

        final Session session = Diffusion.sessions()
            .principal("admin")
            .password("password")
            .open("ws://localhost:8080");

        final TopicControl topicControl = session.feature(TopicControl.class);
        final SchemaBuilder builder = Diffusion.dataTypes()
            .recordV2().schemaBuilder();

        // define a schema consisting of a single record with three fields each
        // of a different data type.
        final Schema schema = builder
            .record("Record")
            .string("myString").integer("myInt").decimal("myDouble", 3)
            .build();

        // create a topic specification with our schema as a property
        final TopicSpecification specification = Diffusion
            .newTopicSpecification(TopicType.RECORD_V2)
            .withProperty(TopicSpecification.SCHEMA, schema.asJSONString());

        // add the topic
        topicControl.addTopic("my/topic/path", specification);

        // add a value stream and subscribe to receive updates
        session.feature(Topics.class).addStream(
            "?my/topic/path",
            RecordV2.class,
            new MyValueStream());
        session.feature(Topics.class).subscribe("?my/topic/path");

        // create a value for our topic
        final RecordV2 value = schema.createMutableModel()
            .set("myString", "Hello World")
            .set("myInt", "5")
            .set("myDouble", "3.141")
            .asValue();

        //update the topic with our value
        session.feature(TopicUpdate.class).set(
            "my/topic/path",
            RecordV2.class,
            value
        ).join();
    }

    private static class MyValueStream implements Topics.ValueStream<RecordV2> {
        @Override
        public void onValue(String topicPath,
            TopicSpecification specification, RecordV2 oldValue,
            RecordV2 newValue) {
            System.out.printf("new value: %s\n", newValue.asFields());
        }

        @Override
        public void onSubscription(String topicPath,
            TopicSpecification specification) { }

        @Override
        public void onUnsubscription(String topicPath,
            TopicSpecification specification, Topics.UnsubscribeReason reason) { }

        @Override
        public void onClose() { }

        @Override
        public void onError(ErrorReason errorReason) { }
    }
}