Writing hybrid handlers

Prerequisites: See Writing sink handlers, before proceeding with this step.

HybridHandler is a type of service handler for hybrid services. Hybrid handlers are used to consume data from Diffusion topics, perform any data manipulation and publish them to another Diffusion topic. Hence, it has functionality of both Diffusion topic publisher and subscriber.

hybrid service
Figure 1. Data publishing sequence in Hybrid handler

As defined here, the required configuration parameters, Publisher instance, Subscriber instance and StateHandler instance can be injected to hybrid handlers during instantiation of this class. The Publisher instance can be used in the update method of the Hybrid handler, to publish the received data, from a subscribed topic to another Diffusion topic. The Subscriber instance can be used to subscribe to Diffusion topics as required.

Below is the code snippet from JsonDateAppender, which demonstrates receiving a topic update and publishing it to another topic in the update method.

    @Override
    public CompletableFuture<?> update(String path, String value, TopicProperties properties) {

        try {
            final JsonNode jsonNode = OBJECT_MAPPER.readTree(value);

            if (!jsonNode.isObject()) {
                return CompletableFuture.completedFuture(null);
            }

            final ObjectNode objectNode = (ObjectNode) jsonNode;

            objectNode.put("timestamp", Instant.now().toString());

            publisher
                .publish(targetTopicPrefix + path, objectNode)
                .whenComplete((result, ex) -> {
                    if (ex != null) {
                        LOG.error("Failed to send updated data from {}", path);
                    }
                });
        }
        catch (JsonProcessingException | PayloadConversionException ex) {
            LOG.error("Failed to process update from {}", path, ex);
        }

        return CompletableFuture.completedFuture(null);
    }

Here, if the updated value is JSON, a timestamp is appended to the value and published to another topic.

Providing hybrid service specific properties

HybridHandler is an extension of both SinkHandler and StreamingSourceHandler. Hence, it allows developers to supply both; sink specific service properties and source specific service properties, to provide the necessary service properties to the framework, to consume and publish data to Diffusion topics.

    @Override
    public SinkServiceProperties getSinkServiceProperties() throws InvalidConfigurationException {
        return
            newSinkServicePropertiesBuilder()
                .autoSubscribe(false)
                .build();
    }

    @Override
    public SourceServiceProperties getSourceServiceProperties() throws InvalidConfigurationException {
        return
            newSourceServicePropertiesBuilder()
                .topicType(TopicType.JSON)
                .build();
    }