Sending request messages to a session
A client session can send a request message containing typed data directly to another client session. The receiving client session can then send a response message containing typed data. The request and response messages are addressed through the same message path.
 
- A control client session sends a request message to a client session, specifying the message path to send the message through and the session ID of the client session to send the request message to.
- The client session receives the request message through a request stream.
- The client session uses a responder to send a response to the request message.
- The control client session receives the response.
Both the request message and the response message contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response message is not required to be the same data type as the request it responds to.
Sending a request to a session
Required permissions: `send_to_session` permission for the specified message path and `register_handler` permission
Usually, it is a control client session in your organization's backend that sends messages directly to other sessions. To send a request, specify the following information:
- The session ID of the client session to send the request to
- The message path to send the request and receive the response through
- The request message
- The datatype of the request message
- The datatype of the response message
// Send 'Hello client' through message path 'foo' to the session identified by
// session_id. The last two arguments are the request and response datatypes.
session.messages.sendRequest(
    'foo',
    'Hello client',
    session_id,
    diffusion.datatypes.json(),
    diffusion.datatypes.json()
)
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Session;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that sends a request message to a filter,
    /// then sends another request directly to the session, and displays the response.
    /// </summary>
    public sealed class SendingSessionRequestMessages {
        private readonly string messagingPath = ">random/requestResponse";

        /// <summary>
        /// Opens a session as the "control" principal, obtains a target session ID through a
        /// filtered request, sends a request directly to that session and prints the response.
        /// </summary>
        /// <param name="serverUrl">The URL of the server to connect to.</param>
        /// <returns>A task that completes when the example has finished and the session is closed.</returns>
        public async Task SendingSessionRequestMessagesExample(string serverUrl) {
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            try {
                var messaging = session.Messaging;
                var requestCallback = new RequestCallback();

                // Filter messaging is used to get the session ID for this example
                int requestsSent = await messaging.SendRequestToFilterAsync(
                    "$Principal EQ 'client'",
                    messagingPath,
                    "Hello?",
                    requestCallback);

                // Give the filtered responses time to arrive without blocking the calling thread
                await Task.Delay( 1000 );

                // Without a response there is no session ID to address the direct request to
                if ( requestCallback.SessionId == null ) {
                    WriteLine( $"No response received from the {requestsSent} session(s) selected by the filter." );
                    return;
                }

                // Send message to a session using obtained session ID
                string response = await messaging.SendRequestAsync<string, string>(
                    requestCallback.SessionId, messagingPath, "Time");
                WriteLine($"Received response: '{response}'.");
            } finally {
                // Close the session, even when one of the requests above fails
                session.Close();
            }
        }

        /// <summary>
        /// A simple IFilteredRequestCallback implementation that records the responding session
        /// and prints any error responses.
        /// </summary>
        private class RequestCallback : IFilteredRequestCallback<string> {
            /// <summary>
            /// The ID of the session that most recently responded, or null if none has responded yet.
            /// </summary>
            public ISessionId SessionId { get; private set; }

            /// <summary>
            /// Indicates that a response message was received; remembers the responding session.
            /// </summary>
            public void OnResponse(ISessionId sessionId, string response) => SessionId = sessionId;

            /// <summary>
            /// Indicates that an error response message was received.
            /// </summary>
            public void OnResponseError( ISessionId sessionId, Exception exception )
                => WriteLine( $"Response error received from session {sessionId}: '{exception}'." );
        }
    }
}
// Establish the control session and the client session
final Session control = Diffusion.sessions()
    .principal("control")
    .password("password")
    .open("ws://localhost:8080");
final Session client = Diffusion.sessions()
    .principal("client")
    .password("password")
    .open("ws://localhost:8080");
// Obtain the Messaging feature for each session (the same feature type is used by both)
final Messaging messagingControl = control.feature(Messaging.class);
final Messaging messaging = client.feature(Messaging.class);
// Create a JSON value (the JSON string "hello") to send as the request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");
// Register a request stream on path "foo" so the client session can receive direct requests
// from the control session; this is done before the request is sent
messaging.setRequestStream("foo", JSON.class, JSON.class, new JSONRequestStream());
// Send the request to the client session through message path "foo" and wait (at most 5 seconds)
// for the response
final JSON response = messagingControl.sendRequest(
    client.getSessionId(), "foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);
SEND_REQUEST_TO_SESSION_PARAMS_T params = {
    .recipient_session = session_id,
    .path = message_path,
    .request = message_buf,
    .request_datatype = DATATYPE_STRING,
    .response_datatype = DATATYPE_STRING,
    .on_response = on_message_response};
send_request_to_session(session, params);
buf_free(message_buf);
# Sending the request and receiving the response.
print(f"Sending request: '{request}' to session {session_id}...")
try:
    response = await session.messaging.send_request_to_session(
        path=path, session_id=session_id, request=request_type(request)
    )
except diffusion.DiffusionError as ex:
    print(f"ERROR: {ex}")
else:
    print(f"... received response '{response}'")
// Build the JSON request; `try!` traps if the literal is not valid JSON
let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request
session.messaging.send(json_request,
                       to: session_id,
                       path: message_path,
                       jsonCompletionHandler: { (response: PTDiffusionJSON?, error) in
    // Swift's print() does not expand %@ placeholders, so the values are
    // interpolated into the message instead of being passed as extra items
    if let error = error {
        print("Failed to send message to \(message_path). Error: \(error.localizedDescription)")
    }
    else if let response = response {
        print("Received response: \(response)")
    }
})
Responding to messages sent to a session
Required permissions: `send_to_message_handler` for the specified message path
Define a request stream to receive and respond to request messages that have a specific data type.
// Request stream handler: answers each request through the supplied responder
const handler = {
    onRequest(request, context, responder) {
        // ... build `response` here
        responder.respond(response);
    },
    onError(error) {
        // an error occurred
    },
    onClose() {
        // the handler is closed
    }
};
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Request stream for string requests and string responses that logs each event to the console.
    /// </summary>
    internal class SimpleSessionMessagesRequestStream : IRequestStream<string, string> {
        /// <summary>
        /// Called when the request stream is closed; prints a notice.
        /// </summary>
        public void OnClose() {
            WriteLine( "A request handler was closed." );
        }

        /// <summary>
        /// Called when the request stream receives an error; prints the reason.
        /// </summary>
        public void OnError( ErrorReason errorReason ) {
            WriteLine( $"A request handler has received error: {errorReason}." );
        }

        /// <summary>
        /// Called for each incoming request; always sends a response.
        /// </summary>
        /// <remarks>On invalid request you would call: <see cref="IResponder{TResponse}.Reject(string)"/>.</remarks>
        public void OnRequest( string path, string request, IResponder<string> responder ) {
            // "Hello?" is the message sent to the filter to obtain the session ID
            if ( request == "Hello?" ) {
                responder.Respond( "Yes" );
                return;
            }

            // Any other request is logged and answered with the current UTC time
            WriteLine( $"Received request: '{request}'." );
            responder.Respond( DateTime.UtcNow.ToLongTimeString() );
        }
    }
}
/**
 * Request stream that answers every JSON request with the JSON string "world".
 */
private static final class JSONRequestStream implements Messaging.RequestStream<JSON, JSON> {

    @Override
    public void onRequest(String path, JSON request, Responder<JSON> responder) {
        // The reply does not depend on the path or on the request content
        responder.respond(Diffusion.dataTypes().json().fromJsonString("\"world\""));
    }

    @Override
    public void onError(ErrorReason errorReason) {
        // No action is taken on stream errors in this example
    }

    @Override
    public void onClose() {
        // No action is taken when the stream closes
    }
}
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 *
 */
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
    #include <unistd.h>
#else
    /* Map sleep(seconds) onto Sleep(milliseconds). The argument is parenthesised
       so that an expression such as sleep(a + b) is scaled as a whole. */
    #define sleep(x) Sleep(1000 * (x))
#endif
#include "diffusion.h"
/*
 * Callback installed as `on_number_sent` for a filtered request.
 * `number_sent` is the number of sessions selected by the `session_filter`;
 * this example does not use it.
 */
static int on_selected_sessions_received(
    int number_sent,
    void *context)
{
    (void)number_sent;
    (void)context;
    return HANDLER_SUCCESS;
}
/*
 * Callback installed as `on_response` for the requests sent by this example.
 * A real handler would read `response` by converting it to the datatype
 * given in `response_datatype`; here the response is ignored.
 */
static int on_message_response(
    DIFFUSION_DATATYPE response_datatype,
    const DIFFUSION_VALUE_T *response,
    void *context)
{
    (void)response_datatype;
    (void)response;
    (void)context;
    return HANDLER_SUCCESS;
}
/*
 * Sends `message` as a string request through `message_path` to the sessions
 * selected by `session_filter`.
 */
void to_filter(
    SESSION_T *session,
    char *message_path,
    char *message,
    char *session_filter)
{
    // Write the request string into a buffer
    BUF_T *request_buf = buf_create();
    write_diffusion_string_value(message, request_buf);

    SEND_REQUEST_TO_FILTER_PARAMS_T filter_params = {
        .filter = session_filter,
        .path = message_path,
        .request = request_buf,
        .request_datatype = DATATYPE_STRING,
        .response_datatype = DATATYPE_STRING,
        .on_number_sent = on_selected_sessions_received,
        .on_response = on_message_response
    };
    send_request_to_filter(session, filter_params);

    // The buffer is released straight after the send call
    buf_free(request_buf);
}
/*
 * Sends `message` as a string request through `message_path` to the single
 * session identified by `session_id`.
 */
void to_session(
    SESSION_T *session,
    char *message_path,
    char *message,
    SESSION_ID_T *session_id)
{
    // Write the request string into a buffer
    BUF_T *request_buf = buf_create();
    write_diffusion_string_value(message, request_buf);

    SEND_REQUEST_TO_SESSION_PARAMS_T session_params = {
        .path = message_path,
        .recipient_session = session_id,
        .request_datatype = DATATYPE_STRING,
        .response_datatype = DATATYPE_STRING,
        .request = request_buf,
        .on_response = on_message_response
    };
    send_request_to_session(session, session_params);

    // The buffer is released straight after the send call
    buf_free(request_buf);
}
/*
 * Request stream callback: prints the received string request and responds
 * with a fixed string through `handle`.
 */
static int on_message_request(
    SESSION_T *session,
    const char *request_path,
    DIFFUSION_DATATYPE request_datatype,
    const DIFFUSION_VALUE_T *request,
    const DIFFUSION_RESPONDER_HANDLE_T *handle,
    void *context)
{
    // read the `request` based on the received `request_datatype`;
    // the pointer starts as NULL so a failed read never leaves it
    // uninitialised for printf/free
    char *request_val = NULL;
    if (read_diffusion_string_value(request, &request_val, NULL)) {
        printf("Received message: %s\n", request_val);
    }
    else {
        fprintf(stderr, "Unable to read the request as a string value\n");
    }
    free(request_val); // free(NULL) is a no-op

    // create the response
    BUF_T *response_buf = buf_create();
    write_diffusion_string_value("This is my response", response_buf);

    // and respond to the request
    diffusion_respond_to_request(session, handle, response_buf, NULL);
    buf_free(response_buf);
    return HANDLER_SUCCESS;
}
/*
 * Sets a request stream on `message_path` for string requests and string
 * responses; incoming requests are passed to `on_message_request`.
 */
void register_request_stream(
    SESSION_T *session,
    char *message_path)
{
    DIFFUSION_REQUEST_STREAM_T stream = {
        .on_request = on_message_request
    };

    set_request_stream(
        session,
        message_path,
        DATATYPE_STRING,
        DATATYPE_STRING,
        &stream);
}
/*
 * Sends `message` as a string request to `message_path`, without naming a
 * recipient session or filter.
 */
void to_path(
    SESSION_T *session,
    char *message_path,
    char *message)
{
    // Write the request string into a buffer
    BUF_T *request_buf = buf_create();
    write_diffusion_string_value(message, request_buf);

    SEND_REQUEST_PARAMS_T path_params = {
        .path = message_path,
        .request_datatype = DATATYPE_STRING,
        .response_datatype = DATATYPE_STRING,
        .request = request_buf,
        .on_response = on_message_response
    };
    send_request(session, path_params);

    // The buffer is released straight after the send call
    buf_free(request_buf);
}
/*
 * Callback installed as `on_active` for the request handler: message path
 * `path` is now active for `registered_handler`. Nothing is done with it here.
 */
static int on_request_handler_active(
    SESSION_T *session,
    const char *path,
    const DIFFUSION_REGISTRATION_T *registered_handler)
{
    (void)session;
    (void)path;
    (void)registered_handler;
    return HANDLER_SUCCESS;
}
/*
 * Callback installed as `on_request` for the request handler.
 * This example leaves the request unanswered; a real handler would process
 * it and respond with
 * `diffusion_respond_to_request(session, handle, response, NULL)`.
 */
static int on_request_received(
    SESSION_T *session,
    DIFFUSION_DATATYPE request_datatype,
    const DIFFUSION_VALUE_T *request,
    const DIFFUSION_REQUEST_CONTEXT_T *request_context,
    const DIFFUSION_RESPONDER_HANDLE_T *handle,
    void *context)
{
    (void)session;
    (void)request_datatype;
    (void)request;
    (void)request_context;
    (void)handle;
    (void)context;
    return HANDLER_SUCCESS;
}
/*
 * Adds a request handler on `message_path` for string requests and string
 * responses, using the `on_active` and `on_request` callbacks defined above.
 */
void register_request_handler(
    SESSION_T *session,
    char *message_path)
{
    DIFFUSION_REQUEST_HANDLER_T handler = {
        .on_active = on_request_handler_active,
        .on_request = on_request_received,
        .request_datatype = DATATYPE_STRING,
        .response_datatype = DATATYPE_STRING
    };

    ADD_REQUEST_HANDLER_PARAMS_T registration = {
        .path = message_path,
        .request_handler = &handler
    };
    add_request_handler(session, registration);
}
/*
 * Example entry point: opens a session as the "control" principal, sends one
 * request to a session filter, one to a specific session and one to a message
 * path, registers a request handler, waits five seconds and then cleans up.
 */
int main(
    int argc,
    char **argv)
{
    const char *url = "ws://localhost:8080";
    const char *principal = "control";
    const char *password = "password";

    CREDENTIALS_T *credentials = credentials_create_password(password);
    SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, NULL);
    if (session == NULL) {
        // Every call below needs a session, so stop here instead of
        // passing a NULL session on
        fprintf(stderr, "Failed to create a session to %s\n", url);
        credentials_free(credentials);
        return EXIT_FAILURE;
    }

    SESSION_ID_T *session_id = session_id_create_from_string("e9cfbd5be9a72622-f000000300000004");
    char *message_path = "message/path";
    char *message = "hello world";
    char *session_filter = "$Principal is 'control'";

    to_filter(session, message_path, message, session_filter);
    to_session(session, message_path, message, session_id);
    to_path(session, message_path, message);
    register_request_handler(session, message_path);

    // Sleep for a while
    sleep(5);

    // Close the session, and release resources and memory
    session_close(session, NULL);
    session_free(session);
    credentials_free(credentials);
    session_id_free(session_id);
    return EXIT_SUCCESS;
}
def callback(request: typing.Optional[str], **kwargs) -> str:
    """Build the greeting sent back in response to ``request``.

    Any extra keyword arguments are accepted and ignored.
    """
    greeting = f"Hello there, {request}!"
    return greeting
/// Request stream delegate that logs each JSON request and answers it with a fixed JSON response.
class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
    /// Called for each received request; always responds with `{"greetings": "stranger"}`.
    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveRequestWith json: PTDiffusionJSON,
                         responder: PTDiffusionResponder) {
        // Swift's print() does not expand %@ placeholders, so interpolate the value
        print("Received request \(json)")
        // `try!` traps if the literal is not valid JSON
        let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
        responder.respond(with: json_response)
    }

    /// Called when the stream fails; logs the error description.
    func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
        print("Request stream failed with error \(error.localizedDescription)")
    }

    /// Called when the stream is closed.
    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Request stream is now closed")
    }
}
Add the request stream against a message path. You can only add one request stream for each message path.
// Register `handler` as the request stream for message path "foo";
// the last two arguments are the request and response datatypes
session.messages.setRequestStream("foo", handler, diffusion.datatypes.json(), diffusion.datatypes.json());
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Factories;
namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that registers a request stream to listen for messages on a path.
    /// </summary>
    public sealed class ReceivingSessionRequestMessages {
        /// <summary>
        /// Opens a session as the "client" principal, sets a request stream on the messaging path,
        /// waits for requests for 60 seconds, then removes the stream and closes the session.
        /// </summary>
        /// <param name="serverUrl">The URL of the server to connect to.</param>
        public void ReceivingSessionRequestMessagesExample(string serverUrl) {
            var session = Diffusion.Sessions.Principal( "client" ).Password( "password" ).Open( serverUrl );

            try {
                var messaging = session.Messaging;
                string messagingPath = ">random/requestResponse";
                var requestStream = new SimpleSessionMessagesRequestStream();

                messaging.SetRequestStream( messagingPath, requestStream );
                try {
                    Thread.Sleep( 60000 ); // wait for messages...
                } finally {
                    // Only remove the stream once it has actually been set
                    messaging.RemoveRequestStream( messagingPath );
                }
            } finally {
                // Close session, even if setting or removing the request stream throws
                session.Close();
            }
        }
    }
}
// Register a JSONRequestStream on message path "foo" for JSON requests and JSON responses
messaging.setRequestStream(
    "foo",
    JSON.class,
    JSON.class,
    new JSONRequestStream());
/*
 * Request stream callback: prints the received string request and responds
 * with a fixed string through `handle`.
 */
static int on_message_request(
    SESSION_T *session,
    const char *request_path,
    DIFFUSION_DATATYPE request_datatype,
    const DIFFUSION_VALUE_T *request,
    const DIFFUSION_RESPONDER_HANDLE_T *handle,
    void *context)
{
    // read the `request` based on the received `request_datatype`;
    // the pointer starts as NULL so a failed read never leaves it
    // uninitialised for printf/free
    char *request_val = NULL;
    if (read_diffusion_string_value(request, &request_val, NULL)) {
        printf("Received message: %s\n", request_val);
    }
    else {
        fprintf(stderr, "Unable to read the request as a string value\n");
    }
    free(request_val); // free(NULL) is a no-op

    // create the response
    BUF_T *response_buf = buf_create();
    write_diffusion_string_value("This is my response", response_buf);

    // and respond to the request
    diffusion_respond_to_request(session, handle, response_buf, NULL);
    buf_free(response_buf);
    return HANDLER_SUCCESS;
}
/*
 * Sets a request stream on `message_path` for string requests and string
 * responses; incoming requests are passed to `on_message_request`.
 */
void register_request_stream(
    SESSION_T *session,
    char *message_path)
{
    DIFFUSION_REQUEST_STREAM_T stream = {
        .on_request = on_message_request
    };

    set_request_stream(
        session,
        message_path,
        DATATYPE_STRING,
        DATATYPE_STRING,
        &stream);
}
# Build the handler that answers incoming requests with `callback`.
# The same datatype is passed for both the request and the response.
handler = RequestHandler(callback, request_type=request_type, response_type=request_type)

# Register the handler on the message path.
# NOTE(review): `addressed=True` presumably selects requests sent directly to this session - confirm
session.messaging.add_stream_handler(path, handler=handler, addressed=True)
// Create the delegate, wrap it in a JSON request stream and set that stream on the message path
let stream_delegate = JSONRequestStreamDelegate()
let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate)
session.messaging.setRequestStream(request_stream, forPath: message_path)