Sending request messages to a session
A client session can send a request message containing typed data directly to another client session. The receiving client session can then send a response message containing typed data. The request and response messages are addressed through the same message path.
- A control client session sends a request message to a client session, specifying the message path to send the message through and the session ID of the client session to send the request message to.
- The client session receives the request message through a request stream.
- The client session uses a responder to send a response to the request message.
- The control client session receives the response.
Both the request message and the response message contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response message is not required to be the same data type as the request it responds to.
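The flow described above can be sketched with a small in-memory model. This is an illustration only, not the Diffusion client library: `ModelServer`, its methods, and the `{ type, value }` message shape are hypothetical names chosen for this sketch, and callbacks stand in for the asynchronous result that a real client would return. It shows a string request answered with a 64-bit integer response, since the two types need not match.

```javascript
// Hypothetical in-memory model of the request-response flow.
// None of these names belong to the Diffusion API.
class ModelServer {
  constructor() {
    // Maps "sessionId|path" to the request stream registered by that session.
    this.streams = new Map();
  }

  // A client session registers a request stream against a message path.
  setRequestStream(sessionId, path, stream) {
    this.streams.set(`${sessionId}|${path}`, stream);
  }

  // A control session sends a typed request to one session through a path.
  // onResponse and onError stand in for the asynchronous result.
  sendRequest(sessionId, path, request, onResponse, onError) {
    const stream = this.streams.get(`${sessionId}|${path}`);
    if (!stream) {
      onError(new Error(`No request stream for ${path} on ${sessionId}`));
      return;
    }
    // The responder routes the response back to the sender of the request.
    const responder = { respond: (response) => onResponse(response) };
    stream.onRequest(request, { path }, responder);
  }
}

const server = new ModelServer();

// The client answers a string request with a 64-bit integer response.
server.setRequestStream("client-1", "foo", {
  onRequest(request, context, responder) {
    responder.respond({ type: "int64", value: BigInt(request.value.length) });
  },
});

let received = null;
let failure = null;
server.sendRequest(
  "client-1",
  "foo",
  { type: "string", value: "Hello client" },
  (response) => { received = response; },
  (error) => { failure = error; }
);

console.log(received.type, received.value.toString());
```

In this model the sender never addresses the response itself: the responder created for each request carries the route back, which is why the same message path serves both directions.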
Sending a request to a session
Required permissions:
permission for the specified message path and permission
Usually, it is a control client session in your organization's backend that sends messages directly to other sessions.
To send a request, specify the following information:
- The session ID of the client session to send the request to
- The message path to send the request and receive the response through
- The request message
- The datatype of the request message
- The datatype of the response message
control.messages.sendRequest('foo', 'Hello client', session_id, diffusion.datatypes.json(), diffusion.datatypes.json())
[session.messagingControl sendRequest:[PTDiffusionPrimitive requestWithLongLong:42]
                          toSessionId:sessionId
                                 path:message_path
         int64NumberCompletionHandler:^(NSNumber *response, NSError* error) {
    if (error) {
        NSLog(@"Failed to send to %@. Error: %@", message_path, error);
    } else {
        NSLog(@"Received response: %@", response);
    }
}];
// Establish client session and control session
final Session control = Diffusion.sessions().principal("control").password("password").open("ws://localhost:8080");
final Session client = Diffusion.sessions().principal("client").password("password").open("ws://localhost:8080");

// Obtain the Messaging and MessagingControl features
final MessagingControl messagingControl = control.feature(MessagingControl.class);
final Messaging messaging = client.feature(Messaging.class);

// Create a JSON object to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

// Create a local request stream for the client to receive direct requests from the control session
messaging.setRequestStream("foo", JSON.class, JSON.class, requestStream);

// Send the request to a message path "foo" and wait for (at most) 5 seconds until the response is received.
final JSON response = messagingControl.sendRequest(client.getSessionId(), "foo", request, JSON.class, JSON.class).get(5, TimeUnit.SECONDS);
Responding to messages sent to a session
Required permissions:
for the specified message path
Define a request stream to receive and respond to request messages that have a specific data type.
var handler = {
    onRequest : function(request, context, responder) {
        ....
        responder.respond(response);
    },
    onError : function(error) {},
    onClose : function() {}
};
@interface NumberRequestStreamDelegate : NSObject<PTDiffusionNumberRequestStreamDelegate>
@end

@implementation NumberRequestStreamDelegate

- (void)   diffusionStream:(nonnull PTDiffusionStream *)stream
didReceiveRequestWithNumber:(nullable NSNumber *)number
                 responder:(nonnull PTDiffusionResponder *)responder {
    // Do something when a request is received.
}

- (void)diffusionStream:(nonnull PTDiffusionStream *)stream
       didFailWithError:(nonnull NSError *)error {
    // Do something if the stream fails.
}

- (void)diffusionDidCloseStream:(nonnull PTDiffusionStream *)stream {
    // Do something if the stream closes.
}

@end
private final class JSONRequestStream implements Messaging.RequestStream<JSON, JSON> {
    @Override
    public void onClose() {
        ....
    }

    @Override
    public void onError(ErrorReason errorReason) {
        ....
    }

    @Override
    public void onRequest(String path, JSON request, Responder<JSON> responder) {
        ....
    }
}
Add the request stream against a message path. You can only add one request stream for each message path.
control.messages.setRequestStream("foo", diffusion.datatypes.json(), diffusion.datatypes.json(), request_stream);
// Ensure to maintain a strong reference to your request stream as it
// is referenced weakly by the Diffusion client library.
NumberRequestStreamDelegate *delegate = [NumberRequestStreamDelegate new];
PTDiffusionRequestStream *requestStream = [PTDiffusionPrimitive int64RequestStreamWithDelegate:delegate];
[session.messaging setRequestStream:requestStream forPath:message_path];
messaging.setRequestStream("foo", JSON.class, JSON.class, requestStream);
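The one-stream-per-path rule can be pictured as a map keyed by message path. The sketch below is a hypothetical model, not the client library: `RequestStreamRegistry` and its methods are names invented here, and returning the replaced stream from `setRequestStream` is a design choice of this sketch rather than documented library behavior.

```javascript
// Hypothetical registry that holds at most one request stream per path.
class RequestStreamRegistry {
  constructor() {
    this.byPath = new Map(); // message path -> request stream
  }

  // Registers a stream for a path. A Map holds one value per key, so a
  // second registration for the same path replaces the first. The replaced
  // stream (or null) is returned so the caller can close or reuse it.
  setRequestStream(path, stream) {
    const previous = this.byPath.get(path) || null;
    this.byPath.set(path, stream);
    return previous;
  }

  // Removes and returns the stream for a path, or null if none was set.
  removeRequestStream(path) {
    const previous = this.byPath.get(path) || null;
    this.byPath.delete(path);
    return previous;
  }
}

const registry = new RequestStreamRegistry();
const first = { name: "first" };
const second = { name: "second" };

const replacedByFirst = registry.setRequestStream("foo", first);
const replacedBySecond = registry.setRequestStream("foo", second);

console.log(replacedByFirst === null, replacedBySecond === first);
```

Keying on the path alone means that handling several data types on one path would need either separate paths or a single stream that branches on the request type.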
One-way messaging to a session
Diffusion™ Cloud also provides a legacy capability to send one-way messages to a session. This message is not typed. The receiving session cannot respond directly to this message.
To send a one-way message, specify the following information:
- The session ID of the client session to send the message to
- The message path to send the message through
- The message content
- Any additional options, such as headers or a message priority
session.messages.send('message_path', content, session_id);
[session.messagingControl sendToSessionId:sessionId
                                     path:message_path
                                  message:[[PTDiffusionBytes alloc] initWithData:data]
                        completionHandler:^(NSError* error) {
    if (error) {
        NSLog(@"Failed to send to %@. Error: %@", message_path, error);
    } else {
        NSLog(@"Sent");
    }
}];
session.feature(MessagingControl.class).send(session_id, message_path, content, send_callback);
session.MessagingControl.Send( session_id, message_path, content, send_callback );
SEND_MSG_TO_SESSION_PARAMS_T params = {
        .topic_path = message_path,
        .session_id = *session_id,
        .content = *content,
        .options.headers = headers,
        .options.priority = CLIENT_SEND_PRIORITY_NORMAL,
        .on_send = on_send,
        .context = context
};

/*
 * Send the message and wait for the callback to acknowledge
 * delivery.
 */
send_msg_to_session(session, params);
To receive a message sent directly to a client session, that client session must add a message stream to receive messages sent through that message path:
// Create with a default listener function
session.messages.listen('message_path', function(message) {
    // Do something with the message
});
@interface MessageStreamDelegate : NSObject <PTDiffusionMessageStreamDelegate>
@end

@implementation MessageStreamDelegate

-(void)          diffusionStream:(PTDiffusionStream *)stream
    didReceiveMessageOnTopicPath:(NSString *)path
                         content:(PTDiffusionContent *)content
                         context:(PTDiffusionReceiveContext *)context {
    // Do something when a message is received.
}

-(void)diffusionDidCloseStream:(PTDiffusionStream *)stream {
    // Do something if the stream closes.
}

-(void)diffusionStream:(PTDiffusionStream *)stream didFailWithError:(NSError *)error {
    // Do something if the stream fails.
}

@end

// ... in later code

// Ensure to maintain a strong reference to your message stream delegate as it
// is referenced weakly by the Diffusion client library.
MessageStreamDelegate *const delegate = [MessageStreamDelegate new];

// Create a locally evaluated topic selector specifying the messaging paths that
// should be captured by the stream.
PTDiffusionTopicSelector *const topicSelector = [PTDiffusionTopicSelector topicSelectorWithExpression:message_path];

// Use the Messaging feature to add a local stream using your delegate against
// the topic selector.
[session.messaging addMessageStreamWithSelector:topicSelector delegate:delegate];
session.feature(Messaging.class).addMessageStream(message_path, stream);
session.Messaging.AddMessageStream( message_path, stream );
/*
 * Register a listener for messages on the given path.
 */
MSG_LISTENER_REGISTRATION_PARAMS_T listener_params = {
        .topic_path = message_path,
        .listener = on_stream_message,
        .context = context
};
register_msg_listener(session, listener_params);
You can also add a fallback message stream to receive messages sent through any message path that does not have a stream added against it:
// Ensure to maintain a strong reference to your message stream delegate as it
// is referenced weakly by the Diffusion client library.
MessageStreamDelegate *const delegate = [MessageStreamDelegate new];

// Use the Messaging feature to add a local fallback stream using your delegate.
[session.messaging addFallbackMessageStreamWithDelegate:delegate];
messaging.addFallbackMessageStream(message_stream);
session.Messaging.AddFallbackMessageStream( stream );
/*
 * Register a listener for any other messages.
 * (.topic_path is NULL).
 */
MSG_LISTENER_REGISTRATION_PARAMS_T global_listener_params = {
        .listener = on_stream_message,
        .context = context
};
register_msg_listener(session, global_listener_params);
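The relationship between path-specific message streams and the fallback stream can be sketched as a lookup with a default. This is a simplified, hypothetical model: `MessageStreams` and `dispatch` are invented for illustration, paths are matched exactly rather than through topic selectors, and each path holds a single listener.

```javascript
// Hypothetical local dispatcher: a path-specific stream wins, otherwise
// the fallback stream receives the message.
class MessageStreams {
  constructor() {
    this.byPath = new Map(); // message path -> listener
    this.fallback = null;    // listener for paths with no stream
  }

  addMessageStream(path, listener) {
    this.byPath.set(path, listener);
  }

  addFallbackMessageStream(listener) {
    this.fallback = listener;
  }

  // Delivers a message and reports whether any listener received it.
  dispatch(path, content) {
    const listener = this.byPath.get(path) || this.fallback;
    if (!listener) {
      return false;
    }
    listener(path, content);
    return true;
  }
}

const streams = new MessageStreams();
const log = [];

streams.addMessageStream("foo", (path, content) => {
  log.push(`foo stream: ${content}`);
});
streams.addFallbackMessageStream((path, content) => {
  log.push(`fallback (${path}): ${content}`);
});

streams.dispatch("foo", "a"); // handled by the stream added against "foo"
streams.dispatch("bar", "b"); // no stream for "bar", so the fallback handles it

console.log(log);
```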
To respond to this message, the receiving client session sends a separate message to the message path through which the received message was sent. For more information, see One-way messaging to a path. However, if multiple client sessions have added message handlers on this message path, the one-way message sent in response is not guaranteed to be received by the client session that sent the original one-way message.
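The caveat about multiple handlers can be made concrete with a toy model. Everything here is an assumption made for illustration: `PathMessaging` is not part of the client library, and the rule that each path message goes to one handler picked by a `chooseHandler` function is a stand-in, since this page does not describe how a handler is selected.

```javascript
// Hypothetical model of one-way messaging to a path with several handlers.
class PathMessaging {
  // chooseHandler picks one handler from the list registered for a path.
  // The real selection policy is not described here; this is a stand-in.
  constructor(chooseHandler) {
    this.handlers = new Map(); // message path -> [{ sessionId, onMessage }]
    this.chooseHandler = chooseHandler;
  }

  addMessageHandler(path, sessionId, onMessage) {
    const list = this.handlers.get(path) || [];
    list.push({ sessionId, onMessage });
    this.handlers.set(path, list);
  }

  // Delivers the message to one chosen handler and returns the ID of the
  // session that received it, or null if no handler is registered.
  sendToPath(path, content) {
    const list = this.handlers.get(path) || [];
    if (list.length === 0) {
      return null;
    }
    const chosen = this.chooseHandler(list);
    chosen.onMessage(content);
    return chosen.sessionId;
  }
}

// Stand-in policy: always pick the first handler registered.
const messaging = new PathMessaging((handlers) => handlers[0]);
const inbox = { A: [], B: [] };

messaging.addMessageHandler("foo", "A", (content) => inbox.A.push(content));
messaging.addMessageHandler("foo", "B", (content) => inbox.B.push(content));

// Suppose session B sent the original one-way message. The client's reply
// goes to the path, not to B, so another session can end up receiving it.
const receivedBy = messaging.sendToPath("foo", "reply intended for B");

console.log(receivedBy, inbox.A.length, inbox.B.length);
```

When the reply must reach a specific sender, the request-response messaging described earlier on this page avoids the problem, because the responder is tied to the individual request.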
This page last modified: 2016/11/01