U - the type of the update to process
C - the UpdateContext type to process the update
public abstract class CustomPublisher<U,C extends UpdateContext> extends Object implements Publisher
A Publisher that enables overriding the default publication mechanism in a specific type of Gateway application.
Normally, within a source or hybrid ServiceHandler in a Gateway application, the Publisher.publish(String, Object) method is called to publish an update to a Diffusion topic. This default behaviour can be overridden by configuring the service to use a CustomPublisher. An implementation of CustomPublisher can be plugged into the application to process updates and perform operations provided by the Publisher. For example, it can divide a single update into multiple updates, enrich those updates, publish them to multiple Diffusion topics, or even remove a Diffusion topic based on the update.
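As a rough illustration of dividing one update into several publications, the sketch below splits a map-shaped update into one topic per field and removes the topic for any field whose value is null. It is not a real CustomPublisher subclass: PublisherOps and RecordingOps are hypothetical stand-ins so the example runs without the Gateway framework, and the topic layout is an assumption. In an actual subclass, the same calls would go to the inherited publish and remove methods from within process.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

final class SplitUpdateSketch {

    /** Hypothetical stand-in for the publish and remove operations of Publisher. */
    interface PublisherOps {
        CompletableFuture<?> publish(String path, Object value);
        CompletableFuture<?> remove(String topics);
    }

    /** Records calls in memory so the sketch runs without a Diffusion server. */
    static final class RecordingOps implements PublisherOps {
        final List<String> calls = new ArrayList<>();

        @Override
        public CompletableFuture<?> publish(String path, Object value) {
            calls.add("publish " + path + "=" + value);
            return CompletableFuture.completedFuture(null);
        }

        @Override
        public CompletableFuture<?> remove(String topics) {
            calls.add("remove " + topics);
            return CompletableFuture.completedFuture(null);
        }
    }

    /**
     * Splits one update (field name to value) into one operation per field.
     * A null value is treated as a request to remove that field's topic.
     */
    static CompletableFuture<?> process(PublisherOps ops, String basePath, Map<String, Object> update) {
        final List<CompletableFuture<?>> results = new ArrayList<>();
        for (Map.Entry<String, Object> field : update.entrySet()) {
            final String path = basePath + "/" + field.getKey();
            if (field.getValue() == null) {
                results.add(ops.remove(path));
            } else {
                results.add(ops.publish(path, field.getValue()));
            }
        }
        // Completes once every publication or removal has completed.
        return CompletableFuture.allOf(results.toArray(new CompletableFuture<?>[0]));
    }

    public static void main(String[] args) {
        RecordingOps ops = new RecordingOps();
        Map<String, Object> quote = new LinkedHashMap<>();
        quote.put("bid", 101.5);
        quote.put("ask", 101.7);
        quote.put("last", null);
        process(ops, "prices/ACME", quote).join();
        ops.calls.forEach(System.out::println);
    }
}
```

Combining the individual futures with CompletableFuture.allOf gives the caller a single result that fails if any of the underlying operations fails.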
Using a CustomPublisher is an advanced feature and is useful when the default publication mechanism in an existing application needs to be overridden. All Gateway applications supplied by DiffusionData Limited support this feature, allowing developers to override the default publication. Externally developed applications that need to process an update can simply do so by using a Publisher.
The Gateway application that supports custom publication should implement the UpdateContext and provide additional getters for any context information related to the update that will be supplied to the CustomPublisher. The context should be documented clearly so that a CustomPublisher developer understands the context supplied with the update and can use it as needed.
A CustomPublisher developer should include the Gateway application JAR as a dependency to access the application's implemented UpdateContext. In Maven, the JAR can be downloaded and included in the POM file with the system scope.
When a custom publisher is configured for a service, any PayloadConverters defined for the service will be ignored, as a custom publisher may process updates into different formats and publish them to different Diffusion topics. However, the custom publisher itself can still use PayloadConverters if necessary, to convert updates before publishing them. To receive the payload converter instances from the framework, the constructor of the custom publisher should accept a Map of payload converter names to payload converter instances as a constructor argument. Refer to the constructor documentation for further details.
Similar to a PayloadConverter, the implementation of a CustomPublisher can be provided as a standalone JAR. This JAR can then be used with a Gateway application by including it in the classpath when starting the application. This setup works if both the application and the CustomPublisher are compatible. Specifically, the application must support using a CustomPublisher, and the CustomPublisher must be capable of handling the updates provided by the application.
If a CustomPublisher implementation is available in the classpath and a user has configured a service to use it, the framework will pass an instance of the custom publisher to the `add` methods of the GatewayApplication for source and hybrid service types. This instance should be cast to a CustomPublisher and used to process the update received by the source or hybrid service.
See Also: Publisher
Constructor and Description |
---|
CustomPublisher(Publisher publisher): Constructor to create an instance of CustomPublisher. |
Modifier and Type | Method and Description |
---|---|
CompletableFuture<?> | addMissingTopicHandler(String topicPath, MissingTopicNotificationHandler missingTopicNotificationHandler): Register a MissingTopicNotificationHandler which will be notified if a client subscription request matches no known topics and the selector prefix of the subscription matches the specified branch of the topic tree. |
CompletableFuture<?> | applyJSONPatch(String path, String patch): Apply a JSON patch to a Diffusion topic value. |
TopicProperties | getConfiguredTopicProperties(): Returns a TopicProperties with user-configured or default topic properties for the service. |
abstract CompletableFuture<?> | process(U update, C updateContext): Processes the supplied update to publish to Diffusion topics. |
CompletableFuture<?> | publish(String path, Object value): Publishes a new value to Diffusion. |
CompletableFuture<?> | publish(String path, Object value, TopicProperties topicProperties): Publishes a new value to Diffusion. |
CompletableFuture<?> | remove(String topics): Remove a topic or topics. |
CompletableFuture<?> | removeMissingTopicHandler(String topicPath): Removes MissingTopicNotificationHandler registered for the specified topicPath in the service. |
void | setInitialJSONValueForPatchUpdates(String path, String jsonValue): Sets the initial value for a Diffusion JSON topic, to which subsequent patch updates will be applied. |
public CustomPublisher(Publisher publisher)
Constructor to create an instance of CustomPublisher.
The framework uses reflection to instantiate the subclass by injecting a Publisher instance into the constructor of the subclass. The subclass should supply this instance of the publisher to its super constructor. The instance of the subclass thus created will then be supplied to the `add` methods of the GatewayApplication for source and hybrid service types, and will then be used to process the updates.
If the subclass requires a Map of context for its construction, the signature of the constructor can also include a Map, after Publisher, containing keys of String type and values of Object type. The framework then extracts the configured context for the CustomPublisher from the service configuration and supplies it to the constructor.
If the subclass also requires PayloadConverters to process updates, it can define a constructor that accepts a Publisher, a Map of context, and a Map of PayloadConverters, in this order. The map of payload converters should contain the payload converter name as the key and the payload converter instance as the value. The constructor should look up each converter that the publisher requires in the map by its name and keep a reference to it for later use.
Payload converters required by the custom publisher should be correctly configured within the custom publisher's configuration. The framework extracts the configured payload converter information for the custom publisher from the service configuration, constructs the map of payload converters, and supplies it to the constructor. Hence, the supplied converters should be validated in the constructor to ensure they are configured as required.
Below is an example of a constructor for a custom publisher that requires payload converters:
    public KafkaCustomPublisher(
        Publisher publisher,
        Map<String, Object> configContext,
        Map<String, ? extends PayloadConverter<?, ?>> payloadConverterMap) {
        // Supply the injected publisher to the CustomPublisher super constructor.
        super(publisher);
    }
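The validation step mentioned above could look something like the following sketch, which looks up a converter by name and fails fast when it is missing. The Converter interface is a hypothetical stand-in for the framework's PayloadConverter, the converter names are invented, and throwing IllegalArgumentException from the constructor is an assumption rather than documented framework behaviour.

```java
import java.util.Map;
import java.util.Objects;

final class ConverterLookupSketch {

    /** Hypothetical stand-in for the framework's PayloadConverter type. */
    interface Converter<I, O> {
        O convert(I input);
    }

    /**
     * Looks up a converter by its configured name and fails fast if it is absent,
     * as a custom publisher constructor might do for each converter it depends on.
     */
    static Converter<?, ?> requireConverter(
        Map<String, ? extends Converter<?, ?>> converters,
        String name) {

        Objects.requireNonNull(converters, "converters");
        final Converter<?, ?> converter = converters.get(name);
        if (converter == null) {
            // Assumption: an IllegalArgumentException is a suitable way to reject bad configuration.
            throw new IllegalArgumentException(
                "Payload converter '" + name + "' is not configured; available: " + converters.keySet());
        }
        return converter;
    }

    public static void main(String[] args) {
        Converter<String, String> upper = String::toUpperCase;
        // "upperCase" and "avroToJson" are invented names used only for this demonstration.
        Map<String, Converter<?, ?>> configured = Map.of("upperCase", upper);

        System.out.println(requireConverter(configured, "upperCase") == upper);
        try {
            requireConverter(configured, "avroToJson");
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Including the available names in the error message makes a misconfigured service easier to diagnose at startup.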
publisher - the publisher instance that is used to delegate all operations supported by the Publisher.
public abstract CompletableFuture<?> process(U update, C updateContext) throws PayloadConversionException
Processes the supplied update to publish to Diffusion topics.
The implementation of this method can call all the Publisher methods via the abstract superclass.
The payload converter specified for the service will be applied to the final value that is supplied to the Publisher.publish(String, Object, TopicProperties) or Publisher.publish(String, Object) method to convert it to a Diffusion topic value. Hence, the implementation of this class should document what happens in this method and what type of updates will be supplied to the Publisher.publish(String, Object, TopicProperties) or Publisher.publish(String, Object) methods.
updateContext - the Gateway application-specific and update-specific context that can be used to process the update. The context provided by different applications can differ. Hence, the custom publisher should handle each accordingly, and if any required context is absent, an IllegalArgumentException should be thrown. The application manual can be consulted to understand whether it supports using a custom publisher and what context it supplies.
update - the update to process.
To indicate success, the completable future should complete successfully with a null value. If a call to a Publisher method completes exceptionally then that future can be returned. If any other processing error occurs, the method should return an exceptionally completed future with a suitable exception.
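These return conventions can be sketched with plain CompletableFuture calls. In the example below, the publish function is a hypothetical stand-in for a call to one of the Publisher methods, and the empty-update check is an invented processing error used only to show the failure path.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

final class ProcessResultSketch {

    /**
     * Follows the return conventions described above.
     * The publish function is a hypothetical stand-in for a Publisher method call.
     */
    static CompletableFuture<?> process(String update, Function<String, CompletableFuture<?>> publish) {
        if (update == null || update.isEmpty()) {
            // A processing error found before any Publisher call is reported through the future.
            return CompletableFuture.failedFuture(new IllegalArgumentException("empty update"));
        }
        // The Publisher future is returned as is: it completes with null on success,
        // or exceptionally if the publication failed.
        return publish.apply(update);
    }

    public static void main(String[] args) {
        Function<String, CompletableFuture<?>> ok = value -> CompletableFuture.completedFuture(null);
        System.out.println(process("tick", ok).join());
        System.out.println(process("", ok).isCompletedExceptionally());
    }
}
```

Returning a failed future rather than throwing keeps all outcomes on one channel, so the caller needs only one error-handling path.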
PayloadConversionException - if the supplied value could not be converted by the payload converter configured for the service, or the value type is incompatible with the payload converter.
public final CompletableFuture<?> publish(String path, Object value) throws PayloadConversionException
Description copied from interface: Publisher
Publishes a new value to Diffusion.
The payload converter specified for the service will be applied to the supplied value in order to convert it to a Diffusion topic value. The order of calls to this method will be preserved in the topic updates. Therefore, if order must be maintained for a topic path, this method should be called in the expected order of updates.
Specified by: publish in interface Publisher
path - the topic path
value - the unconverted value
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
If the task fails, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
DiffusionSecurityException – if the application principal does not have sufficient permissions to perform the operation;
DiffusionClientException – if some other exception has been returned from the Diffusion server via the Diffusion Client API. The cause will provide more detail.
ServiceStateException – if the service state is incompatible with the operation.
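Because the failure is reported as the cause of a CompletionException, callers usually need to unwrap it before deciding how to react. The sketch below shows one way to do that with the standard CompletableFuture API. IllegalStateException is used purely as a stand-in for a Gateway exception such as ServiceStateException, and the describe helper is hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

final class FailureCauseSketch {

    /** Returns the underlying cause if the throwable is a CompletionException wrapper. */
    static Throwable unwrap(Throwable error) {
        return (error instanceof CompletionException && error.getCause() != null)
            ? error.getCause()
            : error;
    }

    /** Describes the outcome of a publish-style future without throwing. */
    static String describe(CompletableFuture<?> result) {
        return result
            .handle((value, error) -> error == null
                ? "published"
                : "failed: " + unwrap(error).getClass().getSimpleName())
            .join();
    }

    public static void main(String[] args) {
        // IllegalStateException stands in for a Gateway exception such as ServiceStateException.
        CompletableFuture<?> failed = CompletableFuture.failedFuture(
            new CompletionException(new IllegalStateException("service is paused")));

        System.out.println(describe(failed));
        System.out.println(describe(CompletableFuture.completedFuture(null)));
    }
}
```

A real handler would typically branch on the unwrapped type, for example logging a permissions problem differently from a service state problem.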
PayloadConversionException - if the supplied value could not be converted by the payload converter configured for the service, or the value type is incompatible with the payload converter.
public final CompletableFuture<?> publish(String path, Object value, TopicProperties topicProperties) throws PayloadConversionException
Description copied from interface: Publisher
Publishes a new value to Diffusion.
This method operates in a similar manner to Publisher.publish(String, Object), but it also provides an option to specify TopicProperties to be used for the supplied topic. Use this method in preference to Publisher.publish(String, Object) to create Diffusion topics of specific types.
An application user can set topic properties for topics to be created by a source service in the service configuration; otherwise, default values are applied. A TopicProperties instance that contains user-configured or default topic properties can be accessed by using the Publisher.getConfiguredTopicProperties() method, which is available after the service handler is started. Any topic properties in this configured topic properties instance can be overridden using any of the helper methods in TopicProperties and passed to this method.
Each helper method in TopicProperties, such as TopicProperties.withTopicType(TopicType), returns an immutable instance of `TopicProperties`. Hence, to override multiple values, each override should be applied to the most recently created `TopicProperties` instance.
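The consequence of immutability is easiest to see with a small model. The Props class below is a hypothetical stand-in for TopicProperties. Only withTopicType mirrors a helper named in this documentation, and it takes a String here instead of a TopicType to keep the sketch self-contained. withPersistencePolicy is invented for illustration.

```java
final class ImmutablePropertiesSketch {

    /** Hypothetical immutable stand-in for TopicProperties; only the "with" pattern matters. */
    static final class Props {
        final String topicType;
        final String persistencePolicy;

        Props(String topicType, String persistencePolicy) {
            this.topicType = topicType;
            this.persistencePolicy = persistencePolicy;
        }

        /** Returns a new instance; this instance is left unchanged. */
        Props withTopicType(String newTopicType) {
            return new Props(newTopicType, persistencePolicy);
        }

        /** Invented helper for illustration; returns a new instance. */
        Props withPersistencePolicy(String newPolicy) {
            return new Props(topicType, newPolicy);
        }
    }

    public static void main(String[] args) {
        Props configured = new Props("JSON", "SERVER");

        // Incorrect: the second call starts again from 'configured', so the first override is lost.
        Props lost = configured.withTopicType("STRING");
        lost = configured.withPersistencePolicy("SESSION");

        // Correct: each override is applied to the instance returned by the previous call.
        Props overridden = configured.withTopicType("STRING").withPersistencePolicy("SESSION");

        System.out.println(lost.topicType + " " + lost.persistencePolicy);
        System.out.println(overridden.topicType + " " + overridden.persistencePolicy);
    }
}
```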
An application user can also specify payload converters to be used for the updates to be published. If payload converters are specified in the service configuration, then the user-defined configuration will take precedence and the passed topicProperties will be ignored. The configured payload converter will be used to convert the updates passed with this method. In this case, the output type of the converter used will define the type of Diffusion topic to be published.
If no payload converter is specified by a user or in SourceServiceProperties.Builder.payloadConverter(String), a default converter to produce data of the supplied topic type will be used and topic properties specified in this method will take precedence.
If the topic properties passed with a previously published topic path change at runtime, the previously created topic should be removed using Publisher.remove(String) before publishing again.
Specified by: publish in interface Publisher
path - the topic path
value - the unconverted value
topicProperties - The topic properties to use to create the topic. If many paths require the same TopicProperties then the same instance should be used for efficiency.
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
If the task fails, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
DiffusionSecurityException – if the application principal does not have sufficient permissions to perform the operation;
DiffusionClientException – if some other exception has been returned from the Diffusion server via the Diffusion Client API. The cause will provide more detail.
ServiceStateException – if the service state is incompatible with the operation.
PayloadConversionException - if the supplied value could not be converted by the payload converter configured for the service, or the value type is incompatible with the payload converter, or if the supplied topic type does not match the value created by the payload converter in the service configuration.
public final CompletableFuture<?> applyJSONPatch(String path, String patch) throws IncompatibleConfigurationException
Description copied from interface: Publisher
Apply a JSON patch to a Diffusion topic value.
This method may be used only with JSON topics to apply a patch to a Diffusion topic value. It cannot be used if the service properties for the service specify UpdateMode.STREAMING.
Specified by: applyJSONPatch in interface Publisher
path - the topic path
patch - the JSON patch to apply
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
If the task fails, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
DiffusionSecurityException – if the application principal does not have sufficient permissions to perform the operation;
JSONPatchException – if the patch failed to apply;
DiffusionClientException – if some other exception has been returned from the Diffusion server via the Diffusion Client API. The cause will provide more detail.
ServiceStateException – if the service state is incompatible with the operation.
IncompatibleConfigurationException - if the topic type in the service properties is not JSON or UpdateMode.STREAMING is defined
See Also: Publisher.setInitialJSONValueForPatchUpdates(String, String) to set initial value for a Diffusion JSON topic, to which subsequent patch updates are to be applied
public final CompletableFuture<?> remove(String topics)
Description copied from interface: Publisher
Remove a topic or topics.
This allows the SourceHandler or HybridHandler to remove a Diffusion topic or topics that it may have previously created regardless of any persistence policy in use. The topic could have been created using a topic prefix configured by a user. Hence, this prefix will be prepended to the passed topic before removing the topic.
Only topics that the application principal has sufficient permission to remove will be removed.
Specified by: remove in interface Publisher
topics - a single topic may be removed by simply specifying its path. By specifying a path followed by a single /, all topics below the specified path will be removed. By specifying a path followed by //, all topics below the path and the topic at the path will be removed.
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
If the task fails, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
DiffusionSecurityException – if the application principal does not have sufficient permissions to perform the operation;
DiffusionClientException – if some other exception has been returned from the Diffusion server via the Diffusion Client API. The cause will provide more detail.
ServiceStateException – if the service state is incompatible with the operation.
public final void setInitialJSONValueForPatchUpdates(String path, String jsonValue)
Description copied from interface: Publisher
Sets the initial value for a Diffusion JSON topic, to which subsequent patch updates will be applied.
This method only registers the initial value for the path in memory; the value is used when the Publisher.applyJSONPatch(String, String) method is called.
If Publisher.applyJSONPatch(String, String) fails because the topic does not exist, the topic will be created using the value set in this method.
This method should be called before Publisher.applyJSONPatch(String, String), so that if the topic to send the patch to does not exist, the framework will create a JSON topic with the specified JSON value.
If this method is called multiple times, the value set in the last call will be applied.
If this method is not called before applying a patch to a JSON topic, and the topic does not exist, the topic will be created with '{}' as the initial value.
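The rules above (last call wins, '{}' as the fallback) can be modelled in a few lines. The class below is only an in-memory model of the documented behaviour for experimentation. It is not the framework's implementation, and initialValueFor is a hypothetical helper that does not exist on Publisher.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class InitialJsonValueSketch {

    /** In-memory model of the registered initial values, keyed by topic path. */
    private final Map<String, String> initialValues = new ConcurrentHashMap<>();

    /** Models the documented rule that a later call for the same path replaces the earlier value. */
    void setInitialJSONValueForPatchUpdates(String path, String jsonValue) {
        initialValues.put(path, jsonValue);
    }

    /** Hypothetical helper: the value a missing topic would be created with before patching. */
    String initialValueFor(String path) {
        return initialValues.getOrDefault(path, "{}");
    }

    public static void main(String[] args) {
        InitialJsonValueSketch model = new InitialJsonValueSketch();

        // No value registered yet: the documented default applies.
        System.out.println(model.initialValueFor("orders/summary"));

        model.setInitialJSONValueForPatchUpdates("orders/summary", "{\"count\":0}");
        model.setInitialJSONValueForPatchUpdates("orders/summary", "{\"count\":0,\"total\":0}");

        // The value from the most recent call is the one that would be used.
        System.out.println(model.initialValueFor("orders/summary"));
    }
}
```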
Specified by: setInitialJSONValueForPatchUpdates in interface Publisher
path - Diffusion topic path to which initial value is to be set.
jsonValue - JSON string value to be set as initial value when creating the JSON topic.
public final CompletableFuture<?> addMissingTopicHandler(String topicPath, MissingTopicNotificationHandler missingTopicNotificationHandler)
Description copied from interface: Publisher
Register a MissingTopicNotificationHandler which will be notified if a client subscription request matches no known topics and the selector prefix of the subscription matches the specified branch of the topic tree.
Ideally, this method in Publisher should be called when starting the SourceHandler which contains the publisher.
The provided handler is called when another session subscribes using a topic selector which does not match any topics and the selector prefix of the subscription matches the specified branch of the topic tree for which the handler is registered.
Specified by: addMissingTopicHandler in interface Publisher
topicPath - identifies a branch of the topic tree
missingTopicNotificationHandler - the handler to notify of topic subscriptions at or below the topicPath (unless there is another handler registered for a more specific topic path)
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
If the task fails, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
DiffusionSecurityException – if the application principal does not have REGISTER_HANDLER permission;
DiffusionClientException – if some other exception has been returned from the Diffusion server via the Diffusion Client API. The cause will provide more detail.
public final CompletableFuture<?> removeMissingTopicHandler(String topicPath)
Description copied from interface: Publisher
Removes MissingTopicNotificationHandler registered for the specified topicPath in the service.
This method can be used to remove the handler registration and stop receiving missing topic notifications for the topic path.
Specified by: removeMissingTopicHandler in interface Publisher
topicPath - topic path for which MissingTopicNotificationHandler should be removed
If the task completes successfully, the CompletableFuture result will be null. The result type is any rather than Void to provide forward compatibility with future iterations of this API that may provide a non-null result with a more specific result type.
If the task fails, the CompletableFuture will complete exceptionally with a CompletionException. Common reasons for failure, listed by the exception reported as the cause, include:
DiffusionClientException – if some other exception has been returned from the Diffusion server via the Diffusion Client API. The cause will provide more detail.
public final TopicProperties getConfiguredTopicProperties()
Description copied from interface: Publisher
Returns a TopicProperties with user-configured or default topic properties for the service.
If this method is accessed before ServiceHandler.start() is called, it will return null.
Specified by: getConfiguredTopicProperties in interface Publisher
Returns: a TopicProperties instance with user-configured or default values.
Copyright © 2024 DiffusionData Limited. All rights reserved.