Tags: java, xml, exception, stream-processing, java-threads

Handling Exceptions in Java Piped Streams


I have an XML stream that I am converting to a JSON stream using org.apache.wink.json4j.utils.XML. Here is the code:

public void process(InputStream xmlStream) throws IOException {
    final BufferedInputStream bufferedXmlStream = new BufferedInputStream(xmlStream);

    PipedInputStream pipedJsonInputStream = new PipedInputStream();
    final PipedOutputStream jsonStream = new PipedOutputStream(pipedJsonInputStream);
    Thread xmlToJsonThread = new Thread(new Runnable() {
        @Override
        public void run() {
            // put your code that writes data to the outputstream here.
            try {
                XML.toJson(bufferedXmlStream, jsonStream, true);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
    xmlToJsonThread.setDaemon(true);
    xmlToJsonThread.start();

    //process data from piped stream
    BufferedReader reader = new BufferedReader(new InputStreamReader(
            pipedJsonInputStream, StandardCharsets.UTF_8));
    try {
        // use reader to further process json in main thread...
        parseJsonStream(reader);
    } finally {
        reader.close();
        jsonStream.close();
    }
}

When XML.toJson throws an exception, the main thread does not exit. How do I handle this? Is this a good way of converting an XML stream to a JSON stream for further processing? I would really appreciate any suggestions. Thanks a lot!


Solution

  • The problem is that the exception thrown on the child thread is never reported back to the main thread, so the main thread keeps waiting on the pipe. Reporting it back can be tricky unless you put both the reader and the writer in separate threads.

    The way I do this is to run both pieces through an ExecutorCompletionService, like this:

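    import java.io.BufferedInputStream;
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    import java.io.PipedInputStream;
    import java.io.PipedOutputStream;
    import java.nio.charset.StandardCharsets;
    import java.util.concurrent.Callable;
    import java.util.concurrent.ExecutionException;
    import java.util.concurrent.ExecutorCompletionService;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import org.apache.wink.json4j.utils.XML;

    public void process(InputStream xmlStream)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService threadPool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<Void> ecs = new ExecutorCompletionService<>(threadPool);

        final BufferedInputStream bufferedXmlStream = new BufferedInputStream(xmlStream);

        PipedInputStream pipedJsonInputStream = new PipedInputStream();
        final PipedOutputStream jsonStream = new PipedOutputStream(pipedJsonInputStream);
        final BufferedReader reader = new BufferedReader(new InputStreamReader(
                pipedJsonInputStream, StandardCharsets.UTF_8));

        // Writer task: converts the XML and writes the JSON into the pipe.
        ecs.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                try {
                    XML.toJson(bufferedXmlStream, jsonStream, true);
                } finally {
                    // Close the write end so the reader sees end-of-stream
                    // whether the conversion succeeded or failed.
                    jsonStream.close();
                }
                return null;
            }
        });

        // Reader task: processes the JSON coming out of the pipe.
        ecs.submit(new Callable<Void>() {
            @Override
            public Void call() throws Exception {
                try {
                    parseJsonStream(reader);
                } finally {
                    reader.close();
                }
                return null;
            }
        });

        // Wait for both tasks to finish. get() rethrows a task's failure
        // wrapped in an ExecutionException; shutdownNow() then interrupts
        // the other task if it is still running.
        try {
            for (int i = 0; i < 2; ++i) {
                ecs.take().get();
            }
        } finally {
            threadPool.shutdownNow();
        }
    }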
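
To try the pattern above without the Wink library, here is a minimal self-contained sketch. It is only an illustration under stated assumptions: the class name PipedTasksDemo and the helper fakeConvert are hypothetical, and fakeConvert merely stands in for XML.toJson by writing part of a payload and then optionally throwing. It shows the writer's exception reaching the caller as the cause of an ExecutionException instead of leaving the caller blocked.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class PipedTasksDemo {

    // Hypothetical stand-in for XML.toJson: writes part of a payload,
    // then fails on request before writing the rest.
    static void fakeConvert(PipedOutputStream out, boolean fail) throws IOException {
        out.write("{\"a\":".getBytes(StandardCharsets.UTF_8));
        if (fail) {
            throw new IOException("simulated conversion failure");
        }
        out.write("1}".getBytes(StandardCharsets.UTF_8));
    }

    // Runs the writer and the reader as two pool tasks. Returns the text the
    // reader consumed, or throws ExecutionException if either task failed.
    static String run(final boolean fail)
            throws IOException, InterruptedException, ExecutionException {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ExecutorCompletionService<String> ecs = new ExecutorCompletionService<>(pool);
        final PipedInputStream in = new PipedInputStream();
        final PipedOutputStream out = new PipedOutputStream(in);

        // Writer task: always closes the write end so the reader gets EOF.
        ecs.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                try {
                    fakeConvert(out, fail);
                } finally {
                    out.close();
                }
                return null;
            }
        });

        // Reader task: drains the pipe and returns what it read.
        ecs.submit(new Callable<String>() {
            @Override
            public String call() throws Exception {
                try (BufferedReader reader = new BufferedReader(
                        new InputStreamReader(in, StandardCharsets.UTF_8))) {
                    StringBuilder sb = new StringBuilder();
                    int c;
                    while ((c = reader.read()) != -1) {
                        sb.append((char) c);
                    }
                    return sb.toString();
                }
            }
        });

        String result = null;
        try {
            // Collect both tasks in completion order; get() rethrows failures.
            for (int i = 0; i < 2; i++) {
                String value = ecs.take().get();
                if (value != null) {
                    result = value;
                }
            }
        } finally {
            pool.shutdownNow();
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run(false)); // prints {"a":1}
        try {
            run(true);
        } catch (ExecutionException e) {
            // prints: conversion failed: simulated conversion failure
            System.out.println("conversion failed: " + e.getCause().getMessage());
        }
    }
}
```

In the failing run the reader may still finish normally with a truncated payload, because the writer closes the pipe on its way out. That is why the loop collects both futures rather than stopping at the first one that completes.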