I am trying to connect Akka HTTP to an old Java library. The library has two methods: one accepts a callback function that receives a string, and the other signals the end of the data stream. The data callback can be called multiple times. Consider this snippet:
oldJavaLib.receiveData((s: String) => {
  println("received:" + s)
})
oldJavaLib.dataEnd(() => {
  println("data transmission is over")
})
I want to stream the data through Akka HTTP as it is being received by the callback function, but I am not sure what the best way to go about that is.
I was thinking of creating a stream and then using it directly in an HTTP route, like this:
def fetchUsers(): Source[User, NotUsed] = Source.fromIterator(() => Iterator.fill(1000000) {
  val id = Random.nextInt()
  dummyUser(id.toString)
})
lazy val routes: Route =
  pathPrefix("test") {
    concat(
      pathEnd {
        concat(
          get {
            complete(fetchUsers())
          }
        )
      }
    )
  }
The fetchUsers() function should return a stream that gets its data from the legacy Java API. Maybe there is a better approach.
I assume you want to create an Akka stream that emits the values passed to the callback? You can use Source.queue. For the first callback it would be:
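import akka.stream.QueueOfferResult.Enqueued
import akka.stream.scaladsl.{Keep, Sink, Source}

// Materialize the queue; Sink.ignore just discards the elements here
val queue = Source.queue[String](bufferSize = 1000)
  .toMat(Sink.ignore)(Keep.left)
  .run()
oldJavaLib.receiveData((s: String) => {
  // offer returns its result synchronously for this bounded queue
  queue.offer(s) match {
    case Enqueued => println("received:" + s)
    case _ => println("failed to enqueue:" + s)
  }
})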
If you want to use the source in an HTTP route, you have to pre-materialize it. Referring to my previous code, it would look like this:
val (queue, source) = Source.queue[String](bufferSize = 1000).preMaterialize()
source can then be used in any route.
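To tie this back to the question, here is a minimal sketch of how both callbacks might be wired to a pre-materialized queue and served from a route. It assumes Akka 2.6.11 or later (where Source.queue with only a bufferSize gives a queue whose offer returns synchronously) and that registering the callbacks once per request fits how the library delivers data. The OldJavaLib trait, the CallbackStreaming object and the legacySource helper are hypothetical names standing in for the real library and your own code.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route
import akka.stream.QueueOfferResult
import akka.stream.scaladsl.Source
import akka.util.ByteString

// Hypothetical stand-in for the legacy library's two methods
trait OldJavaLib {
  def receiveData(onData: String => Unit): Unit
  def dataEnd(onEnd: () => Unit): Unit
}

object CallbackStreaming {

  // Builds a Source that emits whatever the library passes to the data callback
  def legacySource(lib: OldJavaLib)(implicit system: ActorSystem): Source[String, NotUsed] = {
    // The implicit ActorSystem also supplies the Materializer needed here
    val (queue, source) = Source.queue[String](bufferSize = 1000).preMaterialize()

    lib.receiveData { s =>
      queue.offer(s) match {
        case QueueOfferResult.Enqueued => () // accepted into the buffer
        case other                     => println(s"failed to enqueue ($other): $s")
      }
    }

    // Signal normal completion of the stream when the library reports the end of data
    lib.dataEnd(() => queue.complete())

    source
  }

  // Streams each received string as one line of a chunked plain-text response
  def routes(lib: OldJavaLib)(implicit system: ActorSystem): Route =
    path("test") {
      get {
        complete(
          HttpEntity(
            ContentTypes.`text/plain(UTF-8)`,
            legacySource(lib).map(s => ByteString(s + "\n"))
          )
        )
      }
    }
}
```

If the library can produce data faster than the HTTP client reads it, offer will start returning something other than Enqueued once the buffer is full, so the bufferSize and the handling of that case are worth deciding deliberately.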