Session¶
diffusion.session
¶
Public session for end users.
ANONYMOUS_PRINCIPAL = ''
module-attribute
¶
Anonymous principal username
Session
¶
A client session connected to a Diffusion server or a cluster of servers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str
|
WebSockets URL of the Diffusion server to connect to. |
required |
principal |
str
|
The name of the security principal associated with the session. |
None
|
credentials |
Optional[Credentials]
|
Security information required to authenticate the connection. |
None
|
The recommended method is to instantiate this class as an async context manager.
Here is a minimal example:

    async with diffusion.Session("ws://diffusion.server:8080") as session:
        ...  # do some work with the session

The context manager makes sure that the connection is properly closed when the block exits. Alternatively, the connection can be opened explicitly, which can be useful if the session needs to be passed around. In this case the connection must be closed explicitly as well:

    session = diffusion.Session("ws://diffusion.server:8080")
    await session.connect()
    # do some work with the session
    await session.close()
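When the session is opened explicitly, an exception raised while working with it can cause the close call to be skipped. The following is a minimal sketch of one way to guard against that. `run_with_session` is a hypothetical helper, not part of the diffusion package, and it relies only on the `connect()` and `close()` coroutines documented on this page.

```python
async def run_with_session(session, work):
    """Connect `session`, await `work(session)`, and always close the session.

    `session` is expected to expose the async connect() and close() methods
    described in this section; `work` is any coroutine function that takes
    the session. This helper is hypothetical and not part of the library.
    """
    await session.connect()
    try:
        return await work(session)
    finally:
        # Runs whether `work` returned normally or raised.
        await session.close()
```

With the real client, this might be called as `await run_with_session(diffusion.Session("ws://diffusion.server:8080"), work)`.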
state
property
¶
Returns the current connection state of the session.
session_id
property
¶
The current session ID.
services
property
¶
The ServiceLocator instance responsible for retrieving services.
handlers
property
¶
The collection of registered handlers.
data: dict
property
¶
Internal data storage.
messaging: Messaging
property
¶
Request-response messaging component.
topics: Topics
property
¶
Topics component.
session_trees: SessionTrees
property
¶
Session Trees component.
metrics: Metrics
property
¶
Metrics component.
time_series: TimeSeries
property
¶
Time Series component.
connect(properties: Optional[SessionProperties] = None)
async
¶
Connect to the server.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
properties |
Optional[SessionProperties]
|
A dict of Diffusion session properties to set and/or update at connection. |
None
|
close()
async
¶
Closes the session.
ping_server()
async
¶
Send the user ping to the server.
_add_ping_handler(handler: Handler) -> None
¶
Register a new handler for system pings.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
handler |
Handler
|
The Handler instance to be invoked when a system ping message is received by the session. |
required |
lock(lock_name: str, scope: SessionLockScope = SessionLockScope.UNLOCK_ON_SESSION_LOSS) -> SessionLock
async
¶
Attempts to acquire a SessionLock with a given scope.
Notes
If the operation completes successfully, the result will be the requested SessionLock assigned to the calling session by the server. The session owns the returned lock and is responsible for unlocking it.
Acquiring the lock can take an arbitrarily long time if other sessions are competing for the lock. The server will retain the session's request for the lock until it is assigned to the session or the session is closed.
A session can call this method multiple times. If the lock is acquired, all calls will complete successfully with equal SessionLock values.
A session that acquires a lock will remain its owner until it is unlocked (via SessionLock.unlock) or the session closes.
If called with a scope of SessionLockScope.UNLOCK_ON_SESSION_LOSS, this method behaves exactly like SessionLocks.lock.
If called with a scope of SessionLockScope.UNLOCK_ON_CONNECTION_LOSS, any lock that is returned will be unlocked if the session loses its connection to the server. This is useful to allow another session to take ownership of the lock while this session is reconnecting.
If a session makes multiple requests for a lock using different scopes, and the server assigns the lock to the session fulfilling the requests, the lock will be given the weakest scope (SessionLockScope.UNLOCK_ON_CONNECTION_LOSS).
Access control¶
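To allow fine-grained access control, lock names are interpreted as path names, controlled with the ACQUIRE_LOCK path permission. This allows permission to be granted to a session to acquire the lock update-topic/a while preventing the session from acquiring the lock update-topic/b, for example.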
Since
6.10.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lock_name |
str
|
The name of the session lock. |
required |
scope |
SessionLockScope
|
The scope of the session lock. |
SessionLockScope.UNLOCK_ON_SESSION_LOSS
|
Returns:
A session lock object.
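Because the session that owns a lock is responsible for unlocking it, pairing the acquisition with a `finally` block is a natural pattern. The sketch below is illustrative only: `run_while_locked` is a hypothetical helper, and whether `SessionLock.unlock` is a coroutine is an assumption, so the result is awaited only when it turns out to be awaitable.

```python
import inspect


async def run_while_locked(session, lock_name, work):
    """Acquire the named session lock, await `work()`, then release the lock.

    Assumes `session.lock(lock_name)` resolves to an object with an `unlock`
    method, as the notes in this section describe. The default scope is used.
    """
    lock = await session.lock(lock_name)
    try:
        return await work()
    finally:
        released = lock.unlock()
        # Assumption: unlock may or may not be async, so handle both cases.
        if inspect.isawaitable(released):
            await released
```

Note that the race conditions described for SessionLock still apply: holding the lock does not strictly guarantee exclusive access while `work` runs.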
on_state_changed(old_state: State, new_state: State)
async
¶
Raises the StateChanged event from session. Bound as a handler on the InternalSession.on_state_changed event.
remove_listener(listener: SessionListener)
¶
Removes a given session state listener from the session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
listener |
SessionListener
|
the listener to remove |
required |
Raises:
Type | Description |
---|---|
InvalidOperationError
|
if no such listener is present |
SessionListener
¶
Bases: EventStreamHandler
The session listener.
on_session_event(*, session: Session, old_state: typing.Optional[State], new_state: State)
async
¶
Called when a session changes state.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session |
Session
|
The session that sent this event. |
required |
old_state |
typing.Optional[State]
|
The old session state. |
required |
new_state |
State
|
The new session state. |
required |
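To illustrate the shape of the `on_session_event` callback, here is a minimal sketch of a handler that records state transitions. `TransitionRecorder` is a hypothetical name. In real use it would presumably derive from `SessionListener`; it is kept as a plain class here so the sketch runs without the library installed. How a listener is registered with a session is not shown in this section, so that step is left out.

```python
class TransitionRecorder:
    """Records every (old_state, new_state) pair it is notified about."""

    def __init__(self):
        self.transitions = []

    async def on_session_event(self, *, session, old_state, new_state):
        # The arguments are keyword-only, matching the documented signature.
        # old_state is Optional there, so None is a possible value.
        self.transitions.append((old_state, new_state))
```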
session_factory
¶
Connector
¶
Bases: object
Connector object.
This can be awaited to return a connected session, or used as an asynchronous context manager that provides a connected session on entry and closes it on exit.
__init__(session_details: SessionDetails, container_factory: SessionContainerFactory, url: str = None)
¶
This is used by the SessionFactory and is not designed for manual creation.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session_details |
SessionDetails
|
the session details involved |
required |
container_factory |
SessionContainerFactory
|
responsible for building the session and connecting it |
required |
url |
str
|
the URL to be connected to. If not present the session details will be used instead. |
None
|
SessionFactory
¶
Bases: object
Factory for client diffusion.session.Session.
Each instance is immutable, and has a number of session attributes that determine how sessions should be established. An initial instance of this factory with default values for all attributes can be obtained from diffusion.sessions(). Each method returns a copy of this factory, with a single changed attribute.
Establishing a session¶
A new session can be created using open specifying a URL that identifies the server. This returns an async context manager, which will provide the connected Session object.
The server is identified by the diffusion.session.session_factory.SessionFactory.server_host and diffusion.session.session_factory.SessionFactory.server_port attributes.
If a URL is specified, it takes precedence over the diffusion.session.session_factory.SessionFactory.server_host and diffusion.session.session_factory.SessionFactory.server_port session factory attributes.
URL format¶
URLs should take the form scheme://host:port, where scheme is chosen from the following table and determines the transport protocol used to send Diffusion messages.
Scheme | Transport Protocol |
---|---|
ws |
WebSocket. See https://tools.ietf.org/html/rfc6455 |
wss |
WebSocket over TLS. |
We recommend using the WebSocket protocol options ws or wss.
TLS is Transport Layer Security, commonly known as SSL. TLS-based protocols use cryptography to provide transport-level privacy, authentication, and integrity, and protect against network-sniffing and man-in-the-middle attacks. We recommend using the TLS variants for all communication. For a typical application, you should only consider not using TLS for unauthenticated ("anonymous") client sessions.
Since
6.9
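As an illustration of the URL form described above, the following sketch checks a URL against the two documented schemes before it is handed to the client. `check_diffusion_url` is a hypothetical helper built on the standard library only. It does not reproduce whatever validation the client performs, and it allows a missing port because the factory documentation says the port can be inferred.

```python
from typing import Optional, Tuple
from urllib.parse import urlsplit

ALLOWED_SCHEMES = {"ws", "wss"}


def check_diffusion_url(url: str) -> Tuple[str, str, Optional[int]]:
    """Return (scheme, host, port) if `url` uses a documented scheme."""
    parts = urlsplit(url)
    if parts.scheme not in ALLOWED_SCHEMES:
        raise ValueError(f"unsupported scheme: {parts.scheme!r}")
    if not parts.hostname:
        raise ValueError("missing host")
    # Accessing .port raises ValueError if the port is not a valid number
    # in the range 0-65535; it is None when the URL has no port.
    return parts.scheme, parts.hostname, parts.port
```

For example, `check_diffusion_url("ws://diffusion.server:8080")` returns `("ws", "diffusion.server", 8080)`, while an `http://` URL is rejected.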
principal(principal: str) -> SessionFactory
¶
Sets the security principal.
By default, this will be diffusion.session.ANONYMOUS_PRINCIPAL.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
principal |
str
|
the principal |
required |
Returns:
Type | Description |
---|---|
SessionFactory
|
a new immutable instance of the factory with the principal changed |
credentials(credentials: Credentials) -> SessionFactory
¶
Set credentials.
The default is None.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
credentials |
Credentials
|
the credentials |
required |
Returns:
Type | Description |
---|---|
SessionFactory
|
a new immutable instance of the factory with the credentials changed |
initial_retry_strategy(strategy: RetryStrategy) -> SessionFactory
¶
Sets the initial retry strategy.
The strategy will determine whether a failure to open a session due to a ServerConnectionError should be retried and if so, at what interval and for how many attempts.
If no initial retry strategy is set there will be no attempt to retry after such a failure.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
strategy |
RetryStrategy
|
the retry strategy to use |
required |
Returns:
Type | Description |
---|---|
SessionFactory
|
a new immutable instance of the factory with the initial retry strategy changed |
Since
6.9
server_host(host: str) -> SessionFactory
¶
Set the host name of the server to connect the session to.
This value is only used if a URL is not provided when opening a session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
host |
str
|
the host name of the server |
required |
Returns:
Type | Description |
---|---|
SessionFactory
|
a new immutable instance of the factory that will use the provided host |
Raises:
Type | Description |
---|---|
IllegalArgumentError
|
if the specified |
server_port(port: int) -> SessionFactory
¶
Set the port of the server to connect the session to.
This value is only used if a URL is not provided when opening a session. If the port is not set using this method or a URL, the port will be inferred based on the transport and security configuration.
The provided value must be within the range used for port numbers.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
port |
int
|
the port of the server |
required |
Returns:
Type | Description |
---|---|
SessionFactory
|
a new immutable instance of the factory that will use the provided port |
Raises:
Type | Description |
---|---|
IllegalArgumentError
|
if the specified |
properties(properties: SessionProperties) -> SessionFactory
¶
Sets user-defined session property values.
Supplied session properties will be provided to the server when a session is created using this session factory. The supplied properties will be validated during authentication and may be discarded or changed.
The specified properties will be added to any existing properties set for this session factory. If any of the keys have been previously declared then they will be overwritten with the new values.
For details of how session properties are used see diffusion.session.Session.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
properties |
SessionProperties
|
a map of user-defined session properties |
required |
Returns:
Type | Description |
---|---|
SessionFactory
|
a new immutable instance of the factory with the supplied properties set |
Since
6.9
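The merge rule described above (new keys are added, previously declared keys are overwritten) matches ordinary dictionary layering. The sketch below models it with plain dicts. `merge_properties` is a hypothetical helper, and treating session properties as a plain `dict` is an assumption made for illustration.

```python
def merge_properties(existing: dict, supplied: dict) -> dict:
    """Return a new mapping with `supplied` layered over `existing`.

    Neither input is modified, mirroring the factory's immutability.
    """
    return {**existing, **supplied}


merged = merge_properties({"region": "eu", "tier": "free"}, {"tier": "paid"})
# merged == {"region": "eu", "tier": "paid"}
```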
open(url: str = None) -> Connector
¶
This method returns a diffusion.session.session_factory.Connector instance.
Awaiting this object via await will open a connection to a server and return a new, connected Session.
Using it as an async context manager via async with will do the same on entry and close this Session on exit.
It can take a URL to specify the server location, ignoring the diffusion.session.session_factory.SessionFactory.server_host and diffusion.session.session_factory.SessionFactory.server_port session factory attributes.
Raises:
Type | Description |
---|---|
IllegalArgumentError
|
if |
IllegalStateError
|
if any of the session attributes are found to be inconsistent. For example, if the default SSL context could not be loaded |
SessionEstablishmentError
|
if an initial connection could not be established |
AuthenticationError
|
if the client is insufficiently authorized to start the session, based on the supplied client credentials. |
Parameters:
Name | Type | Description | Default |
---|---|---|---|
url |
str
|
the server location |
None
|
Returns:
Type | Description |
---|---|
Connector
|
Since
6.9
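Since every factory method returns a new factory, configuration is naturally written as a chain ending in `open`. The sketch below shows that chain using only the `principal`, `credentials`, and `open` methods documented here. `connector_for` is a hypothetical helper, and the factory is passed in as a parameter so the sketch does not depend on the library being installed.

```python
def connector_for(factory, url, principal, credentials):
    """Derive a configured factory from `factory` and return its connector.

    `factory` is expected to behave like the object returned by
    diffusion.sessions(): each setter returns a new factory and leaves
    the original untouched, so `factory` can be reused afterwards.
    """
    configured = factory.principal(principal).credentials(credentials)
    # Passing a URL here takes precedence over server_host/server_port.
    return configured.open(url)
```

The returned connector can then be awaited, or used with `async with`, as described above.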
sessions(container_factory = None) -> SessionFactory
cached
¶
Returns:
Type | Description |
---|---|
SessionFactory
|
The default session factory. |
retry_strategy
¶
RetryStrategy
¶
Bases: pydantic.BaseModel
Defines a retry strategy.
A retry strategy will be applied when an initial attempt to open a session fails with a ServerConnectionError.
The strategy is defined in terms of the number of seconds between retries and the maximum number of retries to attempt.
Since: 6.9
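To make the two quantities concrete, the sketch below shows a generic retry loop driven by an interval in seconds and a maximum number of retries. It is not the library's implementation. `open_with_retry` and its parameter names are hypothetical, and the built-in `ConnectionError` stands in for `ServerConnectionError`.

```python
import asyncio


async def open_with_retry(open_once, interval: float, attempts: int):
    """Await `open_once()` and retry on ConnectionError.

    `interval` is the number of seconds to wait between retries and
    `attempts` is the maximum number of retries after the first failure.
    """
    retries_left = attempts
    while True:
        try:
            return await open_once()
        except ConnectionError:
            if retries_left <= 0:
                # Retries exhausted: surface the last failure to the caller.
                raise
            retries_left -= 1
            await asyncio.sleep(interval)
```

With `attempts=0` the loop makes a single call and does not retry, which corresponds to the documented behaviour when no initial retry strategy is set.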
locks
¶
session_lock_acquisition
¶
SessionLockScope
¶
SessionLockAcquisition
¶
session_lock_request_cancellation
¶
SessionLockRequestCancellation
¶
Bases: MarshalledModel
The request to cancel a session lock.
session_lock_request
¶
SessionLockRequest
¶
Bases: MarshalledModel
The request to acquire a session lock.
session_locks
¶
SessionLocks
¶
Bases: object
__init__(internal_session: InternalSession)
¶
Initializes a new SessionLocks
Parameters:
Name | Type | Description | Default |
---|---|---|---|
internal_session |
InternalSession
|
The internal session. |
required |
lock(lock_name: pydantic.StrictStr, scope: SessionLockScope) -> SessionLock
async
¶
Sends a lock request to the server.
Notes
If the operation completes successfully, the result will be a new SessionLock.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
lock_name |
pydantic.StrictStr
|
The name of the lock. |
required |
scope |
SessionLockScope
|
The scope of the lock. |
required |
Returns:
Type | Description |
---|---|
SessionLock
|
A session lock object. |
post_commit()
async
¶
Hook that runs immediately after internal lock committal.
Primarily used for testing.
unlock(session_lock: SessionLock) -> bool
async
¶
Sends an unlock request to the server and releases the given session lock.
Notes
If the operation completes successfully, the result will be a bool indicating whether the server released the given SessionLock.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
session_lock |
SessionLock
|
The session lock to release. |
required |
Returns:
Type | Description |
---|---|
bool
|
|
SessionLock
¶
Bases: object
A Session lock.
Notes
A session lock is a server-managed resource that can be used to coordinate exclusive access to shared resources across sessions. For example, to ensure a single session has the right to update a topic; to ensure at most one session responds to an event; or to select a single session to perform a housekeeping task. Session locks support general collaborative locking schemes. The application architect is responsible for designing a suitable locking scheme and for ensuring each application component follows the scheme appropriately.
Session locks are identified by a lock name. Lock names are arbitrary and chosen at will to suit the application. Each lock is owned by at most one session. Locks are established on demand; there is no separate operation to create or destroy a lock.
A session lock is acquired using the Session.lock method. If no other session owns the lock, the server will assign the lock to the calling session immediately. Otherwise, the server will record that the session is waiting to acquire the lock. A session can call Session.lock more than once for a given session lock – if the lock is acquired, all calls will complete successfully with equal SessionLock values.
If a session closes, the session locks it owns are automatically released. A session can also release a lock by calling SessionLock.unlock. When a session lock is released and other sessions are waiting to acquire the lock, the server will arbitrarily select one of the waiting sessions and notify it that it has acquired the lock. All of the newly selected session's pending Session.lock calls will complete normally. Other sessions will continue to wait.
Session.lock takes a scope parameter that provides the further option of automatically releasing the lock when the session loses its connection to the server.
Race conditions¶
This session lock API has inherent race conditions. Even if an application is coded correctly to protect a shared resource using session locks, there may be a period where two or more sessions concurrently access the resource. The races arise for several reasons, including:
- Due to the check-then-act approach of polling, the lock can be lost after the check has succeeded but before the resource is accessed;
- The server can detect a session is disconnected and assign the lock to another session before the original session has detected the disconnection.
Despite this imprecision, session locks provide a useful way to coordinate session actions.
NOTE This interface does not require user implementation and is only used to hide implementation details.
Since: 6.10
diffusion.internal.session.Credentials
¶
Simple wrapper class to encapsulate server credentials.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
value |
Union[str, bytes]
|
The value of the credentials. |
b''
|
Examples:
>>> cred1 = Credentials("bar")
>>> cred1.type
<CredentialsType.PLAIN_PASSWORD: 1>
>>> cred1.password
'AQNiYXI='
>>> tuple(cred1)
(1, b'bar')
>>> cred2 = Credentials(b"bla")
>>> cred2.type
<CredentialsType.CUSTOM: 2>
>>> cred2.password
'AgNibGE='
password: str
property
¶
The base64-encoded textual representation of the credentials.
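The doctest values above can be decoded with the standard library to see what the `password` string carries. The layout used below (one type byte, one length byte, then the value) is inferred from those two examples only and is an assumption. `decode_password` is a hypothetical helper, and longer values may well use a different length encoding.

```python
import base64


def decode_password(encoded: str):
    """Split a base64 `password` string into (type_code, value_bytes).

    Assumed layout, inferred from the doctest examples: byte 0 is the
    credentials type, byte 1 is the value length, the rest is the value.
    """
    raw = base64.b64decode(encoded)
    type_code, length, value = raw[0], raw[1], raw[2:]
    if length != len(value):
        # The inferred layout does not fit this input, so refuse to guess.
        raise ValueError("unexpected layout")
    return type_code, value


print(decode_password("AQNiYXI="))  # (1, b'bar')
print(decode_password("AgNibGE="))  # (2, b'bla')
```

The decoded pairs agree with `tuple(cred1)` and with the CUSTOM type code shown for `cred2`.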
diffusion.session.exceptions
¶
SessionError
¶
Bases: DiffusionError
IncompatibleTopicError
¶
Bases: SessionError
The topic is incompatible.
UpdateFailedError
¶
Bases: SessionError
NoSuchTopicError
¶
Bases: SessionError
There is no such topic.
NoTopicFoundError
¶
Bases: SessionError
NoSuchEventError
¶
Bases: SessionError
The exception used to report that a time series topic does not have an original event with the sequence number provided by a timeseries.edit operation.
Notes
Added in version 6.9.
ExistingTopicError
¶
Bases: SessionError
InvalidTopicPathError
¶
Bases: SessionError
InvalidTopicSpecificationError
¶
Bases: SessionError
TopicLicenseLimitError
¶
Bases: SessionError
IncompatibleExistingTopicError
¶
Bases: SessionError
This differs from ExistingTopicError in that the existing topic is owned by something that prevents the caller from managing it. The specification of the existing topic may be the same.
AddTopicError
¶
Bases: SessionError
UnsatisfiedConstraintError
¶
Bases: SessionError
The exception to report a constraint was not satisfied.
ExclusiveUpdaterConflictError
¶
Bases: SessionError
The exception to indicate an update could not be applied because an exclusive update source is registered for the path.
InvalidPatchError
¶
Bases: SessionError
The exception to report that a JSON Patch was invalid.
FailedPatchError
¶
Bases: SessionError
The exception to report that applying a JSON Patch failed.
Notes
This can happen if the topic's current value is not valid CBOR. See VALIDATE_VALUES.
InvalidUpdateStreamError
¶
Bases: SessionError
The exception used to report that an operation was performed with an invalid UpdateStream (diffusion.UpdateStream).
NotATimeSeriesTopicError
¶
Bases: SessionError
IncompatibleTopicStateError
¶
Bases: SessionError
The exception that indicates that an operation could not be performed because the topic is managed by a component (such as fan-out) that prohibits updates from the caller.
HandlerConflictError
¶
Bases: SessionError
UnhandledMessageError
¶
Bases: SessionError
NoSuchSessionError
¶
Bases: SessionError
CancellationError
¶
Bases: SessionError
RejectedRequestError
¶
Bases: SessionError
InvalidQueryError
¶
Bases: SessionError
Exception used to report a query that is invalid for the time series.
Notes
An example invalid query is one where the anchor is a sequence number beyond the end of the time series (for example, it is specified using count greater than the number of events in the time series), and the span is a relative time. Since no timestamp is associated with the anchor, the range is meaningless.
Added in version 6.9.
SessionClosedError
¶
Bases: SessionError
The exception indicating a Session closure.
Notes
No further operations are possible when this exception has been thrown.
Added in 6.9
SessionSecurityError
¶
Bases: SessionError
The exception indicating that a Session operation failed due to a security constraint.
Notes
Repeating the operation with the same security credentials is likely to fail.
Added in 6.9
FatalConnectionError
¶
Bases: ProtocolError
The exception indicating a connection has been rejected and should not be retried.
Notes
This exception is never thrown directly but might be the cause of a SessionError.
Added in 6.9
AuthenticationError
¶
Bases: FatalConnectionError
The connection exception representing an authentication failure.
Notes
This exception is never thrown directly but might be the cause of a SessionError.
Added in 6.9
ProxyAuthenticationError
¶
Bases: ProtocolError
The exception indicating that there was a problem during authentication with a proxy server.
Notes
This exception is never thrown directly but might be the cause of a SessionError.
Added in 6.9.