
Why is my implementation of `InputStream` not working with `StreamingReader` from `com.monitorjbl:xlsx-streamer:2.2.0`?


Since the library's GitHub site shows little activity, I decided to post this question here in the hope of getting some support.

The problem I am working on is reading an Excel file in a streaming fashion. Specifically, the Excel file is stored in a SQLite database as blobs, after being split into multiple rows using a certain block size. For example, a 3MB file is split into three rows, each containing 1MB of raw data. The rows are properly ordered, so if I write the blob column of each row, in order, to the file system, I get a copy of the Excel file.

Since StreamingReader works with InputStream, I decided to implement an InputStream on top of those rows in the SQLite database, so that StreamingReader reads data directly from the db.

I first construct a Sequence<Byte> on top of the query result, sequencing the bytes from all the blob columns:

    fun blocksByteSequence(id: String): Sequence<Byte> {
        return sequence {
            val conn = source.connection
            val stmt = conn.createStatement()
            val r = stmt.executeQuery(findFileQuery(id))
            while (r.next()) yieldAll(r.getBytes(raw_data_column).asIterable())
            stmt.close()
            conn.close()
        }
    }

Then it is fairly straightforward to turn the Sequence<Byte> into an InputStream:

class ByteSequenceInputStreamFactory(
    private val seq: Sequence<Byte>,
) {
    fun inputStreamProvider(): InputStream = object : InputStream() {
        private val iter = seq.iterator()
        override fun read(): Int {
            return if (iter.hasNext()) iter.next().toInt() else -1
        }
    }
}

An error arises when I try to construct a StreamingReader from this InputStream:

val byteSeq = blocksByteSequence(id)
val ins = ByteSequenceInputStreamFactory(byteSeq).inputStreamProvider()
val reader = StreamingReader.builder().open(ins) // error

Error Message:

Could not open the specified zip entry source stream
org.apache.poi.openxml4j.exceptions.InvalidOperationException: Could not open the specified zip entry source stream
    at app//org.apache.poi.openxml4j.opc.ZipPackage.openZipEntrySourceStream(ZipPackage.java:212)
    at app//org.apache.poi.openxml4j.opc.ZipPackage.openZipEntrySourceStream(ZipPackage.java:194)
    ...
Caused by: java.util.zip.ZipException: invalid distances set
    at org.apache.commons.compress.archivers.zip.ZipArchiveInputStream.readFromInflater(ZipArchiveInputStream.java:586)
    at org.apache.commons.compress.archivers.zip.ZipArchiveInputStream.readDeflated(ZipArchiveInputStream.java:551)
   ...
Caused by: java.util.zip.DataFormatException: invalid distances set
    at java.base/java.util.zip.Inflater.inflateBytesBytes(Native Method)
    at java.base/java.util.zip.Inflater.inflate(Inflater.java:378)
   ...

However, if I dump all the bytes from SQLite into an Excel file at some path:

val byteSeq = manager.blocksByteSequence(id)
val out = java.nio.file.Path.of("./private/test.xlsx")
out.outputStream().use { o -> byteSeq.forEach {  o.write(it.toInt()) } }

and use the InputStream obtained from that file, the error is gone:

val reader = StreamingReader.builder().open(out.inputStream())

Solution

  • I think I solved the problem.

    The trouble is here:

    class ByteSequenceInputStreamFactory(
        private val seq: Sequence<Byte>,
    ) {
        fun inputStreamProvider(): InputStream = object : InputStream() {
            private val iter = seq.iterator()
            override fun read(): Int {
                return if (iter.hasNext()) iter.next().toInt() /* this is not OK */  else -1
            }
        }
    }
    

    Calling Byte.toInt() does not produce the value that InputStream.read() is expected to return.

    According to the Javadoc, InputStream.read():

    Reads the next byte of data from the input stream. The value byte is returned as an int in the range 0 to 255. If no byte is available because the end of the stream has been reached, the value -1 is returned. This method blocks until input data is available, the end of the stream is detected, or an exception is thrown.

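    The tricky part is that the Int returned from Byte.toInt() is not an int in the range 0 to 255. It is in the range -128 to 127, so a data byte of 0xFF is returned as -1, which is exactly the value that signals the end of the stream.

    In Kotlin, a Byte: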
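To make the difference concrete, here is a minimal sketch comparing the two conversions. The helper names `signExtended` and `unsigned` are made up for illustration and are not part of any library.

```kotlin
// Illustrative helpers (hypothetical names, not part of any library).

// Mirrors the original read(): sign-extends, so the result is in -128..127.
fun signExtended(b: Byte): Int = b.toInt()

// Mirrors the corrected read(): masks to the low 8 bits, so the result is in 0..255.
fun unsigned(b: Byte): Int = b.toInt() and 0xFF
```

For the byte 0xFF, `signExtended` gives -1 while `unsigned` gives 255. A reader of the stream cannot tell that -1 apart from the end-of-stream marker.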

    Represents a 8-bit signed integer. On the JVM, non-nullable values of this type are represented as values of the primitive type byte.

    and the Byte.toInt() method:

    Converts this Byte value to Int. The resulting Int value represents the same numerical value as this Byte. The least significant 8 bits of the resulting Int value are the same as the bits of this Byte value, whereas the most significant 24 bits are filled with the sign bit of this value.

    Simply calling Byte.toInt() returns the signed value of the Byte (-128 to 127). To get its 0-255 representation, I need to keep only the least significant 8 bits:

    val the_0_255_int = someByte.toInt().and(0xff) // extract the last 8 bits
    

    So the correct code for my problem looks like this:

    class ByteSequenceInputStreamFactory(
        private val seq: Sequence<Byte>,
    ) {
        fun inputStreamProvider(): InputStream = object : InputStream() {
            private val iter = seq.iterator()
            override fun read(): Int {
                return if (iter.hasNext()) iter.next().toInt().and(0xff)  else -1
            }
        }
    }
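
One way to sanity-check a stream like this without the database or StreamingReader is to round-trip a small byte array that contains high-bit bytes. The sketch below assumes a standalone function equivalent to the corrected factory; `byteSequenceToInputStream` and `roundTrip` are hypothetical names introduced only for this check.

```kotlin
import java.io.InputStream

// Hypothetical standalone equivalent of the corrected factory above.
fun byteSequenceToInputStream(seq: Sequence<Byte>): InputStream = object : InputStream() {
    private val iter = seq.iterator()

    // Mask to the low 8 bits so every data byte is returned as 0..255;
    // -1 is reserved for end of stream.
    override fun read(): Int =
        if (iter.hasNext()) iter.next().toInt() and 0xFF else -1
}

// Reads the whole stream back into a ByteArray via the Kotlin stdlib extension readBytes().
fun roundTrip(bytes: ByteArray): ByteArray =
    byteSequenceToInputStream(bytes.asSequence()).readBytes()
```

If `roundTrip(data).contentEquals(data)` holds for an array that includes 0xFF and 0x80, the stream no longer confuses data bytes with the end-of-stream marker.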