In this code segment, I create a pipe and attach a Scanner on one end and a PrintStream on the other in order to communicate between several consumer and producer threads. Then I create and start three threads:
The first thread is a consumer. It checks the Scanner for an available line of text; if there is one, it consumes it and prints it to stdout. If there's nothing to consume, it prints a message saying so (on every 100th pass). Either way it sleeps for a few milliseconds and repeats.
The second thread in this code segment does nothing. More on that below.
After the second thread starts, there's a 3-second delay before the third thread launches.
The third thread is a producer. It writes a numbered "hello" line to the PrintStream once per second.
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.util.Scanner;

public static void main(String[] args) throws IOException
{
    PipedInputStream pis = new PipedInputStream();
    PipedOutputStream pos = new PipedOutputStream(pis);
    Scanner scan = new Scanner(pis);
    PrintStream ps = new PrintStream(pos);

    // Thread 1: consumer
    new Thread()
    {
        public void run()
        {
            int x = 0;
            while (true)
            {
                x++;
                if (scan.hasNextLine())
                {
                    System.out.println("pulled: " + scan.nextLine());
                } else
                {
                    if (x % 100 == 0)
                    {
                        System.out.println("no data to pull");
                    }
                }
                try
                {
                    sleep(10);
                } catch (InterruptedException ex) { }
            }
        }
    }.start();

    // Thread 2: does nothing (for now)
    new Thread()
    {
        public void run()
        {
        }
    }.start();

    // 3-second delay before the producer starts
    try
    {
        Thread.sleep(3000);
    } catch (InterruptedException ex) { }

    // Thread 3: producer
    new Thread()
    {
        public void run()
        {
            int x = 0;
            while (true)
            {
                x++;
                ps.println("hello: " + x);
                try
                {
                    sleep(1000);
                } catch (InterruptedException ex) {}
            }
        }
    }.start();
}
The output (as I expect):
pulled: hello: 1
pulled: hello: 2
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6
Also note that the Scanner is blocking here: scan.hasNextLine() waits for input rather than returning false, which is why none of the "no data to pull" messages appear. Data is always "available", even if it's "on the way".
Now, if I replace the body of the 2nd thread with some code that produces some text for the first thread to consume:
new Thread()
{
    public void run()
    {
        ps.println("Interfere");
    }
}.start();
Then I start to trigger the no data clause of the first thread:
pulled: Interfere
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
no data to pull
So if the second thread starts using the PrintStream object to produce messages, something goes wrong in the pipe and the consumer thread stops being able to find messages on the other end.
And now things get weirder. If I prevent the second thread from finishing, say by throwing it into a really long loop, then the pipe doesn't get gummed up:
new Thread()
{
    public void run()
    {
        ps.println("interfere");
        for (long i = 0; i < 10000000000L; i++);
        System.out.println("done interfering");
    }
}.start();
Output:
pulled: interfere
pulled: hello: 1
pulled: hello: 2
done interfering
pulled: hello: 3
pulled: hello: 4
pulled: hello: 5
pulled: hello: 6
So I think that if the second thread terminates before the third thread starts producing, then the first thread won't ever get any messages from the third thread. However, if the second thread manages to hang on until the third thread starts producing, then everything goes through as expected.
What's going on here? Is the second thread closing the pipe/stream (or performing some other action on the pipe/stream) when it terminates? If so, why? And why does it not seem to close (or perform whatever action on) the pipe/stream if the third thread starts using the pipe/stream before the second thread terminates? Is there a way to make this code "work" as expected (that is so that the first thread consumes whatever is produced by either/both producer threads) when the second thread produces messages and terminates before the third thread starts?
Background: This is a condensed version of a system in which several clients will consume messages from a single producer thread. However, the producer thread can't be started until all client threads have signaled that they are ready. For each client thread, there's another thread which queries whether it is ready. Once all the client threads have signaled that they are ready, the producer thread is launched. I'm trying to have the threads communicate via streams so that later I can distribute them over several computers and set up the pipes using sockets with a minimal amount of change to the underlying code. Feel free to suggest an alternate solution strategy here as well, but I'd like to understand why the solution above doesn't work.
Your Scanner instance is hitting an exception in its readInput method, which sets its sourceClosed field to true and prevents you from reading. If you're interested in where it actually happens:
private void readInput() {
    ...
    int n = 0;
    try {
        n = source.read(buf);
    } catch (IOException ioe) {
        lastException = ioe;
        n = -1;
    }
    if (n == -1) {
        sourceClosed = true;
        needInput = false;
    }
    ...
}
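If you want to confirm this against your own pipe, Scanner hands back the exception it swallowed through its ioException() method. Here is a minimal sketch, assuming Java 8+ for the lambda; the class name WriteEndDeadDemo and the helper swallowedException are made up for illustration. It lets a short-lived thread write one line, waits for that thread to terminate, and then reads until the Scanner gives up:

```java
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.PrintStream;
import java.util.Scanner;

public class WriteEndDeadDemo {

    // Hypothetical helper: returns the IOException the Scanner swallowed, or null if none.
    public static IOException swallowedException() {
        try {
            PipedInputStream pis = new PipedInputStream();
            PipedOutputStream pos = new PipedOutputStream(pis);
            Scanner scan = new Scanner(pis);
            PrintStream ps = new PrintStream(pos);

            // Same role as the question's second thread: write one line, then terminate.
            Thread writer = new Thread(() -> ps.println("Interfere"));
            writer.start();
            writer.join(); // from here on, the last thread that wrote to the pipe is dead

            // The buffered line is still delivered; the read after it fails inside Scanner.
            while (scan.hasNextLine()) {
                System.out.println("pulled: " + scan.nextLine());
            }
            return scan.ioException();
        } catch (IOException | InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        // The exact exception message depends on the JDK's PipedInputStream implementation.
        System.out.println("swallowed: " + swallowedException());
    }
}
```

Checking scan.ioException() whenever hasNextLine() returns false unexpectedly is a cheap way to tell "the stream really ended" apart from "the read failed".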
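This behavior isn't wrong; you need to fix the underlying exception. The issue here is a java.io.IOException: Write end dead. PipedInputStream remembers the thread that last wrote to the pipe, and a read that finds the buffer empty while that thread is no longer alive fails with this exception. That is why it matters whether the second thread is still running when the third one starts writing. There are a bunch of answers and blog posts that can help you address this better than I can. Also take a look at the related "Read end dead" issue. Check out: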