Just a second...

Sending request messages to a session

A client session can send a request message containing typed data directly to another client session. The receiving client session can then send a response message containing typed data. The request and response messages are addressed through the same message path.

A control client session on the left. Diffusion in the centre. Another client session on the right. An arrow representing the request message goes from the control client session through a shape representing the message path inside the Diffusion server and continues to the other client session. An arrow representing the response message goes from the receiving client session back through the message path on the Diffusion server to the control client session.
When a request message is sent to a specific client session and that session responds, the following events occur:
  1. A control client session sends a request message to a client session, specifying the message path to send the message through and the session ID of the client session to send the request message to.
  2. The client session receives the request message through a request stream.
  3. The client session uses a responder to send a response to the request message.
  4. The control client session receives the response.

Both the request message and the response message contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response message is not required to be the same data type as the request it responds to.

Sending a request to a session

Required permissions: send_to_session permission for the specified message path and register_handler permission

Usually, it is a control client session in your organization's backend that sends messages directly to other sessions.

Send the request message specifying the following information:
  • The session ID of the client session to send the request to
  • The message path to send the request and receive the response through
  • The request message
  • The datatype of the request message
  • The datatype of the response message
JavaScript
// Send the request 'Hello client' through message path 'foo' to the session identified by session_id.
// The last two arguments are the request datatype and the response datatype (both JSON here).
session.messages.sendRequest('foo', 'Hello client', session_id, diffusion.datatypes.json(), diffusion.datatypes.json())
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Session;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that sends a request message to a filter,
    /// then sends another request directly to the session, and displays the response.
    /// </summary>
    public sealed class SendingSessionRequestMessages {
        private readonly string messagingPath = ">random/requestResponse";

        /// <summary>
        /// Opens a session as the "control" principal, obtains a target session ID through a
        /// filter request, sends a request directly to that session and writes the response
        /// to the console. The session is closed before the method completes.
        /// </summary>
        /// <param name="serverUrl">URL of the server to connect to.</param>
        /// <returns>A task that completes when the example has finished.</returns>
        public async Task SendingSessionRequestMessagesExample(string serverUrl) {
            // NOTE: the validation callback accepts every certificate it is given.
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            try {
                var messaging = session.Messaging;
                var requestCallback = new RequestCallback();

                // Filter messaging is used to get the session ID for this example
                int requestsSent = await messaging.SendRequestToFilterAsync(
                    "$Principal EQ 'client'",
                    messagingPath,
                    "Hello?",
                    requestCallback);

                // Give the filtered sessions time to respond without blocking the calling thread
                await Task.Delay( 1000 );

                var sessionId = requestCallback.SessionId;
                if ( sessionId == null ) {
                    // No filter response arrived, so there is no session to address directly
                    WriteLine( $"No response received from the {requestsSent} session(s) the filter request was sent to." );
                    return;
                }

                // Send message to a session using obtained session ID
                string response = await messaging.SendRequestAsync<string, string>(
                    sessionId, messagingPath, "Time");
                WriteLine($"Received response: '{response}'.");
            } finally {
                // Close the session even when one of the requests above failed
                session.Close();
            }
        }

        /// <summary>
        /// A simple IFilteredRequestCallback implementation that records the ID of the
        /// responding session and prints any error responses.
        /// </summary>
        private class RequestCallback : IFilteredRequestCallback<string> {
            /// <summary>
            /// The ID of the session whose response was received most recently,
            /// or null if no response has been received yet.
            /// </summary>
            public ISessionId SessionId { get; private set; }

            /// <summary>
            /// Indicates that a response message was received.
            /// </summary>
            public void OnResponse(ISessionId sessionId, string response) => SessionId = sessionId;

            /// <summary>
            /// Indicates that an error response message was received.
            /// </summary>
            public void OnResponseError( ISessionId sessionId, Exception exception )
                => WriteLine( $"Response error received from session {sessionId}: '{exception}'." );
        }
    }
}
Java and Android
// Establish a control session and a client session
final Session control = Diffusion.sessions()
    .principal("control")
    .password("password")
    .open("ws://localhost:8080");

final Session client = Diffusion.sessions()
    .principal("client")
    .password("password")
    .open("ws://localhost:8080");

// Obtain the Messaging feature of each session
final Messaging messagingControl = control.feature(Messaging.class);
final Messaging messaging = client.feature(Messaging.class);

// Create a JSON value to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

// Create a local request stream for the client to receive direct requests from the control session
messaging.setRequestStream("foo", JSON.class, JSON.class, new JSONRequestStream());

// Send the request to a message path "foo" and wait for (at most) 5 seconds until the response is received.
final JSON response = messagingControl.sendRequest(
    client.getSessionId(), "foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
C
// Write the string `message` into a newly created buffer that carries the request
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);

// Address the request to `session_id` through `message_path`; request and response
// are both declared as strings and `on_message_response` is set as the response callback
SEND_REQUEST_TO_SESSION_PARAMS_T params = {
        .recipient_session = session_id,
        .path = message_path,
        .request = message_buf,
        .request_datatype = DATATYPE_STRING,
        .response_datatype = DATATYPE_STRING,
        .on_response = on_message_response
};

send_request_to_session(session, params);
// Free the local request buffer once the send call has returned
buf_free(message_buf);
Python
# Sending the request and receiving the response.
print(f"Sending request: '{request}' to session {session_id}...")
try:
    # The raw request is wrapped in `request_type` and the call is awaited for the response.
    response = await session.messaging.send_request_to_session(
        path=path, session_id=session_id, request=request_type(request)
    )
except diffusion.DiffusionError as ex:
    # Report the failure instead of letting the exception propagate.
    print(f"ERROR: {ex}")
else:
    # Only reached when no DiffusionError was raised above.
    print(f"... received response '{response}'")
Apple
// Build the JSON request; `try!` traps if the literal cannot be parsed as JSON
let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

// Send the request to `session_id` through `message_path` and report the outcome
session.messaging.send(json_request,
                       to: session_id,
                       path: message_path,
                       jsonCompletionHandler: { (response: PTDiffusionJSON?, error) in
    // Swift's print() does not expand %@ placeholders, so the values are interpolated instead
    if let error = error {
        print("Failed to send message to \(message_path). Error: \(error.localizedDescription)")
    }
    else if let response = response {
        print("Received response: \(response)")
    }
})

Responding to messages sent to a session

Required permissions: send_to_message_handler for the specified message path

Define a request stream to receive and respond to request messages that have a specific data type.

JavaScript
// Request stream handler: `onRequest` answers a request through the supplied responder,
// `onError` and `onClose` are placeholders for error and close notifications.
const handler = {
    onRequest : function(request, context, responder) {
        // ... build `response` here (not shown in this snippet)
        responder.respond(response);
    },
    onError : (error) => {
        // an error occurred
    },
    onClose : () => {
        // the handler is closed
    }
}
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Request stream for string requests and string responses that reports each event on the console.
    /// </summary>
    internal class SimpleSessionMessagesRequestStream : IRequestStream<string, string> {
        /// <summary>
        /// Called when the request stream has been closed.
        /// </summary>
        public void OnClose() {
            WriteLine( "A request handler was closed." );
        }

        /// <summary>
        /// Called when the request stream has received an error.
        /// </summary>
        /// <param name="errorReason">The reason reported for the error.</param>
        public void OnError( ErrorReason errorReason ) {
            WriteLine( $"A request handler has received error: {errorReason}." );
        }

        /// <summary>
        /// Called for each incoming request. The "Hello?" probe is answered with "Yes";
        /// any other request is printed and answered with the current UTC time.
        /// </summary>
        /// <param name="path">The message path (not used by this implementation).</param>
        /// <param name="request">The request text.</param>
        /// <param name="responder">The responder used to answer the request.</param>
        /// <remarks>On invalid request you would call: <see cref="IResponder{TResponse}.Reject(string)"/>.</remarks>
        public void OnRequest( string path, string request, IResponder<string> responder ) {
            // message to the filter to obtain the session ID
            if ( request == "Hello?" ) {
                responder.Respond( "Yes" );
                return;
            }

            WriteLine( $"Received request: '{request}'." );
            var currentTime = DateTime.UtcNow.ToLongTimeString();
            responder.Respond( currentTime );
        }
    }
}
Java and Android
/**
 * Request stream that answers every JSON request with the JSON string "world".
 */
private static class JSONRequestStream implements Messaging.RequestStream<JSON, JSON> {
    /** Responds to the request with a fixed JSON value; the path and request content are not inspected. */
    @Override
    public void onRequest(String path, JSON request, Responder<JSON> responder) {
        responder.respond(Diffusion.dataTypes().json().fromJsonString("\"world\""));
    }

    /** No action is taken when the stream is closed. */
    @Override
    public void onClose() {
    }

    /** No action is taken when the stream reports an error. */
    @Override
    public void onError(ErrorReason errorReason) {
    }
}
C
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */

#include <stdio.h>
#include <stdlib.h>

#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"

// Callback set as `on_number_sent` for filter requests. It does no work in this
// example and always reports success.
static int on_selected_sessions_received(
        int number_sent,
        void *context)
{
        // `number_sent` is the number of sessions selected by the `session_filter`;
        // neither argument is used here
        (void)number_sent;
        (void)context;

        return HANDLER_SUCCESS;
}


// Callback set as `on_response` for the requests sent in this file. It does no
// work in this example and always reports success.
static int on_message_response(
        DIFFUSION_DATATYPE response_datatype,
        const DIFFUSION_VALUE_T *response,
        void *context)
{
        // read the `response` by converting it to the datatype `response_datatype`;
        // the arguments are left unused here
        (void)response_datatype;
        (void)response;
        (void)context;

        return HANDLER_SUCCESS;
}


// Sends the string `message` as a request through `message_path`, addressed by
// the filter expression `session_filter`.
void to_filter(
        SESSION_T *session,
        char *message_path,
        char *message,
        char *session_filter)
{
        // write the request string into a fresh buffer
        BUF_T *request_buf = buf_create();
        write_diffusion_string_value(message, request_buf);

        SEND_REQUEST_TO_FILTER_PARAMS_T filter_params = {
                .filter = session_filter,
                .path = message_path,
                .request = request_buf,
                .request_datatype = DATATYPE_STRING,
                .response_datatype = DATATYPE_STRING,
                .on_number_sent = on_selected_sessions_received,
                .on_response = on_message_response
        };
        send_request_to_filter(session, filter_params);

        // free the local buffer after the send call has returned
        buf_free(request_buf);
}


// Sends the string `message` as a request through `message_path` to the single
// session identified by `session_id`.
void to_session(
        SESSION_T *session,
        char *message_path,
        char *message,
        SESSION_ID_T *session_id)
{
        // write the request string into a fresh buffer
        BUF_T *request_buf = buf_create();
        write_diffusion_string_value(message, request_buf);

        SEND_REQUEST_TO_SESSION_PARAMS_T session_params = {
                .path = message_path,
                .recipient_session = session_id,
                .request_datatype = DATATYPE_STRING,
                .response_datatype = DATATYPE_STRING,
                .request = request_buf,
                .on_response = on_message_response
        };
        send_request_to_session(session, session_params);

        // free the local buffer after the send call has returned
        buf_free(request_buf);
}


// Request stream callback: prints the received string request and answers it
// with a fixed string response.
static int on_message_request(
        SESSION_T *session,
        const char *request_path,
        DIFFUSION_DATATYPE request_datatype,
        const DIFFUSION_VALUE_T *request,
        const DIFFUSION_RESPONDER_HANDLE_T *handle,
        void *context)
{
        // read the `request` as a string; the pointer starts as NULL so that it is
        // never printed or freed uninitialised when the read does not succeed
        char *request_val = NULL;
        if (read_diffusion_string_value(request, &request_val, NULL) && request_val != NULL) {
                printf("Received message: %s\n", request_val);
        }
        else {
                fprintf(stderr, "Failed to read the request as a string\n");
        }
        free(request_val); // free(NULL) is a no-op

        // create the response
        BUF_T *response_buf = buf_create();
        write_diffusion_string_value("This is my response", response_buf);

        // and respond to the request
        diffusion_respond_to_request(session, handle, response_buf, NULL);
        buf_free(response_buf);

        return HANDLER_SUCCESS;
}


// Sets a request stream on `message_path` for string requests and string
// responses, using `on_message_request` as the request callback.
void register_request_stream(
        SESSION_T *session,
        char *message_path)
{
        DIFFUSION_REQUEST_STREAM_T stream = { .on_request = on_message_request };

        set_request_stream(
                session,
                message_path,
                DATATYPE_STRING,
                DATATYPE_STRING,
                &stream);
}


// Sends the string `message` as a request to `message_path`, without naming a
// recipient session or filter.
void to_path(
        SESSION_T *session,
        char *message_path,
        char *message)
{
        // write the request string into a fresh buffer
        BUF_T *request_buf = buf_create();
        write_diffusion_string_value(message, request_buf);

        SEND_REQUEST_PARAMS_T path_params = {
                .path = message_path,
                .request_datatype = DATATYPE_STRING,
                .response_datatype = DATATYPE_STRING,
                .request = request_buf,
                .on_response = on_message_response
        };
        send_request(session, path_params);

        // free the local buffer after the send call has returned
        buf_free(request_buf);
}


// Callback set as `on_active` for the request handler. It does no work in this
// example and always reports success.
static int on_request_handler_active(
        SESSION_T *session,
        const char *path,
        const DIFFUSION_REGISTRATION_T *registered_handler)
{
        // message path `path` is now active for `registered_handler`;
        // the arguments are left unused here
        (void)session;
        (void)path;
        (void)registered_handler;

        return HANDLER_SUCCESS;
}


// Callback set as `on_request` for the request handler. This example leaves the
// request unanswered and always reports success.
static int on_request_received(
        SESSION_T *session,
        DIFFUSION_DATATYPE request_datatype,
        const DIFFUSION_VALUE_T *request,
        const DIFFUSION_REQUEST_CONTEXT_T *request_context,
        const DIFFUSION_RESPONDER_HANDLE_T *handle,
        void *context)
{
        // handle the received request here and respond to it with
        // `diffusion_respond_to_request(session, handle, response, NULL)`
        (void)session;
        (void)request_datatype;
        (void)request;
        (void)request_context;
        (void)handle;
        (void)context;

        return HANDLER_SUCCESS;
}


// Adds a request handler on `message_path` for string requests and string
// responses, wired to the `on_active` and `on_request` callbacks above.
void register_request_handler(
        SESSION_T *session,
        char *message_path)
{
        DIFFUSION_REQUEST_HANDLER_T handler = {
                .on_active = on_request_handler_active,
                .on_request = on_request_received,
                .request_datatype = DATATYPE_STRING,
                .response_datatype = DATATYPE_STRING
        };

        ADD_REQUEST_HANDLER_PARAMS_T handler_params = {
                .request_handler = &handler,
                .path = message_path
        };

        add_request_handler(session, handler_params);
}


// Example entry point: opens a session as "control", sends the same message to a
// filter, to a specific session and to a path, registers a request handler,
// waits five seconds and then releases everything.
// Returns EXIT_SUCCESS, or EXIT_FAILURE when the session cannot be created.
int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        CREDENTIALS_T *credentials = credentials_create_password(password);
        SESSION_T *session =
                session_create(url, principal, credentials, NULL, NULL, NULL);

        // every call below dereferences the session, so stop here if none was created
        if (session == NULL) {
                fprintf(stderr, "Failed to create session to %s\n", url);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // hard-coded ID of the recipient session used for the direct request
        SESSION_ID_T *session_id =
                session_id_create_from_string("e9cfbd5be9a72622-f000000300000004");

        char *message_path = "message/path";
        char *message = "hello world";
        char *session_filter = "$Principal is 'control'";

        to_filter(session, message_path, message, session_filter);
        to_session(session, message_path, message, session_id);
        to_path(session, message_path, message);
        register_request_handler(session, message_path);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);
        session_id_free(session_id);

        return EXIT_SUCCESS;
}
Python
def callback(request: str, **kwargs) -> str:
    """Build the greeting for ``request`` and return it wrapped in ``diffusion.datatypes.STRING``.

    Extra keyword arguments are accepted and ignored.
    """
    greeting = f"Hello there, {request}!"
    return diffusion.datatypes.STRING(greeting)
Apple
/// Request stream delegate that logs each JSON request and replies with a fixed JSON response.
class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
    /// Logs the received request and responds with `{"greetings": "stranger"}`.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveRequestWith json: PTDiffusionJSON,
                         responder: PTDiffusionResponder) {
        // Swift's print() does not expand %@ placeholders, so the value is interpolated instead
        print("Received request \(json)")
        // `try!` traps if the literal cannot be parsed as JSON
        let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
        responder.respond(with: json_response)
    }

    /// Logs the error reported for the request stream.
    func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
        print("Request stream failed with error \(error.localizedDescription)")
    }

    /// Logs that the request stream has closed.
    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Request stream is now closed")
    }
}

Add the request stream against a message path. You can only add one request stream for each message path.

JavaScript
// Set `handler` as the request stream for message path "foo", with JSON requests and JSON responses
session.messages.setRequestStream("foo", handler, diffusion.datatypes.json(), diffusion.datatypes.json());
.NET
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Factories;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that registers a request stream to listen for messages on a path.
    /// </summary>
    public sealed class ReceivingSessionRequestMessages {

        /// <summary>
        /// Opens a session as the "client" principal, sets a request stream on the messaging path,
        /// waits 60 seconds for requests, then removes the stream and closes the session.
        /// </summary>
        /// <param name="serverUrl">URL of the server to connect to.</param>
        public void ReceivingSessionRequestMessagesExample(string serverUrl) {

            var session = Diffusion.Sessions.Principal( "client" ).Password( "password" ).Open( serverUrl );

            try
            {
                var messaging = session.Messaging;
                string messagingPath = ">random/requestResponse";

                var requestStream = new SimpleSessionMessagesRequestStream();
                messaging.SetRequestStream( messagingPath, requestStream );

                try
                {
                    Thread.Sleep( 60000 ); // wait for messages...
                }
                finally
                {
                    // Remove the request stream that was set above
                    messaging.RemoveRequestStream( messagingPath );
                }
            }
            finally
            {
                // Close the session even when setting or removing the stream failed
                session.Close();
            }
        }
    }
}
Java and Android
// Set a JSONRequestStream on message path "foo" for JSON requests and JSON responses
messaging.setRequestStream("foo", JSON.class, JSON.class, new JSONRequestStream());
C
// Request stream callback: prints the received string request and answers it
// with a fixed string response.
static int on_message_request(
        SESSION_T *session,
        const char *request_path,
        DIFFUSION_DATATYPE request_datatype,
        const DIFFUSION_VALUE_T *request,
        const DIFFUSION_RESPONDER_HANDLE_T *handle,
        void *context)
{
        // read the `request` as a string; the pointer starts as NULL so that it is
        // never printed or freed uninitialised when the read does not succeed
        char *request_val = NULL;
        if (read_diffusion_string_value(request, &request_val, NULL) && request_val != NULL) {
                printf("Received message: %s\n", request_val);
        }
        else {
                fprintf(stderr, "Failed to read the request as a string\n");
        }
        free(request_val); // free(NULL) is a no-op

        // create the response
        BUF_T *response_buf = buf_create();
        write_diffusion_string_value("This is my response", response_buf);

        // and respond to the request
        diffusion_respond_to_request(session, handle, response_buf, NULL);
        buf_free(response_buf);

        return HANDLER_SUCCESS;
}


// Sets a request stream on `message_path` for string requests and string
// responses, using `on_message_request` as the request callback.
void register_request_stream(
        SESSION_T *session,
        char *message_path)
{
        DIFFUSION_REQUEST_STREAM_T stream = { .on_request = on_message_request };

        set_request_stream(
                session,
                message_path,
                DATATYPE_STRING,
                DATATYPE_STRING,
                &stream);
}
Python
# Register handler to receive the request
# `callback` is wrapped in a RequestHandler; note that the response type is set
# to the same `request_type` as the request type.
handler = RequestHandler(
    callback,
    request_type=request_type,
    response_type=request_type
)
# Attach the handler to `path` as a stream handler, with addressed=True.
session.messaging.add_stream_handler(path, handler=handler, addressed=True)
Apple
// Create the delegate, build a JSON request stream around it,
// and set that stream for `message_path`
let stream_delegate = JSONRequestStreamDelegate()
let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
session.messaging.setRequestStream(request_stream, forPath: message_path)