Skip to content

Handlers

diffusion.handlers

Base module for various event handlers.

AT_T_contra module-attribute

AT_T_contra = typing_extensions.TypeVar(
    "AT_T_contra",
    bound="AbstractDataType",
    default="AbstractDataType",
    contravariant=True,
)

The type of topic values received by a Handler (contravariant).

AT_T_co module-attribute

AT_T_co = typing_extensions.TypeVar(
    "AT_T_co",
    bound="AbstractDataType",
    default=typing.Any,
    covariant=True,
)

The type of topic values received by a Handler (covariant).

Callback_RT module-attribute

Callback_RT = typing_extensions.TypeVar(
    "Callback_RT", default=typing.Any, covariant=True
)

Callback return type.

Handler

Bases: Protocol

Protocol for event handlers implementation.

handle async

handle(event: str, **kwargs: typing.Any) -> typing.Any

Implements handling of the given event.

Parameters:

Name Type Description Default
event str

The event identifier.

required
**kwargs typing.Any

Additional arguments.

{}

remove

remove(sub_handler: Handler) -> None

Removes the given sub-handler from the given handler.

Raises:

Type Description
KeyError

If the handler does not exist.

UnknownHandlerError

Bases: exceptions.DiffusionError

Raised when a requested handler key has not been registered in the session.

SubHandlerArgs

Bases: typing_extensions.TypedDict, typing.Generic[AT_T_co]

Named arguments to the diffusion.handlers.SubHandlerProtocol

topic_path instance-attribute

topic_path: typing_extensions.Annotated[
    str, typing_extensions.Doc("Topic path.")
]

Topic path.

topic_value instance-attribute

Topic value.

topic_spec instance-attribute

Topic spec.

UpdateArgs

Bases: typing_extensions.TypedDict, SubHandlerArgs[AT_T_co]

Inherits all parameters from diffusion.handlers.SubHandlerArgs.

old_value instance-attribute

old_value: typing_extensions.Annotated[
    typing.Optional[AT_T_co],
    typing_extensions.Doc(
        "The old topic value. Will be None for the first update callback in a stream (as there is no previous value)."
    ),
]

The old topic value. Will be None for the first update callback in a stream (as there is no previous value).

UnsubscribeArgs

Bases: typing_extensions.TypedDict, SubHandlerArgs[AT_T_co]

Inherits all parameters from diffusion.handlers.SubHandlerArgs.

reason instance-attribute

reason: typing_extensions.Annotated[
    UnsubscribeReason,
    typing_extensions.Doc("Reason for unsubscription."),
]

Reason for unsubscription.

SubHandlerProtocol

Bases: Protocol[AT_T_contra, Callback_RT]

__call__

Subhandler Protocol - callbacks should fulfil this interface.

Other Parameters:

Name Type Description
topic_path typing_extensions.Annotated[str, typing_extensions.Doc('Topic path.')]

Topic path.

topic_value typing_extensions.Annotated[typing.Optional[AT_T_co], typing_extensions.Doc('Topic value.')]

Topic value.

topic_spec typing_extensions.Annotated[TopicSpecification, typing_extensions.Doc('Topic spec.')]

Topic spec.

Returns:

Type Description
Callback_RT

A value of type diffusion.handlers.Callback_RT][]

UnsubscribeProtocol

Bases: Protocol[AT_T_contra, Callback_RT]

__call__

Unsubscribe Callback Protocol - callbacks should fulfil this interface.

Other Parameters:

Name Type Description
reason typing_extensions.Annotated[UnsubscribeReason, typing_extensions.Doc('Reason for unsubscription.')]

Reason for unsubscription.

Returns:

Type Description
Callback_RT

A value of type diffusion.handlers.Callback_RT][]

UpdateProtocol

Bases: Protocol[AT_T_contra, Callback_RT]

__call__

Update Callback Protocol - callbacks should fulfil this interface.

Other Parameters:

Name Type Description
old_value typing_extensions.Annotated[typing.Optional[AT_T_co], typing_extensions.Doc('The old topic value. Will be None for the first update callback in a stream (as there is no previous value).')]

The old topic value. Will be None for the first update callback in a stream (as there is no previous value).

Returns:

Type Description
Callback_RT

A value of type diffusion.handlers.Callback_RT][]

SimpleHandler

Bases: Handler

Wraps a callable into a Handler protocol instance.

handle async

handle(event: str = '', **kwargs: typing.Any) -> typing.Any

Implements handling of the given event.

Parameters:

Name Type Description Default
event str

The event identifier.

''
**kwargs typing.Any

Additional arguments.

{}

AbstractHandlerSet

Bases: Handler, typing.Iterable[Handler], abc.ABC

handle async

handle(event: str = '', **kwargs: typing.Any) -> typing.Any

Implements handling of the given event.

Parameters:

Name Type Description Default
event str

The event identifier.

''
**kwargs typing.Any

Additional arguments.

{}

Returns:

Type Description
typing.Any

Aggregated list of returned values.

HandlerSet

Bases: set, AbstractHandlerSet

A collection of handlers to be invoked together.

ErrorHandler

Bases: Protocol[Callback_RT]

__call__

__call__(
    code: int, description: str, **kwargs
) -> Callback_RT

Handle an error.

Parameters:

Name Type Description Default
code int

the error code

required
description str

the error description.

required

EventStreamHandler

Bases: Handler, typing.Generic[AT_T_co, Callback_RT]

Generic handler of event streams.

Each keyword argument is a callable which will be converted to coroutine and awaited to handle the event matching the argument keyword.

__init__

__init__(
    *,
    error: typing.Optional[ErrorHandler] = None,
    response: typing.Optional[
        FilterResponseProtocol[AT_T_co, Callback_RT]
    ] = None,
    **kwargs: typing.Optional[SubHandlerProtocol]
)

Parameters:

Name Type Description Default
on_error

error handler callback

required
**kwargs typing.Optional[SubHandlerProtocol]

other subhandlers

{}

handle async

handle(event: str, **kwargs: typing.Any) -> typing.Any

Implements handling of the given event.

Parameters:

Name Type Description Default
event str

The event identifier.

required
**kwargs typing.Any

Additional arguments.

{}