Data Types

Data types are containers for specific formats of topic data and messages sent between Diffusion clients and servers.

Usage

from diffusion import datatypes

# the next two lines are equivalent
string_type = datatypes.get("STRING")
string_type = datatypes.STRING

string_instance = string_type("message")

# a data type can be instantiated directly:
string_instance = datatypes.STRING("message")

Fundamental Data Types

diffusion.datatypes.foundation.types

HasRawTypes

Bases: typing_extensions.Protocol[RealValue_Precise]

Protocol describing an object with the given raw datatype

HasRawTypesOrMore

Bases: HasRawTypes[typing.Union[RealValue_Precise, typing.Any]], typing_extensions.Protocol[RealValue_Precise]

Protocol describing an object with at least the given raw datatype

ValueTypeProtocolSpecific

Bases: ValueTypeProtocol, HasRawTypes[RealValue_Precise], Protocol[RealValue_Precise]

Protocol describing a Diffusion Datatype-like-object with the given raw datatype

ValueTypeProtocolSpecificOrMore

Bases: ValueTypeProtocol, HasRawTypesOrMore[RealValue_Precise], Protocol[RealValue_Precise]

Protocol describing a Diffusion Datatype-like-object with at least the given raw datatype

diffusion.datatypes.DataType

Bases: typing.Generic[TypeCode, TypeName]

Generic parent class for all data types implementations.

value abstractmethod property writable

value

Current value of the instance.

__init__

__init__(value: typing.Any) -> None

Initialise the datatype value

Parameters:

Name Type Description Default
value typing.Any

the value to initialise the datatype with

required

read_value classmethod

read_value(
    stream: io.BytesIO,
) -> Optional["typing_extensions.Self"]

Read the value from a binary stream.

Parameters:

Name Type Description Default
stream io.BytesIO

Binary stream containing the serialised data.

required

Returns:

Type Description
Optional['typing_extensions.Self']

An initialised instance of the DataType.

validate

validate() -> None

Check the current value for correctness.

Raises:

Type Description
`InvalidDataError`

If the value is invalid. By default there is no validation.

Available Data Types

diffusion.datatypes.BINARY module-attribute

Binary datatype alias, points to BinaryDataType

diffusion.datatypes.DOUBLE module-attribute

Double datatype alias, points to DoubleDataType

diffusion.datatypes.INT64 module-attribute

Int64 datatype alias, points to Int64DataType

diffusion.datatypes.STRING module-attribute

String datatype alias, points to StringDataType

diffusion.datatypes.JSON module-attribute

Json datatype alias, points to JsonDataType

diffusion.datatypes.TIME_SERIES module-attribute

TimeSeries datatype alias, points to TimeSeriesEventDataType

diffusion.datatypes.OBJECT module-attribute

OBJECT = Object

Object datatype - effectively a token indicating types should not be converted

Unimplemented Data Types

To be added when appropriate.

diffusion.datatypes.RECORD_V2 module-attribute

RECORD_V2 = complex.RecordDataType

Record V2 datatype alias, points to RecordDataType

diffusion.datatypes.UNKNOWN module-attribute

UNKNOWN = complex.UnknownDataType

Unknown datatype alias, points to UnknownDataType

diffusion.datatypes.complex.RecordDataType

Bases: IBytes[TopicSpecification['RecordDataType'], 'RecordDataType', 'RecordDataType']

Diffusion record data type implementation.

diffusion.datatypes.complex.UnknownDataType

Bases: IBytes[TopicSpecification['UnknownDataType'], 'UnknownDataType', 'UnknownDataType']

Unknown data type implementation.

Generic base classes

diffusion.datatypes.foundation.datatype.DataType

Bases: typing.Generic[TypeCode, TypeName]

Generic parent class for all data types implementations.

value abstractmethod property writable

value

Current value of the instance.

__init__

__init__(value: typing.Any) -> None

Initialise the datatype value

Parameters:

Name Type Description Default
value typing.Any

the value to initialise the datatype with

required

read_value classmethod

read_value(
    stream: io.BytesIO,
) -> Optional["typing_extensions.Self"]

Read the value from a binary stream.

Parameters:

Name Type Description Default
stream io.BytesIO

Binary stream containing the serialised data.

required

Returns:

Type Description
Optional['typing_extensions.Self']

An initialised instance of the DataType.

validate

validate() -> None

Check the current value for correctness.

Raises:

Type Description
`InvalidDataError`

If the value is invalid. By default there is no validation.

diffusion.datatypes.foundation.abstract.AbstractDataType

Bases: DataType, typing.Generic[TS_T, ValueType, RealValue]

value property writable

value: RealValue

Current value of the instance.

serialised_value property

serialised_value: dict

Return the sequence of values ready to be serialised.

It is assumed that the serialisation will use the serialised-value serialiser.

with_properties class-attribute

with_properties: WithProperties = WithProperties()

A class property that returns the type of this class's appropriate TopicSpecification class, ready for instantiation with the relevant parameters.

See Also

WithProperties

write_value

write_value(stream: BytesIO) -> BytesIO

Write the value into a binary stream.

Parameters:

Name Type Description Default
stream BytesIO

Binary stream to serialise the value into.

required

to_bytes

to_bytes() -> bytes

Convert the value into the binary representation.

Convenience method, not to be overridden

read_value classmethod

read_value(stream: BytesIO) -> typing.Optional[Self]

Read the value from a binary stream.

Parameters:

Name Type Description Default
stream BytesIO

Binary stream containing the serialised data.

required

Returns:

Type Description
typing.Optional[Self]

An initialised instance of the DataType.

from_bytes classmethod

from_bytes(data: bytes) -> typing.Optional[Self]

Convert a binary representation into the corresponding value.

Parameters:

Name Type Description Default
data bytes

Serialised binary representation of the value.

required

Returns:

Type Description
typing.Optional[Self]

An initialised instance of the DataType.

encode abstractmethod classmethod

encode(value: Any) -> bytes

Convert a value into the corresponding binary representation.

Parameters:

Name Type Description Default
value Any

Native value to be serialised

required

Returns:

Type Description
bytes

Serialised binary representation of the value.

decode abstractmethod classmethod

decode(data: bytes) -> RealValue

Convert a binary representation into the corresponding value.

Parameters:

Name Type Description Default
data bytes

Serialised binary representation of the value.

required

Returns:

Type Description
RealValue

Deserialised value.

set_from_bytes

set_from_bytes(data: bytes) -> None

Convert bytes and set the corresponding value on the instance.
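The relationship between the methods above can be sketched with a small stand-in class. This is an illustration only, not the library's implementation: `Utf8Text` is a hypothetical class, and plain UTF-8 is used in place of whatever wire format a real data type uses. It shows how `to_bytes`, `write_value`, `from_bytes`, `read_value` and `set_from_bytes` might all be layered on top of `encode` and `decode`.

```python
from io import BytesIO


class Utf8Text:
    """Hypothetical stand-in for a data type; not a Diffusion class."""

    def __init__(self, value: str) -> None:
        self.value = value

    @classmethod
    def encode(cls, value: str) -> bytes:
        # Native value -> binary representation (UTF-8 is an assumption here).
        return value.encode("utf-8")

    @classmethod
    def decode(cls, data: bytes) -> str:
        # Binary representation -> native value.
        return data.decode("utf-8")

    def to_bytes(self) -> bytes:
        # Convenience wrapper around encode() for the current value.
        return self.encode(self.value)

    def write_value(self, stream: BytesIO) -> BytesIO:
        # Serialise the current value into the stream and hand the stream back.
        stream.write(self.to_bytes())
        return stream

    @classmethod
    def from_bytes(cls, data: bytes) -> "Utf8Text":
        # Build a new instance from a binary representation.
        return cls(cls.decode(data))

    @classmethod
    def read_value(cls, stream: BytesIO) -> "Utf8Text":
        # Read the remaining bytes of the stream and build an instance.
        return cls.from_bytes(stream.read())

    def set_from_bytes(self, data: bytes) -> None:
        # Decode and overwrite the value of this instance in place.
        self.value = self.decode(data)


original = Utf8Text("message")
stream = original.write_value(BytesIO())
stream.seek(0)  # rewind before reading back
restored = Utf8Text.read_value(stream)
```

In this sketch only `encode` and `decode` know about the wire format, which mirrors the page marking those two as abstract and describing `to_bytes` as a convenience method.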

can_read_as classmethod

can_read_as(
    result_type: typing.Type[AbstractDataType],
) -> bool

Check whether this data type is compatible with the given result_type.

This means that any valid binary representation of this data type can be read as an instance of the given result_type.

Any value type can be read as an OBJECT.

Parameters:

Name Type Description Default
result_type typing.Type[AbstractDataType]

The type to check for compatibility.

required

Returns:

Type Description
bool

True if the data type is compatible with the given result_type, False otherwise.

Raises:

Type Description
ValueError

The given result_type isn't a valid result type.
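The compatibility rule can be illustrated with a toy lookup table. This is a sketch under stated assumptions, not the library's logic: the string names, the `READABLE_AS` table and the free function `can_read_as` are hypothetical, and the table only encodes statements made on this page (INT64 and DOUBLE values can be read as JSON, BINARY is not convertible, anything can be read as OBJECT). That every type can be read as itself is also an assumption.

```python
from typing import Dict, Set

OBJECT = "OBJECT"

# Hypothetical table: source type -> result types it can be read as.
READABLE_AS: Dict[str, Set[str]] = {
    "INT64": {"INT64", "JSON"},
    "DOUBLE": {"DOUBLE", "JSON"},
    "JSON": {"JSON"},
    "BINARY": {"BINARY"},
}
KNOWN_TYPES = set(READABLE_AS) | {OBJECT}


def can_read_as(source: str, result_type: str) -> bool:
    """Return True if values of `source` can be read as `result_type`."""
    if result_type not in KNOWN_TYPES:
        # Mirrors the documented ValueError for an invalid result type.
        raise ValueError(f"{result_type!r} is not a valid result type")
    if result_type == OBJECT:
        # Any value type can be read as an OBJECT.
        return True
    return result_type in READABLE_AS.get(source, {source})
```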

diffusion.datatypes.foundation.abstract.WithProperties

Bytes datatypes

diffusion.datatypes.foundation.ibytesdatatype.IBytes

Bases: AbstractDataType[TS_T, ValueType, RealValue], typing.Generic[TS_T, ValueType, RealValue]

Base type for Diffusion data types allowing conversion to and from bytes.

diffusion.datatypes.foundation.bytesdatatype.Bytes

Bases: IBytesParent

Represents a basic, bytes-only implementation of IBytes

Effectively a lower bound on IBytes types.

Primitive datatypes

diffusion.datatypes.primitives

PrimitiveDataTypes module-attribute

PrimitiveDataTypes = typing.Union[
    BINARY, DOUBLE, INT64, STRING
]

Primitive diffusion data types.

PrimitiveDataTypesClasses module-attribute

PrimitiveDataTypesClasses = typing.Union[
    typing.Type[BINARY],
    typing.Type[DOUBLE],
    typing.Type[INT64],
    typing.Type[STRING],
]

Classes of primitive Diffusion data types.

binarydatatype

BinaryDataType

Bases: PrimitiveDataType[bytes]

Binary data type. Not convertible to or from other types

doubledatatype

DoubleDataType

Bases: JsonDataType[float]

Data type that supports double-precision floating point numbers.

(Eight-byte IEEE 754)

The double value is serialized as CBOR-format binary. A serialized value can be read as a JSON instance.
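For orientation, the CBOR encoding of an eight-byte IEEE 754 double is the initial byte 0xFB followed by the eight big-endian bytes of the value. The sketch below builds that form with the standard library only. The helper names are hypothetical, and it is an assumption that the library always emits the 64-bit form, since CBOR also permits shorter float encodings.

```python
import struct


def encode_double_cbor(value: float) -> bytes:
    # 0xFB = CBOR major type 7 with additional information 27 (64-bit float).
    return b"\xfb" + struct.pack(">d", value)


def decode_double_cbor(data: bytes) -> float:
    # Accept only the 9-byte, 64-bit float form in this sketch.
    if len(data) != 9 or data[0] != 0xFB:
        raise ValueError("not a CBOR 64-bit float")
    return struct.unpack(">d", data[1:])[0]


encoded = encode_double_cbor(1.5)
print(encoded.hex())  # fb3ff8000000000000
```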

int64datatype

Int64DataType

Bases: JsonDataType[int]

Data type that supports 64-bit, signed integer values.

The integer value is serialized as CBOR-format binary. A serialized value can be read as a JSON instance.

validate
validate() -> None

Check the current value for correctness.

Raises:

Type Description
`InvalidDataError`

If the value is invalid.
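A range check of the kind this validation implies might look like the sketch below. It is not the library's code: `InvalidDataError` here is a local stand-in for the library's exception of the same name, `validate_int64` is a hypothetical free function, and rejecting `bool` is a choice made for this sketch.

```python
class InvalidDataError(Exception):
    """Local stand-in for the library's exception of the same name."""


INT64_MIN = -(2**63)
INT64_MAX = 2**63 - 1


def validate_int64(value: object) -> None:
    """Raise InvalidDataError unless value fits a signed 64-bit integer."""
    # bool is a subclass of int in Python; this sketch rejects it explicitly.
    if isinstance(value, bool) or not isinstance(value, int):
        raise InvalidDataError(f"expected an int, got {type(value).__name__}")
    if not INT64_MIN <= value <= INT64_MAX:
        raise InvalidDataError(f"{value} does not fit in 64 signed bits")
```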

jsondatatype

JsonTypes module-attribute

A JSON value.

JsonTypes_Bound module-attribute

JsonTypes_Bound = typing.Optional[JsonTypes]

An optional JsonTypes type.

primitivedatatype

PrimitiveDataType

Bases: IBytes[TopicSpecification['PrimitiveDataType[T]'], 'PrimitiveDataType[T]', typing.Optional[T]], typing.Generic[T]

validate
validate() -> None

Check the current value for correctness.

Raises:

Type Description
`InvalidDataError`

If the value is invalid.

stringdatatype

StringDataType

Bases: JsonDataType[str]

String data type.

The string value is serialized as CBOR-format binary.

Complex datatypes

diffusion.datatypes.complex.JsonDataType

Bases: PrimitiveDataType[T_json], typing.Generic[T_json]

diffusion.datatypes.complex.ComplexDataTypes module-attribute

ComplexDataTypes = typing.Union[JSON, RECORD_V2]

Complex Diffusion data types.

diffusion.datatypes.complex.ComplexDataTypesClasses module-attribute

ComplexDataTypesClasses = typing.Union[
    typing.Type[JSON], typing.Type[RECORD_V2]
]

Classes of complex Diffusion data types.

Time Series

diffusion.datatypes.timeseries

TopicSpecification

Bases: typing.Generic[T], topic_specification.TopicSpecification[T]

Time Series Topic Specification class

TIME_SERIES_EVENT_VALUE_TYPE instance-attribute

TIME_SERIES_EVENT_VALUE_TYPE: typing.Type[
    TimeSeriesValueType
]

Specifies the event data type for a time series topic.

TIME_SERIES_RETAINED_RANGE class-attribute instance-attribute

TIME_SERIES_RETAINED_RANGE: typing.Optional[str] = None

Key of the topic property that specifies the range of events retained by a time series topic.

When a new event is added to the time series, older events that fall outside of the range are discarded.

If the property is not specified, a time series topic will retain the ten most recent events.

Time series range expressions

The property value is a time series range expression string composed of one or more constraint clauses. Constraints are combined to provide a range of events from the end of the time series.

'limit' constraint

A limit constraint specifies the maximum number of events from the end of the time series.

'last' constraint

A last constraint specifies the maximum duration of events from the end of the time series. The duration is expressed as an integer followed by one of the following time units.

  • ms - milliseconds
  • s - seconds
  • h - hours

If a range expression contains multiple constraints, the constraint that selects the smallest range is used.

Property value      Meaning
limit 5             The five most recent events
last 10s            All events that are no more than ten seconds older than the latest event
last 10s limit 5    The five most recent events that are no more than ten seconds older than the latest event

Range expressions are not case sensitive: 'limit 5 last 10s' is equivalent to 'LIMIT 5 LAST 10S'.
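The semantics described above can be sketched as a small parser plus a filter over event timestamps. This is a client-side illustration only: the server evaluates range expressions itself, and `parse_range` and `select` are hypothetical helpers. The sketch assumes whitespace-separated clauses and only the units listed above, and it treats the last element of the list as the latest event.

```python
import re
from typing import List, Optional, Tuple

UNIT_MS = {"ms": 1, "s": 1_000, "h": 3_600_000}


def parse_range(expression: str) -> Tuple[Optional[int], Optional[int]]:
    """Return (limit, last_ms); None means the constraint is absent."""
    tokens = expression.lower().split()  # expressions are not case sensitive
    if not tokens or len(tokens) % 2:
        raise ValueError(f"malformed range expression: {expression!r}")
    limit: Optional[int] = None
    last_ms: Optional[int] = None
    for keyword, argument in zip(tokens[::2], tokens[1::2]):
        if keyword == "limit" and re.fullmatch(r"\d+", argument, re.ASCII):
            count = int(argument)
            limit = count if limit is None else min(limit, count)
        elif keyword == "last":
            match = re.fullmatch(r"(\d+)(ms|s|h)", argument, re.ASCII)
            if match is None:
                raise ValueError(f"bad duration: {argument!r}")
            span = int(match.group(1)) * UNIT_MS[match.group(2)]
            last_ms = span if last_ms is None else min(last_ms, span)
        else:
            raise ValueError(f"unknown clause: {keyword} {argument}")
    return limit, last_ms


def select(timestamps_ms: List[int], expression: str) -> List[int]:
    """Apply every constraint, so the smallest resulting range wins."""
    limit, last_ms = parse_range(expression)
    selected = list(timestamps_ms)
    if last_ms is not None and selected:
        latest = selected[-1]
        selected = [t for t in selected if latest - t <= last_ms]
    if limit is not None:
        selected = selected[-limit:] if limit else []
    return selected


events = [0, 4000, 9000, 12000, 15000, 16000, 17000, 18000]
print(select(events, "last 10s limit 5"))  # [12000, 15000, 16000, 17000, 18000]
print(select(events, "LAST 2S LIMIT 5"))   # [16000, 17000, 18000]
```

Applying both filters in sequence gives the intersection of the two ranges, which is the same as keeping whichever constraint selects fewer events.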

Since 6.8.3

TIME_SERIES_SUBSCRIPTION_RANGE class-attribute instance-attribute

TIME_SERIES_SUBSCRIPTION_RANGE: typing.Optional[str] = None

Key of the topic property that specifies the range of time series topic events to send to new subscribers.

The property value is a time series range expression, following the format used for TIME_SERIES_RETAINED_RANGE.

If the property is not specified, new subscribers will be sent the latest event if delta streams are enabled and no events if delta streams are disabled. See the description of Subscription range in the time series feature (TimeSeries) documentation.

Since 6.8.3

CONFLATION class-attribute instance-attribute

TimeSeries conflation policy is restricted to the above.

See Also

diffusion.features.topics.details.topic_specification.TopicSpecification.CONFLATION

TopicSpecificationAuto

Bases: typing.Generic[T], TopicSpecification[T]

TIME_SERIES_EVENT_VALUE_TYPE class-attribute instance-attribute

TIME_SERIES_EVENT_VALUE_TYPE: typing.Type[
    TimeSeriesValueType_Diffusion
] = pydantic.Field(
    const=True, default_factory=fake_own_type
)

Specifies the event data type for a time series topic.

TimeSeriesWithProperties

TimeSeriesEventDataType

Bases: IBytes[TopicSpecification['TimeSeriesEventDataType[VT]'], Event[VT], Event[VT]], typing.Generic[VT]

A data type for time series events

with_properties class-attribute

Returns a Topic Specification class filled with this type and accepting the relevant parameters

__init__

__init__(value: Event[VT])

Initialise a TimeSeriesEventDataType

Parameters:

Name Type Description Default
value Event[VT]

the Event[VT] to set as the value of this instance

required

TimeSeriesDataType

Bases: object

Time series data type implementation.

of classmethod

Provide a Time Series datatype with the given Event[VT] value type.

Please use TimeSeries.of rather than this function to obtain Time Series datatypes.

Parameters:

Name Type Description Default
val_type typing.Type[VT]

the type of value that events will contain.

required

Returns:

Type Description
typing.Type[TimeSeriesEventDataType[VT]]

The relevant Time Series data type.

time_series_event

EventType

Bases: enum.IntEnum

Event type constants

Event

Bases: typing.Generic[VT], MarshalledModel

Implementation of a time series event

metadata class-attribute instance-attribute
metadata: EventMetadata = pydantic.Field()

Event metadata

original_event instance-attribute
original_event: EventMetadata

Original event metadata

value class-attribute instance-attribute
value: typing.Optional[VT] = pydantic.Field()

Value of event payload

is_original_event property
is_original_event

Flag indicating whether this is an original event

create classmethod
create(
    metadata: EventMetadata,
    original_event: typing.Optional[EventMetadata],
    value: typing.Optional[VT_other],
    value_type: typing.Type[VT_other],
) -> Event[VT_other]

Static factory to create a new Event instance

Parameters:

Name Type Description Default
metadata EventMetadata

the metadata of the event

required
original_event typing.Optional[EventMetadata]

the original event

required
value typing.Optional[VT_other]

the value of the event

required
value_type typing.Type[VT_other]

the type of the value

required

Returns:

Type Description
Event[VT_other]

a new Event instance

with_value
with_value(
    new_value: typing.Optional[VT_other],
    type: typing.Type[VT_other],
) -> Event[VT_other]

Create a copy of this Event with a different value

Parameters:

Name Type Description Default
new_value typing.Optional[VT_other]

the new value

required
type typing.Type[VT_other]

type of the new value

required

Returns:

Type Description
Event[VT_other]

a new Event instance
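The copy-with-a-new-value behaviour can be pictured with frozen dataclasses. This is a sketch, not the library's model: `Metadata` and `SketchEvent` are hypothetical stand-ins for `EventMetadata` and `Event`, the `type` argument of the real `with_value` is omitted, and it is an assumption that an original event is one whose `original_event` equals its own metadata.

```python
from dataclasses import dataclass, replace
from typing import Generic, Optional, TypeVar

VT = TypeVar("VT")
VT_other = TypeVar("VT_other")


@dataclass(frozen=True)
class Metadata:
    """Hypothetical stand-in for EventMetadata."""

    sequence: int
    timestamp: int
    author: str


@dataclass(frozen=True)
class SketchEvent(Generic[VT]):
    """Hypothetical stand-in for Event."""

    metadata: Metadata
    original_event: Metadata
    value: Optional[VT]

    @property
    def is_original_event(self) -> bool:
        # Assumption: an original event refers to its own metadata.
        return self.metadata == self.original_event

    def with_value(self, new_value: Optional[VT_other]) -> "SketchEvent[VT_other]":
        # Copy the event, keeping both metadata fields and swapping the value.
        return replace(self, value=new_value)


first = Metadata(sequence=0, timestamp=1_700_000_000_000, author="alice")
edit = Metadata(sequence=1, timestamp=1_700_000_005_000, author="bob")

original = SketchEvent(metadata=first, original_event=first, value=10)
edited = SketchEvent(metadata=edit, original_event=first, value=11)
as_text = original.with_value("ten")
```

Here `as_text` shares its metadata with `original` and differs only in the value, while `original` itself is left unchanged.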

read_value classmethod
read_value(stream: BytesIO) -> Event[VT]

Read an original event from an input stream

Parameters:

Name Type Description Default
stream BytesIO

the input stream

required

Returns:

Type Description
Event[VT]

the event that was read from the stream

OriginalEvent

Bases: typing.Generic[VT], Event[VT]

read_value classmethod
read_value(stream: BytesIO) -> OriginalEvent[VT]

Read an original event from an input stream

Parameters:

Name Type Description Default
stream BytesIO

the input stream

required

Returns:

Type Description
OriginalEvent[VT]

the event that was read from the stream

EditEvent

Bases: typing.Generic[VT], Event[VT]

original_event instance-attribute
original_event: EventMetadata

Original event metadata

read_value classmethod
read_value(stream: BytesIO) -> EditEvent[VT]

Read an edit event from an input stream

Parameters:

Name Type Description Default
stream BytesIO

the input stream

required

Returns:

Type Description
EditEvent[VT]

the event that was read from the stream

time_series_event_metadata

EventMetadata

Bases: MarshalledModel

Time series event metadata.

sequence instance-attribute
sequence: int

Sequence number identifying this event within its time series. Assigned by the server when the event is created.

Sequence numbers are unique within a time series. Each event appended to a time series is assigned a sequence number that is equal to the sequence number of the preceding event plus one.

timestamp instance-attribute
timestamp: int

Event timestamp. Assigned by the server when the event is created.

Events do not have unique timestamps. Events with different sequence numbers may have the same timestamp.

Subsequent events in a time series usually have timestamps that are greater than or equal to the timestamps of earlier events, but this is not guaranteed due to changes to the time source used by the server.

Timestamps represent the difference, measured in milliseconds, between the time the server added the event to the time series and midnight, January 1, 1970 UTC.
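Because the timestamp is a count of milliseconds since the Unix epoch, it can be turned into a timezone-aware datetime with the standard library alone. The helper name below is hypothetical. Integer `timedelta` arithmetic is used so that no precision is lost to floating point.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_to_datetime(timestamp_ms: int) -> datetime:
    """Convert an event timestamp in milliseconds to an aware UTC datetime."""
    return EPOCH + timedelta(milliseconds=timestamp_ms)


moment = timestamp_to_datetime(1_700_000_000_500)
print(moment.isoformat())  # 2023-11-14T22:13:20.500000+00:00
```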

author instance-attribute
author: str

Server-authenticated identity of the session that created the event.

If the session that created the event was not authenticated, the author will be an empty string.

types

TimeSeriesValueTypeClasses module-attribute

Possible typing.Type values for a Time Series Value

TimeSeriesValueType_Diffusion module-attribute

TimeSeriesValueType_Diffusion = typing.Union[
    PrimitiveDataTypes, ComplexDataTypes
]

Possible implementation types for a Time Series Value

RawComplexDataTypes module-attribute

RawComplexDataTypes = typing.Union[
    typing.List[typing.Any],
    typing.Dict[typing.Any, typing.Any],
]

Types that could be JSON

TimeSeriesValueTypeOrRaw module-attribute

TimeSeriesValueTypeOrRaw = typing.Union[
    TimeSeriesValueType, RawDataTypes
]

Time Series Value Type parameter

VT_argtype module-attribute

VT_argtype = typing.TypeVar(
    "VT_argtype",
    bound=typing.Union[bytes, TimeSeriesValueTypeOrRaw],
)

Time Series Value Type parameter (TypeVar)

VT module-attribute

VT = typing.TypeVar('VT', bound=TimeSeriesValueType)

Possible types for a Time Series Value (TypeVar)

VT_other module-attribute

VT_other = typing.TypeVar(
    "VT_other", bound=TimeSeriesValueType
)

Possible types for a Time Series Value conversion target

Conversion

diffusion.datatypes.conversion.GenericConverter

Bases: typing.Protocol, typing.Hashable

A datatype converter, takes end user types and returns valid Diffusion types