Remote Servers
The remote servers feature is used to enable topic views in a Diffusion™ server to consume source topics from another server (that is not in the same cluster).
A 'remote server' is a component which defines a link between two servers. Remote servers may be created either using the `RemoteServers` feature of the client SDKs or from within the Diffusion Console. A `RemoteServer` is persisted in the server (and automatically distributed across the cluster).
A topic view may then specify the server by the name of the remote server in its 'from' clause. Many different topic views may specify the same remote server.
A topic view may specify a remote server by name even if it does not exist (has not yet been created) and when the remote server is created and connected the topic view will automatically evaluate. If a remote server is removed then any topic views that use it will be disabled and any reference topics generated from it removed.
The server on which the topic view exists is referred to as a 'secondary server' and the server on which the source topics exist is referred to as a 'primary server'.
There are two modes of operation for remote servers:
- Connection initiated from secondary server
- Connection initiated from primary server
Both modes of operation support the propagation of missing topic notifications from the secondary to primary servers.
Connection initiated from the secondary server
This is the most common mode of operation. A remote server of type 'Secondary Initiator' is defined at the secondary server (or cluster). If there is a topic view that specifies the remote server then a connection is automatically made to the primary server which provides topics to the topic view. When no topic view specifies a remote server, no connection is made and the remote server remains dormant. If a new topic view is created which specifies the remote server, then it will connect and if all topic views that specify it are removed it will be disconnected.
In this mode there is no need to define anything at the primary server.
Where the secondary server is operating in a cluster, the remote server's definition is automatically distributed across the cluster members and each secondary cluster member will connect to the primary server and reference topics will be generated on each secondary cluster member.
If the primary server is in a cluster then a load balancer may be used to distribute the secondary connections across primary cluster members.
If the connection to the primary server is temporarily lost then reconnection without message loss may be attempted but if that fails any reference topics created by topic views using the remote server will be removed. The remote server will periodically attempt to retry the connection and if the connection is re-established the corresponding topic views will be evaluated again and reference topics created as appropriate.
The diagram below shows a simple configuration with a single secondary server connection to a single primary server and serving two topic views.


Connection initiated from the primary server
This is a more complex mode of operation which may be used when security constraints require that no inbound connections are allowed to backend (primary) servers. In this case there is a need to define a remote server of type 'Primary Initiator' at the primary server (or cluster) and also a remote server of type 'Secondary Acceptor' with exactly the same name at the secondary server (or cluster).In this mode, the primary initiator attempts to make a connection to the secondary acceptor as soon as the primary initiator is created (or its host server starts) and will keep retrying at a configured interval until it succeeds. The primary server makes a simple (non SSL) connection to the secondary. The matching secondary acceptor will accept the connection and then establish a secure SSL connection with the primary over the same socket connection. The secondary acceptor will then authenticate itself with the primary and the connection will operate in the same way as a secondary initiator connection would. As the secondary to primary connection is secure the necessary SSL context must be set up in the secondary server.
Unlike a secondary initiator connection, such a 'reverse' connection will be maintained even if there are no topic views at the secondary server that use it.
The diagram below shows a simple example of this configuration. Topics are not shown as once a connection is established the topic view mappings happen in exactly the same way as for secondary initiator connections described above.

If the connection to the primary server is lost any reference topics created by the topic views using the secondary remote server will be removed and the primary server will periodically attempt to retry the connection. Once the connection is re-established the topic views that use it will be re-evaluated and reference topics created as appropriate.
Creating remote server definitions
Required permissions: control_server A client can create a new remote server definition using the RemoteServers feature.const server = diffusion.newRemoteServerBuilder(diffusion.RemoteServerType.SECONDARY_ACCEPTOR)
    .principal('admin')
    .credentials('password')
    .connectionOption(diffusion.ConnectionOption.RECONNECTION_TIMEOUT, '10000')
    .build('Server_1', 'ws://remote_server:80');
IRemoteServer server = null;
var builder = Diffusion.NewRemoteServerBuilder();
var remoteServer1 = builder
            .Principal("principal")
            .Credentials(Diffusion.Credentials.Password("password"))
            .ConnectionOptions(new Dictionary<RemoteServerConnectionOption, string>()
                                {
                                { RemoteServerConnectionOption.RECONNECTION_TIMEOUT, "50000" },
                                { RemoteServerConnectionOption.CONNECTION_TIMEOUT, "2500" },
                                { RemoteServerConnectionOption.WRITE_TIMEOUT, "2000" }
                                })
            .MissingTopicNotificationFilter("filter")
            .Create("Server1", "ws://host:8080");
try
{
    server = await session.RemoteServers.CreateRemoteServerAsync(remoteServer1);
    WriteLine($"Remote server '{server.Name}' was created.");
}
catch (Exception ex)
{
    WriteLine($"Failed to create remote server : {ex}.");
    session.Close();
    return;
}
static int on_remote_server_created(
    DIFFUSION_REMOTE_SERVER_T *remote_server,
    LIST_T *errors,
    void *context)
{
    if (remote_server == NULL)
    {
        printf("The following errors occurred while creating the remote server:\n");
        for (int i = 0; i < list_get_size(errors); i++)
        {
            ERROR_REPORT_T *report = list_get_data_indexed(errors, i);
            printf("\t[%d, %d] %s\n", report->line, report->column, report->message);
        }
    }
    else
    {
        printf("Remote Server successfully created\n");
        char *name = diffusion_remote_server_get_name(remote_server);
        char *principal = diffusion_remote_server_get_principal(remote_server);
        char *url = diffusion_remote_server_get_url(remote_server);
        printf("%s: %s (%s)\n", name, url, principal);
        free(name);
        free(principal);
        free(url);
    }
    return HANDLER_SUCCESS;
}
static int on_remote_server_created_failed(
    SESSION_T *session,
    const DIFFUSION_ERROR_T *error)
{
    printf("An error has occurred while adding remote server definition: %s\n", error->message);
    return HANDLER_SUCCESS;
}
...
DIFFUSION_REMOTE_SERVER_BUILDER_T *builder = diffusion_remote_server_builder_init();
builder = diffusion_remote_server_builder_principal(builder, "admin");
CREDENTIALS_T *remote_server_credentials = credentials_create_password("password");
builder = diffusion_remote_server_builder_credentials(builder, remote_server_credentials);
DIFFUSION_API_ERROR api_error = {0};
DIFFUSION_REMOTE_SERVER_T *remote_server = diffusion_remote_server_builder_create(
    builder,
    "New Remote Server", // remote server name
    "10.0.0.4",          // remote server URL
    &api_error);         // api error in case of invalid parameters
DIFFUSION_CREATE_REMOTE_SERVER_PARAMS_T create_remote_server_params = {
    .remote_server = remote_server,
    .on_remote_server_created = on_remote_server_created,
    .on_error = on_remote_server_created_failed};
diffusion_create_remote_server(session, create_remote_server_params, NULL);
let credentials = PTDiffusionCredentials.init(password: "password")
let remote_server_definition = PTDiffusionRemoteServerBuilder()
    .principal("admin")
    .credentials(credentials)
    .createSecondaryInitiator(withName: "New Remote Server", andURL: "ws://10.0.0.4")
session.remoteServers.createRemoteServer(remote_server_definition) { (result, error) in
    if (error != nil) {
        print("An error has occurred while adding remote server definition: %@",
              error!.localizedDescription)
    }
    else if (result!.isSuccess) {
        print("Remote Server '%@' successfully added.", remote_server_definition.name)
    }
    else {
        print("Remote Server '%@' not added. Errors:", remote_server_definition.name)
        for error in result!.errors {
            print("%@", error.localizedDescription);
        }
    }
}
Secondary initiator
| Parameter | Description | 
|---|---|
| Name | The name of the remote server. This is the name that it will be referred to by in topic views. | 
| URL | The URL used to connect to the primary server. | 
| Principal | The name of the principal which will be used to authenticate with the primary server (default anonymous). | 
| Credentials | The credentials to use for authentication with the primary server (default none). | 
| Connection Options | Various connection options related to the connection with the primary server. These include connection retry and recovery options. For maximum throughput the input and output buffer sizes should also be specified as large as possible and tuned to the corresponding buffer sizes of the connector at the primary server. | 
| Missing Topic Notification Filter | A topic selector, which if specified will propagate missing topic notifications to the primary server if it matches with a subscription selector that matched no local topics (default no missing topic notifications are propagated). | 
IRemoteServer server = null;
var builder = (ISecondaryInitiatorBuilder)Diffusion<ISecondaryInitiatorBuilder>.NewRemoteServerBuilder(RemoteServerType.SECONDARY_INITIATOR);
var initiator = builder
    .Principal("admin")
    .Credentials(Diffusion.Credentials.Password("password"))
    .ConnectionOptions(new Dictionary<RemoteServerConnectionOption, string>()
                        {
                            { RemoteServerConnectionOption.RECONNECTION_TIMEOUT, "120000" },
                            { RemoteServerConnectionOption.CONNECTION_TIMEOUT, "15000" },
                            { RemoteServerConnectionOption.MAXIMUM_QUEUE_SIZE, "1000" }
                        })
    .MissingTopicNotificationFilter("?abc")
    .Build("Remote Server 1", "ws://new.server.url.com");
server = await session.RemoteServers.CreateRemoteServerAsync(initiator);
final SecondaryInitiator secondaryInitiator = (SecondaryInitiator)
    session.feature(RemoteServers.class).createRemoteServer(
        Diffusion.newRemoteServerBuilder(SecondaryInitiatorBuilder.class)
            .principal("admin")
            .credentials(Diffusion.credentials().password("password"))
            .build("Server_1", "ws://remote_server:80"))
        .get(5, SECONDS);
Primary initiator
| Parameter | Description | 
|---|---|
| Name | The name of the remote server. This must match with the name of a secondary acceptor defined at the secondary server (or cluster). | 
| Urls | A list of the URLs of the secondary servers to connect to. When connecting to a secondary cluster the URL of each cluster member must be supplied. | 
| Connector | The name of a locally defined connector which defines the connection options for the connection. This is not validated at creation time. If no such connector exists when the primary initiator is started then the remote server will be disabled for the duration of the primary server. The buffer sizes of this connector should be as large as possible for maximum throughput and should be tuned to match the corresponding buffers of the matching secondary acceptor. | 
| Retry Delay | The interval at which to retry the connection if it fails to establish or is lost. | 
IRemoteServer server = null;
var builder = (IPrimaryInitiatorBuilder)Diffusion<IPrimaryInitiatorBuilder>.NewRemoteServerBuilder(RemoteServerType.PRIMARY_INITIATOR);
var initiator = builder
    .RetryDelay(2500)
    .Build("Remote Server 1", new List<string> { "ws://new.server.url.com:8080", "ws://new.server.url.com:8081", "ws://new.server.url.com:8082" }, "High Volume Connector");
server = await session.RemoteServers.CreateRemoteServerAsync(initiator);
final PrimaryInitiator primaryInitiator = (PrimaryInitiator)
    session.feature(RemoteServers.class).createRemoteServer(
        Diffusion.newRemoteServerBuilder(PrimaryInitiatorBuilder.class)
            .retryDelay(5000)
            .build(
                "Server_1",
                Arrays.asList(
                    "ws://secondary_server1:80",
                    "ws://secondary_server2:80"),
                "PrimaryConnector"))
        .get(5, SECONDS);
Secondary acceptor
This is largely the same as for a secondary initiator except the name given must match the name of the primary initiator that is to connect to it.Rather than the URL of the primary server, a host name must be supplied. This host name should match the host name that the primary server will present when validating the SSL connection.
Reconnection and retry options may not be specified for a secondary acceptor as recovery from lost connections is controlled from the primary server.
IRemoteServer server = null;
var builder = (ISecondaryAcceptorBuilder)Diffusion<ISecondaryAcceptorBuilder>.NewRemoteServerBuilder(RemoteServerType.SECONDARY_ACCEPTOR);
var initiator = builder
    .ConnectionOptions(new Dictionary<RemoteServerConnectionOption, string>()
                        {
                            { RemoteServerConnectionOption.WRITE_TIMEOUT, "2000" },
                            { RemoteServerConnectionOption.CONNECTION_TIMEOUT, "15000" },
                            { RemoteServerConnectionOption.MAXIMUM_QUEUE_SIZE, "1000" }
                        })
    .MissingTopicNotificationFilter("?abc")
    .Build("Remote Server 1", "ws://new.server.url.com");
server = await session.RemoteServers.CreateRemoteServerAsync(initiator);
final SecondaryAcceptor secondaryAcceptor = (SecondaryAcceptor)
    session.feature(RemoteServers.class).createRemoteServer(
        Diffusion.newRemoteServerBuilder(SecondaryAcceptorBuilder.class)
            .principal("admin")
            .credentials(Diffusion.credentials().password("password"))
            .build("Server_1", "PrimaryHost"))
        .get(5, SECONDS);
Check a remote server
Required permissions: control_serverThis is used to check the current state of a named remote server at the server the client is connected to.
In the case of initiators it can also be used to force a retry of failed connections. paraIf the server is a cluster member it will only report the state of the remote server at that cluster member. To obtain the full picture in a clustered environment each cluster member would need to be checked separately
A primary initiator will only report the CONNECTED state if it has active connections to all specified secondary servers. The failure message will report which connections are established and which have failed. A primary initiator operating within a cluster will report INACTIVE for all but the cluster member that is maintaining the connections.
| Parameter | Description | 
|---|---|
| Name | The name of the remote server. | 
const result = await session.remoteServers.checkRemoteServer('Server_1');
switch (result.connectionState) {
case diffusion.ConnectionState.CONNECTED:
    console.log('Connected and in use');
    break;
case diffusion.ConnectionState.FAILED:
    console.log(`Failed with ${result.failureMessage}`);
    break;
case diffusion.ConnectionState.INACTIVE:
    console.log('Can connect but not currently connected as not in use');
    break;
case diffusion.ConnectionState.MISSING:
    console.log('No such remote server');
    break;
case diffusion.ConnectionState.RETRYING:
    console.log(`Failed with ${result.failureMessage} but retry is scheduled`);
    break;
default:
    break;
}
try
{
    var result = await session.RemoteServers.CheckRemoteServerAsync(server.Name);
    WriteLine($"Checking '{server.Name}':");
    WriteLine($"Connection state: '{result.ConnectionState}'");
    WriteLine($"Failure message: '{result.FailureMessage}'");
}
catch (Exception ex)
{
    WriteLine($"Failed to check remote server '{server.Name}' : {ex}.");
    session.Close();
    return;
}
final CheckRemoteServerResult result =
    session.feature(RemoteServers.class).checkRemoteServer("Server_1")
        .get(5, SECONDS);
switch (result.getConnectionState()) {
case CONNECTED:
    LOG.info("Fully connected");
    break;
case FAILED:
    LOG.info("Failed with " + result.getFailureMessage());
    break;
case INACTIVE:
    LOG.info("Inactive");
    break;
case MISSING:
    LOG.info("No such remote server");
    break;
case RETRYING:
    LOG.info("Failed with " + result.getFailureMessage() +
        " but retry is scheduled");
    break;
default:
    break;
}
static char *get_server_state_string(
    DIFFUSION_REMOTE_SERVER_CONNECTION_STATE_T state)
{
    switch (state)
    {
    case DIFFUSION_REMOTE_SERVER_CONNECTION_STATE_INACTIVE:
        return "Remote Server is inactive";
    case DIFFUSION_REMOTE_SERVER_CONNECTION_STATE_CONNECTED:
        return "Remote Server is connected";
    case DIFFUSION_REMOTE_SERVER_CONNECTION_STATE_RETRYING:
        return "Attempting to connect to Remote Server";
    case DIFFUSION_REMOTE_SERVER_CONNECTION_STATE_FAILED:
        return "Connection to the Remote Server has failed";
    case DIFFUSION_REMOTE_SERVER_CONNECTION_STATE_MISSING:
        return "Unable to reach Remote Server";
    default:
        return "Unknown connection state";
    }
}
static int on_remote_server_checked(
    DIFFUSION_CHECK_REMOTE_SERVER_RESPONSE_T *response,
    void *context)
{
    DIFFUSION_REMOTE_SERVER_CONNECTION_STATE_T state = diffusion_check_remote_server_response_get_state(response);
    printf("%s.\n", get_server_state_string(state));
    return HANDLER_SUCCESS;
}
static int on_remote_server_checked_failed(
    SESSION_T *session,
    const DIFFUSION_ERROR_T *error)
{
    printf("An error has occurred while checking remote server definition: %s\n", error->message);
    return HANDLER_SUCCESS;
}
session.remoteServers.checkRemoteServer("New Remote Server") { (result, error) in
    if (error != nil) {
        print("An error has occurred while checking remote server definition: %@",
              error!.localizedDescription)
    }
    else {
        let translation_map : [PTDiffusionRemoteServerConnectionState:String] = [
            PTDiffusionRemoteServerConnectionState.inactive(): "Remote Server is inactive",
            PTDiffusionRemoteServerConnectionState.connected(): "Remote Server is connected",
            PTDiffusionRemoteServerConnectionState.retrying(): "Attempting to connect to Remote Server",
            PTDiffusionRemoteServerConnectionState.failed(): "Connection to the Remote Server has failed",
            PTDiffusionRemoteServerConnectionState.missing(): "Unable to reach Remote Server"
        ]
        print("%@", translation_map[result!.state]!)
    }
}
List available remote servers
Required permissions: view_server p Used to list all of the remote servers that have been defined in the server.const servers = await session.remoteServers.listRemoteServers();
for (const remoteServer of servers) {
    switch (remoteServer.type) {
    case diffusion.RemoteServerType.PRIMARY_INITIATOR:
...
    console.log(`Primary Initiator ${remoteServer.name}: ${remoteServer.urls}`);
    break;
case diffusion.RemoteServerType.SECONDARY_INITIATOR:
...
    console.log(`Secondary Initiator ${remoteServer.name}: ${remoteServer.url}`);
    break;
case diffusion.RemoteServerType.SECONDARY_ACCEPTOR:
...
        console.log(`Secondary Acceptor ${remoteServer.name}: ${remoteServer.primaryHostName}`);
        break;
    }
}
try
{
    var listServers = await session.RemoteServers.ListRemoteServersAsync();
    WriteLine($"The following remote servers exist:");
    foreach (var remoteServer in listServers)
    {
        WriteLine($"Name: '{remoteServer.Name}', Url: '{remoteServer.ServerUrl}', Principal: '{remoteServer.Principal}', Missing Topic Notification Filter: '{remoteServer.MissingTopicNotificationFilter}'");
        foreach(var connectionOption in remoteServer.ConnectionOptions)
        {
            WriteLine($"Connection Option: '{connectionOption.Key}', Value: '{connectionOption.Value}'");
        }
    }
}
catch (Exception ex)
{
    WriteLine($"Failed to list remote servers : {ex}.");
    session.Close();
    return;
}
final List<RemoteServer> servers =
    session.feature(RemoteServers.class)
        .listRemoteServers()
        .get(5, SECONDS);
for (RemoteServer remoteServer : servers) {
    switch(remoteServer.getType()) {
    case PRIMARY_INITIATOR:
        final PrimaryInitiator pi = (PrimaryInitiator)remoteServer;
        LOG.info(
            "Primary Initiator " + pi.getName() +
            " - Connecting to " + pi.getUrls() +
            " using connector " + pi.getConnector());
        break;
    case SECONDARY_ACCEPTOR:
        final SecondaryAcceptor sa = (SecondaryAcceptor)remoteServer;
        LOG.info(
            "Secondary Acceptor " + sa.getName() +
            " - Connecting to " + sa.getPrimaryHostName());
        break;
    case SECONDARY_INITIATOR:
        final SecondaryInitiator si = (SecondaryInitiator)remoteServer;
        LOG.info(
            "Secondary Initiator " + si.getName() +
            " - Connecting to " + si.getUrl());
        break;
    default:
        break;
    }
}
static int on_remote_servers_listed(
    LIST_T *remote_servers,
    void *context)
{
    int list_size = list_get_size(remote_servers);
    printf("Remote Servers found: %d\n", list_size);
    for (int i = 0; i < list_size; i++)
    {
        DIFFUSION_REMOTE_SERVER_T *remote_server = list_get_data_indexed(remote_servers, i);
        char *name = diffusion_remote_server_get_name(remote_server);
        char *principal = diffusion_remote_server_get_principal(remote_server);
        char *url = diffusion_remote_server_get_url(remote_server);
        printf("%s: %s (%s)\n", name, url, principal);
        free(name);
        free(principal);
        free(url);
    }
    return HANDLER_SUCCESS;
}
static int on_remote_servers_listed_failed(
    SESSION_T *session,
    const DIFFUSION_ERROR_T *error)
{
    printf("An error has occurred while listing available remote servers: %s\n", error->message);
    return HANDLER_SUCCESS;
}
...
DIFFUSION_LIST_REMOTE_SERVERS_PARAMS_T list_remote_servers_params = {
    .on_remote_servers_listed = on_remote_servers_listed,
    .on_error = on_remote_servers_listed_failed};
diffusion_list_remote_servers(session, list_remote_servers_params, NULL);
session.remoteServers.listRemoteServers() { (result, error) in
    if (error != nil) {
        print("An error has occurred while listing available remote servers: %@",
              error!.localizedDescription)
    }
    else {
        for remote_server in result! {
            switch(remote_server.type) {
            case .primaryInitiator:
                let primaryInitiator = remote_server as! PTDiffusionPrimaryInitiatorRemoteServer
                print("%@: %@ (%@)", primaryInitiator.name, primaryInitiator.urls, primaryInitiator.connector)
                break;
            case .secondaryInitiator:
                let secondaryInitiator = remote_server as! PTDiffusionSecondaryInitiatorRemoteServer
                print("%@: %@ (%@)", secondaryInitiator.name, secondaryInitiator.url!, secondaryInitiator.principal)
                break;
            case .secondaryAcceptor:
                let secondaryAcceptor = remote_server as! PTDiffusionSecondaryAcceptorRemoteServer
                print("%@: %@ (%@)", secondaryAcceptor.name, secondaryAcceptor.primaryHostName!, secondaryAcceptor.principal)
                break;
            }
        }
    }
}
Remove a remote server
Required permissions: control_server
Used to remove a named remote server if it exists. In a server cluster the remote server will be removed from all cluster members.To fully remove a primary to secondary configuration the named remote server needs to be removed from both the primary server (or cluster) and the secondary server (or cluster).
If provided with a remote server name that does not exist in the server, this operation will complete successfully.
When a remote server is removed, any topic views that depend upon it will be re-evaluated and reference topics that they have created will be removed.| Name | Description | 
|---|---|
| Name | The name of the remote server. | 
try
{
    await session.RemoteServers.RemoveRemoteServerAsync(server.Name);
    WriteLine($"Remote server '{server.Name}' has been removed.");
}
catch (Exception ex)
{
    WriteLine($"Failed to remove remote server '{server.Name}' : {ex}.");
    session.Close();
    return;
}
session.feature(RemoteServers.class).removeRemoteServer("Server_1")
    .get(5, SECONDS);
static int on_remote_server_removed(
    void *context)
{
    printf("Remote server has been successfully removed.\n");
    return HANDLER_SUCCESS;
}
static int on_remote_server_removed_failed(
    SESSION_T *session,
    const DIFFUSION_ERROR_T *error)
{
    printf("An error has occurred while removing remote server: %s\n", error->message);
    return HANDLER_SUCCESS;
}
session.remoteServers.removeRemoteServer("New Remote Server") { (error) in
    if (error != nil) {
        print("An error has occurred while removing remote server: %@",
              error!.localizedDescription)
    }
    else {
        print("Remote Server has been successfully removed")
    }
}