Diffusion Cloud Quick Start

Diffusion is an intelligent event data platform that makes it simple, fast and efficient to move real-time data across the Internet.

This Quick Start Guide gives you an introduction to Diffusion Cloud and how to use it.

You will:

Set up Diffusion Cloud

Copy this hostname, as you will need it later to connect to Diffusion with a client library.

Use the REST API

Let's get some data into your Diffusion Cloud service.

Data is stored in topics.

You can publish data to a topic using our APIs and adapters.

A topic:

Unlike other real-time data services, Diffusion Cloud allows you to have millions of unique topics. You can use as many or as few as needed to represent your desired data structure.

Set up the REST API

To publish some data, we'll use the REST API (currently only available in Diffusion Cloud).

The REST API uses simple HTTPS calls, so you can call it from any programming language. It's completely stateless, which makes it good for low-powered IoT devices that can't maintain a persistent connection.

To start using the REST API, go to the dashboard for your Diffusion Cloud service.

In the left-hand menu, select REST API.

In the Access tokens section, your client secret is the long alphanumeric string after Header authorization: Basic.

You can use this to obtain an access token which will enable you to use the REST API.

You can do that with the curl command line tool like this:

curl \
    --request POST \
    --url 'https://login.diffusion.cloud/oauth2/token?grant_type=client_credentials' \
    --header 'content-type: application/x-www-form-urlencoded' \
    --header 'authorization: Basic <CLIENT_SECRET>'

You will receive a JSON response with an access_token field. The access token is valid for 24 hours.
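
Because the token is valid for 24 hours, an application can request it once and reuse it rather than requesting a new one for every call. The following is a minimal Python sketch of that idea, using only the standard library. The names `build_token_request`, `parse_access_token`, `TokenCache` and `fetch_token_over_network` are hypothetical helpers invented for this example, and the 60-second safety margin is an assumption, not a documented requirement.

```python
import json
import time
import urllib.request

TOKEN_URL = "https://login.diffusion.cloud/oauth2/token?grant_type=client_credentials"
TOKEN_LIFETIME_SECONDS = 24 * 60 * 60  # "valid for 24 hours", per the text above


def build_token_request(client_secret):
    """Build (but do not send) the same POST request as the curl command."""
    return urllib.request.Request(
        TOKEN_URL,
        data=b"",  # empty body; the grant type is in the query string
        method="POST",
        headers={
            "content-type": "application/x-www-form-urlencoded",
            "authorization": "Basic " + client_secret,
        },
    )


def parse_access_token(response_body):
    """Extract the access_token field from the JSON response text."""
    return json.loads(response_body)["access_token"]


class TokenCache:
    """Hypothetical helper: reuse a token until shortly before it expires."""

    def __init__(self, fetch_token, clock=time.time, margin_seconds=60):
        self._fetch_token = fetch_token  # callable returning a fresh token string
        self._clock = clock
        self._margin = margin_seconds
        self._token = None
        self._expires_at = 0.0

    def get(self):
        now = self._clock()
        if self._token is None or now >= self._expires_at - self._margin:
            self._token = self._fetch_token()
            self._expires_at = now + TOKEN_LIFETIME_SECONDS
        return self._token


def fetch_token_over_network(client_secret):
    """Send the request for real; requires a valid client secret."""
    with urllib.request.urlopen(build_token_request(client_secret)) as response:
        return parse_access_token(response.read().decode("utf-8"))
```

A cache could then be created with `TokenCache(lambda: fetch_token_over_network("<CLIENT_SECRET>"))`, calling `get()` wherever a `<TOKEN>` value is needed.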

Create a topic with the REST API

Now you can use the access token to create a topic called my-topic.

Here's how to do it with curl:

curl --request POST \
  --url https://api.diffusion.cloud/topics/add \
  --header 'authorization: Bearer <TOKEN>' \
  --header 'content-type: application/json' \
  --data '{"path":"my-topic", "type":"json", "properties":{ "VALIDATE_VALUES":"true" }}'

or you can use your preferred coding language:

fetch("https://api.diffusion.cloud/topics/add", {
  "method": "POST",
  "headers": {
    "authorization": "Bearer <TOKEN>",
    "content-type": "application/json"
  },
  // fetch does not serialize objects, so encode the body as JSON text
  "body": JSON.stringify({
    "path": "my-topic",
    "type": "json",
    "properties": {
      "VALIDATE_VALUES": "true"
    }
  })
})
.then(response => {
  console.log(response);
})
.catch(err => {
  console.log(err);
});
import Foundation

let headers = [
    "authorization": "Bearer <TOKEN>",
    "content-type": "application/json"
]
let parameters = [
    "path": "my-topic",
    "type": "json",
    "properties": ["VALIDATE_VALUES": "true"]
] as [String : Any]

// JSONSerialization.data(withJSONObject:options:) can throw, so call it with try
let postData = try JSONSerialization.data(withJSONObject: parameters, options: [])

let request = NSMutableURLRequest(
    url: NSURL(string: "https://api.diffusion.cloud/topics/add")! as URL,
    cachePolicy: .useProtocolCachePolicy,
    timeoutInterval: 10.0)

request.httpMethod = "POST"
request.allHTTPHeaderFields = headers
request.httpBody = postData as Data

let session = URLSession.shared
let dataTask = session.dataTask(
    with: request as URLRequest,
    completionHandler: {
        (data, response, error) -> Void in
            if (error != nil) {
                print(error)
            } else {
                let httpResponse = response as? HTTPURLResponse
                print(httpResponse)
            }
        })

dataTask.resume()

URL url = new URL ("https://api.diffusion.cloud/topics/add");

HttpURLConnection connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Authorization", "Bearer <TOKEN>");
connection.setRequestProperty("Content-Type", "application/json; charset=utf-8");
connection.setRequestProperty("Accept", "application/json");
connection.setDoOutput(true);

String requestParameter = "{\"path\":\"my-topic\", \"type\":\"json\", \"properties\":{ \"VALIDATE_VALUES\":\"true\" }}";

try(OutputStream os = connection.getOutputStream()) {
    byte[] input = requestParameter.getBytes("utf-8");
    os.write(input, 0, input.length);
}

InputStream is = connection.getInputStream();
var client = new RestClient("https://api.diffusion.cloud/topics/add");

var request = new RestRequest(Method.POST);
request.AddHeader("authorization", "Bearer <TOKEN>");
request.AddHeader("content-type", "application/json");
request.AddParameter("application/json", "{\"path\":\"my-topic\", \"type\":\"json\", \"properties\":{ \"VALIDATE_VALUES\":\"true\" }}", ParameterType.RequestBody);

IRestResponse response = client.Execute(request);
CURL *hnd = curl_easy_init();

curl_easy_setopt(hnd, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(hnd, CURLOPT_URL, "https://api.diffusion.cloud/topics/add");

struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "authorization: Bearer <TOKEN>");
headers = curl_slist_append(headers, "content-type: application/json");

curl_easy_setopt(hnd, CURLOPT_HTTPHEADER, headers);
curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, "{\"path\":\"my-topic\", \"type\":\"json\", \"properties\":{ \"VALIDATE_VALUES\":\"true\" }}");

CURLcode ret = curl_easy_perform(hnd);
curl_easy_cleanup(hnd);
import requests

response = requests.post(
    url="https://api.diffusion.cloud/topics/add",
    headers={"authorization": "Bearer <TOKEN>", "content-type": "application/json"},
    json={
        "path": "my-topic",
        "type": "json",
        "properties": {
            "VALIDATE_VALUES": "true"
        }
    })

Update with the REST API

You have created a JSON topic, but it currently has no value.

Update the topic with the JSON value {"foo": "bar"}.

You can do it with curl:

curl --request POST \
  --url https://api.diffusion.cloud/topics/set \
  --header 'authorization: Bearer <TOKEN>' \
  --header 'content-type: application/json' \
  --data '{"path":"my-topic", "type":"json", "value": {"foo": "bar"}}'

or you can use your preferred coding language:

const response = await fetch(
    "https://api.diffusion.cloud/topics/set",
    {
        "method": "POST",
        "headers": {
            "authorization": "Bearer <TOKEN>",
            "content-type": "application/json"
        },
        // fetch does not serialize objects, so encode the body as JSON text
        "body": JSON.stringify({
            "path": "my-topic",
            "type": "json",
            "value": {
                "foo": "bar"
            }
        })
    }
);
console.log(response);
import Foundation

let headers = [
    "authorization": "Bearer <TOKEN>",
    "content-type": "application/json"
]

let parameters = [
    "path": "my-topic",
    "type": "json",
    "value": ["foo": "bar"]
] as [String : Any]

// JSONSerialization.data(withJSONObject:options:) can throw, so call it with try
let postData = try JSONSerialization.data(withJSONObject: parameters, options: [])

let request = NSMutableURLRequest(
    url: NSURL(string: "https://api.diffusion.cloud/topics/set")! as URL,
    cachePolicy: .useProtocolCachePolicy,
    timeoutInterval: 10.0
)

request.httpMethod = "POST"
request.allHTTPHeaderFields = headers
request.httpBody = postData as Data

let session = URLSession.shared
let dataTask = session.dataTask(
    with: request as URLRequest,
    completionHandler: {
        (data, response, error) -> Void in
            if (error != nil) {
                print(error)
            } else {
                let httpResponse = response as? HTTPURLResponse
                print(httpResponse)
            }
        })

dataTask.resume()
URL url = new URL ("https://api.diffusion.cloud/topics/set");

HttpURLConnection connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod("POST");
connection.setRequestProperty("Authorization", "Bearer <TOKEN>");
connection.setRequestProperty("Content-Type", "application/json; charset=utf-8");
connection.setRequestProperty("Accept", "application/json");
connection.setDoOutput(true);

String requestParameter = "{\"path\":\"my-topic\", \"type\":\"json\", \"value\": {\"foo\": \"bar\"}}";

try(OutputStream os = connection.getOutputStream()) {
    byte[] input = requestParameter.getBytes("utf-8");
    os.write(input, 0, input.length);
}

InputStream is = connection.getInputStream();
var client = new RestClient("https://api.diffusion.cloud/topics/set");

var request = new RestRequest(Method.POST);
request.AddHeader("authorization", "Bearer <TOKEN>");
request.AddHeader("content-type", "application/json");
request.AddParameter("application/json", "{\"path\":\"my-topic\", \"type\":\"json\", \"value\": {\"foo\": \"bar\"}}", ParameterType.RequestBody);

IRestResponse response = client.Execute(request);
CURL *hnd = curl_easy_init();

curl_easy_setopt(hnd, CURLOPT_CUSTOMREQUEST, "POST");
curl_easy_setopt(hnd, CURLOPT_URL, "https://api.diffusion.cloud/topics/set");

struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "authorization: Bearer <TOKEN>");
headers = curl_slist_append(headers, "content-type: application/json");
curl_easy_setopt(hnd, CURLOPT_HTTPHEADER, headers);

curl_easy_setopt(hnd, CURLOPT_POSTFIELDS, "{\"path\":\"my-topic\", \"type\":\"json\", \"value\": {\"foo\": \"bar\"}}");

CURLcode ret = curl_easy_perform(hnd);
curl_easy_cleanup(hnd);
import requests

response = requests.post(
    url="https://api.diffusion.cloud/topics/set",
    headers={"authorization": "Bearer <TOKEN>", "content-type": "application/json"},
    json={
        "path": "my-topic",
        "type": "json",
        "value": {
            "foo": "bar"
        }
    })

Monitor topics with the console

You can use the Diffusion management console to see your new topic.

In your Diffusion Cloud service dashboard, click Console in the left-hand menu.

You will be prompted to create a password for the admin principal.

Click the Topics tab at left.

You should see my-topic listed in the topic browser. Click on it to display the topic's value.

Fetch data with the REST API

You can fetch the current value of a topic with the REST API.

With the REST API, you only get the value at the instant you send a request. If your application needs continuous updates, you could send repeated REST API requests, but it's much better and more efficient to use a client SDK to stream values.

curl --request GET \
 --url 'https://api.diffusion.cloud/topics/fetch?path=my-topic' \
 --header 'authorization: Bearer <TOKEN>'

const response = await fetch(
    "https://api.diffusion.cloud/topics/fetch?path=my-topic",
    {
        "method": "GET",
        "headers": {
            "authorization": "Bearer <TOKEN>"
        }
    });
// Read the response body, which carries the fetched topic value
console.log(await response.text());
import Foundation

let headers = ["authorization": "Bearer <TOKEN>"]

let request = NSMutableURLRequest(
    url: NSURL(string: "https://api.diffusion.cloud/topics/fetch?path=my-topic")! as URL,
    cachePolicy: .useProtocolCachePolicy,
    timeoutInterval: 10.0)

request.httpMethod = "GET"
request.allHTTPHeaderFields = headers

let session = URLSession.shared

let dataTask = session.dataTask(
    with: request as URLRequest,
    completionHandler: {
        (data, response, error) -> Void in
            if (error != nil) {
                print(error)
            } else {
                let httpResponse = response as? HTTPURLResponse
                print(httpResponse)
            }
    })

dataTask.resume()
URL url = new URL("https://api.diffusion.cloud/topics/fetch?path=my-topic");

HttpURLConnection connection = (HttpURLConnection)url.openConnection();
connection.setRequestMethod("GET");
connection.setRequestProperty("Authorization", "Bearer <TOKEN>");

try(InputStream inputStream = connection.getInputStream()) {
    InputStreamReader inputStreamReader = new InputStreamReader(inputStream);
    BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
    String inputLine;
    StringBuffer response = new StringBuffer();
    while ((inputLine = bufferedReader.readLine()) != null) {
        response.append(inputLine);
    }
}
var client = new RestClient("https://api.diffusion.cloud/topics/fetch?path=my-topic");

var request = new RestRequest(Method.GET);
request.AddHeader("authorization", "Bearer <TOKEN>");

IRestResponse response = client.Execute(request);
CURL *hnd = curl_easy_init();

curl_easy_setopt(hnd, CURLOPT_CUSTOMREQUEST, "GET");
curl_easy_setopt(hnd, CURLOPT_URL, "https://api.diffusion.cloud/topics/fetch?path=my-topic");

struct curl_slist *headers = NULL;
headers = curl_slist_append(headers, "authorization: Bearer <TOKEN>");
curl_easy_setopt(hnd, CURLOPT_HTTPHEADER, headers);

CURLcode ret = curl_easy_perform(hnd);
curl_easy_cleanup(hnd);
import requests

response = requests.get(
    url="https://api.diffusion.cloud/topics/fetch?path=my-topic",
    headers={"authorization": "Bearer <TOKEN>"})
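
To illustrate why repeated REST requests are less efficient than streaming, here is a small Python sketch of a polling loop. It does not contact the service: `fetch_value` is a hypothetical callable standing in for one fetch request, and the simulated values are invented for the example. The point is that most requests return a value the application has already seen.

```python
import time


def poll_for_changes(fetch_value, polls, interval_seconds=1.0, sleep=time.sleep):
    """Call fetch_value() `polls` times and return only the values that changed.

    fetch_value is a hypothetical callable standing in for one REST fetch
    request; it is an assumption of this sketch, not part of the REST API.
    """
    changes = []
    last = object()  # sentinel that never equals a real value
    for i in range(polls):
        value = fetch_value()
        if value != last:
            changes.append(value)
            last = value
        if i < polls - 1:
            sleep(interval_seconds)
    return changes


# Simulated topic values seen by six consecutive requests.
simulated = iter([{"foo": "bar"}] * 4 + [{"foo": "baz"}] * 2)
requests_sent = 6
changes = poll_for_changes(lambda: next(simulated), requests_sent, sleep=lambda s: None)
print(len(changes), "changes seen using", requests_sent, "requests")
```

In this simulation, six requests are needed to observe two distinct values, and a change is only noticed at the next poll. A streaming subscription, covered later in this guide, delivers each update as it happens.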

Use a Client SDK

Now we're going to use a full client SDK to subscribe to a topic.

Using the client libraries provides better performance and access to advanced features.

See the instructions at right for how to set up the SDK for your preferred language.

Include the Diffusion JavaScript client (with zlib support) in the <head> of your HTML.

<script src="https://download.pushtechnology.com/clients/6.9.0/js/diffusion-6.9.0.js"></script>
<script src="https://download.pushtechnology.com/clients/6.9.0/js/browserify-zlib-0.2.0.js"></script>

Or if you wish to use Node.js, install the latest Diffusion NPM module by running:

npm install --save diffusion

then import the library in your code:

const diffusion = require('diffusion');

Also see these Knowledge Base articles about using Diffusion with popular JavaScript frameworks and libraries:

The Diffusion manual has more information on installing and using the JavaScript SDK.

The Diffusion manual has more information on installing and using the Apple SDK.

We recommend using Xcode 12.4. The lowest supported version is Xcode 9.2.

In your ViewController.swift file, import the client interface.

import Diffusion

Download the Diffusion Android client: diffusion-android-6.9.0.jar.

In Android Studio, create a new project using API Level 19 or later, then copy the diffusion-android-6.9.0.jar file to the app/libs directory in your project.

Find the library by expanding the app/libs folder in the left-hand panel (the Project Tool Window). If the libs folder is not shown in the left-hand panel, use the pull-down menu at the top of the panel to select the Project view.

Right-click on the jar, select Add as Library... then accept the default library settings.

Add the following code to the build.gradle file within your project:

compileOptions {
    sourceCompatibility JavaVersion.VERSION_1_8
    targetCompatibility JavaVersion.VERSION_1_8
}

The Diffusion manual has more information on installing and using the Android SDK.

If you are using Maven, first add the Push Technology repo to your pom.xml file:

<repositories>
    <repository>
        <id>push-repository</id>
        <url>https://download.pushtechnology.com/maven/</url>
    </repository>
</repositories>

And then add the Diffusion client to your dependencies.

<dependencies>
    <dependency>
        <groupId>com.pushtechnology.diffusion</groupId>
        <artifactId>diffusion-client</artifactId>
        <version>6.9.0</version>
    </dependency>
</dependencies>

Or you can download the Diffusion Java client with dependencies: diffusion-client-with-dependencies-6.9.0.jar

The Diffusion manual has more information on installing and using the Java SDK.

Download the Diffusion .NET client assembly: diffusion-dotnet-6.9.0.zip

Or install the client using NuGet or .NET CLI (instructions here).

Import the Diffusion Client to your project:

using PushTechnology.ClientInterface;

The Diffusion manual has more information on installing and using the .NET SDK.

The Diffusion manual has more information on installing and using the C SDK.

The Diffusion manual has more information on installing and using the Python SDK.

Connect your client to Cloud

When you use a client library to connect to Diffusion, this creates a session.

A session is able to send and receive data, and acts as the intermediary between Diffusion and your application code. Sessions provide low-latency, high-throughput data streaming. Diffusion clients can reconnect reliably to the Cloud service, with no loss or duplication of data.

Let’s create a session:

const session = await diffusion.connect({
    host : "<HOST>",
    port : <PORT>,
    principal : "<PRINCIPAL>",
    credentials : "<PASSWORD>"
});

Add a reference to the session in your ViewController class to prevent it from being deallocated:

var session:PTDiffusionSession?

Connect to Diffusion within the viewDidLoad method:

let url = URL(string: "ws://<HOST>:<PORT>")!

let credentials = PTDiffusionCredentials(
    password: "<PASSWORD>")

let config = PTDiffusionSessionConfiguration(
    principal: "<PRINCIPAL>",
    credentials: credentials)

PTDiffusionSession.open(with: url, configuration: config) {
    (_session, error) -> Void in
        self.session = _session
}

Add the following lines to the ViewController class to handle any potential errors:

let errorHandler: (Any?, Error?) -> Void = {
    response, error in
        if (error != nil) {
            print(error!)
        }
}

In your project's app/src/main/AndroidManifest.xml, set the INTERNET permission.

<uses-permission android:name="android.permission.INTERNET"/>

Insert the element between the opening <manifest> tag and the opening <application> tag. This permission is required to use the Diffusion API.

In the MainActivity.java file in your project, create a SessionHandler inner class. This is required to prevent Diffusion running in the main thread and potentially affecting your app's performance.

private class SessionHandler implements SessionFactory.OpenCallback {
    private Session session = null;

    @Override
    public void onOpened(Session session) {
        this.session = session;
    }

    @Override
    public void onError(ErrorReason errorReason) {

    }

    public void close() {
        if ( session != null ) {
          session.close();
        }
    }
}

Either use Android Studio to resolve the imports, or paste the following into the import section of the file:

import com.pushtechnology.diffusion.client.callbacks.ErrorReason;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.SessionFactory;

In your MainActivity class, add the following private field.

private SessionHandler sessionHandler = null;

Then add the following lines to the onCreate method.

if (sessionHandler == null) {
    sessionHandler = new SessionHandler();

    Diffusion.sessions()
        .principal("<PRINCIPAL>")
        .password("<PASSWORD>")
        .open("ws://<HOST>:<PORT>", sessionHandler);
}

Again, use Android Studio to resolve the imports, or paste the following into the import section:

import com.pushtechnology.diffusion.client.Diffusion;

To properly free resources when your app is closed, override the onDestroy method of your MainActivity class.

if (sessionHandler != null ) {
    sessionHandler.close();
    sessionHandler = null;
}
super.onDestroy();

In the main method of your Java project, add the following lines to connect to Diffusion:

final Session session = Diffusion.sessions()
    .principal("<PRINCIPAL>")
    .password("<PASSWORD>")
    .open("ws://<HOST>:<PORT>");

In the main method of your .NET project, add the following lines to connect to Diffusion:

var session = Diffusion.Sessions
    .Principal("<PRINCIPAL>")
    .Password("<PASSWORD>")
    .Open("ws://<HOST>:<PORT>");
DIFFUSION_SESSION_FACTORY_T *session_factory = diffusion_session_factory_init();
diffusion_session_factory_principal(session_factory, "<PRINCIPAL>");
diffusion_session_factory_password(session_factory, "<PASSWORD>");

SESSION_T *session = session_create_with_session_factory(session_factory, "ws://<HOST>:<PORT>");
import diffusion
from diffusion.session import Session


async with Session(
    "ws://<HOST>:<PORT>",
    principal="<PRINCIPAL>",
    credentials=diffusion.Credentials("<PASSWORD>")
) as session:
    ...  # use the session here

Parameter      Value
<HOST>         Host name from Service Details
<PORT>         80
<PRINCIPAL>    admin
<PASSWORD>     Password you set for <PRINCIPAL>
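
As a rough illustration of how the parameters above combine into the URL used in the connection examples, here is a short Python sketch. The function `session_url` is a hypothetical helper, not part of any Diffusion SDK, and the `secure` flag with the `wss` scheme is an assumption based on the standard WebSocket schemes; the table above only lists plain `ws` on port 80.

```python
def session_url(host, port=80, secure=False):
    """Build the URL passed to the client library when opening a session.

    The secure flag and the "wss" scheme are assumptions of this sketch;
    the table above only lists plain "ws" on port 80.
    """
    if not host or "/" in host or ":" in host:
        raise ValueError("host must be a bare host name, without scheme or port")
    scheme = "wss" if secure else "ws"
    return f"{scheme}://{host}:{int(port)}"


print(session_url("example-host.invalid", 80))
```

Note that the URL ends with the port number and has no trailing colon.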

Create and update a topic

Now let's publish a new JSON topic with the client library.

Topics have paths which are structured into hierarchies using the / character, much like URLs.

This makes it easy to organize related topics.

You can make a new topic that is a descendant of my-topic, which you created earlier with the REST API:

my-topic/new-topic

Don't worry if you skipped the REST API section above and don't have my-topic in your Cloud service. You can still create a topic on this path.
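
To make the hierarchy concrete, the following Python sketch splits a topic path on the / character and lists its ancestors. These helpers (`path_segments`, `ancestor_paths`, `is_descendant`) are hypothetical and use plain string handling only; they are not part of the Diffusion SDK and do not reflect how the server stores topics.

```python
def path_segments(topic_path):
    """Split a topic path such as 'my-topic/new-topic' into its parts."""
    return [part for part in topic_path.split("/") if part]


def ancestor_paths(topic_path):
    """Return the paths above topic_path, nearest the root first."""
    parts = path_segments(topic_path)
    return ["/".join(parts[:i]) for i in range(1, len(parts))]


def is_descendant(topic_path, ancestor):
    """True if topic_path lies below ancestor in the hierarchy."""
    return ancestor in ancestor_paths(topic_path)


print(path_segments("my-topic/new-topic"))
print(ancestor_paths("my-topic/new-topic"))
print(is_descendant("my-topic/new-topic", "my-topic"))
```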

You will publish this JSON data to the topic:

{"foo":"bar"}

First, create a JSON topic. Add the following code within the callback after establishing the session:

await session.topics.add('my-topic/new-topic', diffusion.topics.TopicType.JSON);

Now that the topic exists, you can publish data to it. Add the following lines:

const value = { foo : "bar" };

const result = await session.topicUpdate.set(
    "my-topic/new-topic",
    diffusion.datatypes.json(),
    value);

First, create a JSON topic. Add the following code within the callback after establishing the session:

self.session!.topicControl.addTopic(
    withPath: "my-topic/new-topic",
    type: PTDiffusionTopicType.JSON,
    completionHandler: self.errorHandler)

Now that the topic exists, you can publish data to it. Add the following lines:

let value = try? PTDiffusionJSON(object: ["foo": "bar"])

self.session!.topicUpdate.setWithPath(
    "my-topic/new-topic",
    toJSONValue: value!) {
        error in self.errorHandler(nil, error)
}

First, create a JSON topic. Add the following lines within the onOpened method of the SessionHandler class:

// Named addResult so it does not clash with the update future declared later
final CompletableFuture<?> addResult =
    session.feature(TopicControl.class).addTopic(
        "my-topic/new-topic",
        TopicType.JSON);

Now you need to publish the data.

Since the topic holds JSON, you need to construct values of the appropriate type before updating.

To do this, use the DataType library provided by Diffusion.

final JSONDataType jsonDataType = Diffusion.dataTypes().json();

final JSON value = jsonDataType.fromJsonString("{\"foo\" : \"bar\" }");

final CompletableFuture<?> updateResult = session
    .feature(TopicUpdate.class)
    .set("my-topic/new-topic", JSON.class, value);

First, create a JSON topic. Add the following after the line that establishes the session:

// Named addResult so it does not clash with the update future declared later
final CompletableFuture<?> addResult =
    session.feature(TopicControl.class).addTopic(
        "my-topic/new-topic",
        TopicType.JSON);

Now you need to publish the data.

Since the topic holds JSON, you need to construct values of the appropriate type before updating.

To do this, use the DataType library provided by Diffusion.

final JSONDataType jsonDataType = Diffusion.dataTypes().json();

final JSON value = jsonDataType.fromJsonString("{\"foo\" : \"bar\" }");

final CompletableFuture<?> updateResult = session
    .feature(TopicUpdate.class)
    .set("my-topic/new-topic", JSON.class, value);

First, create a JSON topic. Add the following after the line that establishes the session:

await session.TopicControl.AddTopicAsync("my-topic/new-topic", TopicType.JSON );

Now you need to publish the data.

Since the topic holds JSON, you need to construct values of the appropriate type before updating.

To do this, use the DataType library provided by Diffusion.

var jsonDataType = Diffusion.DataTypes.JSON;

var value = jsonDataType.FromJSONString("{\"foo\": \"bar\"}");

var result = session.TopicUpdate.SetAsync("my-topic/new-topic", value);

First, you need to define the callback functions.

static int on_topic_added(
        SESSION_T *session,
        TOPIC_ADD_RESULT_CODE result_code,
        void *context)
{
        printf("on_topic_added\n");
        return HANDLER_SUCCESS;
}

static int on_topic_add_failed(
        SESSION_T *session,
        TOPIC_ADD_FAIL_RESULT_CODE result_code,
        const DIFFUSION_ERROR_T *error,
        void *context)
{
        printf("on_topic_add_failed: %d\n", result_code);
        return HANDLER_SUCCESS;
}

static int on_topic_update(void *context)
{
        printf("topic update success\n");
        return HANDLER_SUCCESS;
}

Then you create a JSON topic. Add the following after the line that establishes the session:

ADD_TOPIC_CALLBACK_T callback = {
        .on_topic_added_with_specification = on_topic_added,
        .on_topic_add_failed_with_specification = on_topic_add_failed
};

TOPIC_SPECIFICATION_T *spec =
        topic_specification_init(TOPIC_TYPE_JSON);

add_topic_from_specification(
        session, "my-topic/new-topic", spec, callback);

Now you need to publish the data.

Since the topic holds JSON, you need to construct values of the appropriate type before updating.

To do this, use the DataType library provided by Diffusion.

BUF_T *buf = buf_create();
write_diffusion_json_value("{\"foo\": \"bar\"}", buf);

DIFFUSION_TOPIC_UPDATE_SET_PARAMS_T topic_update_params = {
        .topic_path = "my-topic/new-topic",
        .datatype = DATATYPE_JSON,
        .update = buf,
        .on_topic_update = on_topic_update
};

diffusion_topic_update_set(session, topic_update_params);

First, create a JSON topic. Add the following after the line that establishes the session:

topic_type = diffusion.datatypes.JSON

await session.topics.add_topic("my-topic/new-topic", topic_type)

Now you need to publish the data.

Since the topic holds JSON, you need to construct values of the appropriate type before updating.

To do this, use the DataType library provided by Diffusion.

import json

value = json.loads('{"foo": "bar"}')

await session.topics.set_topic("my-topic/new-topic", value, specification=topic_type)

Subscribe to a topic

You can use security policies to determine which topics a session can subscribe to.

A session can subscribe to data published to topics. Subscription is a way for a session to tell Diffusion which topics it’s interested in. Diffusion will cache your subscriptions, so you can subscribe to topics which don’t yet exist, and retain full subscription state should your session disconnect and reconnect.

When a session subscribes to an existing topic, Diffusion will send that topic’s current value to the session immediately, and then send any subsequent updates as they arrive. This allows Diffusion to act as a cache, where subscribers are always guaranteed to get the latest value.

Neither publishers nor subscribers need to know about the existence of each other, so when you write code, you only need to think about the data, not who is using it.
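
The behaviour described above can be pictured with a toy in-memory model. The Python sketch below is not the Diffusion API and every name in it (`TinyTopicStore`, `subscribe`, `publish`) is hypothetical. It only illustrates two points from the text: a subscription can be registered before the topic exists, and a subscriber to an existing topic receives the current value immediately, followed by later updates.

```python
class TinyTopicStore:
    """Toy in-memory model of the subscription behaviour described above.

    This is NOT the Diffusion API; every name here is hypothetical.
    """

    def __init__(self):
        self._values = {}       # topic path -> latest value
        self._subscribers = {}  # topic path -> list of callbacks

    def subscribe(self, path, callback):
        # Remember the subscription even if the topic does not exist yet.
        self._subscribers.setdefault(path, []).append(callback)
        # An existing topic delivers its current value straight away.
        if path in self._values:
            callback(path, self._values[path])

    def publish(self, path, value):
        self._values[path] = value
        for callback in self._subscribers.get(path, []):
            callback(path, value)


store = TinyTopicStore()
early, late = [], []

store.subscribe("my-topic/new-topic", lambda p, v: early.append(v))  # topic absent
store.publish("my-topic/new-topic", {"foo": "bar"})
store.subscribe("my-topic/new-topic", lambda p, v: late.append(v))   # topic present
store.publish("my-topic/new-topic", {"foo": "baz"})

print(early)
print(late)
```

Both subscribers end up with the same sequence of values, even though one subscribed before the topic existed and the other after its first update. The publisher never refers to either subscriber directly.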

Now that we’ve created a topic and published some data to it, let’s add a subscription to retrieve the JSON value.

Before you subscribe, you need to first attach a stream to listen for updates.

session.addStream(
    'my-topic/new-topic',
    diffusion.datatypes.json()
).on('value', function(topic, specification, newValue, oldValue) {
    console.log('New value:', newValue.get());
});

Now you can tell Diffusion that you want to receive updates from the topic you created.

session.select('my-topic/new-topic');

To receive data from Diffusion, you use value streams. Value streams provide callbacks for topic events, such as subscriptions and received values.

Let’s define a class that implements the JSON Value Stream protocol, which you will then use to subscribe to the topic you created.

class StreamDelegate: NSObject, PTDiffusionJSONValueStreamDelegate {

    func diffusionStream(_ stream: PTDiffusionStream,
                         didUnsubscribeFromTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         reason: PTDiffusionTopicUnsubscriptionReason) {
        print("Unsubscribed from: \(topicPath)")
    }

    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        print("Failed with error: \(error)")
    }

    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Closed")
    }

    func diffusionStream(_ stream: PTDiffusionStream,
                         didSubscribeToTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification) {
        print("Subscribed to: \(topicPath)")
    }

    func diffusionStream(_ stream: PTDiffusionValueStream,
                         didUpdateTopicPath topicPath: String,
                         specification: PTDiffusionTopicSpecification,
                         oldJSON: PTDiffusionJSON?,
                         newJSON: PTDiffusionJSON) {
        do {
            let value:Dictionary<String, Any> = try newJSON.object() as! Dictionary
            print("\(topicPath): \(value.description)")
        }
        catch {
            print("Unable to read message")
        }
    }
}

Create an instance of this new class and assign it to a property of the ViewController. This prevents the delegate from being deallocated after the initial session connection closure has ended.

let delegate = StreamDelegate()

Now you can use this delegate to register a Value Stream.

let selector = PTDiffusionTopicSelector(expression: "my-topic/new-topic")
let stream = PTDiffusionJSON.valueStream(with: self.delegate)

try? self.session!.topics.add(stream, with: selector, error: ())

Now you can tell Diffusion that you want to receive updates from the topic you created by subscribing to the topic.

self.session!.topics.subscribe(withTopicSelectorExpression: "my-topic/new-topic") {
    error in self.errorHandler(nil, error)
}

Note that you can create multiple streams to dispatch updates to different parts of your application.

Before you subscribe, you need to first attach a stream to listen for updates.

import com.pushtechnology.diffusion.client.features.Topics.ValueStream.Default;

session.feature(Topics.class).addStream("my-topic/new-topic", JSON.class, new Topics.ValueStream.Default<JSON>() {
    @Override
    public void onValue(String topicPath, TopicSpecification topicSpec, JSON oldValue, JSON newValue) {
        System.out.println("New value for " + topicPath + ": " + newValue.toJsonString());
    }
});

Now you can tell Diffusion that you want to receive updates from the topic you created.

session.feature(Topics.class).subscribe("my-topic/new-topic");

Note that you can create multiple streams to dispatch updates to different parts of your application.

Before you subscribe, you need to first attach a value stream to listen for updates. Value streams provide callbacks for topic events, such as subscriptions and received values. Define a class that implements the IValueStream interface, which you will then use to subscribe to your topic.

class ExampleValueStream : IValueStream<IJSON>
{
     public void OnSubscription(string topicPath, ITopicSpecification specification)
     {
         Console.WriteLine($"Subscribed to: {topicPath}");
     }

     public void OnUnsubscription(string topicPath, ITopicSpecification specification, TopicUnsubscribeReason reason)
     {
         Console.WriteLine($"Unsubscribed from: {topicPath}");
     }

     public void OnValue(string topicPath, ITopicSpecification specification, IJSON oldValue, IJSON newValue)
     {
         Console.WriteLine($"{topicPath}: {newValue.ToJSONString()}");
     }

     public void OnClose()
     {
         // Not used
     }

     public void OnError(ErrorReason errorReason)
     {
         // Not used
     }
}

Now create a value stream instance and subscribe to your topic.

session.Topics.AddStream("my-topic/new-topic", new ExampleValueStream());

await session.Topics.SubscribeAsync("my-topic/new-topic");

Note that you can create multiple streams to dispatch updates to different parts of your application.

Before you subscribe, you first need to define the callback functions for the value stream.

Value streams provide callbacks for topic events, such as subscriptions and received values.

static int on_subscription(
        const char* topic_path,
        const TOPIC_SPECIFICATION_T *specification,
        void *context)
{
        printf("Subscribed to topic: %s\n", topic_path);
        return HANDLER_SUCCESS;
}

static int on_unsubscription(
        const char* topic_path,
        const TOPIC_SPECIFICATION_T *specification,
        NOTIFY_UNSUBSCRIPTION_REASON_T reason,
        void *context)
{
        printf("Unsubscribed from topic: %s\n", topic_path);
        return HANDLER_SUCCESS;
}

static int on_value(
        const char* topic_path,
        const TOPIC_SPECIFICATION_T *const specification,
        const DIFFUSION_DATATYPE datatype,
        const DIFFUSION_VALUE_T *const old_value,
        const DIFFUSION_VALUE_T *const new_value,
        void *context)
{
        char *result;
        // Declare the error holder before it is passed to the conversion call
        DIFFUSION_API_ERROR api_error;
        bool success = to_diffusion_json_string(new_value, &result, &api_error);

        if(success) {
                printf("Received value: %s\n", result);
                free(result);
                return HANDLER_SUCCESS;
        }

        printf("Error during diffusion value read: %s\n", get_diffusion_api_error_description(api_error));
        diffusion_api_error_free(api_error);
        return HANDLER_FAILURE;
}

static void on_close()
{
        printf("Value stream closed\n");
}

Now create a value stream instance and subscribe to your topic.

VALUE_STREAM_T value_stream = {
        .datatype = DATATYPE_JSON,
        .on_subscription = on_subscription,
        .on_unsubscription = on_unsubscription,
        .on_value = on_value,
        .on_close = on_close
};

add_stream(session, "my-topic/new-topic", &value_stream);
SUBSCRIPTION_PARAMS_T params = {
        .topic_selector = "my-topic/new-topic"
};

subscribe(session, params);

Before you subscribe, you first need to define the callback functions for the value stream.

Value streams provide callbacks for topic events, such as subscriptions and received values.

def on_update(*, old_value, topic_path, topic_value, **kwargs):
    print("Topic:", topic_path)
    if old_value is None:
        print("  Initial value:", topic_value)
    else:
        print("  Value updated")
        print("    Old value:", old_value)
        print("    New value:", topic_value)

def on_subscribe(*, topic_path, **kwargs):
    print(f"Subscribed to {topic_path}")

def on_unsubscribe(*, reason, topic_path, **kwargs):
    print(f"Unsubscribed from {topic_path} because {str(reason)}")
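
Because these callbacks are plain functions that take keyword-only arguments, you can try them out without a server connection. The following is a minimal sketch under that assumption. `format_update` is a hypothetical helper (not part of the Diffusion SDK) that builds the same text that on_update prints, and the extra `topic_type` keyword in the example call is invented, purely to show that `**kwargs` absorbs any arguments the callback does not name.

```python
def format_update(*, old_value, topic_path, topic_value, **kwargs):
    """Build the lines that on_update prints (hypothetical helper)."""
    lines = [f"Topic: {topic_path}"]
    if old_value is None:
        # No previous value: this is the first value delivered for the topic
        lines.append(f"  Initial value: {topic_value}")
    else:
        lines.append("  Value updated")
        lines.append(f"    Old value: {old_value}")
        lines.append(f"    New value: {topic_value}")
    return lines


def on_update(**kwargs):
    # Same calling convention as the callback above; prints the formatted lines
    for line in format_update(**kwargs):
        print(line)


# Call the callback by hand with made-up values. topic_type is an invented
# extra keyword that **kwargs silently ignores.
on_update(
    old_value=None,
    topic_path="my-topic/new-topic",
    topic_value="hello",
    topic_type="STRING",
)
```

Separating the formatting from the printing makes the callback easy to check in a unit test before you connect it to a live session.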

Now create a value stream instance and subscribe to your topic.

topic_type = diffusion.datatypes.STRING
topic_selector = "my-topic/new-topic"

value_stream = diffusion.features.topics.ValueStreamHandler(
    data_type=topic_type,
    update=on_update,
    subscribe=on_subscribe,
    unsubscribe=on_unsubscribe
)

session.topics.add_value_stream(
    topic_selector=topic_selector, stream=value_stream
)
await session.topics.subscribe(topic_selector)
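
The other SDK sections note that you can create multiple streams to dispatch updates to different parts of your application. Assuming the same idea applies here, the sketch below illustrates it without a server. `LocalStreams`, `add_stream` and `publish` are hypothetical names for a toy in-memory stand-in, not the Diffusion SDK, and it matches exact topic paths only, whereas real topic selectors are more expressive.

```python
class LocalStreams:
    """Toy in-memory stand-in for stream registration (not the Diffusion SDK)."""

    def __init__(self):
        # Maps an exact topic path to the list of callbacks registered for it
        self._streams = {}

    def add_stream(self, topic_path, callback):
        self._streams.setdefault(topic_path, []).append(callback)

    def publish(self, topic_path, value):
        # Deliver the value to every stream registered for this exact path
        for callback in self._streams.get(topic_path, []):
            callback(topic_path=topic_path, topic_value=value)


# Two independent consumers of the same topic (both are made-up examples)
ui_updates = []
audit_log = []

streams = LocalStreams()
streams.add_stream(
    "my-topic/new-topic",
    lambda **kw: ui_updates.append(kw["topic_value"]),
)
streams.add_stream(
    "my-topic/new-topic",
    lambda **kw: audit_log.append((kw["topic_path"], kw["topic_value"])),
)

streams.publish("my-topic/new-topic", "hello")
print(ui_updates)
print(audit_log)
```

Each registered callback receives the same update, so one part of the application can refresh a display while another records the change.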

Next Steps

That’s it – you’ve successfully learned to use the REST API and a client SDK.

Now you could try:

Documentation

To learn more about the Diffusion JavaScript SDK, please browse the JavaScript API Documentation.

To learn more about the Diffusion Apple SDK, please browse the Apple API Documentation.

To learn more about the Diffusion Android SDK, please browse the Android API Documentation.

To learn more about the Diffusion Java SDK, please browse the Java API Documentation.

To learn more about the Diffusion .NET SDK, please browse the .NET API Documentation.

To learn more about the Diffusion C SDK, please browse the C API Documentation.

To learn more about the Diffusion Python SDK, please browse the Python API Documentation.

More advanced code examples are available on our GitHub page.

For in-depth documentation about Diffusion Cloud, see the Diffusion Cloud manual.

To browse all the documentation, including docs for the on-premises and older versions of Diffusion, visit the Diffusion documentation server.

For more resources or to contact Push Technology support, visit the Diffusion Developer Hub.