
SOLVED

Socket closed errors in custom workflow step


Level 3

I have a solution implemented that is described here: Moving assets to PIM system - HTTP API the correct choice

At a high level, I have a custom workflow step that calls an API and sends the asset. The workflow is triggered when users place assets into a "monitored" directory. This works fine unless a user drops a large number of files into the directory all at once; then a random subset of them fail with socket closed errors while sending the file. The failure takes one of two forms: either multiple versions of the file end up being sent (the file actually went through even though we got a socket closed exception, so a retry duplicates it), or the file is not sent at all (when the failure happens on the connect rather than the send).

 

Here are the results of some logging:

15.01.2025 09:47:38.019 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Payload type: JCR_PATH
15.01.2025 09:47:38.019 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Stripping jcr content information from the path: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original
15.01.2025 09:47:38.019 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Trimmed path: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg
15.01.2025 09:47:38.078 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Adding file /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg to item test_10025 for attribute Box Art
15.01.2025 09:47:38.078 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload exists as an asset
15.01.2025 09:47:38.079 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload size: 709400 of type: image/jpeg
15.01.2025 09:47:38.079 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload file name: SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg
15.01.2025 09:47:38.079 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Post: POST https://*************/DfrWebApi-Dev/api/LTDEV_PIM/File/AddFileToItem HTTP/1.1
15.01.2025 09:47:38.367 *ERROR* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Socket closed
15.01.2025 09:47:38.367 *ERROR* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Failed when calling API to upload asset: Socket closed
15.01.2025 09:47:38.368 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Deactivating DFR API client connection...
15.01.2025 09:47:38.368 *ERROR* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.adobe.granite.workflow.core.job.JobHandler Error executing workflow step
com.adobe.granite.workflow.WorkflowException: Process execution resulted in an error
at com.adobe.granite.workflow.core.job.HandlerBase.executeProcess(HandlerBase.java:201) [com.adobe.granite.workflow.core:2.0.240.CQ654-B0017]
at com.adobe.granite.workflow.core.job.JobHandler.process(JobHandler.java:260) [com.adobe.granite.workflow.core:2.0.240.CQ654-B0017]
at org.apache.sling.event.impl.jobs.JobConsumerManager$JobConsumerWrapper.process(JobConsumerManager.java:502) [org.apache.sling.event:4.2.12]
at org.apache.sling.event.impl.jobs.queues.JobQueueImpl.startJob(JobQueueImpl.java:293) [org.apache.sling.event:4.2.12]
at org.apache.sling.event.impl.jobs.queues.JobQueueImpl.access$100(JobQueueImpl.java:60) [org.apache.sling.event:4.2.12]
at org.apache.sling.event.impl.jobs.queues.JobQueueImpl$1.run(JobQueueImpl.java:229) [org.apache.sling.event:4.2.12]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: com.adobe.granite.workflow.WorkflowException: Failed to load asset: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg to item: test_10025
at com.cds.core.workflows.UploadAssetToPIM.execute(UploadAssetToPIM.java:82) [cds.core:1.0.1.SNAPSHOT]
at com.adobe.granite.workflow.core.job.HandlerBase.executeProcess(HandlerBase.java:195) [com.adobe.granite.workflow.core:2.0.240.CQ654-B0017]
... 8 common frames omitted
Caused by: java.net.SocketException: Socket closed
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at sun.security.ssl.InputRecord.readFully(InputRecord.java:465)
at sun.security.ssl.InputRecord.read(InputRecord.java:503)
at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:983)
at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:940)
at sun.security.ssl.AppInputStream.read(AppInputStream.java:105)
at org.apache.http.impl.io.SessionInputBufferImpl.streamRead(SessionInputBufferImpl.java:137) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.io.SessionInputBufferImpl.fillBuffer(SessionInputBufferImpl.java:153) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.io.SessionInputBufferImpl.readLine(SessionInputBufferImpl.java:282) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:138) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:56) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:259) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.DefaultBHttpClientConnection.receiveResponseHeader(DefaultBHttpClientConnection.java:163) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.conn.CPoolProxy.receiveResponseHeader(CPoolProxy.java:165) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:273) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:125) [org.apache.httpcomponents.httpcore:4.4.8]
at org.apache.http.impl.execchain.MainClientExec.execute(MainClientExec.java:272) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.execchain.ProtocolExec.execute(ProtocolExec.java:185) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.execchain.RetryExec.execute(RetryExec.java:89) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at com.cds.core.services.DFRApi.AddFileToItem(DFRApi.java:166) [cds.core:1.0.1.SNAPSHOT]
at com.cds.core.workflows.UploadAssetToPIM.execute(UploadAssetToPIM.java:79) [cds.core:1.0.1.SNAPSHOT]
... 9 common frames omitted

Then later this happens:

15.01.2025 09:47:40.388 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Payload type: JCR_PATH
15.01.2025 09:47:40.388 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Stripping jcr content information from the path: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original
15.01.2025 09:47:40.388 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM Trimmed path: /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg
15.01.2025 09:47:40.441 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Adding file /content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg to item test_10025 for attribute Box Art
15.01.2025 09:47:40.441 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload exists as an asset
15.01.2025 09:47:40.442 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload size: 709400 of type: image/jpeg
15.01.2025 09:47:40.442 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Payload file name: SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg
15.01.2025 09:47:40.442 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Post: POST https://********/DfrWebApi-Dev/api/LTDEV_PIM/File/AddFileToItem HTTP/1.1
15.01.2025 09:47:40.543 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.services.DFRApi Deactivating DFR API client connection...
15.01.2025 09:47:40.564 *INFO* [JobHandler: /var/workflow/instances/server1/2024-12-21/publish-to-pim_415:/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg/jcr:content/renditions/original] com.cds.core.workflows.UploadAssetToPIM UploadAssettoPIM finished

So that example is one where a duplicate file gets sent.

Here are what I believe are the relevant code snippets:

if(StringUtils.equals(workItem.getWorkflowData().getPayloadType(), "JCR_PATH")) {
			// Get payload node out of AEM in preparation for sending the data to PIM
			String assetPath = workItem.getWorkflowData().getPayload().toString();
			
			// When the workflow launcher launches the workflow automatically it adds jcr:content to the path
			// this needs to be stripped
			int indexIdx = assetPath.indexOf("/jcr:content");
			if(indexIdx != -1) {
				log.info("Stripping jcr content information from the path: " + assetPath);
				assetPath = assetPath.substring(0, indexIdx);
				log.info("Trimmed path: " + assetPath);
			}
			ResourceResolver resourceResolver = wfSession.adaptTo(ResourceResolver.class);
			Resource payloadResource = resourceResolver.getResource(assetPath);
			Asset payload = payloadResource.adaptTo(Asset.class);
			
			/*
			 * The item number is the folder in which the file asset is in.  Parse that out
			 */
			Path assetPathInfo = Paths.get(assetPath);
			Path assetDirectory = assetPathInfo.getParent();
			if(assetDirectory != null) {
				itemNumber = assetDirectory.getFileName().toString();
				log.debug("Item Number of asset to load: " + itemNumber);
			}
			
			if(itemNumber != null && itemNumber.isEmpty() == false)
			{
				if(api.connect(processArgs.get(TOKENBASEURL_KEY), processArgs.get(APIBASEURL_KEY), processArgs.get(CATALOG_KEY), processArgs.get(APISECRET_KEY))) {
					try {
						// Call PIM
						api.AddFileToItem(itemNumber, null, "Part.0", processArgs.get(ATTR_KEY), payload);
					} catch(IOException ex) {
						log.error("Failed when calling API to upload asset: " + ex.getMessage());
						throw new WorkflowException("Failed to load asset: " + assetPath + " to item: " + itemNumber, ex);
					} finally {				
						api.close();
					}
				} else {
					log.error("Failed to connect to API");
					throw new WorkflowException("Failed to connect to API");
				}
			}
			
			List<Route> routes = wfSession.getRoutes(workItem, false);
			wfSession.complete(workItem, routes.get(0));
		}
		
		log.info("UploadAssettoPIM finished");

Here are the connect and AddFileToItem methods:

 @Override
	public boolean connect(String baseTokenUrl, String baseApiUrl, String catalogName, String apiSecret) {
		boolean success = true;
		
		try {
			_client = httpClientBuilderFactory.newBuilder().build();	
	
			_tokenUrl = baseTokenUrl + "/identity/connect/token";
			_apiUrl = baseApiUrl + "/api/" + catalogName + "/";
			HttpPost post = new HttpPost(_tokenUrl);

            // Encode client credentials as Base64
			if(apiSecret != null) {
				clientSecret = apiSecret;
			}
            String credentials = clientId + ":" + clientSecret;
            String encodedCredentials = Base64.getEncoder().encodeToString(credentials.getBytes(StandardCharsets.UTF_8));

            // Set headers
            post.setHeader("Authorization", "Basic " + encodedCredentials);
            post.setHeader("Content-Type", "application/x-www-form-urlencoded");
            post.setHeader("Accept", "application/json");

            // Create the request body with the necessary parameters
            StringBuilder requestBody = new StringBuilder();
            requestBody.append("grant_type=password");
            requestBody.append("&scope=").append(URLEncoder.encode(scope, "UTF-8"));
            requestBody.append("&username=").append(URLEncoder.encode("test", "UTF-8"));
            requestBody.append("&password=").append(URLEncoder.encode("user", "UTF-8"));

            // Set the request entity (body)
            HttpEntity entity = new StringEntity(requestBody.toString(), StandardCharsets.UTF_8);
            post.setEntity(entity);
            
            // Execute the request
            try (CloseableHttpResponse response = _client.execute(post)) {
                int statusCode = response.getStatusLine().getStatusCode();
                String responseBody = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);

                if (statusCode == 200) {
                	log.debug("Response: " + responseBody);
                	ObjectMapper mapper = new ObjectMapper();
                	JsonNode accessTokenData = mapper.readTree(responseBody);  
                    // Example: {"access_token":"your_access_token","token_type":"bearer","expires_in":3600}
                    // For simplicity, let's assume responseBody is the token (you would actually parse this)
                    _accessToken = accessTokenData.get("access_token").asText();
                } else {
                    log.error("Error fetching token, status code: " + statusCode);
                    log.error("Response: " + responseBody);
                    success = false;
                }
            }
            
		} catch(IOException ex) {
			log.error("Failed to connect to DFR identity server and obtain access token");
			log.error(ex.getMessage());
			success = false;
		} catch(Exception ex) {
			log.error(ex.getMessage());
			success = false;
		}
		
		if(success == false) {
			if(_client != null) {
				try {
					_client.close();
				} catch (IOException e) {
					log.error("Failed to close HTTP client", e);
				}
				_client = null;
			}
		}
		return success;
	}
    
    @Override
	public ItemKeyInfo AddFileToItem(String itemNumber, String revision, String qualifier, String attributeName, Asset payload) throws IOException, WorkflowException {
		if(_client == null) {
			throw new IOException("DFRAPI client is not connected.  Please call connect before attempting to execute an API call");
		}
		
		try {
			log.info("Adding file " + payload.getPath().toString() + " to item " + itemNumber + " for attribute " + attributeName);
			
			String addFileUrl = _apiUrl + "File/AddFileToItem";
			HttpPost post = new HttpPost(addFileUrl);
			post.setHeader(HttpHeaders.AUTHORIZATION, "Bearer " + _accessToken);
			
			// Build the multi-part request entity
            MultipartEntityBuilder builder = MultipartEntityBuilder.create();
            builder.addTextBody("itemNumber", itemNumber);
            if(revision != null && StringUtils.isNotEmpty(revision))
            	builder.addTextBody("revision", revision);
            builder.addTextBody("qualifier", qualifier);
            builder.addTextBody("attribute", attributeName);
            
            InputStream dataStream = null;
            if (payload != null) {
            	log.info("Payload exists as an asset");
                dataStream = payload.getOriginal().getStream();
                log.info("Payload size: " + payload.getOriginal().getSize() + " of type: " + payload.getOriginal().getMimeType());
            }
            
            if(dataStream != null) {
            	try {
            		String payloadFileName = Paths.get(payload.getPath()).getFileName().toString();
            		log.info("Payload file name: " + payloadFileName);
            		builder.addBinaryBody("file", dataStream, ContentType.APPLICATION_OCTET_STREAM, payloadFileName);
	            	
	            	HttpEntity multipart = builder.build();
	                post.setEntity(multipart);
	                
	                //-- Setup timeouts --//
	                log.info("Setting up timeouts: 10 seconds connection, 120 seconds socket");
	                RequestConfig requestConfig = RequestConfig.custom()
	                        .setConnectTimeout(10000) // Connection timeout in milliseconds
	                        .setSocketTimeout(120000) // Socket timeout in milliseconds
	                        .build();
	                post.setConfig(requestConfig);
	                
	                log.info("Post: " + post.toString());
	                try (CloseableHttpResponse response = _client.execute(post)) {
	                	int statusCode = response.getStatusLine().getStatusCode();
	                	HttpEntity responseEntity = response.getEntity();
	                    String responseBody = EntityUtils.toString(responseEntity, StandardCharsets.UTF_8);
	                    EntityUtils.consume(responseEntity);
	                    if (statusCode == 200) {
	                    	log.debug("Response: " + responseBody);
	                    	ObjectMapper mapper = new ObjectMapper();
	                    	return mapper.readValue(responseBody, ItemKeyInfo.class);  
	                    } else {
	                    	String message = "Error uploading file to DFR, status code: " + statusCode;
	                        log.error(message);
	                        log.error("Response: " + responseBody);
	                        throw new WorkflowException(message);
	                    }
	                }
            	} finally {
            		dataStream.close();
            	}
            } else {
            	String message = "No data found in payload: " + payload.getPath().toString();
            	log.error(message);
            	throw new WorkflowException(message);
            }
		} catch(Exception ex) {
			log.error(ex.getMessage());
			throw ex;
		}
	}

 I suspect that I am having this issue due to the connection pooling logic going on in AEM.  Any ideas, or has anyone else experienced this?  Is there any way to throttle the requests (i.e., the workflows that run simultaneously)?

 

Thanks!



19 Replies


Level 8

Hi @ChristopherHo5 , the stack trace suggests this is not an AEM failure. Is

com.cds.core.services.DFRApi.AddFileToItem(DFRApi.java:166) 

an SAP API? When you call AddFileToItem, it throws the socket exception. It looks like you are running some 3rd-party API within the AEM JVM. The API first attempts to read this asset:

/content/dam/pim/test_10025/SGCO_91070_0520_Cooler_GreyRed_Studio_4 low.jpg

and times out. Maybe the asset is too large, or there is a permission issue accessing the author asset. See if you can switch to a dispatcher-cached asset URL or a Dynamic Media URL, which deliver assets better than the raw original from the DAM. Your attempt to configure the socket timeout is correct, but it carries the risk of long-running threads inside the JVM. Instead, you should consider decoupling this AddFileToItem() API call outside AEM, e.g., on a cloud microservice.


Level 3

That is my own class in my OSGi component.  I just wrapped the calls to the API in a class; I'm using the standard HTTP client classes.


Level 8

Sure @ChristopherHo5 , I was describing the problems I ran into with my own project jobs and the solution I implemented. So, you are executing 

HttpPost post = new HttpPost(addFileUrl);

which performs the upload to addFileUrl inside the JVM. For any number of reasons (as Josh points out: firewall, network slowness, 3rd-party outage, etc.), the JVM will throw a SocketException. Increasing timeouts and whitelisting in the WAF are temporary solutions; if you upload large files, you may again run into timeout or memory issues. 

 

I faced a similar problem in my project. I ended up writing an AIO Runtime action: the download from the DAM and the upload of the files to the 3rd party happened outside AEM, in the Runtime action. This reduced the load on AEM, and since the Runtime action runs in the cloud, the platform knows how to scale, allocate more memory, and wait longer for larger files. 

 

For the short term, your approach of bumping the timeouts by trial and error is correct. I don't have an exact fix or magic numbers for the timeout values; you'll have to experiment until you settle on convincing numbers. 

 


Level 3

Thanks @sarav_prakash!  

 

When you say this:

"I faced similar problem in my project. I ended up writing AIO Runtime action. Download from DAM and upload of files to 3rd, happened outside AEM. At Runtime action."

 

This was the approach I originally took: monitor a directory in AEM, grab newly added files, and move them.  However, the APIs lacked calls that were needed, and it was suggested that I use a workflow launcher with a workflow and a custom process step to handle this, so that AEM could fire when new files were created instead of me polling for them.


Level 3

"should consider decoupling this addFileToItem() API call outside AEM, like on cloud microservice."

I'm not sure how to do this.  If you look at the thread I mentioned at the beginning of my question, it was recommended that I use a launcher and a workflow with a call to an API to send the file to another system.  Also, I put that socket timeout logic in because I thought it might be timing out on large files, but the log shows these calls failing within milliseconds when a lot of concurrent workflows fire off.


Employee Advisor

This is not a problem of AEM, but is caused by custom code making a call to a different HTTP endpoint. If you just let the exception bubble up, the workflow step will fail and retry (IIRC up to 3 times) before the workflow itself eventually fails.

 

(You might want to think about adding retry logic inside the workflow step, instead of relying on the workflow engine to retry the whole step. Then you have more control over the retry handling.)
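
For illustration, in-step retries could look something like this (a rough sketch only: uploadWithRetry, the retry count, and the backoff are placeholders, and retrying is only safe if the remote AddFileToItem call is idempotent, which the duplicate-file symptom above suggests it may not be):

// Sketch: retry the upload a fixed number of times with a growing delay.
// Only safe if AddFileToItem is idempotent on the PIM side.
private static final int MAX_RETRIES = 3;

private ItemKeyInfo uploadWithRetry(String itemNumber, String attribute, Asset payload)
        throws WorkflowException, IOException {
    IOException lastFailure = null;
    for (int attempt = 1; attempt <= MAX_RETRIES; attempt++) {
        try {
            return api.AddFileToItem(itemNumber, null, "Part.0", attribute, payload);
        } catch (IOException ex) {
            lastFailure = ex;
            log.warn("Upload attempt " + attempt + " of " + MAX_RETRIES + " failed: " + ex.getMessage());
            try {
                Thread.sleep(attempt * 2000L); // arbitrary linear backoff
            } catch (InterruptedException ie) {
                Thread.currentThread().interrupt();
                break;
            }
        }
    }
    throw lastFailure != null ? lastFailure : new IOException("Upload failed after " + MAX_RETRIES + " attempts");
}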


Level 3

I feel like I'm not saying the correct thing.  The problem is the socket exception, which only happens within the AEM platform; there should be no socket exception.  Custom retrying is not an option because, in the case I show, the call was actually successful, so a retry sends a second file.  I tested calling this API from Postman with 20 virtual users all posting files for a full minute: 886 files were posted and not a single failure occurred.  Something is wrong with how the calls to the API are being handled when there are a lot of them.


Correct answer by
Employee Advisor

The connection from AEM CS to your application takes a different route than the requests from your local machine to the backend, and that can cause a variety of issues, starting with being blocked by a firewall (AEM probably operates from a different network than your local machine).


Community Advisor

Hi @ChristopherHo5 ,

If I understood correctly, you are trying to make an API call which requires an access token.

You can try the following improvements:

1. Instead of fetching an access token for each call, cache your token (most likely valid for 1 hour, but you can check the exp_at property or the standard expiry set for your token). This will reduce load on the instance; you can refresh the token a few (e.g., 10) seconds before it expires.

2. You should remove the explicit close of the stream in your code and rewrite it using Java's try-with-resources feature (see the sketch below).

I don't see the explicit close (api.close();) being used in the caller class.
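
For point 2, the stream handling in AddFileToItem could be restructured along these lines (a sketch based on the posted code, with the response handling elided):

// Sketch: try-with-resources closes the asset stream automatically,
// even if _client.execute() throws, replacing the explicit finally block.
try (InputStream dataStream = payload.getOriginal().getStream()) {
    String payloadFileName = Paths.get(payload.getPath()).getFileName().toString();
    builder.addBinaryBody("file", dataStream, ContentType.APPLICATION_OCTET_STREAM, payloadFileName);
    post.setEntity(builder.build());
    try (CloseableHttpResponse response = _client.execute(post)) {
        // ... status check and response parsing as in the original code ...
    }
}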

Thanks

 


Level 3

Thanks for the suggestions.

1.  How do I do that?  Doesn't my component get instantiated for each workflow?  Or does it only get instantiated once, with execute() called multiple times?  I would love to make this optimization if I can reliably store that information without introducing race conditions/threading issues.

2. Are you talking about the dataStream variable?

The close is right here:

(screenshot of the finally block calling api.close())

 


Level 3

Hey @MukeshYadav_ ,

 

Checking back in on ideas on how to accomplish #1.  Is there a way to store something that can be shared across different workflow instances?

 


Community Advisor

Hi @ChristopherHo5 ,

A race condition won't really arise here even if the token is accessed/updated by multiple threads, because the calls are not dependent on each other, so we can use any approach. (Initially there may be multiple token calls until the first successful response completes.)

There are many different approaches for storing the token globally and expiring it a few seconds early (e.g., 10 seconds):

1. One approach is to create a service, store the token under /var/somenode, and use a resource resolver to access it:

private void cacheResponse(final String authorizationJson, final @NotNull ResourceResolver resolver) {
        final Resource cacheNodePath = resolver.getResource(CACHE_NODE_PATH);
        if (isNull(cacheNodePath)) {
            logger.info("Cache resource not found at {}, please verify permissions", CACHE_NODE_PATH);
            return;
        }
        final ModifiableValueMap vm = cacheNodePath.adaptTo(ModifiableValueMap.class);
        if (isNull(vm)) {
            logger.info("Unable to adapt value map from cache {}, please verify permissions", CACHE_NODE_PATH);
            return;
        }
        vm.put(AUTHORIZATION_CACHE_PROPERTY, authorizationJson);
        vm.put(NameConstants.PN_PAGE_LAST_MOD, Calendar.getInstance());
        try {
            resolver.commit();
        } catch (final PersistenceException e) {
            logger.error("Unable to save  Json cache", e);
        }
    }

   
    private @Nullable Authorization getFromCache(final @NotNull ResourceResolver resolver, final Gson gson) {
        final Resource cacheResource = resolver.getResource(CACHE_NODE_PATH);
        if (cacheResource == null) {
            logger.warn("Cache resource not found at {}, please verify permissions", CACHE_NODE_PATH);
            return null;
        }
        final ValueMap vm = cacheResource.getValueMap();

        final Calendar lastMod = vm.get(NameConstants.PN_PAGE_LAST_MOD, Calendar.class);
        final String cachedJson = vm.get(AUTHORIZATION_CACHE_PROPERTY, String.class);
        if (isNull(lastMod) || StringUtils.isEmpty(cachedJson)) return null;

        final Authorization auth = gson.fromJson(cachedJson, Authorization.class);

        if (Calendar.getInstance().getTimeInMillis() < (lastMod.getTimeInMillis() + auth.getExpiresIn() * 1000)) {
            return auth;
        }
        return null;
    }

2. Store the token in the service itself (fetch it the first time and cache it):

Sample code:

package com.projectname.core.services.azure.impl;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.projectname.core.services.azure.AzureKeyVaultService;
import com.projectname.core.services.azure.config.AzureKeyVaultServiceConfig;
import org.apache.commons.lang.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.entity.UrlEncodedFormEntity;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.message.BasicNameValuePair;
import org.apache.http.util.EntityUtils;
import org.jetbrains.annotations.NotNull;
import org.osgi.service.component.annotations.Activate;
import org.osgi.service.component.annotations.Component;
import org.osgi.service.metatype.annotations.Designate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

@Component(service = AzureKeyVaultService.class)
@Designate(ocd = AzureKeyVaultServiceConfig.class)
public class AzureKeyVaultServiceImpl implements AzureKeyVaultService {
    private static final Logger LOGGER = LoggerFactory.getLogger(AzureKeyVaultServiceImpl.class);
    private static JsonObject azureAccessTokenResponse;
    private String azure_auth_token_api_endpoint_url;
    private String client_id;
    private String client_secret;
    private String keyvault_scope_url;
    private static long tokenGeneratedAt;
   
    @Activate
    protected void activate(final @NotNull AzureKeyVaultServiceConfig config) {
        azure_auth_token_api_endpoint_url = config.azure_auth_token_api_endpoint_url();
        client_id = config.client_id();
        client_secret = config.client_secret();
        keyvault_scope_url = config.keyvault_scope_url();
        azureAccessTokenResponse = null;
    }

    public String getValidAzureAccessToken() {
        if (StringUtils.isNotBlank(azure_auth_token_api_endpoint_url)) {
            //Expire the Azure access token 10 seconds early
            if (Objects.nonNull(azureAccessTokenResponse) && (System.currentTimeMillis() - tokenGeneratedAt) / 1000 <= (Integer.parseInt(azureAccessTokenResponse.get("expires_in").getAsString()) - 10)) {
                LOGGER.debug("Azure access token is still valid, reusing cached token");
            } else {
                //Token missing or expired, call the Azure access token API
                List<BasicNameValuePair> formData = new ArrayList<>();
                formData.add(new BasicNameValuePair("grant_type", "client_credentials"));
                formData.add(new BasicNameValuePair("client_id", client_id));
                formData.add(new BasicNameValuePair("client_secret", client_secret));
                formData.add(new BasicNameValuePair("scope", keyvault_scope_url));

                try (CloseableHttpClient client = HttpClients.createDefault()) {
                    HttpPost postRequest = new HttpPost(azure_auth_token_api_endpoint_url);
                    postRequest.setEntity(new UrlEncodedFormEntity(formData));
                    HttpResponse response = client.execute(postRequest);
                    if (response.getStatusLine().getStatusCode() == 200) {
                        String responseString = EntityUtils.toString(response.getEntity(), StandardCharsets.UTF_8);
                        azureAccessTokenResponse = new Gson().fromJson(responseString, JsonObject.class);
                        tokenGeneratedAt = System.currentTimeMillis();
                    } else {
                        LOGGER.debug("Error while fetching azure access token {}", response.getStatusLine());
                        return null;
                    }
                } catch (Exception e) {
                    LOGGER.error("Exception occurred while fetching azure access token {}", e.getMessage(), e);
                }
            }
            // Guard against a failed fetch leaving the cached response null
            return Objects.nonNull(azureAccessTokenResponse) ? azureAccessTokenResponse.get("access_token").getAsString().trim() : null;
        } else {
            LOGGER.info("AzureKeyVaultService configuration is not available");
        }
        return null;
    }
}

3. Use a Guava cache service (a minimal sketch follows).
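
For option 3, a minimal Guava sketch (field names and the expiry value are illustrative; ideally derive the expiry from the token's expires_in):

import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;

// Guava caches are thread-safe; a single entry holds the shared token.
private final Cache<String, String> tokenCache = CacheBuilder.newBuilder()
        .expireAfterWrite(50, TimeUnit.MINUTES) // keep shorter than the real token lifetime
        .build();

public String getAccessToken() throws ExecutionException {
    // The loader only runs when the entry is missing or expired
    return tokenCache.get("access_token", this::fetchFreshToken);
}

private String fetchFreshToken() {
    // ... call the identity endpoint (as in connect()) and return the token ...
    return null; // placeholder
}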

Thanks


Level 3

Thank you so much for your response!

So, the second approach is to just use a static variable in the service.  I did think about this but was nervous about race conditions when getting and managing the token.  If I do this, it will definitely cut down on the calls being made to the API.  I can try this and see if it improves the situation.
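
If I go the static route, I think something like a synchronized accessor would avoid the race (a sketch only; fetchFreshToken and tokenLifetimeSeconds are placeholders for the existing token logic):

// Sketch: a static synchronized accessor serializes token refreshes across
// all workflow threads that share this (singleton by default) OSGi service.
private static String cachedToken;
private static long tokenFetchedAtMillis;

private static synchronized String getToken(long tokenLifetimeSeconds) throws IOException {
    long ageSeconds = (System.currentTimeMillis() - tokenFetchedAtMillis) / 1000;
    if (cachedToken == null || ageSeconds >= tokenLifetimeSeconds - 10) {
        cachedToken = fetchFreshToken(); // placeholder: the existing token POST from connect()
        tokenFetchedAtMillis = System.currentTimeMillis();
    }
    return cachedToken;
}

private static String fetchFreshToken() throws IOException {
    // placeholder for the existing token-fetch logic
    return null;
}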

 

The first solution seems interesting as well.

 

Thanks!


Level 3

Due to some of the responses, I think I need to reiterate 2 important points here:

 

1. This logic is completely successful when dropping 1, 2, or even 3 files in the directory.  The problem ONLY occurs when dropping a large number of files into the directory all at once.

2. The exception is coming from the execute() call of the HTTP POST:

....

at org.apache.http.impl.execchain.RedirectExec.execute(RedirectExec.java:111) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.client.InternalHttpClient.doExecute(InternalHttpClient.java:185) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:83) [org.apache.httpcomponents.httpclient:4.5.4.B003]
at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:108) [org.apache.httpcomponents.httpclient:4.5.4.B003]

The AEM documentation says to use 

client = httpClientBuilderFactory.newBuilder().build();	

because AEM manages connection/request pooling.  This is why I believe this may be a connection/request management issue when handling a large number of requests.  I may be wrong but I can't figure out what to do to prove or disprove this theory.


Level 7

It seems that you're facing "Socket closed" errors during the workflow, particularly when handling multiple files at once. Given that you suspect the issue is related to connection pooling, here are some suggestions for addressing it:

  1. Throttle Requests: You can limit the number of simultaneous requests (e.g., limit parallel workflows) by adding a delay between API calls or managing concurrency in the workflow. You can use a simple sleep between requests or adjust the workflow's concurrency settings to reduce the number of simultaneous file uploads (see the sketch after this list).

  2. Connection Pooling and Timeouts: Check if the connection pooling configuration in AEM is correctly tuned for your use case. You can adjust HTTP client settings, such as connection and socket timeouts, in your HttpClient configuration, as well as increase connection pool sizes to accommodate more concurrent requests.

  3. Reattempt Failed Requests: Implement retry logic in case of failures like "Socket closed," using exponential backoff or a fixed retry limit.

  4. Use Asynchronous Uploads: If the API supports it, consider implementing asynchronous uploads so that workflows are less dependent on the connection and can continue even if one upload fails.

  5. Logs & Monitoring: Increase logging around the HTTP connections to better track and understand when and why the socket is closed, helping pinpoint specific bottlenecks.

Consider starting with throttling or retry logic adjustments and ensuring that your connection pool is tuned to handle the volume of requests you're processing.
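
For suggestion 1, here is one possible sketch: assuming the uploads all go through a single shared OSGi service instance (the default for OSGi components), a Semaphore inside that service can cap concurrent uploads. The permit count and wait time below are arbitrary:

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Sketch: cap concurrent uploads across all workflow instances.
private static final Semaphore UPLOAD_PERMITS = new Semaphore(5); // arbitrary limit

public ItemKeyInfo addFileThrottled(String itemNumber, String revision, String qualifier,
        String attributeName, Asset payload) throws IOException, WorkflowException {
    try {
        // Wait (bounded) for a free slot instead of hammering the endpoint
        if (!UPLOAD_PERMITS.tryAcquire(60, TimeUnit.SECONDS)) {
            throw new IOException("Timed out waiting for an upload slot");
        }
    } catch (InterruptedException ie) {
        Thread.currentThread().interrupt();
        throw new IOException("Interrupted while waiting for an upload slot", ie);
    }
    try {
        return AddFileToItem(itemNumber, revision, qualifier, attributeName, payload);
    } finally {
        UPLOAD_PERMITS.release();
    }
}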


Level 3

1. I would love to do this but am not sure how.  A new workflow instance is created for each and every file; one doesn't know about another, right?

2. So I poked around for those settings and I can't find them.  Do you know where those are?

3. The problem is I can't tell if the call was successful or not.  There are times I get a socket closed exception even though the file successfully sent.

4. Do you have any documentation you could point me at that I might be able to read about this?

5. The logging I have is pretty good, and there are two points of failure. One is when we attempt to connect to the API by getting a token; the other is when we make the HttpPost execute call.  The failure happens in library code, so we don't know why we randomly get these errors.


Level 3

Hello @AmitVishwakarma,  

Do you have any additional information about these topics, specifically 1, 2, and 4?  I'm kind of stuck at this point.

Thanks!


Level 3

@AmitVishwakarma for number 2 I misunderstood.  You were talking about configuring the actual HttpClient directly, like so:

import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.client.config.RequestConfig;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;

PoolingHttpClientConnectionManager cm = new PoolingHttpClientConnectionManager();

// Increase max total connection to 100
cm.setMaxTotal(100);

// Increase default max connection per route to 20
cm.setDefaultMaxPerRoute(20);
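
If you keep the AEM-provided factory, the HttpClientBuilder it returns should accept the same connection manager (a sketch; how this interacts with AEM's own pool defaults is untested):

// Sketch: wire the tuned pool into the AEM-managed builder
CloseableHttpClient client = httpClientBuilderFactory.newBuilder()
        .setConnectionManager(cm)
        .build();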

I think one thing I'm struggling with is how the OSGi component relates to the workflow instance.  Does each workflow create a new instance of the OSGi component, or do different workflow instances share the same instance of the OSGi component?

 


Administrator

@ChristopherHo5 Did you find the suggestions helpful? Please let us know if you need more information. If a response worked, kindly mark it as correct for posterity; alternatively, if you found a solution yourself, we’d appreciate it if you could share it with the community. Thank you!



Kautuk Sahni