I am working with a data publisher from a Java library that I do not control. The publisher library uses a typical callback setup; somewhere in the library code there is something like the following (the library is Java, but I will describe it in Scala for terseness):
type DataType = ???
trait DataConsumer {
  def onData(data : DataType) : Unit
}
The user of the library is required to write a class that implements the onData method and pass an instance of it into a DataProducer. The library code looks something like:
class DataProducer(consumer : DataConsumer) {...}
The DataProducer has its own internal thread, which I cannot control, and an accompanying data buffer. That thread calls onData whenever there is another DataType object to consume.
So, my question is: how do I write a layer that converts the library's callback pattern into an Akka Streams Source object?
Thank you in advance.
There are various ways to solve this. One is to use an ActorPublisher: http://doc.akka.io/docs/akka-stream-and-http-experimental/1.0-M5/scala/stream-integrations.html#Integrating_with_Actors where you change the callback so that it simply sends each element as a message to the actor. Depending on how the callback works, you might be able to use mapAsync too, by converting the callback to a Future. That only works if one request produces exactly one callback call, which is not the case for a producer that pushes data on its own thread.
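The ActorPublisher approach could be sketched roughly as follows. This is a minimal sketch, not a definitive implementation: it assumes the Akka 2.4/2.5-era API (ActorPublisher, Source.actorPublisher, ActorMaterializer), which differs from 1.0-M5 and is deprecated in later Akka releases. DataType is stubbed as String, and the names CallbackBridge, Data, DataPublisher, dataSource, consumerFor and maxBuffer are hypothetical. Dropping elements when the buffer is full is also an assumption; pick whatever overflow policy suits your data.

```scala
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.stream.ActorMaterializer
import akka.stream.actor.ActorPublisher
import akka.stream.actor.ActorPublisherMessage.{Cancel, Request}
import akka.stream.scaladsl.{Sink, Source}
import scala.collection.immutable.Queue

object CallbackBridge {
  // Placeholder: the question leaves the real DataType unspecified.
  type DataType = String

  // Stand-in for the library's callback interface from the question.
  trait DataConsumer {
    def onData(data: DataType): Unit
  }

  // Hypothetical envelope for elements sent from the callback to the actor.
  final case class Data(value: DataType)

  // Buffers elements pushed by the callback and emits them only while
  // the downstream has signalled demand.
  class DataPublisher(maxBuffer: Int) extends ActorPublisher[DataType] {
    private var buffer = Queue.empty[DataType]

    def receive: Receive = {
      case Data(value) =>
        // Assumed overflow policy: drop the new element when the buffer is full.
        if (buffer.size < maxBuffer) buffer = buffer.enqueue(value)
        drain()
      case Request(_) => drain()           // downstream asked for more elements
      case Cancel     => context.stop(self) // downstream went away
    }

    // Emit buffered elements for as long as there is outstanding demand.
    private def drain(): Unit =
      while (isActive && totalDemand > 0 && buffer.nonEmpty) {
        val (head, tail) = buffer.dequeue
        buffer = tail
        onNext(head)
      }
  }

  // A Source whose materialized value is the ActorRef to send Data messages to.
  def dataSource(maxBuffer: Int): Source[DataType, ActorRef] =
    Source.actorPublisher[DataType](Props(new DataPublisher(maxBuffer)))

  // The callback only forwards to the actor, so it never blocks the
  // library's internal thread.
  def consumerFor(ref: ActorRef): DataConsumer = new DataConsumer {
    def onData(data: DataType): Unit = ref ! Data(data)
  }
}

object Demo extends App {
  import CallbackBridge._

  implicit val system: ActorSystem = ActorSystem("bridge")
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  // Running the stream yields the ActorRef that the callback talks to.
  val ref: ActorRef =
    dataSource(maxBuffer = 1000).to(Sink.foreach[DataType](println)).run()
  val consumer: DataConsumer = consumerFor(ref)

  // With the real library this would be: new DataProducer(consumer)
  consumer.onData("first")
  consumer.onData("second")
}
```

The buffer inside the actor is what reconciles the two models: the library pushes whenever it likes, while the stream pulls according to demand. Because the producer cannot be slowed down, some overflow policy (drop, fail, or an unbounded buffer) is unavoidable.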