Session locks
Session locks are a way to ensure that only one session at a time can access a particular resource. For example, you can use a session lock to ensure that only one session is allowed to update a certain topic.
Session locks are a mechanism managed by the Diffusion™ server to coordinate access to shared resources among multiple sessions.
A session can acquire a lock, identified by a lock name (chosen by you to suit your application). Once a session acquires a lock, no other session can acquire the same lock.
Acquiring a lock does not automatically change anything else about a session. Locks are not linked to topics or permissions, except through your application's logic. It is up to you to design a suitable locking scheme and ensure your application implements it. For example, if you want to implement exclusive updating of a topic using a session lock, you must make sure that each session always acquires the lock and uses a lock constraint created from the lock when updating the topic.
By default, a lock is released when the session owning it closes. Alternatively, when acquiring a lock, a session can specify that the lock will be released if connection to the server is lost. This is done using a scope parameter.
A session can also explicitly release a lock.
const lock = await session.lock('lock_name', diffusion.SessionLockScope.UNLOCK_ON_CONNECTION_LOSS); console.log('Acquired session lock'); setTimeout(async () => { await lock.unlock(); console.log('Lock has been released'); }, 1000);
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ using System; using System.Collections.Generic; using System.Threading; using System.Threading.Tasks; using PushTechnology.ClientInterface.Client.Callbacks; using PushTechnology.ClientInterface.Client.Factories; using PushTechnology.ClientInterface.Client.Features; using PushTechnology.ClientInterface.Client.Features.Topics; using PushTechnology.ClientInterface.Client.Session; using PushTechnology.ClientInterface.Client.Topics; using PushTechnology.ClientInterface.Client.Topics.Details; using static System.Console; namespace PushTechnology.ClientInterface.Example { /// <summary> /// Client implementation that demonstrates session locks. /// </summary> public sealed class SessionLocks { private ISession session1, session2; private ISessionLock sessionLock1, sessionLock2; private static string LOCK_NAME = "lockA"; public SessionLocks(string serverUrl) { session1 = Diffusion.Sessions.Principal("control").Password("password") .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .Open(serverUrl); session2 = Diffusion.Sessions.Principal("control").Password("password") .CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT) .Open(serverUrl); WriteLine("Sessions 1 and 2 have been created."); AcquireLockSession1(); } private async void AcquireLockSession1() { try { WriteLine("Requesting lock 1..."); sessionLock1 = await session1.LockAsync(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS); WriteLine("Lock 1 has been acquired."); AcquireLockSession2(); Thread.Sleep(1000); ReleaseLock1(); } catch (Exception ex) { WriteLine($"Failed to get lock 1 : {ex}."); session1.Close(); session2.Close(); } } private async void AcquireLockSession2() { try { WriteLine("Requesting lock 2..."); sessionLock2 = await session2.LockAsync(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS); WriteLine("Lock 2 has been acquired."); Thread.Sleep(1000); ReleaseLock2(); } catch (Exception ex) { WriteLine($"Failed to get lock 2 : {ex}."); session1.Close(); session2.Close(); } } private async void ReleaseLock1() { try { WriteLine("Requesting lock 1 release..."); await sessionLock1.UnlockAsync(); WriteLine("Lock 1 has been released."); } catch (Exception ex) { WriteLine($"Failed to release lock 1 : {ex}."); session1.Close(); session2.Close(); } } private async void ReleaseLock2() { try { WriteLine("Requesting lock 2 release..."); await sessionLock2.UnlockAsync(); WriteLine("Lock 2 has been released."); } catch (Exception ex) { WriteLine($"Failed to release lock 2 : {ex}."); } finally { session1.Close(); session2.Close(); } } } }
/******************************************************************************* * Copyright (C) 2018, 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. *******************************************************************************/ package com.pushtechnology.client.sdk.manual; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import com.pushtechnology.diffusion.client.Diffusion; import com.pushtechnology.diffusion.client.features.Topics; import com.pushtechnology.diffusion.client.features.Topics.UnsubscribeReason; import com.pushtechnology.diffusion.client.features.Topics.ValueStream; import com.pushtechnology.diffusion.client.session.Session; import com.pushtechnology.diffusion.client.session.Session.SessionLock; import com.pushtechnology.diffusion.client.session.Session.SessionLockScope; import com.pushtechnology.diffusion.client.topics.details.TopicSpecification; import com.pushtechnology.diffusion.client.types.PathPermission; /** * An example of a client that uses session locks to coordinate actions with * other sessions. * * <p> * In this example, a single session receives and processes updates from the * topic {@code topicA}. Each client instance running this code creates a * session and competes for the session lock {@code lockA}. The session that is * assigned the session lock will subscribe to the topic and log updates. * * <p> * {@link SessionLockScope#UNLOCK_ON_CONNECTION_LOSS UNLOCK_ON_CONNECTION_LOSS} * session locks are used. If the session that owns the session lock loses its * connection to the server, the server will reassign the lock to another * session. This example uses a session listener to independently detect the * connection loss, unsubscribe, unregister the stream listening for updates, * and compete for the lock again. * * <p> * The locking protocol has races documented under {@link SessionLock}. In the * context of this example, the consequences are: * <ul> * <li>There may be a transient period where two sessions are subscribed to the * topic, and both process the same update. * <li>A session acquiring a lock may miss one or more updates that were not * processed by the session that previously held the lock. * </ul> * * <h2>Security note</h2> * * <p> * To run this example, the "client" principal must be granted * {@link PathPermission#ACQUIRE_LOCK ACQUIRE_LOCK} permission to * {@code lockA}. * * @author DiffusionData Limited * @since 6.1 */ public class ClientUsingSessionLocks { private static final Logger LOG = LoggerFactory.getLogger(ClientUsingSessionLocks.class); private static final String LOCK_NAME = "lockA"; private static final String TOPIC_PATH = "topicA"; private final Session session; private final ValueStream<String> stream = new LogUpdates(); /** * Construct a request handling application. * * @param serverURL url of the server to connect to */ public ClientUsingSessionLocks(String serverURL) { // The "client" principal must have ACQUIRE_LOCK permission, see note in // class Javadoc. session = Diffusion.sessions().principal("client").password("password") .open(serverURL); } /** * Start competing for the lock. */ public void start() { session.addListener((s, oldState, newState) -> { if (newState.isClosed()) { onLockLost(); } }); requestLock(); } private void requestLock() { session.lock(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS) .thenAccept(lock -> onLockAcquired()); } private void onLockAcquired() { final Topics topics = session.feature(Topics.class); topics.subscribe(TOPIC_PATH); topics.addStream(TOPIC_PATH, String.class, stream); } private void onLockLost() { final Topics topics = session.feature(Topics.class); // Remove the stream from the local registry. This will prevent // processing of updates that may already be queued for this session and // will be delivered on reconnection. topics.removeStream(stream); // Unsubscribe from the topic. This will not take effect until this // session has reconnected to the server. topics.unsubscribe(TOPIC_PATH); // Compete for the lock again. This will not take effect until this // session has reconnected to the server, and will be processed after // the unsubscription. requestLock(); } /** * Close the session. If the session owned the lock, the server is free to * reassign it to another session. */ public void close() { session.close(); } /** * Log updates received for a topic. */ private static class LogUpdates extends Topics.ValueStream.Default<String> { @Override public void onSubscription( String topicPath, TopicSpecification specification) { LOG.info("onSubscription({})", topicPath); } @Override public void onUnsubscription(String topicPath, TopicSpecification specification, UnsubscribeReason reason) { LOG.info("onUnsubscription({})", topicPath); } @Override public void onValue( String topicPath, TopicSpecification specification, String oldValue, String newValue) { LOG.info("onValue({}, {})", topicPath, newValue); } } }
/** * Copyright © 2021 - 2023 DiffusionData Ltd. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #include <stdio.h> #include <stdlib.h> #ifndef WIN32 #include <unistd.h> #else #define sleep(x) Sleep(1000 * x) #endif #include "diffusion.h" DIFFUSION_SESSION_LOCK_T *g_session_lock; static int on_lock_acquired( const DIFFUSION_SESSION_LOCK_T *session_lock, void *context) { // lock has been acquired g_session_lock = diffusion_session_lock_dup(session_lock); return HANDLER_SUCCESS; } static int on_unlock( bool lock_owned, void *context) { // lock has been released diffusion_session_lock_free(g_session_lock); g_session_lock = NULL; return HANDLER_SUCCESS; } int main(int argc, char **argv) { const char *url = "ws://localhost:8080"; const char *principal = "control"; const char *password = "password"; CREDENTIALS_T *credentials = credentials_create_password(password); SESSION_T *session; DIFFUSION_ERROR_T error = { 0 }; // create a session, synchronously session = session_create(url, principal, credentials, NULL, NULL, &error); if(session == NULL) { printf("Failed to create session: %s\n", error.message); free(error.message); credentials_free(credentials); return EXIT_FAILURE; } // acquire a session lock DIFFUSION_SESSION_LOCK_PARAMS_T lock_params = { .on_lock_acquired = on_lock_acquired }; diffusion_session_lock(session, "lock_a", lock_params); // sleep for a while sleep(5); // release the session lock DIFFUSION_SESSION_LOCK_UNLOCK_PARAMS_T unlock_params = { .on_unlock = on_unlock }; diffusion_session_lock_unlock(session, g_session_lock, unlock_params); // Sleep for a while sleep(5); // Close the session, and release resources and memory session_close(session, NULL); session_free(session); credentials_free(credentials); return EXIT_SUCCESS; }
// Copyright (C) 2021 Push Technology Ltd. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and // limitations under the License. import Foundation import Diffusion class SessionLocks { func run_example(url: URL, lock_name: String) { let credentials = PTDiffusionCredentials(password: "password") let configuration = PTDiffusionSessionConfiguration(principal: "control", credentials: credentials) // establish a session PTDiffusionSession.open(with: url, configuration: configuration) { (session, error) in if (error != nil) { print("An error has occurred while establishing a session: %@", error!.localizedDescription) } else { // request a lock with name `lock_name` session?.lock(withName:lock_name) { (lock, error) in if (error != nil) { print("An error has occurred while attempting to retrieve lock '%@: %@", lock_name, error!.localizedDescription) } else { print("Successfully acquired lock '%@'", lock_name) // wait 5 seconds, then released the lock let dispatch_time = DispatchTime.now() + DispatchTimeInterval.seconds(5) DispatchQueue.main.asyncAfter(deadline: dispatch_time) { print("Releasing lock now") lock!.unlock { (was_owner, error) in if (error != nil) { print("An error has occurred while releasing the lock: %@", error!.localizedDescription) } else { print("Lock has been successfully released") } } } } } } } } }
Acquiring a lock
Required permissions:
Session locks are established on demand. There is no separate operation to create or destroy a named lock.
If a session attempts to acquire a lock that is not assigned, the server assigns it immediately to the session.
If a session attempts to acquire a lock that is already assigned, the server will record that the session is waiting to acquire it. When a lock is released and multiple sessions are waiting to acquire it, the server will arbitrarily assign it to one of the waiting sessions.
- Lock name
- A name for the lock.
- Lock scope (optional)
The scope of the lock.
By default, the scope is UNLOCK_ON_SESSION_LOSS, meaning that the lock will be released when the session is closed.
If the scope is set to UNLOCK_ON_CONNECTION_LOSS, the lock will be released when the session loses its current connection to the server.