Writing source handlers

SourceHandler is a type of service handler for source services. Source handlers consume external data and publish it to Diffusion topics. The required configuration parameters, a Publisher instance, and a StateHandler instance can be injected into a source handler when it is instantiated.

There are two types of SourceHandler for two modes of source services:

  • StreamingSourceHandler, for streaming sources, and

  • PollingSourceHandler, for polling sources

Streaming source handler

A StreamingSourceHandler provides the functionality for a streaming source service. The purpose of a streaming source service is to listen for updates from an external source and pass them to the framework, which publishes them to Diffusion.

Figure 1. Data publishing sequence in a streaming source service handler

When the framework adds a service instance, it calls the start method of the service handler for that service, so that the handler can instantiate any resources it needs to publish to the framework. If the injected publisher is used to publish before the framework has called start, the publication will fail.

In CsvStreamingSourceHandler, the start method starts a runnable that watches a file for changes. As soon as a change is detected, the file is published to the framework using the publisher instance.

    ...
    @Override
    public CompletableFuture<?> start() {
        updateAndStartWatchingFile();

        return CompletableFuture.completedFuture(null);
    }

    private void updateAndStartWatchingFile() {
        update();

        future = executorService.submit(() -> {
            try {
                watchFile();
            }
            catch (IOException ex) {
                LOG.error("Failed to start watch service", ex);
            }
        });
    }

    private void update() {
        try {
            publisher
                .publish(diffusionTopicName, file)
                .whenComplete((result, ex) -> {
                    ...
                });
        }
        catch (PayloadConversionException ex) {
            ...
        }
    }
    ...
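
The watchFile method is not shown in the snippet above. As a rough illustration of how a file could be watched for changes, the following sketch uses the standard java.nio.file.WatchService API. The class name FileChangeWatcher and its awaitChange method are hypothetical and are not taken from csv-file-source; a handler built this way could call its update method each time awaitChange returns true.

```java
import java.io.IOException;
import java.nio.file.Path;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.TimeUnit;

// Hypothetical helper (not part of the framework or of csv-file-source).
final class FileChangeWatcher implements AutoCloseable {
    private final Path fileName;
    private final WatchService watchService;

    FileChangeWatcher(Path file) throws IOException {
        final Path absolute = file.toAbsolutePath();
        this.fileName = absolute.getFileName();
        this.watchService = absolute.getFileSystem().newWatchService();
        // A WatchService observes directories, so the parent directory is
        // registered and events are filtered by file name afterwards.
        absolute.getParent().register(
            watchService,
            StandardWatchEventKinds.ENTRY_CREATE,
            StandardWatchEventKinds.ENTRY_MODIFY);
    }

    // Blocks until the watched file is created or modified, or until the
    // timeout elapses. Returns true if a change was seen.
    boolean awaitChange(long timeout, TimeUnit unit) throws InterruptedException {
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        while (true) {
            final long remaining = deadline - System.nanoTime();
            if (remaining <= 0) {
                return false;
            }
            final WatchKey key = watchService.poll(remaining, TimeUnit.NANOSECONDS);
            if (key == null) {
                return false;
            }
            boolean changed = false;
            for (WatchEvent<?> event : key.pollEvents()) {
                // OVERFLOW means events may have been lost, so it is treated
                // conservatively as a change.
                if (event.kind() == StandardWatchEventKinds.OVERFLOW
                    || fileName.equals(event.context())) {
                    changed = true;
                }
            }
            key.reset();
            if (changed) {
                return true;
            }
        }
    }

    @Override
    public void close() throws IOException {
        watchService.close();
    }
}
```

Registering the directory in the constructor means that changes made after construction are queued even if awaitChange is called later, which avoids missing an update between start and the first wait.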

Polling source handler

A PollingSourceHandler provides the functionality for a polling source service. The framework periodically polls the polling source handler of each corresponding service instance via its poll method; the polling period is user configurable. When polled, the handler is responsible for retrieving updates from a back-end system and using the publisher to publish the data to Diffusion.

Figure 2. Data publishing sequence in a polling source service handler

A polling source handler should only publish updates when it is polled. It must publish the updates via the Publisher.publish method before returning from the poll method. The handler typically publishes a single update when polled, but it can publish any number of updates (including zero) to different topics before returning.
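
The following is a minimal sketch of a poll that publishes several updates and returns a single future covering all of them. SketchPublisher is a hypothetical stand-in that models only the two-argument publish call described in this guide, and MultiTopicPoller is an illustrative name; neither is the framework's own type.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for the framework's Publisher.
interface SketchPublisher {
    CompletableFuture<?> publish(String topicPath, Object value);
}

final class MultiTopicPoller {
    private final SketchPublisher publisher;

    MultiTopicPoller(SketchPublisher publisher) {
        this.publisher = publisher;
    }

    // Publishes every pending update before returning. The returned future
    // completes when all publications have completed, and completes
    // exceptionally if any one of them fails. With no pending updates it is
    // already complete.
    CompletableFuture<?> poll(Map<String, Object> pendingUpdates) {
        final List<CompletableFuture<?>> results = new ArrayList<>();
        pendingUpdates.forEach(
            (topicPath, value) -> results.add(publisher.publish(topicPath, value)));
        return CompletableFuture.allOf(results.toArray(new CompletableFuture<?>[0]));
    }
}
```

Combining the individual results with CompletableFuture.allOf keeps the poll non-blocking while still reporting a failed publication to the caller.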

The following snippet from CsvPollingSourceHandler demonstrates publishing the CSV file via the publisher in the poll method.

    @Override
    public CompletableFuture<?> poll() {
        LOG.debug("Polled");
        final CompletableFuture<?> pollCf = new CompletableFuture<>();
        try {
            publisher
                .publish(diffusionTopicName, file)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        LOG.error(
                            "Failed to publish to topic {} from file {}",
                            diffusionTopicName,
                            fileName,
                            ex);
                        pollCf.completeExceptionally(ex);
                    }
                    else {
                        pollCf.complete(null);
                    }
                });
        }
        catch (PayloadConversionException ex) {
            LOG.error("Failed to convert content of {} to JSON", fileName, ex);
            if (conversionErrorCount.getAndIncrement() == CONVERSION_ERROR_THRESHOLD) {
                pollCf.completeExceptionally(ex);
            }
            else {
                pollCf.complete(null);
            }
        }

        return pollCf;
    }

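In this implementation, the returned CompletableFuture completes successfully once the file has been published, and completes exceptionally if the publication fails. A payload conversion failure is logged and the future still completes normally, unless the conversion error count has reached CONVERSION_ERROR_THRESHOLD, in which case the future completes exceptionally.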

If the CompletableFuture returned by the poll method does not complete within a set time, a timeout occurs and the service is paused. The user can configure this timeout for a service.

Documentation tip
Users can configure payload converters as required in the source service configuration. To specify a converter that will accept the published value, they need to know the type of data that will be published. Ensure that this information is documented in the application user guide for the supported service type.

Providing source service specific properties

The framework lets a SourceHandler specify certain properties that are required to process updates received from external sources and publish them to Diffusion topics. These are the Diffusion topic type to publish to, the payload converters used to convert data from the fetched type to the Diffusion topic type, and the mechanism used to send updates to the Diffusion topic. These properties apply to all services of the specified service type.

The framework retrieves this information by invoking the SourceHandler.getSourceServiceProperties method. By default, the implementation of this method returns null, indicating that the framework should assume the use of default properties. However, if the application developer desires to specify properties different from the defaults, the method must be overridden to return a SourceServiceProperties object.

A SourceServiceProperties object may be built using a builder obtained using the DiffusionGatewayFramework.newSourceServicePropertiesBuilder method.

The SourceServiceProperties for CsvPollingSourceHandler and CsvStreamingSourceHandler are defined as follows:

    @Override
    public SourceServiceProperties getSourceServiceProperties() throws InvalidConfigurationException {
        return
            newSourceServicePropertiesBuilder()
                .updateMode(UpdateMode.STREAMING)
                .topicType(TopicType.JSON)
                .payloadConverter("$CSV_to_JSON")
                .build();
    }

Here, the source handler is specified to use the payload converter named $CSV_to_JSON. This is one of the payload converters provided with the framework.

The properties that may be set with newSourceServicePropertiesBuilder and the defaults (if not explicitly set), are as follows:

Property: topicType
Description: This refers to the Diffusion topic type to be created and published to. JSON, STRING, BINARY, INT64, and DOUBLE are supported. This specified topic type will be set for all services associated with this source service handler unless the user configures the service to use a specific topic type. See here for more details.
Default: JSON

Property: payloadConverter
Description: The payload converter to be used for converting data from an external format to Diffusion topic format.
Default: Standard conversions are used. See here for more details.

Property: updateMode
Description: Specifies the update mode to be used for topics published by the service. This could be:

- SIMPLE mode: sends a full value to the server on every update. This mode supports normal updating as well as applying JSON patches to JSON topics. A series of updates are sent asynchronously to the Diffusion topic.

- SIMPLE_SYNC mode: Similar to the SIMPLE mode, however, here the updates are sent synchronously.

- STREAMING mode: takes advantage of the Diffusion delta streaming capabilities, such that only differences in the value are sent, thus reducing bandwidth usage. Applying JSON patches to JSON topics is not supported in this mode. A series of deltas are sent asynchronously to the Diffusion topic.

Default: STREAMING

Other topic-specific properties, such as time-series topic details, persistence policies, and more, will be configured by users of the Gateway application as part of the service configuration for the framework. Users also have the option to configure Diffusion topic types and payload converters within the framework’s configuration for specific services. These user-configured settings take precedence over those defined in the configured source service properties.
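
The precedence described above can be modelled in a few lines. This is only an illustrative model of the rule (user configuration first, then the handler's source service properties, then the framework default); SettingPrecedence and resolve are hypothetical names and do not reflect how the framework implements it. A null argument stands for "not set".

```java
// Hypothetical model of the precedence rule; not framework code.
final class SettingPrecedence {

    private SettingPrecedence() {
    }

    // Returns the first value that is set, checking the user's service
    // configuration, then the handler's declared properties, then the default.
    static <T> T resolve(T userConfigured, T handlerDeclared, T frameworkDefault) {
        if (userConfigured != null) {
            return userConfigured;
        }
        if (handlerDeclared != null) {
            return handlerDeclared;
        }
        return frameworkDefault;
    }

    public static void main(String[] args) {
        // Nothing set anywhere: the default topic type applies.
        System.out.println(resolve(null, null, "JSON"));         // prints JSON
        // Handler declares STRING, user sets nothing.
        System.out.println(resolve(null, "STRING", "JSON"));     // prints STRING
        // User configuration overrides the handler's declaration.
        System.out.println(resolve("BINARY", "STRING", "JSON")); // prints BINARY
    }
}
```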

Publishing updates

A SourceHandler must only publish updates when it is active, that is, after start has been called or after resume has been called following a pause. The Publisher object passed to the handler's constructor is used for publishing updates.
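
One way a handler could enforce this rule is to track its active state in a flag and check it before every publication. The sketch below assumes simplified lifecycle methods and a hypothetical SketchPublisher stand-in; the real handler interface methods have different signatures, and ActiveStateGuard is an illustrative name only.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical stand-in for the framework's Publisher.
interface SketchPublisher {
    CompletableFuture<?> publish(String topicPath, Object value);
}

final class ActiveStateGuard {
    private final SketchPublisher publisher;
    private final AtomicBoolean active = new AtomicBoolean(false);

    ActiveStateGuard(SketchPublisher publisher) {
        this.publisher = publisher;
    }

    // Simplified lifecycle hooks; a real handler would call these from its
    // own start, pause and resume implementations.
    void start() {
        active.set(true);
    }

    void pause() {
        active.set(false);
    }

    void resume() {
        active.set(true);
    }

    // Forwards an update from the external source only while active.
    // Returns true if the update was handed to the publisher.
    boolean offer(String topicPath, Object value) {
        if (!active.get()) {
            return false; // not started, or paused: do not publish
        }
        publisher.publish(topicPath, value);
        return true;
    }
}
```

What to do with updates that arrive while the handler is inactive (drop them, or keep the latest value for later) is an application decision that this sketch leaves open.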

Two overloaded Publisher.publish methods are provided for publishing updates.

The first accepts the Diffusion topic path and the update value. The topic type of the published topics will be either the one configured by the user or the one specified in the SourceServiceProperties, with the former taking precedence. Using this method, a single service can publish to any number of different Diffusion topics of the same type and with the same properties; the topics are created dynamically as needed on the first publish. Here is an example of this method call:

    publisher.publish("topic", data);

The second overloaded method accepts TopicProperties in addition to the Diffusion topic path and the update value. The topic type can be explicitly specified using the TopicProperties. This method is suitable for publishing updates when the same publisher instance needs to publish to different types of topics based on the type of data it receives from the source. In addition to the topic type, any other properties like timeSeries or PersistencePolicy can be specified in the TopicProperties.

An application user can set topic properties for topics to be created by a source service in the service configuration; otherwise, default values are applied. A TopicProperties instance that contains the user-configured or default topic properties can be accessed using the Publisher.getConfiguredTopicProperties method, which is available after the service handler is started. Any property in this instance can be overridden using the helper methods in TopicProperties, and the result passed to the publish method. Each helper method, such as TopicProperties.withTopicType(TopicType), returns an immutable TopicProperties instance. Hence, to override multiple values, apply each override to the instance returned by the previous call.

The topic properties thus created can be passed to the Publisher.publish method. These methods can be called as follows:

    TopicProperties topicProperties =
        publisher
            .getConfiguredTopicProperties()
            .withTopicType(TopicType.STRING)
            .withTimeSeriesFlag(true);

    publisher.publish(
        "topic",
        data,
        topicProperties);
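
The reason the calls above are chained can be seen with a small immutable type. SketchTopicProperties below is a hypothetical, two-field stand-in written for this illustration; it is not the framework's TopicProperties class and models only the "with" style of override.

```java
// Hypothetical immutable stand-in with two properties only.
final class SketchTopicProperties {
    private final String topicType;
    private final boolean timeSeries;

    SketchTopicProperties(String topicType, boolean timeSeries) {
        this.topicType = topicType;
        this.timeSeries = timeSeries;
    }

    // Each "with" method leaves this instance untouched and returns a copy.
    SketchTopicProperties withTopicType(String newTopicType) {
        return new SketchTopicProperties(newTopicType, timeSeries);
    }

    SketchTopicProperties withTimeSeries(boolean newTimeSeries) {
        return new SketchTopicProperties(topicType, newTimeSeries);
    }

    String topicType() {
        return topicType;
    }

    boolean timeSeries() {
        return timeSeries;
    }
}

final class ImmutableOverrideDemo {
    public static void main(String[] args) {
        final SketchTopicProperties configured = new SketchTopicProperties("JSON", false);

        // Pitfall: both overrides start from 'configured', so the topic type
        // override is discarded when 'lost' is reassigned.
        SketchTopicProperties lost = configured.withTopicType("STRING");
        lost = configured.withTimeSeries(true);
        System.out.println(lost.topicType() + " " + lost.timeSeries()); // prints JSON true

        // Chaining applies each override to the previously returned instance.
        final SketchTopicProperties both =
            configured.withTopicType("STRING").withTimeSeries(true);
        System.out.println(both.topicType() + " " + both.timeSeries()); // prints STRING true
    }
}
```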

However, if payload converters are specified in the service configuration, the user-defined configuration takes precedence and the passed topicProperties are ignored. The configured payload converter is used to convert the updates passed with this method. In this case, the output type of the converter defines the type of the Diffusion topic that is published to.

If no payload converter is specified, either by a user or via SourceServiceProperties.Builder#payloadConverter, a default converter that produces data of the supplied topic type is used, and the topic properties specified in this method take precedence.

When using this method to publish updates, the application’s documentation should clearly state that the service is capable of publishing to different types of topics in Diffusion, and users should configure the service accordingly.

The Publisher.publish methods are asynchronous, so the handler does not need to block and wait for the response.

Documentation tip
If the service uses the publish method with TopicProperties, the application's documentation should state explicitly that, depending on the types of data the service receives from the source, it will publish to different types of Diffusion topics. However, if payload converters are configured, the topic type will be the type of data produced by the converter. Hence, the service should be configured accordingly.

Publisher.getConfiguredTopicProperties returns a valid topic properties instance only after the service has been started.

The framework ensures that updates are passed to Diffusion in the order they are presented. However, the application developer must bear in mind that if a single update fails, any update supplied after it may not have been applied.

The framework handles the retrying of transient failures of single updates. An error is only reported to the application if an update could not be recovered.
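
Because a reported error means the update could not be recovered, a handler may want to remember which topics were affected so that it can publish a fresh value to them later. The following is one possible sketch of that bookkeeping, not a framework feature: SketchPublisher is a hypothetical stand-in for the Publisher, and FailedTopicTracker and drainFailedTopics are illustrative names.

```java
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for the framework's Publisher.
interface SketchPublisher {
    CompletableFuture<?> publish(String topicPath, Object value);
}

final class FailedTopicTracker {
    private final SketchPublisher publisher;
    // Topic paths whose most recent reported outcome was a failure.
    private final Set<String> failedTopics = ConcurrentHashMap.newKeySet();

    FailedTopicTracker(SketchPublisher publisher) {
        this.publisher = publisher;
    }

    // Publishes without blocking and records the topic path if the
    // publication is reported as failed.
    CompletableFuture<?> publish(String topicPath, Object value) {
        return publisher
            .publish(topicPath, value)
            .whenComplete((result, ex) -> {
                if (ex != null) {
                    failedTopics.add(topicPath);
                }
            });
    }

    // Returns the topics that failed since the last call and clears them,
    // so the caller can decide how to bring those topics up to date.
    Set<String> drainFailedTopics() {
        final Set<String> snapshot = Set.copyOf(failedTopics);
        failedTopics.removeAll(snapshot);
        return snapshot;
    }
}
```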

Handling missing topic notifications

A source service handler can also be implemented as a missing topic notification handler.

Refer to Handling missing topic notifications to understand how missing topic notification handlers work in more detail.

The csv-file-source application implements StreamingSourceHandler and PollingSourceHandler for the two service types it supports.