Sending request messages to a session filter
A client session can send a request message containing typed data directly to each client session in the set of connected client sessions that match a specified session properties filter. The receiving client sessions can then send a response message containing typed data. The request and response messages are addressed through the same message path.
For more information about session properties and how to filter connected client sessions using their properties, see Session properties and Session filtering.
- A control client session sends a request message, specifying the filter that selects the client sessions to receive the request and specifying the message path to send the message through.
- The Diffusion server evaluates the query and sends the message on to connected client sessions whose session properties match the filter.
- The client sessions in the filtered set each receive the request message through a request stream.
- Each client session uses a responder to send a response to the request message.
- The control client session receives responses from each of the client sessions selected by the filter.
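The steps above can be modelled with a short, self-contained sketch. This is a toy simulation in plain Python, not the Diffusion client API: `ToySession`, `matches`, and the local `send_request_to_filter` function are hypothetical names, and the filter evaluator only understands a single `EQ` or `NE` clause of the kind used in the examples on this page.

```python
import re
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Tuple


@dataclass
class ToySession:
    """Hypothetical stand-in for a connected client session."""
    session_id: str
    properties: Dict[str, str]
    # message path -> request stream (a function returning the response)
    request_streams: Dict[str, Callable[[object], object]] = field(default_factory=dict)


def matches(filter_expr: str, properties: Dict[str, str]) -> bool:
    """Evaluate a single-clause filter such as "$Principal NE 'control'".

    Toy evaluator only: the real filter language is richer than this.
    """
    m = re.fullmatch(r"\s*(\$?\w+)\s+(EQ|NE)\s+'([^']*)'\s*", filter_expr)
    if m is None:
        raise ValueError(f"unsupported filter: {filter_expr!r}")
    key, op, value = m.groups()
    actual = properties.get(key)
    return (actual == value) if op == "EQ" else (actual != value)


def send_request_to_filter(
    sessions: List[ToySession], filter_expr: str, path: str, request: object
) -> Tuple[int, List[Tuple[str, object]]]:
    """Return (number of sessions selected, [(session_id, response), ...])."""
    # The "server" evaluates the filter against each session's properties.
    selected = [s for s in sessions if matches(filter_expr, s.properties)]
    responses = []
    for s in selected:
        # Each selected session receives the request through its request stream.
        stream = s.request_streams.get(path)
        if stream is not None:
            responses.append((s.session_id, stream(request)))
    return len(selected), responses


sessions = [
    ToySession("s1", {"$Principal": "client"}, {"foo": lambda req: f"s1 got {req}"}),
    ToySession("s2", {"$Principal": "client"}, {"foo": lambda req: 42}),
    ToySession("s3", {"$Principal": "control"}, {"foo": lambda req: "ignored"}),
]

count, responses = send_request_to_filter(
    sessions, "$Principal NE 'control'", "foo", "hello"
)
print(count)      # 2
print(responses)  # [('s1', 's1 got hello'), ('s2', 42)]
```

In this model the sender learns how many sessions were selected separately from the responses themselves, which mirrors the way the examples later on this page report the number of sessions that matched the filter.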
The request messages and the response messages contain typed values. The messages can contain data of one of the following types: JSON, binary, string, 64-bit integer, or double. The response messages are not required to be the same data type as the request or as the response messages from other client sessions.
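As an illustration of responses that differ in type, the following sketch labels plain Python values with the data type names listed above. The helper `diffusion_type_name` and its mapping from Python values are assumptions made for this example only; this is not how any Diffusion client library selects a data type.

```python
import json


def diffusion_type_name(value) -> str:
    """Hypothetical helper: name the listed data type a Python value would map to."""
    if isinstance(value, (bytes, bytearray)):
        return "binary"
    if isinstance(value, str):
        return "string"
    # bool is a subclass of int in Python, so handle it before the int check.
    if isinstance(value, bool):
        return "JSON"
    if isinstance(value, int):
        if not -2**63 <= value < 2**63:
            raise ValueError("integer does not fit in 64 bits")
        return "64-bit integer"
    if isinstance(value, float):
        return "double"
    json.dumps(value)  # raises TypeError if the value is not JSON-serializable
    return "JSON"


# One request, with a differently typed response from each session.
request = "Time"
responses = {
    "s1": 1700000000,
    "s2": "12:00",
    "s3": {"hour": 12, "minute": 0},
    "s4": 0.5,
    "s5": b"\x0c\x00",
}

names = {sid: diffusion_type_name(value) for sid, value in responses.items()}
for sid, name in names.items():
    print(f"{sid}: {name}")
```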
Sending a request message to a filter
Required permissions:
permission for the specified message path and permission
Usually, it is a control client session in your organization's backend that sends messages to a filter. For more information about defining a session filter, see Session filtering.
To send a request message to a filter, specify the following information:
- The query to use to filter which client sessions to send the requests to
- The message path to send the request and receive the responses through
- The request message
- The datatype of the request message
- The datatype of the response message
const handler = {
    onResponse : (sessionID, response) => {
        // response received
    },
    onResponseError : (sessionID, error) => {
        // the response from the session resulted in an error
    },
    onError : (error) => {
        // an error occurred
    }
};

control.messages.sendRequestToFilter(filter, 'foo', 'Hello clients', handler, diffusion.datatypes.json(), diffusion.datatypes.json());
/**
 * Copyright © 2021 - 2023 DiffusionData Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Session;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that sends a request message to a filter and
    /// displays the response.
    /// </summary>
    public sealed class SendingFilterRequestMessages {
        private readonly string messagingPath = ">random/requestResponse";

        public async Task SendingFilterRequestMessagesExample(string serverUrl) {
            var session = Diffusion.Sessions.Principal( "control" ).Password( "password" )
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            var messaging = session.Messaging;
            var requestCallback = new RequestCallback();

            int requestsSent = await messaging.SendRequestToFilterAsync(
                "$Principal EQ 'client'", messagingPath, "Time", requestCallback );
            WriteLine( $"Sent request to {requestsSent} session(s) matching the filter." );

            // Close the session
            session.Close();
        }

        /// <summary>
        /// A simple IFilteredRequestCallback implementation that prints confirmation of the actions completed.
        /// </summary>
        private class RequestCallback : IFilteredRequestCallback<string> {
            /// <summary>
            /// Indicates that a response message was received.
            /// </summary>
            public void OnResponse( ISessionId sessionId, string response )
                => WriteLine( $"Received response: '{response}'." );

            /// <summary>
            /// Indicates that an error response message was received.
            /// </summary>
            public void OnResponseError( ISessionId sessionId, Exception exception )
                => WriteLine( $"Response error received from session {sessionId}: '{exception}'." );
        }
    }
}
//Establish control session
final Session control = Diffusion.sessions()
    .principal("control")
    .password("password")
    .open("ws://localhost:8080");

//Obtain the MessagingControl feature
final Messaging messagingControl = control.feature(Messaging.class);

//Create a JSON object to send as a request
final JSON request = Diffusion.dataTypes().json().fromJsonString("\"hello\"");

//Send the request to a message path "foo", to all sessions which do not have a 'control'
//principal and wait for (at most) 5 seconds until the response (number of responses) is received.
final int numberOfResponses = messagingControl.sendRequestToFilter(
    "$Principal NE 'control'",
    "foo",
    request,
    JSON.class,
    JSON.class,
    new Messaging.FilteredRequestCallback.Default<>())
    .get(5, TimeUnit.SECONDS);
BUF_T *message_buf = buf_create();
write_diffusion_string_value(message, message_buf);

SEND_REQUEST_TO_FILTER_PARAMS_T params = {
        .path = message_path,
        .filter = session_filter,
        .request_datatype = DATATYPE_STRING,
        .response_datatype = DATATYPE_STRING,
        .request = message_buf,
        .on_response = on_message_response,
        .on_number_sent = on_selected_sessions_received
};

send_request_to_filter(session, params);
buf_free(message_buf);
# sending the request and receiving the number of expected responses
print(f"Sending request: '{request}' to session filter `{session_filter}`...")
try:
    response = await session.messaging.send_request_to_filter(
        session_filter=session_filter,
        path=path,
        request=request_type(request),
    )
except diffusion.DiffusionError as ex:
    print(f"ERROR while sending request to session filter: {ex}")
else:
    print(f"... expecting {response} response(s) ...")
class JSONSessionResponseHandler: PTDiffusionJSONSessionResponseStreamDelegate {

    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveResponseWith json: PTDiffusionJSON,
                         from sessionId: PTDiffusionSessionId) {
        print("Received response from \(sessionId): \(json)")
    }

    func diffusionStream(_ stream: PTDiffusionStream,
                         didReceiveError error: Error,
                         from sessionId: PTDiffusionSessionId) {
        print("Received error from \(sessionId): \(error)")
    }

    func diffusionStream(_ stream: PTDiffusionStream,
                         didFailWithError error: Error) {
        print("Stream failed with error: \(error.localizedDescription)")
    }

    func diffusionDidClose(_ stream: PTDiffusionStream) {
        print("Stream is now closed")
    }
}

// create the request
let json_request = try! PTDiffusionJSON(jsonString: "{\"hello\": \"world\"}").request

// create the handler that will receive the response
let handler = JSONSessionResponseHandler()
let response_stream = PTDiffusionJSON.sessionResponseStream(with: handler)

// send the message request
session.messaging.send(json_request,
                       toFilter: session_properties_filter,
                       path: message_path,
                       responseStream: response_stream) { (selectedSessions, error) in
    if (error != nil) {
        print("Error occurred while sending message to filter: \(error!.localizedDescription)")
    }
    else {
        print("Message successfully sent to \(selectedSessions) sessions")
    }
}
Responding to messages sent to a filter
Required permissions:
for the specified message path
To the receiving client session, a request message sent to a filter is the same as a request message sent directly to the session. The receiving client session responds in the same way.
See Responding to messages sent to a session for details.
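As a rough illustration of the receiving side, the sketch below models a request stream that either responds to a request or rejects it, and shows how each outcome would reach the sender's per-session callbacks. It is a toy model in plain Python rather than the Diffusion client API: `Responder`, `time_stream`, and `deliver` are hypothetical names chosen for this example.

```python
from typing import Callable, List, Optional, Tuple


class Responder:
    """Hypothetical responder: records one response or one rejection."""

    def __init__(self) -> None:
        self.outcome: Optional[Tuple[str, object]] = None

    def respond(self, value: object) -> None:
        self.outcome = ("response", value)

    def reject(self, reason: str) -> None:
        self.outcome = ("error", reason)


def time_stream(request: object, responder: Responder) -> None:
    """Toy request stream: answers 'Time' requests and rejects anything else."""
    if request == "Time":
        responder.respond("12:00")
    else:
        responder.reject(f"unknown request: {request}")


def deliver(
    session_id: str,
    stream: Callable[[object, Responder], None],
    request: object,
    on_response: Callable[[str, object], None],
    on_response_error: Callable[[str, object], None],
) -> None:
    """Hand the request to the stream, then route the outcome to the sender."""
    responder = Responder()
    stream(request, responder)
    if responder.outcome is None:
        return  # the stream did not answer; nothing to route in this toy model
    kind, payload = responder.outcome
    if kind == "response":
        on_response(session_id, payload)
    else:
        on_response_error(session_id, payload)


log: List[Tuple[str, str, object]] = []


def record_ok(session_id: str, response: object) -> None:
    log.append(("ok", session_id, response))


def record_error(session_id: str, error: object) -> None:
    log.append(("err", session_id, error))


deliver("s1", time_stream, "Time", record_ok, record_error)
deliver("s2", time_stream, "Date", record_ok, record_error)
print(log)
```

The request stream itself has no knowledge of whether the request was addressed to a filter or to a single session, which is the point made above: only the sender's side differs.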