Custom Publisher

By default, the framework supplies a Publisher instance to source and hybrid services, which they can use to perform any operation the publisher allows. If an update needs to be processed before the publish method is called, the application developer can do this in the service handler. For example, the update could be split into multiple values and published to multiple Diffusion topics, or a Diffusion topic could be removed based on the update. If this processing needs to be reused across multiple application types, or if a developer wants to let users override how a service processes updates, a CustomPublisher can be used. Using a CustomPublisher is an advanced option and makes sense only for these use cases.

Implementing CustomPublisher

CustomPublisher is an abstract class that declares an abstract method, process, which a subclass must implement. The subclass should be packaged as a standalone JAR so that it can be reused in different applications. The process method contains the logic for processing an update for the service. It accepts a context map and the update to process. The application supplies the context map, and its contents are specific to each update. In the Kafka adapter, for example, the context can contain the name of the Kafka topic from which the update was received. An example implementation of the abstract method is provided below:

    @Override
    public CompletableFuture<?> process(Map<String, Object> context, Object update) throws PayloadConversionException {
        // Split the update into one value per Diffusion topic path
        final Map<String, Object> topicToUpdatesMap = getFinalUpdates(context, update);

        topicToUpdatesMap.forEach((topic, value) -> {
            try {
                // Publish asynchronously; a failed publication is logged, not propagated
                publisher
                    .publish(topic, value)
                    .whenComplete((result, throwable) -> {
                        if (throwable != null) {
                            LOG.error("Failed to publish update to {} topic", topic, throwable);
                        }
                    });
            }
            catch (PayloadConversionException ex) {
                // Conversion failed for this topic; log it and continue with the rest
                LOG.error("Failed to convert update for {} topic", topic, ex);
            }
        });

        // Completes immediately, without waiting for the publications above
        return CompletableFuture.completedFuture(null);
    }

In the example above, the update is processed using the supplied context to create a map of Diffusion topic paths to their corresponding updates. Each update is then published to its topic. The context passed to the process method is supplied by the application and is specific to both the application and the update. If a custom publisher depends on such context, consult the user guide of each application that could use it to identify the context entries supplied with each update.
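The example above calls getFinalUpdates without showing it. The following is a minimal sketch of what such a helper might look like, not the framework's own implementation. It assumes that the context carries a base topic path under a hypothetical "topic" key and that a Map update should be split into one topic per entry. The class name UpdateSplitter and the "default" fallback path are also illustrative assumptions.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; not part of the Gateway Framework API.
final class UpdateSplitter {

    private UpdateSplitter() {
    }

    /**
     * Splits an update into one value per Diffusion topic path.
     * Assumes the context holds a base path under the "topic" key.
     */
    static Map<String, Object> getFinalUpdates(
        Map<String, Object> context,
        Object update) {

        // Fall back to an assumed "default" path if no topic was supplied
        final String basePath =
            String.valueOf(context.getOrDefault("topic", "default"));
        final Map<String, Object> topicToUpdates = new LinkedHashMap<>();

        if (update instanceof Map) {
            // One topic per entry: <basePath>/<key>
            ((Map<?, ?>) update).forEach((key, value) ->
                topicToUpdates.put(basePath + "/" + key, value));
        }
        else {
            // Any other update is published unchanged to the base path
            topicToUpdates.put(basePath, update);
        }
        return topicToUpdates;
    }
}
```

With a context of {"topic": "prices"} and an update of {"bid": 1.5, "ask": 2.5}, this sketch yields the paths prices/bid and prices/ask. A real implementation would use whatever context keys the target applications document.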

The CustomPublisher class also provides a getApplicationType method, which can be used to identify the type of application so that updates can be processed accordingly. However, checking the application type in the process method for every update can be inefficient. Instead, the custom publisher can require applicationType as part of its configuration and analyze it once, during construction of the publisher. The applicationType, or any other detail needed during construction, can be declared as a requirement of the publisher configuration that the user must specify in the service configuration. See the CustomPublisher configuration section for details on how a CustomPublisher can be configured for a service. For more details on constructing the subclass and providing further context for it, see the Javadoc of the CustomPublisher class.
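The idea of resolving the application type once, at construction, can be sketched as follows. This is an illustration only: TypeAwareProcessor is a hypothetical stand-in that does not extend the real CustomPublisher class (whose constructor signature is described in its Javadoc), and the application type names and transformations are invented for the example.

```java
import java.util.Map;
import java.util.function.UnaryOperator;

// Hypothetical stand-in for a CustomPublisher subclass; it shows only
// the "decide once at construction" pattern.
final class TypeAwareProcessor {

    // Chosen once in the constructor, then reused for every update
    private final UnaryOperator<String> transformer;

    TypeAwareProcessor(Map<String, Object> parameters) {
        final Object applicationType = parameters.get("applicationType");
        if (applicationType == null) {
            throw new IllegalArgumentException(
                "'applicationType' parameter is required");
        }
        this.transformer = resolve(applicationType.toString());
    }

    // The application type names below are illustrative assumptions
    private static UnaryOperator<String> resolve(String applicationType) {
        switch (applicationType) {
        case "KAFKA_ADAPTER":
            return String::trim;
        case "CSV_FILE_ADAPTER":
            return value -> value.replace(',', '\t');
        default:
            throw new IllegalArgumentException(
                "Unsupported application type: " + applicationType);
        }
    }

    // Per-update work no longer needs to inspect the application type
    String transform(String update) {
        return transformer.apply(update);
    }
}
```

Failing fast in the constructor means a misconfigured service is rejected at startup rather than on its first update.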

Supporting CustomPublisher in an application

To support a CustomPublisher in its source or hybrid services, the application must check whether the publisher supplied to the add method of such service types is an instance of CustomPublisher. If it is, the service handler should cast the publisher and call its process method.

    private static CompletableFuture<?> publish(
        Publisher publisher,
        String defaultTopic,
        Object update) {

        try {
            if (publisher instanceof CustomPublisher) {

                return ((CustomPublisher) publisher).process(
                    Collections.singletonMap("topic", defaultTopic),
                    update);
            }
            else {
                return publisher.publish(defaultTopic, update);
            }
        }
        catch (PayloadConversionException ex) {
            LOG.error("Failed to process update", ex);
        }
        return CompletableFuture.completedFuture(null);
    }

As presented above, the application should support publication for services configured with or without a custom publisher, because whether a service uses a custom publisher depends on the user’s requirements. If an application supports custom publishers for source or hybrid services, users can enable one for a specific service by specifying it in the service configuration.

Using CustomPublisher in an application

The implementation of the CustomPublisher is provided as a standalone JAR that needs to be plugged in with the application during its startup, i.e., the JAR should be added to the classpath when starting the application. For this, the JAR file can be placed in a directory, and the path of the directory can be set as the value for the gateway.ext.dir system property. The framework will load this directory into the classpath during application startup. A service in the application can then be configured to use the custom publisher. However, this will only work if both the application and the CustomPublisher are compatible with each other. This means the application must support using the CustomPublisher, and the CustomPublisher must be able to handle updates supplied by the application.

CustomPublisher configuration

A source or hybrid service in an application can be configured as follows to use the custom publisher.

    {
      "serviceName": "enhancedPoller",
      "serviceType": "POLLING_SOURCE",
      "config": {
        "framework": {
          "pollIntervalMs": 500,
          "customPublisher": {
            "className": "com.diffusiondata.gateway.publisher.MultiTopicPublisher",
            "parameters": {
              "margin": "0.1"
            }
          }
        }
      }
    }

As presented above, to specify the use of a custom publisher for a source or hybrid service, the customPublisher parameter should be specified in the framework configuration. The parameters for the customPublisher are defined below:

| Key | Description | Mandatory |
|---|---|---|
| className | The full class name of the custom publisher. | Yes |
| parameters | The parameters required for instantiation of the custom publisher. These should be documented in the user guide of the custom publisher. | No |

Thus, with the above service configuration, if the CustomPublisher implementation, i.e., com.diffusiondata.gateway.publisher.MultiTopicPublisher, is available on the classpath, the framework will supply an instance of it to the add method for the POLLING_SOURCE service type. The service will then use this publisher when processing updates from the source.
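In the configuration above, the margin parameter is given as the string "0.1", so a custom publisher would typically convert and validate it during construction. The sketch below shows one way this might be done. The class MarginParameter, the default of 0.0, and the rule that the margin must be non-negative are assumptions made for illustration, not documented behavior of MultiTopicPublisher.

```java
import java.util.Map;

// Hypothetical helper for reading the "margin" entry of the
// customPublisher "parameters" map; not part of the framework API.
final class MarginParameter {

    private MarginParameter() {
    }

    static double parseMargin(Map<String, Object> parameters) {
        final Object raw = parameters.get("margin");
        if (raw == null) {
            // Assumed default when the optional parameter is omitted
            return 0.0;
        }

        final double margin;
        try {
            margin = Double.parseDouble(raw.toString());
        }
        catch (NumberFormatException ex) {
            throw new IllegalArgumentException(
                "'margin' must be a number but was: " + raw, ex);
        }

        // Assumed validation rule: reject NaN and negative margins
        if (Double.isNaN(margin) || margin < 0) {
            throw new IllegalArgumentException(
                "'margin' must be non-negative but was: " + raw);
        }
        return margin;
    }
}
```

Documenting each accepted parameter, its type, and its default in the custom publisher's user guide lets users fill in the parameters block without reading the source.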