Sending request messages to a session
A control client session can send a request message containing typed data directly to another client session. The receiving client session can then send a response message containing typed data. The request and response messages are addressed through the same message path.
![A control client session on the left. Diffusion in the centre. Another client session on the right. An arrow representing the request message goes from the control client session through a shape representing the message path inside the Diffusion server and continues to the other client session. An arrow representing the response message goes from the receiving client session back through the message path on the Diffusion server to the control client session.](rr_send_to_session.png)
- A control client session sends a request message to a client session, specifying the message path to send the message through and the session ID of the client session to send the request message to.
- The client session receives the request message through a request stream.
- The client session uses a responder to send a response to the request message.
- The control client session receives the response.
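The four steps above can be modelled with a toy in-memory router. This is only an illustrative sketch, not the Diffusion API: `ToyRouter`, its methods, and the session ID `client-1` are hypothetical names chosen for the example, and the real server does this routing across network connections.

```python
from typing import Callable, Dict, Tuple

# Hypothetical handler type: takes a request value, returns a response value.
Handler = Callable[[str], str]


class ToyRouter:
    """Toy stand-in for the server: routes by (session ID, message path)."""

    def __init__(self) -> None:
        self._streams: Dict[Tuple[str, str], Handler] = {}

    def set_request_stream(self, session_id: str, path: str, handler: Handler) -> None:
        """Record the handler a session uses for requests on a path."""
        self._streams[(session_id, path)] = handler

    def send_request(self, session_id: str, path: str, request: str) -> str:
        """Deliver a request to one session and return that session's response."""
        handler = self._streams.get((session_id, path))
        if handler is None:
            raise LookupError(f"no request stream for {session_id!r} on {path!r}")
        return handler(request)


router = ToyRouter()
# The receiving session registers a request stream on the path "foo".
router.set_request_stream("client-1", "foo", lambda request: f"Hello there, {request}!")
# The control session addresses the request by session ID and message path.
response = router.send_request("client-1", "foo", "control")
print(response)
```

The response travels back through the same path the request was sent on, which is why the sender needs only the session ID and the message path.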
Both the request message and the response message contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response message is not required to be the same data type as the request it responds to.
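As a rough illustration of a request and a response that use different data types, the sketch below accepts a 64-bit integer request and answers with a string. The helper names `check_int64` and `respond` are hypothetical and are not part of any Diffusion client library.

```python
INT64_MIN = -(2**63)
INT64_MAX = 2**63 - 1


def check_int64(value: int) -> int:
    """Return value unchanged if it fits in a signed 64-bit integer."""
    if isinstance(value, bool) or not isinstance(value, int):
        raise TypeError("request must be an integer")
    if not INT64_MIN <= value <= INT64_MAX:
        raise ValueError("request does not fit in 64 bits")
    return value


def respond(request: int) -> str:
    """Take a 64-bit integer request and answer with a string response."""
    return f"received {check_int64(request)}"


reply = respond(42)
print(reply)
```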
Sending a request to a session
Required permissions:
permission for the specified message path and permission
Usually, it is a control client session in your organization's backend that sends messages directly to other sessions.
Send the request message specifying the following information:
- The session ID of the client session to send the request to
- The message path to send the request and receive the response through
- The request message
- The datatype of the request message
- The datatype of the response message
control.messages.sendRequest('foo', 'Hello client', session_id, diffusion.datatypes.json(), diffusion.datatypes.json())
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
using System;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Session;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that sends a request message to a filter,
    /// then sends another request directly to the session, and displays the response.
    /// </summary>
    public sealed class SendingSessionRequestMessages {
        private readonly string messagingPath = ">random/requestResponse";

        // An async method (the name Run is illustrative) rather than a constructor,
        // because a constructor cannot use await.
        public async Task Run( string serverUrl ) {
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation( ( cert, chain, errors ) => CertificateValidationResult.ACCEPT )
                .Open( serverUrl );

            var messaging = session.Messaging;
            var requestCallback = new RequestCallback();

            // Filter messaging is used to get the session ID for this example
            int requestsSent = await messaging.SendRequestToFilterAsync(
                "$Principal EQ 'client'", messagingPath, "Hello?", requestCallback );

            // Give the filtered response time to arrive
            await Task.Delay( 1000 );

            // Send message to a session using obtained session ID
            string response = await messaging.SendRequestAsync<string, string>(
                requestCallback.SessionId, messagingPath, "Time" );

            WriteLine( $"Received response: '{response}'." );

            // Close the session
            session.Close();
        }

        /// <summary>
        /// A simple IFilteredRequestCallback implementation that prints confirmation of the actions completed.
        /// </summary>
        private class RequestCallback : IFilteredRequestCallback<string> {
            public ISessionId SessionId { get; private set; }

            /// <summary>
            /// Indicates that the stream was closed.
            /// </summary>
            public void OnClose()
                => WriteLine( "A request handler was closed." );

            /// <summary>
            /// Indicates error received by the callback.
            /// </summary>
            public void OnError( ErrorReason errorReason )
                => WriteLine( $"A request handler has received error: '{errorReason}'." );

            /// <summary>
            /// Indicates that a response message was received.
            /// </summary>
            public void OnResponse( ISessionId sessionId, string response )
                => SessionId = sessionId;

            /// <summary>
            /// Indicates that an error response message was received.
            /// </summary>
            public void OnResponseError( ISessionId sessionId, Exception exception )
                => WriteLine( $"Response error received from session {sessionId}: '{exception}'." );
        }
    }
}
// Establish client session and control session
final Session control = Diffusion.sessions().principal("control").password("password").open("ws://localhost:8080");
final Session client = Diffusion.sessions().principal("client").password("password").open("ws://localhost:8080");

// Obtain the Messaging and MessagingControl features
final MessagingControl messagingControl = control.feature(MessagingControl.class);
final Messaging messaging = client.feature(Messaging.class);

// Create a JSON object to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

// Create a local request stream for the client to receive direct requests from the control session
messaging.setRequestStream("foo", JSON.class, JSON.class, requestStream);

// Send the request to a message path "foo" and wait for (at most) 5 seconds until the response is received.
final JSON response = messagingControl.sendRequest(client.getSessionId(), "foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
# Sending the request and receiving the response.
print(f"Sending request: '{request}' to session {session_id}...")
try:
    response = await session.messaging.send_request_to_session(
        path=path, session_id=session_id, request=request_type(request)
    )
except diffusion.DiffusionError as ex:
    print(f"ERROR: {ex}")
else:
    print(f"... received response '{response}'")
[session.messagingControl sendRequest:[PTDiffusionPrimitive requestWithLongLong:42]
                          toSessionId:sessionId
                                 path:message_path
         int64NumberCompletionHandler:^(NSNumber *response, NSError *error)
{
    if (error) {
        NSLog(@"Failed to send to %@. Error: %@", message_path, error);
    } else {
        NSLog(@"Received response: %@", response);
    }
}];
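The Java example above waits at most 5 seconds for the response. A comparable bound in Python can be sketched with the standard library's `asyncio.wait_for`. The coroutine `fake_send_request` below is a hypothetical stand-in for a real request call, so that the sketch runs without a server.

```python
import asyncio


async def fake_send_request(delay: float) -> str:
    """Hypothetical stand-in for a request call; answers after `delay` seconds."""
    await asyncio.sleep(delay)
    return "pong"


async def request_with_timeout(delay: float, timeout: float) -> str:
    """Wait for the response, but give up once `timeout` seconds have passed."""
    try:
        return await asyncio.wait_for(fake_send_request(delay), timeout=timeout)
    except asyncio.TimeoutError:
        return "timed out"


fast = asyncio.run(request_with_timeout(delay=0.01, timeout=1.0))
slow = asyncio.run(request_with_timeout(delay=1.0, timeout=0.05))
print(fast, "/", slow)
```

Bounding the wait keeps the sending session from blocking indefinitely if the receiving session never responds.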
Responding to messages sent to a session
Required permissions:
for the specified message path
Define a request stream to receive and respond to request messages that have a specific data type.
var handler = {
    onRequest : function(request, context, responder) {
        ....
        responder.respond(response);
    },
    onError : function(error) {},
    onClose : function() {}
}
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// A simple IRequestStream implementation that prints confirmation of the actions completed.
    /// </summary>
    internal class SimpleRequestStream : IRequestStream<string, string> {
        /// <summary>
        /// Indicates that the request stream was closed.
        /// </summary>
        public void OnClose()
            => WriteLine( "A request handler was closed." );

        /// <summary>
        /// Indicates that the request stream has received error.
        /// </summary>
        public void OnError( ErrorReason errorReason )
            => WriteLine( $"A request handler has received error: {errorReason}." );

        /// <summary>
        /// Indicates that a request was received and responds to it.
        /// </summary>
        /// <remarks>On invalid request you would call: <see cref="IResponder{TResponse}.Reject(string)"/>.</remarks>
        public void OnRequest( string path, string request, IResponder<string> responder ) {
            if ( request == "Hello?" ) {
                // message to the filter to obtain the session ID
                responder.Respond( "Yes" );
            } else {
                WriteLine( $"Received request: '{request}'." );
                responder.Respond( DateTime.UtcNow.ToLongTimeString() );
            }
        }
    }
}
private final class JSONRequestStream implements Messaging.RequestStream<JSON, JSON> {
    @Override
    public void onClose() {
        ....
    }

    @Override
    public void onError(ErrorReason errorReason) {
        ....
    }

    @Override
    public void onRequest(String path, JSON request, Responder<JSON> responder) {
        ....
    }
}
def callback(request: str, **kwargs) -> str:
    return f"Hello there, {request}!"
@interface NumberRequestStreamDelegate : NSObject<PTDiffusionNumberRequestStreamDelegate>
@end

@implementation NumberRequestStreamDelegate

- (void)diffusionStream:(nonnull PTDiffusionStream *)stream
didReceiveRequestWithNumber:(nullable NSNumber *)number
              responder:(nonnull PTDiffusionResponder *)responder {
    // Do something when a request is received.
}

- (void)diffusionStream:(nonnull PTDiffusionStream *)stream
       didFailWithError:(nonnull NSError *)error {
    // Do something if the stream fails.
}

- (void)diffusionDidCloseStream:(nonnull PTDiffusionStream *)stream {
    // Do something if the stream closes.
}

@end
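The request stream examples above either respond to a request or, as the .NET remarks note, reject an invalid one. The branching can be sketched with a toy responder. `ToyResponder` and `on_request` are hypothetical names for illustration only and do not reproduce any client library's responder class.

```python
from typing import Optional, Tuple


class ToyResponder:
    """Toy stand-in for a responder: records whether the request was answered or rejected."""

    def __init__(self) -> None:
        self.outcome: Optional[Tuple[str, str]] = None

    def respond(self, response: str) -> None:
        self.outcome = ("response", response)

    def reject(self, message: str) -> None:
        self.outcome = ("rejected", message)


def on_request(path: str, request: str, responder: ToyResponder) -> None:
    """Answer a valid request; reject an empty one."""
    if not request:
        responder.reject("empty request")
    else:
        responder.respond(f"Hello there, {request}!")


ok = ToyResponder()
on_request("foo", "control", ok)
bad = ToyResponder()
on_request("foo", "", bad)
print(ok.outcome, bad.outcome)
```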
Set the request stream against a message path. Each message path can have only one request stream.
control.messages.setRequestStream("foo", diffusion.datatypes.json(), diffusion.datatypes.json(), request_stream);
using System;
using System.Threading;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that registers a handler to listen for messages on a path.
    /// </summary>
    public sealed class ReceivingSessionRequestMessages {
        public ReceivingSessionRequestMessages( string serverUrl ) {
            var session = Diffusion.Sessions.Principal( "client" ).Password( "password" ).Open( serverUrl );
            var messaging = session.Messaging;
            string messagingPath = ">random/requestResponse";

            var requestStream = new SimpleRequestStream();
            messaging.SetRequestStream( messagingPath, requestStream );

            try {
                Thread.Sleep( 60000 ); // wait for messages...
            } finally {
                // Close session
                messaging.RemoveRequestStream( messagingPath );
                session.Close();
            }
        }
    }
}
messaging.setRequestStream("foo", JSON.class, JSON.class, requestStream);
# Register handler to receive the request
handler = RequestHandler(
    callback, request_type=request_type, response_type=request_type
)
session.messaging.add_stream_handler(path, handler=handler, addressed=True)
// Ensure to maintain a strong reference to your request stream as it
// is referenced weakly by the Diffusion client library.
NumberRequestStreamDelegate *delegate = [NumberRequestStreamDelegate new];
PTDiffusionRequestStream *requestStream =
    [PTDiffusionPrimitive int64RequestStreamWithDelegate:delegate];
[session.messaging setRequestStream:requestStream forPath:message_path];
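The rule of one request stream per message path can be pictured as a dictionary keyed by path. The sketch below assumes that setting a second stream on the same path replaces the first and hands back the previous one. That replacement behaviour is an assumption made for illustration, so check the API reference for your client library. `StreamRegistry` and its methods are hypothetical names.

```python
from typing import Callable, Dict, Optional

# Hypothetical handler type: takes a request value, returns a response value.
Handler = Callable[[str], str]


class StreamRegistry:
    """Toy registry that holds at most one request stream per message path."""

    def __init__(self) -> None:
        self._by_path: Dict[str, Handler] = {}

    def set_request_stream(self, path: str, handler: Handler) -> Optional[Handler]:
        """Store the handler for a path and return the one it replaced, if any."""
        previous = self._by_path.get(path)
        self._by_path[path] = handler
        return previous

    def remove_request_stream(self, path: str) -> Optional[Handler]:
        """Remove and return the handler for a path, or None if none was set."""
        return self._by_path.pop(path, None)


def first(request: str) -> str:
    return "first"


def second(request: str) -> str:
    return "second"


registry = StreamRegistry()
replaced_none = registry.set_request_stream("foo", first)
replaced_first = registry.set_request_stream("foo", second)
removed = registry.remove_request_stream("foo")
print(replaced_none, replaced_first is first, removed is second)
```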