Search code examples

How can a stream of data from JPA be streamed with micronaut?

I am currently starting to work with micronaut and kotlin. I have JPA query that results in about 1 million results. These results i want to stream from this one micronaut service to another.

My query returns a allQuery.resultStream of the type

Controller of sending service:

fun getTestObjects(
    value1: String,
    value2: String,
    value3: String
): Stream<TestObject> {
    val entries = testRepository.findAllWhere(value1, value2, value3)

    return entries

Client of receiving service:

override fun getTestObjects(alue1: String,
    value2: String,
    value3: String) : Stream<TestObject>

And JPA query looks like:

    val cb = entityManager.criteriaBuilder
    val cq = cb.createQuery(

    val rootEntry = cq.from(

    val predicates = mutableListOf<Predicate>()

    predicates.add(<String>("value1"), value1))
    predicates.add(cb.equal(rootEntry.get<String>("value2"), value2))
    predicates.add(cb.equal(rootEntry.get<Int>("value3"), value3))

    val cqAllWhere =

    val allQuery = entityManager.createQuery(cqAllWhere)
    val entries = allQuery.resultStream

    return entries

My expected output would be a kind of Flowable with pushback regulation and without the sending service to first get all objects into memory because that much memory will not be available.


  • Basically you just need to create a Flowable and emit the items as they become available.

    return Flowable.create(emitter -> {
      //loop through result set
      //for each item
      //If you encounter an error
      //When you're done