Tags: spring, project-reactor, reactor-netty, spring-data-r2dbc, r2dbc

Losing an event when Reactor switches schedulers


Context: loading and streaming an Excel file as a Flux, processing the Flux records, and inserting them into a database using R2DBC.

implementation("org.apache.poi:poi:4.1.2") - apache lib which has excel domain of workbooks/sheets/rows/cells
implementation("org.apache.poi:poi-ooxml:4.1.2")
implementation("com.monitorjbl:xlsx-streamer:2.1.0") - streamer wrapper which avoids loading entire excel file into the memory and parses chunks of a file

Converting the file to a Flux (extract the header from the first row, then glue it onto every subsequent event/row coming from the Flux):

override fun extract(inputStream: InputStream): Flux<Map<String, String>> {
        val workbook: Workbook = StreamingReader.builder()
                .rowCacheSize(10) // number of rows to keep in memory (defaults to 10)
                .bufferSize(4096) // buffer size to use when reading InputStream to file (defaults to 1024)
                .open(inputStream)

        val xsltRows = Flux.fromIterable(workbook).flatMap { Flux.fromIterable(it) }

        return xsltRows.next()                 // first row becomes the header
                .map { rowToHeader(it) }
                .flatMapMany { header -> xsltRows.map { combineToMap(header, it) } } // re-subscribes to xsltRows for the data rows
    }
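
Note that xsltRows is subscribed twice: once by next() for the header row and once inside flatMapMany for the data rows, so the workbook's iterator is requested by two subscriptions. For reference, the same header-gluing logic with a single subscription, sketched with switchOnFirst (untested here):

import org.apache.poi.ss.usermodel.Row
import reactor.core.publisher.Flux

// Sketch only: take the header from the first emitted row and combine it with
// the remaining rows, without re-subscribing to the underlying iterator.
fun extractSingleSubscription(rows: Flux<Row>): Flux<Map<String, String>> =
    rows.switchOnFirst { first, flux ->
        val header = first.get()?.let { rowToHeader(it) }
            ?: return@switchOnFirst Flux.empty<Map<String, String>>()
        flux.skip(1).map { combineToMap(header, it) } // skip(1) drops the header row itself
    }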

Subsequently I process this Flux into domain models for Spring R2DBC repositories and insert the entries into a MySQL database.

The problem: I am missing a single Excel row (out of roughly 2k). It is always the same row, and there is nothing special about the data in it.

Recall the combineToMap method, which associates the header name with every cell value; it also logs the row's logical sequence number as it appears in the file:

private fun combineToMap(header: Map<Int, String>, row: Row): Map<String, String> {

        val mapRow: MutableMap<String, String> = mutableMapOf()
        val logicalRowNum = row.rowNum + 1

        logger.info("Processing row: $logicalRowNum")

        for (cell in row) {
            // skip cells that fall outside the header width
            if (cell.columnIndex >= header.keys.size) {
                continue
            }

            val headerName = header[cell.columnIndex].takeUnless { it.isNullOrBlank() }
                             ?: throw IllegalStateException("No header name for ${cell.columnIndex} column index for header " +
                                                            "$header and cell ${cell.stringCellValue} row index ${row.rowNum}")

            mapRow[headerName] = cell.stringCellValue
        }

        // record the logical (1-based) row number once per row
        mapRow["row"] = logicalRowNum.toString()

        return mapRow
    }
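
For completeness, rowToHeader is not shown above; a minimal reconstruction consistent with the Map<Int, String> parameter of combineToMap could look like this (hypothetical, not my original helper):

// Hypothetical reconstruction: map each header cell's column index to its text value.
private fun rowToHeader(row: Row): Map<Int, String> =
    row.associate { cell -> cell.columnIndex to cell.stringCellValue }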

When I added the log line I noticed the following:

2020-11-22 15:49:56.684  INFO 20034 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 255
2020-11-22 15:49:56.687  INFO 20034 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 256
2020-11-22 15:49:56.689  INFO 20034 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 257
2020-11-22 15:50:02.458  INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor    : Processing row: 259
2020-11-22 15:50:02.534  INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor    : Processing row: 260
2020-11-22 15:50:02.608  INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor    : Processing row: 261

Note that the thread is switched after row 257, and row 258 is lost during the switch. The pool

tor-tcp-epoll-1

is, as I understand it, Spring R2DBC's internal pool.

In my downstream, if instead of calling repository.save I return a static Mono.just(entity), I get row 258 back; notice that the scheduler is not switched either:

2020-11-22 16:01:14.000  INFO 21959 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 257
2020-11-22 16:01:14.006  INFO 21959 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 258
2020-11-22 16:01:14.009  INFO 21959 --- [    Test worker] c.b.XSLXFileRecordsExtractor    : Processing row: 259
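
The swap, sketched below for illustration (ingest and toEntity are stand-ins; only repository.save appears in my actual code):

import reactor.core.publisher.Flux
import reactor.core.publisher.Mono

// Sketch of the control experiment: replacing the R2DBC save with a static
// Mono keeps every row and avoids the thread switch.
fun <T> ingest(rows: Flux<Map<String, String>>, toEntity: (Map<String, String>) -> T): Flux<T> =
    rows.map(toEntity)
        .flatMap { entity ->
            // repository.save(entity) // original downstream: row 258 goes missing
            Mono.just(entity)          // control: all rows arrive, no scheduler switch
        }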

Is this a problem with the Excel libraries or with my implementation? Why am I losing the record when the thread pool is switched?

P.S. I am not specifying any schedulers, parallel blocks, or anything else that messes with threads anywhere in my flow, apart from calling the Spring R2DBC repositories.

I will attempt a rewrite using implementation("org.apache.commons:commons-csv:1.8") and observe whether the same thing happens, but if anyone can spot anything obvious or has experienced something similar, I would be infinitely grateful.


Solution

  • In the end I switched to commons-csv, which does not have the same problem:

    2020-11-22 18:34:03.719  INFO 15733 --- [    Test worker] c.b.CSVFileRecordsExtractor     : Processing row: 256
    2020-11-22 18:34:09.062  INFO 15733 --- [tor-tcp-epoll-1] c.b.CSVFileRecordsExtractor     : Processing row: 257
    2020-11-22 18:34:09.088  INFO 15733 --- [tor-tcp-epoll-1] c.b.CSVFileRecordsExtractor     : Processing row: 258
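
    A minimal sketch of the commons-csv replacement (shape assumed from my logs; the parser handles the header itself, so there is no next()/flatMapMany dance and the iterator is only subscribed once):

    import org.apache.commons.csv.CSVFormat
    import reactor.core.publisher.Flux
    import java.io.InputStream
    import java.io.InputStreamReader

    fun extract(inputStream: InputStream): Flux<Map<String, String>> {
        // withFirstRecordAsHeader() makes the parser treat row 1 as the header
        val parser = CSVFormat.DEFAULT
                .withFirstRecordAsHeader()
                .parse(InputStreamReader(inputStream))
        // CSVParser is Iterable<CSVRecord>; toMap() yields header -> value pairs
        return Flux.fromIterable(parser).map { it.toMap() }
    }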
    

    For the original approach, I tried publishing everything on one scheduler for xlsx-streamer and poi, and even forced Spring R2DBC to publish on the same single-threaded scheduler, and it still skipped the record.
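
    Roughly what that attempt looked like (a sketch; extractor, toEntity, and repository are stand-ins for my actual code):

    import reactor.core.scheduler.Schedulers

    val single = Schedulers.newSingle("extractor")

    extractor.extract(inputStream)
            .publishOn(single)                     // pin xlsx-streamer/poi emissions to one thread
            .map { toEntity(it) }                  // stand-in mapping to the R2DBC entity
            .flatMap { repository.save(it).publishOn(single) } // force R2DBC results back onto the same thread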

    What I could observe is that the moment database callbacks start to come in, regardless of the thread pool, is exactly when the record gets lost; it seems the iterator's context gets broken.

    I mean, the xlsx library never claimed to be reactive, so no expectations there.