Request-Response Messaging¶
This package provides a client session with request-response messaging capabilities that can be used to implement application services.
Request-response messaging allows a session to send requests to other sessions. Each receiving session provides a corresponding response, which is returned to the sending session. Each request and response carries an application provided value.
Types of Messages¶
There are three mechanisms to specify the recipient(s) of a message:
-
Send to path: The messages of this type are sent without specifying the receiving session. The server will forward an unaddressed message to any session which was registered as a handler for the path indicated in the message.
-
Send to session: Messages can be sent to a specific session by specifying its ID. To be able to respond to messages addressed to itself, a session still needs to have an internal handler.
-
Send to filter: Messages can also be addressed using a session filter. When receiving such a message, the server will locate all the currently active sessions which match the filter's conditions, and respond to the sender with the number of located sessions; the sender can expect up to that many responses, so it needs to have a handler ready to handle those responses.
diffusion.messaging ¶
Request-response messaging functionality.
ValueType_Bound
module-attribute
¶
ValueType_Bound = typing.Optional[
typing.Union[JsonTypes_Bound, ValueTypeProtocol]
]
The upper bound of all request and response types.
J_In
module-attribute
¶
J_In = TypeVar(
"J_In", bound=ValueType_Bound, contravariant=True
)
A value to be processed by a Callback.
J_Out_MaybeAwaitable
module-attribute
¶
J_Out_MaybeAwaitable = TypeVar(
"J_Out_MaybeAwaitable",
bound=typing.Union[
ValueType_Bound,
typing_extensions.Awaitable[ValueType_Bound],
],
covariant=True,
)
A value to be returned by a Callback.
J_In_contra
module-attribute
¶
J_In_contra = typing_extensions.TypeVar(
"J_In_contra",
bound=ValueType_Bound,
contravariant=True,
default=Any,
)
A value to be processed by a Callback (contravariant).
J_Out_contra
module-attribute
¶
J_Out_contra = typing_extensions.TypeVar(
"J_Out_contra",
bound=ValueType_Bound,
contravariant=True,
default=Any,
)
A value to be returned by a Callback (contravariant).
TypeName_In
module-attribute
¶
TypeName_In = typing_extensions.TypeVar(
"TypeName_In",
bound=TypeName_Bound,
default=TypeName_Bound,
)
The [TypeName][diffusion.datatypes.foundation.types.TypeName] of the incoming request.
TypeName_Out
module-attribute
¶
TypeName_Out = typing_extensions.TypeVar(
"TypeName_Out",
bound=TypeName_Bound,
default=TypeName_Bound,
)
The [TypeName][diffusion.datatypes.foundation.types.TypeName] of the outgoing response.
TypeCode_In
module-attribute
¶
TypeCode_In = typing_extensions.TypeVar(
"TypeCode_In",
bound=TypeCode_Bound,
default=TypeCode_Bound,
)
The [TypeCode][diffusion.datatypes.foundation.types.TypeCode] of the incoming request.
TypeCode_Out
module-attribute
¶
TypeCode_Out = typing_extensions.TypeVar(
"TypeCode_Out",
bound=TypeCode_Bound,
default=TypeCode_Bound,
)
The [TypeCode][diffusion.datatypes.foundation.types.TypeCode] of the outgoing response.
RequestContext ¶
sender_session_id
instance-attribute
¶
sender_session_id: typing_extensions.NotRequired[SessionId]
The Session ID
session_properties
instance-attribute
¶
session_properties: typing_extensions.NotRequired[
SessionProperties
]
Sesssion Properties of the session
Callback ¶
Bases: Protocol[J_In, J_Out_MaybeAwaitable]
__call__ ¶
__call__(
request: J_In,
**kwargs: typing_extensions.Unpack[RequestContext]
) -> J_Out_MaybeAwaitable
Called when request is received
Parameters:
Name | Type | Description | Default |
---|---|---|---|
request
|
J_In
|
The received request |
required |
kwargs
|
typing_extensions.Unpack[RequestContext]
|
Request context information |
{}
|
Messaging ¶
Bases: Component
Messaging component.
It is not supposed to be instantiated independently; an instance is available
on each Session
instance as session.messaging
.
This feature provides a client session with request-response messaging capabilities that can be used to implement application services.
Request-response messaging allows a session to send requests to other sessions. Each receiving session provides a corresponding response, which is returned to the sending session. Each request and response carries an application provided value.
The method used to send a request determines which sessions will receive it. Each request is routed using the provided message path – an application provided string. Two addressing schemes are provided: unaddressed requests and addressed requests.
Unaddressed requests¶
A session can provide an application service by implementing a handler and registering it with the server. This is somewhat similar to implementing a REST service, except that interactions between the sender and receiver are asynchronous.
Unaddressed requests sent using send_request_async() are routed by the server to a handler that has been pre-registered by another session, and matches the message path.
Handlers are registered with add_request_handler_async(). Each session may register at most one handler for a given message path. Optionally, one or more session property names can be provided (see ISession for a full description of session properties), in which case the values of the session properties for each recipient session will be returned along with its response. To add a request handler, the control client session must have REGISTER_HANDLER permission. If registering to receive session property values, the session must also have VIEW_SESSION permission.
Routing works as follows:
- The session sends the request (with send_request_async()) providing the message path, the request value and data type, and the expected response type.
- The server uses the message path to apply access control. The sender must have the SEND_TO_MESSAGE_HANDLER path permission for the message path, or the request will be rejected.
- The server uses the message path to select a pre-registered handler and route the request to the appropriate recipient session. The server will consider all registered handlers and select one registered for the most specific path. If multiple sessions have registered a handler registered for a path, one will be chosen arbitrarily. If there is no registered handler matching the message path, the request will be rejected.
- Otherwise, the server forwards the request to one of the sessions registered to handle the message path. The message path is also passed to the recipient session, providing a hierarchical context.
- The recipient session processes the request and returns a response to the server, which forwards the response to the sending session.
Registration works across a cluster of servers. If no matching handler is registered on the server to which the sending session is connected, the request will be routed to another server in the cluster that has one.
Addressed requests¶
Addressed requests provide a way to perform actions on a group of sessions, or to notify sessions of one-off events (for repeating streams of events, use a topic instead).
An addressed request can be sent to a set of sessions using send_request_to_filter_async(). For the details of session filters, see ISession. Sending a request to a filter will match zero or more sessions. Each response received will be passed to the provided IFilteredRequestCallback. As a convenience, an addressed request can be sent to a specific session using the overloaded variant of send_request_async() that accepts a session id.
Sending an addressed request requires SEND_TO_SESSION permission.
If the sending session is connected to a server belonging to a cluster, the recipient sessions can be connected to other servers in the cluster. The filter will be evaluated against all sessions hosted by the cluster.
To receive addressed requests, a session must set up a local request stream to handle the specific message path, using set_request_stream(). When a request is received for the message path, the on_request() method on the stream is triggered. The session should respond using the provided respond() method call. Streams receive an on_close() callback when unregistered and an on_error() callback if the session is closed.
If a request is sent to a session that does not have a matching stream for the message path, an error will be returned to the sending session.
Accessing the feature¶
Obtain this feature from an ISession as follows:
messaging = session.messaging
Since
6.6
add_stream_handler ¶
add_stream_handler(
path: str,
handler: RequestHandler,
addressed: bool = False,
) -> None
Registers a request stream handler.
The handler is invoked when the session receives a request sent to the given path or session filter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path
|
str
|
The handler will respond to the requests to this path. |
required |
handler
|
RequestHandler
|
A Handler instance to handle the request. |
required |
addressed
|
bool
|
|
False
|
add_filter_response_handler ¶
Registers a session filter response handler.
The handler is invoked when the session receives a response to a request sent to the session filter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session_filter
|
str
|
A session filtering query. |
required |
handler
|
Handler
|
A Handler instance to handle the request. |
required |
send_request_to_path
async
¶
send_request_to_path(
path: str,
request: DataType,
response_type: Optional[DataTypeArgument] = None,
) -> Optional[DataType]
Send a request to sessions based on a path.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path
|
str
|
The path to send a request to. |
required |
request
|
DataType
|
The request to be sent, wrapped into the required |
required |
response_type
|
Optional[DataTypeArgument]
|
The type to convert the response to. If omitted, it will be
the same as the |
None
|
Returns:
Type | Description |
---|---|
Optional[DataType]
|
The response value of the provided |
add_request_handler
async
¶
add_request_handler(
path: str,
handler: RequestHandler,
session_properties: Collection[str] = (),
) -> None
Register the session as the handler for requests to a given path.
This method is used to inform the server that any unaddressed requests to the
given path should be forwarded to the active session. The handler to
these requests is added at the same time, using add_stream_handler
internally.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path
|
str
|
The handler will respond to the requests to this path. |
required |
handler
|
RequestHandler
|
A callable to handle the request. |
required |
session_properties
|
Collection[str]
|
A list of keys of session properties that should be
supplied with each request. To request all fixed
properties include |
()
|
send_request_to_filter
async
¶
Send a request to other sessions, specified by the filter.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session_filter
|
StrictStr
|
A session filtering query. |
required |
path
|
StrictStr
|
The path to send a request to. |
required |
request
|
DataType
|
The request to be sent, wrapped into the required |
required |
Returns:
Type | Description |
---|---|
int
|
The number of sessions that correspond to the filter, which is the number of responses that can be expected. When each of the responses is received, the handler registered for the filter will be executed. |
send_request_to_session
async
¶
send_request_to_session(
path: str,
session_id: SessionId,
request: DataType,
response_type: Optional[DataTypeArgument] = None,
) -> Optional[DataType]
Send a request to a single session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
path
|
str
|
The path to send a request to. |
required |
session_id
|
SessionId
|
The ID of the session to send the request to. |
required |
request
|
DataType
|
The request to be sent, wrapped into the required |
required |
response_type
|
Optional[DataTypeArgument]
|
The type to convert the response to. If omitted, it will be
the same as the |
None
|
Returns:
Type | Description |
---|---|
Optional[DataType]
|
The response value of the provided |
RequestHandler ¶
Bases: Handler
, Generic[J_In_contra, J_Out_contra, TypeCode_In, TypeName_In, TypeCode_Out, TypeName_Out]
Class which specifies a request handler to receive request notifications.
__init__ ¶
__init__(
callback: typing.Union[
Callback[
J_In_contra,
typing_extensions.Awaitable[J_Out_contra],
],
Callback[J_In_contra, J_Out_contra],
],
request_type: typing.Union[
TypeName_In,
TypeCode_In,
Type[IBytes[Any, Any, J_In_contra]],
Type[HasRawTypes[J_In_contra]],
TopicSpecification[IBytes[Any, Any, J_In_contra]],
TopicSpecification[
ValueTypeProtocolSpecific[J_In_contra]
],
],
response_type: typing.Union[
TypeName_Out,
TypeCode_Out,
Type[IBytes[Any, Any, J_Out_contra]],
Type[HasRawTypesOrMore[J_Out_contra]],
TopicSpecification[IBytes[Any, Any, J_Out_contra]],
TopicSpecification[
ValueTypeProtocolSpecificOrMore[J_Out_contra]
],
],
) -> None
Initialise the RequestHandler.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
callback
|
typing.Union[Callback[J_In_contra, typing_extensions.Awaitable[J_Out_contra]], Callback[J_In_contra, J_Out_contra]]
|
the callback to call with the raw value of the request |
required |
request_type
|
typing.Union[TypeName_In, TypeCode_In, Type[IBytes[Any, Any, J_In_contra]], Type[HasRawTypes[J_In_contra]], TopicSpecification[IBytes[Any, Any, J_In_contra]], TopicSpecification[ValueTypeProtocolSpecific[J_In_contra]]]
|
the Diffusion datatype of the request |
required |
response_type
|
typing.Union[TypeName_Out, TypeCode_Out, Type[IBytes[Any, Any, J_Out_contra]], Type[HasRawTypesOrMore[J_Out_contra]], TopicSpecification[IBytes[Any, Any, J_Out_contra]], TopicSpecification[ValueTypeProtocolSpecificOrMore[J_Out_contra]]]
|
the Diffusion datatype of the response |
required |
handle
async
¶
handle(
event: str = "request", **kwargs
) -> ValueTypeProtocolWithCodeAndName[
J_Out_contra, TypeCode_Out, TypeName_Out
]
Execute the callback.
MessagingError ¶
Bases: DiffusionError
The generic messaging error.