Session locks
Session locks are a way to ensure that only one session at a time can access a particular resource. For example, you can use a session lock to ensure that only one session is allowed to update a certain topic.
Session locks are a mechanism managed by the Diffusion™ server to coordinate access to shared resources among multiple sessions.
A session can acquire a lock, identified by a lock name (chosen by you to suit your application). Once a session acquires a lock, no other session can acquire the same lock.
Acquiring a lock does not automatically change anything else about a session. Locks are not linked to topics or permissions, except through your application's logic. It is up to you to design a suitable locking scheme and ensure your application implements it. For example, if you want to implement exclusive updating of a topic using a session lock, you must make sure that each session always acquires the lock and uses a lock constraint created from the lock when updating the topic.
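The exclusive-update scheme described above can be illustrated with a small in-memory model. This is a sketch only and does not use the Diffusion API: `LockRegistry`, `GuardedTopic`, and all of their methods are hypothetical names invented for illustration. In a real client the lock comes from the session, and the server evaluates the lock constraint. The model also assumes that a second session's request simply fails, whereas a real request would wait.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical in-memory stand-in for the server's lock table; not the Diffusion API. */
class LockRegistry {
    // lock name -> id of the session that currently owns the lock
    private final Map<String, String> owners = new HashMap<>();

    /** Returns true if the session owns the lock after the call. Waiting is not modelled. */
    boolean tryAcquire(String lockName, String sessionId) {
        return sessionId.equals(owners.computeIfAbsent(lockName, name -> sessionId));
    }

    /** Releases the lock, but only if the given session is its owner. */
    void release(String lockName, String sessionId) {
        owners.remove(lockName, sessionId);
    }

    boolean isOwnedBy(String lockName, String sessionId) {
        return sessionId.equals(owners.get(lockName));
    }
}

/** Hypothetical topic that models updates made with and without a lock constraint. */
class GuardedTopic {
    private final LockRegistry locks;
    private final String lockName;
    private String value;

    GuardedTopic(LockRegistry locks, String lockName) {
        this.locks = locks;
        this.lockName = lockName;
    }

    /** Update sent with a lock constraint: applied only if the session owns the lock. */
    boolean updateWithLockConstraint(String sessionId, String newValue) {
        if (!locks.isOwnedBy(lockName, sessionId)) {
            return false;
        }
        value = newValue;
        return true;
    }

    /** Update sent without a constraint: always applied, so it bypasses the scheme. */
    void updateUnconstrained(String newValue) {
        value = newValue;
    }

    String value() {
        return value;
    }
}

class ExclusiveUpdateSketch {
    public static void main(String[] args) {
        LockRegistry locks = new LockRegistry();
        GuardedTopic topic = new GuardedTopic(locks, "lockA");

        locks.tryAcquire("lockA", "session-1");
        System.out.println(topic.updateWithLockConstraint("session-1", "first"));  // true
        System.out.println(topic.updateWithLockConstraint("session-2", "second")); // false
        topic.updateUnconstrained("third"); // nothing stops a session that skips the constraint
        System.out.println(topic.value()); // third
    }
}
```

The last update shows why every session must follow the scheme: the lock protects the topic only when each update carries the constraint.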
By default, a lock is released when the session that owns it closes. Alternatively, when acquiring a lock, a session can set the scope parameter so that the lock is released as soon as the session loses its connection to the server.
A session can also explicitly release a lock.
const lock = await session.lock('lock_name', diffusion.SessionLockScope.UNLOCK_ON_CONNECTION_LOSS);
console.log('Acquired session lock');
setTimeout(async () => {
await lock.unlock();
console.log('Lock has been released');
}, 1000);
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using PushTechnology.ClientInterface.Client.Callbacks;
using PushTechnology.ClientInterface.Client.Factories;
using PushTechnology.ClientInterface.Client.Features;
using PushTechnology.ClientInterface.Client.Features.Topics;
using PushTechnology.ClientInterface.Client.Session;
using PushTechnology.ClientInterface.Client.Topics;
using PushTechnology.ClientInterface.Client.Topics.Details;
using static System.Console;
namespace PushTechnology.ClientInterface.Example {
/// <summary>
/// Client implementation that demonstrates session locks.
/// </summary>
public sealed class SessionLocks
{
private ISession session1, session2;
private ISessionLock sessionLock1, sessionLock2;
private static string LOCK_NAME = "lockA";
public SessionLocks(string serverUrl)
{
session1 = Diffusion.Sessions.Principal("control").Password("password")
.CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
.Open(serverUrl);
session2 = Diffusion.Sessions.Principal("control").Password("password")
.CertificateValidation((cert, chain, errors) => CertificateValidationResult.ACCEPT)
.Open(serverUrl);
WriteLine("Sessions 1 and 2 have been created.");
AcquireLockSession1();
}
private async void AcquireLockSession1()
{
try
{
WriteLine("Requesting lock 1...");
sessionLock1 = await session1.LockAsync(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS);
WriteLine("Lock 1 has been acquired.");
AcquireLockSession2();
// Hold the lock for a second without blocking the calling thread.
await Task.Delay(1000);
ReleaseLock1();
}
catch (Exception ex)
{
WriteLine($"Failed to get lock 1 : {ex}.");
session1.Close();
session2.Close();
}
}
private async void AcquireLockSession2()
{
try
{
WriteLine("Requesting lock 2...");
sessionLock2 = await session2.LockAsync(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS);
WriteLine("Lock 2 has been acquired.");
// Hold the lock for a second without blocking the calling thread.
await Task.Delay(1000);
ReleaseLock2();
}
catch (Exception ex)
{
WriteLine($"Failed to get lock 2 : {ex}.");
session1.Close();
session2.Close();
}
}
private async void ReleaseLock1()
{
try
{
WriteLine("Requesting lock 1 release...");
await sessionLock1.UnlockAsync();
WriteLine("Lock 1 has been released.");
}
catch (Exception ex)
{
WriteLine($"Failed to release lock 1 : {ex}.");
session1.Close();
session2.Close();
}
}
private async void ReleaseLock2()
{
try
{
WriteLine("Requesting lock 2 release...");
await sessionLock2.UnlockAsync();
WriteLine("Lock 2 has been released.");
}
catch (Exception ex)
{
WriteLine($"Failed to release lock 2 : {ex}.");
}
finally
{
session1.Close();
session2.Close();
}
}
}
}
/*******************************************************************************
* Copyright (C) 2018, 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*******************************************************************************/
package com.pushtechnology.client.sdk.manual;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.pushtechnology.diffusion.client.Diffusion;
import com.pushtechnology.diffusion.client.features.Topics;
import com.pushtechnology.diffusion.client.features.Topics.UnsubscribeReason;
import com.pushtechnology.diffusion.client.features.Topics.ValueStream;
import com.pushtechnology.diffusion.client.session.Session;
import com.pushtechnology.diffusion.client.session.Session.SessionLock;
import com.pushtechnology.diffusion.client.session.Session.SessionLockScope;
import com.pushtechnology.diffusion.client.topics.details.TopicSpecification;
import com.pushtechnology.diffusion.client.types.PathPermission;
/**
* An example of a client that uses session locks to coordinate actions with
* other sessions.
*
* <p>
* In this example, a single session receives and processes updates from the
* topic {@code topicA}. Each client instance running this code creates a
* session and competes for the session lock {@code lockA}. The session that is
* assigned the session lock will subscribe to the topic and log updates.
*
* <p>
* {@link SessionLockScope#UNLOCK_ON_CONNECTION_LOSS UNLOCK_ON_CONNECTION_LOSS}
* session locks are used. If the session that owns the session lock loses its
* connection to the server, the server will reassign the lock to another
* session. This example uses a session listener to independently detect the
* connection loss, unsubscribe, unregister the stream listening for updates,
* and compete for the lock again.
*
* <p>
* The locking protocol has races documented under {@link SessionLock}. In the
* context of this example, the consequences are:
* <ul>
* <li>There may be a transient period where two sessions are subscribed to the
* topic, and both process the same update.
* <li>A session acquiring a lock may miss one or more updates that were not
* processed by the session that previously held the lock.
* </ul>
*
* <h2>Security note</h2>
*
* <p>
* To run this example, the "client" principal must be granted
* {@link PathPermission#ACQUIRE_LOCK ACQUIRE_LOCK} permission to
* {@code lockA}.
*
* @author DiffusionData Limited
* @since 6.1
*/
public class ClientUsingSessionLocks {
private static final Logger LOG =
LoggerFactory.getLogger(ClientUsingSessionLocks.class);
private static final String LOCK_NAME = "lockA";
private static final String TOPIC_PATH = "topicA";
private final Session session;
private final ValueStream<String> stream = new LogUpdates();
/**
* Construct a client that competes for the session lock.
*
* @param serverURL url of the server to connect to
*/
public ClientUsingSessionLocks(String serverURL) {
// The "client" principal must have ACQUIRE_LOCK permission, see note in
// class Javadoc.
session = Diffusion.sessions().principal("client").password("password")
.open(serverURL);
}
/**
* Start competing for the lock.
*/
public void start() {
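session.addListener((s, oldState, newState) -> {
// React to connection loss (the session is trying to reconnect), not to
// closure: a closed session cannot unsubscribe or compete for the lock again.
if (newState.isRecovering()) {
onLockLost();
}
});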
requestLock();
}
private void requestLock() {
session.lock(LOCK_NAME, SessionLockScope.UNLOCK_ON_CONNECTION_LOSS)
.thenAccept(lock -> onLockAcquired());
}
private void onLockAcquired() {
final Topics topics = session.feature(Topics.class);
topics.subscribe(TOPIC_PATH);
topics.addStream(TOPIC_PATH, String.class, stream);
}
private void onLockLost() {
final Topics topics = session.feature(Topics.class);
// Remove the stream from the local registry. This will prevent
// processing of updates that may already be queued for this session and
// will be delivered on reconnection.
topics.removeStream(stream);
// Unsubscribe from the topic. This will not take effect until this
// session has reconnected to the server.
topics.unsubscribe(TOPIC_PATH);
// Compete for the lock again. This will not take effect until this
// session has reconnected to the server, and will be processed after
// the unsubscription.
requestLock();
}
/**
* Close the session. If the session owned the lock, the server is free to
* reassign it to another session.
*/
public void close() {
session.close();
}
/**
* Log updates received for a topic.
*/
private static final class LogUpdates extends Topics.ValueStream.Default<String> {
@Override
public void onSubscription(
String topicPath,
TopicSpecification specification) {
LOG.info("onSubscription({})", topicPath);
}
@Override
public void onUnsubscription(String topicPath,
TopicSpecification specification, UnsubscribeReason reason) {
LOG.info("onUnsubscription({})", topicPath);
}
@Override
public void onValue(
String topicPath,
TopicSpecification specification,
String oldValue,
String newValue) {
LOG.info("onValue({}, {})", topicPath, newValue);
}
}
}
/**
* Copyright © 2021 - 2023 DiffusionData Ltd.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include <stdio.h>
#include <stdlib.h>
#ifndef WIN32
#include <unistd.h>
#else
#include <windows.h>
#define sleep(x) Sleep(1000 * (x))
#endif
#include "diffusion.h"
DIFFUSION_SESSION_LOCK_T *g_session_lock;
static int on_lock_acquired(
const DIFFUSION_SESSION_LOCK_T *session_lock,
void *context)
{
// lock has been acquired
g_session_lock = diffusion_session_lock_dup(session_lock);
return HANDLER_SUCCESS;
}
static int on_unlock(
bool lock_owned,
void *context)
{
// lock has been released
diffusion_session_lock_free(g_session_lock);
g_session_lock = NULL;
return HANDLER_SUCCESS;
}
int main(
int argc,
char **argv)
{
const char *url = "ws://localhost:8080";
const char *principal = "control";
const char *password = "password";
CREDENTIALS_T *credentials = credentials_create_password(password);
SESSION_T *session;
DIFFUSION_ERROR_T error = {0};
// create a session, synchronously
session = session_create(url, principal, credentials, NULL, NULL, &error);
if (session == NULL)
{
printf("Failed to create session: %s\n", error.message);
free(error.message);
credentials_free(credentials);
return EXIT_FAILURE;
}
// acquire a session lock
DIFFUSION_SESSION_LOCK_PARAMS_T lock_params = {.on_lock_acquired = on_lock_acquired};
diffusion_session_lock(session, "lock_a", lock_params);
// sleep for a while
sleep(5);
// release the session lock, if it was acquired while sleeping
if (g_session_lock != NULL)
{
DIFFUSION_SESSION_LOCK_UNLOCK_PARAMS_T unlock_params = {.on_unlock = on_unlock};
diffusion_session_lock_unlock(session, g_session_lock, unlock_params);
}
// Sleep for a while
sleep(5);
// Close the session, and release resources and memory
session_close(session, NULL);
session_free(session);
credentials_free(credentials);
return EXIT_SUCCESS;
}
// Copyright (C) 2021 Push Technology Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
import Foundation
import Diffusion
class SessionLocks {
func run_example(url: URL, lock_name: String) {
let credentials = PTDiffusionCredentials(password: "password")
let configuration = PTDiffusionSessionConfiguration(principal: "control", credentials: credentials)
// establish a session
PTDiffusionSession.open(with: url,
configuration: configuration) { (session, error) in
if (error != nil) {
print("An error has occurred while establishing a session: \(error!.localizedDescription)")
}
else {
// request a lock with name `lock_name`
session?.lock(withName:lock_name) { (lock, error) in
if (error != nil) {
print("An error has occurred while attempting to acquire lock '\(lock_name)': \(error!.localizedDescription)")
}
else {
print("Successfully acquired lock '\(lock_name)'")
// wait 5 seconds, then release the lock
let dispatch_time = DispatchTime.now() + DispatchTimeInterval.seconds(5)
DispatchQueue.main.asyncAfter(deadline: dispatch_time) {
print("Releasing lock now")
lock!.unlock { (was_owner, error) in
if (error != nil) {
print("An error has occurred while releasing the lock: \(error!.localizedDescription)")
}
else {
print("Lock has been successfully released")
}
}
}
}
}
}
}
}
}
Acquiring a lock
Required permissions: ACQUIRE_LOCK for the lock name
Session locks are established on demand. There is no separate operation to create or destroy a named lock.
If a session attempts to acquire a lock that is not assigned, the server assigns it immediately to the session.
If a session attempts to acquire a lock that is already assigned, the server will record that the session is waiting to acquire it. When a lock is released and multiple sessions are waiting to acquire it, the server will arbitrarily assign it to one of the waiting sessions.
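The assignment rules above can be sketched as a small in-memory lock table. This is an illustrative model, not the server implementation or the Diffusion API: `SessionLockTableSketch` and its methods are hypothetical names. The server assigns a released lock to an arbitrary waiting session, and this sketch picks the longest-waiting one, which is one valid arbitrary choice. A repeated request by the current owner is not modelled.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

/** Hypothetical model of on-demand lock assignment; not the Diffusion API. */
class SessionLockTableSketch {
    /** A session waiting for a lock, with the future that completes on assignment. */
    private static final class Waiter {
        final String sessionId;
        final CompletableFuture<String> assigned;

        Waiter(String sessionId, CompletableFuture<String> assigned) {
            this.sessionId = sessionId;
            this.assigned = assigned;
        }
    }

    private final Map<String, String> owners = new HashMap<>();
    private final Map<String, Deque<Waiter>> waiters = new HashMap<>();

    /** Returns a future that completes with the lock name once the session owns the lock. */
    synchronized CompletableFuture<String> acquire(String lockName, String sessionId) {
        CompletableFuture<String> assigned = new CompletableFuture<>();
        if (!owners.containsKey(lockName)) {
            // Unassigned lock: it comes into existence and is assigned immediately.
            owners.put(lockName, sessionId);
            assigned.complete(lockName);
        } else {
            // Assigned lock: record that this session is waiting for it.
            waiters.computeIfAbsent(lockName, name -> new ArrayDeque<>())
                .add(new Waiter(sessionId, assigned));
        }
        return assigned;
    }

    /** Releases the lock if the session owns it and hands it to a waiter, if any. */
    synchronized boolean release(String lockName, String sessionId) {
        if (!sessionId.equals(owners.get(lockName))) {
            return false;
        }
        Deque<Waiter> queue = waiters.get(lockName);
        Waiter next = (queue == null) ? null : queue.poll();
        if (next == null) {
            // Nobody is waiting: the lock simply stops existing.
            owners.remove(lockName);
            waiters.remove(lockName);
        } else {
            owners.put(lockName, next.sessionId);
            next.assigned.complete(lockName);
        }
        return true;
    }

    synchronized String ownerOf(String lockName) {
        return owners.get(lockName);
    }
}
```

In this model a second request for `lockA` returns a future that stays incomplete until the first session releases the lock, which mirrors the way a client request completes only when the server assigns the lock.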
When acquiring a lock, a session specifies the following parameters:
- Lock name
  A name for the lock.
- Lock scope (optional)
  The scope of the lock. By default, the scope is UNLOCK_ON_SESSION_LOSS, meaning that the lock is released when the session is closed. If the scope is set to UNLOCK_ON_CONNECTION_LOSS, the lock is released when the session loses its current connection to the server.
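The difference between the two scopes can be sketched with a small model of the locks held by one session. This is not the Diffusion API: `LockScope` and `SessionScopeSketch` are hypothetical names, and the sketch assumes that closing a session releases every lock it holds, whatever the scope.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical mirror of the two scopes described above. */
enum LockScope { UNLOCK_ON_SESSION_LOSS, UNLOCK_ON_CONNECTION_LOSS }

/** Hypothetical model of the locks held by a single session; not the Diffusion API. */
class SessionScopeSketch {
    // lock name -> scope requested when the lock was acquired
    private final Map<String, LockScope> held = new HashMap<>();

    /** Acquires with the default scope, UNLOCK_ON_SESSION_LOSS. */
    void acquire(String lockName) {
        acquire(lockName, LockScope.UNLOCK_ON_SESSION_LOSS);
    }

    void acquire(String lockName, LockScope scope) {
        held.put(lockName, scope);
    }

    /** Connection lost but session still open: only connection-scoped locks are released. */
    List<String> onConnectionLost() {
        List<String> released = new ArrayList<>();
        held.entrySet().removeIf(entry -> {
            if (entry.getValue() == LockScope.UNLOCK_ON_CONNECTION_LOSS) {
                released.add(entry.getKey());
                return true;
            }
            return false;
        });
        return released;
    }

    /** Session closed: every remaining lock is released (assumption of this sketch). */
    List<String> onSessionClosed() {
        List<String> released = new ArrayList<>(held.keySet());
        held.clear();
        return released;
    }

    boolean holds(String lockName) {
        return held.containsKey(lockName);
    }
}
```

A session that can reconnect keeps its UNLOCK_ON_SESSION_LOSS locks across a dropped connection, which suits work that should survive a brief network interruption. UNLOCK_ON_CONNECTION_LOSS hands the lock on sooner, at the cost of the former owner having to compete for it again after reconnecting.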