
Request-Response Messaging

This package provides a client session with request-response messaging capabilities that can be used to implement application services.

Request-response messaging allows a session to send requests to other sessions. Each receiving session provides a corresponding response, which is returned to the sending session. Each request and response carries an application provided value.

Types of Messages

There are three mechanisms to specify the recipient(s) of a message:

  • Send to path: These messages are sent without specifying the receiving session. The server forwards an unaddressed message to a session that has registered a handler for the path given in the message.

  • Send to session: Messages can be sent to a specific session by specifying its ID. To be able to respond to messages addressed to itself, a session still needs to have an internal handler.

  • Send to filter: Messages can also be addressed using a session filter. When receiving such a message, the server will locate all the currently active sessions which match the filter's conditions, and respond to the sender with the number of located sessions; the sender can expect up to that many responses, so it needs to have a handler ready to handle those responses.

diffusion.messaging

Request-response messaging functionality.

ValueType_Bound module-attribute

ValueType_Bound = typing.Optional[
    typing.Union[JsonTypes_Bound, ValueTypeProtocol]
]

The upper bound of all request and response types.

J_In module-attribute

J_In = TypeVar(
    "J_In", bound=ValueType_Bound, contravariant=True
)

A value to be processed by a Callback.

J_Out_MaybeAwaitable module-attribute

J_Out_MaybeAwaitable = TypeVar(
    "J_Out_MaybeAwaitable",
    bound=typing.Union[
        ValueType_Bound,
        typing_extensions.Awaitable[ValueType_Bound],
    ],
    covariant=True,
)

A value to be returned by a Callback.

J_In_contra module-attribute

J_In_contra = typing_extensions.TypeVar(
    "J_In_contra",
    bound=ValueType_Bound,
    contravariant=True,
    default=Any,
)

A value to be processed by a Callback (contravariant).

J_Out_contra module-attribute

J_Out_contra = typing_extensions.TypeVar(
    "J_Out_contra",
    bound=ValueType_Bound,
    contravariant=True,
    default=Any,
)

A value to be returned by a Callback (contravariant).

TypeName_In module-attribute

TypeName_In = typing_extensions.TypeVar(
    "TypeName_In",
    bound=TypeName_Bound,
    default=TypeName_Bound,
)

The [TypeName][diffusion.datatypes.foundation.types.TypeName] of the incoming request.

TypeName_Out module-attribute

TypeName_Out = typing_extensions.TypeVar(
    "TypeName_Out",
    bound=TypeName_Bound,
    default=TypeName_Bound,
)

The [TypeName][diffusion.datatypes.foundation.types.TypeName] of the outgoing response.

TypeCode_In module-attribute

TypeCode_In = typing_extensions.TypeVar(
    "TypeCode_In",
    bound=TypeCode_Bound,
    default=TypeCode_Bound,
)

The [TypeCode][diffusion.datatypes.foundation.types.TypeCode] of the incoming request.

TypeCode_Out module-attribute

TypeCode_Out = typing_extensions.TypeVar(
    "TypeCode_Out",
    bound=TypeCode_Bound,
    default=TypeCode_Bound,
)

The [TypeCode][diffusion.datatypes.foundation.types.TypeCode] of the outgoing response.

RequestContext

Bases: typing.TypedDict

sender_session_id instance-attribute

sender_session_id: typing_extensions.NotRequired[SessionId]

The ID of the session that sent the request.

path instance-attribute

The path

session_properties instance-attribute

Session properties of the session

Callback

Bases: Protocol[J_In, J_Out_MaybeAwaitable]

__call__

__call__(
    request: J_In,
    **kwargs: typing_extensions.Unpack[RequestContext]
) -> J_Out_MaybeAwaitable

Called when a request is received.

Parameters:

Name Type Description Default
request J_In

The received request

required
kwargs typing_extensions.Unpack[RequestContext]

Request context information

{}
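As a minimal sketch of the call shape described above, the following shows one synchronous and one asynchronous callable that accept a request plus context keywords. It does not import the library: RequestContextSketch is a local stand-in that mirrors the three documented keys (the value types chosen for path and session_properties are assumptions), and invoke is a hypothetical helper illustrating how a caller could cope with a return value that may or may not be awaitable.

```python
import asyncio
import inspect
from typing import Any, Dict, TypedDict


class RequestContextSketch(TypedDict, total=False):
    """Local stand-in for RequestContext; the value types are assumptions."""

    sender_session_id: Any
    path: str
    session_properties: Dict[str, str]


def echo_upper(request: str, **kwargs: Any) -> str:
    """Synchronous callback: returns the response value directly."""
    return request.upper()


async def echo_with_path(request: str, **kwargs: Any) -> str:
    """Asynchronous callback: reads the path from the context keywords."""
    await asyncio.sleep(0)  # stands in for real asynchronous work
    return f"{kwargs.get('path', '?')}: {request}"


async def invoke(callback, request, context: RequestContextSketch):
    """Hypothetical helper: call the callback, awaiting the result if needed."""
    result = callback(request, **context)
    if inspect.isawaitable(result):
        result = await result
    return result


context: RequestContextSketch = {"path": "greetings/en"}
sync_result = asyncio.run(invoke(echo_upper, "hello", context))
async_result = asyncio.run(invoke(echo_with_path, "hello", context))
```

Both callables take the context as keyword arguments, so a callback that does not need the sender's details can simply ignore them.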

Messaging

Bases: Component

Messaging component.

It is not supposed to be instantiated independently; an instance is available on each Session instance as session.messaging.

This feature provides a client session with request-response messaging capabilities that can be used to implement application services.

Request-response messaging allows a session to send requests to other sessions. Each receiving session provides a corresponding response, which is returned to the sending session. Each request and response carries an application provided value.

The method used to send a request determines which sessions will receive it. Each request is routed using the provided message path – an application provided string. Two addressing schemes are provided: unaddressed requests and addressed requests.

Unaddressed requests

A session can provide an application service by implementing a handler and registering it with the server. This is somewhat similar to implementing a REST service, except that interactions between the sender and receiver are asynchronous.

Unaddressed requests sent using send_request_to_path() are routed by the server to a handler that has been pre-registered by another session and matches the message path.

Handlers are registered with add_request_handler(). Each session may register at most one handler for a given message path. Optionally, one or more session property names can be provided (see Session for a full description of session properties), in which case the values of the session properties for each recipient session will be returned along with its response. To add a request handler, the control client session must have REGISTER_HANDLER permission. If registering to receive session property values, the session must also have VIEW_SESSION permission.

Routing works as follows:

  1. The session sends the request (with send_request_to_path()) providing the message path, the request value and data type, and the expected response type.
  2. The server uses the message path to apply access control. The sender must have the SEND_TO_MESSAGE_HANDLER path permission for the message path, or the request will be rejected.
  3. The server uses the message path to select a pre-registered handler and route the request to the appropriate recipient session. The server will consider all registered handlers and select one registered for the most specific path. If multiple sessions have registered a handler for a path, one will be chosen arbitrarily. If there is no registered handler matching the message path, the request will be rejected.
  4. Otherwise, the server forwards the request to one of the sessions registered to handle the message path. The message path is also passed to the recipient session, providing a hierarchical context.
  5. The recipient session processes the request and returns a response to the server, which forwards the response to the sending session.
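The routing steps above can be sketched as a small in-memory model. This is an illustration only, not the server's implementation. It assumes that "most specific path" means the longest registered path that equals the message path or is a parent of it at a "/" boundary, and it reduces access control to a set of permitted paths. The names RequestRejected, select_handler_path and route are hypothetical. The arbitrary choice between several sessions registered for the same path is not modelled.

```python
from typing import Callable, Dict, Iterable, Set

# (message_path, request) -> response; a simplification made for this sketch
HandlerFn = Callable[[str, str], str]


class RequestRejected(Exception):
    """Raised by this sketch when a request cannot be routed."""


def select_handler_path(registered: Iterable[str], message_path: str) -> str:
    """Return the longest registered path that equals or is a parent of message_path."""
    candidates = [
        path
        for path in registered
        if message_path == path or message_path.startswith(path.rstrip("/") + "/")
    ]
    if not candidates:
        raise RequestRejected(f"no handler registered for {message_path!r}")
    return max(candidates, key=len)


def route(
    handlers: Dict[str, HandlerFn],
    permitted_paths: Set[str],
    message_path: str,
    request: str,
) -> str:
    # Step 2: access control on the message path (simplified to a set lookup).
    if message_path not in permitted_paths:
        raise RequestRejected(f"sender may not send to {message_path!r}")
    # Step 3: pick the handler registered for the most specific path.
    handler_path = select_handler_path(handlers, message_path)
    # Steps 4 and 5: forward the request with its path and return the response.
    return handlers[handler_path](message_path, request)


handlers: Dict[str, HandlerFn] = {
    "orders": lambda path, request: f"orders:{path}",
    "orders/eu": lambda path, request: f"orders/eu:{path}",
}
permitted = {"orders/eu/123", "orders/us/7", "billing/1"}

eu_reply = route(handlers, permitted, "orders/eu/123", "status?")
us_reply = route(handlers, permitted, "orders/us/7", "status?")
```

In this model a request to "billing/1" passes the permission check but is rejected because no handler matches, while a request to a path outside the permitted set is rejected before any handler is considered.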

Registration works across a cluster of servers. If no matching handler is registered on the server to which the sending session is connected, the request will be routed to another server in the cluster that has one.

Addressed requests

Addressed requests provide a way to perform actions on a group of sessions, or to notify sessions of one-off events (for repeating streams of events, use a topic instead).

An addressed request can be sent to a set of sessions using send_request_to_filter(). For the details of session filters, see Session. Sending a request to a filter will match zero or more sessions. Each response received will be passed to the handler registered with add_filter_response_handler(). As a convenience, an addressed request can be sent to a specific session using send_request_to_session().

Sending an addressed request requires SEND_TO_SESSION permission.

If the sending session is connected to a server belonging to a cluster, the recipient sessions can be connected to other servers in the cluster. The filter will be evaluated against all sessions hosted by the cluster.

To receive addressed requests, a session must set up a local request stream to handle the specific message path, using add_stream_handler() with addressed=True. When a request is received for the message path, the on_request() method on the stream is triggered. The session should respond using the provided respond() method call. Streams receive an on_close() callback when unregistered and an on_error() callback if the session is closed.

If a request is sent to a session that does not have a matching stream for the message path, an error will be returned to the sending session.

Accessing the feature

Obtain this feature from a Session as follows:

messaging = session.messaging

Since

6.6

add_stream_handler

add_stream_handler(
    path: str,
    handler: RequestHandler,
    addressed: bool = False,
) -> None

Registers a request stream handler.

The handler is invoked when the session receives a request sent to the given path or session filter.

Parameters:

Name Type Description Default
path str

The handler will respond to the requests to this path.

required
handler RequestHandler

A Handler instance to handle the request.

required
addressed bool

True to handle the requests addressed to the session's ID or using a session filter; False for unaddressed requests.

False

add_filter_response_handler

add_filter_response_handler(
    session_filter: str, handler: Handler
) -> None

Registers a session filter response handler.

The handler is invoked when the session receives a response to a request sent to the session filter.

Parameters:

Name Type Description Default
session_filter str

A session filtering query.

required
handler Handler

A Handler instance to handle the response.

required

send_request_to_path async

send_request_to_path(
    path: str,
    request: DataType,
    response_type: Optional[DataTypeArgument] = None,
) -> Optional[DataType]

Send a request to sessions based on a path.

Parameters:

Name Type Description Default
path str

The path to send a request to.

required
request DataType

The request to be sent, wrapped into the required DataType class.

required
response_type Optional[DataTypeArgument]

The type to convert the response to. If omitted, it will be the same as the request's data type.

None

Returns:

Type Description
Optional[DataType]

The response value of the provided response_type.
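Because send_request_to_path is a coroutine, a caller may want to bound how long it waits for the response. The sketch below wraps the call in asyncio.wait_for. The helper request_with_timeout is hypothetical, and StubMessaging is a stand-in that only copies the method signature shown above so the example can run without a server; it is not the library's class, and the plain strings used as requests stand in for real DataType values.

```python
import asyncio
from typing import Any, Tuple


async def request_with_timeout(
    messaging: Any, path: str, request: Any, timeout: float
) -> Tuple[bool, Any]:
    """Await send_request_to_path; return (False, None) if no reply arrives in time."""
    try:
        response = await asyncio.wait_for(
            messaging.send_request_to_path(path, request), timeout
        )
    except asyncio.TimeoutError:
        return False, None
    return True, response


class StubMessaging:
    """Stand-in used only to exercise the helper; not the library's class."""

    def __init__(self, delay: float) -> None:
        self.delay = delay

    async def send_request_to_path(self, path, request, response_type=None):
        await asyncio.sleep(self.delay)  # simulates the round trip
        return f"reply to {request!r} on {path}"


fast = asyncio.run(request_with_timeout(StubMessaging(0.0), "echo", "ping", 1.0))
slow = asyncio.run(request_with_timeout(StubMessaging(0.5), "echo", "ping", 0.01))
```

Returning a flag alongside the value keeps a timeout distinguishable from a response that is legitimately None, which the Optional return type allows.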

add_request_handler async

add_request_handler(
    path: str,
    handler: RequestHandler,
    session_properties: Collection[str] = (),
) -> None

Register the session as the handler for requests to a given path.

This method is used to inform the server that any unaddressed requests to the given path should be forwarded to the active session. The handler for these requests is added at the same time, using add_stream_handler internally.

Parameters:

Name Type Description Default
path str

The handler will respond to the requests to this path.

required
handler RequestHandler

A callable to handle the request.

required
session_properties Collection[str]

A list of keys of session properties that should be supplied with each request. To request all fixed properties include ALL_FIXED_PROPERTIES as a key; any other fixed property keys will be ignored. To request all user properties include ALL_USER_PROPERTIES as a key; any other user properties will be ignored.

()

send_request_to_filter async

send_request_to_filter(
    session_filter: StrictStr,
    path: StrictStr,
    request: DataType,
) -> int

Send a request to other sessions, specified by the filter.

Parameters:

Name Type Description Default
session_filter StrictStr

A session filtering query.

required
path StrictStr

The path to send a request to.

required
request DataType

The request to be sent, wrapped into the required DataType class.

required

Returns:

Type Description
int

The number of sessions that correspond to the filter, which is the number of responses that can be expected. When each of the responses is received, the handler registered for the filter will be executed.
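Since the returned count is only an upper bound on the responses that will arrive, the sender needs some way to decide when it has heard from everyone or has waited long enough. One possible approach, sketched here without the library, is a small collector whose on_response method would be called from the filter response handler. FilterResponseCollector and its methods are hypothetical names introduced for this illustration.

```python
import asyncio
from typing import Any, List, Optional


class FilterResponseCollector:
    """Collects responses until the expected count is reached or a timeout expires."""

    def __init__(self) -> None:
        self.responses: List[Any] = []
        self._expected: Optional[int] = None
        self._done = asyncio.Event()

    def set_expected(self, expected: int) -> None:
        """Record the session count returned when the request was sent."""
        self._expected = expected
        self._check()

    def on_response(self, response: Any) -> None:
        """Record one response; intended to be called once per responding session."""
        self.responses.append(response)
        self._check()

    def _check(self) -> None:
        if self._expected is not None and len(self.responses) >= self._expected:
            self._done.set()

    async def wait(self, timeout: float) -> List[Any]:
        """Return the responses gathered so far once complete or timed out."""
        try:
            await asyncio.wait_for(self._done.wait(), timeout)
        except asyncio.TimeoutError:
            pass  # fewer responses than expected arrived in time
        return list(self.responses)


async def demo() -> List[Any]:
    collector = FilterResponseCollector()
    collector.on_response("a")  # a response may arrive before the count is known
    collector.set_expected(2)
    asyncio.get_running_loop().call_soon(collector.on_response, "b")
    return await collector.wait(timeout=1.0)


collected = asyncio.run(demo())
```

The collector is created inside the coroutine so that its asyncio.Event belongs to the running event loop on older Python versions as well.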

send_request_to_session async

send_request_to_session(
    path: str,
    session_id: SessionId,
    request: DataType,
    response_type: Optional[DataTypeArgument] = None,
) -> Optional[DataType]

Send a request to a single session.

Parameters:

Name Type Description Default
path str

The path to send a request to.

required
session_id SessionId

The ID of the session to send the request to.

required
request DataType

The request to be sent, wrapped into the required DataType class.

required
response_type Optional[DataTypeArgument]

The type to convert the response to. If omitted, it will be the same as the request's data type.

None

Returns:

Type Description
Optional[DataType]

The response value of the provided response_type.

RequestHandler

Bases: Handler, Generic[J_In_contra, J_Out_contra, TypeCode_In, TypeName_In, TypeCode_Out, TypeName_Out]

Class which specifies a request handler to receive request notifications.

__init__

Initialise the RequestHandler.

Parameters:

Name Type Description Default
callback typing.Union[Callback[J_In_contra, typing_extensions.Awaitable[J_Out_contra]], Callback[J_In_contra, J_Out_contra]]

the callback to call with the raw value of the request

required
request_type typing.Union[TypeName_In, TypeCode_In, Type[IBytes[Any, Any, J_In_contra]], Type[HasRawTypes[J_In_contra]], TopicSpecification[IBytes[Any, Any, J_In_contra]], TopicSpecification[ValueTypeProtocolSpecific[J_In_contra]]]

the Diffusion datatype of the request

required
response_type typing.Union[TypeName_Out, TypeCode_Out, Type[IBytes[Any, Any, J_Out_contra]], Type[HasRawTypesOrMore[J_Out_contra]], TopicSpecification[IBytes[Any, Any, J_Out_contra]], TopicSpecification[ValueTypeProtocolSpecificOrMore[J_Out_contra]]]

the Diffusion datatype of the response

required

handle async

handle(
    event: str = "request", **kwargs
) -> ValueTypeProtocolWithCodeAndName[
    J_Out_contra, TypeCode_Out, TypeName_Out
]

Execute the callback.

MessagingError

Bases: DiffusionError

The generic messaging error.

InvalidFilterError

Bases: SessionError

The exception raised when a filter expression is invalid.