
Converting a callback-method implementation into an akka stream Source


I am working with a data publisher from a Java library that I do not control. The library uses a typical callback setup. Somewhere in the library code there is something like the following (the library is Java, but I will describe it in Scala for terseness):

type DataType = ???

trait DataConsumer {
  def onData(data : DataType) : Unit
}

The user of the library is required to write a class that implements the onData method and pass it to a DataProducer. The library code looks something like:

class DataProducer(consumer : DataConsumer) {...}

The DataProducer has its own internal thread, which I cannot control, and an accompanying data buffer. It calls onData whenever there is another DataType object to consume.

So, my question is: how do I write a layer that converts the original library pattern into an Akka Streams Source object?

Thank you in advance.


Solution

  • There are various ways to solve this. One is to use an ActorPublisher (http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors) and change the callback so that it sends a message to the actor. Depending on how the callback works, you might also be able to use mapAsync by converting the callback to a Future. That only works if one request produces exactly one callback call.
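
As a rough illustration of the "forward each callback into the stream" idea, here is a minimal sketch that uses Source.queue rather than the ActorPublisher mentioned above (ActorPublisher was deprecated in later Akka releases). It assumes Akka 2.6 and a single producer thread. DataType is aliased to String only so the sketch compiles, and the DataProducer body is a hypothetical stand-in for the real library class, whose internals are not shown in the question.

```scala
import akka.stream.OverflowStrategy
import akka.stream.scaladsl.Source

object CallbackSource {
  // Stand-ins for the library types; the real ones come from the Java library.
  type DataType = String

  trait DataConsumer {
    def onData(data: DataType): Unit
  }

  // Hypothetical stand-in producer: pushes three elements from its own thread.
  class DataProducer(consumer: DataConsumer) {
    private val worker = new Thread(() =>
      (1 to 3).foreach(i => consumer.onData(s"item-$i")))
    worker.setDaemon(true)
    worker.start()
  }

  // Each materialization of the Source creates its own queue and its own
  // DataProducer whose callback offers every element to that queue.
  def dataSource(bufferSize: Int): Source[DataType, DataProducer] =
    Source
      .queue[DataType](bufferSize, OverflowStrategy.dropHead)
      .mapMaterializedValue { queue =>
        new DataProducer(new DataConsumer {
          // offer returns a Future[QueueOfferResult]; it is ignored here
          // for brevity, but real code may want to inspect it.
          override def onData(data: DataType): Unit = {
            queue.offer(data)
            ()
          }
        })
      }
}
```

With an implicit ActorSystem in scope, something like CallbackSource.dataSource(256).runForeach(println) would start the producer and print whatever it emits. Because the library thread cannot be slowed down, some overflow policy is unavoidable: dropHead discards the oldest buffered element when the buffer is full, and a different OverflowStrategy may suit other use cases better.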