Search code examples
Tags: java, concurrency, inputstream, stringbuilder, outputstream

Convert InputStream to StringBuilder and StringBuilder to OutputStream


I have a specific use case where I need to be able to convert data I receive on an:

  • InputStream and store in a StringBuilder
  • StringBuilder and write to an OutputStream

I don't want to convert to and from Strings since I'll already have the StringBuilder (but if that is the same as having a String with the full contents in memory either way, then I can just change them to String). What I don't understand is that when I created the different transfer types in the classes below to test this, the program somehow deadlocks, even though each of the transfer classes runs on a separate thread (as a Callable<Void>). I also tried replacing them with Runnable/Thread and calling start() and join(), with the same problem, so I guess it's some logic mistake in the code that I can't seem to see.

import java.io.*;
import java.util.concurrent.*;

/**
 * Test harness from the question: pushes generated text through a
 * PipedOutputStream/PipedInputStream pair into a StringBuilder, running the
 * writing side and the reading side as two Callable tasks on an executor.
 *
 * NOTE(review): this listing is the failing version; the review notes below
 * mark the spots that make it hang or corrupt data.
 */
public class Test {
    
    /** Builds 100 lines of the form "i.123401234", each followed by the platform line separator. */
    private static String create() {
        StringBuilder sb = new StringBuilder();
        for (int i=0; i<100; i++)
            sb.append(i+".123401234").append(System.lineSeparator());
        return sb.toString();
    }
    
    /**
     * Wires source bytes -> pipe -> StringBuilder, submits both transfer tasks
     * and waits for them before printing the collected text.
     */
    public static void main(String[] args) throws Exception {
        // NOTE(review): the executor is never shut down anywhere in this method.
        ExecutorService executor = Executors.newFixedThreadPool(10);
        ByteArrayInputStream initial_in = new ByteArrayInputStream(create().getBytes());    // data
        PipedInputStream in = new PipedInputStream();                               // intermediary pipe
        PipedOutputStream out = new PipedOutputStream(in);                          // connect OutputStream to pipe InputStream 
        StringBuilder final_out = new StringBuilder();
        NativeToNativeTransfer ntnt = new NativeToNativeTransfer(initial_in, out); // InputStream to OutputStream
        NativeToCustomTransfer ntct = new NativeToCustomTransfer(in, final_out);   // InputStream to StringBuilder 
        Future<Void> f1 = executor.submit(ntnt);
        Future<Void> f2 = executor.submit(ntct);
        // NOTE(review): the program hangs here. StreamTransfer only flushes `out`
        // and never closes it, so the reading task never sees end-of-stream
        // (its read() never returns -1) and f2.get() cannot complete.
        f1.get(); f2.get();     // deadlock here ? 
        System.out.println(final_out);
        System.out.println("Done");
    }

    /** Copies everything readable from an InputStream to an OutputStream in BUFFER_SIZE chunks. */
    public static class StreamTransfer implements Callable<Void> {
        /** Size in bytes of the copy buffer; also used by NativeToCustomTransfer. */
        public static final int BUFFER_SIZE = 1024;
        private InputStream in;
        private OutputStream out;
        
        /**
         * @param in  stream to read from
         * @param out stream to write to
         */
        public StreamTransfer(InputStream in, OutputStream out) {
            this.in = in;
            this.out = out;
        }
        
        /**
         * Runs the copy loop until read() reports end-of-stream, then flushes.
         *
         * @return always null
         * @throws Exception any I/O failure raised by the wrapped streams
         */
        @Override
        public Void call() throws Exception {
            BufferedInputStream bis = new BufferedInputStream(in);
            BufferedOutputStream bos = new BufferedOutputStream(out);
            byte[] buffer = new byte[BUFFER_SIZE];
            // NOTE(review): the count returned by read() is discarded and
            // BUFFER_SIZE bytes are always written, so a short read also writes
            // whatever was left in the buffer from the previous iteration.
            while (bis.read(buffer) != -1) 
                bos.write(buffer, 0, BUFFER_SIZE);
            // NOTE(review): flushed but never closed; neither `in` nor `out` is closed here.
            bos.flush();
            return null;
        }
    }

    /** Reads an InputStream to its end and appends the decoded chunks to a StringBuilder. */
    public static class NativeToCustomTransfer implements Callable<Void> {
        private InputStream in;
        private StringBuilder sb;
        
        /**
         * @param in  stream to read from
         * @param out builder that receives the decoded text
         */
        public NativeToCustomTransfer(InputStream in, StringBuilder out) {
            this.in = in;
            sb = out;
        }
        
        /**
         * Appends one String per read() call until end-of-stream.
         *
         * @return always null
         * @throws Exception any I/O failure raised by the wrapped stream
         */
        @Override
        public Void call() throws Exception {
            BufferedInputStream bis = new BufferedInputStream(in);
            byte[] buffer = new byte[StreamTransfer.BUFFER_SIZE];
            // NOTE(review): new String(buffer) decodes the whole buffer with the
            // platform default charset, regardless of how many bytes read()
            // actually returned; the stream is not closed afterwards.
            while (bis.read(buffer) != -1) 
                sb.append(new String(buffer));
            return null;
        }
    }
    
    /**
     * StreamTransfer whose source is the current content of a StringBuilder,
     * converted with getBytes() (no explicit charset) at construction time.
     */
    public static class CustomToNativeTransfer extends StreamTransfer { 
        public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
            super(new ByteArrayInputStream(in.toString().getBytes()), out);
        }
    }
    
    /** StreamTransfer between two plain streams; adds nothing to the base class. */
    public static class NativeToNativeTransfer extends StreamTransfer {
        public NativeToNativeTransfer(InputStream in, OutputStream out) {
            super(in, out);
        }
    }
    
    /** Copies one StringBuilder into another; the work happens in the constructor. */
    public static class CustomToCustomTransfer {
        public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
            // NOTE(review): chars() yields int values, so out::append presumably
            // resolves to append(int) and appends decimal numbers rather than
            // the characters themselves -- verify.
            in.chars().forEach(out::append);
        } 
    }
}

Edit:

After fixing all the mistakes pointed out by DuncG and Holger, the code actually works as planned and doesn't deadlock anymore:

package test;

import java.io.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

/**
 * Demonstrates moving text between {@link InputStream}/{@link OutputStream}
 * endpoints and {@link StringBuilder}s. {@link #main} pushes generated text
 * through a piped stream pair, with the writing and the reading side running
 * as separate executor tasks. All byte/char conversions use UTF-8.
 */
public class Test {

    /** Builds 100 lines of the form {@code i.123401234}, each followed by the platform line separator. */
    private static String create() {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 100; i++) {
            sb.append(i).append(".123401234").append(System.lineSeparator());
        }
        return sb.toString();
    }

    /**
     * Wires source bytes -> pipe -> StringBuilder, runs both transfers
     * concurrently and prints the collected text followed by "Done".
     *
     * @param args ignored
     * @throws Exception if either transfer task fails or the wait is interrupted
     */
    public static void main(String[] args) throws Exception {
        ExecutorService executor = Executors.newFixedThreadPool(10);
        try {
            ByteArrayInputStream initialIn =
                    new ByteArrayInputStream(create().getBytes(StandardCharsets.UTF_8)); // data
            PipedInputStream in = new PipedInputStream();       // intermediary pipe
            PipedOutputStream out = new PipedOutputStream(in);  // writing end of the pipe
            StringBuilder finalOut = new StringBuilder();

            Future<Void> f1 = executor.submit(new NativeToNativeTransfer(initialIn, out)); // InputStream to OutputStream
            Future<Void> f2 = executor.submit(new NativeToCustomTransfer(in, finalOut));   // InputStream to StringBuilder
            f1.get();
            f2.get(); // completes because the writer closes the pipe when it is done

            System.out.println(finalOut);
            System.out.println("Done");
        } finally {
            // Pool threads are non-daemon: without shutdown the JVM never exits.
            executor.shutdown();
        }
    }

    /** Reads an InputStream to its end, decodes it as UTF-8 and appends the text to a StringBuilder. */
    public static class NativeToCustomTransfer implements Callable<Void> {
        private final InputStream in;
        private final StringBuilder sb;

        /**
         * @param in  stream to read from; closed when {@link #call()} finishes
         * @param out builder that receives the decoded text
         */
        public NativeToCustomTransfer(InputStream in, StringBuilder out) {
            this.in = in;
            sb = out;
        }

        /**
         * Decodes the whole stream and appends it to the builder.
         *
         * @return always {@code null}
         * @throws Exception any I/O failure raised by the stream
         */
        @Override
        public Void call() throws Exception {
            char[] buffer = new char[StreamTransfer.BUFFER_SIZE];
            // Decode through a Reader rather than new String(chunk): the reader
            // keeps decoder state, so a multi-byte character that straddles two
            // chunks is still decoded correctly. Closing the reader closes `in`.
            try (Reader reader = new InputStreamReader(in, StandardCharsets.UTF_8)) {
                int read;
                while ((read = reader.read(buffer)) != -1) {
                    sb.append(buffer, 0, read);
                }
            }
            return null;
        }
    }

    /** StreamTransfer whose source is the UTF-8 encoding of a StringBuilder's content at construction time. */
    public static class CustomToNativeTransfer extends StreamTransfer {
        /**
         * @param in  text to send; a snapshot is taken immediately
         * @param out stream to write to; closed when the transfer finishes
         */
        public CustomToNativeTransfer(StringBuilder in, OutputStream out) {
            // Explicit charset, matching the UTF-8 decoding in NativeToCustomTransfer.
            super(new ByteArrayInputStream(in.toString().getBytes(StandardCharsets.UTF_8)), out);
        }
    }

    /** StreamTransfer between two plain streams; adds nothing to the base class. */
    public static class NativeToNativeTransfer extends StreamTransfer {
        public NativeToNativeTransfer(InputStream in, OutputStream out) {
            super(in, out);
        }
    }

    /** Appends the content of one StringBuilder to another; the work happens in the constructor. */
    public static class CustomToCustomTransfer {
        /**
         * @param in  text to copy
         * @param out builder that receives the text
         * @throws NullPointerException if {@code in} is null
         */
        public CustomToCustomTransfer(StringBuilder in, StringBuilder out) {
            if (in == null) {
                throw new NullPointerException("in");
            }
            // in.chars().forEach(out::append) would bind to append(int) and
            // append the numeric code-unit values; append the characters instead.
            out.append(in);
        }
    }

    /** Copies everything readable from an InputStream to an OutputStream, then closes both. */
    public static class StreamTransfer implements Callable<Void> {
        /** Size of the copy buffer (bytes here, chars in NativeToCustomTransfer). */
        public static final int BUFFER_SIZE = 1024;
        private final InputStream in;
        private final OutputStream out;

        /**
         * @param in  stream to read from
         * @param out stream to write to
         */
        public StreamTransfer(InputStream in, OutputStream out) {
            this.in = in;
            this.out = out;
        }

        /**
         * Copies until end-of-stream. Both streams are closed even when the
         * copy fails, so a reader on the other end of a pipe is never left
         * waiting for more input.
         *
         * @return always {@code null}
         * @throws Exception any I/O failure raised by the streams
         */
        @Override
        public Void call() throws Exception {
            try (InputStream source = in; OutputStream sink = out) {
                byte[] buffer = new byte[BUFFER_SIZE];
                int read;
                while ((read = source.read(buffer)) != -1) {
                    sink.write(buffer, 0, read); // only the bytes actually read
                }
            }
            return null;
        }
    }
}

Solution

  • 1) None of your streams are closed when done, which means the reads from the pipe keep waiting for more input. Change flush to close. For cleaner code, use try-with-resources to guarantee that each call() closes its in/out streams. Since Java 9 an existing effectively final variable can be named directly in the try-with-resources header (try (out) { ... }); the var form shown below needs Java 10 or later:

    try(var try_will_autoclose_this_resource = out)
    {
        while ((len = in.read(buffer)) != -1)
            out.write(buffer, 0, len);
    }
    

    2) Your loop should store the return value of in.read(buffer) in a local variable so that only the bytes actually read are copied:

    while ( (len = bis.read(buffer)) != -1) bos.write(buffer, 0, len); 
    

    3) You already do the I/O in chunks of new byte[BUFFER_SIZE], so all of your BufferedInputStream/BufferedOutputStream wrappers are unnecessary.

    4) Using new String(buffer) may not work if your default character set is multi-byte, because a chunk can end with an incomplete sequence of bytes for its final character.