Session locks
Session locks are a way to ensure that only one session at a time can access a particular resource. For example, you can use a session lock to ensure that only one session is allowed to update a certain topic.
Session locks are a mechanism managed by the Diffusion™ server to coordinate access to shared resources among multiple sessions.
A session can acquire a lock, identified by a lock name (chosen by you to suit your application). Once a session acquires a lock, no other session can acquire the same lock.
Acquiring a lock does not automatically change anything else about a session. Locks are not linked to topics or permissions, except through your application's logic. It is up to you to design a suitable locking scheme and ensure your application implements it. For example, if you want to implement exclusive updating of a topic using a session lock, you must make sure that each session always acquires the lock and uses a lock constraint created from the lock when updating the topic.
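The point that locks protect nothing by themselves can be illustrated with a toy in-memory model. This is a sketch only: `ToyServer`, `tryLock`, `release` and `update` are hypothetical names invented for this illustration and are not the Diffusion API. As a further simplification, `tryLock` returns null when the lock is taken instead of waiting for it.

```javascript
// Toy in-memory model of a server. All names here are hypothetical,
// chosen for illustration; this is not the Diffusion client API.
class ToyServer {
    constructor() {
        this.lockOwners = new Map(); // lock name -> id of the owning session
        this.topics = new Map();     // topic path -> current value
    }

    // Simplification: returns null when another session owns the lock,
    // where a real server would record the session as waiting.
    tryLock(sessionId, lockName) {
        const owner = this.lockOwners.get(lockName);
        if (owner !== undefined && owner !== sessionId) {
            return null;
        }
        this.lockOwners.set(lockName, sessionId);
        return { lockName, sessionId };
    }

    // Releases the lock if the handle's session still owns it.
    release(lock) {
        if (this.lockOwners.get(lock.lockName) === lock.sessionId) {
            this.lockOwners.delete(lock.lockName);
        }
    }

    // The update is checked against a lock only if the caller supplies one.
    update(sessionId, path, value, lockConstraint) {
        if (lockConstraint !== undefined) {
            const owner = this.lockOwners.get(lockConstraint.lockName);
            if (lockConstraint.sessionId !== sessionId || owner !== sessionId) {
                return false; // constraint not satisfied, update rejected
            }
        }
        this.topics.set(path, value);
        return true;
    }
}

const server = new ToyServer();
const lock1 = server.tryLock('session-1', 'topic-lock');
const guarded = server.update('session-1', 'a/topic', 'v1', lock1); // accepted
const lock2 = server.tryLock('session-2', 'topic-lock');            // null, lock is taken
server.release(lock1);
const lock2Retry = server.tryLock('session-2', 'topic-lock');       // now succeeds
const stale = server.update('session-1', 'a/topic', 'v2', lock1);   // rejected, lock no longer owned
const unguarded = server.update('session-3', 'a/topic', 'rogue');   // accepted, nothing was checked
```

In this model the last call succeeds because session-3 never supplies a constraint. Exclusive updating therefore holds only if every updating session follows the same scheme.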
By default, a lock is released when the session that owns it closes. Alternatively, when acquiring a lock, a session can specify that the lock is released as soon as the session's connection to the server is lost. This is done using a scope parameter.
A session can also explicitly release a lock.
const lock = await session.lock('lock_name', diffusion.SessionLockScope.UNLOCK_ON_CONNECTION_LOSS);
console.log('Acquired session lock');

setTimeout(async () => {
    await lock.unlock();
    console.log('Lock has been released');
}, 1000);
/**
 * Copyright © 2021, 2022 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;

namespace PushTechnology.ClientInterface.Example {
    /// <summary>
    /// Client implementation that demonstrates session locks.
    /// </summary>
    public sealed class SessionLocks {
        private ISession session1, session2;
        private ISessionLock sessionLock1, sessionLock2;

        private static string LOCK_NAME = "lockA";

        public SessionLocks(string serverUrl) {
            session1 = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            session2 = Diffusion.Sessions.Principal("control").Password("password")
                .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
                .Open(serverUrl);

            WriteLine("Sessions 1 and 2 have been created.");

            AcquireLockSession1();
        }

        private async void AcquireLockSession1() {
            try {
                WriteLine("Requesting lock 1...");

                sessionLock1 = await session1.LockAsync(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS);

                WriteLine("Lock 1 has been acquired.");

                AcquireLockSession2();

                Thread.Sleep(1000);

                ReleaseLock1();
            } catch (Exception ex) {
                WriteLine($"Failed to get lock 1 : {ex}.");
                session1.Close();
                session2.Close();
            }
        }

        private async void AcquireLockSession2() {
            try {
                WriteLine("Requesting lock 2...");

                sessionLock2 = await session2.LockAsync(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS);

                WriteLine("Lock 2 has been acquired.");

                Thread.Sleep(1000);

                ReleaseLock2();
            } catch (Exception ex) {
                WriteLine($"Failed to get lock 2 : {ex}.");
                session1.Close();
                session2.Close();
            }
        }

        private async void ReleaseLock1() {
            try {
                WriteLine("Requesting lock 1 release...");

                await sessionLock1.UnlockAsync();

                WriteLine("Lock 1 has been released.");
            } catch (Exception ex) {
                WriteLine($"Failed to release lock 1 : {ex}.");
                session1.Close();
                session2.Close();
            }
        }

        private async void ReleaseLock2() {
            try {
                WriteLine("Requesting lock 2 release...");

                await sessionLock2.UnlockAsync();

                WriteLine("Lock 2 has been released.");
            } catch (Exception ex) {
                WriteLine($"Failed to release lock 2 : {ex}.");
            } finally {
                session1.Close();
                session2.Close();
            }
        }
    }
}
/**
 * Copyright © 2021 Push Technology Ltd.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
        #include <unistd.h>
#else
        #define sleep(x) Sleep(1000 * x)
#endif

#include "diffusion.h"

DIFFUSION_SESSION_LOCK_T *g_session_lock;

static int on_lock_acquired(
        const DIFFUSION_SESSION_LOCK_T *session_lock,
        void *context)
{
        // lock has been acquired
        g_session_lock = diffusion_session_lock_dup(session_lock);
        return HANDLER_SUCCESS;
}

static int on_unlock(
        bool lock_owned,
        void *context)
{
        // lock has been released
        diffusion_session_lock_free(g_session_lock);
        g_session_lock = NULL;
        return HANDLER_SUCCESS;
}

int main(int argc, char **argv)
{
        const char *url = "ws://localhost:8080";
        const char *principal = "control";
        const char *password = "password";

        CREDENTIALS_T *credentials = credentials_create_password(password);

        SESSION_T *session;
        DIFFUSION_ERROR_T error = { 0 };

        // create a session, synchronously
        session = session_create(url, principal, credentials, NULL, NULL, &error);
        if(session == NULL) {
                printf("Failed to create session: %s\n", error.message);
                free(error.message);
                credentials_free(credentials);
                return EXIT_FAILURE;
        }

        // acquire a session lock
        DIFFUSION_SESSION_LOCK_PARAMS_T lock_params = {
                .on_lock_acquired = on_lock_acquired
        };
        diffusion_session_lock(session, "lock_a", lock_params);

        // sleep for a while
        sleep(5);

        // release the session lock
        DIFFUSION_SESSION_LOCK_UNLOCK_PARAMS_T unlock_params = {
                .on_unlock = on_unlock
        };
        diffusion_session_lock_unlock(session, g_session_lock, unlock_params);

        // Sleep for a while
        sleep(5);

        // Close the session, and release resources and memory
        session_close(session, NULL);
        session_free(session);

        credentials_free(credentials);

        return EXIT_SUCCESS;
}
// Copyright (C) 2021 Push Technology Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

import Foundation
import Diffusion

class SessionLocks {

    func run_example(url: URL, lock_name: String) {
        let credentials = PTDiffusionCredentials(password: "password")
        let configuration = PTDiffusionSessionConfiguration(principal: "control",
                                                            credentials: credentials)

        // establish a session
        PTDiffusionSession.open(with: url,
                                configuration: configuration) { (session, error) in
            if (error != nil) {
                print("An error has occurred while establishing a session: \(error!.localizedDescription)")
            }
            else {
                // request a lock with name `lock_name`
                session?.lock(withName:lock_name) { (lock, error) in
                    if (error != nil) {
                        print("An error has occurred while attempting to retrieve lock '\(lock_name)': \(error!.localizedDescription)")
                    }
                    else {
                        print("Successfully acquired lock '\(lock_name)'")

                        // wait 5 seconds, then release the lock
                        let dispatch_time = DispatchTime.now() + DispatchTimeInterval.seconds(5)
                        DispatchQueue.main.asyncAfter(deadline: dispatch_time) {
                            print("Releasing lock now")
                            lock!.unlock { (was_owner, error) in
                                if (error != nil) {
                                    print("An error has occurred while releasing the lock: \(error!.localizedDescription)")
                                }
                                else {
                                    print("Lock has been successfully released")
                                }
                            }
                        }
                    }
                }
            }
        }
    }
}
Acquiring a lock
Required permissions:
Session locks are established on demand. There is no separate operation to create or destroy a named lock.
If a session attempts to acquire a lock that is not assigned, the server assigns it immediately to the session.
If a session attempts to acquire a lock that is already assigned, the server will record that the session is waiting to acquire it. When a lock is released and multiple sessions are waiting to acquire it, the server will arbitrarily assign it to one of the waiting sessions.
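The assignment rules above can be sketched as a small in-memory model. This is an illustration of the described behaviour, not server code: `ToyLockTable` and its methods are hypothetical names, the random choice merely stands in for "arbitrarily", and treating a repeated request by the current owner as already assigned is a modelling choice of this sketch.

```javascript
// Toy model of lock assignment. Hypothetical names, not the Diffusion API.
class ToyLockTable {
    constructor() {
        this.owners = new Map();  // lock name -> id of the owning session
        this.waiting = new Map(); // lock name -> ids of sessions waiting for it
    }

    // Locks exist on demand: the first request for a name assigns it.
    acquire(sessionId, lockName) {
        const owner = this.owners.get(lockName);
        if (owner === undefined || owner === sessionId) {
            this.owners.set(lockName, sessionId);
            return 'assigned';
        }
        if (!this.waiting.has(lockName)) {
            this.waiting.set(lockName, []);
        }
        this.waiting.get(lockName).push(sessionId);
        return 'waiting';
    }

    // On release, hand the lock to one waiting session, in no guaranteed order.
    release(sessionId, lockName) {
        if (this.owners.get(lockName) !== sessionId) {
            return false; // only the owner can release
        }
        this.owners.delete(lockName);
        const queue = this.waiting.get(lockName) || [];
        if (queue.length > 0) {
            const index = Math.floor(Math.random() * queue.length);
            const [next] = queue.splice(index, 1);
            this.owners.set(lockName, next);
        }
        if (queue.length === 0) {
            this.waiting.delete(lockName);
        }
        return true;
    }

    ownerOf(lockName) {
        return this.owners.get(lockName);
    }
}

const table = new ToyLockTable();
const first = table.acquire('s1', 'lockA');  // 'assigned'
const second = table.acquire('s2', 'lockA'); // 'waiting'
const third = table.acquire('s3', 'lockA');  // 'waiting'
table.release('s1', 'lockA');
const nextOwner = table.ownerOf('lockA');    // either 's2' or 's3'
```

Because the next owner is not predictable, application code should not rely on waiting sessions being served in request order.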
- Lock name
  A name for the lock.
- Lock scope (optional)
  The scope of the lock.
  By default, the scope is UNLOCK_ON_SESSION_LOSS, meaning that the lock will be released when the session is closed.
  If the scope is set to UNLOCK_ON_CONNECTION_LOSS, the lock will be released when the session loses its current connection to the server.
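The difference between the two scopes can be sketched with a small model of which locks a session gives up on each kind of event. Only the two scope names come from the text above. The helper functions and the event names `connection-lost` and `session-closed` are hypothetical, introduced for this illustration.

```javascript
// Toy model of lock scopes. Only the scope names are taken from the
// documentation; the helpers and event names are hypothetical.
const Scope = Object.freeze({
    UNLOCK_ON_SESSION_LOSS: 'UNLOCK_ON_SESSION_LOSS',
    UNLOCK_ON_CONNECTION_LOSS: 'UNLOCK_ON_CONNECTION_LOSS',
});

// Records a held lock; the scope defaults to UNLOCK_ON_SESSION_LOSS.
function acquire(heldLocks, name, scope = Scope.UNLOCK_ON_SESSION_LOSS) {
    heldLocks.set(name, scope);
}

// Removes and returns the names of the locks released by the given event.
function releasedBy(event, heldLocks) {
    const released = [];
    for (const [name, scope] of heldLocks) {
        const closes = event === 'session-closed';
        const drops = event === 'connection-lost' &&
            scope === Scope.UNLOCK_ON_CONNECTION_LOSS;
        if (closes || drops) {
            released.push(name);
        }
    }
    for (const name of released) {
        heldLocks.delete(name);
    }
    return released;
}

const held = new Map();
acquire(held, 'config-lock'); // default scope
acquire(held, 'leader-lock', Scope.UNLOCK_ON_CONNECTION_LOSS);

const afterDisconnect = releasedBy('connection-lost', held); // ['leader-lock']
const afterClose = releasedBy('session-closed', held);       // ['config-lock']
```

A lock with the default scope survives a lost connection for as long as the session itself stays open, so other sessions keep waiting for it. UNLOCK_ON_CONNECTION_LOSS hands the lock over sooner.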