Skip to content

Session

diffusion.session

Public session for end users.

ANONYMOUS_PRINCIPAL module-attribute

ANONYMOUS_PRINCIPAL = ''

Anonymous principal username

Session

A client session connected to a Diffusion server or a cluster of servers.

Parameters:

Name Type Description Default
url str

WebSockets URL of the Diffusion server to connect to.

required
principal str

The name of the security principal associated with the session.

None
credentials Optional[Credentials]

Security information required to authenticate the connection.

None

The recommended method is to instantiate this class as an async context manager. Here is a minimal example:

>>> async with diffusion.Session("ws://diffusion.server:8080") as session:
...     # do some work with the session
...     pass

The context manager will make sure that the connection is properly closed at the end of the program. Alternatively, it is possible to open the connection explicitly, which can be useful if the session needs to be passed around, in this case the connection needs to be explicitly closed as well:

>>> session = diffusion.Session("ws://diffusion.server:8080")
>>> await session.connect()
>>> # do some work with the session
>>> await session.close()

state property

state

Returns the current connection state of the session.

session_id property

session_id

The current session ID.

services property

services

The ServiceLocator instance responsible for retrieving services.

handlers property

handlers

The collection of registered handlers.

data property

data: dict

Internal data storage.

messaging property

messaging: Messaging

Request-response messaging component.

topics property

topics: Topics

Topics component.

session_trees property

session_trees: SessionTrees

Session Trees component.

metrics property

metrics: Metrics

Metrics component.

time_series property

time_series: TimeSeries

Time Series component.

connect async

connect(properties: Optional[SessionProperties] = None)

Connect to the server.

Parameters:

Name Type Description Default
properties Optional[SessionProperties]

A dict of Diffusion session properties to set and/or update at connection.

None

close async

close()

Closes the session.

ping_server async

ping_server()

Send the user ping to the server.

lock async

lock(lock_name: str, scope: SessionLockScope = SessionLockScope.UNLOCK_ON_SESSION_LOSS) -> SessionLock

Attempts to acquire a SessionLock with a given scope.

Notes

If the operation completes successfully, the result will be the requested SessionLock assigned to the calling session by the server. The session owns the returned lock and is responsible for unlocking it.

Acquiring the lock can take an arbitrarily long time if other sessions are competing for the lock. The server will retain the session's request for the lock until it is assigned to the session, the session is closed.

A session can call this method multiple times. If the lock is acquired, all calls will complete successfully with equal SessionLock.

A session that acquires a lock will remain its owner until it is unlocked (via SessionLock.unlock) or the session closes.

If called with a scope of SessionLockScope.UNLOCK_ON_SESSION_LOSS, this method behaves exactly like SessionLocks.lock.

If called with a scope of SessionLockScope.UNLOCK_ON_CONNECTION_LOSS, any lock that is returned will be unlocked if the session loses its connection to the server. This is useful to allow another session to take ownership of the lock while this session is reconnecting.

If a session makes multiple requests for a lock using different scopes, and the server assigns the lock to the session fulfilling the requests, the lock will be given the weakest scope (SessionLockScope.UNLOCK_ON_CONNECTION_LOSS).

Access control

To allow fine-grained access control, lock names are interpreted as path names, controlled with the update-topic/b, for example.

Since

6.10.

Parameters:

Name Type Description Default
lock_name str

The name of the session lock.

required
scope SessionLockScope

The scope of the session lock.

SessionLockScope.UNLOCK_ON_SESSION_LOSS

Returns:
A session lock object.

on_state_changed async

on_state_changed(old_state: State, new_state: State)

Raises the StateChanged event from session. Bound as a handler on the InternalSession.on_state_changed event.

remove_listener

remove_listener(listener: SessionListener)

Remove a given session state listener from the session

Parameters:

Name Type Description Default
listener SessionListener

the listener to remove

required

Raises:

Type Description
InvalidOperationError

if no such listener is present

SessionListener

Bases: EventStreamHandler

The session listener.

on_session_event async

on_session_event(*, session: Session, old_state: typing.Optional[State], new_state: State)

Called when a session changed state.

Parameters:

Name Type Description Default
session Session

The session that sent this event.

required
old_state typing.Optional[State]

The old session state.

required
new_state State

The session state.

required

session_factory

Connector

Bases: object

Connector object.

This can be awaited to return a connected session, or used as an asynchronous context manager that provides a connected session on entry and closes it on exit.

__init__
__init__(session_details: SessionDetails, container_factory: SessionContainerFactory, url: str = None)

This is used by the SessionFactory and is not designed for manual creation.

Parameters:

Name Type Description Default
session_details SessionDetails

the session details involved

required
container_factory SessionContainerFactory

responsible for building the session and connecting it

required
url str

the URL to be connected to. If not present the session details will be used instead.

None

SessionFactory

Bases: object

Factory for client diffusion.session.Session.

Each instance is immutable, and has a number of session attributes that determine how sessions should be established. An initial instance of this factory with default values for all attributes can be obtained from diffusion.sessions(). Each method returns a copy of this factory, with a single changed attribute.

Establishing a session

A new session can be created using open specifying a URL that identifies the server. This returns an async context manager, which will provide the connected Session object.

The server is identified by the diffusion.session.session_factory.SessionFactory.server_host and diffusion.session.session_factory.SessionFactory.server_port attributes.

If a URL is specified, it takes precedence over the diffusion.session.session_factory.SessionFactory.server_host and diffusion.session.session_factory.SessionFactory.server_port session factory attributes.

URL format

URLs should take the form scheme://host:port, where scheme is chosen from the following table and determines the transport protocol used to send Diffusion messages.

Scheme Transport Protocol
ws WebSocket. See https://tools.ietf.org/html/rfc6455
wss WebSocket over TLS.

We recommend using the WebSocket protocol options ws or wss.

TLS is Transport Layer Security, commonly known as SSL. TLS-based protocols use cryptography to provide transport-level privacy, authentication, and integrity, and protects against network-sniffing and man-in-the-middle attacks. We recommend using the TLS variants for all communication. For a typical application, you should only consider not using TLS for unauthenticated ("anonymous") client sessions.

Since

6.9

principal
principal(principal: str) -> SessionFactory

Sets the security principal.

By default, this will be diffusion.session.ANONYMOUS_PRINCIPAL.

Parameters:

Name Type Description Default
principal str

the principal

required

Returns:

Type Description
SessionFactory

a new immutable instance of the factory with the principal changed

credentials
credentials(credentials: Credentials) -> SessionFactory

Set credentials.

The default is None.

Parameters:

Name Type Description Default
credentials Credentials

the credentials

required

Returns:

Type Description
SessionFactory

a new immutable instance of the factory with the credentials changed

initial_retry_strategy
initial_retry_strategy(strategy: RetryStrategy) -> SessionFactory

Sets the initial retry strategy.

The strategy will determine whether a failure to open a session due to a ServerConnectionError should be retried and if so, at what interval and for how many attempts.

If no initial retry strategy is set there will be no attempt to retry after such a failure.

Parameters:

Name Type Description Default
strategy RetryStrategy

strategy the retry strategy to use

required

Returns:

Type Description
SessionFactory

a new immutable instance of the factory with the initial retry strategy changed

Since

6.9

server_host
server_host(host: str) -> SessionFactory

Set the host name of the server to connect the session to.

This value is only used if a URL is not provided when opening a session.

Parameters:

Name Type Description Default
host str

the host name of the server

required

Returns:

Type Description
SessionFactory

a new immutable instance of the factory that will use the provided host

Raises:

Type Description
IllegalArgumentError

if the specified host is invalid

server_port
server_port(port: int) -> SessionFactory

Set the port of the server to connect the session to.

This value is only used if a URL is not provided when opening a session. If the port is not set using this method or a URL, the port will be inferred based on the transport and security configuration.

The provided value must be within the range used for port numbers.

Parameters:

Name Type Description Default
port int

the port of the server

required

Returns:

Type Description
SessionFactory

a new immutable instance of the factory that will use the provided port

Raises:

Type Description
IllegalArgumentError

if the specified port is invalid

properties
properties(properties: SessionProperties) -> SessionFactory

Sets user-defined session property values.

Supplied session properties will be provided to the server when a session is created using this session factory. The supplied properties will be validated during authentication and may be discarded or changed.

The specified properties will be added to any existing properties set for this session factory. If any of the keys have been previously declared then they will be overwritten with the new values.

For details of how session properties are used see diffusion.session.Session.

Parameters:

Name Type Description Default
properties SessionProperties

a map of user-defined session properties

required

Returns:

Type Description
SessionFactory

a new immutable instance of the factory with the supplied properties set

Since

6.9

open
open(url: str = None) -> Connector

This method returns a diffusion.session.session_factory.Connector instance.

Awaiting this object via await will open a connection to a server and return a new, connected Session. Using it as an async context manager via async with wil do the same on entry and close this Session on exit.

It can take a URL to specify the server location, ignoring the diffusion.session.session_factory.SessionFactory.server_host and diffusion.session.session_factory.SessionFactory.server_port session factory attributes.

Raises:

Type Description
IllegalArgumentError

if url is invalid

IllegalStateError

if any of the session attributes are found to be inconsistent. For example, if the default SSL context could not be loaded

SessionEstablishmentError

if an initial connection could not be established

AuthenticationError

if the client is insufficiently authorized to start the session, based on the supplied client credentials.

Parameters:

Name Type Description Default
url str

the server location

None

Returns:

Type Description
Connector

A diffusion.session.session_factory.Connector object.

Since

6.9

sessions cached

sessions(container_factory = None) -> SessionFactory

Returns:

Type Description
SessionFactory

The default session factory.

retry_strategy

RetryStrategy

Bases: pydantic.BaseModel

Defines a retry strategy.

A retry strategy will be applied when an initial to attempt to open a session fails with a ServerConnectionError.

The strategy is defined in terms of the number of seconds between retries and the maximum number of retries to attempt.

Since: 6.9

interval class-attribute
interval: float = pydantic.Field(ge=1.0)

the number of seconds before the first retry and between subsequent retries

attempts class-attribute
attempts: pydantic.StrictInt = Int64.max_unsigned_int()

The number of retry attempts

exceptions

IncompatibleTopicError

Bases: SessionError

The topic is incompatible.

NoSuchTopicError

Bases: SessionError

There is no such topic.

NoSuchEventError

Bases: SessionError

The exception used to report a time series topic does not have an original event with the sequence number provided by an

See Also

timeseries.edit operation.

Notes

Added in version 6.9.

IncompatibleExistingTopicError

Bases: SessionError

This differs from ExistingTopicError as the reason is that the existing topic is owned by something that prevents the caller managing. The specification of the existing topic may be the same.

UnsatisfiedConstraintError

Bases: SessionError

The exception to report a constraint was not satisfied.

ExclusiveUpdaterConflictError

Bases: SessionError

The exception to indicate an update could not be applied because an exclusive update source is registered for the path.

InvalidPatchError

Bases: SessionError

The exception to report that a JSON Patch was invalid.

FailedPatchError

Bases: SessionError

The exception to report that applying a JSON Patch failed.

Notes

This can happen if the topic's current value is not valid CBOR. See VALIDATE_VALUES.

InvalidUpdateStreamError

Bases: SessionError

The exception used to report an operation was performed with an invalid [UpdateStream][diffusion.UpdateStream].

IncompatibleTopicStateError

Bases: SessionError

The exception that indicates that an operation could not be performed because the topic is managed by a component (such as fan-out) that prohibits updates from the caller.

InvalidQueryError

Bases: SessionError

Exception used to report a query that is invalid for the time series.

Notes

An example invalid query is one where the anchor is a sequence number beyond the end of the time series (for example, it is specified using with a sequence number greater than the latest sequence number, or with a count greater than the number of events in the time series), and the span is a relative time. Since no timestamp is associated with the anchor, the range is meaningless.

Added in version 6.9.

SessionClosedError

Bases: SessionError

The exception indicating a ISession closure.

Notes

No further operations are possible when this exception has been thrown.

Added in 6.9

SessionSecurityError

Bases: SessionError

The exception indicating that a ISession operation failed due to a security constraint.

Notes

Repeating the operation with the same security credentials is likely to fail.

Added in 6.9

FatalConnectionError

Bases: ProtocolError

The exception indicating a connection has been rejected and should not be retried.

Notes

This exception is never thrown directly but might be the cause of a SessionError

Added in 6.9

AuthenticationError

Bases: FatalConnectionError

The connection exception representing an authentication failure.

Notes

This exception is never thrown directly but might be the cause of a SessionError

Added in 6.9

ProxyAuthenticationError

Bases: ProtocolError

The exception indicating that there was a problem during authentication with a proxy server.

Notes

This exception is never thrown directly but might be the cause of a SessionError

Added in 6.9.

locks

session_lock_acquisition

SessionLockScope

Bases: enum.IntEnum

Values for the scope parameter of Session.lock

Since

6.10

UNLOCK_ON_SESSION_LOSS class-attribute
UNLOCK_ON_SESSION_LOSS = 0

The lock will be released when the acquiring session loses its current connection to the server.

UNLOCK_ON_CONNECTION_LOSS class-attribute
UNLOCK_ON_CONNECTION_LOSS = 1

The lock will be released when the acquiring session is closed.

session_locks

SessionLock

Bases: object

A Session lock.

Notes

A session lock is a server-managed resource that can be used to coordinate exclusive access to shared resources across sessions. For example, to ensure a single session has the right to update a topic; to ensure at most one session responds to an event; or to select a single session to perform a housekeeping task. Session locks support general collaborative locking schemes. The application architect is responsible for designing a suitable locking scheme and for ensuring each application component follows the scheme appropriately.

Session locks are identified by a lock name. Lock names are arbitrary and chosen at will to suit the application. Each lock is owned by at most one session. Locks are established on demand; there is no separate operation to create or destroy a lock.

A session lock is acquired using the Session.lock method. If no other session owns the lock, the server will assign the lock to the calling session immediately. Otherwise, the server will record that the session is waiting to acquire the lock. A session can call Session.lock more than once for a given session lock – if the lock is acquired, all calls will complete successfully with equal SessionLock values.

If a session closes, the session locks it owns are automatically released. A session can also release a lock by calling SessionLock.unlock. When a session lock is released and other sessions are waiting to acquire the lock, the server will arbitrarily select one of the waiting sessions and notify it that it has acquired the lock. All of the newly selected session's pending Session.lock calls will complete normally. Other sessions will continue to wait.

The Session.lock variant of this method takes a scope parameter that provides the further option of automatically releasing the lock when the session loses its connection to the server.

Race conditions

This session lock API has inherent race conditions. Even if an application is coded correctly to protect a shared resource using session locks, there may be a period where two or more sessions concurrently access the resource. The races arise for several reasons including

  • Due to the check-then-act approach of polling , the lock can be lost after the check has succeeded but before the resource is accessed;

  • The server can detect a session is disconnected and assign the lock to another session before the original session has detected the disconnection.

Despite this imprecision, session locks provide a useful way to coordinate session actions.

NOTE This interface does not require user implementation and is only used to hide implementation details.

Since: 6.10

unlock async
unlock() -> bool

Releases this session lock if it is owned by the session.

Since

6.10.

Returns:

Type Description
bool

True if unlocking is successful, False if not

set_released async
set_released()

Marks this lock as released.

SessionLocks

Bases: object

__init__
__init__(internal_session: InternalSession)

Initializes a new SessionLocks

Parameters:

Name Type Description Default
internal_session InternalSession

The internal session.

required
lock async
lock(lock_name: pydantic.StrictStr, scope: SessionLockScope) -> SessionLock

Sends a lock request to the server.

Notes

If the operation completes successfully, the result will be a new SessionLock.

Parameters:

Name Type Description Default
lock_name pydantic.StrictStr

The name of the lock.

required
scope SessionLockScope

The scope of the lock.

required

Returns:

Type Description
SessionLock

A session lock object.

post_commit async
post_commit()

Hook that runs immediately after internal lock committal.

Primarily used for testing.

unlock async
unlock(session_lock: SessionLock) -> bool

Sends an unlock request to the server and releases the given session lock.

Notes

If the operation completes successfully, the result will be a bool indicating whether the server released the given SessionLock.

Parameters:

Name Type Description Default
session_lock SessionLock

The session lock to release.

required

Returns:

Type Description
bool

True if successful, otherwise False.

diffusion.internal.session.Credentials

Simple wrapper class to encapsulate server credentials.

Parameters:

Name Type Description Default
value Union[str, bytes]

value: The value of the credentials.

b''

Examples:

>>> cred1 = Credentials("bar")
>>> cred1.type
<CredentialsType.PLAIN_PASSWORD: 1>
>>> cred1.password
'AQNiYXI='
>>> tuple(cred1)
(1, b'bar')
>>> cred2 = Credentials(b"bla")
>>> cred2.type
<CredentialsType.CUSTOM: 2>
>>> cred2.password
'AgNibGE='

password property

password: str

The base64-encoded textual representation of the credentials.

diffusion.session.exceptions

SessionError

Bases: DiffusionError

IncompatibleTopicError

Bases: SessionError

The topic is incompatible.

UpdateFailedError

Bases: SessionError

NoSuchTopicError

Bases: SessionError

There is no such topic.

NoTopicFoundError

Bases: SessionError

NoSuchEventError

Bases: SessionError

The exception used to report a time series topic does not have an original event with the sequence number provided by an

See Also

timeseries.edit operation.

Notes

Added in version 6.9.

ExistingTopicError

Bases: SessionError

InvalidTopicPathError

Bases: SessionError

InvalidTopicSpecificationError

Bases: SessionError

TopicLicenseLimitError

Bases: SessionError

IncompatibleExistingTopicError

Bases: SessionError

This differs from ExistingTopicError as the reason is that the existing topic is owned by something that prevents the caller managing. The specification of the existing topic may be the same.

AddTopicError

Bases: SessionError

UnsatisfiedConstraintError

Bases: SessionError

The exception to report a constraint was not satisfied.

ExclusiveUpdaterConflictError

Bases: SessionError

The exception to indicate an update could not be applied because an exclusive update source is registered for the path.

InvalidPatchError

Bases: SessionError

The exception to report that a JSON Patch was invalid.

FailedPatchError

Bases: SessionError

The exception to report that applying a JSON Patch failed.

Notes

This can happen if the topic's current value is not valid CBOR. See VALIDATE_VALUES.

InvalidUpdateStreamError

Bases: SessionError

The exception used to report an operation was performed with an invalid [UpdateStream][diffusion.UpdateStream].

NotATimeSeriesTopicError

Bases: SessionError

IncompatibleTopicStateError

Bases: SessionError

The exception that indicates that an operation could not be performed because the topic is managed by a component (such as fan-out) that prohibits updates from the caller.

HandlerConflictError

Bases: SessionError

UnhandledMessageError

Bases: SessionError

NoSuchSessionError

Bases: SessionError

CancellationError

Bases: SessionError

RejectedRequestError

Bases: SessionError

InvalidQueryError

Bases: SessionError

Exception used to report a query that is invalid for the time series.

Notes

An example invalid query is one where the anchor is a sequence number beyond the end of the time series (for example, it is specified using with a sequence number greater than the latest sequence number, or with a count greater than the number of events in the time series), and the span is a relative time. Since no timestamp is associated with the anchor, the range is meaningless.

Added in version 6.9.

SessionClosedError

Bases: SessionError

The exception indicating a ISession closure.

Notes

No further operations are possible when this exception has been thrown.

Added in 6.9

SessionSecurityError

Bases: SessionError

The exception indicating that a ISession operation failed due to a security constraint.

Notes

Repeating the operation with the same security credentials is likely to fail.

Added in 6.9

FatalConnectionError

Bases: ProtocolError

The exception indicating a connection has been rejected and should not be retried.

Notes

This exception is never thrown directly but might be the cause of a SessionError

Added in 6.9

AuthenticationError

Bases: FatalConnectionError

The connection exception representing an authentication failure.

Notes

This exception is never thrown directly but might be the cause of a SessionError

Added in 6.9

ProxyAuthenticationError

Bases: ProtocolError

The exception indicating that there was a problem during authentication with a proxy server.

Notes

This exception is never thrown directly but might be the cause of a SessionError

Added in 6.9.