Example: Publish a time series
The following example uses the Diffusion™ API to create and update a time series topic.
This example creates a time series topic at foo/timeseries. It demonstrates how to append and edit values.
JavaScript
const TopicSpecification = diffusion.topics.TopicSpecification; const TopicType = diffusion.topics.TopicType; // 1. Create a time series topic specification with events // of type double const specification = new TopicSpecification(TopicType.TIME_SERIES, { TIME_SERIES_EVENT_VALUE_TYPE : 'double' }); // 2. Create a time series topic await session.topics.add('topic/timeseries', specification); // 3. Append a value to the time series topic session.timeseries.append('topic/timeseries', 1.23, diffusion.datatypes.double()); // 4. Edit the last value in a time series topic. try { // perform a range query to obtain the last value only const queryResult = await session.timeseries.rangeQuery() .fromLast(1) .as(diffusion.datatypes.double()) .selectFrom('topic/timeseries'); // edit the event await Promise.all(queryResult.events.map((event) => session.timeseries.edit('topic/timeseries', event.sequence, 4.56, diffusion.datatypes.double()) )); } catch (err) { console.log('Error obtaining the range query.', err); }
.NET
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Threading; using System.Threading.Tasks; using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Session; using PushTechnology.ClientInterface.Client.Topics; using PushTechnology.ClientInterface.Client.Topics.Details; using static System.Console; namespace PushTechnology.ClientInterface.Example { /// <summary> /// Client implementation that adds a time series topic and updates its value. /// </summary> public sealed class TimeSeriesPublish { public async Task TimeSeriesPublishExample(string serverUrl) { string TOPIC_PREFIX = "time-series"; var session = Diffusion.Sessions.Principal( "control" ).Password( "password" ) .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .Open( serverUrl ); // Create a time series topic with string values. var typeName = Diffusion.DataTypes.Get<string>().TypeName; var topic = $"{TOPIC_PREFIX}/{typeName}/{DateTime.Now.ToFileTimeUtc()}"; var specification = session.TopicControl.NewSpecification(TopicType.TIME_SERIES) .WithProperty(TopicSpecificationProperty.TimeSeriesEventValueType, typeName); // Add a time series topic. try { await session.TopicControl.AddTopicAsync(topic, specification); } catch (Exception ex) { WriteLine($"Failed to add topic '{topic}' : {ex}."); session.Close(); return; } // Append a value to the time series topic using a custom timestamp. try { await session.TimeSeries.AppendAsync<string>(topic, "Value1", DateTimeOffset.FromUnixTimeMilliseconds(322)); } catch (Exception ex) { WriteLine($"Topic {topic} value could not be appended : {ex}."); session.Close(); return; } // Append a value to the time series topic. The timestamp will be set to the current server time. try { await session.TimeSeries.AppendAsync<string>(topic, "Value 1"); await session.TimeSeries.AppendAsync<string>(topic, "Value 2"); await session.TimeSeries.AppendAsync<string>(topic, "Value 3"); await session.TimeSeries.AppendAsync<string>(topic, "Value 4"); } catch (Exception ex) { WriteLine($"Topic {topic} value could not be appended : {ex}."); session.Close(); return; } // Edit a value of the time series topic. try { // Edits the time series topic with sequence number 1 and value 'Value1'. await session.TimeSeries.EditAsync<string>(topic, 1, "Value 1a"); } catch (Exception ex) { WriteLine($"Topic {topic} value could not be edited : {ex}."); session.Close(); return; } // Update the time series topic using the standard topic update method. try { var newValue = "Last Value"; await session.TopicUpdate.SetAsync(topic, newValue); } catch (Exception ex) { WriteLine($"Topic {topic} could not be updated : {ex}."); session.Close(); return; } // Remove the topic. try { await session.TopicControl.RemoveTopicsAsync( topic ); } catch(Exception ex) { WriteLine( $"Failed to remove topic '{topic}' : {ex}." ); } // Close the session. session.Close(); } } }
Java and Android
/******************************************************************************* * Copyright (C) 2017, 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ package com.pushtechnology.client.sdk.manual; import static com.pushtechnology.diffusion.client.Diffusion.newTopicSpecification; import static com.pushtechnology.diffusion.datatype.DataTypes.INT64_DATATYPE_NAME; import java.time.Instant; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.features.TimeSeries; import com.pushtechnology.diffusion.client.features.TimeSeries.EventMetadata; import com.pushtechnology.diffusion.client.features.TopicUpdate; import com.pushtechnology.diffusion.client.features.control.topics.TopicControl; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; import com.pushtechnology.diffusion.client.topics.details.TopicType; /** * This example shows a control client creating a {@link TimeSeries} topic. * Values can be appended to the topic using {@link #appendValue(long)}, and * the last value of the topic can be edited using {@link #editLast(long)}. * Alternatively, the methods provided by the {@link TopicUpdate} feature can be * used. * * @author DiffusionData Limited */ public class ControlClientUpdatingTimeSeriesTopics { private static final String TOPIC_PATH = "foo/timeseries"; private static final Logger LOG = LoggerFactory.getLogger(ControlClientUpdatingTimeSeriesTopics.class); private final Session session; private final TimeSeries timeSeries; private final TopicControl topicControl; /** * Constructor. * * @param serverUrl server URL to connect to example "ws://diffusion.example.com:80" */ public ControlClientUpdatingTimeSeriesTopics(String serverUrl) throws InterruptedException, ExecutionException, TimeoutException { session = Diffusion.sessions().principal("control").password("password") .open(serverUrl); timeSeries = session.feature(TimeSeries.class); topicControl = session.feature(TopicControl.class); final TopicSpecification spec = newTopicSpecification(TopicType.TIME_SERIES) .withProperty(TopicSpecification.TIME_SERIES_EVENT_VALUE_TYPE, INT64_DATATYPE_NAME); topicControl.addTopic(TOPIC_PATH, spec) .thenAccept(result -> LOG.info("Add topic result: {}", result)).get(5, TimeUnit.SECONDS); } /** * Appends a value to the time series topic. * * @param value value to append * @return the event metadata from the successful append */ public EventMetadata appendValue(long value) throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException { return timeSeries.append(TOPIC_PATH, Long.class, value).get(5, TimeUnit.SECONDS); } /** * Close the session and remove the time series topic. */ public void close() throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException { topicControl.removeTopics("?foo//").get(5, TimeUnit.SECONDS); session.close(); } /** * Edit the last value in a time series topic. * * @param value value to edit with */ public void editLast(long value) { //Obtain the last value in the time series topic timeSeries.rangeQuery().fromLast(1).as(Long.class).selectFrom(TOPIC_PATH) .whenComplete((query, ex) -> { if (ex != null) { LOG.error("Error obtaining the range query: {}", ex); return; } //Perform the value edit query.stream().forEach(event -> { timeSeries.edit(TOPIC_PATH, event.sequence(), Long.class, value) .whenComplete((metadata, e) -> { if (e != null) { LOG.error("Error editing topic: {}", e); return; } LOG.info("EventMetadata from edit: {}", metadata); }); }); }); } /** * Appends a value to the time series topic. Allows for creation of events * with a custom timestamp. This can be used for loading historic or future * values. * * @param value value to append * @param timestamp the user supplied timestamp * @return the event metadata from the successful append */ public EventMetadata appendValue(long value, Instant timestamp) throws IllegalArgumentException, InterruptedException, ExecutionException, TimeoutException { return timeSeries.append(TOPIC_PATH, Long.class, value, timestamp).get(5, TimeUnit.SECONDS); } }
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" static int on_topic_added_with_specification( SESSION_T *session, TOPIC_ADD_RESULT_CODE result_code, void *context) { // topic has been added return HANDLER_SUCCESS; } static int on_append( const DIFFUSION_TIME_SERIES_EVENT_METADATA_T *event_metadata, void *context) { // value has been appended to the topic return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // Create a session, synchronously session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // create a time series topic ADD_TOPIC_CALLBACK_T add_topic_callback = { .on_topic_added_with_specification = on_topic_added_with_specification, }; HASH_T *properties = hash_new(1); hash_add(properties, strdup(DIFFUSION_TIME_SERIES_EVENT_VALUE_TYPE), strdup("json")); TOPIC_SPECIFICATION_T *specification = topic_specification_init_with_properties(TOPIC_TYPE_TIME_SERIES, properties); hash_free(properties, free, free); add_topic_from_specification(session, topic_path, specification, add_topic_callback); topic_specification_free(specification); // append value to time series topic BUF_T *value_buf = buf_create(); write_diffusion_json_value("{\"hello\": \"world\"}", value_buf); DIFFUSION_TIME_SERIES_APPEND_PARAMS_T params = { .on_append = on_append, .topic_path = topic_path, .datatype = DATATYPE_JSON, .value = value_buf }; diffusion_time_series_append(session, params, NULL); buf_free(value_buf); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
Python
# Copyright (c) 2022 - 2023 DiffusionData Ltd., All Rights Reserved. # # Use is subject to licence terms. # # NOTICE: All information contained herein is, and remains the # property of DiffusionData. The intellectual and technical # concepts contained herein are proprietary to DiffusionData and # may be covered by U.S. and Foreign Patents, patents in process, and # are protected by trade secret or copyright law. # Client implementation that adds a time series topic and updates its value. import datetime import asyncio import diffusion import diffusion.datatypes # Diffusion server connection information; # adjust as needed for the server used in practice. from diffusion.features.timeseries import TimeSeries server_url = "ws://localhost:8080" principal = "admin" credentials = diffusion.Credentials("password") TOPIC_PREFIX = "time-series" async def main(): # Creating the session. async with diffusion.Session( url=server_url, principal=principal, credentials=credentials ) as session: # Create a time series topic with string values. topic_type = diffusion.datatypes.STRING topic = f"{TOPIC_PREFIX}/{topic_type.type_name}/{datetime.datetime.utcnow()}" specification = TimeSeries.of(topic_type).with_properties() # Add a time series topic. try: await session.topics.add_topic(topic, specification) except Exception as ex: print(f"Failed to add topic '{topic}' : {ex}.") await session.close() return # Append a value to the time series topic using a custom timestamp. try: user_supplied_timestamp = datetime.datetime.utcnow() + datetime.timedelta( milliseconds=322 ) await session.time_series.append( topic, "Value1", value_type=diffusion.datatypes.STRING, timestamp=user_supplied_timestamp, ) except Exception as ex: print(f"Topic {topic} value could not be appended : {ex}.") return # Append a value to the time series topic. # The timestamp will be set to the current server time. try: await session.time_series.append( topic, "Value 1", diffusion.datatypes.STRING ) await session.time_series.append( topic, "Value 2", diffusion.datatypes.STRING ) await session.time_series.append( topic, "Value 3", diffusion.datatypes.STRING ) await session.time_series.append( topic, "Value 4", diffusion.datatypes.STRING ) except Exception as ex: print(f"Topic {topic} value could not be appended : {ex}.") await session.close() return # Edit a value of the time series topic. try: # Edits the time series topic with sequence number 1 and value 'Value1'. await session.time_series.edit(topic, 1, "Value 1a", diffusion.datatypes.STRING) except Exception as ex: print(f"Topic {topic} value could not be edited : {ex}.") return # Update the time series topic using the standard topic update method. try: new_value = "Last Value" await session.topics.set_topic(topic, new_value, diffusion.datatypes.STRING) except Exception as ex: print(f"Topic {topic} could not be updated : {ex}.") await session.close() return # Remove the topic. try: await session.topics.remove_topic(topic) except Exception as ex: print(f"Failed to remove topic '{topic}' : {ex}.") if __name__ == "__main__": asyncio.run(main())
Apple
// Copyright (C) 2021 Push Technology Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import Foundation import Diffusion class PublishTimeSeries { func run_example(url: URL, topic_path: String) { let credentials = PTDiffusionCredentials(password: "password") let configuration = PTDiffusionSessionConfiguration(principal: "control", credentials: credentials) // establish the session PTDiffusionSession.open(with: url, configuration: configuration) { (session, error) in if (error != nil) { print("An error has occurred while establishing a session: %@", error!.localizedDescription) } else { // create a time series JSON topic session!.topicControl.addTopic(withPath: topic_path, type: PTDiffusionTopicType.JSON) { (result, error) in if (error != nil) { print("An error has occurred while adding topic '%@': %@", topic_path, error!.localizedDescription) } else if (result == PTDiffusionAddTopicResult.exists()) { print("Topic '%@' already existed", topic_path) } else { print("Topic '%@' was created", topic_path) } // append a new value to the time series JSON topic let json_value = try! PTDiffusionJSON(jsonString: "{\"hello\"}: {\"world\"}") session!.timeSeries.append(toTopicPath: topic_path, jsonValue: json_value) { (metadata, error) in if (error != nil) { print("An error has occurred while appending to the time series JSON topic: %@", error!.localizedDescription); } else { print("JSON value has been successfully appended to the time series topic") } } } } } } }