Sending request messages to a message path
A client session can send a request message containing typed data to a message path. One or more client sessions can register to handle messages sent to that message path. The handling client session can then send a response message containing typed data. The response message is sent to the requesting client session directly, through the same message path.
![A client session on the left. Diffusion in the centre. A control client session that has registered a handler for the message path on the right. An arrow representing the request message goes from the client session through a shape representing the message path inside the Diffusion server and continues to the handling client session. An arrow representing the response message goes from the handling client session back through the message path on the Diffusion server to the requesting client session.](rr_send_to_path.png)
- A client session sends a request message to a message path.
- The control client session receives the request message through a request handler.
- The control client session sends a response to the request message.
- The client session receives the response.
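The four steps above can be sketched with an in-memory stand-in for the server. This is a minimal illustration of the routing idea only, not the Diffusion client API: `PathRouter` and its methods are hypothetical names, and everything runs in a single process with no network.

```python
from typing import Any, Callable, Dict


class PathRouter:
    """Hypothetical stand-in for the server: routes requests by message path."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[Any], Any]] = {}

    def add_request_handler(self, path: str, handler: Callable[[Any], Any]) -> None:
        # The control client session registers a handler for the path.
        self._handlers[path] = handler

    def send_request(self, path: str, request: Any) -> Any:
        # Route the request to the handler and hand its response back
        # to the requesting side through the same path.
        handler = self._handlers.get(path)
        if handler is None:
            raise LookupError(f"no handler registered for path '{path}'")
        return handler(request)


router = PathRouter()
router.add_request_handler("foo", lambda request: f"Hello there, {request}!")
response = router.send_request("foo", "client")
print(response)  # Hello there, client!
```

A request sent to a path with no registered handler raises `LookupError` in this sketch; how the real server reports that case is not covered here.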
Both the request message and the response message contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response message is not required to be the same data type as the request it responds to.
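To make the typed-value rule concrete, the following sketch checks values against the five types listed above and answers a string request with a 64-bit integer response. The type names in `TYPE_CHECKS` and the `check` helper are assumptions made for illustration; they are not taken from any Diffusion client library.

```python
import json
from typing import Any, Callable, Dict

INT64_MIN, INT64_MAX = -(2**63), 2**63 - 1


def _is_json(value: Any) -> bool:
    # A value counts as JSON here if the standard encoder can serialise it.
    try:
        json.dumps(value)
        return True
    except (TypeError, ValueError):
        return False


def _is_int64(value: Any) -> bool:
    # bool is a subclass of int in Python, so exclude it explicitly.
    return (
        isinstance(value, int)
        and not isinstance(value, bool)
        and INT64_MIN <= value <= INT64_MAX
    )


# Hypothetical names for the five value types.
TYPE_CHECKS: Dict[str, Callable[[Any], bool]] = {
    "json": _is_json,
    "binary": lambda v: isinstance(v, (bytes, bytearray)),
    "string": lambda v: isinstance(v, str),
    "int64": _is_int64,
    "double": lambda v: isinstance(v, float),
}


def check(type_name: str, value: Any) -> Any:
    """Return the value unchanged, or raise if it does not fit the type."""
    if not TYPE_CHECKS[type_name](value):
        raise TypeError(f"{value!r} is not a valid {type_name} value")
    return value


def handle(request: str) -> int:
    # A string request answered with a 64-bit integer response.
    return len(check("string", request))


response = check("int64", handle("hello"))
print(response)  # 5
```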
Sending to a message path
Required permissions:
permission for the specified message path

To send a request, specify the following:

- The message path to send the request to and receive the response through
- The request message
- The data type of the request message
- The data type of the response message
JavaScript:

    // Example using the JSON data type.
    var jsonType = diffusion.datatypes.json();

    // Create a JSON value to send as a request.
    var requestJson = jsonType.from("hello");

    // Send the request to the message path "foo".
    session.messages.sendRequest('foo', requestJson, jsonType).then(function(response) {
        console.log(response.get());
    }, function(error) {
        // Report the failure instead of silently ignoring it.
        console.error(error);
    });
C#:

    // Namespaces as used by the Diffusion .NET client; adjust to your client version.
    using System;
    using System.Threading.Tasks;
    using PushTechnology.ClientInterface.Client.Factories;
    using PushTechnology.ClientInterface.Client.Session;
    using static System.Console;

    /// <summary>
    /// Client implementation that sends request messages to a path and
    /// displays the response.
    /// </summary>
    public sealed class SendingPathRequestMessages {
        // An async method is used because a constructor cannot await.
        public async Task Run( string serverUrl ) {
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation( ( cert, chain, errors ) => CertificateValidationResult.ACCEPT )
                .Open( serverUrl );

            var messaging = session.Messaging;
            string messagingPath = ">random/requestResponse";

            try {
                string response = await messaging.SendRequestAsync<string, string>(
                    messagingPath, "Starting chat..." );
                WriteLine( $"Received response: '{response}'." );

                await Task.Delay( 1000 );

                response = await messaging.SendRequestAsync<string, string>(
                    messagingPath, "Hello!" );
                WriteLine( $"Received response: '{response}'." );
            } catch ( Exception e ) {
                WriteLine( $"Got exception: '{e.Message}'." );
            }

            // Close the session
            session.Close();
        }
    }
Java:

    // Establish a client session
    final Session session = Diffusion.sessions().principal("client").password("password").open("ws://localhost:8080");

    // Obtain the Messaging feature
    final Messaging messaging = session.feature(Messaging.class);

    // Create a JSON value to send as a request
    final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

    // Send the request to the message path "foo" and wait at most 5 seconds for the response.
    final JSON response = messaging.sendRequest("foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
Python:

    # Send the request and receive the response.
    print(f"Sending request: '{request}' to path '{path}'...")
    try:
        response = await session.messaging.send_request_to_path(
            path=path, request=request_type(request)
        )
    except diffusion.DiffusionError as ex:
        print(f"ERROR: {ex}")
    else:
        print(f"... received response '{response}'")
Objective-C:

    [session.messaging sendRequest:[PTDiffusionPrimitive requestWithLongLong:42]
                            toPath:message_path
      int64NumberCompletionHandler:^(NSNumber *response, NSError *error) {
        if (error) {
            NSLog(@"Failed to send to %@. Error: %@", message_path, error);
        } else {
            NSLog(@"Received response: %@", response);
        }
    }];
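The Java example bounds the wait for a response at 5 seconds. The same idea can be sketched in Python with `asyncio.wait_for`. The `send_request` coroutine below is a hypothetical stand-in that simulates a delayed response; it does not call a Diffusion server, and the `delay` parameter exists only to drive the simulation.

```python
import asyncio


async def send_request(path: str, request: str, delay: float) -> str:
    """Hypothetical stand-in: the response arrives after `delay` seconds."""
    await asyncio.sleep(delay)
    return f"response to '{request}' on '{path}'"


async def send_with_timeout(path: str, request: str, delay: float, timeout: float) -> str:
    # Give up if no response arrives within `timeout` seconds.
    try:
        return await asyncio.wait_for(send_request(path, request, delay), timeout)
    except asyncio.TimeoutError:
        return "timed out"


fast = asyncio.run(send_with_timeout("foo", "hello", delay=0.01, timeout=1.0))
slow = asyncio.run(send_with_timeout("foo", "hello", delay=1.0, timeout=0.05))
print(fast)  # response to 'hello' on 'foo'
print(slow)  # timed out
```

Returning a sentinel string on timeout keeps the sketch short; real code would more likely let the exception propagate or retry.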
Responding to request messages sent to a message path
Required permissions:
permission for the specified message path, permission, and permission to register to receive session property values with the request message

Define a request handler to receive and respond to request messages that have a specific data type.
JavaScript:

    var jsonType = diffusion.datatypes.json();
    var requestJson = jsonType.from({ "foo": "bar" });
    var responseJson = jsonType.from({ "ying": "yang" });

    // Define a request handler for the JSON data type.
    var handler = {
        onRequest: function(request, context, responder) {
            responder.respond(responseJson, jsonType);
        },
        onError: function() {},
        onClose: function() {}
    };
C#:

    /// <summary>
    /// A simple IRequestHandler implementation that prints confirmation of the actions completed.
    /// </summary>
    internal class SimpleRequestHandler : IRequestHandler<string, string> {
        /// <summary>
        /// Indicates that the request handler was closed.
        /// </summary>
        public void OnClose() => WriteLine( "A request handler was closed." );

        /// <summary>
        /// Indicates that the request handler has received an error.
        /// </summary>
        public void OnError( ErrorReason errorReason )
            => WriteLine( $"A request handler has received an error: '{errorReason}'." );

        /// <summary>
        /// Indicates that a request was received and responds to it.
        /// </summary>
        /// <remarks>On invalid request you would call: <see cref="IResponder{TResponse}.Reject(string)"/>.</remarks>
        public void OnRequest( string request, IRequestContext context, IResponder<string> responder ) {
            WriteLine( $"Received request: '{request}'." );
            responder.Respond( DateTime.UtcNow.ToLongTimeString() );
        }
    }
Java:

    private final class JSONRequestHandler implements MessagingControl.RequestHandler<JSON, JSON> {
        @Override
        public void onClose() {
            ....
        }

        @Override
        public void onError(ErrorReason errorReason) {
            ....
        }

        @Override
        public void onRequest(JSON request, RequestContext context, Responder<JSON> responder) {
            ....
            responder.respond(response);
        }
    }
Python:

    def callback(request: str, **kwargs) -> str:
        return f"Hello there, {request}!"
Objective-C:

    @interface NumberRequestDelegate : NSObject<PTDiffusionNumberRequestDelegate>
    @end

    @implementation NumberRequestDelegate

    - (void)diffusionTopicTreeRegistration:(PTDiffusionTopicTreeRegistration *)registration
               didReceiveRequestWithNumber:(nullable NSNumber *)number
                                   context:(PTDiffusionRequestContext *)context
                                 responder:(PTDiffusionResponder *)responder
    {
        // Do something when a request is received.
    }

    - (void)diffusionTopicTreeRegistration:(nonnull PTDiffusionTopicTreeRegistration *)registration
                          didFailWithError:(nonnull NSError *)error
    {
        // Do something if the registration fails.
    }

    - (void)diffusionTopicTreeRegistrationDidClose:(nonnull PTDiffusionTopicTreeRegistration *)registration
    {
        // Do something if the registration closes.
    }

    @end
Register the request handler against a message path. You can only register one request handler against each message path.
JavaScript:

    var handler = {
        onRequest: function(request, context, responder) {},
        onError: function() {},
        onClose: function() {}
    };

    session.messages.addRequestHandler('topic', handler);
C#:

    // Namespaces as used by the Diffusion .NET client; adjust to your client version.
    using System.Threading.Tasks;
    using PushTechnology.ClientInterface.Client.Factories;
    using PushTechnology.ClientInterface.Client.Session;

    /// <summary>
    /// Client implementation that registers a handler to listen for messages on a path.
    /// </summary>
    public sealed class ReceivingPathRequestMessages {
        // An async method is used because a constructor cannot await.
        public async Task Run( string serverUrl ) {
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation( ( cert, chain, errors ) => CertificateValidationResult.ACCEPT )
                .Open( serverUrl );

            var messaging = session.Messaging;
            string messagingPath = ">random/requestResponse";

            var requestHandler = new SimpleRequestHandler();
            var requestHandlerRegistration =
                await messaging.AddRequestHandlerAsync( messagingPath, requestHandler );

            try {
                // Wait for messages...
                await Task.Delay( 60000 );
            } finally {
                // Close the handler registration, then the session
                await requestHandlerRegistration.CloseAsync();
                session.Close();
            }
        }
    }
Java:

    messagingControl.addRequestHandler(messagePath, JSON.class, JSON.class, new JSONRequestHandler());
Python:

    # Register a handler to receive the request
    handler = RequestHandler(
        callback, request_type=request_type, response_type=request_type
    )
    print("Registering request handler...")
    try:
        await session.messaging.add_request_handler(path, handler=handler)
    except diffusion.DiffusionError as ex:
        print(f"ERROR: {ex}")
    else:
        print("... request handler registered")
Objective-C:

    // Ensure to maintain a strong reference to your delegate as it
    // is referenced weakly by the Diffusion client library.
    NumberRequestDelegate *const delegate = [NumberRequestDelegate new];

    PTDiffusionRequestHandler *const handler =
        [PTDiffusionPrimitive int64RequestHandlerWithDelegate:delegate];

    [session.messagingControl addRequestHandler:handler
                                        forPath:path
                              completionHandler:^(PTDiffusionTopicTreeRegistration *registration, NSError *error) {
        // Check error is `nil`, indicating success.
    }];
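The rule that each message path takes only one request handler, together with the registration object that the examples close when they finish, can be modelled in a few lines. This is a sketch under stated assumptions: `HandlerRegistry` and `Registration` are hypothetical names, and raising `ValueError` on a duplicate registration is an assumption, since the text does not say how the server reports that case.

```python
from typing import Any, Callable, Dict

Handler = Callable[[Any], Any]


class Registration:
    """Handle returned on registration; closing it frees the path again."""

    def __init__(self, registry: "HandlerRegistry", path: str) -> None:
        self._registry = registry
        self._path = path

    def close(self) -> None:
        self._registry._handlers.pop(self._path, None)


class HandlerRegistry:
    """Hypothetical registry that allows one handler per message path."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Handler] = {}

    def add_request_handler(self, path: str, handler: Handler) -> Registration:
        # Assumption: a duplicate registration is rejected with an error.
        if path in self._handlers:
            raise ValueError(f"a handler is already registered for '{path}'")
        self._handlers[path] = handler
        return Registration(self, path)


registry = HandlerRegistry()
registration = registry.add_request_handler("foo", lambda request: "first")

try:
    registry.add_request_handler("foo", lambda request: "second")
    outcome = "registered twice"
except ValueError:
    outcome = "second registration rejected"

# After the first registration is closed, the path can be registered again.
registration.close()
registry.add_request_handler("foo", lambda request: "third")
print(outcome)  # second registration rejected
```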