Just a second...

Configuring the Kafka adapter

Use the application.conf configuration file to configure how the adapter shares data between Diffusion™ and Kafka.

Edit the application.conf JSON file to configure the adapter's behavior.

The "diffusion" section is used to configure the connection to Diffusion.

"diffusion": {
      // URL of Diffusion server.
      "url": "ws://localhost:8080",
      // Principal to be used to connect to Diffusion server.
      "principal": "admin",
      // Password to be used to connect to Diffusion server.
      "password": "password",
      // Map of user defined properties to pass to create session.
      // "properties": {"key":"value"},
      // Optional. Interval between Diffusion server reconnection attempts in milliseconds. Defaults to 1000.
      "reconnectIntervalMs": 1000,
      // Optional. Maximum message size to be used when creating Diffusion session. Defaults to Integer.MAX_VALUE.
      // "maximumMessageSize": 50000,
      // Optional. Maximum queue size to be used when creating Diffusion session. Defaults to 10000.
      // "maximumQueueSize": 20000,
      // Optional. Output buffer size (in bytes) to be used when creating Diffusion session. Defaults to 512KiB.
      // "outputBufferSize": 200000,
      // Optional. Input buffer size (in bytes) to be used when creating Diffusion session. Defaults to 512KiB.
      // "inputBufferSize": 200000
		    }

The "kafkaClusters" section defines which Kafka clusters to connect to.

  "kafkaClusters": [
	{
	  // Unique identifier to identify this Kafka cluster.
	  "clusterId": "localCluster",
	  // List of host and port pairs that are addresses of the Kafka brokers in the Kafka cluster.
	  "bootstrapServers" : ["localhost:9092", "localhost:9093"],
	  // Optional. If used, SSL connection will be used to connect to Kafka brokers.
	  // "sslDetails": {
	      // Kafka client's JKS truststore's file location.
	      // "trustStoreLocation": "/etc/kafka.client.truststore.jks",
	      // Kafka client's JKS truststore's password.
	      // "trustStorePassword": "clientpass"
	  // },
	  // Optional. If used, SASL authentication is enabled. Contains configuration details for SASL authentication.
	  // "saslDetails": {
	      // Allowed values are PLAIN, GSSAPI.
	      // "saslMechanism": "GSSAPI",
	      // Mandatory, if saslMechanism is GSSAPI.
	      // "kerberosServiceName": "kafkaAdapter"
	      // Mandatory, if saslMechanism is PLAIN.
	      // "userName": "admin",
	      // Mandatory, if saslMechanism is PLAIN.
	      // "password": "admin-secret"
	  // }
	}
],

The "publisher" section defines how to consume Kafka topics and publish to Diffusion.

  "publisher": {
"subscriptions": [
  {
    // Kafka cluster id, as defined in the "clusterId" field of an entry in the "kafkaClusters" section.
    "kafkaClusterId": "localCluster",
    // Optional. Map of additional Kafka consumer configuration's keys and value pairs.
    "kafkaConsumerConfigurations": {"auto.offset.reset": "latest","metadata.max.age.ms": 1000},
    // A unique string that identifies the consumer group for Kafka consumers created for this cluster.
    "consumerGroupId": "diffusionKafkaAdapter",
    // Optional. Number of Kafka consumers to be created per key-value type pair for this cluster. Defaults to 1.
    "consumerCount": 3,
    // Optional. Corresponds to "max.poll.interval.ms" configuration field of Kafka consumer. Defaults to 300 seconds.
    "consumerPollTimeoutMs": 1000,
    // Optional. List of Kafka topic's regular expression details to subscribe to in this cluster.
    "regexSubscriptions": [
      {
        // Kafka topic regular expression to subscribe to.
        "name": "fx.*",
        // Data type of 'key' for this regular expression subscription.
        // Allowed values are: BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON.
        "keyType": "STRING",
        // Data type of 'value' for this regular expression subscription.
        // Allowed values are: BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON.
        "valueType": "JSON",
        // Optional. Flag to specify whether to map to timeseries topic. Defaults to false.
        // "mapToTimeSeriesTopic": true,
        // Optional. Prefix to be used when creating Diffusion topic. Defaults to "".
        "diffusionTopicPrefix": "kafka/",
        // Optional. Map of string key-value pairs of Diffusion topic properties which will be used during topic creation.
        "diffusionTopicProperties": {"DONT_RETAIN_VALUE":"true", "CONFLATION":"off"}
      }
    ],
    // Optional. List of Kafka topic details to subscribe to in this cluster.
    "topicSubscriptions": [
      {
        // Kafka topic name to subscribe to.
        "name": "sampleTopic",
        // Data type of 'key' for this topic subscription.
        // Allowed values are: BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON.
        "keyType": "STRING",
        // Data type of 'value' for this topic subscription.
        // Allowed values are: BYTEARRAY, BYTEBUFFER, BYTES, DOUBLE, FLOAT, INTEGER, LONG, SHORT, STRING, UUID, JSON.
        "valueType": "STRING",
        // Optional. Flag to specify whether to map to timeseries topic. Defaults to false.
        // "mapToTimeSeriesTopic": true,
        // Optional. Prefix to be used when creating Diffusion topic. Defaults to "".
        "diffusionTopicPrefix": "kafka/",
        // Optional. Map of string key-value pairs of Diffusion topic properties which will be used during topic creation.
        // "diffusionTopicProperties": {"DONT_RETAIN_VALUE":"true", "CONFLATION":"off"}
      }
    ]
  }
],
	// Optional. Prefix to be used when creating Diffusion topics. This prefix will be applied for all Diffusion topics created. Defaults to "".
	// "commonDiffusionTopicPrefix": "kafka/",
	// Optional. Map of string key-value pairs of Diffusion topic properties which will be used during topic creation.
	// These properties will be applied for all Diffusion topics created.
	// "commonDiffusionTopicProperties": {"DONT_RETAIN_VALUE":"true", "CONFLATION":"off"}
},

The "subscriber" section defines how to subscribe to Diffusion topics and send updates to Kafka.

  "subscriber": {
"subscriptions": [
  {
    "kafkaClusterId": "localCluster",
    "kafkaProducerConfigurations": {"client.id": "diffusionProducer"},
    "topicDetails": [
      {
        // Diffusion topic path or path selector to subscribe to.
        "diffusionTopic": "?kafka/.*//",
        // Data type of value to be received for this Diffusion topic path.
        // Allowed values are: JSON, BINARY, STRING, INT64, DOUBLE.
        "diffusionTopicType": "JSON",
        // Optional. If this field is set, this topic is used to send all Diffusion updates from the matching selector to the single specified Kafka topic.
        // "mapToSingleKafkaTopic": "godTopic",
        // Optional. If this field is set, Kafka topics are created based on the source Diffusion topic path with the specified prefix.
        "kafkaTopicPrefix": "diffusion.",
        // Optional. Kafka key value to be used to send Kafka update. Defaults to "1".
        "kafkaKey": "1"
      }
    ]
  }
]
},

You can delete or comment out the "publisher" or "subscriber" section if you only need one-way functionality. You can leave both enabled if you want the adapter to work both ways at once.