Socket closed errors in custom workflow step
I have a solution implemented that is described here: Moving assets to PIM system - HTTP API the correct choice
From a high level, I have a custom workflow step that calls an API and sends the asset. The workflow is triggered when users place assets into a "monitored" directory. This works fine unless a user drops a large number of files into the directory all at once. Then a random number of them fail with socket closed errors when sending the file. This has one of two outcomes: either multiple versions of the file are sent, because the file did actually go through even though a socket closed exception was thrown, or the file is not sent at all, because the failure happened on the connect rather than the send.
Here are the results of some logging:
15.01.2025 09:47:38.019 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Payload type: JCR_PATH
15.01.2025 09:47:38.019 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Stripping jcr content information from the path: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original
15.01.2025 09:47:38.019 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Trimmed path: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg
15.01.2025 09:47:38.078 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Adding file /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg to item test_10025 for attribute Box Art
15.01.2025 09:47:38.078 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload exists as an asset
15.01.2025 09:47:38.079 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload size: 709400 of type: image/jpeg
15.01.2025 09:47:38.079 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload file name: SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg
15.01.2025 09:47:38.079 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Post: POST https://*************/DfrWebApi-Dev/api/LTDEV_PIM/File/AddFileToItem HTTP/1.1
15.01.2025 09:47:38.367 *ERROR* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Socket closed
15.01.2025 09:47:38.367 *ERROR* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Failed when calling API to upload asset: Socket closed
15.01.2025 09:47:38.368 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Deactivating DFR API client connection...
15.01.2025 09:47:38.368 *ERROR* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.adobe.granite.workflow.core.job.JobHandler Error executing workflow step
com.adobe.granite.workflow.WorkflowException: Process execution resulted in an error
at com.adobe.granite.workflow.core.job.HandlerBase.executeProcess(HandlerBase.java:201) [com.adobe.granite.workflow.core:2.0.240.CQ654-B0017]
at com.adobe.granite.workflow.core.job.JobHandler.process(JobHandler.java:260) [com.adobe.granite.workflow.core:2.0.240.CQ654-B0017]
at org.apache.sling.event.impl.jobs.JobConsumerManager$JobConsumerWrapper.process(JobConsumerManager.java:502) [org.apache.sling.event:4.2.12]
at org.apache.sling.event.impl.jobs.queues.JobQueueImpl.startJob(JobQueueImpl.java:293) [org.apache.sling.event:4.2.12]
at org.apache.sling.event.impl.jobs.queues.JobQueueImpl.access$100(JobQueueImpl.java:60) [org.apache.sling.event:4.2.12]
at org.apache.sling.event.impl.jobs.queues.JobQueueImpl$1.run(JobQueueImpl.java:229) [org.apache.sling.event:4.2.12]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.adobe.granite.workflow.WorkflowException: Failed to load asset: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg to item: test_10025
at com.cds.core.workflows.UploadAssetToPIM.execute(UploadAssetToPIM.java:82) [cds.core:1.0.1.SNAPSHOT]
at com.adobe.granite.workflow.core.job.HandlerBase.executeProcess(HandlerBase.java:195) [com.adobe.granite.workflow.core:2.0.240.CQ654-B0017]
... 8 common frames omitted
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at com.cds.core.services.DFRApi.AddFileToItem(DFRApi.java:166) [cds.core:1.0.1.SNAPSHOT]
at com.cds.core.workflows.UploadAssetToPIM.execute(UploadAssetToPIM.java:79) [cds.core:1.0.1.SNAPSHOT]
... 9 common frames omitted
Then later this happens:
15.01.2025 09:47:40.388 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Payload type: JCR_PATH
15.01.2025 09:47:40.388 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Stripping jcr content information from the path: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original
15.01.2025 09:47:40.388 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Trimmed path: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg
15.01.2025 09:47:40.441 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Adding file /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg to item test_10025 for attribute Box Art
15.01.2025 09:47:40.441 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload exists as an asset
15.01.2025 09:47:40.442 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload size: 709400 of type: image/jpeg
15.01.2025 09:47:40.442 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload file name: SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg
15.01.2025 09:47:40.442 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Post: POST https://********/DfrWebApi-Dev/api/LTDEV_PIM/File/AddFileToItem HTTP/1.1
15.01.2025 09:47:40.543 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Deactivating DFR API client connection...
15.01.2025 09:47:40.564 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM UploadAssettoPIM finished
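So that is an example of a case where a duplicate file gets sent: the first attempt failed with the socket closed error, the step ran again about two seconds later, and the second attempt finished normally.
Here is what I believe are the relevant code snippets: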
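To make the two failure modes concrete, this is a minimal sketch of the distinction I would like to draw before letting a step run again. The class and method names are hypothetical and not part of my current code, and the list of exception types is my own assumption about which failures mean that nothing was sent yet.

```java
import java.io.IOException;
import java.net.ConnectException;
import java.net.NoRouteToHostException;
import java.net.UnknownHostException;

/** Hypothetical helper: decides whether a failed upload is safe to repeat. */
public class RetrySafety {

    /**
     * Returns true only for failures raised while the connection was still
     * being established, i.e. before any request bytes could have been sent.
     * Anything else (for example "Socket closed" while reading the response)
     * may mean the server already received the file, so repeating the POST
     * could create a duplicate.
     */
    public static boolean failedBeforeSend(IOException ex) {
        return ex instanceof ConnectException
                || ex instanceof NoRouteToHostException
                || ex instanceof UnknownHostException;
    }
}
```

With a check like this, the exception in the stack trace above (a plain java.net.SocketException thrown while reading the response header) would not count as safe to repeat, which matches the duplicate I am seeing.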
if(StringUtils.equals(workItem.getWorkflowData().getPayloadType(), "JCR_PATH")) {
// Get payload node out of AEM in preparation for sending the data to PIM
String assetPath = workItem.getWorkflowData().getPayload().toString();
// When the workflow launcher launches the workflow automatically it adds jcr:content to the path
// this needs to be stripped
int indexIdx = assetPath.indexOf("/jcr:content");
if(indexIdx != -1) {
log.info("Stripping jcr content information from the path: " + assetPath);
assetPath = assetPath.substring(0, indexIdx);
log.info("Trimmed path: " + assetPath);
}
ResourceResolver resourceResolver = wfSession.adaptTo(ResourceResolver.class);
Resource payloadResource = resourceResolver.getResource(assetPath);
Asset payload = payloadResource.adaptTo(Asset.class);
/*
* The item number is the folder in which the file asset is in. Parse that out
*/
Path assetPathInfo = Paths.get(assetPath);
Path assetDirectory = assetPathInfo.getParent();
if(assetDirectory != null) {
itemNumber = assetDirectory.getFileName().toString();
log.debug("Item Number of asset to load: " + itemNumber);
}
if(itemNumber != null && itemNumber.isEmpty() == false)
{
if(api.connect(processArgs.get(TOKENBASEURL_KEY), processArgs.get(APIBASEURL_KEY), processArgs.get(CATALOG_KEY), processArgs.get(APISECRET_KEY))) {
try {
// Call PIM
api.AddFileToItem(itemNumber, null, "Part.0", processArgs.get(ATTR_KEY), payload);
} catch(IOException ex) {
log.error("Failed when calling API to upload asset: " + ex.getMessage());
throw new WorkflowException("Failed to load asset: " + assetPath + " to item: " + itemNumber, ex);
} finally {
api.close();
}
} else {
log.error("Failed to connect to API");
throw new WorkflowException("Failed to connect to API");
}
}
List<Route> routes = wfSession.getRoutes(workItem, false);
wfSession.complete(workItem, routes.get(0));
}
log.info("UploadAssettoPIM finished");
Here are the connect and AddFileToItem methods:
@Override
public boolean connect(String baseTokenUrl, String baseApiUrl, String catalogName, String apiSecret) {
boolean success = true;
try {
_client = httpClientBuilderFactory.newBuilder().build();
_tokenUrl = baseTokenUrl + "/identity/connect/token";
_apiUrl = baseApiUrl + "/api/" + catalogName + "/";
HttpPost post = new HttpPost(_tokenUrl);
// Encode client credentials as Base64
if(apiSecret != null) {
clientSecret = apiSecret;
}
String credentials = clientId + ":" + clientSecret;
String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
// Set headers
post.setHeader("Authorization", "Basic " + encodedCredentials);
post.setHeader("Content-Type", "application/x-www-form-urlencoded");
post.setHeader("Accept", "application/json");
// Create the request body with the necessary parameters
StringBuilder requestBody = new StringBuilder();
requestBody.append("grant_type=password");
requestBody.append("&scope=").append(URLEncoder.encode(scope, "UTF-8"));
requestBody.append("&username=").append(URLEncoder.encode("test", "UTF-8"));
requestBody.append("&password=").append(URLEncoder.encode("user", "UTF-8"));
// Set the request entity (body)
HttpEntity entity = new StringEntity(requestBody.toString(), StandardCharsets.UTF_8);
post.setEntity(entity);
// Execute the request
try (CloseableHttpResponse response = _client.execute(post)) {
int statusCode = response.getStatusLine().getStatusCode();
String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
if (statusCode == 200) {
log.debug("Response: " + responseBody);
ObjectMapper mapper = new ObjectMapper();
JsonNode accessTokenData = mapper.readTree(responseBody);
// Expected response shape: {"access_token":"your_access_token","token_type":"bearer","expires_in":3600}
// Only the access_token field is read from the parsed JSON
_accessToken = accessTokenData.get("access_token").asText();
} else {
log.error("Error fetching token, status code: " + statusCode);
log.error("Response: " + responseBody);
success = false;
}
}
} catch(IOException ex) {
log.error("Failed to connect to DFR identity server and obtain access token");
log.error(ex.getMessage());
success = false;
} catch(Exception ex) {
log.error(ex.getMessage());
success = false;
}
if(success == false) {
if(_client != null) {
try {
_client.close();
} catch (IOException e) {
// Pass the exception itself so the logger prints the full stack trace
log.error("Failed to close DFR API client", e);
}
_client = null;
}
}
return success;
}
@Override
public ItemKeyInfo AddFileToItem(String itemNumber, String revision, String qualifier, String attributeName, Asset payload) throws IOException, WorkflowException {
if(_client == null) {
throw new IOException("DFRAPI client is not connected. Please call connect before attempting to execute an API call");
}
try {
log.info("Adding file " + payload.getPath().toString() + " to item " + itemNumber + " for attribute " + attributeName);
String addFileUrl = _apiUrl + "File/AddFileToItem";
HttpPost post = new HttpPost(addFileUrl);
post.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + _accessToken);
// Build the multi-part request entity
MultipartEntityBuilder builder = MultipartEntityBuilder.create();
builder.addTextBody("itemNumber", itemNumber);
if(revision != null && StringUtils.isNotEmpty(revision))
builder.addTextBody("revision", revision);
builder.addTextBody("qualifier", qualifier);
builder.addTextBody("attribute", attributeName);
InputStream dataStream = null;
if (payload != null) {
log.info("Payload exists as an asset");
dataStream = payload.getOriginal().getStream();
log.info("Payload size: " + payload.getOriginal().getSize() + " of type: " + payload.getOriginal().getMimeType());
}
if(dataStream != null) {
try {
String payloadFileName = Paths.get(payload.getPath()).getFileName().toString();
log.info("Payload file name: " + payloadFileName);
builder.addBinaryBody("file", dataStream, ContentType.APPLICATION_OCTET_STREAM, payloadFileName);
HttpEntity multipart = builder.build();
post.setEntity(multipart);
//-- Setup timeouts --//
log.info("Setting up timeouts: 10 seconds connection, 120 seconds socket");
RequestConfig requestConfig = RequestConfig.custom()
.setConnectTimeout(10000) // Connection timeout in milliseconds
.setSocketTimeout(120000) // Socket timeout in milliseconds
.build();
post.setConfig(requestConfig);
log.info("Post: " + post.toString());
try (CloseableHttpResponse response = _client.execute(post)) {
int statusCode = response.getStatusLine().getStatusCode();
HttpEntity responseEntity = response.getEntity();
String responseBody = EntityUtils.toString(responseEntity, StandardCharsets.UTF_8);
EntityUtils.consume(responseEntity);
if (statusCode == 200) {
log.debug("Response: " + responseBody);
ObjectMapper mapper = new ObjectMapper();
return mapper.readValue(responseBody, ItemKeyInfo.class);
} else {
String message = "Error uploading file to DFR, status code: " + statusCode;
log.error(message);
log.error("Response: " + responseBody);
throw new WorkflowException(message);
}
}
} finally {
dataStream.close();
}
} else {
String message = "No data found in payload: " + payload.getPath().toString();
log.error(message);
throw new WorkflowException(message);
}
} catch(Exception ex) {
log.error(ex.getMessage());
throw ex;
}
}
I suspect that I am having this issue because of the connection pooling logic in AEM. Any ideas, or has anyone else experienced this? Is there any way to throttle the requests (that is, limit how many of these workflows run simultaneously)?
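To make the throttling question concrete, here is a minimal sketch of the kind of limit I have in mind, using a plain java.util.concurrent.Semaphore. The class UploadThrottle and its methods are hypothetical and not part of my current code. It would only limit uploads inside a single AEM instance, and I have not tested it against the real API.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Hypothetical helper: caps how many uploads run at once inside one JVM. */
public class UploadThrottle {

    private final Semaphore permits;

    public UploadThrottle(int maxConcurrentUploads) {
        // Fair semaphore, so waiting uploads are served in arrival order
        this.permits = new Semaphore(maxConcurrentUploads, true);
    }

    /**
     * Runs the given upload once a slot is free. Waits up to waitSeconds
     * for a slot and throws TimeoutException if none becomes available.
     */
    public <T> T call(Callable<T> upload, long waitSeconds) throws Exception {
        if (!permits.tryAcquire(waitSeconds, TimeUnit.SECONDS)) {
            throw new TimeoutException(
                    "No upload slot became free within " + waitSeconds + "s");
        }
        try {
            return upload.call();
        } finally {
            // Always give the slot back, even if the upload failed
            permits.release();
        }
    }

    /** Number of upload slots currently unused. */
    public int availableSlots() {
        return permits.availablePermits();
    }
}
```

In the workflow step, the idea would be to share one UploadThrottle across all executions and wrap the api.AddFileToItem(...) call in throttle.call(...), so that only a fixed number of uploads hit the API at the same time.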
Thanks!