Context: loading and streaming an Excel file as a Flux, processing the Flux records, and inserting them into a database using R2DBC.
implementation("org.apache.poi:poi:4.1.2") - apache lib which has excel domain of workbooks/sheets/rows/cells
implementation("org.apache.poi:poi-ooxml:4.1.2")
implementation("com.monitorjbl:xlsx-streamer:2.1.0") - streamer wrapper which avoids loading entire excel file into the memory and parses chunks of a file
Converting the file to a Flux (extract the header as the first row, then attach it to every subsequent row emitted by the Flux):
override fun extract(inputStream: InputStream): Flux<Map<String, String>> {
    val workbook: Workbook = StreamingReader.builder()
        .rowCacheSize(10)   // number of rows to keep in memory (defaults to 10)
        .bufferSize(4096)   // buffer size to use when reading InputStream to file (defaults to 1024)
        .open(inputStream)
    val xsltRows = Flux.fromIterable(workbook).flatMap { Flux.fromIterable(it) }
    return xsltRows.next()
        .map { rowToHeader(it) }
        .flatMapMany { header -> xsltRows.map { combineToMap(header, it) } }
}
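The rowToHeader helper is not shown above; a plausible minimal version, assuming it simply maps each column index to the header cell's text, would be:

// Plausible reconstruction, not the original code: column index -> header text.
// Row is org.apache.poi.ss.usermodel.Row, which is an Iterable<Cell>.
private fun rowToHeader(row: Row): Map<Int, String> =
    row.associate { cell -> cell.columnIndex to cell.stringCellValue }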
Subsequently I process this Flux into domain models for Spring R2DBC repositories and insert the entries into a MySQL database.
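That downstream is not shown here; it is shaped roughly like the following sketch, where RecordEntity, toEntity, and recordRepository are hypothetical names standing in for my actual domain model, mapper, and Spring R2DBC repository:

// Hypothetical downstream sketch; RecordEntity, toEntity and recordRepository
// are illustrative names, not code from this project.
fun importRecords(inputStream: InputStream): Flux<RecordEntity> =
    extract(inputStream)
        .map { rowMap -> toEntity(rowMap) }                  // Map<String, String> -> domain model
        .flatMap { entity -> recordRepository.save(entity) } // reactive insert into MySQL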
The problem: I am missing a single Excel row out of roughly 2,000. It is always the same row, and there is nothing special about the data in it.
Recall the combineToMap method, which associates each header name with the corresponding cell value; it also logs the row's logical sequence number as it appears in the file:
private fun combineToMap(header: Map<Int, String>, row: Row): Map<String, String> {
    val mapRow: MutableMap<String, String> = mutableMapOf()
    val logicalRowNum = row.rowNum + 1
    logger.info("Processing row: $logicalRowNum")
    for (cell in row) {
        if (cell.columnIndex >= header.keys.size) {
            continue
        }
        val headerName = header[cell.columnIndex].takeUnless { it.isNullOrBlank() }
            ?: throw IllegalStateException("No header name for ${cell.columnIndex} column index for header " +
                "$header and cell ${cell.stringCellValue} row index ${row.rowNum}")
        mapRow[headerName] = cell.stringCellValue
        mapRow["row"] = logicalRowNum.toString()
    }
    return mapRow
}
When I added the log line I noticed the following:
2020-11-22 15:49:56.684 INFO 20034 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 255
2020-11-22 15:49:56.687 INFO 20034 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 256
2020-11-22 15:49:56.689 INFO 20034 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 257
2020-11-22 15:50:02.458 INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor : Processing row: 259
2020-11-22 15:50:02.534 INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor : Processing row: 260
2020-11-22 15:50:02.608 INFO 20034 --- [tor-tcp-epoll-1] c.b.XSLXFileRecordsExtractor : Processing row: 261
Note that the scheduler is switched after row 257, and row 258 is lost during the switch. The tor-tcp-epoll-1 pool is, as I understand it, Spring R2DBC's internal pool.
In my downstream, if instead of calling repository.save I return a static Mono.just(entity), I get my row 258 back; notice that the scheduler is not switched either:
2020-11-22 16:01:14.000 INFO 21959 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 257
2020-11-22 16:01:14.006 INFO 21959 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 258
2020-11-22 16:01:14.009 INFO 21959 --- [ Test worker] c.b.XSLXFileRecordsExtractor : Processing row: 259
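For clarity, the substitution amounts to the following (same hypothetical names as in the sketch above):

// Swapping the R2DBC call for a static Mono: with this, row 258 arrives and
// the whole flow stays on the Test worker thread.
fun importRecordsWithoutDb(inputStream: InputStream): Flux<RecordEntity> =
    extract(inputStream)
        .map { rowMap -> toEntity(rowMap) }
        .flatMap { entity -> Mono.just(entity) } // instead of recordRepository.save(entity)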
Is this a problem with the Excel libraries or with my implementation? Why am I losing the record when the thread pool is switched?
P.S. I am not specifying any schedulers, parallel blocks, or anything else that would mess with threads anywhere in my flow, apart from calling the Spring R2DBC repositories.
I will attempt a rewrite using implementation("org.apache.commons:commons-csv:1.8") and observe whether the same thing happens, but if anyone can spot anything obvious or has experienced something similar elsewhere, I would be infinitely grateful.
In the end I switched to commons-csv, which does not have the same problem:
2020-11-22 18:34:03.719 INFO 15733 --- [ Test worker] c.b.CSVFileRecordsExtractor : Processing row: 256
2020-11-22 18:34:09.062 INFO 15733 --- [tor-tcp-epoll-1] c.b.CSVFileRecordsExtractor : Processing row: 257
2020-11-22 18:34:09.088 INFO 15733 --- [tor-tcp-epoll-1] c.b.CSVFileRecordsExtractor : Processing row: 258
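My commons-csv extractor is not reproduced here, but it is shaped roughly like this sketch (assuming org.apache.commons.csv.CSVFormat and java.io.InputStreamReader imports; CSVParser iterates records lazily, so the file is still streamed):

// Sketch only, not the actual extractor: treat the first record as the header
// and log the record number, mirroring the logging of the xlsx version.
override fun extract(inputStream: InputStream): Flux<Map<String, String>> {
    val parser = CSVFormat.DEFAULT.withFirstRecordAsHeader()
        .parse(InputStreamReader(inputStream, StandardCharsets.UTF_8))
    return Flux.fromIterable(parser).map { record ->
        logger.info("Processing row: ${record.recordNumber}")
        record.toMap() + ("row" to record.recordNumber.toString())
    }
}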
For the original approach, I tried publishing everything on a single scheduler for xlsx-streamer and poi, and even forced Spring R2DBC to publish on the same single-threaded scheduler, yet it still skipped the record.
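Roughly, that pinning attempt looked like the following sketch (Schedulers.newSingle is Reactor's standard way to create a single-threaded scheduler; the pipeline names are the hypothetical ones from above), and it did not help:

// Sketch of the failed single-scheduler attempt: force every signal, including
// the R2DBC results, back onto one dedicated thread. The record was still skipped.
val single: Scheduler = Schedulers.newSingle("xlsx-import")
extract(inputStream)
    .publishOn(single)
    .map { rowMap -> toEntity(rowMap) }
    .flatMap { entity -> recordRepository.save(entity).publishOn(single) }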
What I could observe is that the moment the database callbacks start to arrive, regardless of the thread pool they arrive on, is exactly when the record gets lost; it seems the iterator context gets broken. To be fair, the xlsx library never claimed to be reactive, so no expectations there.
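One more observation from re-reading extract: xsltRows is subscribed twice, once by next() for the header and once inside flatMapMany for the data rows, so two subscriptions end up pulling from the same underlying streaming workbook. A single-subscription variant using Reactor's switchOnFirst, as a sketch only (I have not verified it against this bug), would be:

// Sketch: derive the header and the data rows from one subscription, so the
// workbook iterator is only ever consumed once.
override fun extract(inputStream: InputStream): Flux<Map<String, String>> {
    val workbook: Workbook = StreamingReader.builder()
        .rowCacheSize(10)
        .bufferSize(4096)
        .open(inputStream)
    val rows = Flux.fromIterable(workbook).flatMap { Flux.fromIterable(it) }
    return rows.switchOnFirst { first, all ->
        val headerRow = first.get() ?: return@switchOnFirst Flux.empty<Map<String, String>>()
        val header = rowToHeader(headerRow)
        all.skip(1).map { combineToMap(header, it) } // 'all' replays the first row, hence skip(1)
    }
}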