
Reading first N bytes of a file as an InputStream in Java?


For the life of me, I haven't been able to find a question that matches what I'm trying to do, so I'll explain what my use-case is here. If you know of a topic that already covers the answer to this, please feel free to direct me to that one. :)

I have a piece of code that uploads a file to Amazon S3 periodically (every 20 seconds). The file is a log file being written by another process, so this function is effectively a means of tailing the log so that someone can read its contents in semi-real-time without having to have direct access to the machine that the log resides on.

Up until recently, I've simply been using the S3 PutObject method (using a File as input) to do this upload. But in AWS SDK 1.9, this no longer works because the S3 client rejects the request if the content size actually uploaded is greater than the content-length that was promised at the start of the upload. This method reads the size of the file before it starts streaming the data, so given the nature of this application, the file is very likely to have increased in size between that point and the end of the stream. This means that I need to now ensure I only send N bytes of data regardless of how big the file is.

I don't have any need to interpret the bytes in the file in any way, so I'm not concerned about encoding. I can transfer it byte-for-byte. Basically, what I want is a simple method where I can read the file up to the Nth byte, then have it terminate the read even if there's more data in the file past that point. (In other words, insert EOF into the stream at a specific point.)

For example, if my file is 10000 bytes long when I start the upload, but grows to 12000 bytes during the upload, I want to stop uploading at 10000 bytes regardless of that size change. (On a subsequent upload, I would then upload the 12000 bytes or more.)

I haven't found a pre-made way to do this - the best I've found so far appears to be IOUtils.copyLarge(InputStream, OutputStream, offset, length), which can be told to copy a maximum of "length" bytes to the provided OutputStream. However, copyLarge is a blocking method, as is PutObject (which presumably calls a form of read() on its InputStream), so it seems that I couldn't get that to work at all.

I haven't found any methods or pre-built streams that can do this, which makes me think I'd need to write my own implementation that directly tracks how many bytes have been read. It would probably work like a BufferedInputStream in which the number of bytes read per batch is the lesser of the buffer size and the number of bytes still to be read. (E.g., with a limit of 10000 bytes and a buffer size of 3000 bytes, I'd do three batches of 3000 bytes each, followed by a batch of 1000 bytes + EOF.)
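To make the batching arithmetic concrete, here is a minimal sketch of that idea. It only illustrates the "lesser of buffer size and remaining bytes" loop; `BoundedCopy` and `copyAtMost` are hypothetical names, not part of any library, and an in-memory `ByteArrayInputStream` stands in for the growing log file.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper for illustration only. */
final class BoundedCopy {

    /**
     * Copies at most {@code limit} bytes from {@code in} to {@code out},
     * requesting min(bufferSize, remaining) bytes on each read.
     * Returns the size of every batch that was actually read.
     */
    static List<Integer> copyAtMost(InputStream in, OutputStream out,
                                    long limit, int bufferSize) throws IOException {
        if (bufferSize <= 0) {
            throw new IllegalArgumentException("bufferSize must be positive");
        }
        byte[] buf = new byte[bufferSize];
        List<Integer> batches = new ArrayList<>();
        long remaining = limit;
        while (remaining > 0) {
            int want = (int) Math.min(buf.length, remaining);
            int n = in.read(buf, 0, want);
            if (n < 0) {
                break; // the source ended before the limit was reached
            }
            out.write(buf, 0, n);
            remaining -= n;
            batches.add(n);
        }
        return batches;
    }

    public static void main(String[] args) throws IOException {
        // Stand-in for a file that has grown to 12000 bytes.
        InputStream source = new ByteArrayInputStream(new byte[12000]);
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        List<Integer> batches = copyAtMost(source, sink, 10000, 3000);
        System.out.println(batches + " total=" + sink.size());
        // prints "[3000, 3000, 3000, 1000] total=10000"
    }
}
```

A real file or network stream may return fewer bytes per read than requested, so the batch sizes are only this regular for an in-memory source. The total is still capped at the limit either way.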

Does anyone know a better way to do this? Thanks.

EDIT: Just to clarify, I'm already aware of a couple of alternatives, neither of which is ideal:

(1) I could lock the file while uploading it. Doing this would cause loss of data or operational problems in the process that's writing the file.

(2) I could create a local copy of the file before uploading it. This could be very inefficient and take up a lot of unnecessary disk space (this file can grow into the several-gigabyte range, and the machine it's running on may be that short of disk space).

EDIT 2: My final solution, based on a suggestion from a coworker, looks like this:

private void uploadLogFile(final File logFile) {
    if (logFile.exists()) {
        long byteLength = logFile.length();
        try (
            FileInputStream fileStream = new FileInputStream(logFile);
            InputStream limitStream = ByteStreams.limit(fileStream, byteLength);
        ) {
            ObjectMetadata md = new ObjectMetadata();
            md.setContentLength(byteLength);
            // Set other metadata as appropriate.
            PutObjectRequest req = new PutObjectRequest(bucket, key, limitStream, md);
            s3Client.putObject(req);
        } // plus exception handling
    }
}

My coworker originally suggested LimitInputStream, apparently unaware that it had been deprecated. ByteStreams.limit is the current Guava replacement, and it does what I want. Thanks, everyone.


Solution

  • Complete answer rip & replace:

    It is relatively straightforward to wrap an InputStream so as to cap the number of bytes it will deliver before signaling end-of-data. FilterInputStream is aimed at this general kind of job, but since you have to override pretty much every method for this particular case, it just gets in the way.

    Here's a rough cut at a solution:

    import java.io.IOException;
    import java.io.InputStream;
    
    /**
     * An {@code InputStream} wrapper that provides up to a maximum number of
     * bytes from the underlying stream.  Does not support mark/reset, even
     * when the wrapped stream does, and does not perform any buffering.
     */
    public class BoundedInputStream extends InputStream {
    
        /** This stream's underlying {@code InputStream} */
        private final InputStream data;
    
        /** The maximum number of bytes still available from this stream */ 
        private long bytesRemaining;
    
        /**
         * Initializes a new {@code BoundedInputStream} with the specified
         * underlying stream and byte limit
         * @param data the {@code InputStream} serving as the source of this
         *        one's data
         * @param maxBytes the maximum number of bytes this stream will deliver
         *        before signaling end-of-data
         */
        public BoundedInputStream(InputStream data, long maxBytes) {
            this.data = data;
            bytesRemaining = Math.max(maxBytes, 0);
        }
    
        @Override
        public int available() throws IOException {
            return (int) Math.min(data.available(), bytesRemaining);
        }
    
        @Override
        public void close() throws IOException {
            data.close();
        }
    
        @Override
        public synchronized void mark(int limit) {
            // does nothing
        }
    
        @Override
        public boolean markSupported() {
            return false;
        }
    
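        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            if (bytesRemaining > 0) {
                int nRead = data.read(
                        buf, off, (int) Math.min(len, bytesRemaining));
    
                // nRead is -1 once the underlying stream is exhausted; only a
                // positive count may reduce the remaining allowance
                if (nRead > 0) {
                    bytesRemaining -= nRead;
                }
    
                return nRead;
            } else {
                return -1;
            }
        }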
    
        @Override
        public int read(byte[] buf) throws IOException {
            return this.read(buf, 0, buf.length);
        }
    
        @Override
        public synchronized void reset() throws IOException {
            throw new IOException("reset() not supported");
        }
    
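        @Override
        public long skip(long n) throws IOException {
            // clamp to [0, bytesRemaining]: some streams seek backwards when
            // asked to skip a negative distance
            long skipped = data.skip(Math.max(0L, Math.min(n, bytesRemaining)));
    
            if (skipped > 0) {
                bytesRemaining -= skipped;
            }
    
            return skipped;
        }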
    
        @Override
        public int read() throws IOException {
            if (bytesRemaining > 0) {
                int c = data.read();
    
                if (c >= 0) {
                    bytesRemaining -= 1;
                }
    
                return c;
            } else {
                return -1;
            }
        }
    }
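
As a quick check of the behaviour described in the question (a 10000-byte cap on a source that has grown to 12000 bytes), here is a small self-contained sketch. `CappedStreamDemo`, `Capped` and `countBytes` are hypothetical names: `Capped` is a trimmed-down stand-in for the wrapper above, included only so the example runs on its own, and a `ByteArrayInputStream` stands in for the log file.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;

/** Hypothetical demo class for illustration only. */
final class CappedStreamDemo {

    /** Trimmed-down byte-capping wrapper: read methods and close only. */
    static final class Capped extends InputStream {
        private final InputStream data;
        private long bytesRemaining;

        Capped(InputStream data, long maxBytes) {
            this.data = data;
            this.bytesRemaining = Math.max(maxBytes, 0);
        }

        @Override
        public int read() throws IOException {
            if (bytesRemaining <= 0) {
                return -1; // cap reached: report end-of-data
            }
            int c = data.read();
            if (c >= 0) {
                bytesRemaining--;
            }
            return c;
        }

        @Override
        public int read(byte[] buf, int off, int len) throws IOException {
            if (len == 0) {
                return 0;
            }
            if (bytesRemaining <= 0) {
                return -1; // cap reached: report end-of-data
            }
            int n = data.read(buf, off, (int) Math.min(len, bytesRemaining));
            if (n > 0) {
                bytesRemaining -= n;
            }
            return n;
        }

        @Override
        public void close() throws IOException {
            data.close();
        }
    }

    /** Drains the stream and returns how many bytes it delivered. */
    static long countBytes(InputStream in) throws IOException {
        byte[] buf = new byte[3000];
        long total = 0;
        int n;
        while ((n = in.read(buf, 0, buf.length)) != -1) {
            total += n;
        }
        return total;
    }

    public static void main(String[] args) throws IOException {
        byte[] grown = new byte[12000]; // the "file" after it has grown
        try (InputStream in = new Capped(new ByteArrayInputStream(grown), 10000)) {
            System.out.println(countBytes(in)); // prints 10000
        }
    }
}
```

If the source turns out to be shorter than the cap, the wrapper simply ends when the source does, so a content length promised in advance should be taken from the same size measurement used for the cap.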