Why is the piped input stream closed?


I run a small Java 11 HTTP server that processes multipart requests containing zipped data. The data is read and unzipped with piped streams. Occasionally an IOException: Pipe closed occurs, yet the unzipped files are fine. I am not able to spot the flaw in my code; any help is appreciated.

The exception stack trace:

[java] 27016 [HTTP-Dispatcher] ERROR c.example.AhFormatter - e9137279-7c15-41a3-9783-eb029a975767 - Error while processing finish request 
[java] java.io.IOException: Pipe closed
[java]  at java.base/java.io.PipedInputStream.checkStateForReceive(PipedInputStream.java:260)
[java]  at java.base/java.io.PipedInputStream.receive(PipedInputStream.java:226)
[java]  at java.base/java.io.PipedOutputStream.write(PipedOutputStream.java:149)
[java]  at java.base/java.io.InputStream.transferTo(InputStream.java:705)
[java]  at com.example.upload.MultipartStream.readBodyData(MultipartStream.java:469)
[java]  at com.example.AhFormatter.processPdfData(AhFormatter.java:291)

The following method processes the multipart stream:

private void processPdfData(MultipartStream multipartStream) throws IOException, InterruptedException {
    String headers = multipartStream.readHeaders();
    if (!headers.contains("name=\"pdf\"")) {
        logger.warn("{} - Header with name pdf not found", taskId);
        throw new IllegalStateException();
    }

    PipedInputStream pipedInputStream = new PipedInputStream();
    PipedOutputStream pipedOutputStream = new PipedOutputStream();
    pipedInputStream.connect(pipedOutputStream);

    Thread unzipThread = new Thread(() -> {
        try {
            Zips.unzip(pipedInputStream, targetPath, true);
        } catch (IOException e) {
            logger.error(format("%s - Error during unzip", taskId), e);
            throw new ZipException(e);
        }
    }, "PipedZipStream");
    
    unzipThread.setName("UnzipThread");
    unzipThread.setUncaughtExceptionHandler((thread, throwable) -> {
        logger.error(format("%s - Uncaught exception while unzipping", taskId), throwable);
        server.stop(0);
        stopServer.set(true);
    });
    
    unzipThread.start();
    multipartStream.readBodyData(pipedOutputStream);
    unzipThread.join();

    pipedOutputStream.close();
    pipedInputStream.close();
}

And the unzip method:

    public static void unzip(InputStream zip, Path targetDirectory, boolean close) throws IOException {
        ZipInputStream zipStream = new ZipInputStream(zip);
        try {
            ZipEntry zipEntry;
            while ((zipEntry = zipStream.getNextEntry()) != null) {
                File targetFile = guardAgainstZipSlip(targetDirectory, zipEntry);

                if (zipEntry.isDirectory()) {
                    if (!targetFile.isDirectory() && !targetFile.mkdirs()) {
                        throw new IOException("Failed to create directory " + targetFile);
                    }
                    continue;
                }

                // fix for Windows-created archives
                File parent = targetFile.getParentFile();
                if (!parent.isDirectory() && !parent.mkdirs()) {
                    throw new IOException("Failed to create directory " + parent);
                }

                copy(zipStream, targetFile.toPath(), StandardCopyOption.REPLACE_EXISTING);
            }
        } finally {
            if (close) {
                zipStream.close();
            }
        }
    }

    private static File guardAgainstZipSlip(Path destinationDir, ZipEntry zipEntry) throws IOException {
        File targetFile = new File(destinationDir.toFile(), zipEntry.getName());
        if (!targetFile.getCanonicalPath().startsWith(destinationDir.toFile().getCanonicalPath() + File.separator)) {
            throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
        }
        return targetFile;
    }

The MultipartStream class is from Apache. I included the class itself because I didn't want to pull in a whole library for one class.

My understanding is that the unzip thread keeps running until the ZipInputStream has no entries left and then terminates. That should happen once the MultipartStream has written the last bytes of the zip into the PipedOutputStream.
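That assumption can be checked directly: ZipInputStream.getNextEntry() returns null as soon as it reaches the archive's central directory, which comes after the last entry's data. So the loop in Zips.unzip can finish, and close the pipe, while the central directory and the end-of-central-directory record are still on their way through readBodyData. The sketch below (TrailingBytesDemo is a hypothetical name; the archive is built in memory rather than received over HTTP) counts how many bytes a ZipInputStream leaves unread in its source:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

// Hypothetical demo class: shows that ZipInputStream stops before the end of the
// archive, because getNextEntry() returns null when it reaches the central directory.
class TrailingBytesDemo {

    // Builds an in-memory archive; many entries make the central directory larger
    // than ZipInputStream's internal read-ahead buffer.
    static byte[] buildZip(int entries) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ZipOutputStream zos = new ZipOutputStream(bytes)) {
            for (int i = 0; i < entries; i++) {
                zos.putNextEntry(new ZipEntry("file" + i + ".txt"));
                zos.write(("content " + i).getBytes(StandardCharsets.UTF_8));
                zos.closeEntry();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Reads all entries the same way Zips.unzip does and returns how many bytes
    // of the archive were never pulled from the underlying stream.
    static int unreadBytesAfterLastEntry(byte[] zip) {
        ByteArrayInputStream source = new ByteArrayInputStream(zip);
        try (ZipInputStream zis = new ZipInputStream(source)) {
            while (zis.getNextEntry() != null) {
                zis.readAllBytes(); // consume the entry's data
            }
            return source.available(); // still unread after the loop has finished
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] zip = buildZip(200);
        System.out.println(unreadBytesAfterLastEntry(zip) + " of " + zip.length + " bytes never read");
    }
}
```

Whether the writer then actually hits the closed pipe depends on timing and on how many trailing bytes are still in flight, which fits the exception appearing only sometimes.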

When I don't close the streams explicitly, I don't see this exception; at least, all my tests so far pass. But what could be wrong with closing the streams?
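Closing is not wrong in itself; what matters is which side closes first. Because getNextEntry() returns null at the central directory, before the end of the archive, the reader can close while the writer still has bytes to send. A hedged sketch of one common workaround: after the last entry, read the raw stream to EOF and only then close. DrainingUnzip is a hypothetical name, and the zip-slip check is rewritten with java.nio.file.Path rather than copied from guardAgainstZipSlip. Note that draining waits for EOF, i.e. for the writer to close the PipedOutputStream; in processPdfData above, pipedOutputStream.close() would have to move before unzipThread.join(), otherwise each thread waits for the other.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

// Hypothetical variant of Zips.unzip: extracts every entry, then drains the rest of
// the raw input to EOF before closing, so a thread writing into a pipe that feeds
// 'zip' never finds the read end closed while it still has bytes to send.
class DrainingUnzip {

    static void unzip(InputStream zip, Path targetDirectory, boolean close) throws IOException {
        Path root = targetDirectory.toAbsolutePath().normalize();
        ZipInputStream zipStream = new ZipInputStream(zip);
        try {
            ZipEntry zipEntry;
            while ((zipEntry = zipStream.getNextEntry()) != null) {
                // zip-slip guard, expressed with java.nio.file.Path
                Path target = root.resolve(zipEntry.getName()).normalize();
                if (!target.startsWith(root)) {
                    throw new IOException("Entry is outside of the target dir: " + zipEntry.getName());
                }
                if (zipEntry.isDirectory()) {
                    Files.createDirectories(target);
                    continue;
                }
                Files.createDirectories(target.getParent()); // archives may omit directory entries
                Files.copy(zipStream, target, StandardCopyOption.REPLACE_EXISTING);
            }
            // getNextEntry() returned null at the central directory; the remaining
            // bytes (central directory, end record) are still in 'zip'. Read them
            // until EOF, i.e. until the writer closes its end of the pipe.
            zip.transferTo(OutputStream.nullOutputStream());
        } finally {
            if (close) {
                zipStream.close();
            }
        }
    }
}
```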


Solution

  • The comments from g00se kept me going. The readBodyData method touches two streams: the stream from the HTTP request and the PipedOutputStream. Neither stream is closed by this method; internally it uses public long transferTo(OutputStream out) throws IOException, which doesn't close either stream.
    After moving this part into another thread, i.e. out of the controlling thread that created the pipes and started the data shuffling, my errors disappeared.

    The solution now consists of three threads. The first is the controlling thread, which now only creates the pipe and starts the other two threads. The second thread handles the unzipping as before. The third thread is new and copies the data into the pipe. The streams are closed inside the threads. Please have a look at the following code for clarification.

    Controlling thread

    private void processPdfData(MultipartStream multipartStream) throws IOException, InterruptedException {
        String headers = multipartStream.readHeaders();
        if (!headers.contains("name=\"pdf\"")) {
            logger.warn("{} - Header with name pdf not found", taskId);
            throw new IllegalStateException();
        }
    
        final PipedInputStream pipedInputStream = new PipedInputStream();
        final PipedOutputStream pipedOutputStream = new PipedOutputStream();
        pipedInputStream.connect(pipedOutputStream);
    
        UnzipThread unzipThread = new UnzipThread(pipedInputStream, targetPath);
        unzipThread.setUncaughtExceptionHandler((thread, throwable) -> {
            logger.error(format("%s - Uncaught exception while unzipping", taskId), throwable);
            server.stop(0);
            stopServer.set(true);
        });
    
        ZipReaderThread zipReaderThread = new ZipReaderThread(multipartStream, pipedOutputStream);
        zipReaderThread.setUncaughtExceptionHandler((thread, throwable) -> {
            logger.error(format("%s - Uncaught exception while transferring bytes to unzip thread", taskId), throwable);
            server.stop(0);
            stopServer.set(true);
        });
    
        unzipThread.start();
        zipReaderThread.start();
        unzipThread.join();
    }
    

    Unzip thread

    package com.example.dita;
    
    import com.example.Zips;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.nio.file.Path;
    
    public class UnzipThread extends Thread {
    
        private static final Logger logger = LoggerFactory.getLogger(UnzipThread.class);
    
        private final InputStream stream;
    
        private final Path targetDirectory;
    
        public UnzipThread(InputStream stream, Path targetDirectory) {
            this.stream = stream;
            this.targetDirectory = targetDirectory;
            this.setName("UnzipThread");
        }
    
        @Override
        public void run() {
            try {
                Zips.unzip(stream, targetDirectory, true);
            } catch (IOException e) {
                logger.error("Error during unzip", e);
                throw new ZipException(e);
            }
        }
    
    }
    

    Data copy thread

    package com.example.dita;
    
    import com.example.dita.upload.MultipartStream;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    import java.io.IOException;
    import java.io.OutputStream;
    
    public class ZipReaderThread extends Thread {
    
        private static final Logger logger = LoggerFactory.getLogger(ZipReaderThread.class);
    
        private final MultipartStream multipartStream;
        private final OutputStream stream;
    
        public ZipReaderThread(MultipartStream multipartStream, OutputStream stream) {
            this.multipartStream = multipartStream;
            this.stream = stream;
            this.setName("ZipReaderThread");
        }
    
        @Override
        public void run() {
            try {
                multipartStream.readBodyData(stream);
                stream.close();
            } catch (IOException e) {
                logger.error("Error during shuffling bytes into unzipper", e);
                throw new ZipException(e);
            }
        }
    
    }
    

    Unfortunately, I cannot explain why it doesn't work with just two threads; from what I understood of the Java docs, it should have worked. Anyway, it seems to work now, and I hope this answer helps others too.
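The three-thread structure above can also be condensed into a small helper. A minimal sketch, assuming a plain InputStream stands in for the multipart body read by readBodyData and a callback stands in for Zips.unzip (PipeRunner and StreamConsumer are hypothetical names). Compared with the controlling thread above, both pipe ends are closed in try-with-resources blocks, so each side is closed even when its thread fails, and the calling thread joins both workers and rethrows the first recorded failure instead of joining only the unzip thread:

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical helper: 'source' stands in for the multipart body (readBodyData),
// 'consumer' stands in for Zips.unzip reading from the pipe.
class PipeRunner {

    @FunctionalInterface
    interface StreamConsumer {
        void accept(InputStream in) throws IOException;
    }

    static void run(InputStream source, StreamConsumer consumer)
            throws IOException, InterruptedException {
        PipedInputStream pipeIn = new PipedInputStream();
        PipedOutputStream pipeOut = new PipedOutputStream(pipeIn);
        AtomicReference<Exception> failure = new AtomicReference<>(); // first failure wins

        Thread writer = new Thread(() -> {
            // closing the write end signals EOF to the reader, even after an error
            try (OutputStream out = pipeOut) {
                source.transferTo(out);
            } catch (IOException | RuntimeException e) {
                failure.compareAndSet(null, e);
            }
        }, "ZipReaderThread");

        Thread reader = new Thread(() -> {
            // closing the read end makes a writer that is still sending fail
            // with "Pipe closed"
            try (InputStream in = pipeIn) {
                consumer.accept(in);
            } catch (IOException | RuntimeException e) {
                failure.compareAndSet(null, e);
            }
        }, "UnzipThread");

        writer.start();
        reader.start();
        writer.join(); // wait for both workers, not only the reader
        reader.join();

        Exception e = failure.get();
        if (e instanceof IOException) {
            throw (IOException) e;
        }
        if (e != null) {
            throw new IOException(e);
        }
    }
}
```

If the consumer stops early without draining, the writer's "Pipe closed" now surfaces as an IOException from run() instead of only in a log, which makes the race from the question visible to the caller.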