Configuring the Kafka adapter
Use the application.conf configuration file to configure how the adapter shares data between Diffusion™ and Kafka.
Edit the application.conf JSON file to configure the adapter's behavior.
The "diffusion" section is used to configure the connection to Diffusion.
"diffusion": { // URL of Diffusion server. "url": "ws://localhost:8080", // Principal to be used to connect to Diffusion server. "principal": "admin", // Password to be used to connect to Diffusion server. "password": "password", // Map of user defined properties to pass to create session. // "properties": {"key":"value"}, // Optional. Interval between Diffusion server reconnection attempts in milliseconds. Defaults to 1000. "reconnectIntervalMs": 1000, // Optional. Maximum message size to be used when creating Diffusion session. Defaults to Integer.MAX_VALUE. // "maximumMessageSize": 50000, // Optional. Maximum queue size to be used when creating Diffusion session. Defaults to 10000. // "maximumQueueSize": 20000 // Optional. Output buffer size (in bytes) to be used when creating Diffusion session. Defaults to 512KiB. // "outputBufferSize": 200000 // Optional. Input buffer size (in bytes) to be used when creating Diffusion session. Defaults to 512KiB. // "inputBufferSize": 200000 }
The "kafkaClusters" section defines which Kafka clusters to connect to.
"kafkaClusters": [ { // Unique identifier to identify this Kafka cluster. "clusterId": "localCluster", // List of host and port pairs that are addresses of the Kafka brokers in the Kafka cluster. "bootstrapServers" : ["localhost:9092", "localhost:9093"], // Optional. If used, SSL connection will be used to connect to Kafka brokers. // "sslDetails": { // Kafka client's JKS truststore's file location. // "trustStoreLocation": "/etc/kafka.client.truststore.jks", // Kafka client's JKS truststore's password. // "trustStorePassword": "clientpass" // }, // Optional. If used, SASL authentication is enabled. Contains configuration details for SASL authentication. // "saslDetails": { // Allowed values are PLAIN, GSSAPI. // "saslMechanism": "GSSAPI", // Mandatory, if saslMechanism is GSSAPI. // "kerberosServiceName": "kafkaAdapter" // Mandatory, if saslMechanism is PLAIN. // "userName": "admin", // Mandatory, if saslMechanism is PLAIN. // "password": "admin-secret" // } } ],
The "publisher" section defines how to consume Kafka topics and publish to Diffusion.
"publisher": { "subscriptions": [ { // Kafka cluster id defined in kafka.clusterId field. "kafkaClusterId": "localCluster", // Optional. Map of additional Kafka consumer configuration's keys and value pairs. "kafkaConsumerConfigurations": {"auto.offset.reset": "latest","metadata.max.age.ms": 1000}, // A unique string that identifies the consumer group for Kafka consumers created for this cluster. "consumerGroupId": "diffusionKafkaAdapter", // Optional. Number of Kafka consumers to be created per key-value type pair for this cluster. Defaults to 1. "consumerCount": 3, // Optional. Corresponds to "max.poll.interval.ms" configuration field of Kafka consumer. Defaults to 300 seconds. "consumerPollTimeoutMs": 1000, // Optional. List of Kafka topic's regular expression details to subscribe to in this cluster. "regexSubscriptions": [ { // Kafka topic regular expression to subscribe to. "name": "fx.*", // Data type of 'key' for this regular expression subscription. // Allowed values are: BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON. "keyType": "STRING", // Data type of 'value' for this regular expression subscription. // Allowed values are: BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON. "valueType": "JSON", // Optional. Flag to specify whether to map to timeseries topic. Defaults to false. // "mapToTimeSeriesTopic": true, // Optional. Prefix to be used when creating Diffusion topic. Defaults to "". "diffusionTopicPrefix": "kafka/", // Optional. Map of string key-value pairs of Diffusion topic properties which will be used during topic creation. "diffusionTopicProperties": {"DONT_RETAIN_VALUE":"true", "CONFLATION":"off"} } ] //Optional. List of Kafka topic details to subscribe to in this cluster. "topicSubscriptions": [ { // Kafka topic name to subscribe to. "name": "sampleTopic", // Data type of 'key' for this topic subscription. // Allowed values are: BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON. "keyType": "STRING", // Data type of 'key' for this topic subscription. // Allowed Values are: BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON. "valueType": "STRING", // Optional. Flag to specify whether to map to timeseries topic. Defaults to false. // "mapToTimeSeriesTopic": true, // Optional. Prefix to be used when creating Diffusion topic. Defaults to "". "diffusionTopicPrefix": "kafka/", // Optional. Map of string key-value pairs of Diffusion topic properties which will be used during topic creation. // "diffusionTopicProperties": {"DONT_RETAIN_VALUE":"true", "CONFLATION":"off"} } ] } ], // Optional. Prefix to be used when creating Diffusion topics. This prefix will be applied for all Diffusion topics created. Defaults to "". // "commonDiffusionTopicPrefix": "kafka/", // Optional. Map of string key-value pairs of Diffusion topic properties which will be used during topic creation. // These properties will be applied for all Diffusion topics created. // "commonDiffusionTopicProperties": {"DONT_RETAIN_VALUE":"true", "CONFLATION":"off"} },
The "subscriber" section defines how to subscribe to Diffusion topics and send updates to Kafka.
"subscriber": { "subscriptions": [ { "kafkaClusterId": "localCluster", "kafkaProducerConfigurations": {"client.id": "diffusionProducer"}, "topicDetails": [ { // Diffusion topic path or path selector to subscribe to. "diffusionTopic": "?kafka/.*//", // Data type of value to be received for this Diffusion topic path. // Allowed values are: JSON, BINARY, STRING, INT64, DOUBLE. "diffusionTopicType": "JSON", // Optional. If this field is set, this topic is used to send all Diffusion updates from the matching selector to the single specified Kafka topic. // "mapToSingleKafkaTopic": "godTopic", // Optional. If this field is set, Kafka topics are created based on the source Diffusion topic path with the specified prefix. "kafkaTopicPrefix": "diffusion.", // Optional. Kafka key value to be used to send Kafka update. Defaults to "1". "kafkaKey": "1" } ] } ] },
You can delete or comment out the "publisher" or "subscriber" section if you only need one-way functionality. You can leave both enabled if you want the adapter to work both ways at once.