Tags: scala, scalaz, akka-stream, scalaz-stream

scalaz stream structure for growing lists


I have a hunch that I can (should?) be using scalaz-streams for solving my problem which is like this.

I have a starting item A. I have a function that takes an A and returns a list of A.

def doSomething(a: A): List[A]

I have a work queue that starts with a single item (the starting item). When we process (doSomething) each item, it may add many items to the end of the same work queue. At some point (after many millions of items), each subsequent item we call doSomething on will add fewer and fewer items to the work queue, and eventually no new items will be added (doSomething will return Nil for these items). This is how we know the computation will eventually terminate.
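For concreteness, the intended process can be sketched imperatively with a toy doSomething (the Int payload and the names here are illustrative, not part of the actual problem):

```scala
import scala.collection.mutable

// toy stand-in: counts down to 0, then stops producing
def doSomething(i: Int): List[Int] = if (i == 0) Nil else List(i - 1)

val queue = mutable.Queue(3)                    // starts with one item
val processed = mutable.ListBuffer.empty[Int]

while (queue.nonEmpty) {
  val a = queue.dequeue()
  processed += a
  queue ++= doSomething(a)  // results go to the end of the same queue
}
// processed now holds every item in the order it was worked on
```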

Assuming scalaz-streams is appropriate for this, could someone please give me some tips on which overall structure or types I should be looking at to implement this?

Once a simple implementation with a single "worker" is done, I would also like to use multiple workers to process queue items in parallel, e.g. a pool of 5 workers (each worker farming its task out to an agent to calculate doSomething), so I would also need to handle effects (such as worker failure) in this algorithm.


Solution

  • So the answer to the "how?" is:

    import scalaz.stream._
    import scalaz.stream.async._
    import Process._
    
    def doSomething(i: Int) = if (i == 0) Nil else List(i - 1)
    
    val q = unboundedQueue[Int]    // the work queue
    val out = unboundedQueue[Int]  // collects every produced item
    
    q.dequeue
     .flatMap(e => emitAll(doSomething(e)))
     .observe(out.enqueue) // copy each result to the output queue
     .to(q.enqueue)        // and feed it back into the work queue
     .run.runAsync(_ => ()) // runAsync can process failures; there is `.onFailure` as well
    
    q.enqueueAll(List(3,5,7)).run
    q.size.continuous
     .filter(0 ==)        // the work queue has drained: computation is done
     .map(_ => -1)        // emit a sentinel value on the output queue
     .to(out.enqueue).once.run.runAsync(_ => ()) // call it only after enqueueAll
    
    import scalaz._, Scalaz._
    val result = out
      .dequeue
      .takeWhile(_ != -1)   // stop at the sentinel
      .map(_.point[List])   // wrap each element so foldMonoid concatenates lists
      .foldMonoid.runLast.run.get // run synchronously
    

    Result:

    result: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
    

    However, you might notice that:

    1) I had to solve the termination problem. The same problem exists for akka-stream, and it is much harder to resolve there, as you don't have access to the Queue and there is no natural back-pressure to guarantee that the queue won't become empty merely because of fast readers.

    2) I had to introduce another queue for the output (and convert it to a List), as the working queue becomes empty at the end of the computation.
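    The sentinel trick used above (enqueue -1 once the work queue drains, then takeWhile on the output) can be illustrated with a plain standard-library queue - a minimal sketch, not the scalaz-stream API:

    ```scala
    import java.util.concurrent.LinkedBlockingQueue

    val out = new LinkedBlockingQueue[Int]()
    List(1, 2, 3).foreach(out.put)
    out.put(-1) // sentinel: marks the end of the finite stream

    // drain until the sentinel, mirroring `takeWhile(_ != -1)`
    val drained = Iterator.continually(out.take()).takeWhile(_ != -1).toList
    // drained == List(1, 2, 3)
    ```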

    So, neither library is well adapted to such requirements (a finite stream); however, scalaz-stream (which is going to become "fs2" after removing the scalaz dependency) is flexible enough to implement your idea. The big "but" is that it runs sequentially by default. There are (at least) two ways to make it faster:

    1) split your doSomething into stages, like .flatMap(doSomething1).flatMap(doSomething2).map(doSomething3), and then put more queues between them (about 3 times faster if the stages take equal time).

    2) parallelize queue processing. Akka has mapAsync for that - it can do maps in parallel automatically. Scalaz-stream has chunks - you can group your q into chunks of, let's say, 5 and then process each element inside a chunk in parallel. Either way, neither solution (akka nor scalaz) is well adapted to using one queue as both input and output.
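    The chunking idea can be sketched with standard-library Futures in place of scalaz-stream's chunks (the chunk size and the names here are illustrative assumptions):

    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    def doSomething(i: Int): List[Int] = if (i == 0) Nil else List(i - 1)

    // run doSomething on every element of a chunk in parallel,
    // then concatenate the results in the original order
    def processChunk(chunk: List[Int]): List[Int] =
      Await.result(
        Future.traverse(chunk)(i => Future(doSomething(i))).map(_.flatten),
        10.seconds)

    val nextGeneration = List(3, 5, 7).grouped(5).toList.flatMap(processChunk)
    // nextGeneration == List(2, 4, 6)
    ```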

    But, again, it's all too complex and pointless, as there is a classic simple way:

    import scala.annotation.tailrec

    @tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
      if (l.isEmpty) acc else {
        val processed = l.flatMap(doSomething)
        calculate(processed, acc ++ processed)
      }
    
    scala> calculate(List(3,5,7), Nil)
    res5: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
    

    And here is the parallelized one:

    import scala.annotation.tailrec

    // note: in Scala 2.13+, `.par` requires the scala-parallel-collections module
    @tailrec def calculate(l: List[Int], acc: List[Int]): List[Int] =
      if (l.isEmpty) acc else {
        val processed = l.par.flatMap(doSomething).toList
        calculate(processed, acc ++ processed)
      }
    
    scala> calculate(List(3,5,7), Nil)
    res6: List[Int] = List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
    

    So, yes, I would say that neither scalaz-stream nor akka-streams fits your requirements; however, classic Scala parallel collections fit perfectly.

    If you need distributed calculations across multiple JVMs - take a look at Apache Spark; its Scala DSL uses the same map/flatMap/fold style. It allows you to work with big collections (by scaling them across JVMs) that don't fit into one JVM's memory, so you can improve @tailrec def calculate by using an RDD instead of a List. It will also give you instruments to handle failures inside doSomething.

    P.S. So here is why I don't like the idea of using streaming libraries for such tasks. Streaming is more about infinite streams coming from external systems (like HttpRequests), not about computing over predefined (even big) data.

    P.S.2 If you need reactive-like behaviour (without blocking), you might use Future (or scalaz.concurrent.Task) + Future.sequence.
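    A sketch of that non-blocking variant, using only the standard library - each generation of the work list is processed in parallel via Future.sequence, and the recursion happens inside flatMap rather than via @tailrec (the Await at the end is only for demonstration):

    ```scala
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    import scala.concurrent.duration._

    def doSomething(i: Int): List[Int] = if (i == 0) Nil else List(i - 1)

    // run one generation in parallel without blocking
    def step(l: List[Int]): Future[List[Int]] =
      Future.sequence(l.map(i => Future(doSomething(i)))).map(_.flatten)

    def calculate(l: List[Int], acc: List[Int]): Future[List[Int]] =
      if (l.isEmpty) Future.successful(acc)
      else step(l).flatMap(processed => calculate(processed, acc ++ processed))

    // blocking only at the very end, to inspect the result
    val result = Await.result(calculate(List(3, 5, 7), Nil), 10.seconds)
    // result == List(2, 4, 6, 1, 3, 5, 0, 2, 4, 1, 3, 0, 2, 1, 0)
    ```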