
Alternating writes and reads of numbers with PipedOutputStream and PipedInputStream


I want to write an example similar to the Producer-Consumer problem using the Java classes PipedOutputStream and PipedInputStream.

NOTE: This is a small example, meant only to apply the principle.

public static void main(String[] args) {
  try {
    final PipedOutputStream pipedSrc = new PipedOutputStream();
    final PipedInputStream pipedSnk = new PipedInputStream();
    pipedSnk.connect(pipedSrc);
    int putNumbers = ThreadLocalRandom.current().nextInt(5, 10);

    Runnable runnableGet = () -> {
      int getNumbers = getNumbers(pipedSnk);
      System.out.println("Read " + getNumbers + " numbers from the PipedInputStream");
    };
    Thread threadGet = new Thread(runnableGet, "threadGet");
    threadGet.start();

    Runnable runnablePut = () -> {
      System.out.println("Write: " + putNumbers + " numbers to the PipedOutputStream");
      putNumbers(pipedSrc, putNumbers);
    };
    Thread threadPut = new Thread(runnablePut, "threadPut");
    threadPut.start();

  } catch (IOException e) { /*Ignored Now*/}
}

Now the Methods:

public static void putNumbers(PipedOutputStream pipedSrc, int numbers) {
  try {
      for (int i = 0; i < numbers; i++) {
        Integer number = ThreadLocalRandom.current().nextInt(0, 100);
        System.out.println("Put number: " + number);
        pipedSrc.write(number.toString().getBytes());
        try {
          int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);//Simulate some duration
          Thread.sleep(timeSleeping);
        } catch (InterruptedException ex) {
          System.out.println(ex.toString());
        }
      }
      pipedSrc.flush();
      pipedSrc.close();
  } catch (IOException e) { /*Ignored Now*/}
}

public static int getNumbers(PipedInputStream pipedSnk) {
  int numbers = 0;
  try {
      byte[] readBytes = new byte[8];
      int qtyBytes;
      while ((qtyBytes = pipedSnk.read(readBytes)) != -1) {
        numbers++;
        byte[] bytesNumber = new byte[qtyBytes];
        System.arraycopy(readBytes, 0, bytesNumber, 0, qtyBytes);
        System.out.println("Get Number: " + new String(bytesNumber));
      }
  } catch (IOException e) { /*Ignored Now*/}
  return numbers;
}

OUTPUT 1 (the numbers get merged, e.g. 36 and 92 are read as 3692):

$ java -cp /.../Java/classes PipedNumbers
Write: 7 numbers to the PipedOutputStream
Put number: 36
Put number: 92
Get Number: 3692
Put number: 22
Put number: 55
Get Number: 2255
Put number: 64
Get Number: 64
Put number: 2
Get Number: 2
Put number: 82
Get Number: 82
Read 5 numbers from the PipedInputStream
$ 

Changing the source code to use DataOutputStream and DataInputStream:

public static void putNumbers(PipedOutputStream pipedSrc, int numbers) {
  try {
    DataOutputStream dos = new DataOutputStream(new BufferedOutputStream(pipedSrc));
    for (int i = 0; i < numbers; i++) {
      Integer number = ThreadLocalRandom.current().nextInt(0, 100);
      System.out.println("Put number: " + number);
      dos.writeInt(number);
      try {
        int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);//Simulate some duration
        Thread.sleep(timeSleeping);
      } catch (InterruptedException ex) {
        System.out.println(ex.toString());
      }
    }
    dos.flush();
    dos.close();
  } catch (IOException e) { /*Ignored Now*/}
}

public static int getNumbers(PipedInputStream pipedSnk) {
  int numbers = 0;
  try {
    DataInputStream dis = new DataInputStream(new BufferedInputStream(pipedSnk));
    Boolean end = false;
    while (!end) {
      try {
        Integer number = dis.readInt();
        System.out.println("Get Number: " + number);
        numbers++;
      } catch (EOFException e) {
        end = true;
      }
    }
  } catch (IOException e) { /*Ignored Now*/}
  return numbers;
}

OUTPUT 2 (the numbers are separated, but writes and reads do not alternate):

$ java -cp /.../Java/classes PipedNumbers
Write: 7 numbers to the PipedOutputStream
Put number: 17
Put number: 70
Put number: 88
Put number: 12
Put number: 60
Put number: 19
Put number: 41
Get Number: 17
Get Number: 70
Get Number: 88
Get Number: 12
Get Number: 60
Get Number: 19
Get Number: 41
Read 7 numbers from the PipedInputStream
$ 

How can I make the writes and reads alternate and keep the generated numbers separate using PipedInputStream and PipedOutputStream?


Solution

  • Just call dos.flush() after each write instead of once at the end.

    This happens because your DataOutputStream wraps a BufferedOutputStream, which collects the bytes in its own buffer before passing them on to your PipedOutputStream. With only a few 4-byte ints that buffer never fills, so nothing reaches the pipe until the final flush(). If you want every number to be written immediately, call dos.flush() after each dos.writeInt() statement, or don't use a buffered stream at all.

    So my putNumbers looks like this (BufferedOutputStream removed, flush() added):

    public static void putNumbers(PipedOutputStream pipedSrc, int numbers) {
        try {
            // No BufferedOutputStream in between: bytes go straight to the pipe.
            DataOutputStream dos = new DataOutputStream(pipedSrc);
            for (int i = 0; i < numbers; i++) {
                int number = ThreadLocalRandom.current().nextInt(0, 100);
                System.out.println("Put number: " + number);
                dos.writeInt(number);
                dos.flush(); //<--changed: flush right after the write, before sleeping
                try {
                    int timeSleeping = ThreadLocalRandom.current().nextInt(500, 1000);//Simulate some duration
                    Thread.sleep(timeSleeping);
                } catch (InterruptedException ex) {
                    System.out.println(ex.toString());
                    Thread.currentThread().interrupt(); // keep the interrupt status
                }
            }
            dos.close();
        } catch (IOException e) { /*Ignored Now*/}
    }
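
For reference, here is a minimal self-contained sketch of the same idea. It is not the asker's exact program: the class name PipedNumbersSketch, the helper transfer and its pauseMillis parameter are made up for illustration. It relies on writeInt() always emitting exactly four bytes, so readInt() can recover the number boundaries (unlike the getBytes() variant from the question, where 36 and 92 arrived as 3692), and it flushes after every write so that each number reaches the reader before the next one is produced.

```java
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.io.UncheckedIOException;
import java.util.ArrayList;
import java.util.List;

public class PipedNumbersSketch {

    // Hypothetical helper: sends the values through a pipe from a producer
    // thread and returns them in the order the reading side received them.
    public static List<Integer> transfer(List<Integer> values, long pauseMillis)
            throws IOException, InterruptedException {
        PipedOutputStream pipedSrc = new PipedOutputStream();
        PipedInputStream pipedSnk = new PipedInputStream(pipedSrc);

        Thread producer = new Thread(() -> {
            // try-with-resources closes the pipe, which signals end of stream.
            try (DataOutputStream dos = new DataOutputStream(pipedSrc)) {
                for (int value : values) {
                    System.out.println("Put number: " + value);
                    dos.writeInt(value);        // fixed width: always 4 bytes
                    dos.flush();                // hand it to the reader now
                    Thread.sleep(pauseMillis);  // simulate some work
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "threadPut");
        producer.start();

        // The calling thread acts as the consumer.
        List<Integer> received = new ArrayList<>();
        try (DataInputStream dis = new DataInputStream(pipedSnk)) {
            while (true) {
                int value = dis.readInt();      // blocks until 4 bytes arrive
                System.out.println("Get Number: " + value);
                received.add(value);
            }
        } catch (EOFException endOfPipe) {
            // The producer closed its end: no more numbers.
        }
        producer.join();
        return received;
    }

    public static void main(String[] args) throws Exception {
        List<Integer> got = transfer(List.of(36, 92, 22, 55), 200);
        System.out.println("Read " + got.size() + " numbers from the PipedInputStream");
    }
}
```

With a non-zero pause, the "Put number" and "Get Number" lines should alternate, although the exact interleaving of the console output still depends on thread scheduling. If you would rather send the numbers as text, you need some delimiter of your own (for example one number per line), because a byte stream has no message boundaries.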