Specifying a reconnection strategy
Reconnection behaviour can be configured using custom reconnection strategies.
The reconnection behaviour of a client session can be configured using reconnection strategies. A reconnection strategy is applied when the session enters the RECOVERING_RECONNECT state, enabling the session to attempt to reconnect and recover its previous state.
Reconnection can only succeed if the client session is still available on Diffusion™ Cloud . The maximum time that Diffusion Cloud keeps client sessions available for reconnection is 60 seconds.
Individual client sessions can request a shorter reconnection timeout for their sessions or request to disable reconnection when they first connect to Diffusion Cloud
Examples
// When establishing a session, it is possible to specify whether reconnection
// should be attempted in the event of an unexpected disconnection. This allows
// the session to recover its previous state.
// Set the maximum amount of time we'll try and reconnect for to 10 minutes
const maximumTimeoutDuration = 1000 * 60 * 10;
// Set the maximum interval between reconnect attempts to 60 seconds
const maximumAttemptInterval = 1000 * 60;
// Set an upper limit to the number of times we'll try to reconnect for
const maximumAttempts = 25;
// Count the number of reconnection attempts we've made
let attempts = 0;
// Create a reconnection strategy that applies an exponential back-off
// The strategy will be called with two arguments, start & abort. Both
// of these are functions, which allow the strategy to either start a
// reconnection attempt, or to abort reconnection (which will close the session)
const reconnectionStrategy = (start, abort) => {
if (attempts > maximumAttempts) {
abort();
} else {
const wait = Math.min(Math.pow(2, attempts++) * 100, maximumAttemptInterval);
// Wait the specified time period, and then start the reconnection attempt
setTimeout(start, wait);
}
};
// Connect to the server.
const session = await diffusion.connect({
host : 'diffusion.example.com',
port : 443,
secure : true,
principal : 'control',
credentials : 'password',
reconnect : {
timeout : maximumTimeoutDuration,
strategy : reconnectionStrategy
}
});
session.on('disconnect', () => {
// This will be called when we lose connection. Because we've specified the
// reconnection strategy, it will be called automatically when this event
// is dispatched
});
session.on('reconnect', () => {
// If the session is able to reconnect within the reconnect timeout, this
// event will be dispatched to notify that normal operations may resume
attempts = 0;
});
session.on('close', () => {
// If the session is closed normally, or the session is unable to reconnect,
// this event will be dispatched to notify that the session is no longer
// operational.
});
/// <summary>
/// Implementation that demonstrates session reconnection strategy.
/// </summary>
public sealed class SessionReconnection
{
private int maximumTimeoutDuration = 1000 * 60 * 10;
private ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy();
public SessionReconnection(string serverUrl)
{
var factory = Diffusion.Sessions;
var session = Connect(serverUrl, factory);
if (session != null)
{
WriteLine("The session has been created.");
}
Thread.Sleep(60000);
session.Close();
}
public ISession Connect(string url, ISessionFactory initialFactory)
{
try
{
string principal = "control";
string password = "password";
var factory = initialFactory
.Principal(principal)
.Credentials(Diffusion.Credentials.Password(password))
.CertificateValidation((cert, chain, errors)
=> CertificateValidationResult.ACCEPT)
.ReconnectionTimeout(maximumTimeoutDuration)
.ReconnectionStrategy(reconnectionStrategy)
.SessionStateChangedHandler(OnSessionStateChanged);
return factory.Open(url);
}
catch (Exception ex)
{
WriteLine($"Session connection error : {ex}.");
}
return null;
}
private void OnSessionStateChanged(object sender, SessionListenerEventArgs e)
{
if (e.NewState == SessionState.RECOVERING_RECONNECT)
{
// The session has been disconnected, and has entered
// recovery state. It is during this state that
// the reconnect strategy will be called
WriteLine("The session has been disconnected.");
}
if (e.NewState == SessionState.CONNECTED_ACTIVE)
{
// The session has connected for the first time, or it has
// been reconnected.
reconnectionStrategy.Retries = 0;
WriteLine("The session has connected.");
}
if (e.OldState == SessionState.RECOVERING_RECONNECT)
{
// The session has left recovery state. It may either be
// attempting to reconnect, or the attempt has been aborted;
// this will be reflected in the newState.
}
if (e.NewState == SessionState.CLOSED_BY_CLIENT)
{
WriteLine("The session has been closed.");
}
}
/// <summary>
/// A reconnection strategy that gets applied after the connection failure notification.
/// </summary>
private class ReconnectionStrategy : IReconnectionStrategy
{
public int Retries { get; set; }
// Set the maximum interval between reconnect attempts to 60 seconds.
private long maximumAttemptInterval = 1000 * 60;
public ReconnectionStrategy() => Retries = 0;
public async Task PerformReconnection(IReconnectionAttempt reconnection)
{
long wait = Math.Min((long)Math.Pow(2, Retries++) * 100L, maximumAttemptInterval);
Thread.Sleep((int)wait);
WriteLine("Attempting to reconnect...");
reconnection.Start();
}
}
}
final ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy() {
private int retries = 0;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@Override
public void performReconnection(ReconnectionAttempt reconnectionAttempt) {
if (retries < 5) {
final int delay = (int) Math.pow(2, retries); // Exponential backoff
retries++;
scheduler.schedule(reconnectionAttempt::start, delay, TimeUnit.SECONDS);
}
else {
reconnectionAttempt.abort();
}
}
};
final Session session = Diffusion.sessions()
.principal("admin")
.password("password")
.reconnectionStrategy(reconnectionStrategy)
.open("ws://localhost:8080");
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
#include <unistd.h>
#else
#define sleep(x) Sleep(1000 * x)
#endif
#include <time.h>
#include "diffusion.h"
/*
* This callback is used when the session state changes, e.g. when a session
* moves from a "connecting" to a "connected" state, or from "connected" to
* "closed".
*/
static void on_session_state_changed(
SESSION_T *session,
const SESSION_STATE_T old_state,
const SESSION_STATE_T new_state)
{
printf(
"Session state changed from %s (%d) to %s (%d)\n",
session_state_as_string(old_state),
old_state,
session_state_as_string(new_state),
new_state);
}
typedef struct
{
double current_wait;
double max_wait;
} BACKOFF_STRATEGY_ARGS_T;
static RECONNECTION_ATTEMPT_ACTION_T backoff_reconnection_strategy(
SESSION_T *session,
void *args)
{
BACKOFF_STRATEGY_ARGS_T *backoff_args = args;
printf("Waiting for %f ms\n", backoff_args->current_wait);
sleep(backoff_args->current_wait);
// But only up to some maximum time.
if (backoff_args->current_wait > backoff_args->max_wait)
{
backoff_args->current_wait = backoff_args->max_wait;
}
return RECONNECTION_ATTEMPT_ACTION_START;
}
static void backoff_success(
SESSION_T *session,
void *args)
{
printf("Reconnection successful\n");
BACKOFF_STRATEGY_ARGS_T *backoff_args = args;
backoff_args->current_wait = 0; // Reset wait.
}
static void backoff_failure(
SESSION_T *session,
void *args)
{
printf("Reconnection failed (%s)\n", session_state_as_string(session->state));
BACKOFF_STRATEGY_ARGS_T *backoff_args = args;
// Exponential backoff.
if (backoff_args->current_wait == 0)
{
backoff_args->current_wait = 0.01;
}
else
{
backoff_args->current_wait *= 2;
}
}
int main(
int argc,
char **argv)
{
const char *url = "ws://localhost:8080";
const char *principal = "control";
const char *password = "password";
CREDENTIALS_T *credentials = credentials_create_password(password);
SESSION_T *session;
DIFFUSION_ERROR_T error = {0};
SESSION_LISTENER_T session_listener = {.on_state_changed = on_session_state_changed};
// Set the arguments to our exponential backoff strategy
BACKOFF_STRATEGY_ARGS_T backoff_args = {.current_wait = 0, .max_wait = 5000};
// Create the backoff strategy
RECONNECTION_STRATEGY_T *reconnection_strategy =
make_reconnection_strategy_user_function(backoff_reconnection_strategy, &backoff_args, backoff_success, backoff_failure);
// Only retry for 30 seconds
reconnection_strategy_set_timeout(reconnection_strategy, 30 * 1000);
// Create a session, synchronously
session = session_create(url, principal, credentials, &session_listener, reconnection_strategy, &error);
if (session != NULL)
{
char *sid_str = session_id_to_string(session->id);
printf("Session created (state=%d, id=%s)\n", session_state_get(session), sid_str);
free(sid_str);
}
else
{
printf("Failed to create session: %s\n", error.message);
free(error.message);
}
// With the exception of backoff_args, the reconnection strategy is
// copied withing session_create() and may be freed now
free_reconnection_strategy(reconnection_strategy);
// Sleep for a while
sleep(20);
// Close the session, and release resources and memory
session_close(session, NULL);
session_free(session);
credentials_free(credentials);
return EXIT_SUCCESS;
}
class ExponentialBackoffReconnectionStrategy: NSObject, PTDiffusionSessionReconnectionStrategy {
var attempt_count : Int = 0
func diffusionSession(_ session: PTDiffusionSession,
wishesToReconnectWith attempt: PTDiffusionSessionReconnectionAttempt) {
// limit the maximum delay time between reconnection attempts to 60 seconds
let maximum_attempt_interval = 60.0
// compute delay for exponential backoff on the number of attempts so far
self.attempt_count += 1
let delay = min(pow(2.0, Double(self.attempt_count)) * 0.1, maximum_attempt_interval)
// schedule asynchronous execution
let dispatch_time = DispatchTime.now() + DispatchTimeInterval.seconds(Int(delay))
DispatchQueue.main.asyncAfter(deadline: dispatch_time) {
print("Attempting to reconnect now")
attempt.start()
}
}
}
func connect() {
let configuration = PTDiffusionMutableSessionConfiguration()
// Set the maximum amount of time we'll try and reconnect for to 10 minutes
configuration.reconnectionTimeout = NSNumber(value: 10.0 * 60.0)
// Set the reconnection strategy to be used
configuration.reconnectionStrategy = ExponentialBackoffReconnectionStrategy()
// Use the configuration to open a new session...
PTDiffusionSession.open(with: URL(string: "url")!,
configuration: configuration) { (session, error) in
// Check error is `nil`, then use session as required.
// Ensure to maintain a strong reference to the session beyond the lifetime
// of this callback, for example by assigning it to an instance variable.
if (session == nil) {
print("Failed to open session: %@", error!.localizedDescription)
return
}
// At this point we now have a connected session.
print("Connected.")
}
}