Specifying a reconnection strategy
Reconnection behavior can be configured using custom reconnection strategies.
The reconnection behavior of a client session can be configured using reconnection strategies. A reconnection strategy is applied when the session enters the RECOVERING_RECONNECT state, enabling the session to attempt to reconnect and recover its previous state.
Reconnection can only succeed if the client session is still available on Diffusion™ Cloud . The maximum time that Diffusion Cloud keeps client sessions available for reconnection is 60 seconds.
Individual client sessions can request a shorter reconnection timeout for their sessions or request to disable reconnection when they first connect to Diffusion Cloud
Examples
// When establishing a session, it is possible to specify whether reconnection // should be attempted in the event of an unexpected disconnection. This allows // the session to recover its previous state. // Set the maximum amount of time we'll try and reconnect for to 10 minutes const maximumTimeoutDuration = 1000 * 60 * 10; // Set the maximum interval between reconnect attempts to 60 seconds const maximumAttemptInterval = 1000 * 60; // Set an upper limit to the number of times we'll try to reconnect for const maximumAttempts = 25; // Count the number of reconnection attempts we've made let attempts = 0; // Create a reconnection strategy that applies an exponential back-off // The strategy will be called with two arguments, start & abort. Both // of these are functions, which allow the strategy to either start a // reconnection attempt, or to abort reconnection (which will close the session) const reconnectionStrategy = (start, abort) => { if (attempts > maximumAttempts) { abort(); } else { const wait = Math.min(Math.pow(2, attempts++) * 100, maximumAttemptInterval); // Wait the specified time period, and then start the reconnection attempt setTimeout(start, wait); } }; // Connect to the server. const session = await diffusion.connect({ host : 'diffusion.example.com', port : 443, secure : true, principal : 'control', credentials : 'password', reconnect : { timeout : maximumTimeoutDuration, strategy : reconnectionStrategy } }); session.on('disconnect', () => { // This will be called when we lose connection. Because we've specified the // reconnection strategy, it will be called automatically when this event // is dispatched }); session.on('reconnect', () => { // If the session is able to reconnect within the reconnect timeout, this // event will be dispatched to notify that normal operations may resume attempts = 0; }); session.on('close', () => { // If the session is closed normally, or the session is unable to reconnect, // this event will be dispatched to notify that the session is no longer // operational. });
/// <summary> /// Implementation that demonstrates session reconnection strategy. /// </summary> public sealed class SessionReconnection { private int maximumTimeoutDuration = 1000 * 60 * 10; private ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy(); public SessionReconnection(string serverUrl) { var factory = Diffusion.Sessions; var session = Connect(serverUrl, factory); if (session != null) { WriteLine("The session has been created."); } Thread.Sleep(60000); session.Close(); } public ISession Connect(string url, ISessionFactory initialFactory) { try { string principal = "control"; string password = "password"; var factory = initialFactory .Principal(principal) .Credentials(Diffusion.Credentials.Password(password)) .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .ReconnectionTimeout(maximumTimeoutDuration) .ReconnectionStrategy(reconnectionStrategy) .SessionStateChangedHandler(OnSessionStateChanged); return factory.Open(url); } catch (Exception ex) { WriteLine($"Session connection error : {ex}."); } return null; } private void OnSessionStateChanged(object sender, SessionListenerEventArgs e) { if (e.NewState == SessionState.RECOVERING_RECONNECT) { // The session has been disconnected, and has entered // recovery state. It is during this state that // the reconnect strategy will be called WriteLine("The session has been disconnected."); } if (e.NewState == SessionState.CONNECTED_ACTIVE) { // The session has connected for the first time, or it has // been reconnected. reconnectionStrategy.Retries = 0; WriteLine("The session has connected."); } if (e.OldState == SessionState.RECOVERING_RECONNECT) { // The session has left recovery state. It may either be // attempting to reconnect, or the attempt has been aborted; // this will be reflected in the newState. } if (e.NewState == SessionState.CLOSED_BY_CLIENT) { WriteLine("The session has been closed."); } } /// <summary> /// A reconnection strategy that gets applied after the connection failure notification. /// </summary> private class ReconnectionStrategy : IReconnectionStrategy { public int Retries { get; set; } // Set the maximum interval between reconnect attempts to 60 seconds. private long maximumAttemptInterval = 1000 * 60; public ReconnectionStrategy() => Retries = 0; public async Task PerformReconnection(IReconnectionAttempt reconnection) { long wait = Math.Min((long)Math.Pow(2, Retries++) * 100L, maximumAttemptInterval); Thread.Sleep((int)wait); WriteLine("Attempting to reconnect..."); reconnection.Start(); } } }
final ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy() { private int retries = 0; private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); @Override public void performReconnection(ReconnectionAttempt reconnectionAttempt) { if (retries < 5) { int delay = (int) Math.pow(2, retries); // Exponential backoff retries++; scheduler.schedule(reconnectionAttempt::start, delay, TimeUnit.SECONDS); } else { reconnectionAttempt.abort(); } } }; final Session session = Diffusion.sessions() .principal("admin") .password("password") .reconnectionStrategy(reconnectionStrategy) .open("ws://localhost:8080");
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. * */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include <time.h> #include "diffusion.h" /* * This callback is used when the session state changes, e.g. when a session * moves from a "connecting" to a "connected" state, or from "connected" to * "closed". */ static void on_session_state_changed(SESSION_T *session, const SESSION_STATE_T old_state, const SESSION_STATE_T new_state) { printf("Session state changed from %s (%d) to %s (%d)\n", session_state_as_string(old_state), old_state, session_state_as_string(new_state), new_state); } typedef struct { double current_wait; double max_wait; } BACKOFF_STRATEGY_ARGS_T; static RECONNECTION_ATTEMPT_ACTION_T backoff_reconnection_strategy( SESSION_T *session, void *args) { BACKOFF_STRATEGY_ARGS_T *backoff_args = args; printf("Waiting for %f ms\n", backoff_args->current_wait); sleep(backoff_args->current_wait); // But only up to some maximum time. if(backoff_args->current_wait > backoff_args->max_wait) { backoff_args->current_wait = backoff_args->max_wait; } return RECONNECTION_ATTEMPT_ACTION_START; } static void backoff_success(SESSION_T *session, void *args) { printf("Reconnection successful\n"); BACKOFF_STRATEGY_ARGS_T *backoff_args = args; backoff_args->current_wait = 0; // Reset wait. } static void backoff_failure(SESSION_T *session, void *args) { printf("Reconnection failed (%s)\n", session_state_as_string(session->state)); BACKOFF_STRATEGY_ARGS_T *backoff_args = args; // Exponential backoff. if(backoff_args->current_wait == 0) { backoff_args->current_wait = 0.01; } else { backoff_args->current_wait *= 2; } } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; SESSION_LISTENER_T session_listener = { .on_state_changed = on_session_state_changed }; // Set the arguments to our exponential backoff strategy BACKOFF_STRATEGY_ARGS_T backoff_args = { .current_wait = 0, .max_wait = 5000 }; // Create the backoff strategy RECONNECTION_STRATEGY_T *reconnection_strategy = make_reconnection_strategy_user_function( backoff_reconnection_strategy, &backoff_args, backoff_success, backoff_failure); // Only retry for 30 seconds reconnection_strategy_set_timeout(reconnection_strategy, 30 * 1000); // Create a session, synchronously session = session_create( url, principal, credentials, &session_listener, reconnection_strategy, &error); if(session != NULL) { char *sid_str = session_id_to_string(session->id); printf("Session created (state=%d, id=%s)\n", session_state_get(session), sid_str); free(sid_str); } else { printf("Failed to create session: %s\n", error.message); free(error.message); } // With the exception of backoff_args, the reconnection strategy is // copied withing session_create() and may be freed now free_reconnection_strategy(reconnection_strategy); // Sleep for a while sleep(20); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
class ExponentialBackoffReconnectionStrategy: NSObject, PTDiffusionSessionReconnectionStrategy { var attempt_count : Int = 0 func diffusionSession(_ session: PTDiffusionSession, wishesToReconnectWith attempt: PTDiffusionSessionReconnectionAttempt) { // limit the maximum delay time between reconnection attempts to 60 seconds let maximum_attempt_interval = 60.0 // compute delay for exponential backoff on the number of attempts so far self.attempt_count += 1 let delay = min(pow(2.0, Double(self.attempt_count)) * 0.1, maximum_attempt_interval) // schedule asynchronous execution let dispatch_time = DispatchTime.now() + DispatchTimeInterval.seconds(Int(delay)) DispatchQueue.main.asyncAfter(deadline: dispatch_time) { print("Attempting to reconnect now") attempt.start() } } } func connect() { let configuration = PTDiffusionMutableSessionConfiguration() // Set the maximum amount of time we'll try and reconnect for to 10 minutes configuration.reconnectionTimeout = NSNumber(value: 10.0 * 60.0) // Set the reconnection strategy to be used configuration.reconnectionStrategy = ExponentialBackoffReconnectionStrategy() // Use the configuration to open a new session... PTDiffusionSession.open(with: URL(string: "url")!, configuration: configuration) { (session, error) in // Check error is `nil`, then use session as required. // Ensure to maintain a strong reference to the session beyond the lifetime // of this callback, for example by assigning it to an instance variable. if (session == nil) { print("Failed to open session: %@", error!.localizedDescription) return } // At this point we now have a connected session. print("Connected.") } }