Hi,
We have a requirement to listen for every modification of an asset and send the asset details to a client API. Because each modification generates multiple events, we added a map to check whether the path has already been processed; if it has, the API should not be triggered. However, when multiple threads run concurrently, the map sometimes does not appear to have been updated.
private static final ConcurrentHashMap<String, Long> processedPaths = new ConcurrentHashMap<>();
For the event path (sling:members node change), thread ID 185:
/content/dam/projects/product-images/mixedmediasets/img-1/jcr:content/related/s7Set/sling:members
The processedPaths map contains:
{ /content/dam/projects/product-images/temporary-imageset-folder/img-2 = 1741863896578 } (old data: the previously processed GTIN)
During the if check, the code verifies whether the event path (img-1) exists in processedPaths and whether its timestamp is more or less than 30 seconds old:
(Checking for path: /content/dam/projects/product-images/mixedmediasets/img-1, lastProcessedTime: null, currentTime: 1741865497219).
Since the map does not contain img-1, lastProcessedTime is null, so the code goes on to check whether the timestamp of the existing map entry (i.e. an already processed GTIN) is more than 30 seconds old.
If it is, the code removes that entry (img-2, older than 30 seconds) from processedPaths and keeps the latest record, img-1, as the new entry.
The logs show that the old GTIN was removed:
(Removing entry: /content/dam/projects/product-images/temporary-imageset-folder/img-2, lastProcessedTime: 1741863896578).
Then it adds the new event path with the current timestamp:
{ /content/dam/projects/product-images/mixedmediasets/img-1 = 1741865497247 }
and triggers the Sling job, which calls the glass API.
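The flow described above is a check-then-act sequence. The sketch below is a hypothetical reconstruction for illustration only, not the poster's actual code: the class `NaiveDeduplicator`, the method `shouldTrigger`, and passing `currentTime` in as a parameter are all assumptions. Each map call is thread-safe on its own, but nothing stops a second thread from running between the `get()` and the `put()`.

```java
import java.util.concurrent.ConcurrentHashMap;

// HYPOTHETICAL reconstruction of the described flow, not the poster's code.
final class NaiveDeduplicator {
    private static final long WINDOW_MS = 30_000L;
    private static final ConcurrentHashMap<String, Long> processedPaths = new ConcurrentHashMap<>();

    private NaiveDeduplicator() {}

    /** Returns true if the caller should trigger the Sling job for assetPath. */
    static boolean shouldTrigger(String assetPath, long currentTime) {
        // Check: this get() is thread-safe on its own...
        Long lastProcessedTime = processedPaths.get(assetPath);
        if (lastProcessedTime != null && currentTime - lastProcessedTime <= WINDOW_MS) {
            return false; // processed within the last 30 seconds
        }
        // ...but another thread can run between the get() above and the put() below,
        // so two events for the same asset can both get here and both return true.
        processedPaths.entrySet().removeIf(e -> currentTime - e.getValue() > WINDOW_MS);
        processedPaths.put(assetPath, currentTime);
        return true;
    }
}
```

Run sequentially, this behaves as described; the gap only shows up when two events for the same asset are handled at the same time.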
-----------------------------------------------------------------------------------------------------------------------------------
Then, for the event path (metadata), a new thread, ID 204:
/content/dam/projects/product-images/mixedmediasets/img-1/jcr:content/metadata
processedPaths has not been updated with the new resource path; it still contains the previous entry.
(This is the issue: since the map is thread-safe, it should contain the updated data, but it still holds the old data even though it was updated in the previous thread.)
The logs show processedPaths as:
{ /content/dam/projects/product-images/temporary-imageset-folder/img-2 = 1741863896578 }
As a result, when the if condition checks whether /content/dam/projects/product-images/mixedmediasets/img-1 exists in processedPaths, the check fails. Consequently, the Sling job is triggered again and the API is called again.
How should we handle this? We have already made the add and remove operations on the map synchronized.
Thank you.
Hi @Keerthana_H_N,
I would recommend the following approach, which relies on thread-safe implementations and on AEM's guarantees for code execution.
Step 1: Workflow launcher
Use a workflow launcher to listen for node modifications and trigger a workflow with one custom step. The step is guaranteed to be called only once.
Step 2: Workflow
Create a workflow with a single workflow process step. This step should take the asset path and put it into a processing queue that ensures thread safety.
Step 3: Processing queue
Below you can find an example of code.
import java.util.Queue;
import java.util.concurrent.*;
public class UniquePathQueue {
    private final Queue<String> queue = new ConcurrentLinkedQueue<>();
    private final ConcurrentHashMap<String, Boolean> pathSet = new ConcurrentHashMap<>();
    // Add path only if it's not already present
    public void addPath(String path) {
        if (pathSet.putIfAbsent(path, Boolean.TRUE) == null) { // Ensures unique entry
            queue.offer(path);
        }
    }
    // Retrieve and remove the next path
    public String takePath() {
        String path = queue.poll();
        if (path != null) {
            pathSet.remove(path); // Remove from tracking set
        }
        return path;
    }
    // Check if queue is empty
    public boolean isEmpty() {
        return queue.isEmpty();
    }
}
This approach ensures:
- Thread safety: ConcurrentHashMap and ConcurrentLinkedQueue are both safe for concurrent use.
- No duplicate paths: putIfAbsent only enqueues a path that is not already waiting in the queue.
However, that is not all: you also need a consumer for this queue.
Step 4: Queue path consumer
This consumer should check your queue for items every N seconds and start a Sling job for each path, so that every path is guaranteed to be processed.
import java.util.Map;
import org.apache.sling.event.jobs.JobManager;
public class PathConsumer {
    private final UniquePathQueue pathQueue;
    private final int sleepTimeSeconds;
    private final JobManager jobManager;
    private volatile boolean running = true;
    private Thread consumerThread;
    public PathConsumer(UniquePathQueue pathQueue, int sleepTimeSeconds, JobManager jobManager) {
        this.pathQueue = pathQueue;
        this.sleepTimeSeconds = sleepTimeSeconds;
        this.jobManager = jobManager;
    }
    public void start() {
        consumerThread = new Thread(() -> {
            while (running) {
                String path = pathQueue.takePath();
                if (path != null) {
                    // "my/custom/job" is an example job topic
                    jobManager.addJob("my/custom/job", Map.of("damPath", path));
                } else {
                    try {
                        // Queue is empty: wait before polling again
                        Thread.sleep(sleepTimeSeconds * 1000L);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and exit; otherwise every later sleep() would throw immediately
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
            System.out.println("Consumer stopped.");
        });
        consumerThread.start();
    }
    public void stop() {
        running = false;
        if (consumerThread != null) {
            consumerThread.interrupt();
        }
    }
}
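If a scheduled executor is preferred over a hand-written thread and sleep loop, one possible variation is sketched below. It is an assumption-laden alternative, not part of the original answer: `ConcurrentHashMap.newKeySet()` stands in for the queue plus map pair (so FIFO order is no longer guaranteed), the class name `ScheduledPathDrainer` is hypothetical, and a `Consumer<String>` stands in for the `jobManager.addJob("my/custom/job", ...)` call so the sketch runs without Sling.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical variation: a concurrent set replaces the queue + map pair, and a
// ScheduledExecutorService replaces the manual thread/sleep loop.
final class ScheduledPathDrainer {
    private final Set<String> pending = ConcurrentHashMap.newKeySet();
    private final Consumer<String> jobStarter; // stand-in for jobManager.addJob(...)
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    ScheduledPathDrainer(Consumer<String> jobStarter) {
        this.jobStarter = jobStarter;
    }

    /** Returns false if the path is already pending (duplicate event). */
    boolean addPath(String path) {
        return pending.add(path);
    }

    /** Hands each pending path to jobStarter once; safe to run concurrently with addPath. */
    void drainOnce() {
        for (String path : pending) { // weakly consistent iteration, no ConcurrentModificationException
            if (pending.remove(path)) { // only the thread that removes the path starts the job
                jobStarter.accept(path);
            }
        }
    }

    void start(long periodSeconds) {
        scheduler.scheduleWithFixedDelay(() -> {
            try {
                drainOnce();
            } catch (RuntimeException e) {
                // Log in real code: an uncaught exception would cancel all future runs.
            }
        }, periodSeconds, periodSeconds, TimeUnit.SECONDS);
    }

    void stop() {
        scheduler.shutdownNow();
    }
}
```

One trade-off: a path is removed from the set before its job is started, so if `jobStarter` throws, that path is not retried unless a new event adds it again.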
Feel free to make changes to the implementation. I just wanted to describe an alternative concept.
Best regards,
Kostiantyn Diachenko.
Before we dive into the details of your code and potential fixes, I'd like to ask whether you have considered using a workflow launcher with a custom workflow step implementation. If you set the launcher's event type to "Modified", I believe the event will trigger only once. This way, you can ensure that the event corresponds to an asset that has not already been processed by your code (which is responsible for calling the external API) without worrying about duplicated iterations.
I'd prefer this solution because it only involves moving your code into the workflow implementation, rather than delving into programming topics such as normalizing keys, using atomic operations, or adding a time-based cache.
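If the listener approach is kept instead, normalizing keys matters because the two events in the question arrive with different paths for the same asset (`.../img-1/jcr:content/related/s7Set/sling:members` and `.../img-1/jcr:content/metadata`). A minimal sketch, assuming every relevant event path sits under the asset's `jcr:content` node; the class `AssetPathKeys` and method `toAssetPath` are hypothetical names:

```java
// Hypothetical helper: maps any event path under an asset to the asset path itself.
final class AssetPathKeys {
    private static final String MARKER = "/jcr:content";

    private AssetPathKeys() {}

    /** Returns the part of eventPath before the first "/jcr:content" segment, or eventPath unchanged. */
    static String toAssetPath(String eventPath) {
        int idx = eventPath.indexOf(MARKER);
        while (idx >= 0) {
            int end = idx + MARKER.length();
            // Only cut when "jcr:content" is a whole path segment (followed by "/" or end of string).
            if (end == eventPath.length() || eventPath.charAt(end) == '/') {
                return eventPath.substring(0, idx);
            }
            idx = eventPath.indexOf(MARKER, end);
        }
        return eventPath;
    }
}
```

With this, both events from the question resolve to `/content/dam/projects/product-images/mixedmediasets/img-1`, so they share a single map entry.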
Hi @giuseppebag, we did try a workflow launcher for our requirement, but it still called the job twice. That is why we are using a listener, which we can adapt to trigger the job only once.
Hi @Keerthana_H_N ,
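The issue here seems to be related to concurrency across multiple threads. ConcurrentHashMap makes each individual operation thread-safe, and an update that has completed is visible to later reads in other threads. However, a separate check (get) followed by an update (put or remove) is not atomic, so two threads handling events for the same asset can both pass the check before either one writes to the map.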
Possible Solutions:
1. Use compute method of ConcurrentHashMap (Recommended)
Instead of manually checking and updating the map using if-else, use the compute method, which ensures atomicity:
processedPaths.compute(eventPath, (key, lastProcessedTime) -> {
    long currentTime = System.currentTimeMillis();
    if (lastProcessedTime == null || (currentTime - lastProcessedTime) > 30000) {
        return currentTime; // Update the timestamp
    }
    return lastProcessedTime; // Keep the old value
});
This ensures atomic updates without needing explicit synchronization.
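The compute() snippet above updates the map atomically but does not tell the caller whether to trigger the Sling job. One way to get that decision out of the same atomic step is to record, inside the remapping function, whether the timestamp was replaced. A sketch with hypothetical names (`RecentPathGuard`, `tryClaim`, `purge`), with the current time passed in for testability; callers would pass the normalized asset path as the key:

```java
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical wrapper around the compute() idea that also reports the trigger decision.
final class RecentPathGuard {
    private final ConcurrentHashMap<String, Long> processedPaths = new ConcurrentHashMap<>();
    private final long windowMillis;

    RecentPathGuard(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Returns true only for the caller whose compute() call replaced the timestamp. */
    boolean tryClaim(String key, long nowMillis) {
        boolean[] claimed = {false}; // local to this call; the lambda runs in the calling thread
        processedPaths.compute(key, (k, last) -> {
            if (last == null || nowMillis - last > windowMillis) {
                claimed[0] = true;
                return nowMillis; // this caller owns the event
            }
            return last; // seen recently: keep the old value
        });
        return claimed[0];
    }

    /** Drops entries older than the window so the map does not grow forever. */
    void purge(long nowMillis) {
        processedPaths.values().removeIf(last -> nowMillis - last > windowMillis);
    }

    int size() {
        return processedPaths.size();
    }
}
```

Keep the Sling job call outside the remapping function: the ConcurrentHashMap Javadoc asks for such computations to be short and simple, because other updates on the map may block while one is in progress.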
2. Use synchronized block (Less Efficient)
If you have already synchronized the add/remove operations, make sure the read (the check) and the write happen inside the same synchronized block, on the same lock object.
synchronized (processedPaths) {
    Long lastProcessedTime = processedPaths.get(eventPath);
    long currentTime = System.currentTimeMillis();
    if (lastProcessedTime == null || (currentTime - lastProcessedTime) > 30000) {
        processedPaths.put(eventPath, currentTime);
    }
}
Drawback: synchronizing the entire block serializes all event handling on one lock, which can become a performance bottleneck.
3. Use Caffeine Cache (Efficient Alternative)
If you are worried about memory usage and automatic cleanup, consider using Caffeine Cache:
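import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;
import java.util.concurrent.TimeUnit;
Cache<String, Long> processedPaths = Caffeine.newBuilder()
        .expireAfterWrite(30, TimeUnit.SECONDS)
        .build();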
This automatically removes entries older than 30 seconds without manual cleanup.
Alternative Approach: Workflow Launcher
Another approach is using an AEM Workflow Launcher to trigger only once per asset modification. This ensures that duplicate modifications don’t trigger multiple API calls.
Regards,
Amit Vishwakarma
Hi @AmitVishwakarma, we tried the first two solutions, but the behavior was the same. We'll try
Cache<String, Long> processedPaths = Caffeine.newBuilder()
        .expireAfterWrite(30, TimeUnit.SECONDS)
        .build();
and get back to you.
When we use the following dependency:
<dependency>
    <groupId>com.github.ben-manes.caffeine</groupId>
    <artifactId>caffeine</artifactId>
    <version>3.2.0</version>
    <scope>provided</scope>
</dependency>
we get this error:
Failed to scan the classpath: java.util.concurrent.ExecutionException: java.lang.RuntimeException: Unknown constant pool tag
@AmitVishwakarma Can you please share the import statements as well?
Thanks
@Keerthana_H_N, in my suggested solution, even duplicated events are handled by a queue that stores only unique paths. Take a look at it.
@Keerthana_H_N Did you find the suggestions helpful? Please let us know if you need more information. If a response worked, kindly mark it as correct for posterity; alternatively, if you found a solution yourself, we’d appreciate it if you could share it with the community. Thank you!