PipedInputStream together with TeeOutputStream freezes the application, when not read!?
I am referring to this org.apache.commons.io.output.TeeOutputStream
. For easier testing, I added a simple variant of it as inner-class(MyTeeOutputStream
) so you don't have to get the dependency.
Any ideas why this happens and how I can fix it?
I made a JUnit5 test case for you guys to try out:
@Test
void testSplittingOutput() throws IOException, InterruptedException {
PipedInputStream pipedInput = new PipedInputStream();
OutputStream pipedOutput = new PipedOutputStream(pipedInput);
//TeeOutputStream teeOutput = new TeeOutputStream(System.out, pipedOutput);
MyTeeOutputStream teeOutput = new MyTeeOutputStream(System.out, pipedOutput);
PrintStream out = new PrintStream(teeOutput);
final int expectedPrintedLinesCount = 1000;
AtomicInteger actualPrintedLinesCount = new AtomicInteger();
Thread t1 = new Thread(() -> { // Thread for writing data to OUT
try {
for (int i = 0; i < expectedPrintedLinesCount; i++) {
out.println("Hello! "+i);
actualPrintedLinesCount.incrementAndGet();
Thread.sleep(10);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
Thread t2 = new Thread(() -> { // Thread for reading data from IN
BufferedReader reader = new BufferedReader(new InputStreamReader(pipedInput));
StringBuilder builder = new StringBuilder();
try {
while(true){
builder.append(reader.readLine());
}
} catch (IOException e) {
//e.printStackTrace(); // ignore
System.out.println(builder);
}
});
t1.start();
//t2.start(); // If we aren't reading then PipedInputStream in Thread2, we only print 94 lines instead of 1000!?
for (int i = 0; i < 30; i++) { // 30 seconds max waiting for threads to complete
Thread.sleep(1000); // Do this because Junit doesn't support multithreaded stuff
}
Assertions.assertEquals(expectedPrintedLinesCount, actualPrintedLinesCount.get()+1);
}
final class MyTeeOutputStream extends OutputStream {
private final OutputStream out;
private final OutputStream tee;
public MyTeeOutputStream(OutputStream out, OutputStream tee) {
if (out == null)
throw new NullPointerException();
else if (tee == null)
throw new NullPointerException();
this.out = out;
this.tee = tee;
}
@Override
public void write(int b) throws IOException {
out.write(b);
tee.write(b);
}
@Override
public void write(byte[] b) throws IOException {
out.write(b);
tee.write(b);
}
@Override
public void write(byte[] b, int off, int len) throws IOException {
out.write(b, off, len);
tee.write(b, off, len);
}
@Override
public void flush() throws IOException {
out.flush();
tee.flush();
}
@Override
public void close() throws IOException {
try {
out.close();
} finally {
tee.close();
}
}
}
As you can see, it fails to print all lines (1000) and stops at (94):
Hello! 0
Hello! 1
Hello! 2
Hello! 3
Hello! 4
Hello! 5
Hello! 6
Hello! 7
Hello! 8
Hello! 9
Hello! 10
Hello! 11
Hello! 12
Hello! 13
Hello! 14
Hello! 15
Hello! 16
Hello! 17
Hello! 18
Hello! 19
Hello! 20
Hello! 21
Hello! 22
Hello! 23
Hello! 24
Hello! 25
Hello! 26
Hello! 27
Hello! 28
Hello! 29
Hello! 30
Hello! 31
Hello! 32
Hello! 33
Hello! 34
Hello! 35
Hello! 36
Hello! 37
Hello! 38
Hello! 39
Hello! 40
Hello! 41
Hello! 42
Hello! 43
Hello! 44
Hello! 45
Hello! 46
Hello! 47
Hello! 48
Hello! 49
Hello! 50
Hello! 51
Hello! 52
Hello! 53
Hello! 54
Hello! 55
Hello! 56
Hello! 57
Hello! 58
Hello! 59
Hello! 60
Hello! 61
Hello! 62
Hello! 63
Hello! 64
Hello! 65
Hello! 66
Hello! 67
Hello! 68
Hello! 69
Hello! 70
Hello! 71
Hello! 72
Hello! 73
Hello! 74
Hello! 75
Hello! 76
Hello! 77
Hello! 78
Hello! 79
Hello! 80
Hello! 81
Hello! 82
Hello! 83
Hello! 84
Hello! 85
Hello! 86
Hello! 87
Hello! 88
Hello! 89
Hello! 90
Hello! 91
Hello! 92
Hello! 93
Hello! 94
org.opentest4j.AssertionFailedError:
Expected :1000
Actual :94
<Click to see difference>
Why am I doing this? I want to duplicate the System.out and 'read' (through the PipedInputStream) from it, then send that data to my website's http console.
The solution that suited me the best:
public class NonBlockingPipedInputStream extends PipedInputStream {
public interface WriteLineEvent<L>{
void executeOnEvent(L l);
}
/**
* Add actions to this list, which get run after a line has been written.
* Contains the line as parameter.
*/
public List<WriteLineEvent<String>> actionsOnWriteLineEvent = new CopyOnWriteArrayList<>();
/**
*
* Starts a new {@link Thread}, that reads the {@link PipedInputStream}
* and fires an event every time a full line was written.
* To listen for those events, add the action that should be run to the {@link #actionsOnWriteLineEvent} list.
*/
public NonBlockingPipedInputStream() {
new Thread(()->{
try{
BufferedReader reader = new BufferedReader(new InputStreamReader(this));
String line;
while((line = reader.readLine()) != null){
String finalLine = line;
actionsOnWriteLineEvent.forEach(action -> action.executeOnEvent(finalLine));
}
} catch (Exception e) {
e.printStackTrace();
}
}).start();
}
}
@Test
void nonBlockingPipedInputStreamExample() throws IOException, InterruptedException {
NonBlockingPipedInputStream pipedInput = new NonBlockingPipedInputStream();
OutputStream pipedOutput = new PipedOutputStream(pipedInput);
MyTeeOutputStream teeOutput = new MyTeeOutputStream(System.out, pipedOutput);
PrintStream out = new PrintStream(teeOutput);
final int expectedPrintedLinesCount = 1000;
AtomicInteger actualPrintedLinesCount = new AtomicInteger();
AtomicInteger actualReadLinesCount = new AtomicInteger();
Thread t1 = new Thread(() -> { // Thread for writing data to OUT
try {
for (int i = 1; i <= expectedPrintedLinesCount; i++) {
out.println("Hello! "+i);
actualPrintedLinesCount.incrementAndGet();
Thread.sleep(10);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});
// NonBlockingPipedInputStream starts a new thread when it is initialised.
// That thread reads the PipedInputStream and fires an event every time a full line was written.
pipedInput.actionsOnWriteLineEvent.add(line -> {
actualReadLinesCount.getAndIncrement();
});
t1.start();
for (int i = 0; i < 30; i++) { // 30 seconds max waiting for threads to complete
Thread.sleep(1000); // Do this because Junit doesn't support multithreaded stuff
}
Assertions.assertEquals(expectedPrintedLinesCount, actualPrintedLinesCount.get());
Assertions.assertEquals(expectedPrintedLinesCount, actualReadLinesCount.get());
}