Salesforce Winter’24 - Queueable Deduplication
Why the new Queueable deduping is not the killer feature it might initially appear to be
Given my previous blog posts about the lack of Mutexes in Apex and the problems this causes, the introduction of Queueable deduplication would seem to solve some of my use cases. I’ll explore the new feature and how it might be used to solve our queueing use case. Tl;dr - this new feature does not remove the need for Mutexes, and I can’t actually see where it can be safely used without Mutexes or locks of some sort.
In the Winter ‘24 release notes there is an entry for Ensure that Duplicate Queueable Jobs Aren’t Enqueued. This describes a new mechanism for adding a signature to a Queueable. The platform will ensure that there is only one Queueable with the same signature running at once. Any attempt to enqueue a second Queueable with the same signature throws a catchable exception. Thread safe, multi-user safe, multi app-server safe. Pretty amazing and clearly using the sort of Mutexes we might want for our own code behind the scenes.
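As a sketch, using the signature from the release notes looks something like this (the AsyncOptions and QueueableDuplicateSignature types are from the release-note API; the SyncJob class is hypothetical):

```apex
// Hypothetical Queueable used to illustrate the Winter '24 deduplication API
public class SyncJob implements Queueable {
    public void execute(QueueableContext ctx) {
        // ... do the work ...
    }
}

// Enqueueing with a duplicate signature:
AsyncOptions options = new AsyncOptions();
options.DuplicateSignature = QueueableDuplicateSignature.Builder()
    .addString('SyncJob')
    .build();
try {
    System.enqueueJob(new SyncJob(), options);
} catch (DuplicateMessageException e) {
    // A job with this signature is already queued or running
}
```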
However this is not actually a good enough primitive to build on. Let’s start with our queue example. We want to be able to process a queue of items asynchronously. The queue is represented by sObject records, but that is not too important here. The semantics we expect are that we enqueue items; if there is not already a process running to work through the queue we start one, otherwise we do not. At the end of the Queueable that processes the items we check for more items to process, and if there are more we enqueue another Queueable. We need some form of locking to prevent race conditions around the enqueue and check-for-more-items steps.
If we use the new Queueable mechanism it might seem that we can use it to ensure that there is only one Queueable running. But there is still a nasty race condition around the enqueue and check steps. If our Queueable is suspended after it has checked for more items to process, but before the Queueable actually ends, and another process enqueues in this period, we have a problem: no new Queueable will be started or chained, so those items will sit unprocessed. So even with this nice new API we still need our own locks.
But this is a more general problem. Even without the queueing we can consider the general use case. When we enqueue work, we are doing so in response to some sort of event. If another event happens whilst the Queueable is still running, perhaps we don’t want to run two processes in parallel (they might overwrite each other or cause record locking). But we still want the second event to be processed: there is no guarantee that the already-running Queueable will pick up changes from our new event. So we need a queue if we are going to use this new method for just about anything. Which means we need Mutexes.
So I have to conclude this new API is not much use on its own, and if you implement your own locks you don’t need it.
Implementing Mutexes using SObjects
In the previous posts I explored the need for platform-level Mutexes and what adding these might mean for the platform. In this post I’ll start exploring some of the complexity of implementing Mutexes using the available tools on the platform (basically SObjects). I’ll start with a naive implementation, identify some problems and then try and find some solutions.
Before I get started, what sort of use case am I trying to solve? Let’s imagine we have a workload that can be broken down into a series of tasks or jobs that have to be processed in a FIFO manner system wide. That is, regardless of which user enqueued them, we want a single unified queue that is processed single-threaded. This might be the case where we are syncing to an external platform, for example.
To support these semantics we will expose an enqueue API that looks something like this:
public interface Queue {
    void enqueue(Callable task);
}
When enqueue is called the task is written to the queue (serialised to an SObject). Only one “thread” can access (read or write) the queue at a time, so internally enqueue needs a Mutex. If there is already a Queueable processing the queue then nothing else is done in enqueue. If there is not, one is started. So another Mutex may well be required here.
Our Mutex API would look like this:
public class Mutexes {
    static boolean lock(String name) {}
    static void unlock(String name) {}
    static void maintain(String name) {}
}
As we require Mutexes to persist across contexts (and be lockable between contexts) the most obvious option is to use SObjects. Assume we create a Mutex SObject. We can use the standard Name field to represent the name of the lock. We will need a Checkbox to represent the status of the lock. And we will need a Datetime to store the last update time on the lock.
Given this we might assume the lock method could look like this:
static boolean lock(String name) {
    Mutex__c m = MutexSelector.selectByNameForUpdate(name); // Assume we have a selector class
    if (m == null) {
        m = new Mutex__c(
            Name = name,
            Status__c = 'Locked',
            Last_Updated__c = System.now()
        );
        insert m;
        return true;
    }
    if (m.Status__c == 'Locked') {
        return false;
    }
    m.Status__c = 'Locked';
    m.Last_Updated__c = System.now();
    update m;
    return true;
}
Unfortunately this has some issues. One is that if two threads both call lock at the same time when there is no existing lock, two Mutex__c records will be created when there should only be one. There are various options to solve this. One would be to add a custom field for the name that is marked unique, then try/catch around the insert and return false if the insert fails.
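A sketch of that approach, assuming a hypothetical Lock_Name__c text field on Mutex__c marked as unique:

```apex
// Sketch only: assumes Mutex__c has a custom Lock_Name__c field marked
// Unique, so the database rejects the second of two concurrent inserts.
static Boolean tryCreateLock(String name) {
    Mutex__c m = new Mutex__c(
        Lock_Name__c = name,
        Status__c = 'Locked',
        Last_Updated__c = System.now()
    );
    try {
        insert m;
        return true; // We won the race and hold the lock
    } catch (DmlException e) {
        // DUPLICATE_VALUE: another thread inserted first and holds the lock
        return false;
    }
}
```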
The next issue is that it is possible for a lock to be locked and never unlocked. We only expect locks to be held for as long as a context takes to run. In the case of chains of Queueables we have the maintain method to allow the lock to be held for longer. But if a context hits a limits exception and does not unlock, then no other process will ever be able to lock the lock.
The way round this is to allow locking of locked Mutexes if Last_Updated__c is old enough (the amount of time could be configurable). This prevents locks being held forever and processes stalling.
unlock and maintain are relatively simple methods that will similarly use FOR UPDATE to hold database row locks for the duration of their transactions.
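As a sketch, using the same hypothetical selector (which issues a SELECT ... FOR UPDATE to serialise access to the row):

```apex
// Sketch only: unlock clears the lock, maintain refreshes the timestamp so
// the stale-lock takeover described above does not fire for a live process.
static void unlock(String name) {
    Mutex__c m = MutexSelector.selectByNameForUpdate(name);
    if (m != null && m.Status__c == 'Locked') {
        m.Status__c = 'Unlocked';
        m.Last_Updated__c = System.now();
        update m;
    }
}

static void maintain(String name) {
    Mutex__c m = MutexSelector.selectByNameForUpdate(name);
    if (m != null && m.Status__c == 'Locked') {
        m.Last_Updated__c = System.now();
        update m;
    }
}
```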
This whole thing feels like a fairly fragile implementation that would be far better served at a platform level.
Apex Syntactic Sugar Enabled by Mutexes
What improvements might we see in Apex if Mutexes were implemented? Explore some of the possibilities in this post
In the last post we looked at a possible API for a Mutex object and how it could be used to protect access to a queue/executor model. This required a lot of boilerplate code. What if the language/compiler were enhanced to take advantage of these Mutex objects, simplifying developers’ lives?
In Java there is the concept of synchronized methods (and blocks). What might this look like in Apex? In Java synchronized is a keyword and is used on methods like this:
public synchronized void myMethod() {
}
This means that only one thread can be executing this method at a time. Access to this method is synchronised by a Mutex without any further work by the developer. This is roughly analogous to
public void myMethod() {
    Mutex m = new Mutex('myMethod');
    m.lock();
    ...
    m.unlock();
}
It’s easy to imagine the exact same syntax and semantics being ported to Apex. However lessons from the Java world should inform any Apex implementation. Many times in Java this is too inflexible. A couple of improvements we might want:
Ability to synchronise across multiple methods (to share the same mutex across multiple methods)
Some flexibility in the mutex to still control parallelism but allow multiple unrelated executions to run
The use case for the first is pretty clear. We might have multiple methods that all access the same protected resource. Access to any of these methods needs to be synchronised behind the same Mutex.
If we change the synchronised directive from a modifier to an annotation we could allow for this, and better maintain compatibility with “normal” Apex code.
@synchronised(name='queue')
public void method1() {}
@synchronised(name='queue')
public void method2() {}
Here we have two methods that share the same Mutex, so only one thread can execute either of these methods at any point in time.
This would expand to something like this in the compiler
public void method1() {
    Mutex m = new Mutex('queue');
    m.lock();
    ...
    m.unlock();
}
public void method2() {
    Mutex m = new Mutex('queue');
    m.lock();
    ...
    m.unlock();
}
But what if there were some circumstances where unrelated items could run in parallel? Say we have different regions that don’t interact with each other. We could allow variable interpolation in the Mutex name, using a method argument like this:
@synchronised(name='queue-${region}')
public void method1(String region) {}
This would expand to
public void method1(String region) {
    Mutex m = new Mutex(String.format('queue-{0}', new List<Object>{region}));
    m.lock();
    ...
    m.unlock();
}
This would allow access to different regions in parallel but synchronise access within each region.
This same annotation could also be allowed on anonymous blocks of code.
Hopefully this gives a flavour of what Salesforce could choose to enable to boost developer productivity if the initial investment in Mutexes was made.
Theoretic Mutex API and Use
What might a Mutex API in Salesforce look like? How could it be used? Indulge my imagination as I look at what this might look like
In the last post I outlined the need for Mutexes for Apex. But what might this look like if implemented? In this post I’ll set out a possible API and then show how it could be used.
Before we go any further I should note this is a thought exercise. I have no knowledge of any Mutex API coming. I have no reason to believe this specific API is going to be implemented. If Salesforce happen to add Mutexes to Apex and it happens to look anything like this then I just got lucky!
System.Mutex {
    // Creates an instance of Mutex. Every call to the constructor with the same name returns the same object
    Mutex(String name);
    // Returns true if the named Mutex is already locked
    boolean isLocked();
    // Attempts to lock the Mutex. If the Mutex is unlocked, locks it and returns. If the Mutex is locked, waits at most timeout milliseconds for it to unlock. Throws an exception if the call has to wait longer than timeout milliseconds.
    // If the Mutex has been locked for a long time (platform defined) without extendLock, locking is allowed even though it is locked (to allow for limits exceptions causing unlock to fail)
    void lock(Long timeout);
    // Extends a lock, preventing the timeout that would allow locking of a locked lock
    void extendLock();
    // Releases a lock
    void unlock();
}
There are a few interesting things to note about this API. Mutexes are named. Only one Mutex with the same name can exist.
Attempting to lock a lock has some interesting behaviour. A timeout is passed to the lock method. If the lock cannot be locked in this time period then an exception is thrown. This may be unnecessary - the platform could enforce this instead. In fact it might have to depending on what conditions are placed around the lock method. If DML is allowed before the lock method is called then the timeout would have to be very short or possibly zero. Most likely DML would not be allowed before calling lock and then a timeout can be allowed without causing transaction locking issues.
However due to the nature of the platform there would still be concerns around locks getting locked and never unlocked. If a transaction locks a lock, does some DML then unlocks the lock all is good - a sensitive resource is guarded from concurrent access by a lock. But what if the transaction hits a governor limit and gets terminated? The lock would remain locked. For normal DML we’d expect that the transaction rolls back (or actually was never committed to the database). Locks would have to operate differently. They would have to actually lock in a persistent fashion whenever lock is called.
To protect against locks staying locked some timeout on the length of locks is also required. The system can manage this behind the scenes. However long running processes in chains of Queueables would have to be able to keep a lock locked. Hence the extendLock method.
The isLocked method would have to obey these rules too. Or a canLock method would have to be provided which could result in isLocked and canLock both returning true. For this we’ll assume isLocked returns false if the lock times out without being extended or unlocked.
Now that we have a minimal API that takes the platform foibles into account what could we do with it? Let’s imagine a not too uncommon scenario. We have some sort of action that we want to perform in a single threaded manner. The exact operation is not important. Even if multiple users at the same time are enqueuing actions the actions have to be performed from a single threaded queue in a FIFO manner.
We want to process in the background. We want to use as modern Apex as we can so we’ll use Queueables. When an action is enqueued if there is no Queueable running we should write to the queue (an sObject) and start a Queueable. If there is a Queueable running we write to the queue and don’t start a Queueable. When a Queueable ends if there are unprocessed items it chains another Queueable. If there are no more items the chain ends.
There are multiple possible race conditions here. Multiple enqueues could happen at the same time, and only one should start the Queueable. An enqueue could happen just as a Queueable is ending: at this point the enqueue could think a Queueable is running while the Queueable thinks there are no actions waiting, resulting in items in the queue but no process. A Mutex can solve all this!
global class FIFOQueue {
    // How long to wait for a Mutex before giving up (illustrative value)
    private static final Long TIMEOUT = 5000;

    // Enqueue a list of QueueItems__c (not defined here for brevity) and process FIFO
    global void enqueue(String queueName, List<QueueItems__c> items) {
        // All queue access must be protected
        Mutex qMut = new Mutex(queueName + '_queue');
        qMut.lock(TIMEOUT); // Intentionally let this throw if it times out
        Mutex pMut = new Mutex(queueName + '_process');
        Boolean startQueueable = false;
        if (!pMut.isLocked()) {
            // This is the only place this Mutex gets locked, and it is protected by the queue Mutex.
            // Only 1 thread can be here, so only 1 process can ever get started
            pMut.lock(TIMEOUT);
            startQueueable = true;
        }
        // All Mutex locking done. Can do DML now
        insert items;
        if (startQueueable) {
            System.enqueueJob(new FIFOQueueRunner(queueName), 0);
        }
        qMut.unlock();
    }

    // Queueable inner class to execute the items
    private class FIFOQueueRunner implements Queueable {
        private String queueName;

        private FIFOQueueRunner(String queueName) {
            this.queueName = queueName;
        }

        public void execute(QueueableContext context) {
            Mutex qMut = new Mutex(queueName + '_queue');
            Mutex pMut = new Mutex(queueName + '_process');
            pMut.extendLock();
            qMut.lock(TIMEOUT); // Intentionally let this throw if it times out
            // Read the items from the queue to process
            qMut.unlock();
            // Process the items however is necessary
            qMut.lock(TIMEOUT);
            // Update the processed items as necessary and read whether there are more items to process
            Boolean moreItemsToProcess = false; // Would be set from the queue read above
            if (moreItemsToProcess) {
                System.enqueueJob(new FIFOQueueRunner(queueName), 0);
            } else {
                pMut.unlock();
            }
            qMut.unlock();
        }
    }
}
This code is far from perfect. In the real world we’d probably use a Finalizer to do the update of the queue items, allowing us to mark them as errors if necessary and enqueue again from the Finalizer. However hopefully this gives an idea of how Mutexes could be used on the platform.
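As a sketch of what that could look like (the Transaction Finalizers API - System.Finalizer, System.attachFinalizer and FinalizerContext - exists on the platform today; the item handling here is hypothetical):

```apex
// Sketch: a Finalizer runs even if the Queueable hits an uncatchable
// limits exception, so queue items can still be marked as errored.
public class FIFOQueueFinalizer implements Finalizer {
    public void execute(FinalizerContext ctx) {
        if (ctx.getResult() == ParentJobResult.UNHANDLED_EXCEPTION) {
            // Mark the in-flight items as errors and re-enqueue the runner
        }
    }
}

// Inside FIFOQueueRunner.execute, before doing any work:
// System.attachFinalizer(new FIFOQueueFinalizer());
```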
It is possible to build Mutexes out of sObjects, FOR UPDATE SOQL and a lot of care (and no doubt quite a bit of luck at runtime). But it always feels like the luck might run out. Platform Mutexes would remove this fear. If you think Mutexes are a good idea, vote for this Idea on the IdeaExchange.
The Need for Platform Mutexes
Why does Apex need Mutexes?
Apex is a Java-like language, but many of the things a Java developer would take for granted are missing. One of the biggest things I noticed when I came to the platform was the lack of a Thread class, Executors, Promises, async/await etc. As you work through the Apex Trailhead modules you eventually do come to async Apex based on Batch, Future and Queueable.
These are somewhat higher level abstractions than threads and the lack of any synchronisation mechanisms (beyond the database transaction ones) leads to the initial belief that these are not needed on the platform. However that is not the case. There are definitely cases where access to resources has to be limited to a single thread of execution.
It is also tempting to say that these challenges are only present when one of these methods for async programming is used. However that ignores the intrinsically multi-threaded nature of the platform. The platform runs on app servers that are multi-threaded. Each org is running on multiple app servers. So each org is capable of supporting many threads of execution. Multiple users may be performing actions at the same time, perhaps interacting with the same objects in parallel. It’s not hard to see how synchronous code in these cases may still result in race conditions that could result in problems.
An example might be a parent-child kind of object model where there is Apex code providing rollups from the children objects. A simple pattern would be to read the parent value, add the value from a new object and write it back to the parent. There is a clear race condition in this simple pattern when two users create children at the same time.
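A sketch of that naive pattern (the object and field names are illustrative):

```apex
// Naive rollup: read-modify-write with no locking.
// If two users insert children at the same time, both can read the same
// starting Total__c and one update overwrites the other (a lost update).
public static void rollUp(Id parentId, Decimal childValue) {
    Parent__c p = [SELECT Total__c FROM Parent__c WHERE Id = :parentId];
    p.Total__c = (p.Total__c == null ? 0 : p.Total__c) + childValue;
    update p;
}
```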
Generally we don’t see issues like this - it only happens with particular designs and implementation patterns. However once async programming is used limiting access to specific objects or resources (like API connections to external systems) may be required. On other platforms the normal way of doing this would be to use a mutex or a semaphore. Sometimes a mutex could be called a lock. Some languages even provide syntactic sugar to do this - Java has synchronised blocks for example.
Apex has no such primitives. Apex developers therefore end up trying to implement these using the tools the platform does provide. Generally this takes the form of a Lock sObject that is accessed using FOR UPDATE to hold database locks for the life of the transaction. As we’ll explore in the next blog post there are all sorts of gotchas with this, not the least of which is that you have to query the database to acquire the lock. Which does not interact well with callouts.
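The usual shape of that pattern, as a sketch (Lock__c is a hypothetical sObject):

```apex
// Acquire a row lock that is held until this transaction commits or rolls back.
// Any other transaction selecting the same row FOR UPDATE blocks, and can
// fail with a QueryException (UNABLE_TO_LOCK_ROW) if it waits too long.
List<Lock__c> locks = [SELECT Id FROM Lock__c WHERE Name = 'sync-lock' FOR UPDATE];
// ... do the protected work; the lock releases when the transaction ends ...
```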
What if Apex did provide a Mutex object? Well I raised an Idea to see if we could get this very thing implemented. I’d really appreciate if you could vote for this Idea.
What might this look like in code? Let’s take a look
public void lockedMethod() {
    Mutex mut = new Mutex('my-name');
    try {
        mut.lock(1000);
        // Do thread sensitive stuff
        mut.unlock();
        // Do more stuff that is not thread sensitive
    } catch (MutexTimeoutException e) {
        System.debug('Mutex Timed Out');
    }
}
As you can see this would offer developers an easy way to achieve single-threaded access to some resource protected by some sort of named Mutex. Timeouts would prevent waiting forever for the lock. The semantics of what is going on would have to be well defined. lock/unlock of a Mutex would have to happen at that point in the code - not at the end of the transaction like DML on sObjects. More than this, in this model, unlocking a Mutex would, in fact, have to commit any DML from within the mutex lock. Otherwise another thread could write to that object before this transaction commits.
A more platform-native approach would be to not have the unlock method and simply release all locked Mutexes when the transaction ends. However this would prevent an important use case. Chains of Queueables (or Batches if you really want to) might be used to process data, and the entire process should be under a single lock. This requires explicit lock/unlock without the Mutex being automatically unlocked at the end of the transaction.
Hopefully this goes some way to justifying why locking may be required. In future posts we’ll look at implementing locks using sObjects, what a more fully fleshed-out Mutex object could look like, and the syntactic sugar this might enable to be added to Apex.