Specifying a reconnection strategy
Reconnection behavior can be configured using custom reconnection strategies.
The reconnection behavior of a client session can be configured using reconnection strategies. A reconnection strategy is applied when the session enters the RECOVERING_RECONNECT state, enabling the session to attempt to reconnect and recover its previous state.
Reconnection can only succeed if the client session is still available on the Diffusion™ server. The maximum time that the Diffusion server keeps client sessions in the DISCONNECTED state before closing them can be configured using the Connectors.xml configuration file. For more information, see Configuring connectors.
Individual client sessions can request a shorter reconnection timeout for their sessions or request to disable reconnection when they first connect to the Diffusion server.
Examples
// When establishing a session, it is possible to specify whether reconnection
// should be attempted in the event of an unexpected disconnection. This allows
// the session to recover its previous state.

// Set the maximum amount of time we'll try and reconnect for to 10 minutes
const maximumTimeoutDuration = 1000 * 60 * 10;

// Set the maximum interval between reconnect attempts to 60 seconds
const maximumAttemptInterval = 1000 * 60;

// Set an upper limit to the number of times we'll try to reconnect for
const maximumAttempts = 25;

// Count the number of reconnection attempts we've made
let attempts = 0;

// Create a reconnection strategy that applies an exponential back-off.
// The strategy will be called with two arguments, start & abort. Both
// of these are functions, which allow the strategy to either start a
// reconnection attempt, or to abort reconnection (which will close the session)
const reconnectionStrategy = (start, abort) => {
    // `attempts` counts the attempts already started, so once it reaches the
    // limit no further attempt may be made (">=" rather than ">", which would
    // allow one attempt more than maximumAttempts).
    if (attempts >= maximumAttempts) {
        abort();
        return;
    }

    // 100ms, 200ms, 400ms, ... capped at maximumAttemptInterval
    const wait = Math.min(Math.pow(2, attempts) * 100, maximumAttemptInterval);
    attempts += 1;

    // Wait the specified time period, and then start the reconnection attempt
    setTimeout(start, wait);
};

// Connect to the server.
const session = await diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password',
    reconnect : {
        timeout : maximumTimeoutDuration,
        strategy : reconnectionStrategy
    }
});

session.on('disconnect', () => {
    // This will be called when we lose connection. Because we've specified the
    // reconnection strategy, it will be called automatically when this event
    // is dispatched
});

session.on('reconnect', () => {
    // If the session is able to reconnect within the reconnect timeout, this
    // event will be dispatched to notify that normal operations may resume.
    // Reset the counter so the next disconnection starts a fresh back-off.
    attempts = 0;
});

session.on('close', () => {
    // If the session is closed normally, or the session is unable to reconnect,
    // this event will be dispatched to notify that the session is no longer
    // operational.
});
/// <summary>
/// Implementation that demonstrates session reconnection strategy.
/// </summary>
public sealed class SessionReconnection
{
    // Maximum amount of time (in milliseconds) to keep trying to reconnect: 10 minutes.
    private int maximumTimeoutDuration = 1000 * 60 * 10;

    private ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy();

    /// <summary>
    /// Opens a session to <paramref name="serverUrl"/>, keeps it open for
    /// 60 seconds and then closes it.
    /// </summary>
    /// <param name="serverUrl">The URL of the server to connect to.</param>
    public SessionReconnection(string serverUrl)
    {
        var factory = Diffusion.Sessions;
        var session = Connect(serverUrl, factory);

        if (session == null)
        {
            // Connect has already reported the failure. There is no session
            // to wait on or close, and calling Close() on null would throw.
            return;
        }

        WriteLine("The session has been created.");

        Thread.Sleep(60000);

        session.Close();
    }

    /// <summary>
    /// Configures the given factory with credentials, the reconnection timeout
    /// and the reconnection strategy, then opens a session.
    /// </summary>
    /// <param name="url">The URL of the server to connect to.</param>
    /// <param name="initialFactory">The session factory to configure.</param>
    /// <returns>The opened session, or null if opening it threw an exception.</returns>
    public ISession Connect(string url, ISessionFactory initialFactory)
    {
        try
        {
            string principal = "control";
            string password = "password";

            var factory = initialFactory
                .Principal(principal)
                .Credentials(Diffusion.Credentials.Password(password))
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .ReconnectionTimeout(maximumTimeoutDuration)
                .ReconnectionStrategy(reconnectionStrategy)
                .SessionStateChangedHandler(OnSessionStateChanged);

            return factory.Open(url);
        }
        catch (Exception ex)
        {
            WriteLine($"Session connection error : {ex}.");
        }

        return null;
    }

    /// <summary>
    /// Logs session state transitions and resets the back-off counter whenever
    /// the session becomes active.
    /// </summary>
    private void OnSessionStateChanged(object sender, SessionListenerEventArgs e)
    {
        if (e.NewState == SessionState.RECOVERING_RECONNECT)
        {
            // The session has been disconnected, and has entered
            // recovery state. It is during this state that
            // the reconnect strategy will be called
            WriteLine("The session has been disconnected.");
        }

        if (e.NewState == SessionState.CONNECTED_ACTIVE)
        {
            // The session has connected for the first time, or it has
            // been reconnected.
            reconnectionStrategy.Retries = 0;
            WriteLine("The session has connected.");
        }

        if (e.OldState == SessionState.RECOVERING_RECONNECT)
        {
            // The session has left recovery state. It may either be
            // attempting to reconnect, or the attempt has been aborted;
            // this will be reflected in the newState.
        }

        if (e.NewState == SessionState.CLOSED_BY_CLIENT)
        {
            WriteLine("The session has been closed.");
        }
    }

    /// <summary>
    /// A reconnection strategy that gets applied after the connection failure notification.
    /// It waits with an exponential back-off before starting each attempt.
    /// </summary>
    private class ReconnectionStrategy : IReconnectionStrategy
    {
        /// <summary>
        /// Number of reconnection attempts made since the counter was last reset.
        /// </summary>
        public int Retries { get; set; }

        // Set the maximum interval between reconnect attempts to 60 seconds.
        private long maximumAttemptInterval = 1000 * 60;

        public ReconnectionStrategy() => Retries = 0;

        /// <summary>
        /// Waits 100ms * 2^Retries (capped at the maximum attempt interval)
        /// and then starts the reconnection attempt.
        /// </summary>
        /// <param name="reconnection">The attempt to start once the wait has elapsed.</param>
        public async Task PerformReconnection(IReconnectionAttempt reconnection)
        {
            // Clamp while still a double: casting an unbounded power of two to
            // an integer first could overflow for large retry counts and
            // produce a negative wait.
            double backoff = Math.Pow(2, Retries++) * 100;
            int wait = (int)Math.Min(backoff, maximumAttemptInterval);

            // Await the delay instead of blocking the calling thread.
            await Task.Delay(wait);

            WriteLine("Attempting to reconnect...");

            reconnection.Start();
        }
    }
}
final ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy() { private int retries = 0; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); @Override public void performReconnection(ReconnectionAttempt reconnectionAttempt) { if (retries < 5) { int delay = (int) Math.pow(2, retries); // Exponential backoff retries++; scheduler.schedule(reconnectionAttempt::start, delay, TimeUnit.SECONDS); } else { reconnectionAttempt.abort(); } } }; final Session session = Diffusion.sessions() .principal("admin") .password("password") .reconnectionStrategy(reconnectionStrategy) .open("ws://localhost:8080");
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include <time.h> #include "diffusion.h" /* * This callback is used when the session state changes, e.g. when a session * moves from a "connecting" to a "connected" state, or from "connected" to * "closed". */ static void on_session_state_changed(SESSION_T *session, const SESSION_STATE_T old_state, const SESSION_STATE_T new_state) { printf("Session state changed from %s (%d) to %s (%d)\n", session_state_as_string(old_state), old_state, session_state_as_string(new_state), new_state); } typedef struct { double current_wait; double max_wait; } BACKOFF_STRATEGY_ARGS_T; static RECONNECTION_ATTEMPT_ACTION_T backoff_reconnection_strategy( SESSION_T *session, void *args) { BACKOFF_STRATEGY_ARGS_T *backoff_args = args; printf("Waiting for %f ms\n", backoff_args->current_wait); sleep(backoff_args->current_wait); // But only up to some maximum time. if(backoff_args->current_wait > backoff_args->max_wait) { backoff_args->current_wait = backoff_args->max_wait; } return RECONNECTION_ATTEMPT_ACTION_START; } static void backoff_success(SESSION_T *session, void *args) { printf("Reconnection successful\n"); BACKOFF_STRATEGY_ARGS_T *backoff_args = args; backoff_args->current_wait = 0; // Reset wait. 
} static void backoff_failure(SESSION_T *session, void *args) { printf("Reconnection failed (%s)\n", session_state_as_string(session->state)); BACKOFF_STRATEGY_ARGS_T *backoff_args = args; // Exponential backoff. if(backoff_args->current_wait == 0) { backoff_args->current_wait = 0.01; } else { backoff_args->current_wait *= 2; } } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; SESSION_LISTENER_T session_listener = { .on_state_changed = on_session_state_changed }; // Set the arguments to our exponential backoff strategy BACKOFF_STRATEGY_ARGS_T backoff_args = { .current_wait = 0, .max_wait = 5000 }; // Create the backoff strategy RECONNECTION_STRATEGY_T *reconnection_strategy = make_reconnection_strategy_user_function( backoff_reconnection_strategy, &backoff_args, backoff_success, backoff_failure); // Only retry for 30 seconds reconnection_strategy_set_timeout(reconnection_strategy, 30 * 1000); // Create a session, synchronously session = session_create( url, principal, credentials, &session_listener, reconnection_strategy, &error); if(session != NULL) { char *sid_str = session_id_to_string(session->id); printf("Session created (state=%d, id=%s)\n", session_state_get(session), sid_str); free(sid_str); } else { printf("Failed to create session: %s\n", error.message); free(error.message); } // With the exception of backoff_args, the reconnection strategy is // copied withing session_create() and may be freed now free_reconnection_strategy(reconnection_strategy); // Sleep for a while sleep(20); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
/// Reconnection strategy that waits with an exponential back-off before
/// starting each reconnection attempt.
class ExponentialBackoffReconnectionStrategy: NSObject, PTDiffusionSessionReconnectionStrategy {
    /// Number of reconnection attempts requested so far.
    var attempt_count: Int = 0

    /// Schedules the reconnection attempt on the main queue after a delay of
    /// `0.1 * 2^attempt_count` seconds, capped at 60 seconds.
    func diffusionSession(_ session: PTDiffusionSession,
                          wishesToReconnectWith attempt: PTDiffusionSessionReconnectionAttempt) {
        // limit the maximum delay time between reconnection attempts to 60 seconds
        let maximumAttemptInterval = 60.0

        // compute delay for exponential backoff on the number of attempts so far
        attempt_count += 1
        let delay = min(pow(2.0, Double(attempt_count)) * 0.1, maximumAttemptInterval)

        // schedule asynchronous execution; adding the delay as fractional
        // seconds keeps sub-second waits, which an Int conversion would
        // truncate to zero
        DispatchQueue.main.asyncAfter(deadline: .now() + delay) {
            print("Attempting to reconnect now")
            attempt.start()
        }
    }
}

/// Opens a session configured with a 10 minute reconnection timeout and the
/// exponential back-off reconnection strategy.
func connect() {
    let configuration = PTDiffusionMutableSessionConfiguration()

    // Set the maximum amount of time we'll try and reconnect for to 10 minutes
    configuration.reconnectionTimeout = NSNumber(value: 10.0 * 60.0)

    // Set the reconnection strategy to be used
    configuration.reconnectionStrategy = ExponentialBackoffReconnectionStrategy()

    guard let url = URL(string: "url") else {
        print("Failed to open session: invalid server URL")
        return
    }

    // Use the configuration to open a new session...
    PTDiffusionSession.open(with: url, configuration: configuration) { (session, error) in
        // Check error is `nil`, then use session as required.
        // Ensure to maintain a strong reference to the session beyond the lifetime
        // of this callback, for example by assigning it to an instance variable.
        guard session != nil else {
            // `print` does not expand format specifiers, so interpolate the
            // description; avoid force-unwrapping in case no error is supplied.
            print("Failed to open session: \(error?.localizedDescription ?? "unknown error")")
            return
        }

        // At this point we now have a connected session.
        print("Connected.")
    }
}