STREAM_TO_REMOTE

The STREAM_TO_REMOTE service type can be used to subscribe to Diffusion topics in the local server and publish their updates to Diffusion topics on the remote server. Thus, for this service type, the local server is the source server and the remote server is the target server. This service type also supports the on-demand publication feature, which can be enabled to publish to Diffusion topics in the remote server only if there is a demand or a trigger.

There can be multiple instances of this service type added to the adapter to consume data from different Diffusion topics on the local server and publish the updates to Diffusion topics on the remote server using different configurations.

Complete configuration for this service consists of framework required configuration for sink service and this service type specific configuration.

Example 1. Sample configuration for a service of type STREAM_TO_REMOTE
    {
      "serviceName": "tradeConsumerReplicated",
      "description": "Replicates from local topic and publishes to remote",
      "serviceType": "STREAM_TO_REMOTE",
      "config": {
        "sharedConfigName": "remoteDiffusionServer",
        "framework": {
          "diffusionTopicSelector": "?trade//"
        },
        "application": {
          "topicMappingFunction": "fromLocal/replicated/<path(0)>",
          "publicationRetries": 5,
          "retryIntervalMs": 5000
        }
      }
    }

Framework configuration

Details about supported configuration parameters for framework configuration for Sink service can be found here.

Application configuration

The supported application configuration parameters for this service type are defined in the table below:

STREAM_TO_REMOTE service application configuration
Name Type Description Mandatory Default value

topicMappingFunction

String

A function to map the Diffusion topic path on the local server from which an update is received, to construct a topic path used to publish updates on the remote server.

No

<path(0)>

publicationRetries

integer

Number of retries that will be used to attempt to publish an update to a Diffusion topic when transient errors occur.

No

5

retryIntervalMs

integer

Time interval in milliseconds to retry publication attempts if publication fails due to retryable or transient errors.

No

5000

updateMode

UpdateMode

The update mode to be used when publishing to Diffusion topics on the remote server. This could be:

- SIMPLE mode: sends a full value to the server on every update.
A series of updates are sent asynchronously to the Diffusion topic.

- SIMPLE_SYNC mode: Similar to the SIMPLE mode, however, here the updates are sent synchronously.

- STREAMING mode: takes advantage of the Diffusion delta streaming capabilities, such that only differences in the value are sent, thus reducing bandwidth usage.
A series of deltas are sent asynchronously to the Diffusion topic.

No

STREAMING

topicProperties

TopicProperties

The properties to be used to create the Diffusion topic in the remote Diffusion server. If the 'topicType' value in the topicProperties is not explicitly specified, the topic type will be inferred from the topic type of the source topic in the local Diffusion server. If 'payloadConverters' are specified in the framework configuration, the topic type should also be specified accordingly. See here for details about this configuration. See the description below for more details.

No

null

removeStaleTopic

boolean

A boolean value to specify whether to remove the topic created in the remote server, if the source topic in the local server becomes unavailable.

No

true

onDemandPublication

onDemandPublicationConfig

Configuration to specify on-demand topic publication.

No

null

Apart from the above configuration parameters, this service also requires the remote server connection details. These can be supplied as part of the service configuration or specified as shared configuration and referred to in the service configuration as illustrated above. See here for details on supported Diffusion server configuration parameters. Below is a sample configuration that contains the remote server connection details within the service configuration instead of specifying it in the shared configuration block:

    {
      "serviceName": "tradeConsumerReplicated",
      "description": "Replicates from local topic and publishes to remote",
      "serviceType": "STREAM_TO_REMOTE",
      "config": {
        "framework": {
          "diffusionTopicSelector": "?trade//"
        },
        "application": {
          "diffusion": {
            "url": "ws://localhost:7080",
            "principal": "admin",
            "password": "password",
            "reconnectIntervalMs": 5000
          },
          "topicMappingFunction": "fromLocal/replicated/<path(0)>",
          "publicationRetries": 5,
          "retryIntervalMs": 5000
        }
      }
    }

Data subscription, conversion and publication to remote server

When a STREAM_TO_REMOTE service is added and started, the framework subscribes to any topic paths that match the configured Diffusion topic selector in the local server, defined in the diffusionTopicSelector within the framework configuration (if onDemandPublication is specified in the application configuration, the subscription occurs only after there is a demand or a trigger in the remote server). Any updates to the subscribed Diffusion topics are received and converted as configured and published to a Diffusion topic in the remote server which is defined with topicMappingFunction. See here for more details on how the source topic path is mapped to a target topic path using the given mapping function.

By default, the topic type of the source topic will be replicated to the target topic. If the source topic is time series, time-series-specific properties which are timeSeriesRetainedRange and timeSeriesSubscriptionRange will also be replicated. However, if the topicProperties configuration for the service is also configured to set the target topic to be time series, time-series-specific properties specified in the configuration will be used instead of those in the source topic to create the target topic.

This behavior can be overridden by explicitly setting the topicType parameter in the topicProperties for the service. If payloadConverters configuration is specified in the framework configuration, the topicType should also be specified accordingly.

If the configured payload converter creates an update which is not compatible with a Diffusion topic type, a payload conversion exception will be logged.

For a deeper understanding of how payload converters are used, please refer here. See here for the list of all issued payload converters by the framework.

As an illustration, consider the following sample configuration for a service of type STREAM_TO_REMOTE. This service subscribes to the Diffusion topic doubles, which produces updates of type DOUBLE. The topicType parameter is not explicitly specified. Hence, the type of topic to be published to the remote server will also be of type DOUBLE. In this example, since payloadConverters is not explicitly configured, an internal default payload converter will be used. This converter directly passes the Double value to the service, which is then published to the Diffusion topic. Since topicProperties is not specified, default topic properties will be used.

{
  "serviceName": "doublesPublisher",
  "serviceType": "STREAM_TO_REMOTE",
  "description": "Consumes from Doubles Diffusion topic and publishes to remote server",
  "config": {
    "sharedConfigName": "remoteDiffusionServer",
    "framework": {
      "diffusionTopicSelector": "doubles"
    }
  }
}

With the same doubles topic of DOUBLE topic type in the local server, in the example below, a custom payload converter is specified to be used that transforms the data from the source topic, and the topic type is specified to be String. Thus, the transformed update from the source topic will be converted to STRING topic type and published to the remote server. If the topicType is not explicitly specified, the target topic to be created in the remote server is expected to be of DOUBLE type. Hence, if the supplied payload converter does not produce the output that is compatible with DOUBLE, a payload converter exception will be logged.

{
  "serviceName": "doublesPublisher",
  "serviceType": "STREAM_TO_REMOTE",
  "description": "Consumes from Doubles Diffusion topic, transforms and publishes to remote server",
  "config": {
    "sharedConfigName": "remoteDiffusionServer",
    "framework": {
      "diffusionTopicSelector": "doubles",
      "payloadConverters": [
        {
            "name": "customPayloadConverterThatTransformsData"
        }
      ]
    },
    "application": {
      "topicType": "STRING"
    }
  }
}

If the requirement is simply to publish to STRING topics in the remote server from doubles topics in the local server, the configuration can be simplified as follows:

{
  "serviceName": "doublesPublisher",
  "serviceType": "STREAM_TO_REMOTE",
  "description": "Consumes from Doubles Diffusion topic, and publishes to STRING topics in the remote server",
  "config": {
    "sharedConfigName": "remoteDiffusionServer",
    "framework": {
      "diffusionTopicSelector": "doubles"
    },
    "application": {
      "topicType": "STRING"
    }
  }
}

See Configuring the adapter for a complete example of the configuration for the adapter with configuration for STREAM_TO_REMOTE services.

See Diffusion session management to understand how remote Diffusion sessions are created and used for the services of STREAM_FROM_REMOTE service type.