Custom Publisher
By default, for source and hybrid services, a Publisher
instance is supplied by
the framework which can be used to perform any operation the publisher allows. If
the requirement is to process an update before calling the publish
method, a developer
of an application can do so in the service handler. The update could be divided into
multiple values and be published to multiple Diffusion topics or, based on the update,
a Diffusion topic could be removed. If this functionality of processing an update needs
to be reused in multiple application types, or if a developer
wants to enable overriding processing of an update in a service, a CustomPublisher
can be used. Using a CustomPublisher
is an advanced option and is sensible only
with the above-mentioned use cases.
Implementing CustomPublisher
CustomPublisher
is an abstract class and provides an abstract method process
that needs to be implemented by a subclass. The subclass should be implemented and
made available as a standalone JAR that can then be reused in different applications.
Within the abstract method, logic to process an update for the service needs to be
implemented. This method accepts a map of context and an update to process. The map
of context will be supplied by an application that is specific to the update, such as
in the Kafka adapter, where it can be a Kafka topic name from where the update is received.
An example implementation of the abstract method is provided below:
@Override
public CompletableFuture<?> process(Map<String, Object> context, Object update) throws PayloadConversionException {
final Map<String, Object> topicToUpdatesMap = getFinalUpdates(context, update);
topicToUpdatesMap.forEach((topic, value) -> {
try {
publisher
.publish(topic, value)
.whenComplete((result, throwable) -> {
if (throwable != null) {
LOG.error("Failed to publish update to {} topic", topic, throwable);
}
});
}
catch (PayloadConversionException ex) {
LOG.error("Failed to convert update for {} topic", topic, ex);
}
});
return CompletableFuture.completedFuture(null);
}
In the example above, an update is processed using the supplied context to create a map of
Diffusion topic paths and corresponding updates. Each topic is then published with its update.
The context in this process
method is supplied by the application and is application and update-specific.
If such contexts are required, the user guide of applications that could use the custom
publisher should be consulted to identify different contexts that are supplied with each update.
The CustomPublisher
class also contains a getApplicationType
method, which
can be used to identify the type of application to process the updates accordingly.
However, checking applicationType in the process
method can be inefficient with every update.
Hence, applicationType
can be required as a context in the configuration for the
custom publisher that can be analyzed during the construction of the publisher.
The applicationType
or any other detail required during
the construction of the publisher can be specified as a requirement of the publisher configuration
that a user needs to specify in the service configuration. See here
for details on how a CustomPublisher
can be configured to be used for a service.
For more details on the construction of the subclass and on providing further context
for it, see the Javadoc of the CustomPublisher
class.
Supporting CustomPublisher in an application
To support using a CustomPublisher
in source or hybrid services in an application,
the application must check if the publisher supplied to the add
method of such service
types is an instance of CustomPublisher
and cast the publisher to use the process
method in the service handler.
private static CompletableFuture<?> publish(
Publisher publisher,
String defaultTopic,
Object update) {
try {
if (publisher instanceof CustomPublisher) {
return ((CustomPublisher) publisher).process(
Collections.singletonMap("topic", defaultTopic),
update);
}
else {
return publisher.publish(defaultTopic, update);
}
}
catch (PayloadConversionException ex) {
LOG.error("Failed to process update", ex);
}
return CompletableFuture.completedFuture(null);
}
As presented above, the application should support publication for services that are configured with or without a custom publisher, as the use of a custom publisher for a service is dependent on the user’s requirement. If an application supports using a custom publisher for source or hybrid services, users can specify the use of a custom publisher for a specific service by specifying it in the configuration
Using CustomPublisher in an application
The implementation of the CustomPublisher
is provided as a standalone JAR that
needs to be plugged in with the application during its startup, i.e., the JAR should
be added to the classpath when starting the application. For this, the JAR file can
be placed in a directory, and the path of the directory can be set as the value for
the gateway.ext.dir system property.
The framework will load this directory into the classpath during application startup.
A service in the application can then be configured to use the custom publisher. However, this will only work if
both the application and the CustomPublisher
are compatible with each other. This
means the application must support using the CustomPublisher
, and the CustomPublisher
must be able to handle updates supplied by the application.
CustomPublisher configuration
The configuration for source or hybrid services in an application can be configured in the following way to use the custom publisher.
{ "serviceName": "enhancedPoller", "serviceType": "POLLING_SOURCE", "config": { "framework": { "pollIntervalMs": 500, "customPublisher": { "className": "com.diffusiondata.gateway.publisher.MultiTopicPublisher", "parameters": { "margin": "0.1" } } } } },
As presented above, to specify the use of a custom publisher for a source or hybrid
service, the customPublisher
parameter should be specified in the framework configuration.
The parameters for the customPublisher
are defined below:
Key | Description | Mandatory |
---|---|---|
|
The full class name of the custom publisher. |
Yes |
|
The parameters required for instantiation of the custom publisher. These should be documented in the user guide of the custom publisher. |
No |
Thus, with above service configuration, if the implementation of
the CustomPublisher
, i.e., com.diffusiondata.gateway.publisher.MultiTopicPublisher
is available in the classpath, the framework will supply its instance to the add
method
for the POLLING_SOURCE
serviceType. The service will then use this publisher when
processing the updates from the source.