Search code examples
restjava-streamreactive-programmingrx-java2micronaut

How can a stream of data from JPA be streamed with micronaut?


I am currently starting to work with micronaut and kotlin. I have JPA query that results in about 1 million results. These results i want to stream from this one micronaut service to another.

My query returns a allQuery.resultStream of the type java.util.stream.

Controller of sending service:

@Get("/test{value1,value2,value3}")
fun getTestObjects(
    value1: String,
    value2: String,
    value3: String
): Stream<TestObject> {
    val entries = testRepository.findAllWhere(value1, value2, value3)

    return entries
}

Client of receiving service:

@Get("/data/test{value1,value2,value3}")
override fun getTestObjects(alue1: String,
    value2: String,
    value3: String) : Stream<TestObject>

And JPA query looks like:

    val cb = entityManager.criteriaBuilder
    val cq = cb.createQuery(TestObject::class.java)

    val rootEntry = cq.from(TestObject::class.java)

    val predicates = mutableListOf<Predicate>()

    predicates.add(cb.like(rootEntry.get<String>("value1"), value1))
    predicates.add(cb.equal(rootEntry.get<String>("value2"), value2))
    predicates.add(cb.equal(rootEntry.get<Int>("value3"), value3))

    val cqAllWhere = cq.select(rootEntry)
        .where(cb.or(*predicates.toTypedArray()))

    val allQuery = entityManager.createQuery(cqAllWhere)
    val entries = allQuery.resultStream

    return entries

My expected output would be a kind of Flowable with pushback regulation and without the sending service to first get all objects into memory because that much memory will not be available.


Solution

  • Basically you just need to create a Flowable and emit the items as they become available.

    return Flowable.create(emitter -> {
    
      //loop through result set
      //for each item
      emitter.onNext(item);
    
      //If you encounter an error
      emitter.onError(...);
    
      //When you're done
      emitter.onComplete();
    
    })