Example: Publish a time series
The following example uses the Diffusion™ API to create and update a time series topic.
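Each example creates a time series topic (the Java example uses foo/timeseries; the other languages use their own topic paths) and demonstrates how to append values. The JavaScript, .NET, Java and Python examples also show how to edit values.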
JavaScript
const TopicSpecification = diffusion.topics.TopicSpecification;
const TopicType = diffusion.topics.TopicType;

// 1. Create a time series topic specification with events
//    of type double
const specification = new TopicSpecification(TopicType.TIME_SERIES, {
    TIME_SERIES_EVENT_VALUE_TYPE : 'double'
});

// 2. Create a time series topic
await session.topics.add('topic/timeseries', specification);

// 3. Append a value to the time series topic.
//    The append is awaited so that the event exists before the range query
//    below runs, and so that a failed append is not an unhandled rejection.
await session.timeseries.append('topic/timeseries', 1.23, diffusion.datatypes.double());

// 4. Edit the last value in a time series topic.
try {
    // perform a range query to obtain the last value only
    const queryResult = await session.timeseries.rangeQuery()
        .fromLast(1)
        .as(diffusion.datatypes.double())
        .selectFrom('topic/timeseries');

    // edit every event returned by the query (at most one, because of fromLast(1))
    await Promise.all(queryResult.events.map((event) =>
        session.timeseries.edit('topic/timeseries', event.sequence, 4.56, diffusion.datatypes.double())
    ));
} catch (err) {
    // This handler covers both the range query and the edit.
    console.log('Error querying or editing the time series topic.', err);
}
.NET
/**
* Copyright © 2021 - 2025 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that adds a time series topic and updates its value.
    /// </summary>
    public sealed class TimeSeriesPublish {
        /// <summary>
        /// Opens a session, creates a string time series topic, appends and edits events,
        /// sets a value through the topic update feature and finally removes the topic.
        /// Each failure is written to the console and ends the example early.
        /// </summary>
        /// <param name="serverUrl">URL of the Diffusion server to connect to.</param>
        /// <returns>A task that completes once the example has finished and the session is closed.</returns>
        public async Task TimeSeriesPublishExample(string serverUrl) {
            const string TopicPrefix = "time-series";

            // The validation callback accepts every certificate; this is only suitable for an example.
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open( serverUrl );

            // A single finally block closes the session on every exit path,
            // instead of repeating session.Close() in each catch block.
            try {
                // Create a time series topic with string values.
                var typeName = Diffusion.DataTypes.Get<string>().TypeName;
                var topic = $"{TopicPrefix}/{typeName}/{DateTime.UtcNow.ToFileTimeUtc()}";
                var specification = Diffusion.NewSpecification(TopicType.TIME_SERIES)
                    .WithProperty(TopicSpecificationProperty.TimeSeriesEventValueType, typeName);

                // Add a time series topic.
                try {
                    await session.TopicControl.AddTopicAsync(topic, specification);
                }
                catch (Exception ex) {
                    WriteLine($"Failed to add topic '{topic}' : {ex}.");
                    return;
                }

                // Append a value to the time series topic using a custom timestamp.
                try {
                    await session.TimeSeries.AppendAsync<string>(topic, "Value1",
                        DateTimeOffset.FromUnixTimeMilliseconds(322));
                }
                catch (Exception ex) {
                    WriteLine($"Topic {topic} value could not be appended : {ex}.");
                    return;
                }

                // Append values to the time series topic. The timestamp will be set to the current server time.
                try {
                    await session.TimeSeries.AppendAsync<string>(topic, "Value 1");
                    await session.TimeSeries.AppendAsync<string>(topic, "Value 2");
                    await session.TimeSeries.AppendAsync<string>(topic, "Value 3");
                    await session.TimeSeries.AppendAsync<string>(topic, "Value 4");
                }
                catch (Exception ex) {
                    WriteLine($"Topic {topic} value could not be appended : {ex}.");
                    return;
                }

                // Edit a value of the time series topic.
                try {
                    // Replaces the value of the event with sequence number 1 by 'Value 1a'.
                    // NOTE(review): if sequence numbers start at 0, this is the 'Value 1' event
                    // (the second append), not the initial 'Value1' event - confirm.
                    await session.TimeSeries.EditAsync<string>(topic, 1, "Value 1a");
                }
                catch (Exception ex) {
                    WriteLine($"Topic {topic} value could not be edited : {ex}.");
                    return;
                }

                // Update the time series topic using the standard topic update method.
                try {
                    var newValue = "Last Value";
                    await session.TopicUpdate.SetAsync(topic, newValue);
                }
                catch (Exception ex) {
                    WriteLine($"Topic {topic} could not be updated : {ex}.");
                    return;
                }

                // Remove the topic. A failure here is only reported; the session is still closed below.
                try {
                    await session.TopicControl.RemoveTopicsAsync( topic );
                }
                catch (Exception ex) {
                    WriteLine( $"Failed to remove topic '{topic}' : {ex}." );
                }
            }
            finally {
                // Close the session.
                session.Close();
            }
        }
    }
}
Java and Android
/*******************************************************************************
* Copyright (C) 2017, 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package com.pushtechnology.client.sdk.manual;
import static com.pushtechnology.diffusion.client.Diffusion.newTopicSpecification;
import static com.pushtechnology.diffusion.datatype.DataTypes.INT64_DATATYPE_NAME;
import java.time.Instant;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.TimeSeries.EventMetadata;
import com.pushtechnology.diffusion.client.features.TopicUpdate;
import com.pushtechnology.diffusion.client.features.control.topics.TopicControl;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.topics.details.TopicType;
/**
* This example shows a control client creating a {@link TimeSeries} topic.
* Values can be appended to the topic using {@link #appendValue(long)}, and
* the last value of the topic can be edited using {@link #editLast(long)}.
* Alternatively, the methods provided by the {@link TopicUpdate} feature can be
* used.
*
* @author DiffusionData Limited
*/
public class ControlClientUpdatingTimeSeriesTopics {

    private static final String TOPIC_PATH = "foo/timeseries";

    private static final Logger LOG =
        LoggerFactory.getLogger(ControlClientUpdatingTimeSeriesTopics.class);

    private final Session session;
    private final TimeSeries timeSeries;
    private final TopicControl topicControl;

    /**
     * Constructor. Opens a session and creates the time series topic, waiting
     * up to five seconds for the topic to be added.
     *
     * @param serverUrl server URL to connect to example "ws://diffusion.example.com:80"
     * @throws InterruptedException if interrupted while waiting for the topic to be added
     * @throws ExecutionException if adding the topic failed
     * @throws TimeoutException if the topic was not added within five seconds
     */
    public ControlClientUpdatingTimeSeriesTopics(String serverUrl)
        throws InterruptedException, ExecutionException, TimeoutException {

        session = Diffusion.sessions().principal("control").password("password")
            .open(serverUrl);

        timeSeries = session.feature(TimeSeries.class);
        topicControl = session.feature(TopicControl.class);

        final TopicSpecification spec = newTopicSpecification(TopicType.TIME_SERIES)
            .withProperty(TopicSpecification.TIME_SERIES_EVENT_VALUE_TYPE, INT64_DATATYPE_NAME);

        try {
            topicControl.addTopic(TOPIC_PATH, spec)
                .thenAccept(result -> LOG.info("Add topic result: {}", result))
                .get(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException | RuntimeException e) {
            // The caller never receives this instance when construction fails,
            // so close the session here to avoid leaking it.
            session.close();
            throw e;
        }
    }

    /**
     * Appends a value to the time series topic.
     *
     * @param value value to append
     * @return the event metadata from the successful append
     */
    public EventMetadata appendValue(long value)
        throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException {
        return timeSeries.append(TOPIC_PATH, Long.class, value).get(5, TimeUnit.SECONDS);
    }

    /**
     * Close the session and remove the time series topic.
     * <p>
     * The session is closed even if the topic removal fails or times out.
     */
    public void close()
        throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException {
        try {
            // NOTE(review): this selector appears to match the whole "foo" branch,
            // not only TOPIC_PATH - confirm that removing sibling topics is intended.
            topicControl.removeTopics("?foo//").get(5, TimeUnit.SECONDS);
        }
        finally {
            session.close();
        }
    }

    /**
     * Edit the last value in a time series topic.
     * <p>
     * The edit is asynchronous: this method returns immediately and the
     * outcome of the query and of the edit is only logged.
     *
     * @param value value to edit with
     */
    public void editLast(long value) {
        // Obtain the last value in the time series topic
        timeSeries.rangeQuery().fromLast(1).as(Long.class).selectFrom(TOPIC_PATH)
            .whenComplete((query, ex) -> {
                if (ex != null) {
                    // The throwable is passed as the last argument with no
                    // placeholder, so no literal "{}" is left in the message.
                    LOG.error("Error obtaining the range query", ex);
                    return;
                }

                // Perform the value edit
                query.stream().forEach(event -> {
                    timeSeries.edit(TOPIC_PATH, event.sequence(), Long.class, value)
                        .whenComplete((metadata, e) -> {
                            if (e != null) {
                                LOG.error("Error editing topic", e);
                                return;
                            }
                            LOG.info("EventMetadata from edit: {}", metadata);
                        });
                });
            });
    }

    /**
     * Appends a value to the time series topic. Allows for creation of events
     * with a custom timestamp. This can be used for loading historic or future
     * values.
     *
     * @param value value to append
     * @param timestamp the user supplied timestamp
     * @return the event metadata from the successful append
     */
    public EventMetadata appendValue(long value, Instant timestamp)
        throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException {
        return timeSeries.append(TOPIC_PATH, Long.class, value, timestamp).get(5, TimeUnit.SECONDS);
    }
}
C
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#ifndef WIN32
#include <unistd.h>
#else
#define sleep(x) Sleep(1000 * x)
#endif
#include "diffusion.h"
/*
 * Callback registered for the topic add request in main().
 * It takes no action and reports success to the caller.
 */
static int on_topic_added_with_specification(
        SESSION_T *session,
        TOPIC_ADD_RESULT_CODE result_code,
        void *context)
{
        /* None of the arguments are needed by this example. */
        (void) session;
        (void) result_code;
        (void) context;

        return HANDLER_SUCCESS;
}
/*
 * Callback registered for the time series append request in main().
 * It takes no action and reports success to the caller.
 */
static int on_append(
        const DIFFUSION_TIME_SERIES_EVENT_METADATA_T *event_metadata,
        void *context)
{
        /* The event metadata and context are not used by this example. */
        (void) event_metadata;
        (void) context;

        return HANDLER_SUCCESS;
}
/*
 * Connects to a Diffusion server, requests creation of a JSON time series
 * topic and appends one JSON value to it.
 *
 * Returns EXIT_SUCCESS when the session was created and the append request
 * was dispatched, EXIT_FAILURE otherwise.
 */
int main(
        int argc,
        char **argv)
{
        /* Command line arguments are not used by this example. */
        (void) argc;
        (void) argv;

        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";
        const char *topic_path = "my/topic/path";

        CREDENTIALS_T *credentials = credentials_create_password(password);
        SESSION_T *session;
        DIFFUSION_ERROR_T error = {0};

        // Create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if (session == NULL)
        {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // create a time series topic
        ADD_TOPIC_CALLBACK_T add_topic_callback = {
                .on_topic_added_with_specification = on_topic_added_with_specification,
        };

        HASH_T *properties = hash_new(1);
        hash_add(properties, strdup(DIFFUSION_TIME_SERIES_EVENT_VALUE_TYPE), strdup("json"));
        TOPIC_SPECIFICATION_T *specification = topic_specification_init_with_properties(TOPIC_TYPE_TIME_SERIES, properties);
        hash_free(properties, free, free);

        // NOTE(review): the add request is asynchronous and this example does not
        // wait for on_topic_added_with_specification before appending - confirm
        // that request ordering on a single session makes this safe.
        add_topic_from_specification(session, topic_path, specification, add_topic_callback);
        topic_specification_free(specification);

        // append value to time series topic
        int exit_code = EXIT_SUCCESS;
        BUF_T *value_buf = buf_create();

        // Both calls report failure through their return value; check them so
        // a failure is reported and reflected in the exit code.
        if (!write_diffusion_json_value("{\"hello\": \"world\"}", value_buf))
        {
                printf("Failed to encode the JSON value\n");
                exit_code = EXIT_FAILURE;
        }
        else
        {
                DIFFUSION_TIME_SERIES_APPEND_PARAMS_T params =
                        {.on_append = on_append, .topic_path = topic_path, .datatype = DATATYPE_JSON, .value = value_buf};

                if (!diffusion_time_series_append(session, params, NULL))
                {
                        printf("Failed to append to time series topic '%s'\n", topic_path);
                        exit_code = EXIT_FAILURE;
                }
        }
        buf_free(value_buf);

        // Sleep for a while to give the pending requests time to complete.
        // There is nothing to wait for when the append was never dispatched.
        if (exit_code == EXIT_SUCCESS)
        {
                sleep(5);
        }

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);
        credentials_free(credentials);

        return exit_code;
}
Python
# Copyright (c) 2022 - 2023 DiffusionData Ltd., All Rights Reserved.
#
# Use is subject to licence terms.
#
# NOTICE: All information contained herein is, and remains the
# property of DiffusionData. The intellectual and technical
# concepts contained herein are proprietary to DiffusionData and
# may be covered by U.S. and Foreign Patents, patents in process, and
# are protected by trade secret or copyright law.
# Client implementation that adds a time series topic and updates its value.
import datetime
import asyncio
import diffusion
import diffusion.datatypes
# Diffusion server connection information;
# adjust as needed for the server used in practice.
from diffusion.features.timeseries import TimeSeries
# URL of the Diffusion server that main() connects to.
server_url = "ws://localhost:8080"
# Principal and credentials passed to diffusion.Session in main().
principal = "admin"
credentials = diffusion.Credentials("password")
# First path segment of the topic path built in main().
TOPIC_PREFIX = "time-series"
async def main():
    """Create a string time series topic, append and edit events, then remove it.

    Each failure is printed and ends the example early. The session is closed
    by the ``async with`` block on every exit path, so no explicit
    ``session.close()`` call is needed before returning.
    """
    # Creating the session.
    async with diffusion.Session(
        url=server_url, principal=principal, credentials=credentials
    ) as session:
        # Create a time series topic with string values.
        topic_type = diffusion.datatypes.STRING
        # Naive UTC timestamp: renders the same as the deprecated
        # datetime.utcnow(), so the topic path format is unchanged.
        created_at = datetime.datetime.now(datetime.timezone.utc).replace(tzinfo=None)
        topic = f"{TOPIC_PREFIX}/{topic_type.type_name}/{created_at}"
        specification = TimeSeries.of(topic_type).with_properties()

        # Add a time series topic.
        try:
            await session.topics.add_topic(topic, specification)
        except Exception as ex:
            print(f"Failed to add topic '{topic}' : {ex}.")
            return

        # Append a value to the time series topic using a custom timestamp
        # (322 milliseconds before the current time).
        try:
            user_supplied_timestamp = datetime.datetime.now(
                datetime.timezone.utc
            ) - datetime.timedelta(milliseconds=322)
            await session.time_series.append(
                topic,
                "Value1",
                value_type=diffusion.datatypes.STRING,
                timestamp=user_supplied_timestamp,
            )
        except Exception as ex:
            print(f"Topic {topic} value could not be appended : {ex}.")
            return

        # Append values to the time series topic.
        # The timestamp will be set to the current server time.
        try:
            for value in ("Value 1", "Value 2", "Value 3", "Value 4"):
                await session.time_series.append(
                    topic, value, diffusion.datatypes.STRING
                )
        except Exception as ex:
            print(f"Topic {topic} value could not be appended : {ex}.")
            return

        # Edit a value of the time series topic.
        try:
            # Replaces the value of the event with sequence number 1 by 'Value 1a'.
            # NOTE(review): if sequence numbers start at 0, this is the 'Value 1'
            # event (the second append), not the initial 'Value1' event - confirm.
            await session.time_series.edit(topic, 1, "Value 1a", diffusion.datatypes.STRING)
        except Exception as ex:
            print(f"Topic {topic} value could not be edited : {ex}.")
            return

        # Update the time series topic using the standard topic update method.
        try:
            new_value = "Last Value"
            await session.topics.set_topic(topic, new_value, diffusion.datatypes.STRING)
        except Exception as ex:
            print(f"Topic {topic} could not be updated : {ex}.")
            return

        # Remove the topic. A failure here is only reported.
        try:
            await session.topics.remove_topic(topic)
        except Exception as ex:
            print(f"Failed to remove topic '{topic}' : {ex}.")


if __name__ == "__main__":
    asyncio.run(main())
Apple
// Copyright (C) 2021 Push Technology Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import Foundation
import Diffusion
/// Example that creates a JSON time series topic and appends one value to it.
class PublishTimeSeries {

    /// Opens a session, creates a time series topic with JSON events at
    /// `topic_path` and appends a JSON value to it.
    ///
    /// All work is asynchronous; outcomes are reported with `print`.
    ///
    /// - Parameters:
    ///   - url: URL of the Diffusion server to connect to.
    ///   - topic_path: path of the time series topic to create and append to.
    func run_example(url: URL, topic_path: String) {

        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control",
                                                            credentials: credentials)

        // establish the session
        PTDiffusionSession.open(with: url,
                                configuration: configuration) { (session, error) in
            if let error = error {
                print("An error has occurred while establishing a session: \(error.localizedDescription)")
                return
            }
            guard let session = session else {
                return
            }

            // create a time series topic whose events are JSON values;
            // a plain JSON topic cannot be appended to through the time series feature
            let properties = [
                PTDiffusionTopicSpecification.timeSeriesEventValueTypePropertyKey():
                    PTDiffusionDataTypes.json().typeName
            ]
            let specification = PTDiffusionTopicSpecification(type: PTDiffusionTopicType.timeSeries,
                                                              properties: properties)

            session.topicControl.addTopic(withPath: topic_path,
                                          specification: specification) { (result, error) in
                if let error = error {
                    print("An error has occurred while adding topic '\(topic_path)': \(error.localizedDescription)")
                    // without a topic there is nothing to append to
                    return
                }

                if (result == PTDiffusionAddTopicResult.exists()) {
                    print("Topic '\(topic_path)' already existed")
                }
                else {
                    print("Topic '\(topic_path)' was created")
                }

                // append a new value to the time series JSON topic
                let json_value: PTDiffusionJSON
                do {
                    json_value = try PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}")
                }
                catch {
                    print("An error has occurred while creating the JSON value: \(error.localizedDescription)")
                    return
                }

                session.timeSeries.append(toTopicPath: topic_path, jsonValue: json_value) { (metadata, error) in
                    if let error = error {
                        print("An error has occurred while appending to the time series JSON topic: \(error.localizedDescription)")
                    }
                    else {
                        print("JSON value has been successfully appended to the time series topic")
                    }
                }
            }
        }
    }
}