Data Types

Data types are containers for specific formats of topic data and messages sent between Diffusion clients and servers.

Usage

from diffusion import datatypes

# the next two lines are equivalent
string_type = datatypes.get("STRING")
string_type = datatypes.STRING

string_instance = string_type("message")

# a data type can be instantiated directly:
string_instance = datatypes.STRING("message")

Available Data Types

diffusion.datatypes.BINARY = primitives.BinaryDataType module-attribute

Binary datatype alias, points to BinaryDataType

diffusion.datatypes.DOUBLE = primitives.DoubleDataType module-attribute

Double datatype alias, points to DoubleDataType

diffusion.datatypes.INT64 = primitives.Int64DataType module-attribute

Int64 datatype alias, points to Int64DataType

diffusion.datatypes.STRING = primitives.StringDataType module-attribute

String datatype alias, points to StringDataType

diffusion.datatypes.JSON = complex.JsonDataType module-attribute

Json datatype alias, points to JsonDataType

diffusion.datatypes.TIME_SERIES = TimeSeriesEventDataType module-attribute

TimeSeries datatype alias, points to TimeSeriesEventDataType

Unimplemented Data Types

To be added when appropriate.

diffusion.datatypes.RECORD_V2 = complex.RecordDataType module-attribute

Record V2 datatype alias, points to RecordDataType

diffusion.datatypes.ROUTING = complex.RoutingDataType module-attribute

Routing datatype alias, points to RoutingDataType

diffusion.datatypes.UNKNOWN = complex.UnknownDataType module-attribute

Unknown datatype alias, points to UnknownDataType

diffusion.datatypes.complex.RecordDataType

Bases: AbstractDataType

Diffusion record data type implementation.

diffusion.datatypes.complex.RoutingDataType

Bases: AbstractDataType

Routing data type implementation.

diffusion.datatypes.complex.UnknownDataType

Bases: AbstractDataType

Unknown data type implementation.

Generic base classes

diffusion.datatypes.foundation.datatype.DataType

Generic parent class for all data types implementations.

type_code: int class-attribute

Globally unique numeric identifier for the data type.

type_name: str class-attribute

Globally unique identifier for the data type.

value writable abstractmethod property

Current value of the instance.

__init__(value: typing.Optional[typing.Any]) -> None

Initialise the datatype value

Parameters:

Name Type Description Default
value typing.Optional[typing.Any]

the value to initialise the datatype with

required

read_value(stream: io.BytesIO) -> Optional[DataType] classmethod

Read the value from a binary stream.

Parameters:

Name Type Description Default
stream io.BytesIO

Binary stream containing the serialised data.

required

Returns:

Type Description
Optional[DataType]

An initialised instance of the DataType.

validate() -> None

Check the current value for correctness.

Raises:

Type Description
`InvalidDataError`

If the value is invalid. By default there is no validation.

diffusion.datatypes.foundation.abstract.AbstractDataType

Bases: DataType, typing.Generic[TS_T, ValueType, RealValue]

value: typing.Optional[RealValue] writable property

Current value of the instance.

serialised_value: dict property

Return the sequence of values ready to be serialised.

It is assumed that the serialisation will use the serialised-value serialiser.

with_properties: typing.ClassVar[WithProperties] = WithProperties() class-attribute

A class property that returns the type of this class's appropriate TopicSpecification class, ready for instantiation with the relevant parameters.

See Also

WithProperties

write_value(stream: BytesIO) -> BytesIO

Write the value into a binary stream.

Parameters:

Name Type Description Default
stream BytesIO

Binary stream to serialise the value into.

required

to_bytes() -> bytes

Convert the value into the binary representation.

Convenience method, not to be overridden

read_value(stream: BytesIO) -> Optional[AbstractDataType] classmethod

Read the value from a binary stream.

Parameters:

Name Type Description Default
stream BytesIO

Binary stream containing the serialised data.

required

Returns:

Type Description
Optional[AbstractDataType]

An initialised instance of the DataType.

from_bytes(data: bytes) -> Optional[A_T] classmethod

Convert a binary representation into the corresponding value.

Parameters:

Name Type Description Default
data bytes

Serialised binary representation of the value.

required

Returns:

Type Description
Optional[A_T]

An initialised instance of the DataType.

encode(value: Any) -> bytes classmethod

Convert a value into the corresponding binary representation.

Parameters:

Name Type Description Default
value Any

Native value to be serialised

required

Returns:

Type Description
bytes

Serialised binary representation of the value.

decode(data: bytes) -> Any classmethod

Convert a binary representation into the corresponding value.

Parameters:

Name Type Description Default
data bytes

Serialised binary representation of the value.

required

Returns:

Type Description
Any

Deserialised value.

set_from_bytes(data: bytes) -> None

Convert bytes and set the corresponding value on the instance.
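How encode/decode, to_bytes/from_bytes and write_value/read_value relate to one another can be sketched with a toy class. This is a minimal stand-in and not the library's implementation: ToyStringType is hypothetical, and it uses plain UTF-8 where the real primitive types are documented as using CBOR.

```python
from io import BytesIO


class ToyStringType:
    """Hypothetical stand-in for a data type; UTF-8 replaces CBOR here."""

    def __init__(self, value: str) -> None:
        self.value = value

    @classmethod
    def encode(cls, value: str) -> bytes:
        # Native value -> binary representation.
        return value.encode("utf-8")

    @classmethod
    def decode(cls, data: bytes) -> str:
        # Binary representation -> native value.
        return data.decode("utf-8")

    def to_bytes(self) -> bytes:
        # Convenience wrapper: encode the value held by this instance.
        return self.encode(self.value)

    def write_value(self, stream: BytesIO) -> BytesIO:
        # Serialise the instance into the stream and hand the stream back.
        stream.write(self.to_bytes())
        return stream

    @classmethod
    def from_bytes(cls, data: bytes) -> "ToyStringType":
        # Build a new instance from a binary representation.
        return cls(cls.decode(data))

    @classmethod
    def read_value(cls, stream: BytesIO) -> "ToyStringType":
        # Build a new instance from whatever remains in the stream.
        return cls.from_bytes(stream.read())

    def set_from_bytes(self, data: bytes) -> None:
        # Replace the value of an existing instance.
        self.value = self.decode(data)


stream = ToyStringType("message").write_value(BytesIO())
stream.seek(0)  # rewind before reading back
restored = ToyStringType.read_value(stream)
```

In this sketch the class methods work on native values and bytes, while the instance methods delegate to them, which is why to_bytes is described as a convenience method.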

diffusion.datatypes.foundation.abstract.WithProperties

Bases: typing.Generic[TS_T, ValueType]

__get__(instance: typing.Optional[AbstractDataType[TS_T, ValueType, RealValue]], owner: typing.Type[AbstractDataType[TS_T, ValueType, RealValue]]) -> typing.Type[TS_T]

Return a TopicSpecification class prefilled with owner

Parameters:

Name Type Description Default
instance typing.Optional[AbstractDataType[TS_T, ValueType, RealValue]]

the instance on which this is called

required
owner typing.Type[AbstractDataType[TS_T, ValueType, RealValue]]

the class on which this is called

required

Returns:

Type Description
typing.Type[TS_T]

Return a TopicSpecification class prefilled with owner
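The `__get__` signature above is Python's descriptor protocol. The following sketch shows the general pattern of a class-level descriptor that returns a class bound to its owner. All names in it (ToySpecification, ToyWithProperties, ToyJsonType, EXAMPLE_PROPERTY) are hypothetical, and how the library actually builds the returned class is not shown in this page.

```python
from typing import Optional, Type


class ToySpecification:
    """Hypothetical stand-in for a TopicSpecification class."""

    owner: Optional[type] = None

    def __init__(self, **properties: str) -> None:
        self.properties = properties


class ToyWithProperties:
    """Descriptor whose lookup yields a specification class tied to the owner."""

    def __get__(self, instance: Optional[object], owner: type) -> Type[ToySpecification]:
        # Create a subclass that remembers which data type requested it.
        name = f"{owner.__name__}Specification"
        return type(name, (ToySpecification,), {"owner": owner})


class ToyJsonType:
    with_properties = ToyWithProperties()


spec_class = ToyJsonType.with_properties      # a class, not an instance
spec = spec_class(EXAMPLE_PROPERTY="value")   # hypothetical property name
```

Because the descriptor receives the owner class on every lookup, the same attribute works whether it is accessed on the class or on an instance.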

Primitive datatypes

diffusion.datatypes.primitives

RawDataTypes = typing.Union[StrictFloat, StrictInt, StrictStr, StrictBytes] module-attribute

Raw data types that are used to populate Diffusion datatypes

PrimitiveDataTypes = typing.Union[BINARY, DOUBLE, INT64, STRING] module-attribute

Primitive diffusion data types.

PrimitiveDataTypesClasses = typing.Union[typing.Type[BINARY], typing.Type[DOUBLE], typing.Type[INT64], typing.Type[STRING]] module-attribute

Classes of primitive Diffusion data types.

doubledatatype

DoubleDataType

Bases: PrimitiveDataType[float]

Data type that supports double-precision floating point numbers.

(Eight-byte IEEE 754)

The double value is serialized as CBOR-format binary. A serialized value can be read as a JSON instance.
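To make the eight-byte IEEE 754 layout concrete, here is a small standard-library sketch. The helper names are hypothetical. The CBOR part relies on the CBOR rule that a double-precision float is the initial byte 0xfb followed by eight big-endian bytes; whether the library always chooses that width is an assumption.

```python
import struct


def pack_double(value: float) -> bytes:
    """Eight-byte, big-endian IEEE 754 representation of value."""
    return struct.pack(">d", value)


def cbor_float64(value: float) -> bytes:
    """CBOR double-precision item: initial byte 0xfb, then the eight bytes."""
    return b"\xfb" + pack_double(value)


raw = pack_double(1.5)
item = cbor_float64(1.5)
print(raw.hex())   # 3ff8000000000000
print(len(item))   # 9
```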

int64datatype

Int64DataType

Bases: PrimitiveDataType[int]

Data type that supports 64-bit, signed integer values.

The integer value is serialized as CBOR-format binary. A serialized value can be read as a JSON instance.

validate() -> None

Check the current value for correctness.

Raises:

Type Description
`InvalidDataError`

If the value is invalid.
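A range check of the kind a 64-bit signed integer type needs can be sketched as follows. This is an illustration only: check_int64 is a hypothetical helper, and ValueError stands in for the library's InvalidDataError.

```python
INT64_MIN = -(2 ** 63)
INT64_MAX = 2 ** 63 - 1


def check_int64(value: int) -> None:
    """Raise ValueError unless value fits in a signed 64-bit integer."""
    if not isinstance(value, int):
        raise ValueError(f"expected an int, got {type(value).__name__}")
    if not INT64_MIN <= value <= INT64_MAX:
        raise ValueError(f"{value} is outside the signed 64-bit range")


check_int64(INT64_MAX)  # passes silently
```

Python integers are unbounded, so an explicit bounds check is the only thing that stops an oversized value from reaching the serialiser.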

primitivedatatype

PrimitiveDataType

Bases: typing.Generic[T], AbstractDataType[TopicSpecification[PrimitiveDataType], PrimitiveDataType[T], typing.Optional[T]]

validate() -> None

Check the current value for correctness.

Raises:

Type Description
`InvalidDataError`

If the value is invalid.

stringdatatype

StringDataType

Bases: PrimitiveDataType[str]

String data type.

The string value is serialized as CBOR-format binary.

binarydatatype

BinaryDataType

Bases: PrimitiveDataType[bytes]

Data type that supports arbitrary binary data.

encode(value) -> bytes classmethod

Convert the value into the binary representation.

decode(data: bytes) -> bytes classmethod

Convert a binary representation into the corresponding value.

Parameters:

Name Type Description Default
data bytes

Serialised binary representation of the value.

required

Returns:

Type Description
bytes

Deserialised value.

Complex datatypes

diffusion.datatypes.complex.JsonDataType

Bases: AbstractDataType[TopicSpecification[JsonDataType], JsonDataType, JsonTypes]

JSON data type implementation.

validate() -> None

Check the current value for correctness.

Raises:

Type Description
`InvalidDataError`

If the value is invalid.

diffusion.datatypes.complex.ComplexDataTypes = typing.Union[JSON, RECORD_V2] module-attribute

Complex Diffusion data types.

diffusion.datatypes.complex.ComplexDataTypesClasses = typing.Union[typing.Type[JSON], typing.Type[RECORD_V2]] module-attribute

Classes of complex Diffusion data types.

Time Series

diffusion.datatypes.timeseries

TopicSpecification

Bases: typing.Generic[T], topic_specification.TopicSpecification[T]

Time Series Topic Specification class

TIME_SERIES_EVENT_VALUE_TYPE: typing.Type[TimeSeriesValueType] = pydantic.Field(const=True) class-attribute

Specifies the event data type for a time series topic.

TIME_SERIES_RETAINED_RANGE: typing.Optional[str] = None class-attribute

Key of the topic property that specifies the range of events retained by a time series topic.

When a new event is added to the time series, older events that fall outside of the range are discarded.

If the property is not specified, a time series topic will retain the ten most recent events.

Time series range expressions

The property value is a time series range expression string composed of one or more constraint clauses. Constraints are combined to provide a range of events from the end of the time series.

limit constraint

A limit constraint specifies the maximum number of events from the end of the time series.

last constraint

A last constraint specifies the maximum duration of events from the end of the time series. The duration is expressed as an integer followed by one of the following time units.

  • ms - milliseconds
  • s - seconds
  • h - hours

If a range expression contains multiple constraints, the constraint that selects the smallest range is used.

Property value      Meaning
limit 5             The five most recent events
last 10s            All events that are no more than ten seconds older than the latest event
last 10s limit 5    The five most recent events that are no more than ten seconds older than the latest event

Range expressions are not case sensitive:

limit 5 last 10s

is equivalent to

LIMIT 5 LAST 10S.

Since 6.8.3
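The range expression rules described for TIME_SERIES_RETAINED_RANGE can be illustrated with a small client-side sketch. It is not the server's parser: parse_range and select are hypothetical helpers, events are modelled as (sequence, timestamp in milliseconds) tuples ordered oldest first, and the grammar is limited to the limit and last clauses with the ms, s and h units listed above.

```python
import re
from datetime import timedelta
from typing import List, Optional, Tuple

_UNITS = {"ms": "milliseconds", "s": "seconds", "h": "hours"}
_CLAUSE = re.compile(r"(limit)\s+(\d+)|(last)\s+(\d+)(ms|s|h)", re.IGNORECASE)

Event = Tuple[int, int]  # (sequence, timestamp_ms)


def parse_range(expression: str) -> Tuple[Optional[int], Optional[timedelta]]:
    """Return (limit, last) parsed from a range expression."""
    limit: Optional[int] = None
    last: Optional[timedelta] = None
    text = expression.strip()
    position = 0
    while position < len(text):
        match = _CLAUSE.match(text, position)
        if match is None:
            raise ValueError(f"unrecognised clause at offset {position}: {text!r}")
        if match.group(1):
            limit = int(match.group(2))
        else:
            unit = _UNITS[match.group(5).lower()]
            last = timedelta(**{unit: int(match.group(4))})
        position = match.end()
        while position < len(text) and text[position].isspace():
            position += 1
    return limit, last


def select(events: List[Event], expression: str) -> List[Event]:
    """Apply every constraint; the result is the smallest selected range."""
    limit, last = parse_range(expression)
    selected = list(events)
    if last is not None and selected:
        # Keep events no more than `last` older than the latest event.
        cutoff = selected[-1][1] - last // timedelta(milliseconds=1)
        selected = [event for event in selected if event[1] >= cutoff]
    if limit is not None:
        selected = selected[-limit:] if limit > 0 else []
    return selected


events = [(0, 0), (1, 4000), (2, 9000), (3, 12000), (4, 13000)]
recent = select(events, "LAST 10S LIMIT 3")
```

Applying each constraint in turn to the tail of the series gives the same result as picking the constraint that selects the smallest range, because every constraint only trims events from the old end.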

TIME_SERIES_SUBSCRIPTION_RANGE: typing.Optional[str] = None class-attribute

Key of the topic property that specifies the range of time series topic events to send to new subscribers.

The property value is a time series range expression, following the format used for TIME_SERIES_RETAINED_RANGE.

If the property is not specified, new subscribers will be sent the latest event if delta streams are enabled and no events if delta streams are disabled. See the description of subscription range in the time series feature documentation.

Since 6.8.3

CONFLATION: typing.Optional[Literal[ConflationPolicy.OFF, ConflationPolicy.UNSUBSCRIBE]] = None class-attribute

The conflation policy of a time series topic is restricted to ConflationPolicy.OFF or ConflationPolicy.UNSUBSCRIBE.

See Also

diffusion.features.topics.details.topic_specification.TopicSpecification.CONFLATION

TimeSeriesWithProperties

Bases: typing.Generic[T], WithProperties[TopicSpecification[T], Event]

__get__(instance: typing.Optional[AbstractDataType[TopicSpecification[T], ValueType, RealValue]], owner: typing.Type[AbstractDataType[TopicSpecification[T], ValueType, RealValue]]) -> typing.Type[TopicSpecification[T]]

Return a TopicSpecification class prefilled with owner

Parameters:

Name Type Description Default
instance typing.Optional[AbstractDataType[TopicSpecification[T], ValueType, RealValue]]

the instance on which this is called

required
owner typing.Type[AbstractDataType[TopicSpecification[T], ValueType, RealValue]]

the class on which this is called

required

Returns:

Type Description
typing.Type[TopicSpecification[T]]

Return a TopicSpecification class prefilled with owner

TimeSeriesEventDataType

Bases: typing.Generic[VT], AbstractDataType[TopicSpecification[TimeSeriesEventDataType[VT]], Event[VT], Event[VT]]

A data type for time series events

with_properties: typing.ClassVar[TimeSeriesWithProperties] = TimeSeriesWithProperties() class-attribute

Returns a Topic Specification class filled with this type and accepting the relevant parameters

__init__(value: Event[VT])

Initialise a TimeSeriesEventDataType

Parameters:

Name Type Description Default
value Event[VT]

the Event[VT] to set as the value of this instance

required

TimeSeriesDataType

Bases: AbstractDataType

Time series data type implementation.

of(val_type: typing.Type[VT]) -> typing.Type[TimeSeriesEventDataType[VT]] classmethod

Provide a Time Series datatype with the given Event[VT] value type.

Please use TimeSeries.of rather than this function to obtain Time Series datatypes.

Parameters:

Name Type Description Default
val_type typing.Type[VT]

the type of value that events will contain.

required

Returns:

Type Description
typing.Type[TimeSeriesEventDataType[VT]]

The relevant Time Series data type.

time_series_event_metadata

EventMetadata

Bases: MarshalledModel

Time series event metadata.

sequence: int class-attribute

Sequence number identifying this event within its time series. Assigned by the server when the event is created.

Sequence numbers are unique within a time series. Each event appended to a time series is assigned a sequence number that is equal to the sequence number of the preceding event plus one.
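Because each sequence number is the previous one plus one, a client that holds a set of events can tell which ones it does not have. The sketch below is a hypothetical helper working on plain integers, not part of the library.

```python
from typing import Iterable, List


def missing_sequences(sequences: Iterable[int]) -> List[int]:
    """Return sequence numbers absent between the smallest and largest seen."""
    present = set(sequences)
    if not present:
        return []
    return [n for n in range(min(present), max(present) + 1) if n not in present]


gaps = missing_sequences([3, 4, 6, 7, 9])
print(gaps)  # [5, 8]
```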

timestamp: int class-attribute

Event timestamp. Assigned by the server when the event is created.

Events do not have unique timestamps. Events with different sequence numbers may have the same timestamp.

Subsequent events in a time series usually have timestamps that are greater than or equal to the timestamps of earlier events, but this is not guaranteed due to changes to the time source used by the server.

Timestamps represent the difference, measured in milliseconds, between the time the server added the event to the time series and midnight, January 1, 1970 UTC.
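Since the timestamp is a count of milliseconds from the Unix epoch, it can be turned into a timezone-aware datetime with the standard library. timestamp_to_datetime is a hypothetical helper name used only for this sketch.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_to_datetime(timestamp_ms: int) -> datetime:
    """Convert an event timestamp in milliseconds to an aware UTC datetime."""
    # Integer timedelta arithmetic avoids floating-point rounding.
    return EPOCH + timedelta(milliseconds=timestamp_ms)


moment = timestamp_to_datetime(1_700_000_000_000)
print(moment.isoformat())  # 2023-11-14T22:13:20+00:00
```

As timestamps are neither unique nor guaranteed to increase, ordering events by sequence number is the safer choice when order matters.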

author: str class-attribute

Server-authenticated identity of the session that created the event.

If the session that created the event was not authenticated, the author will be an empty string.

time_series_event

EventType

Bases: enum.IntEnum

Event type constants

Event

Bases: typing.Generic[VT], MarshalledModel

Implementation of a time series event

metadata: EventMetadata = pydantic.Field() class-attribute

Event metadata

original_event: EventMetadata class-attribute

Original event metadata

value: typing.Optional[VT] = pydantic.Field() class-attribute

Value of event payload

is_original_event property

Flag indicating whether this is an original event
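The relationship between metadata, original_event and is_original_event can be sketched with plain dataclasses. ToyMetadata and ToyEvent are hypothetical, and the rule used here (an event is original when its original_event metadata equals its own metadata) is an assumption, not something this page states.

```python
from dataclasses import dataclass, replace
from typing import Any, Optional


@dataclass(frozen=True)
class ToyMetadata:
    sequence: int
    timestamp: int
    author: str


@dataclass(frozen=True)
class ToyEvent:
    metadata: ToyMetadata
    original_event: ToyMetadata
    value: Optional[Any]

    @property
    def is_original_event(self) -> bool:
        # Assumption: an original event refers to itself as its original.
        return self.metadata == self.original_event

    def with_value(self, new_value: Optional[Any]) -> "ToyEvent":
        # Copy the event, changing only the payload.
        return replace(self, value=new_value)


first = ToyMetadata(sequence=0, timestamp=1000, author="alice")
edit = ToyMetadata(sequence=1, timestamp=2000, author="bob")

original = ToyEvent(metadata=first, original_event=first, value="a")
edited = ToyEvent(metadata=edit, original_event=first, value="b")
copied = original.with_value("c")
```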

create(metadata: EventMetadata, original_event: typing.Optional[EventMetadata], value: typing.Optional[VT_other], value_type: typing.Type[VT_other]) -> Event[VT_other] classmethod

Static factory to create a new Event instance

Parameters:

Name Type Description Default
metadata EventMetadata

the metadata of the event

required
original_event typing.Optional[EventMetadata]

the original event

required
value typing.Optional[VT_other]

the value of the event

required
value_type typing.Type[VT_other]

the type of the value

required

Returns:

Type Description
Event[VT_other]

a new Event instance

with_value(new_value: typing.Optional[VT_other], type: typing.Type[VT_other]) -> Event[VT_other]

Create a copy of this Event with a different value

Parameters:

Name Type Description Default
new_value typing.Optional[VT_other]

the new value

required
type typing.Type[VT_other]

type of the new value

required

Returns:

Type Description
Event[VT_other]

a new Event instance

read_value(stream: BytesIO) -> Event[VT] classmethod

Read an original event from an input stream

Parameters:

Name Type Description Default
stream BytesIO

the input stream

required

Returns:

Type Description
Event[VT]

the event that was read from the stream

OriginalEvent

Bases: typing.Generic[VT], Event[VT]

read_value(stream: BytesIO) -> OriginalEvent[VT] classmethod

Read an original event from an input stream

Parameters:

Name Type Description Default
stream BytesIO

the input stream

required

Returns:

Type Description
OriginalEvent[VT]

the event that was read from the stream

EditEvent

Bases: typing.Generic[VT], Event[VT]

original_event: EventMetadata class-attribute

Original event metadata

read_value(stream: BytesIO) -> EditEvent[VT] classmethod

Read an edit event from an input stream

Parameters:

Name Type Description Default
stream BytesIO

the input stream

required

Returns:

Type Description
EditEvent[VT]

the event that was read from the stream

types

TimeSeriesValueTypeClasses = typing.Union[PrimitiveDataTypesClasses, ComplexDataTypesClasses] module-attribute

Possible typing.Type values for a Time Series Value

TimeSeriesValueType = typing.Union[PrimitiveDataTypes, ComplexDataTypes] module-attribute

Possible types for a Time Series Value

RawComplexDataTypes = typing.Union[typing.List[typing.Any], typing.Dict[typing.Any, typing.Any]] module-attribute

Types that could be JSON

TimeSeriesValueTypeOrRaw = typing.Optional[typing.Union[TimeSeriesValueType, RawDataTypes, RawComplexDataTypes, StrictBool]] module-attribute

Time Series Value Type parameter

VT_argtype = typing.TypeVar('VT_argtype', bound=TimeSeriesValueTypeOrRaw) module-attribute

Time Series Value Type parameter (TypeVar)

VT = typing.TypeVar('VT', bound=TimeSeriesValueType) module-attribute

Possible types for a Time Series Value (TypeVar)

VT_other = typing.TypeVar('VT_other', bound=TimeSeriesValueType) module-attribute

Possible types for a Time Series Value conversion target