Example: Subscribe to a time series
The following example uses Diffusion™ API to subscribe to a time series topic.
This example demonstrates subscribing to a time series topic at foo/timeseries.
JavaScript
const stream = session.addStream('topic/timeseries', diffusion.datatypes.timeseries(diffusion.datatypes.int64()));
session.select('topic/timeseries');
stream.on('value', (topic, spec, event) => {
console.log(`received update ${event.value}: sequence=${event.sequence} timestamp=${event.timestamp}`);
});
.NET
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;
using System;
using PushTechnology.ClientInterface.Client.Features.TimeSeries;
namespace PushTechnology.ClientInterface.Example {
/// <summary>
/// Client implementation which subscribes to a string time series topic and
/// consumes the data it receives.
/// </summary>
public sealed class TimeSeriesTopics {
public async Task TimeSeriesTopicsExample(string serverUrl) {
string TOPIC_PREFIX = "time-series";
var session = Diffusion.Sessions.Principal("client").Password("password").Open(serverUrl);
var typeName = Diffusion.DataTypes.Get<String>().TypeName;
var topic = $"?{TOPIC_PREFIX}/{typeName}//";
// Add a value stream
var stringStream = new StringStream();
session.Topics.AddTimeSeriesStream<string>(topic, stringStream);
// Subscribe to the topic.
try
{
await session.Topics.SubscribeAsync(topic);
}
catch (Exception ex)
{
WriteLine($"Failed to subscribe to topic '{topic}' : {ex}.");
}
finally
{
// Note that closing the session, will automatically unsubscribe from all topics
// the client is subscribed to.
session.Topics.RemoveStream(stringStream);
session.Close();
}
}
/// <summary>
/// Basic implementation of the IValueStream for time series string topics.
/// </summary>
private sealed class StringStream : IValueStream<IEvent<string>> {
/// <summary>
/// Notification of the stream being closed normally.
/// </summary>
public void OnClose()
=> WriteLine( "The subscription stream is now closed." );
/// <summary>
/// Notification of a contextual error related to this callback.
/// </summary>
/// <param name="errorReason">Error reason.</param>
public void OnError( ErrorReason errorReason )
=> WriteLine( $"An error has occured : {errorReason}." );
/// <summary>
/// Notification of a successful subscription.
/// </summary>
/// <param name="topicPath">Topic path.</param>
/// <param name="specification">Topic specification.</param>
public void OnSubscription( string topicPath, ITopicSpecification specification )
=> WriteLine( $"Client subscribed to {topicPath}." );
/// <summary>
/// Notification of a successful unsubscription.
/// </summary>
/// <param name="topicPath">Topic path.</param>
/// <param name="specification">Topic specification.</param>
/// <param name="reason">Error reason.</param>
public void OnUnsubscription( string topicPath, ITopicSpecification specification,
TopicUnsubscribeReason reason )
=> WriteLine( $"Client unsubscribed from {topicPath} : {reason}." );
/// <summary>
/// Topic update received.
/// </summary>
/// <param name="topicPath">Topic path.</param>
/// <param name="specification">Topic specification.</param>
/// <param name="oldValue">Value prior to update.</param>
/// <param name="newValue">Value after update.</param>
public void OnValue( string topicPath, ITopicSpecification specification,
IEvent<string> oldValue, IEvent<string> newValue )
=> WriteLine( $"New value of {topicPath} is {newValue}." );
}
}
}
Java and Android
/*******************************************************************************
* Copyright (C) 2017, 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package com.pushtechnology.client.sdk.manual;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.TimeSeries;
import com.pushtechnology.diffusion.client.features.TimeSeries.Event;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.Topics.ValueStream;
import com.pushtechnology.diffusion.client.session.Session;
/**
* This demonstrates a client session subscribing to a
* {@link TimeSeries} topic.
*
* @author DiffusionData Limited
*/
public class ClientConsumingTimeSeriesTopics {
private static final String TOPIC_PATH = "foo/timeseries";
private final Session session;
/**
* Constructor.
*
* @param serverUrl for example "ws://diffusion.example.com:80"
* @param valueStream value stream to receive time series topic events
*/
public ClientConsumingTimeSeriesTopics(String serverUrl, ValueStream<Event<Long>> valueStream)
throws InterruptedException, ExecutionException, TimeoutException {
session = Diffusion.sessions().principal("client").password("password")
.open(serverUrl);
final Topics topics = session.feature(Topics.class);
topics.addTimeSeriesStream(TOPIC_PATH, Long.class, valueStream);
topics.subscribe(TOPIC_PATH).get(5, TimeUnit.SECONDS);
}
/**
* Close the session.
*/
public void close() {
session.close();
}
}
C
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
#include <unistd.h>
#else
#define sleep(x) Sleep(1000 * x)
#endif
#include "diffusion.h"
static int on_subscription(
const char *topic_path,
const TOPIC_SPECIFICATION_T *specification,
void *context)
{
// subscribed to `topic_path`
return HANDLER_SUCCESS;
}
static int on_unsubscription(
const char *topic_path,
const TOPIC_SPECIFICATION_T *specification,
NOTIFY_UNSUBSCRIPTION_REASON_T reason,
void *context)
{
// unsubscribed from `topic_path`
return HANDLER_SUCCESS;
}
static int on_value(
const char *topic_path,
const TOPIC_SPECIFICATION_T *specification,
const DIFFUSION_DATATYPE datatype,
const DIFFUSION_VALUE_T *old_value,
const DIFFUSION_VALUE_T *new_value,
void *context)
{
DIFFUSION_TIME_SERIES_EVENT_T *new_event;
read_diffusion_time_series_event(new_value, &new_event, NULL);
DIFFUSION_VALUE_T *new_event_value = diffusion_time_series_event_get_value(new_event);
DIFFUSION_API_ERROR api_error;
char *result;
bool success = to_diffusion_json_string(new_event_value, &result, &api_error);
if (success)
{
printf("Received value: %s\n", result);
free(result);
}
else
{
const char *description = get_diffusion_api_error_description(api_error);
printf("Error during diffusion value read: %s\n", description);
diffusion_api_error_free(api_error);
}
diffusion_value_free(new_event_value);
diffusion_time_series_event_free(new_event);
return HANDLER_SUCCESS;
}
static void on_close()
{
// value stream is now closed
}
int main(
int argc,
char **argv)
{
const char *url = "ws://localhost:8080";
const char *principal = "control";
const char *password = "password";
const char *topic_path = "my/topic/path";
CREDENTIALS_T *credentials = credentials_create_password(password);
SESSION_T *session;
DIFFUSION_ERROR_T error = {0};
// Create a session, synchronously
session = session_create(url, principal, credentials, NULL, NULL, &error);
if (session == NULL)
{
printf("Failed to create session: %s\n", error.message);
free(error.message);
credentials_free(credentials);
return EXIT_FAILURE;
}
// Create a value stream
VALUE_STREAM_T value_stream = {
.datatype = DATATYPE_JSON,
.on_subscription = on_subscription,
.on_unsubscription = on_unsubscription,
.on_value = on_value,
.on_close = on_close};
add_time_series_stream(session, topic_path, &value_stream);
// Subscribe to topics matching the selector
SUBSCRIPTION_PARAMS_T params = {.topic_selector = topic_path};
subscribe(session, params);
// Sleep for a while
sleep(5);
// Unsubscribe from topics matching the selector
UNSUBSCRIPTION_PARAMS_T unsub_params = {.topic_selector = topic_path};
unsubscribe(session, unsub_params);
// Sleep for a while
sleep(5);
// Close the session, and release resources and memory
session_close(session, NULL);
session_free(session);
credentials_free(credentials);
return EXIT_SUCCESS;
}
Python
# Copyright (c) 2022 - 2023 DiffusionData Ltd., All Rights Reserved.
#
# Use is subject to licence terms.
#
# NOTICE: All information contained herein is, and remains the
# property of DiffusionData. The intellectual and technical
# concepts contained herein are proprietary to DiffusionData and
# may be covered by U.S. and Foreign Patents, patents in process, and
# are protected by trade secret or copyright law.
import asyncio
import typing
import diffusion.datatypes
from diffusion.datatypes.timeseries import Event
from diffusion.features.timeseries import TimeSeries
from diffusion.features.topics import ValueStreamHandler
from diffusion.features.topics.details.topic_specification import TopicSpecification
from diffusion.internal.services.topics import UnsubscribeReason
from diffusion.internal.session.exception_handler import ErrorReason
# Diffusion server connection information;
# adjust as needed for the server used in practice.
server_url = "ws://localhost:8080"
principal = "admin"
credentials = diffusion.Credentials("password")
TOPIC_PREFIX = "time-series"
async def main():
"""
Client implementation which subscribes to a string time series topic and
consumes the data it receives.
"""
# Creating the session.
async with diffusion.Session(
url=server_url, principal=principal, credentials=credentials
) as session:
# Create a string topic
topic_type = diffusion.datatypes.STRING
topic = f"?{TOPIC_PREFIX}/{topic_type.type_name}//"
# Add a value stream
time_series_string_stream = TimeSeriesStringStream()
session.topics.add_value_stream(topic, time_series_string_stream)
# Subscribe to the topic.
try:
await session.topics.subscribe(topic)
await asyncio.sleep(0.3)
except Exception as ex:
print(f"Failed to subscribe to topic '{topic}' : {ex}.")
# noinspection PyUnusedLocal
class TimeSeriesStringStream(ValueStreamHandler):
"""
Basic implementation of the ValueStreamHandler for time series string topics.
"""
def __init__(self):
super().__init__(
TimeSeries.of(diffusion.datatypes.STRING),
update=self.update,
subscribe=self.subscribe,
unsubscribe=self.unsubscribe,
error=self.error,
close=self.close,
)
async def close(self):
"""
Notification of the stream being closed normally.
"""
print("The subscrption stream is now closed.")
async def error(self, error_reason: ErrorReason):
"""
Notification of a contextual error related to this callback.
Situations in which <code>OnError</code> is called include
the session being closed, a communication
timeout, or a problem with the provided parameters.
No further calls will be made to this callback.
Args:
error_reason: Error reason.
"""
print(f"An error has occured : {error_reason}.")
async def subscribe(
self,
*,
topic_path: str,
topic_spec: TopicSpecification,
topic_value: typing.Optional[Event[diffusion.datatypes.STRING]] = None,
):
"""
Notification of a successful subscription.
Args:
topic_path: Topic path.
topic_spec: Topic specification.
topic_value: Topic value.
"""
print(f"Client subscribed to {topic_path}.")
async def unsubscribe(
self,
*,
topic_path: str,
topic_spec: TopicSpecification,
reason: typing.Optional[UnsubscribeReason] = None,
):
"""
Args:
topic_path: Topic path.
topic_spec: Topic specification.
reason: error reason.
"""
print(f"Client unsubscribed from {topic_path} : {reason}.")
async def update(
self,
*,
topic_path: str,
topic_spec: TopicSpecification,
old_value: Event[diffusion.datatypes.STRING],
topic_value: Event[diffusion.datatypes.STRING],
):
"""
Topic update received.
Args:
topic_path: Topic path.
topic_spec: Topic specification.
old_value: Value prior to update.
topic_value: Value after update.
"""
print(f"New value of {topic_path} is {topic_value}.")
Apple
// Copyright (C) 2021 Push Technology Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import Foundation
import Diffusion
class SubscribeTimeSeries {
class JSONTimeSeriesValueStreamHandler: PTDiffusionJSONTimeSeriesEventValueStreamDelegate {
func diffusionStream(_ stream: PTDiffusionValueStream,
didUpdateTimeSeriesTopicPath topicPath: String,
specification: PTDiffusionTopicSpecification,
oldJSONEvent oldJsonEvent: PTDiffusionJSONTimeSeriesEvent?,
newJSONEvent newJsonEvent: PTDiffusionJSONTimeSeriesEvent) {
print("Topic '%@' has been updated: %@ -> %@", topicPath, oldJsonEvent ?? "NULL", newJsonEvent)
}
func diffusionStream(_ stream: PTDiffusionStream,
didSubscribeToTopicPath topicPath: String,
specification: PTDiffusionTopicSpecification) {
print("You have been subscribed to '%@'", topicPath)
}
func diffusionStream(_ stream: PTDiffusionStream,
didUnsubscribeFromTopicPath topicPath: String,
specification: PTDiffusionTopicSpecification,
reason: PTDiffusionTopicUnsubscriptionReason) {
print("You have been unsubscribed from '%@'", topicPath)
}
func diffusionStream(_ stream: PTDiffusionStream,
didFailWithError error: Error) {
print("Value stream failed with error: %@", error.localizedDescription)
}
func diffusionDidClose(_ stream: PTDiffusionStream) {
print("Value stream is now closed")
}
}
func run_example(url: URL, topic_path: String) {
let credentials = PTDiffusionCredentials(password: "password")
let configuration = PTDiffusionSessionConfiguration(principal: "control",
credentials: credentials)
// establish the session
PTDiffusionSession.open(with: url,
configuration: configuration) { (session, error) in
if (error != nil) {
print("An error has occurred while establishing a session: %@",
error!.localizedDescription)
}
else {
// create a handler to receive time series topic updates
let handler = JSONTimeSeriesValueStreamHandler()
let value_stream = PTDiffusionJSON.timeSeriesEventValueStream(with: handler);
// link handler to `topic_path`
try! session!.topics.add(value_stream, withSelectorExpression: topic_path)
// subscribe to `topic_path`
session!.topics.subscribe(withTopicSelectorExpression: topic_path) { error in
if (error != nil) {
print("An error has occurred while attempting to subscribe to '%@: %@",
topic_path,
error!.localizedDescription)
}
else {
print("Successfully subscribed to '%@'", topic_path)
}
}
}
}
}
}