I am just starting to work with Micronaut and Kotlin. I have a JPA query that returns about 1 million results, and I want to stream these results from one Micronaut service to another.
My query returns allQuery.resultStream, which is of type java.util.stream.Stream.
Controller of sending service:
@Get("/test{value1,value2,value3}")
fun getTestObjects(
    value1: String,
    value2: String,
    value3: String
): Stream<TestObject> {
    val entries = testRepository.findAllWhere(value1, value2, value3)
    return entries
}
Client of receiving service:
@Get("/data/test{value1,value2,value3}")
override fun getTestObjects(
    value1: String,
    value2: String,
    value3: String
): Stream<TestObject>
And the JPA query looks like:
val cb = entityManager.criteriaBuilder
val cq = cb.createQuery(TestObject::class.java)
val rootEntry = cq.from(TestObject::class.java)
val predicates = mutableListOf<Predicate>()
predicates.add(cb.like(rootEntry.get<String>("value1"), value1))
predicates.add(cb.equal(rootEntry.get<String>("value2"), value2))
predicates.add(cb.equal(rootEntry.get<Int>("value3"), value3))
val cqAllWhere = cq.select(rootEntry)
    .where(cb.or(*predicates.toTypedArray()))
val allQuery = entityManager.createQuery(cqAllWhere)
val entries = allQuery.resultStream
return entries
My expected output would be some kind of Flowable with backpressure, so that the sending service does not have to load all objects into memory first, because that much memory will not be available.
Basically, you just need to create a Flowable and emit the items as they become available.
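return Flowable.create(emitter -> {
    // Loop through the result set
    // For each item
    emitter.onNext(item);
    // If you encounter an error
    emitter.onError(...);
    // When you're done
    emitter.onComplete();
}, BackpressureStrategy.BUFFER); // create() requires a strategy; BUFFER is only a placeholder, pick what fits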
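
One caveat: a plain loop inside Flowable.create pushes items as fast as it can and leaves any overflow to the BackpressureStrategy, so for a result set this large a pull-based source is probably a better fit. In RxJava that is what Flowable.generate is for. To show the idea without any dependencies, here is a minimal sketch that uses the JDK's java.util.concurrent.Flow interfaces instead of RxJava: it walks the iterator of a lazy Stream and emits only as many items as the subscriber has requested. StreamPublisher, ManualSubscriber and the integer range standing in for the JPA result stream are hypothetical names made up for this illustration, and the sketch assumes a single subscriber on a single thread.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.IntStream;
import java.util.stream.Stream;

class PullBasedStreamDemo {

    /** Hypothetical publisher: emits elements of a lazy Stream only on demand. */
    static final class StreamPublisher<T> implements Flow.Publisher<T> {
        private final Stream<T> source;

        StreamPublisher(Stream<T> source) {
            this.source = source;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super T> subscriber) {
            Iterator<T> rows = source.iterator(); // lazy: nothing is fetched yet
            subscriber.onSubscribe(new Flow.Subscription() {
                private long demand;      // items requested but not yet emitted
                private boolean draining; // guards against re-entrant request() calls
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    if (n <= 0) {
                        finish();
                        subscriber.onError(new IllegalArgumentException("request must be > 0"));
                        return;
                    }
                    demand = (demand + n < 0) ? Long.MAX_VALUE : demand + n; // cap on overflow
                    if (draining) return; // the loop that is already running sees the new demand
                    draining = true;
                    try {
                        while (!done && demand > 0) {
                            if (rows.hasNext()) {
                                demand--;
                                subscriber.onNext(rows.next());
                            } else {
                                finish();
                                subscriber.onComplete();
                            }
                        }
                    } catch (RuntimeException e) {
                        if (!done) {
                            finish();
                            subscriber.onError(e);
                        }
                    } finally {
                        draining = false;
                    }
                }

                @Override
                public void cancel() {
                    finish();
                }

                private void finish() {
                    done = true;
                    source.close(); // release whatever the stream holds open
                }
            });
        }
    }

    /** Hypothetical subscriber that records items and lets the caller decide when to request more. */
    static final class ManualSubscriber<T> implements Flow.Subscriber<T> {
        final List<T> received = new ArrayList<>();
        Flow.Subscription subscription;
        boolean completed;
        Throwable error;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(T item) { received.add(item); }
        @Override public void onError(Throwable t) { error = t; }
        @Override public void onComplete() { completed = true; }
    }

    public static void main(String[] args) {
        // Stand-in for the JPA result stream; the counter records how many rows were actually pulled
        AtomicInteger pulled = new AtomicInteger();
        Stream<Integer> rows = IntStream.range(0, 1_000_000).boxed().peek(i -> pulled.incrementAndGet());

        ManualSubscriber<Integer> consumer = new ManualSubscriber<>();
        new StreamPublisher<>(rows).subscribe(consumer);

        consumer.subscription.request(3); // the consumer asks for three items only
        System.out.println("received " + consumer.received + ", rows pulled so far: " + pulled.get());
        consumer.subscription.cancel();
    }
}
```

With a real resultStream the same loop would only pull more rows when the receiving side asks for them, provided the transaction and the database cursor stay open for as long as the stream is being consumed.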