Example: Subscribe to a time series
The following example uses the Diffusion™ API to subscribe to a time series topic.
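These examples demonstrate subscribing to a time series topic, for example at foo/timeseries; the exact topic path or topic selector used differs between the language samples below.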
JavaScript
// Path of the time series topic this example consumes.
const TOPIC_PATH = 'topic/timeseries';

// Events delivered by the stream are time series events wrapping int64 values.
const int64EventType = diffusion.datatypes.timeseries(diffusion.datatypes.int64());

// Register a typed stream for the topic, then select the topic on the session.
const stream = session.addStream(TOPIC_PATH, int64EventType);
session.select(TOPIC_PATH);

// Log the value, sequence number and timestamp carried by each received event.
stream.on('value', (topic, spec, { value, sequence, timestamp }) => {
    console.log(`received update ${value}: sequence=${sequence} timestamp=${timestamp}`);
});
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;
using System;
using PushTechnology.ClientInterface.Client.Features.TimeSeries;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation which subscribes to a string time series topic and
    /// consumes the data it receives.
    /// </summary>
    public sealed class TimeSeriesTopics {
        /// <summary>
        /// Opens a session, registers a time series value stream, subscribes to the
        /// matching topics and finally removes the stream and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL of the server to connect to.</param>
        /// <returns>A task that completes once the session has been closed.</returns>
        public async Task TimeSeriesTopicsExample(string serverUrl) {
            const string TopicPrefix = "time-series";

            var session = Diffusion.Sessions.Principal("client").Password("password").Open(serverUrl);

            // The outer try/finally guarantees the session is closed even when
            // building the selector, registering the stream or removing the
            // stream throws.
            try {
                var typeName = Diffusion.DataTypes.Get<String>().TypeName;
                var topic = $"?{TopicPrefix}/{typeName}//";

                // Add a value stream
                var stringStream = new StringStream();
                session.Topics.AddTimeSeriesStream<string>(topic, stringStream);

                // Subscribe to the topic.
                try {
                    await session.Topics.SubscribeAsync(topic);
                } catch (Exception ex) {
                    WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
                } finally {
                    session.Topics.RemoveStream(stringStream);
                }
            } finally {
                // Note that closing the session, will automatically unsubscribe from all topics
                // the client is subscribed to.
                session.Close();
            }
        }

        /// <summary>
        /// Basic implementation of the IValueStream for time series string topics.
        /// </summary>
        private sealed class StringStream : IValueStream<IEvent<string>> {
            /// <summary>
            /// Notification of the stream being closed normally.
            /// </summary>
            public void OnClose()
                => WriteLine("The subscription stream is now closed.");

            /// <summary>
            /// Notification of a contextual error related to this callback.
            /// </summary>
            /// <param name="errorReason">Error reason.</param>
            public void OnError(ErrorReason errorReason)
                => WriteLine($"An error has occurred : {errorReason}.");

            /// <summary>
            /// Notification of a successful subscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            public void OnSubscription(string topicPath, ITopicSpecification specification)
                => WriteLine($"Client subscribed to {topicPath}.");

            /// <summary>
            /// Notification of a successful unsubscription.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="reason">Reason for the unsubscription.</param>
            public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
                => WriteLine($"Client unsubscribed from {topicPath} : {reason}.");

            /// <summary>
            /// Topic update received.
            /// </summary>
            /// <param name="topicPath">Topic path.</param>
            /// <param name="specification">Topic specification.</param>
            /// <param name="oldValue">Value prior to update.</param>
            /// <param name="newValue">Value after update.</param>
            public void OnValue(string topicPath, ITopicSpecification specification, IEvent<string> oldValue, IEvent<string> newValue)
                => WriteLine($"New value of {topicPath} is {newValue}.");
        }
    }
}
Java and Android
/*******************************************************************************
 * Copyright (C) 2017, 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *******************************************************************************/
package com.pushtechnology.client.sdk.manual;

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.TimeSeries.Event;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.Topics.ValueStream;
import com.pushtechnology.diffusion.client.session.Session;

/**
 * This demonstrates a client session subscribing to a
 * {@link TimeSeries} topic.
 * <p>
 * The class is {@link AutoCloseable} so that it can be used in a
 * try-with-resources statement; {@link #close()} closes the session.
 *
 * @author DiffusionData Limited
 */
public class ClientConsumingTimeSeriesTopics implements AutoCloseable {

    private static final String TOPIC_PATH = "foo/timeseries";

    private final Session session;

    /**
     * Constructor.
     * <p>
     * Opens a session, registers {@code valueStream} for {@code Long} time
     * series events on the topic path and waits up to five seconds for the
     * subscription to complete.
     *
     * @param serverUrl for example "ws://diffusion.example.com:80"
     * @param valueStream value stream to receive time series topic events
     * @throws InterruptedException if the thread is interrupted while waiting
     *         for the subscription to complete
     * @throws ExecutionException if the subscription request fails
     * @throws TimeoutException if the subscription does not complete within
     *         five seconds
     */
    public ClientConsumingTimeSeriesTopics(String serverUrl,
        ValueStream<Event<Long>> valueStream)
        throws InterruptedException, ExecutionException, TimeoutException {

        session = Diffusion.sessions().principal("client").password("password")
            .open(serverUrl);

        try {
            final Topics topics = session.feature(Topics.class);
            topics.addTimeSeriesStream(TOPIC_PATH, Long.class, valueStream);
            topics.subscribe(TOPIC_PATH).get(5, TimeUnit.SECONDS);
        }
        catch (InterruptedException | ExecutionException | TimeoutException
            | RuntimeException e) {
            // When the constructor throws, the caller never receives an
            // instance on which to call close(), so the session must be
            // closed here to avoid leaking it.
            session.close();
            throw e;
        }
    }

    /**
     * Close the session.
     */
    @Override
    public void close() {
        session.close();
    }
}
C
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" static int on_subscription( const char* topic_path, const TOPIC_SPECIFICATION_T *specification, void *context) { // subscribed to `topic_path` return HANDLER_SUCCESS; } static int on_unsubscription( const char* topic_path, const TOPIC_SPECIFICATION_T *specification, NOTIFY_UNSUBSCRIPTION_REASON_T reason, void *context) { // unsubscribed from `topic_path` return HANDLER_SUCCESS; } static int on_value( const char* topic_path, const TOPIC_SPECIFICATION_T * specification, const DIFFUSION_DATATYPE datatype, const DIFFUSION_VALUE_T * old_value, const DIFFUSION_VALUE_T * new_value, void *context) { DIFFUSION_TIME_SERIES_EVENT_T *new_event; read_diffusion_time_series_event(new_value, &new_event, NULL); DIFFUSION_VALUE_T *new_event_value = diffusion_time_series_event_get_value(new_event); DIFFUSION_API_ERROR api_error; char *result; bool success = to_diffusion_json_string(new_event_value, &result, &api_error); if(success) { printf("Received value: %s\n", result); free(result); } else { const char *description = get_diffusion_api_error_description(api_error); printf("Error during diffusion value read: %s\n", description); diffusion_api_error_free(api_error); } diffusion_value_free(new_event_value); diffusion_time_series_event_free(new_event); return 
HANDLER_SUCCESS; } static void on_close() { // value stream is now closed } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; const char *topic_path = "my/topic/path"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // Create a session, synchronously session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // Create a value stream VALUE_STREAM_T value_stream = { .datatype = DATATYPE_JSON, .on_subscription = on_subscription, .on_unsubscription = on_unsubscription, .on_value = on_value, .on_close = on_close }; add_time_series_stream(session, topic_path, &value_stream); // Subscribe to topics matching the selector SUBSCRIPTION_PARAMS_T params = { .topic_selector = topic_path }; subscribe(session, params); // Sleep for a while sleep(5); // Unsubscribe from topics matching the selector UNSUBSCRIPTION_PARAMS_T unsub_params = { .topic_selector = topic_path }; unsubscribe(session, unsub_params); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
Python
#  Copyright (c) 2022 - 2023 DiffusionData Ltd., All Rights Reserved.
#
#  Use is subject to licence terms.
#
#  NOTICE: All information contained herein is, and remains the
#  property of DiffusionData. The intellectual and technical
#  concepts contained herein are proprietary to DiffusionData and
#  may be covered by U.S. and Foreign Patents, patents in process, and
#  are protected by trade secret or copyright law.

import asyncio
import typing

import diffusion.datatypes
from diffusion.datatypes.timeseries import Event
from diffusion.features.timeseries import TimeSeries
from diffusion.features.topics import ValueStreamHandler
from diffusion.features.topics.details.topic_specification import TopicSpecification
from diffusion.internal.services.topics import UnsubscribeReason
from diffusion.internal.session.exception_handler import ErrorReason

# Diffusion server connection information;
# adjust as needed for the server used in practice.
server_url = "ws://localhost:8080"
principal = "admin"
credentials = diffusion.Credentials("password")

TOPIC_PREFIX = "time-series"


async def main():
    """
    Client implementation which subscribes to a string time series topic and
    consumes the data it receives.
    """

    # Creating the session.
    async with diffusion.Session(
        url=server_url, principal=principal, credentials=credentials
    ) as session:
        # Build the topic selector from the prefix and the string type name
        topic_type = diffusion.datatypes.STRING
        topic = f"?{TOPIC_PREFIX}/{topic_type.type_name}//"

        # Add a value stream
        time_series_string_stream = TimeSeriesStringStream()
        session.topics.add_value_stream(topic, time_series_string_stream)

        # Subscribe to the topic.
        try:
            await session.topics.subscribe(topic)
            # Pause briefly before the session context exits.
            await asyncio.sleep(0.3)
        except Exception as ex:
            print(f"Failed to subscribe to topic '{topic}' : {ex}.")


# noinspection PyUnusedLocal
class TimeSeriesStringStream(ValueStreamHandler):
    """
    Basic implementation of the ValueStreamHandler for time series string topics.
    """

    def __init__(self):
        # Register this object's coroutines as the callbacks of a handler
        # for time series events wrapping string values.
        super().__init__(
            TimeSeries.of(diffusion.datatypes.STRING),
            update=self.update,
            subscribe=self.subscribe,
            unsubscribe=self.unsubscribe,
            error=self.error,
            close=self.close,
        )

    async def close(self):
        """
        Notification of the stream being closed normally.
        """
        print("The subscription stream is now closed.")

    async def error(self, error_reason: ErrorReason):
        """
        Notification of a contextual error related to this callback.

        Situations in which this callback is called include the session
        being closed, a communication timeout, or a problem with the
        provided parameters. No further calls will be made to this callback.

        Args:
            error_reason: Error reason.
        """
        print(f"An error has occurred : {error_reason}.")

    async def subscribe(
        self,
        *,
        topic_path: str,
        topic_spec: TopicSpecification,
        topic_value: typing.Optional[Event[diffusion.datatypes.STRING]] = None,
    ):
        """
        Notification of a successful subscription.

        Args:
            topic_path: Topic path.
            topic_spec: Topic specification.
            topic_value: Topic value.
        """
        print(f"Client subscribed to {topic_path}.")

    async def unsubscribe(
        self,
        *,
        topic_path: str,
        topic_spec: TopicSpecification,
        reason: typing.Optional[UnsubscribeReason] = None,
    ):
        """
        Notification of a successful unsubscription.

        Args:
            topic_path: Topic path.
            topic_spec: Topic specification.
            reason: Reason for the unsubscription.
        """
        print(f"Client unsubscribed from {topic_path} : {reason}.")

    async def update(
        self,
        *,
        topic_path: str,
        topic_spec: TopicSpecification,
        old_value: Event[diffusion.datatypes.STRING],
        topic_value: Event[diffusion.datatypes.STRING],
    ):
        """
        Topic update received.

        Args:
            topic_path: Topic path.
            topic_spec: Topic specification.
            old_value: Value prior to update.
            topic_value: Value after update.
        """
        print(f"New value of {topic_path} is {topic_value}.")
Apple
//  Copyright (C) 2021 Push Technology Ltd.
//
//  Licensed under the Apache License, Version 2.0 (the "License");
//  you may not use this file except in compliance with the License.
//  You may obtain a copy of the License at
//  http://www.apache.org/licenses/LICENSE-2.0
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.

import Foundation
import Diffusion

/// Example that opens a session and subscribes to a JSON time series topic.
class SubscribeTimeSeries {

    /// Value stream delegate that prints every notification it receives.
    ///
    /// Messages are built with string interpolation: Swift's `print` does not
    /// expand `%@` placeholders, it prints them literally.
    class JSONTimeSeriesValueStreamHandler: PTDiffusionJSONTimeSeriesEventValueStreamDelegate {

        /// Called when the time series topic at `topicPath` has been updated.
        func diffusionStream(_ stream: PTDiffusionValueStream,
                             didUpdateTimeSeriesTopicPath topicPath: String,
                             specification: PTDiffusionTopicSpecification,
                             oldJSONEvent oldJsonEvent: PTDiffusionJSONTimeSeriesEvent?,
                             newJSONEvent newJsonEvent: PTDiffusionJSONTimeSeriesEvent) {
            // There is no previous event on the first update; print "NULL" then.
            let previousEvent = oldJsonEvent.map { String(describing: $0) } ?? "NULL"
            print("Topic '\(topicPath)' has been updated: \(previousEvent) -> \(newJsonEvent)")
        }

        /// Called when the stream has been subscribed to `topicPath`.
        func diffusionStream(_ stream: PTDiffusionStream,
                             didSubscribeToTopicPath topicPath: String,
                             specification: PTDiffusionTopicSpecification) {
            print("You have been subscribed to '\(topicPath)'")
        }

        /// Called when the stream has been unsubscribed from `topicPath`.
        func diffusionStream(_ stream: PTDiffusionStream,
                             didUnsubscribeFromTopicPath topicPath: String,
                             specification: PTDiffusionTopicSpecification,
                             reason: PTDiffusionTopicUnsubscriptionReason) {
            print("You have been unsubscribed from '\(topicPath)'")
        }

        /// Called when the stream has failed with `error`.
        func diffusionStream(_ stream: PTDiffusionStream,
                             didFailWithError error: Error) {
            print("Value stream failed with error: \(error.localizedDescription)")
        }

        /// Called when the stream has been closed.
        func diffusionDidClose(_ stream: PTDiffusionStream) {
            print("Value stream is now closed")
        }
    }


    /// Opens a session to `url`, links a JSON time series value stream to
    /// `topic_path` and subscribes to it.
    ///
    /// Every failure is reported with `print` instead of crashing through a
    /// forced unwrap or `try!`.
    ///
    /// - Parameters:
    ///   - url: URL of the server to connect to.
    ///   - topic_path: Topic selector expression to stream and subscribe to.
    func run_example(url: URL, topic_path: String) {

        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control",
                                                            credentials: credentials)

        // establish the session
        PTDiffusionSession.open(with: url,
                                configuration: configuration) { (session, error) in
            if let error = error {
                print("An error has occurred while establishing a session: \(error.localizedDescription)")
                return
            }
            guard let session = session else {
                print("An error has occurred while establishing a session: no session was returned")
                return
            }

            // create a handler to receive time series topic updates
            // NOTE(review): `handler` is only referenced by this local; confirm
            // whether the value stream retains its delegate, and keep a strong
            // reference elsewhere if it does not.
            let handler = JSONTimeSeriesValueStreamHandler()
            let value_stream = PTDiffusionJSON.timeSeriesEventValueStream(with: handler)

            // link handler to `topic_path`
            do {
                try session.topics.add(value_stream, withSelectorExpression: topic_path, error: ())
            } catch {
                print("An error has occurred while adding the value stream for '\(topic_path)': \(error.localizedDescription)")
                return
            }

            // subscribe to `topic_path`
            session.topics.subscribe(withTopicSelectorExpression: topic_path) { error in
                if let error = error {
                    print("An error has occurred while attempting to subscribe to '\(topic_path)': \(error.localizedDescription)")
                } else {
                    print("Successfully subscribed to '\(topic_path)'")
                }
            }
        }
    }
}