java multithreading weblogic12c workmanagers commonj

Using a Commonj Work Manager to send Asynchronous HTTP calls


I switched from making sequential HTTP calls to 4 REST services to making 4 simultaneous calls using a commonj work manager task executor. I'm using WebLogic 12c. The new code works in my development environment, but in our test environment under load, and occasionally when not under load, the results map is not populated with all of the results. The logging suggests that each work item did receive its results, though. Could this be a problem with the ConcurrentHashMap?

In this example from IBM, they use their own implementation of Work with a getData() method, although it doesn't look like that method really exists in their class definition. I had followed a different example that just used the Work class but didn't demonstrate how to get the data out of those threads and into the main thread. Should I be using execute() instead of schedule()? The API doesn't appear to be well documented.

The stuckThreadTimeout is sufficiently high. component.processInbound() actually contains the code for the HTTP call, but I don't think the problem is there, because I can switch back to the synchronous version of the class below and not have any issues.

http://publib.boulder.ibm.com/infocenter/wsdoc400/v6r0/index.jsp?topic=/com.ibm.websphere.iseries.doc/info/ae/asyncbns/concepts/casb_workmgr.html

My code:

public class WorkManagerAsyncLinkedComponentRouter implements
        MessageDispatcher<Object, Object> {

    private List<Component<Object, Object>> components;
    protected ConcurrentHashMap<String, Object> workItemsResultsMap;
    protected ConcurrentHashMap<String, Exception> componentExceptionsInThreads;
...

    //components is populated at this point with one component for each REST call to be made.
    public Object route(final Object message) throws RouterException {
    ...
        try {
            workItemsResultsMap = new ConcurrentHashMap<String, Object>();
            componentExceptionsInThreads = new ConcurrentHashMap<String, Exception>();
            final String parentThreadID = Thread.currentThread().getName();

            List<WorkItem> producerWorkItems = new ArrayList<WorkItem>();
            for (final Component<Object, Object> component : this.components) {
                producerWorkItems.add(workManagerTaskExecutor.schedule(new Work() {
                    public void run() {
                        //ExecuteThread th = (ExecuteThread) Thread.currentThread();
                        //th.setName(component.getName());
                        LOG.info("Child thread " + Thread.currentThread().getName()  +" Parent thread: " + parentThreadID + " Executing work item for: " + component.getName());
                        try {
                            Object returnObj = component.processInbound(message);
                            if (returnObj == null)
                                LOG.info("Object returned to work item is null, not adding to producer components results map, for this producer: "
                                        + component.getName());
                            else {
                                LOG.info("Added producer component thread result for: "
                                        + component.getName());
                                workItemsResultsMap.put(component.getName(), returnObj);
                            }
                            LOG.info("Finished executing work item for: " + component.getName());
                        } catch (Exception e) {
                            componentExceptionsInThreads.put(component.getName(), e);
                        }
                    }
...
                }));
            } // end loop over producer components

            // Block until all items are done
            workManagerTaskExecutor.waitForAll(producerWorkItems, stuckThreadTimeout);

            LOG.info("Finished waiting for all producer component threads.");
            if (componentExceptionsInThreads != null
                    && componentExceptionsInThreads.size() > 0) {
                ...
            }
            List<Object> resultsList = new ArrayList<Object>(workItemsResultsMap.values());
            if (resultsList.size() == 0)
                throw new RouterException(
                        "The producer thread results are all empty.  The threads were likely not created.  In testing this was observed when either 1) the system was almost out of memory (perhaps there is not enough memory to create a new thread for each producer, for this REST request), or 2) timeouts were reached for all producers.");
            // The problem is identified here: the ConcurrentHashMap does not contain the expected number of results.
            if (workItemsResultsMap.size() != this.components.size()) {
                StringBuilder sb = new StringBuilder();
                for (String str : workItemsResultsMap.keySet()) {
                    sb.append(str + " ");
                }
                throw new RouterException(
                        "Did not receive results from all threads within the thread timeout period.  Only retrieved:"
                                + sb.toString());
            }
            LOG.info("Returning " + String.valueOf(resultsList.size()) + " results.");
            LOG.debug("List of returned feeds: " + String.valueOf(resultsList));
            return resultsList;

        }
...
    }
}
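
One thing the code above does not do is look at the outcome of the wait itself. As far as the commonj API documents it, WorkManager.waitForAll() returns a boolean that is false when the timeout expires before every work item has completed, so a timeout and a worker that silently stored nothing look the same in the shared map. The following is a minimal sketch of the same fan-out pattern that returns each result from its own task and reports timeouts and failures separately. It is not the commonj API: it uses java.util.concurrent as a stand-in so that it runs without a container, and the names FanOutSketch, Outcome, and callAll are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the work manager fan-out; not part of commonj or WebLogic.
public class FanOutSketch {

    /** Results by component name, plus the components that failed or timed out. */
    public static final class Outcome {
        public final Map<String, Object> results = new LinkedHashMap<>();
        public final Map<String, Throwable> failures = new LinkedHashMap<>();
        public final List<String> timedOut = new ArrayList<>();
    }

    public static Outcome callAll(Map<String, Callable<Object>> calls, long timeoutMillis)
            throws InterruptedException {
        List<String> names = new ArrayList<>(calls.keySet());
        List<Callable<Object>> tasks = new ArrayList<>();
        for (String name : names) {
            tasks.add(calls.get(name));
        }

        ExecutorService pool = Executors.newFixedThreadPool(Math.max(1, tasks.size()));
        Outcome outcome = new Outcome();
        try {
            // invokeAll returns the futures in task order; tasks that have not
            // finished when the timeout expires are cancelled.
            List<Future<Object>> futures =
                    pool.invokeAll(tasks, timeoutMillis, TimeUnit.MILLISECONDS);
            for (int i = 0; i < futures.size(); i++) {
                String name = names.get(i);
                Future<Object> future = futures.get(i);
                if (future.isCancelled()) {
                    outcome.timedOut.add(name);       // did not finish in time
                    continue;
                }
                try {
                    outcome.results.put(name, future.get());
                } catch (ExecutionException e) {
                    outcome.failures.put(name, e.getCause()); // task threw
                }
            }
        } finally {
            pool.shutdownNow();
        }
        return outcome;
    }
}
```

With this shape, a missing result is always attributable to either a timeout or an exception, which narrows down cases like the one described here, where every worker logs success but the aggregate is incomplete.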

Solution

  • I ended up cloning the DOM document used as a parameter. There must be some downstream code that has side effects on the parameter.
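
A minimal sketch of that fix, assuming the shared parameter is an org.w3c.dom.Document (the post only says "DOM document") and that it is the message handed to every work item. DOM implementations are generally not guaranteed to be thread-safe, so giving each work item its own copy avoids both concurrent access and downstream mutation of a shared tree. The class and method names are hypothetical. The sketch copies only the document element; calling cloneNode(true) on the Document is an alternative where the implementation supports it.

```java
import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.w3c.dom.Document;
import org.w3c.dom.Node;

// Hypothetical helper; not part of the original router class.
public class DomCopySketch {

    /**
     * Returns an independent deep copy of the source document's element tree.
     * Changes made to the copy do not affect the source.
     */
    public static Document deepCopy(Document source) throws ParserConfigurationException {
        Document copy = DocumentBuilderFactory.newInstance()
                .newDocumentBuilder()
                .newDocument();
        Node root = source.getDocumentElement();
        if (root != null) {
            // importNode creates a copy owned by the new document.
            copy.appendChild(copy.importNode(root, true));
        }
        return copy;
    }
}
```

In the router, the copy would be made in the parent thread, once per component and before the work item is scheduled, so that no two work items ever touch the same Document instance.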