Search code examples
scalastreamakka-http

Akka HTTP. Streaming source from callback


I am trying to connect Akka HTTP and some old Java library. That library has two methods - one accepting a callback function to receive string, and one signaling the end of data stream. The callback function receiving the data can be called multiple times. Consider this snippet:

   oldJavaLib.receiveData((s:String) => {
       println("received:" + s)
   })

   oldJavaLib.dataEnd(() => {
       println("data transmission is over")
   })

I want to stream data using Akka HTTP as its being received by the callback function. But I am not sure what is a best way to go about that.
I was thinking to create a stream and then use it directly in HTTP route like this:

  def fetchUsers(): Source[User, NotUsed] = Source.fromIterator(() => Iterator.fill(1000000) {
    val id = Random.nextInt()
    dummyUser(id.toString)
  })
  
  lazy val routes: Route =
      pathPrefix("test") {
        concat(
          pathEnd {
            concat(
              get {
                complete(fetchUsers())
              }
            )
          }
        )
      }  

fetchUsers() function should return a stream which is getting data from some legacy java API. May-be there is a better approach.


Solution

  • I assume that you want to create an Akka stream that emits values from callback? You can use Source.queue. For the first callback it would be:

    val queue = Source.queue[String](bufferSize = 1000)
                    .toMat(Sink.ignore)(Keep.left)
                    .run()
    oldJavaLib.receiveData((s: String) => {
       queue.offer(s) match {
           case Enqueued => println("received:" + s)
           case _ => println("failed to enqueue:" + s)
       }
    })
    

    Edit after question clarification

    If you want to use the source in HTTP route you have to prematerialize it. Refering to my previous code it would look like this:

    val (queue, source) = Source.queue[String](bufferSize = 1000).preMaterialize()
    

    source then can be used in any route.