Specifying a reconnection strategy
Reconnection behavior can be configured using custom reconnection strategies.
The reconnection behavior of a client session can be configured using reconnection strategies. A reconnection strategy is applied when the session enters the RECOVERING_RECONNECT state, enabling the session to attempt to reconnect and recover its previous state.
Reconnection can only succeed if the client session is still available on Diffusion™ Cloud. The maximum time that Diffusion Cloud keeps client sessions available for reconnection is 60 seconds.
Individual client sessions can request a shorter reconnection timeout for their sessions or request to disable reconnection when they first connect to Diffusion Cloud.
Examples
// When establishing a session, it is possible to specify whether reconnection
// should be attempted in the event of an unexpected disconnection. This allows
// the session to recover its previous state.

// Set the maximum amount of time we'll try and reconnect for to 10 minutes
var maximumTimeoutDuration = 1000 * 60 * 10;

// Set the maximum interval between reconnect attempts to 60 seconds
var maximumAttemptInterval = 1000 * 60;

// Set an upper limit to the number of times we'll try to reconnect for
var maximumAttempts = 25;

// Count the number of reconnection attempts we've made
var attempts = 0;

// Create a reconnection strategy that applies an exponential back-off.
// The strategy will be called with two arguments, start & abort. Both
// of these are functions, which allow the strategy to either start a
// reconnection attempt, or to abort reconnection (which will close the session).
var reconnectionStrategy = function(start, abort) {
    // `attempts` is the number of attempts already started, so give up as soon
    // as it reaches the limit. (Comparing with `>` would allow one attempt
    // more than maximumAttempts.)
    if (attempts >= maximumAttempts) {
        abort();
    } else {
        // Back-off delay in milliseconds: 100, 200, 400, ... capped at
        // maximumAttemptInterval. The counter is incremented after it is read.
        var wait = Math.min(Math.pow(2, attempts++) * 100, maximumAttemptInterval);

        // Wait the specified time period, and then start the reconnection attempt
        setTimeout(start, wait);
    }
};

// Connect to the server.
diffusion.connect({
    host : 'diffusion.example.com',
    port : 443,
    secure : true,
    principal : 'control',
    credentials : 'password',
    reconnect : {
        timeout : maximumTimeoutDuration,
        strategy : reconnectionStrategy
    }
}).then(function(session) {

    session.on('disconnect', function() {
        // This will be called when we lose connection. Because we've specified the
        // reconnection strategy, it will be called automatically when this event
        // is dispatched
    });

    session.on('reconnect', function() {
        // If the session is able to reconnect within the reconnect timeout, this
        // event will be dispatched to notify that normal operations may resume.
        // Reset the counter so the next disconnection starts backing off from
        // the shortest delay again.
        attempts = 0;
    });

    session.on('close', function() {
        // If the session is closed normally, or the session is unable to reconnect,
        // this event will be dispatched to notify that the session is no longer
        // operational.
    });
});
@import Diffusion;

/**
 Reconnection strategy that delays each reconnection attempt by an
 exponentially increasing interval, capped at a fixed maximum.
 */
@interface ExponentialBackoffReconnectionStrategy : NSObject <PTDiffusionSessionReconnectionStrategy>
@end

@implementation CustomReconnectionStrategyExample {
    // Strong reference that keeps the opened session alive.
    PTDiffusionSession *_session;
}

/**
 Opens a session to the given URL, configured with a ten minute reconnection
 timeout and an exponential backoff reconnection strategy.

 @param url The URL passed to the session open call.
 */
-(void)startWithURL:(NSURL*)url {
    NSLog(@"Connecting...");

    PTDiffusionMutableSessionConfiguration *const configuration =
        [[PTDiffusionMutableSessionConfiguration alloc] init];

    // Keep trying to reconnect for at most 10 minutes (value is in seconds).
    configuration.reconnectionTimeout = @(10.0 * 60.0);

    // Install the custom strategy defined below.
    configuration.reconnectionStrategy =
        [[ExponentialBackoffReconnectionStrategy alloc] init];

    // Opening is asynchronous; the outcome is delivered to the completion handler.
    [PTDiffusionSession openWithURL:url
                      configuration:configuration
                  completionHandler:^(PTDiffusionSession *session, NSError *error)
    {
        if (session == nil) {
            NSLog(@"Failed to open session: %@", error);
            return;
        }

        // A connected session is available from here on.
        NSLog(@"Connected.");

        // Store it in the ivar so that the session is strongly referenced.
        self->_session = session;
    }];
}

@end

@implementation ExponentialBackoffReconnectionStrategy {
    // Number of reconnection attempts scheduled so far.
    // NOTE(review): nothing in this class resets the counter after a
    // successful reconnection, so later outages start from a longer delay -
    // confirm whether that is intended.
    NSUInteger _attemptCount;
}

/**
 Schedules the supplied reconnection attempt to start on the main queue after
 a backoff delay of 0.1s * 2^n, where n is the number of attempts scheduled
 before this one, limited to 60 seconds.
 */
-(void)          diffusionSession:(PTDiffusionSession *const)session
     wishesToReconnectWithAttempt:(PTDiffusionSessionReconnectionAttempt *const)attempt {
    // Upper bound, in seconds, for the delay between reconnection attempts.
    const NSTimeInterval longestDelay = 60.0;

    // Read the current attempt index, then advance the counter.
    const NSUInteger attemptIndex = _attemptCount++;

    // Exponential backoff, clamped to the upper bound.
    const NSTimeInterval uncappedDelay = pow(2.0, attemptIndex) * 0.1;
    const NSTimeInterval delay = MIN(uncappedDelay, longestDelay);

    NSLog(@"Reconnection attempt scheduled for %.2fs", delay);

    // Start the attempt asynchronously once the delay has elapsed.
    const dispatch_time_t fireTime =
        dispatch_time(DISPATCH_TIME_NOW, (int64_t)(delay * NSEC_PER_SEC));
    dispatch_after(fireTime, dispatch_get_main_queue(), ^{
        NSLog(@"Attempting reconnection.");
        [attempt start];
    });
}

@end
import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.session.Session.Listener; import com.pushtechnology.diffusion.client.session.Session.State; import com.pushtechnology.diffusion.client.session.reconnect.ReconnectionStrategy; /** * This example class demonstrates the ability to set a custom {@link ReconnectionStrategy} * when creating sessions. * * @author Push Technology Limited * @since 5.5 */ public class ClientWithReconnectionStrategy { private volatile int retries = 0; /** * Constructor. */ public ClientWithReconnectionStrategy() { // Set the maximum amount of time we'll try and reconnect for to 10 minutes. final int maximumTimeoutDuration = 1000 * 60 * 10; // Set the maximum interval between reconnect attempts to 60 seconds. final long maximumAttemptInterval = 1000 * 60; // Create a new reconnection strategy that applies an exponential backoff final ReconnectionStrategy reconnectionStrategy = new ReconnectionStrategy() { private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1); @Override public void performReconnection(final ReconnectionAttempt reconnection) { final long exponentialWaitTime = Math.min((long) Math.pow(2, retries++) * 100L, maximumAttemptInterval); scheduler.schedule(new Runnable() { @Override public void run() { reconnection.start(); } }, exponentialWaitTime, TimeUnit.MILLISECONDS); } }; final Session session = Diffusion.sessions().reconnectionTimeout(maximumTimeoutDuration) .reconnectionStrategy(reconnectionStrategy) .open("ws://diffusion.example.com:80"); session.addListener(new Listener() { @Override public void onSessionStateChanged(Session session, State oldState, State newState) { if (newState == State.RECOVERING_RECONNECT) { // The session has been disconnected, and has 
entered recovery state. It is during this state that // the reconnect strategy will be called } if (newState == State.CONNECTED_ACTIVE) { // The session has connected for the first time, or it has been reconnected. retries = 0; } if (oldState == State.RECOVERING_RECONNECT) { // The session has left recovery state. It may either be attempting to reconnect, or the attempt has // been aborted; this will be reflected in the newState. } } }); } }
/* * This example shows how to make a synchronous connection to * Diffusion, with user-provided reconnection logic. */ #include <stdio.h> #include <stdlib.h> #include <time.h> #include <unistd.h> #include <apr_time.h> #include "diffusion.h" #include "args.h" ARG_OPTS_T arg_opts[] = { ARG_OPTS_HELP, {'u', "url", "Diffusion server URL", ARG_OPTIONAL, ARG_HAS_VALUE, "ws://localhost:8080"}, {'p', "principal", "Principal (username) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'c', "credentials", "Credentials (password) for the connection", ARG_OPTIONAL, ARG_HAS_VALUE, NULL}, {'s', "sleep", "Time to sleep before disconnecting (in seconds).", ARG_OPTIONAL, ARG_HAS_VALUE, "5" }, END_OF_ARG_OPTS }; /* * This callback is used when the session state changes, e.g. when a session * moves from a "connecting" to a "connected" state, or from "connected" to * "closed". */ static void on_session_state_changed(SESSION_T *session, const SESSION_STATE_T old_state, const SESSION_STATE_T new_state) { printf("Session state changed from %s (%d) to %s (%d)\n", session_state_as_string(old_state), old_state, session_state_as_string(new_state), new_state); } typedef struct { long current_wait; long max_wait; } BACKOFF_STRATEGY_ARGS_T; static RECONNECTION_ATTEMPT_ACTION_T backoff_reconnection_strategy(SESSION_T *session, void *args) { BACKOFF_STRATEGY_ARGS_T *backoff_args = args; printf("Waiting for %ld ms\n", backoff_args->current_wait); apr_sleep(backoff_args->current_wait * 1000); // µs -> ms // But only up to some maximum time. if(backoff_args->current_wait > backoff_args->max_wait) { backoff_args->current_wait = backoff_args->max_wait; } return RECONNECTION_ATTEMPT_ACTION_START; } static void backoff_success(SESSION_T *session, void *args) { printf("Reconnection successful\n"); BACKOFF_STRATEGY_ARGS_T *backoff_args = args; backoff_args->current_wait = 0; // Reset wait. 
} static void backoff_failure(SESSION_T *session, void *args) { printf("Reconnection failed (%s)\n", session_state_as_string(session->state)); BACKOFF_STRATEGY_ARGS_T *backoff_args = args; // Exponential backoff. if(backoff_args->current_wait == 0) { backoff_args->current_wait = 1; } else { backoff_args->current_wait *= 2; } } /* * Entry point for the example. */ int main(int argc, char **argv) { /* * Standard command-line parsing. */ HASH_T *options = parse_cmdline(argc, argv, arg_opts); if(options == NULL || hash_get(options, "help") != NULL) { show_usage(argc, argv, arg_opts); return EXIT_FAILURE; } const char *url = hash_get(options, "url"); const char *principal = hash_get(options, "principal"); CREDENTIALS_T *credentials = NULL; const char *password = hash_get(options, "credentials"); if(password != NULL) { credentials = credentials_create_password(password); } const unsigned int sleep_time = atol(hash_get(options, "sleep")); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; SESSION_LISTENER_T session_listener = { 0 }; session_listener.on_state_changed = &on_session_state_changed; /* * Set the arguments to our exponential backoff strategy. */ BACKOFF_STRATEGY_ARGS_T *backoff_args = calloc(1, sizeof(BACKOFF_STRATEGY_ARGS_T)); backoff_args->current_wait = 0; backoff_args->max_wait = 5000; /* * Create the backoff strategy. */ RECONNECTION_STRATEGY_T *reconnection_strategy = make_reconnection_strategy_user_function(backoff_reconnection_strategy, backoff_args, backoff_success, backoff_failure, NULL); /* * Only ever retry for 30 seconds. */ reconnection_strategy_set_timeout(reconnection_strategy, 30 * 1000); /* * Create a session, synchronously. 
*/ session = session_create(url, principal, credentials, &session_listener, reconnection_strategy, &error); if(session != NULL) { char *sid_str = session_id_to_string(session->id); printf("Session created (state=%d, id=%s)\n", session_state_get(session), sid_str); free(sid_str); } else { printf("Failed to create session: %s\n", error.message); free(error.message); } // With the exception of backoff_args, the reconnection strategy is // copied withing session_create() and may be freed now. free(reconnection_strategy); /* * Sleep for a while. */ sleep(sleep_time); /* * Close the session, and release resources and memory. */ session_close(session, NULL); session_free(session); free(backoff_args); credentials_free(credentials); hash_free(options, NULL, free); return EXIT_SUCCESS; }
This page last modified: 2020/06/25