Search code examples
javamultithreadingiofreeze

PipedInputStream together with TeeOutputStream freezes the application, when not read?


PipedInputStream together with TeeOutputStream freezes the application, when not read!?

I am referring to this org.apache.commons.io.output.TeeOutputStream. For easier testing, I added a simple variant of it as inner-class(MyTeeOutputStream) so you don't have to get the dependency.

Any ideas why this happens and how I can fix it?

Code

I made a JUnit5 test case for you guys to try out:

    @Test
    void testSplittingOutput() throws IOException, InterruptedException {
        PipedInputStream pipedInput = new PipedInputStream();
        OutputStream pipedOutput = new PipedOutputStream(pipedInput);
        //TeeOutputStream teeOutput = new TeeOutputStream(System.out, pipedOutput);
        MyTeeOutputStream teeOutput = new MyTeeOutputStream(System.out, pipedOutput);
        PrintStream out = new PrintStream(teeOutput);

        final int expectedPrintedLinesCount = 1000;
        AtomicInteger actualPrintedLinesCount = new AtomicInteger();
        
        Thread t1 = new Thread(() -> { // Thread for writing data to OUT
            try {
                for (int i = 0; i < expectedPrintedLinesCount; i++) {
                    out.println("Hello! "+i);
                    actualPrintedLinesCount.incrementAndGet();
                    Thread.sleep(10);
                }
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });


        Thread t2 = new Thread(() -> { // Thread for reading data from IN
            BufferedReader reader = new BufferedReader(new InputStreamReader(pipedInput));
            StringBuilder builder = new StringBuilder();
            try {
                while(true){
                    builder.append(reader.readLine());
                }
            } catch (IOException e) {
                //e.printStackTrace(); // ignore
                System.out.println(builder);
            }
        });

        t1.start();
        //t2.start(); // If we aren't reading then PipedInputStream in Thread2, we only print 94 lines instead of 1000!?

        for (int i = 0; i < 30; i++) { // 30 seconds max waiting for threads to complete
            Thread.sleep(1000); // Do this because Junit doesn't support multithreaded stuff
        }
        
        Assertions.assertEquals(expectedPrintedLinesCount, actualPrintedLinesCount.get()+1);
    }

final class MyTeeOutputStream extends OutputStream {

    private final OutputStream out;
    private final OutputStream tee;

    public MyTeeOutputStream(OutputStream out, OutputStream tee) {
        if (out == null)
            throw new NullPointerException();
        else if (tee == null)
            throw new NullPointerException();

        this.out = out;
        this.tee = tee;
    }


    @Override
    public void write(int b) throws IOException {
        out.write(b);
        tee.write(b);
    }

    @Override
    public void write(byte[] b) throws IOException {
        out.write(b);
        tee.write(b);
    }

    @Override
    public void write(byte[] b, int off, int len) throws IOException {
        out.write(b, off, len);
        tee.write(b, off, len);
    }

    @Override
    public void flush() throws IOException {
        out.flush();
        tee.flush();
    }

    @Override
    public void close() throws IOException {
        try {
            out.close();
        } finally {
            tee.close();
        }
    }
}

Result

As you can see, it fails to print all lines (1000) and stops at (94):

Hello! 0
Hello! 1
Hello! 2
Hello! 3
Hello! 4
Hello! 5
Hello! 6
Hello! 7
Hello! 8
Hello! 9
Hello! 10
Hello! 11
Hello! 12
Hello! 13
Hello! 14
Hello! 15
Hello! 16
Hello! 17
Hello! 18
Hello! 19
Hello! 20
Hello! 21
Hello! 22
Hello! 23
Hello! 24
Hello! 25
Hello! 26
Hello! 27
Hello! 28
Hello! 29
Hello! 30
Hello! 31
Hello! 32
Hello! 33
Hello! 34
Hello! 35
Hello! 36
Hello! 37
Hello! 38
Hello! 39
Hello! 40
Hello! 41
Hello! 42
Hello! 43
Hello! 44
Hello! 45
Hello! 46
Hello! 47
Hello! 48
Hello! 49
Hello! 50
Hello! 51
Hello! 52
Hello! 53
Hello! 54
Hello! 55
Hello! 56
Hello! 57
Hello! 58
Hello! 59
Hello! 60
Hello! 61
Hello! 62
Hello! 63
Hello! 64
Hello! 65
Hello! 66
Hello! 67
Hello! 68
Hello! 69
Hello! 70
Hello! 71
Hello! 72
Hello! 73
Hello! 74
Hello! 75
Hello! 76
Hello! 77
Hello! 78
Hello! 79
Hello! 80
Hello! 81
Hello! 82
Hello! 83
Hello! 84
Hello! 85
Hello! 86
Hello! 87
Hello! 88
Hello! 89
Hello! 90
Hello! 91
Hello! 92
Hello! 93
Hello! 94
org.opentest4j.AssertionFailedError: 
Expected :1000
Actual   :94
<Click to see difference>

Details

Why am I doing this? I want to duplicate the System.out and 'read' (through the PipedInputStream) from it, then send that data to my website's http console.


Solution

  • The solution that suited me the best:

    NonBlockingPipedInputStream

    public class NonBlockingPipedInputStream extends PipedInputStream {
    
        public interface WriteLineEvent<L>{
            void executeOnEvent(L l);
        }
    
        /**
         * Add actions to this list, which get run after a line has been written.
         * Contains the line as parameter.
         */
        public List<WriteLineEvent<String>> actionsOnWriteLineEvent = new CopyOnWriteArrayList<>();
    
        /**
         *
         * Starts a new {@link Thread}, that reads the {@link PipedInputStream}
         * and fires an event every time a full line was written.
         * To listen for those events, add the action that should be run to the {@link #actionsOnWriteLineEvent} list.
         */
        public NonBlockingPipedInputStream() {
            new Thread(()->{
                try{
                    BufferedReader reader = new BufferedReader(new InputStreamReader(this));
                    String line;
                    while((line = reader.readLine()) != null){
                        String finalLine = line;
                        actionsOnWriteLineEvent.forEach(action -> action.executeOnEvent(finalLine));
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }).start();
        }
    }
    

    Usage

        @Test
        void nonBlockingPipedInputStreamExample() throws IOException, InterruptedException {
            NonBlockingPipedInputStream pipedInput = new NonBlockingPipedInputStream();
            OutputStream pipedOutput = new PipedOutputStream(pipedInput);
            MyTeeOutputStream teeOutput = new MyTeeOutputStream(System.out, pipedOutput);
            PrintStream out = new PrintStream(teeOutput);
    
            final int expectedPrintedLinesCount = 1000;
            AtomicInteger actualPrintedLinesCount = new AtomicInteger();
            AtomicInteger actualReadLinesCount = new AtomicInteger();
    
            Thread t1 = new Thread(() -> { // Thread for writing data to OUT
                try {
                    for (int i = 1; i <= expectedPrintedLinesCount; i++) {
                        out.println("Hello! "+i);
                        actualPrintedLinesCount.incrementAndGet();
                        Thread.sleep(10);
                    }
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            });
    
            // NonBlockingPipedInputStream starts a new thread when it is initialised.
            // That thread reads the PipedInputStream and fires an event every time a full line was written.
            pipedInput.actionsOnWriteLineEvent.add(line -> {
                actualReadLinesCount.getAndIncrement();
            });
    
            t1.start();
    
            for (int i = 0; i < 30; i++) { // 30 seconds max waiting for threads to complete
                Thread.sleep(1000); // Do this because Junit doesn't support multithreaded stuff
            }
    
            Assertions.assertEquals(expectedPrintedLinesCount, actualPrintedLinesCount.get());
            Assertions.assertEquals(expectedPrintedLinesCount, actualReadLinesCount.get());
        }