Suppose that I have a fairly standard producer/consumer problem to code up in Scala, with this sort of structure:
- a Stream or an Iterator that lazily generates elements;
- a map or foreach on the Stream or Iterator to process these elements and do something with them.

This seems to work well, but it looks like it's single-threaded: when we want to process a new element, we ask for it to be generated, and only after it has been generated do we get to work on it. What I'd really like is a mechanism for the generation to continue while the previous element is being processed. Is there a way of getting Scala to do this?
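A minimal sketch of the sequential structure described above, assuming an Iterator as the producer. The names SequentialPipeline, run, generate and process are hypothetical. The log records the order of events and shows that each element is generated only when the consumer asks for it, so generation and processing strictly alternate on a single thread.

```scala
object SequentialPipeline {
  import scala.collection.mutable.ListBuffer

  // Generates n elements lazily and processes them one by one, recording the order of events.
  def run(n: Int): List[String] = {
    val log = ListBuffer.empty[String]

    // Hypothetical producer step: called only when the consumer pulls the next element.
    def generate(i: Int): Int = { log += s"generate $i"; i }

    // Hypothetical consumer step: runs on the same thread, after its element has been generated.
    def process(x: Int): Unit = { log += s"process $x"; () }

    Iterator.from(0).take(n).map(generate).foreach(process)
    log.toList
  }
}
```

Calling SequentialPipeline.run(2) yields generate 0, process 0, generate 1, process 1: nothing is produced ahead of the consumer.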
I'm aware that I could use a BlockingQueue, but that seems terribly imperative to me. I'm hoping there's a way to have a Stream keep on generating elements on another thread.
Once we're generating them in advance, it's not lazy evaluation any more, of course. But nor do I want eager evaluation that generates the whole stream in advance. I want the analogue of a BlockingQueue, but in a functional paradigm.
You could map each item from your stream to a Future that processes it, like this:
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global
def process(x: Int): Int = ??? // do something time consuming
val asyncProducer = Stream.from(0).map(x => Future { process(x) })
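At this point almost nothing has run, since a Stream won't generate items until you try to materialize them, much as you describe your stream working. (Strictly, a Stream evaluates its head eagerly, so the first item's future is already started; in Scala 2.13 and later Stream is deprecated in favour of LazyList, which is lazy in its head as well.) So if you want to fire off the processing of the first 10 items, you can simply materialize them like this: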
val futureResults = asyncProducer.take(10).toList
This will start 10 tasks that can run in parallel (how many actually run at once depends on the ExecutionContext you have in scope) and produce a List[Future[Int]]. To receive all those results together, you can then sequence the list of futures into a future of a list:
val futureResult = Future.sequence(futureResults)
You can then map over this future to get the list of results, hand them off to some recipient, and start the next chunk of processing.
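The chunk-by-chunk hand-off can be sketched as a recursive function over Futures. This is a minimal sketch, assuming Scala 2.13 or later (LazyList in place of the deprecated Stream) and the global ExecutionContext; ChunkedPipeline, runChunks, deliver and collectChunks are hypothetical names, and process is a trivial stand-in for the real work. Each chunk's items run in parallel, the combined results go to deliver, and only then does the next chunk start.

```scala
object ChunkedPipeline {
  import scala.concurrent.{Await, Future}
  import scala.concurrent.ExecutionContext.Implicits.global
  import scala.concurrent.duration._

  // Hypothetical stand-in for the time-consuming work.
  def process(x: Int): Int = x * 2

  // Processes `items` chunkSize at a time: every item in a chunk runs in parallel,
  // the chunk's results are handed to `deliver`, and then the next chunk starts.
  def runChunks(items: LazyList[Int], chunkSize: Int)(deliver: List[Int] => Unit): Future[Unit] = {
    val (chunk, rest) = items.splitAt(chunkSize)
    if (chunk.isEmpty) Future.successful(())
    else {
      // Start one Future per item in this chunk.
      val futureResults: List[Future[Int]] = chunk.toList.map(x => Future(process(x)))
      // Sequence preserves order: List[Future[Int]] becomes Future[List[Int]].
      Future.sequence(futureResults).flatMap { results =>
        deliver(results)
        runChunks(rest, chunkSize)(deliver)
      }
    }
  }

  // Runs the pipeline over the first n naturals and returns what was delivered, chunk by chunk.
  def collectChunks(n: Int, chunkSize: Int): List[List[Int]] = {
    val delivered = scala.collection.mutable.ListBuffer.empty[List[Int]]
    val done = runChunks(LazyList.from(0).take(n), chunkSize) { results =>
      delivered.synchronized { delivered += results }
    }
    // Blocking only so the example can return a plain value.
    Await.result(done, 10.seconds)
    delivered.synchronized { delivered.toList }
  }
}
```

For example, collectChunks(5, 2) delivers the results in three chunks of at most two. Because the next chunk starts only after deliver returns, slow delivery still stalls production; starting the next chunk's futures before calling deliver would let the two overlap, at the cost of holding two chunks in memory at once.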