Sending request messages to a session
A client session can send a request message containing typed data directly to a client session. The receiving client session can then send a response message containing typed data. The request and response messages are addressed through the same message path.
- A control client session sends a request message to a client session, specifying the message path to send the message through and the session ID of the client session to send the request message to.
- The client session receives the request message through a request stream.
- The client session uses a responder to send a response to the request message.
- The control client session receives the response.
Both the request message and the response message contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response message is not required to be the same data type as the request it responds to.
Sending a request to a session
Required permissions: permission for the specified message path and permission
Usually, it is a control client session in your organization's backend that sends messages directly to other sessions.
- The session ID of the client session to send the request to
- The message path to send the request and receive the response through
- The request message
- The datatype of the request message
- The datatype of the response message
session.messages.sendRequest('foo', 'Hello client', session_id, diffusion.datatypes.json(), diffusion.datatypes.json())
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Session;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
/// <summary>
/// Client implementation that sends a request message to a filter,
/// then sends another request directly to the session, and displays the response.
/// </summary>
public sealed class SendingSessionRequestMessages {
private readonly string messagingPath = ">random/requestResponse";
public async Task SendingSessionRequestMessagesExample(string serverUrl) {
var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
.CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
.Open(serverUrl);
var messaging = session.Messaging;
var requestCallback = new RequestCallback();
// Filter messaging is used to get the session ID for this example
int requestsSent = await messaging.SendRequestToFilterAsync(
"$Principal EQ 'client'",
messagingPath,
"Hello?",
requestCallback);
Thread.Sleep( 1000 );
// Send message to a session using obtained session ID
string response = await messaging.SendRequestAsync<string, string>(
requestCallback.SessionId, messagingPath, "Time");
WriteLine($"Received response: '{response}'.");
// Close the session
session.Close();
}
/// <summary>
/// A simple IFilteredRequestCallback implementation that prints confirmation of the actions completed.
/// </summary>
private class RequestCallback : IFilteredRequestCallback<string> {
public ISessionId SessionId { get; private set; }
/// <summary>
/// Indicates that a response message was received.
/// </summary>
public void OnResponse(ISessionId sessionId, string response) => SessionId = sessionId;
/// <summary>
/// Indicates that a error response message was received.
/// </summary>
public void OnResponseError( ISessionId sessionId, Exception exception )
=> WriteLine( $"Response error received from session {sessionId}: '{exception}'." );
}
}
}
//Establish client session and control session
final Session control = Diffusion.sessions()
.principal("control")
.password("password")
.open("ws://localhost:8080");
final Session client = Diffusion.sessions()
.principal("client")
.password("password")
.open("ws://localhost:8080");
//Obtain the Messaging and MessagingControl features
final Messaging messagingControl = control.feature(Messaging.class);
final Messaging messaging = client.feature(Messaging.class);
//Create a JSON object to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");
//Create a local request stream for the client to receive direct requests from the control session
messaging.setRequestStream("foo", JSON.class, JSON.class, new JSONRequestStream());
//Send the request to a message path "foo" and wait for (at most) 5 seconds until the response is received.
final JSON response = messagingControl.sendRequest(
client.getSessionId(), "foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);
SEND_REQUEST_TO_SESSION_PARAMS_T params = {
.recipient_session = session_id,
.path = message_path,
.request = message_buf,
.request_datatype = DATATYPE_STRING,
.response_datatype = DATATYPE_STRING,
.on_response = on_message_response};
send_request_to_session(session, params);
buf_free(message_buf);
# Sending the request and receiving the response.
print(f"Sending request: '{request}' to session {session_id}...")
try:
response = await session.messaging.send_request_to_session(
path=path, session_id=session_id, request=request_type(request)
)
except diffusion.DiffusionError as ex:
print(f"ERROR: {ex}")
else:
print(f"... received response '{response}'")
let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request
session.messaging.send(json_request,
to: session_id,
path: message_path,
jsonCompletionHandler: { (response: PTDiffusionJSON?, error) in
if (error != nil) {
print("Failed to send message to %@. Error: %@", message_path, error!.localizedDescription)
}
else {
print("Received response: %@", response!)
}
})
Responding to messages sent to a session
Required permissions: for the specified message path
Define a request stream to receive and respond to request messages that have a specific data type.
const handler = {
onRequest : function(request, context, responder) {
/// ...
responder.respond(response);
},
onError : (error) => {
// an error occured
},
onClose : () => {
// the handler is closed
}
}
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
/// <summary>
/// A simple IRequestStream implementation that prints confirmation of the actions completed.
/// </summary>
internal class SimpleSessionMessagesRequestStream : IRequestStream<string, string> {
/// <summary>
/// Indicates that the request stream was closed.
/// </summary>
public void OnClose()
=> WriteLine( "A request handler was closed." );
/// <summary>
/// Indicates that the request stream has received error.
/// </summary>
public void OnError( ErrorReason errorReason )
=> WriteLine( $"A request handler has received error: {errorReason}." );
/// <summary>
/// Indicates that a request was received and responds to it.
/// </summary>
/// <remarks>On invalid request you would call: <see cref="IResponder{TResponse}.Reject(string)"/>.</remarks>
public void OnRequest( string path, string request, IResponder<string> responder ) {
if ( request == "Hello?" ) { // message to the filter to obtain the session ID
responder.Respond( "Yes" );
} else {
WriteLine( $"Received request: '{request}'." );
responder.Respond( DateTime.UtcNow.ToLongTimeString() );
}
}
}
}
private static final class JSONRequestStream implements Messaging.RequestStream<JSON, JSON> {
@Override
public void onRequest(String path, JSON request, Responder<JSON> responder) {
final JSON response = Diffusion.dataTypes().json().fromJsonString("\"world\"");
responder.respond(response);
}
@Override
public void onClose() {
}
@Override
public void onError(ErrorReason errorReason) {
}
}
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
#include <unistd.h>
#else
#define sleep(x) Sleep(1000 * x)
#endif
#include "diffusion.h"
static int on_selected_sessions_received(
int number_sent,
void *context)
{
// `number_sent` is the number of sessions selected by the `session_filter`
return HANDLER_SUCCESS;
}
static int on_message_response(
DIFFUSION_DATATYPE response_datatype,
const DIFFUSION_VALUE_T *response,
void *context)
{
// read the `response` by converting it to the datatype `response_datatype`
return HANDLER_SUCCESS;
}
void to_filter(
SESSION_T *session,
char *message_path,
char *message,
char *session_filter)
{
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);
SEND_REQUEST_TO_FILTER_PARAMS_T params = {
.path = message_path,
.filter = session_filter,
.request_datatype = DATATYPE_STRING,
.response_datatype = DATATYPE_STRING,
.request = message_buf,
.on_response = on_message_response,
.on_number_sent = on_selected_sessions_received};
send_request_to_filter(session, params);
buf_free(message_buf);
}
void to_session(
SESSION_T *session,
char *message_path,
char *message,
SESSION_ID_T *session_id)
{
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);
SEND_REQUEST_TO_SESSION_PARAMS_T params = {
.recipient_session = session_id,
.path = message_path,
.request = message_buf,
.request_datatype = DATATYPE_STRING,
.response_datatype = DATATYPE_STRING,
.on_response = on_message_response};
send_request_to_session(session, params);
buf_free(message_buf);
}
static int on_message_request(
SESSION_T *session,
const char *request_path,
DIFFUSION_DATATYPE request_datatype,
const DIFFUSION_VALUE_T *request,
const DIFFUSION_RESPONDER_HANDLE_T *handle,
void *context)
{
// read the `request` based on the received `request_datatype`
char *request_val;
read_diffusion_string_value(request, &request_val, NULL);
printf("Received message: %s\n", request_val);
free(request_val);
// create the response
BUF_T *response_buf = buf_create();
write_diffusion_string_value("This is my response", response_buf);
// and respond to the request
diffusion_respond_to_request(session, handle, response_buf, NULL);
buf_free(response_buf);
return HANDLER_SUCCESS;
}
void register_request_stream(
SESSION_T *session,
char *message_path)
{
DIFFUSION_REQUEST_STREAM_T request_stream = {.on_request = on_message_request};
set_request_stream(session, message_path, DATATYPE_STRING, DATATYPE_STRING, &request_stream);
}
void to_path(
SESSION_T *session,
char *message_path,
char *message)
{
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);
SEND_REQUEST_PARAMS_T params = {
.path = message_path,
.request = message_buf,
.on_response = on_message_response,
.request_datatype = DATATYPE_STRING,
.response_datatype = DATATYPE_STRING};
send_request(session, params);
buf_free(message_buf);
}
static int on_request_handler_active(
SESSION_T *session,
const char *path,
const DIFFUSION_REGISTRATION_T *registered_handler)
{
// message path `path` is now active for `registered_handler`
return HANDLER_SUCCESS;
}
static int on_request_received(
SESSION_T *session,
DIFFUSION_DATATYPE request_datatype,
const DIFFUSION_VALUE_T *request,
const DIFFUSION_REQUEST_CONTEXT_T *request_context,
const DIFFUSION_RESPONDER_HANDLE_T *handle,
void *context)
{
// handle request received
// and response to request with
// `diffusion_respond_to_request(session, handle, response, NULL)`
return HANDLER_SUCCESS;
}
void register_request_handler(
SESSION_T *session,
char *message_path)
{
DIFFUSION_REQUEST_HANDLER_T request_handler = {
.request_datatype = DATATYPE_STRING,
.response_datatype = DATATYPE_STRING,
.on_active = on_request_handler_active,
.on_request = on_request_received};
ADD_REQUEST_HANDLER_PARAMS_T params = {.path = message_path, .request_handler = &request_handler};
add_request_handler(session, params);
}
int main(
int argc,
char **argv)
{
const char *url = "ws://localhost:8080";
const char *principal = "control";
const char *password = "password";
CREDENTIALS_T *credentials = credentials_create_password(password);
SESSION_T *session = session_create(url, principal, credentials, NULL, NULL, NULL);
SESSION_ID_T *session_id = session_id_create_from_string("e9cfbd5be9a72622-f000000300000004");
char *message_path = "message/path";
char *message = "hello world";
char *session_filter = "$Principal is 'control'";
to_filter(session, message_path, message, session_filter);
to_session(session, message_path, message, session_id);
to_path(session, message_path, message);
register_request_handler(session, message_path);
// Sleep for a while
sleep(5);
// Close the session, and release resources and memory
session_close(session, NULL);
session_free(session);
credentials_free(credentials);
session_id_free(session_id);
return EXIT_SUCCESS;
}
def callback(request: typing.Optional[str], **kwargs) -> str:
return f"Hello there, {request}!"
class JSONRequestStreamDelegate: PTDiffusionJSONRequestStreamDelegate {
func diffusionStream(_ stream: PTDiffusionStream,
didReceiveRequestWith json: PTDiffusionJSON,
responder: PTDiffusionResponder) {
print("Received request %@", json)
let json_response = try! PTDiffusionJSON(jsonString: "{\"greetings\": \"stranger\"}").response
responder.respond(with: json_response)
}
func diffusionStream(_ stream: PTDiffusionStream, didFailWithError error: Error) {
print("Request stream failed with error %@", error.localizedDescription)
}
func diffusionDidClose(_ stream: PTDiffusionStream) {
print("Request stream is now closed")
}
}
Add the request stream against a message path. You can only add one request stream for each message path.
session.messages.setRequestStream("foo", handler, diffusion.datatypes.json(), diffusion.datatypes.json(), );
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Factories;
namespace PushTechnology.ClientInterface.Example {
/// <summary>
/// Client implementation that registers a handler to listen for messages on a path.
/// </summary>
public sealed class ReceivingSessionRequestMessages {
public void ReceivingSessionRequestMessagesExample(string serverUrl) {
var session = Diffusion.Sessions.Principal( "client" ).Password( "password" ).Open( serverUrl );
var messaging = session.Messaging;
string messagingPath = ">random/requestResponse";
var requestStream = new SimpleSessionMessagesRequestStream();
messaging.SetRequestStream( messagingPath, requestStream );
try
{
Thread.Sleep( 60000 ); // wait for messages...
}
finally
{
// Close session
messaging.RemoveRequestStream( messagingPath );
session.Close();
}
}
}
}
messaging.setRequestStream("foo", JSON.class, JSON.class, new JSONRequestStream());
static int on_message_request(
SESSION_T *session,
const char *request_path,
DIFFUSION_DATATYPE request_datatype,
const DIFFUSION_VALUE_T *request,
const DIFFUSION_RESPONDER_HANDLE_T *handle,
void *context)
{
// read the `request` based on the received `request_datatype`
char *request_val;
read_diffusion_string_value(request, &request_val, NULL);
printf("Received message: %s\n", request_val);
free(request_val);
// create the response
BUF_T *response_buf = buf_create();
write_diffusion_string_value("This is my response", response_buf);
// and respond to the request
diffusion_respond_to_request(session, handle, response_buf, NULL);
buf_free(response_buf);
return HANDLER_SUCCESS;
}
void register_request_stream(
SESSION_T *session,
char *message_path)
{
DIFFUSION_REQUEST_STREAM_T request_stream = {.on_request = on_message_request};
set_request_stream(session, message_path, DATATYPE_STRING, DATATYPE_STRING, &request_stream);
}
# Register handler to receive the request
handler = RequestHandler(
callback,
request_type=request_type,
response_type=request_type
)
session.messaging.add_stream_handler(path, handler=handler, addressed=True)
let stream_delegate = JSONRequestStreamDelegate() let request_stream = PTDiffusionJSON.requestStream(with: stream_delegate) session.messaging.setRequestStream(request_stream, forPath: message_path)