Publishing service
The PUBLISHING_SERVICE is a service type of Sink mode. It publishes updates from Diffusion topic subscriptions to Kafka topics.
Multiple instances of this service type can be added to the adapter, each with its own configuration, to publish data from different Diffusion topics to different Kafka topics.
The complete configuration for this service consists of the framework-required configuration for a sink service and the configuration specific to this service type.
```json
{
  "serviceName": "kafkaPublisherService",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from `?users//` Diffusion topic selector which is of JSON dataType and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "?users//"
    },
    "application": {
      "configuration": {
        "client.id": "diffusionGatewayProducer",
        "request.timeout.ms": "30000"
      },
      "kafkaTopicPattern": "diffusion.${topic}",
      "keyValue": "key"
    }
  }
}
```
Sink service framework configuration
Details about the supported framework configuration parameters for a sink service can be found here.
Application configuration
The supported application configuration parameters for this service type are defined in the table below:
Name | Type | Description | Mandatory | Default value |
---|---|---|---|---|
kafkaTopicPattern | string | The Kafka topic pattern used to create/publish to Kafka topics. The pattern can contain ${topic}, which will be replaced by the Diffusion topic path. | no | ${topic} |
keyValue | string | The value of the 'key' to be set for Kafka records created by this service. If not set, the Diffusion topic path will be used. | no | n/a |
valueType | string | The expected type of value for the Kafka producer. A corresponding value serializer class name is set for the Kafka producer based on this configuration. If not explicitly set, the serializer class name will be deduced from the type of the received update. See here for more details. | no | n/a |
Apart from the above parameters, the configuration of a KAFKA_CLUSTER sharedConfig can also be included within this configuration, if a separate sharedConfig instance is not required, or if any of the sharedConfig configuration parameters are to be overridden.
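The `${topic}` placeholder substitution in kafkaTopicPattern can be illustrated with a short sketch. Python is used here purely for illustration (the adapter itself is not Python), and `expand_pattern` is a hypothetical helper; the adapter's actual topic-naming rules may differ, since Kafka topic names cannot contain every character a Diffusion path can.

```python
# Illustrative sketch only: expand a kafkaTopicPattern such as
# "diffusion.${topic}" for a given Diffusion topic path.
def expand_pattern(pattern: str, diffusion_topic_path: str) -> str:
    """Replace the ${topic} placeholder with the Diffusion topic path."""
    return pattern.replace("${topic}", diffusion_topic_path)

print(expand_pattern("diffusion.${topic}", "users/user1"))
# → diffusion.users/user1
```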
Data subscription, conversion and publication
When a PUBLISHING_SERVICE service is added and started, the framework subscribes to any topic paths that match the Diffusion topic selector defined in diffusionTopicSelector within the framework configuration. Any updates on the subscribed topics are converted as required or configured and used to create the value of a Kafka record. The "key" of the record is set to the value defined in the keyValue configuration, which will be of type String, and the partition is internally calculated from the "key" value. The resulting Kafka record is then published to the Kafka topic defined by kafkaTopicPattern.
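The record construction described above can be sketched as follows. This is a hedged illustration, not the adapter's code: `build_record` and `pick_partition` are hypothetical helpers, and the real partition assignment is delegated to the Kafka producer's partitioner rather than the simple hash shown here.

```python
# Simplified sketch of building a Kafka record from a Diffusion update.
# The key comes from the keyValue configuration if set, otherwise the
# Diffusion topic path; the partition is derived from the key.
import hashlib
from typing import Optional

def pick_partition(key: str, num_partitions: int) -> int:
    """Stand-in for the Kafka partitioner: hash the key deterministically."""
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_partitions

def build_record(topic_path: str, value: bytes,
                 key_value: Optional[str], num_partitions: int) -> dict:
    key = key_value if key_value is not None else topic_path
    return {
        "key": key,
        "value": value,
        "partition": pick_partition(key, num_partitions),
    }

record = build_record("users/user1", b'{"name": "A"}', None, 3)
print(record["key"])  # → users/user1
```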
The valueType configuration parameter in the service configuration can be used to restrict the service to receiving a specific type of update from the Diffusion topics. If this parameter is set, the corresponding default payload converter will be used to convert updates from the Diffusion topics into the type specified in valueType. However, if a payload converter is also explicitly specified in the configuration, it will take precedence over the default converter used by the adapter.
If the valueType parameter is not set, the service dynamically inspects the type of each received update and publishes it to the Kafka topic. The type of update received by the service depends on whether the payloadConverters configuration parameter is set for the service. If configured, the update type will be the output type of the specified payload converter (or the output type of the last converter in the list if multiple converters are defined). If the payloadConverters configuration parameter is not set, the update type depends on the type of the Diffusion topic from which the update is received.
The table below illustrates the mapping between Diffusion topic types and the corresponding types of updates passed to the sink service when a payload converter is not configured.
Diffusion topic type | Update value type |
---|---|
JSON | String |
STRING | String |
INT64 | Long |
DOUBLE | Double |
BINARY | bytes |
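Serializer deduction from this mapping could look something like the sketch below. The lookup tables and `deduce_serializer` helper are assumptions for illustration; the serializer class names shown are the standard ones shipped with the Kafka Java client, but the adapter's actual selection logic is internal.

```python
# Hedged sketch: deduce a Kafka value serializer class name from the
# Diffusion topic type, following the mapping table above.
UPDATE_TYPE_BY_TOPIC_TYPE = {
    "JSON": "String",
    "STRING": "String",
    "INT64": "Long",
    "DOUBLE": "Double",
    "BINARY": "bytes",
}

SERIALIZER_BY_UPDATE_TYPE = {
    "String": "org.apache.kafka.common.serialization.StringSerializer",
    "Long": "org.apache.kafka.common.serialization.LongSerializer",
    "Double": "org.apache.kafka.common.serialization.DoubleSerializer",
    "bytes": "org.apache.kafka.common.serialization.ByteArraySerializer",
}

def deduce_serializer(diffusion_topic_type: str) -> str:
    update_type = UPDATE_TYPE_BY_TOPIC_TYPE[diffusion_topic_type]
    return SERIALIZER_BY_UPDATE_TYPE[update_type]

print(deduce_serializer("DOUBLE"))
# → org.apache.kafka.common.serialization.DoubleSerializer
```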
For a deeper understanding of how payload converters are used, refer here. See here for the list of all payload converters issued by the framework.
If a payload converter is specified that does not create an output compatible with the types listed below, an exception will be thrown. Therefore, when configuring a payload converter for the sink service, ensure that the final output type of the converter matches one of the following, or specify the valueType, which leads to the usage of the correct payload converter:
- byte[]
- ByteBuffer
- Bytes
- Double
- Float
- Integer
- Long
- Short
- String
- UUID
- JsonNode
- GenericContainer
The framework checks compatibility between the type of updates received from Diffusion topics, the type of data to which these updates are converted (if payload converters are configured), and the type of data expected by the sink service. Any incompatible data that may result in a serialization exception will be ignored, and the topics from which such updates are received will be unsubscribed. Therefore, it is essential to configure payloadConverters, kafkaTopicPattern and valueType appropriately to publish to Kafka topics with the desired value types.
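The compatibility check amounts to asking whether a converted update's type is one the available serializers can handle. A minimal stand-in (not the framework's actual code; `is_publishable` is a hypothetical name):

```python
# Illustrative check: a value is publishable only if its type is among
# those listed above as compatible with the Kafka serializers.
COMPATIBLE_TYPES = {
    "byte[]", "ByteBuffer", "Bytes", "Double", "Float", "Integer",
    "Long", "Short", "String", "UUID", "JsonNode", "GenericContainer",
}

def is_publishable(update_type: str) -> bool:
    return update_type in COMPATIBLE_TYPES

print(is_publishable("Double"))   # → True
print(is_publishable("HashMap"))  # → False
```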
As an illustration, consider the following sample configuration for a sink service of type PUBLISHING_SERVICE. This service subscribes to the Diffusion topic doubles, which produces updates of type DOUBLE. The configuration specifies that the service should publish updates from the doubles Diffusion topic to the doubles Kafka topic. In this example, since payloadConverters is not explicitly configured, an internal default payload converter is used. This converter passes the Double value directly to the sink service, which then publishes it to the Kafka topic.
```json
{
  "serviceName": "doublesPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from Doubles Diffusion topic and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "doubles"
    },
    "application": {
      "kafkaTopicPattern": "doubles",
      "valueType": "DOUBLE"
    }
  }
}
```
If the value type is instead set to STRING, as follows, updates from the doubles Diffusion topic will be converted to String and published to the Kafka topic. If the conversion fails, a payload converter exception is thrown.
```json
{
  "serviceName": "doublesPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from Doubles Diffusion topic and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "doubles"
    },
    "application": {
      "kafkaTopicPattern": "doubles",
      "valueType": "STRING"
    }
  }
}
```
Meanwhile, a sink service with the following configuration will receive updates from any topics that match the configured ?data// selector. These topics can be of different Diffusion topic types. Updates from the different topic types will be published to the corresponding Kafka topics, with the serializer deduced from the type of each received update.
```json
{
  "serviceName": "dynamicPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Consumes from Diffusion topics and publishes to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "?data//"
    },
    "application": {
      "kafkaTopicPattern": "diffusion.${topic}"
    }
  }
}
```
The sink service can handle different types of updates received from Diffusion topics and publish them to Kafka topics as configured. However, if the subscription is to a specific path that publishes values of a specific type, setting valueType pre-defines the serializer to be used by the service. This is more efficient than dynamically inspecting each update to determine which serializer to use.
JSON to Avro conversion
The JSON to Avro payload converter can be used to consume JSON data from a Diffusion topic and publish it to an Avro Kafka topic. This converter is issued by the framework. See here to understand the converter.
To use the JSON to Avro converter:
- $JSON_to_Avro must be configured in the payloadConverters configuration in the framework configuration section for the sink service. This converter requires schemaFilePath, which is a mandatory configuration parameter.
To produce Avro data:
- the AVRO message type must also be set for the valueType configuration.
The application uses the Kafka Avro serializer (provided by Confluent) to serialize the GenericRecord created by the converter. Hence, you must set additional required configuration settings for the Kafka producer, such as schema.registry.url.
PUBLISHING_SERVICE typed sink service that consumes JSON data from a Diffusion topic and publishes to an Avro topic:
```json
{
  "serviceName": "avroPublisher",
  "serviceType": "PUBLISHING_SERVICE",
  "description": "Publishes Avro data to Kafka",
  "config": {
    "sharedConfigName": "localKafkaCluster",
    "framework": {
      "diffusionTopicSelector": "sourceTopic",
      "payloadConverters": [
        {
          "name": "$JSON_to_Avro",
          "parameters": {
            "schemaFilePath": "/data/resources/ordersSchema.avsc"
          }
        }
      ]
    },
    "application": {
      "configuration": {
        "client.id": "diffusionGatewayProducer",
        "schema.registry.url": "<schemaRegistryURL>",
        "basic.auth.credentials.source": "USER_INFO",
        "basic.auth.user.info": "<API key>:<API secret>"
      },
      "kafkaTopicPattern": "diffusion.${topic}",
      "keyValue": "key",
      "valueType": "AVRO"
    }
  }
}
```
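The schemaFilePath above points at a standard Avro schema (.avsc) file, which is itself plain JSON. The sketch below shows how such a schema can be read and an incoming JSON update checked against its record fields. The schema content and the `missing_fields` helper are hypothetical; the real $JSON_to_Avro converter performs full Avro conversion to a GenericRecord, not just this field check.

```python
import json

# Hypothetical example schema in the style of an ordersSchema.avsc file.
ORDERS_SCHEMA = json.loads("""
{
  "type": "record",
  "name": "Order",
  "fields": [
    {"name": "id", "type": "string"},
    {"name": "amount", "type": "double"}
  ]
}
""")

def missing_fields(schema: dict, update: dict) -> list:
    """Return the names of non-defaulted schema fields absent from the update."""
    required = [f["name"] for f in schema["fields"] if "default" not in f]
    return [name for name in required if name not in update]

print(missing_fields(ORDERS_SCHEMA, {"id": "o-1", "amount": 9.5}))  # → []
print(missing_fields(ORDERS_SCHEMA, {"id": "o-2"}))  # → ['amount']
```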
See Configuring the adapter for a complete example of the adapter configuration, including configuration for a PUBLISHING_SERVICE service.