Tags: scala, asynchronous, future, scala-cats, monix

Opportunistic, partial, and asynchronous pre-processing of a synchronously processed iterator


Let us use Scala.

I'm trying to find the best possible way to do an opportunistic, partial, and asynchronous pre-computation of some of the elements of an iterator that is otherwise processed synchronously.

The image below illustrates the problem.

(Figure: opportunistic, partial, and asynchronous pre-computation of some elements of an iterator that is otherwise processed synchronously.)

There is a lead thread (blue) that takes an iterator and a state. The state contains mutable data that must be protected from concurrent access. The state must be updated while the iterator is processed from the beginning, sequentially and in order, because the elements of the iterator depend on previous elements, and the nature of that dependency is not known in advance.

Processing some elements carries substantial overhead (two orders of magnitude) compared to others: some elements take 1 ms to compute, others 300 ms. Running time would improve significantly if I could speculatively pre-process the next k elements. Speculative pre-processing on asynchronous threads is possible (while the blue thread is processing synchronously), but the blue thread must validate whether the result of a pre-computation is still valid by the time it reaches that element. Usually (90% of the time) it should be valid. Thus, launching separate asynchronous threads to speculatively pre-process the remaining portion of the iterator would save many of those 300 ms in running time.

I have studied comparisons of Scala's asynchronous and functional libraries to better understand which model of computation, or in other words, which description of computation (which library), could be a better fit for this processing problem. Thinking about communication patterns, I came up with the following ideas:

  1. Akka

Use an Akka actor Blue for the blue thread that takes the iterator; for each step, it sends a Step message to itself. On a Step message, before it starts processing the ith element, it sends a PleasePreprocess(i+k) message with the (i+k)th element to one of the k pre-processor actors. Blue would Step to i+1 if and only if PreprocessingKindlyDone(i+1) has been received. (A sketch of this message protocol follows the list.)

  2. Akka Streams

AFAIK, Akka Streams also supports this kind of two-way backpressure mechanism, so it could be a good candidate for implementing what the actors do without actually using actors. (A rough sketch follows the list.)

  3. Scala Futures

While the blue thread processes elements with `processElement(e)` in `iterator.map(processElement(_))`, it would also spawn Futures for pre-processing. However, maintaining these pre-processing Futures and awaiting their results would require a semi-blocking implementation in pure Scala, as far as I can see, so I would not go in this direction, to the best of my current knowledge.

  4. Use Monix

I have some knowledge of Monix but could not wrap my head around how this problem could be elegantly solved with it. I do not see how the blue thread could wait for the result of i+1 and then continue. For this, I was thinking of using something like a sliding window with foldLeft(blueThreadAsZero){ (blue, preProc1, preProc2, notYetPreProc) => ... }, but could not find a similar construction. (A rough Monix sketch also follows the list.)
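
To make idea 1 more concrete, here is a minimal sketch of the message protocol it describes (untested; `Elem` and `Pre` are hypothetical stand-ins for the real element and pre-computation types, and the actor wiring itself is omitted):

    object Protocol {
      type Elem = Int                                  // hypothetical element type
      type Pre  = Int                                  // hypothetical pre-computation result type

      sealed trait Msg
      case object Step extends Msg                     // Blue tells itself to process the next element
      final case class PleasePreprocess(index: Int, elem: Elem) extends Msg         // Blue hands element i+k to a pre-processor
      final case class PreprocessingKindlyDone(index: Int, result: Pre) extends Msg // a pre-processor replies; Blue validates
                                                                                    // the result before stepping to i+1
    }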
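
For idea 2, Akka Streams' `mapAsync` seems to express the k-element lookahead directly: up to `parallelism` futures run concurrently, yet results are emitted downstream in the original order. A rough, untested sketch, where `preprocess`, `process`, and the types are again only placeholders for the yellow and blue steps:

    import akka.actor.ActorSystem
    import akka.stream.scaladsl.Source
    import scala.concurrent.{ExecutionContext, Future}

    object LookaheadStream {
      type Elem = Int; type Pre = Int; type State = List[Pre]    // hypothetical stand-ins

      def preprocess(e: Elem): Pre = e * 2                       // expensive, speculative "yellow" work
      def process(s: State, p: Pre): State = p :: s              // sequential "blue" step; validation would live here

      def run(it: Iterator[Elem], k: Int, zero: State): Future[State] = {
        implicit val system: ActorSystem = ActorSystem("lookahead") // in real code, create and terminate this once
        implicit val ec: ExecutionContext = system.dispatcher

        Source.fromIterator(() => it)
          .mapAsync(parallelism = k)(e => Future(preprocess(e))) // up to k speculative pre-computations in flight
          .runFold(zero)(process)                                // a single in-order consumer folds the state
      }
    }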
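
Similarly, for idea 4, Monix seems to offer the same pattern via `Observable.mapParallelOrdered` (available in Monix 3.x, if I am not mistaken): k tasks run speculatively while the downstream fold still sees the results in order. Another rough, untested sketch with the same placeholder names:

    import monix.eval.Task
    import monix.reactive.Observable

    def runMonix[Elem, Pre, State](it: Iterator[Elem], k: Int, zero: State)
                                  (preprocess: Elem => Pre, process: (State, Pre) => State): Task[State] =
      Observable.fromIteratorUnsafe(it)                          // wrap the iterator (it must not be consumed elsewhere)
        .mapParallelOrdered(k)(e => Task(preprocess(e)))         // k speculative tasks, results emitted in order
        .foldLeftL(zero)(process)                                // sequential "blue" fold; run the resulting Task for the state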

Possibly there are libraries I did not mention that could express this computational pattern better.

I hope I have described my problem adequately. Thank you for the hints/ideas or code snippets!


Solution

  • You need blocking anyhow if your blue thread happens to go faster than the yellow ones. I don't think you need any fancy libraries for this; "vanilla Scala" should do (as it actually does in most cases). Something like this, perhaps ...

    import scala.concurrent.{ExecutionContext, Future}

    def doit[T, R](it: Iterator[T], yellow: T => R, blue: R => R)
                  (implicit ec: ExecutionContext): Future[Seq[R]] = it
      .map { elem => Future(yellow(elem)) }                      // start pre-processing ("yellow") as the iterator is walked
      .foldLeft(Future.successful(List.empty[R])) { (last, next) =>
        last.flatMap { acc => next.map(blue).map(_ :: acc) }     // chain the sequential "blue" step onto each result, in order
      }
      .map(_.reverse)                                            // the fold builds the list back to front

    I didn't test or compile this, so it may need some tweaks, but conceptually it should work: pass through the iterator and start pre-processing right away, then fold to tuck the "validation" onto each completed pre-processing step sequentially.
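
    For illustration, a hypothetical way to exercise this sketch (also untested; the sleeps merely simulate the 1 ms vs. 300 ms elements from the question, and the final Await is the one unavoidable block for the overall result):

    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.Await
    import scala.concurrent.duration._

    val result: Seq[Int] = Await.result(
      doit[Int, Int](
        Iterator.range(0, 20),
        yellow = n => { Thread.sleep(if (n % 10 == 0) 300 else 1); n * 2 }, // simulated expensive pre-processing
        blue   = r => r + 1                                                 // simulated cheap sequential step
      ),
      1.minute
    )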