Update recordV2 topics
The following example demonstrates how to create and update recordV2 topics, including the use of a schema.
This example demonstrates a Java™ control client updating recordV2 topics.
Java and Android
/*******************************************************************************
* Copyright (C) 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package com.pushtechnology.client.sdk.manual;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
import com.pushtechnology.diffusion.datatype.recordv2.RecordV2;
import com.pushtechnology.diffusion.datatype.recordv2.schema.Schema;
import com.pushtechnology.diffusion.datatype.recordv2.schema.SchemaBuilder;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Client creating and updating RecordV2 topics.
*
* @author DiffusionData Limited
*/
/**
 * Example control client that creates a RecordV2 topic described by a schema,
 * subscribes to it, and then updates it with a value built from that schema.
 *
 * <p>The session is always closed before {@code main} returns, including
 * when one of the steps fails.
 */
public final class RecordV2Updating {

    /** Path of the topic that is created and updated. */
    private static final String TOPIC_PATH = "my/topic/path";

    /** Selector used for both the value stream and the subscription. */
    private static final String TOPIC_SELECTOR = "?my/topic/path";

    /** Upper bound, in seconds, on the wait for the update to be delivered. */
    private static final long VALUE_TIMEOUT_SECONDS = 5;

    /**
     * Opens a session, runs the example and closes the session again.
     *
     * @param args command line arguments (unused)
     */
    public static void main(String[] args) {
        final Session session = Diffusion.sessions()
            .principal("admin")
            .password("password")
            .open("ws://localhost:8080");
        try {
            createSubscribeAndUpdate(session);
        }
        finally {
            // Release the connection even if a step above threw.
            session.close();
        }
    }

    /**
     * Creates the topic, registers a value stream, subscribes, publishes one
     * value and waits (bounded) for that value to reach the stream.
     *
     * @param session the open session used for every operation
     */
    private static void createSubscribeAndUpdate(Session session) {
        // define a schema consisting of a single record with three fields each
        // of a different data type.
        final Schema schema = Diffusion.dataTypes()
            .recordV2().schemaBuilder()
            .record("Record")
            .string("myString").integer("myInt").decimal("myDouble", 3)
            .build();

        // create a topic specification with our schema as a property
        final TopicSpecification specification = Diffusion
            .newTopicSpecification(TopicType.RECORD_V2)
            .withProperty(TopicSpecification.SCHEMA, schema.asJSONString());

        // add the topic and wait for the outcome, so that a failure surfaces
        // here rather than as a confusing error from the later update
        session.feature(TopicControl.class)
            .addTopic(TOPIC_PATH, specification)
            .join();

        // add a value stream and subscribe to receive updates
        final CountDownLatch valueReceived = new CountDownLatch(1);
        final Topics topics = session.feature(Topics.class);
        topics.addStream(
            TOPIC_SELECTOR,
            RecordV2.class,
            new MyValueStream(valueReceived));
        topics.subscribe(TOPIC_SELECTOR).join();

        // create a value for our topic
        final RecordV2 value = schema.createMutableModel()
            .set("myString", "Hello World")
            .set("myInt", "5")
            .set("myDouble", "3.141")
            .asValue();

        // update the topic with our value
        session.feature(TopicUpdate.class).set(
            TOPIC_PATH,
            RecordV2.class,
            value
        ).join();

        // Give the stream a chance to print the value before the caller
        // closes the session.
        try {
            if (!valueReceived.await(VALUE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
                System.err.println("timed out waiting for the updated value");
            }
        }
        catch (InterruptedException e) {
            // Preserve the interrupt status for whoever is running us.
            Thread.currentThread().interrupt();
        }
    }

    /**
     * Value stream that prints each new value and signals a latch so the
     * main thread knows a value has been delivered.
     */
    private static final class MyValueStream implements Topics.ValueStream<RecordV2> {

        private final CountDownLatch valueReceived;

        /**
         * @param valueReceived latch counted down each time a value arrives
         */
        MyValueStream(CountDownLatch valueReceived) {
            this.valueReceived = valueReceived;
        }

        @Override
        public void onValue(String topicPath,
            TopicSpecification specification, RecordV2 oldValue,
            RecordV2 newValue) {
            System.out.printf("new value: %s\n", newValue.asFields());
            valueReceived.countDown();
        }

        @Override
        public void onSubscription(String topicPath,
            TopicSpecification specification) { }

        @Override
        public void onUnsubscription(String topicPath,
            TopicSpecification specification, Topics.UnsubscribeReason reason) { }

        @Override
        public void onClose() { }

        @Override
        public void onError(ErrorReason errorReason) {
            // Report rather than swallow: a silent failure here looks like
            // "no updates were ever published".
            System.err.println("value stream error: " + errorReason);
        }
    }
}